ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/deliantra/server/socket/lowlevel.C
Revision: 1.54
Committed: Sun Jul 29 02:24:34 2007 UTC (16 years, 10 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.53: +26 -20 lines
Log Message:
implement the most elaborate rate limit system to date, learning from the errors of earlier attempts:

- output-rate is now an upper bound only. its purpose is to give the user
  some control over bandwidth use. it should be set too high rather than too low.
- the server will (on linux only, or other systems that support TCP_INFO) guess
  how much the kernel is willing to send without delay (this is imperfect as
  we do not know the remote receive window, but we assume it's "large enough").
- we accurately measure the mss and ensure that preferably mss-sized packets
  leave the server, by sending less in some cases and more in others
  to reach the desired bandwidth goal
  (e.g. 15000 == http://ue.tst.eu/545350740128735b13aaf541c88bfaf2.txt)

the net effect is that the server will never send (much) more data than the kernel
thinks the network is able to handle. that is, when the connection was idle for a time
and the congestion window is small, we will only start sending small amounts of
data, prompting the kernel to accurately model the bandwidth.

in essence, this creates a tcp stream that never has more data buffered
than necessary for in-flight data, ensuring that we can get low-latency
map updates through to the client while using all excess bandwidth the
network can handle.

I mostly tested with netem, e.g.

   ifconfig lo mtu 1500
   tc qdisc change dev lo root netem delay 190ms 10ms drop 0.1

gave me roughly 20kb/s throughput even though output-rate was 100kb/s,
without stalling the connection even when downloading background music and
other large chunks of data.

File Contents

# Content
1 /*
2 * This file is part of Crossfire TRT, the Roguelike Realtime MORPG.
3 *
4 * Copyright (©) 2005,2006,2007 Marc Alexander Lehmann / Robin Redeker / the Crossfire TRT team
5 * Copyright (©) 1992,2007 Frank Tore Johansen
6 *
7 * Crossfire TRT is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 *
20 * The authors can be reached via e-mail to <crossfire@schmorp.de>
21 */
22
23 using namespace std;
24
25 #include <global.h>
26 #include <sproto.h>
27 #include <cstdarg>
28
29 #if HAVE_TCP_INFO
30 # include <sys/types.h>
31 # include <sys/socket.h>
32 # include <netinet/in.h>
33 # include <netinet/tcp.h>
34 #endif
35
// Seconds without an ack from the peer after which the connection of an
// active player is considered dead and closed (checked in client::flush).
static const double SOCKET_TIMEOUT = 8.;

// Seconds of output idleness after which a tiny packet is sent anyway,
// so the client keeps acknowledging regularly.
static const double IDLE_PING = 2.;
42
43 void
44 client::flush ()
45 {
46 if (destroyed ())
47 return;
48
49 #if HAVE_TCP_INFO
50 // check time of last ack, and, if too old, kill connection
51 socklen_t len = sizeof (tcpi);
52
53 if (!getsockopt (fd, IPPROTO_TCP, TCP_INFO, &tcpi, &len) && len == sizeof (tcpi))
54 {
55 if (tcpi.tcpi_snd_mss)
56 mss = tcpi.tcpi_snd_mss;
57
58 #if 0
59 fprintf (stderr, "uack %d ack %d lost %d ret %d fack %d sst %d cwnd %d mss %d pmtu %d advmss %d EXC %d\n",
60 tcpi.tcpi_unacked,
61 tcpi.tcpi_sacked,
62 tcpi.tcpi_lost,
63 tcpi.tcpi_retrans,
64 tcpi.tcpi_fackets,
65 tcpi.tcpi_snd_ssthresh, tcpi.tcpi_snd_cwnd, tcpi.tcpi_advmss, tcpi.tcpi_pmtu, tcpi.tcpi_advmss,
66
67 tcpi.tcpi_snd_cwnd - (tcpi.tcpi_unacked - tcpi.tcpi_sacked));
68 #endif
69
70 // do this only when player is active
71 if (pl && pl->active
72 && tcpi.tcpi_last_ack_recv > int (SOCKET_TIMEOUT * 1000))
73 {
74 send_msg (NDI_RED | NDI_REPLY, "connection-timeout", "safety disconnect due to tcp/ip timeout (no packets received)");
75 write_outputbuffer ();
76
77 LOG (llevDebug, "connection on fd %d closed due to ack timeout (%u/%u/%u)\n", fd,
78 (unsigned)tcpi.tcpi_last_ack_recv, (unsigned)tcpi.tcpi_last_data_sent, (unsigned)tcpi.tcpi_unacked);
79 destroy ();
80 }
81 }
82 #endif
83
84 /**
85 * Writes data to socket.
86 *
87 * When the socket is clear to write, and we have backlogged data, this
88 * is called to write it out.
89 */
90
91 // write a nop to the socket at least every IDLE_NOP seconds.
92 if (!outputbuffer.len)
93 {
94 if (last_send + IDLE_PING <= NOW && pl && pl->active)
95 {
96 // this is a bit ugly, but map1/map1a seem to be the only
97 // nop'able commands and they are quite small.
98 packet sl (mapmode == Map1Cmd ? "map1" : "map1a");
99 send_packet (sl);
100 }
101 else
102 return;
103 }
104
105 if (socket_ev.poll () & PE_W)
106 return;
107
108 last_send = NOW;
109 write_outputbuffer ();
110 }
111
112 void
113 client::write_outputbuffer ()
114 {
115 while (outputbuffer.len)
116 {
117 int res = write (fd, outputbuffer.data + outputbuffer.start,
118 min (outputbuffer.len, SOCKETBUFSIZE - outputbuffer.start));
119
120 if (res > 0)
121 {
122 outputbuffer.start += res;
123 /* wrap back to start of buffer */
124 if (outputbuffer.start == SOCKETBUFSIZE)
125 outputbuffer.start = 0;
126
127 outputbuffer.len -= res;
128 }
129 else if (res == 0)
130 {
131 LOG (llevError, "socket write failed, connection closed.\n");
132 destroy ();
133 return;
134 }
135 else if (errno == EINTR)
136 {
137 // just retry
138 }
139 else if (errno == EAGAIN)
140 {
141 // delay till ready
142 socket_ev.poll (socket_ev.poll () | PE_W);
143 socket_ev.start ();
144 return;
145 }
146 else
147 {
148 LOG (llevError, "socket write failed: %s\n", strerror (errno));
149 destroy ();
150 return;
151 }
152 }
153
154 socket_ev.poll (socket_ev.poll () & ~PE_W);
155 }
156
157 /******************************************************************************
158 *
159 * Start of read routines.
160 *
161 ******************************************************************************/
162
163 int
164 client::next_packet ()
165 {
166 if (inbuf_len >= 2)
167 {
168 int pkt_len = (inbuf [0] << 8) | inbuf [1];
169
170 if (inbuf_len >= 2 + pkt_len)
171 return 2 + pkt_len;
172
173 if (inbuf_len == sizeof (inbuf))
174 {
175 send_packet_printf ("drawinfo %d input buffer overflow - closing connection.", NDI_RED | NDI_REPLY);
176 destroy ();
177 return -1;
178 }
179 }
180
181 return 0;
182 }
183
184 void
185 client::skip_packet (int len)
186 {
187 inbuf_len -= len;
188 memmove (inbuf, inbuf + len, inbuf_len);
189 }
190
191 /*****************************************************************************
192 * Start of command dispatch area.
193 * The commands here are protocol commands.
194 ****************************************************************************/
195
196 // SocketCommand, PlayingCommand, should not exist with those ugly casts
197 #define SC(cb) (void *)static_cast<void (*)(char *, int, client *)>(cb),
198 #define PC(cb) (void *)static_cast<void (*)(char *, int, player *)>(cb), PF_PLAYER |
199
200 /**
201 * Dispatch table for the server.
202 */
203 static struct packet_type packets[] = {
204 {"ncom", PC(NewPlayerCmd) PF_PLAYING | PF_COMMAND6 },
205 {"command", PC(PlayerCmd) PF_PLAYING | PF_COMMAND0 },
206
207 {"examine", PC(ExamineCmd) PF_PLAYING },
208 {"ex", PC(ExCmd) PF_PLAYING },
209 {"apply", PC(ApplyCmd) PF_PLAYING },
210 {"lookat", PC(LookAt) PF_PLAYING },
211 {"lock", PC(LockItem) PF_PLAYING },
212 {"mark", PC(MarkItem) PF_PLAYING },
213 {"move", PC(MoveCmd) PF_PLAYING },
214 {"ext", PC(ExtCmd) 0 }, // CF+
215 {"mapredraw", PC(MapRedrawCmd) 0 },
216 {"mapinfo", PC(MapInfoCmd) 0 }, // CF+
217
218 {"reply", SC(ReplyCmd) 0 },
219 {"exti", SC(ExtiCmd) 0 }, // CF+
220 {"addme", SC(AddMeCmd) 0 },
221 {"askface", SC(AskFaceCmd) 0 },
222 {"requestinfo", SC(RequestInfo) 0 },
223 {"setfacemode", SC(SetFaceMode) 0 },
224 {"setsound", SC(SetSound) 0 },
225 {"setup", SC(SetUp) 0 },
226 {"version", SC(VersionCmd) 0 },
227 {"toggleextendedinfos", SC(ToggleExtendedInfos) 0 }, /*Added: tchize */
228 {"toggleextendedtext", SC(ToggleExtendedText) 0 }, /*Added: tchize */
229 {"asksmooth", SC(AskSmooth) 0 }, /*Added: tchize (smoothing technologies) */
230 };
231
232 bool
233 client::may_execute (const packet_type *pkt) const
234 {
235 return (!(pkt->flags & PF_PLAYER) || pl)
236 && (!(pkt->flags & PF_PLAYING) || state == ST_PLAYING);
237 }
238
239 // HACK: some commands currently should be executed
240 // even when the player is frozen. this hack detects
241 // those commands. it should be folded into may_execute,
242 // but kept seperate to emphasise the hack aspect, i.e.
243 // do it better, then remove.
244 static bool
245 always_immediate (const client *ns, const packet_type *pkt, const char *data, int len)
246 {
247 if (!(pkt->flags & (PF_COMMAND0 | PF_COMMAND6)))
248 return false;
249
250 if (!ns->pl || !ns->pl->ob || !ns->pl->ob->map)
251 return false;
252
253 if (pkt->flags & PF_COMMAND6)
254 {
255 data += 6;
256 len -= 6;
257 }
258
259 if (len > 4 && !strncmp (data, "say " , 4))
260 return true;
261 if (len > 5 && !strncmp (data, "chat ", 5))
262 return true;
263
264 return false;
265 }
266
267 void
268 client::execute (const packet_type *pkt, char *data, int datalen)
269 {
270 if (may_execute (pkt) || always_immediate (this, pkt, data, datalen))
271 {
272 //TODO: only one format
273 if (pkt->flags & PF_PLAYER)
274 ((void (*)(char *, int, player *))pkt->cb)((char *)data, datalen, pl);
275 else
276 ((void (*)(char *, int, client *))pkt->cb)((char *)data, datalen, this);
277 }
278 else
279 send_packet_printf ("drawinfo %d ERROR: you cannot execute '%s' now.", NDI_RED | NDI_REPLY, pkt->name);
280 }
281
282 bool
283 client::handle_packet ()
284 {
285 int pkt_len = next_packet ();
286
287 if (!pkt_len)
288 return false;
289 else if (pkt_len < 0)
290 {
291 LOG (llevError, "read error on player %s\n",
292 pl && pl->ob ? &pl->ob->name : "[anonymous]");
293 destroy ();
294 return false;
295 }
296
297 inbuf [pkt_len] = 0; /* Terminate buffer - useful for string data */
298
299 /* First, break out beginning word. There are at least
300 * a few commands that do not have any paremeters. If
301 * we get such a command, don't worry about trying
302 * to break it up.
303 */
304 int datalen;
305 char *data = strchr ((char *)inbuf + 2, ' ');
306
307 if (data)
308 {
309 *data++ = 0;
310 datalen = pkt_len - (data - (char *)inbuf);
311 }
312 else
313 {
314 data = (char *)inbuf + 2; // better read garbage than segfault
315 datalen = 0;
316 }
317
318 for (packet_type *pkt = packets; pkt < packets + (sizeof (packets) / sizeof (packets[0])); ++pkt)
319 if (!strcmp ((char *)inbuf + 2, pkt->name))
320 {
321 if (pkt->flags & PF_PLAYER && !always_immediate (this, pkt, data, datalen))
322 queue_command (pkt, data, datalen);
323 else
324 execute (pkt, data, datalen);
325
326 goto next_packet;
327 }
328
329 // If we get here, we didn't find a valid command.
330 send_packet_printf ("drawinfo %d ERROR: command '%s' not supported.", NDI_RED | NDI_REPLY, (char *)inbuf + 2);
331 next_packet:
332 skip_packet (pkt_len);
333
334 // input buffer has space again
335 socket_ev.poll (socket_ev.poll () | PE_R);
336
337 return true;
338 }
339
340 // callback called when socket is either readable or writable
341 void
342 client::socket_cb (iow &w, int got)
343 {
344 //TODO remove when we have better socket cleanup logic
345 if (destroyed ())
346 {
347 socket_ev.poll (0);
348 return;
349 }
350
351 if (got & PE_W)
352 {
353 write_outputbuffer ();
354
355 if (!outputbuffer.len)
356 socket_ev.poll (socket_ev.poll () & ~PE_W);
357 }
358
359 if (got & PE_R)
360 {
361 //TODO: rate-limit tcp connection in better ways, important
362
363 int amount = sizeof (inbuf) - inbuf_len;
364
365 if (!amount)
366 {
367 // input buffer full
368 socket_ev.poll (socket_ev.poll () & ~PE_R);
369 return;
370 }
371
372 amount = read (fd, inbuf + inbuf_len, amount);
373
374 if (!amount)
375 {
376 destroy ();
377 return;
378 }
379 else if (amount < 0)
380 {
381 if (errno != EAGAIN && errno != EINTR)
382 {
383 LOG (llevError, "read error: %s\n", strerror (errno));
384 destroy ();
385 return;
386 }
387
388 // should not be here, normally
389 }
390 else
391 {
392 inbuf_len += amount;
393
394 cmd_ev.start ();
395 }
396 }
397 }
398
399 // called whenever we have additional commands to process
400 void
401 client::cmd_cb (iw &w)
402 {
403 if (handle_packet ())
404 w.start ();
405 else
406 flush ();
407 }
408
409 /*******************************************************************************
410 *
411 * Start of write related routines.
412 *
413 ******************************************************************************/
414
415 /**
416 * Adds data to a socket buffer for whatever reason.
417 *
418 * ns is the socket we are adding the data to, buf is the start of the
419 * data, and len is the number of bytes to add.
420 */
421 void
422 client::send (void *buf_, int len)
423 {
424 char *buf = (char *)buf_;
425
426 if (destroyed () || !buf)
427 return;
428
429 if (len + outputbuffer.len > SOCKETBUFSIZE)
430 {
431 LOG (llevDebug, "socket on fd %d has overrun internal buffer - marking as dead\n", fd);
432 // shutdown the socket, this is safer than destroying it immediately
433 // as lots of code in the callchain might still access the map etc.
434 shutdown (fd, SHUT_RDWR);
435 return;
436 }
437
438 int avail, end;
439
440 /* data + end is where we start putting the new data. The last byte
441 * currently in use is actually data + end -1
442 */
443 end = outputbuffer.start + outputbuffer.len;
444 /* The buffer is already in a wrapped state, so adjust end */
445 if (end >= SOCKETBUFSIZE)
446 end -= SOCKETBUFSIZE;
447
448 avail = SOCKETBUFSIZE - end;
449
450 /* We can all fit it behind the current data without wrapping */
451 if (avail >= len)
452 memcpy (outputbuffer.data + end, buf, len);
453 else
454 {
455 memcpy (outputbuffer.data + end, buf, avail);
456 memcpy (outputbuffer.data, buf + avail, len - avail);
457 }
458
459 outputbuffer.len += len;
460 }
461
462 /**
463 * Takes a string of data, and writes it out to the socket. A very handy
464 * shortcut function.
465 */
466 void
467 client::send_packet (packet &sl)
468 {
469 if (destroyed ())
470 return;
471
472 if (sl.length () >= MAXSOCKBUF)
473 {
474 LOG (llevError, "Trying to send a buffer beyond properly size, len =%d\n", sl.length ());
475 /* Almost certainly we've overflowed a buffer, so quit now to make
476 * it easier to debug.
477 */
478 abort ();
479 }
480
481 if (!sl.length ())
482 return;
483
484 assert (sl.hdrlen == 2);
485
486 sl.buf_ [0] = sl.length () >> 8;
487 sl.buf_ [1] = sl.length () ;
488
489 send (sl.buf_, sl.length () + sl.hdrlen);
490 }
491
492 void
493 client::send_packet (const char *buf, int len)
494 {
495 packet sl;
496
497 sl << data (buf, len);
498 send_packet (sl);
499 }
500
501 void
502 client::send_packet (const char *buf)
503 {
504 send_packet (buf, strlen (buf));
505 }
506
507 void
508 client::send_packet_printf (const char *format, ...)
509 {
510 packet sl;
511
512 va_list ap;
513 va_start (ap, format);
514 sl.vprintf (format, ap);
515 va_end (ap);
516
517 send_packet (sl);
518 }
519
// returns true when the message needs special (read: perl) treatment,
// i.e. when it contains any of '<', '[', '&' or a newline
static bool
msg_is_special (const char *msg)
{
  return strpbrk (msg, "<[&\n") != 0;
}
526
527 void
528 client::send_msg (int color, const char *type, const char *msg)
529 {
530 if (msg_is_special (msg))
531 cfperl_send_msg (this, color, type, msg);
532 else if (can_msg)
533 send_packet_printf ("msg %d %s %s", color & NDI_CLIENT_MASK, type, msg);
534 else if (color < 0)
535 return; // client cannot handle this
536 else
537 send_packet_printf ("drawinfo %d %s", color & NDI_COLOR_MASK, msg);
538 }
539
540 void
541 client::send_drawinfo (const char *msg, int flags)
542 {
543 send_msg (flags, "log", msg);
544 }
545
546 /***********************************************************************
547 *
548 * packet functions/utilities
549 *
550 **********************************************************************/
551
552 packet::packet (const char *name)
553 {
554 reset ();
555
556 int len = strlen (name);
557 memcpy (cur, name, len); cur += len;
558 *cur++ = ' ';
559 }
560
561 packet &packet::operator <<(const ber32 v)
562 {
563 enum { maxlen = 32 / 7 + 1};
564 uint8 buf[maxlen];
565 uint8 *p = buf + maxlen;
566 uint32 val = v.val;
567
568 *--p = val & 0x7F;
569
570 while (val > 0x7F)
571 {
572 val >>= 7;
573 *--p = (val & 0x7F) | 0x80;
574 }
575
576 return *this << data (p, buf + maxlen - p);
577 }
578
579 packet &packet::operator <<(const data &v)
580 {
581 if (room () < v.len)
582 reset ();
583 else
584 {
585 if (v.len)
586 {
587 memcpy (cur, v.ptr, v.len);
588 cur += v.len;
589 }
590 }
591
592 return *this;
593 }
594
595 packet &packet::operator <<(const data8 &v)
596 {
597 unsigned int len = min (v.len, 0x00FF);
598 return *this << uint8 (len) << data (v.ptr, len);
599 }
600
601 packet &packet::operator <<(const data16 &v)
602 {
603 unsigned int len = min (v.len, 0xFFFF);
604 return *this << uint16 (len) << data (v.ptr, len);
605 }
606
607 packet &packet::operator <<(const char *v)
608 {
609 return *this << data (v, strlen (v ? v : 0));
610 }
611
612 void
613 packet::vprintf (const char *format, va_list ap)
614 {
615 int size = room ();
616
617 int len = vsnprintf ((char *)cur, size, format, ap);
618
619 if (len >= size)
620 return reset ();
621
622 cur += len;
623 }
624