ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/deliantra/server/socket/lowlevel.C
Revision: 1.54
Committed: Sun Jul 29 02:24:34 2007 UTC (16 years, 9 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.53: +26 -20 lines
Log Message:
implement the most elaborate rate limit system to date, learning from the mistakes of earlier attempts:

- output-rate is now an upper bound only. its purpose is to give the user
  some control over bandwidth use. it should be set too high rather than too low.
- the server will (on linux only, or other systems that support tcp_info) guess
  how much the kernel is willing to send without delay (this is imperfect as
  we do not know the remote receive window, but we assume it's "large enough").
- we accurately measure the mss and ensure that, preferably, mss-sized packets
  leave the server, by sending less in some cases and more in others
  to reach the desired bandwidth goal
  (e.g. 15000 == http://ue.tst.eu/545350740128735b13aaf541c88bfaf2.txt)

the net effect is that the server will never send (much) more data than the kernel
thinks the network is able to handle. that is, when the connection was idle for a time
and the congestion window is small, we will only start sending small amounts of
data, prompting the kernel to accurately model the bandwidth.

in essence, this creates a tcp stream that never has more data buffered
than necessary for in-flight data, ensuring that we can get low-latency
map updates through to the client while using all excess bandwidth the
network can handle.

I mostly tested with netem, e.g.

   ifconfig lo mtu 1500
   tc qdisc change dev lo root netem delay 190ms 10ms drop 0.1

gave me roughly 20kb/s throughput even though output-rate was 100kb/s,
without stalling the connection even when downloading background music and
other large chunks of data.

File Contents

# User Rev Content
1 elmex 1.1 /*
2 root 1.49 * This file is part of Crossfire TRT, the Roguelike Realtime MORPG.
3 pippijn 1.32 *
4 root 1.42 * Copyright (©) 2005,2006,2007 Marc Alexander Lehmann / Robin Redeker / the Crossfire TRT team
5     * Copyright (©) 1992,2007 Frank Tore Johansen
6     *
7 root 1.49 * Crossfire TRT is free software: you can redistribute it and/or modify
8     * it under the terms of the GNU General Public License as published by
9     * the Free Software Foundation, either version 3 of the License, or
10     * (at your option) any later version.
11 root 1.42 *
12 root 1.49 * This program is distributed in the hope that it will be useful,
13     * but WITHOUT ANY WARRANTY; without even the implied warranty of
14     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15     * GNU General Public License for more details.
16 root 1.42 *
17 root 1.49 * You should have received a copy of the GNU General Public License
18     * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 pippijn 1.32 *
20 root 1.42 * The authors can be reached via e-mail to <crossfire@schmorp.de>
21 pippijn 1.32 */
22 root 1.42
23 elmex 1.1 using namespace std;
24    
25     #include <global.h>
26     #include <sproto.h>
27 root 1.11 #include <cstdarg>
28 elmex 1.1
29 root 1.45 #if HAVE_TCP_INFO
30 elmex 1.1 # include <sys/types.h>
31     # include <sys/socket.h>
32     # include <netinet/in.h>
33     # include <netinet/tcp.h>
34     #endif
35    
// Disconnect a socket after this many seconds without an ack.
static const double SOCKET_TIMEOUT = 8.;

// Force a packet out when the output has been idle for more than this many
// seconds, so that the client acks regularly.
static const double IDLE_PING = 2.;
42 elmex 1.1
43 root 1.5 void
44 root 1.22 client::flush ()
45 elmex 1.1 {
46 root 1.29 if (destroyed ())
47     return;
48    
49 root 1.45 #if HAVE_TCP_INFO
50 root 1.54 // check time of last ack, and, if too old, kill connection
51     socklen_t len = sizeof (tcpi);
52    
53     if (!getsockopt (fd, IPPROTO_TCP, TCP_INFO, &tcpi, &len) && len == sizeof (tcpi))
54 root 1.43 {
55 root 1.54 if (tcpi.tcpi_snd_mss)
56     mss = tcpi.tcpi_snd_mss;
57    
58     #if 0
59     fprintf (stderr, "uack %d ack %d lost %d ret %d fack %d sst %d cwnd %d mss %d pmtu %d advmss %d EXC %d\n",
60     tcpi.tcpi_unacked,
61     tcpi.tcpi_sacked,
62     tcpi.tcpi_lost,
63     tcpi.tcpi_retrans,
64     tcpi.tcpi_fackets,
65     tcpi.tcpi_snd_ssthresh, tcpi.tcpi_snd_cwnd, tcpi.tcpi_advmss, tcpi.tcpi_pmtu, tcpi.tcpi_advmss,
66    
67     tcpi.tcpi_snd_cwnd - (tcpi.tcpi_unacked - tcpi.tcpi_sacked));
68     #endif
69 elmex 1.1
70 root 1.54 // do this only when player is active
71     if (pl && pl->active
72     && tcpi.tcpi_last_ack_recv > int (SOCKET_TIMEOUT * 1000))
73 root 1.43 {
74 root 1.54 send_msg (NDI_RED | NDI_REPLY, "connection-timeout", "safety disconnect due to tcp/ip timeout (no packets received)");
75     write_outputbuffer ();
76 root 1.46
77 root 1.54 LOG (llevDebug, "connection on fd %d closed due to ack timeout (%u/%u/%u)\n", fd,
78     (unsigned)tcpi.tcpi_last_ack_recv, (unsigned)tcpi.tcpi_last_data_sent, (unsigned)tcpi.tcpi_unacked);
79     destroy ();
80 elmex 1.1 }
81     }
82 root 1.20 #endif
83    
84     /**
85     * Writes data to socket.
86     *
87     * When the socket is clear to write, and we have backlogged data, this
88     * is called to write it out.
89     */
90    
91 root 1.43 // write a nop to the socket at least every IDLE_NOP seconds.
92     if (!outputbuffer.len)
93     {
94 root 1.47 if (last_send + IDLE_PING <= NOW && pl && pl->active)
95 root 1.43 {
96     // this is a bit ugly, but map1/map1a seem to be the only
97     // nop'able commands and they are quite small.
98     packet sl (mapmode == Map1Cmd ? "map1" : "map1a");
99     send_packet (sl);
100     }
101     else
102     return;
103     }
104    
105     if (socket_ev.poll () & PE_W)
106 root 1.20 return;
107    
108 root 1.43 last_send = NOW;
109 root 1.20 write_outputbuffer ();
110     }
111 elmex 1.1
112 root 1.20 void
113 root 1.22 client::write_outputbuffer ()
114 root 1.20 {
115     while (outputbuffer.len)
116     {
117     int res = write (fd, outputbuffer.data + outputbuffer.start,
118     min (outputbuffer.len, SOCKETBUFSIZE - outputbuffer.start));
119 root 1.5
120 root 1.20 if (res > 0)
121     {
122     outputbuffer.start += res;
123     /* wrap back to start of buffer */
124     if (outputbuffer.start == SOCKETBUFSIZE)
125     outputbuffer.start = 0;
126    
127     outputbuffer.len -= res;
128     }
129     else if (res == 0)
130     {
131     LOG (llevError, "socket write failed, connection closed.\n");
132 root 1.23 destroy ();
133 root 1.20 return;
134     }
135     else if (errno == EINTR)
136     {
137     // just retry
138     }
139     else if (errno == EAGAIN)
140     {
141     // delay till ready
142     socket_ev.poll (socket_ev.poll () | PE_W);
143     socket_ev.start ();
144     return;
145     }
146     else
147     {
148     LOG (llevError, "socket write failed: %s\n", strerror (errno));
149 root 1.23 destroy ();
150 root 1.20 return;
151     }
152     }
153    
154     socket_ev.poll (socket_ev.poll () & ~PE_W);
155 elmex 1.1 }
156    
157 root 1.23 /******************************************************************************
158 elmex 1.1 *
159 root 1.23 * Start of read routines.
160 elmex 1.1 *
161 root 1.23 ******************************************************************************/
162 elmex 1.1
163 root 1.23 int
164     client::next_packet ()
165 elmex 1.1 {
166 root 1.23 if (inbuf_len >= 2)
167 root 1.11 {
168 root 1.23 int pkt_len = (inbuf [0] << 8) | inbuf [1];
169    
170     if (inbuf_len >= 2 + pkt_len)
171     return 2 + pkt_len;
172    
173     if (inbuf_len == sizeof (inbuf))
174 root 1.15 {
175 root 1.53 send_packet_printf ("drawinfo %d input buffer overflow - closing connection.", NDI_RED | NDI_REPLY);
176 root 1.23 destroy ();
177     return -1;
178 root 1.15 }
179 root 1.11 }
180 root 1.9
181 root 1.23 return 0;
182 root 1.11 }
183 root 1.9
184 root 1.23 void
185     client::skip_packet (int len)
186 root 1.11 {
187 root 1.23 inbuf_len -= len;
188     memmove (inbuf, inbuf + len, inbuf_len);
189 elmex 1.1 }
190    
191 root 1.23 /*****************************************************************************
192     * Start of command dispatch area.
193     * The commands here are protocol commands.
194     ****************************************************************************/
195    
196     // SocketCommand, PlayingCommand, should not exist with those ugly casts
197     #define SC(cb) (void *)static_cast<void (*)(char *, int, client *)>(cb),
198     #define PC(cb) (void *)static_cast<void (*)(char *, int, player *)>(cb), PF_PLAYER |
199    
200     /**
201     * Dispatch table for the server.
202     */
203     static struct packet_type packets[] = {
204 root 1.33 {"ncom", PC(NewPlayerCmd) PF_PLAYING | PF_COMMAND6 },
205     {"command", PC(PlayerCmd) PF_PLAYING | PF_COMMAND0 },
206 root 1.23
207     {"examine", PC(ExamineCmd) PF_PLAYING },
208 root 1.39 {"ex", PC(ExCmd) PF_PLAYING },
209 root 1.23 {"apply", PC(ApplyCmd) PF_PLAYING },
210     {"lookat", PC(LookAt) PF_PLAYING },
211     {"lock", PC(LockItem) PF_PLAYING },
212     {"mark", PC(MarkItem) PF_PLAYING },
213     {"move", PC(MoveCmd) PF_PLAYING },
214 root 1.39 {"ext", PC(ExtCmd) 0 }, // CF+
215     {"mapredraw", PC(MapRedrawCmd) 0 },
216     {"mapinfo", PC(MapInfoCmd) 0 }, // CF+
217 root 1.23
218 root 1.28 {"reply", SC(ReplyCmd) 0 },
219 root 1.39 {"exti", SC(ExtiCmd) 0 }, // CF+
220 root 1.26 {"addme", SC(AddMeCmd) 0 },
221 root 1.36 {"askface", SC(AskFaceCmd) 0 },
222 root 1.26 {"requestinfo", SC(RequestInfo) 0 },
223     {"setfacemode", SC(SetFaceMode) 0 },
224     {"setsound", SC(SetSound) 0 },
225     {"setup", SC(SetUp) 0 },
226     {"version", SC(VersionCmd) 0 },
227     {"toggleextendedinfos", SC(ToggleExtendedInfos) 0 }, /*Added: tchize */
228     {"toggleextendedtext", SC(ToggleExtendedText) 0 }, /*Added: tchize */
229     {"asksmooth", SC(AskSmooth) 0 }, /*Added: tchize (smoothing technologies) */
230 root 1.23 };
231    
232     bool
233     client::may_execute (const packet_type *pkt) const
234 elmex 1.1 {
235 root 1.23 return (!(pkt->flags & PF_PLAYER) || pl)
236 root 1.29 && (!(pkt->flags & PF_PLAYING) || state == ST_PLAYING);
237 root 1.11 }
238 root 1.9
239 root 1.33 // HACK: some commands currently should be executed
240     // even when the player is frozen. this hack detects
241     // those commands. it should be folded into may_execute,
242     // but kept seperate to emphasise the hack aspect, i.e.
243     // do it better, then remove.
244     static bool
245     always_immediate (const client *ns, const packet_type *pkt, const char *data, int len)
246     {
247     if (!(pkt->flags & (PF_COMMAND0 | PF_COMMAND6)))
248     return false;
249    
250     if (!ns->pl || !ns->pl->ob || !ns->pl->ob->map)
251     return false;
252    
253     if (pkt->flags & PF_COMMAND6)
254     {
255     data += 6;
256     len -= 6;
257     }
258    
259     if (len > 4 && !strncmp (data, "say " , 4))
260     return true;
261     if (len > 5 && !strncmp (data, "chat ", 5))
262     return true;
263    
264     return false;
265     }
266    
267 root 1.23 void
268     client::execute (const packet_type *pkt, char *data, int datalen)
269 root 1.11 {
270 root 1.33 if (may_execute (pkt) || always_immediate (this, pkt, data, datalen))
271 root 1.23 {
272     //TODO: only one format
273     if (pkt->flags & PF_PLAYER)
274     ((void (*)(char *, int, player *))pkt->cb)((char *)data, datalen, pl);
275     else
276     ((void (*)(char *, int, client *))pkt->cb)((char *)data, datalen, this);
277     }
278     else
279 root 1.53 send_packet_printf ("drawinfo %d ERROR: you cannot execute '%s' now.", NDI_RED | NDI_REPLY, pkt->name);
280 elmex 1.1 }
281    
282 root 1.23 bool
283     client::handle_packet ()
284 root 1.10 {
285 root 1.23 int pkt_len = next_packet ();
286    
287     if (!pkt_len)
288     return false;
289     else if (pkt_len < 0)
290     {
291     LOG (llevError, "read error on player %s\n",
292     pl && pl->ob ? &pl->ob->name : "[anonymous]");
293     destroy ();
294     return false;
295     }
296    
297     inbuf [pkt_len] = 0; /* Terminate buffer - useful for string data */
298    
299     /* First, break out beginning word. There are at least
300     * a few commands that do not have any paremeters. If
301     * we get such a command, don't worry about trying
302     * to break it up.
303     */
304     int datalen;
305     char *data = strchr ((char *)inbuf + 2, ' ');
306    
307     if (data)
308     {
309     *data++ = 0;
310     datalen = pkt_len - (data - (char *)inbuf);
311     }
312     else
313     {
314     data = (char *)inbuf + 2; // better read garbage than segfault
315     datalen = 0;
316     }
317 root 1.15
318 root 1.23 for (packet_type *pkt = packets; pkt < packets + (sizeof (packets) / sizeof (packets[0])); ++pkt)
319     if (!strcmp ((char *)inbuf + 2, pkt->name))
320     {
321 root 1.33 if (pkt->flags & PF_PLAYER && !always_immediate (this, pkt, data, datalen))
322 root 1.26 queue_command (pkt, data, datalen);
323     else
324 root 1.23 execute (pkt, data, datalen);
325    
326     goto next_packet;
327     }
328    
329 root 1.26 // If we get here, we didn't find a valid command.
330 root 1.53 send_packet_printf ("drawinfo %d ERROR: command '%s' not supported.", NDI_RED | NDI_REPLY, (char *)inbuf + 2);
331 root 1.23 next_packet:
332     skip_packet (pkt_len);
333 root 1.11
334 root 1.23 // input buffer has space again
335     socket_ev.poll (socket_ev.poll () | PE_R);
336 root 1.10
337 root 1.23 return true;
338 root 1.10 }
339    
340 root 1.23 // callback called when socket is either readable or writable
341     void
342     client::socket_cb (iow &w, int got)
343     {
344     //TODO remove when we have better socket cleanup logic
345 root 1.29 if (destroyed ())
346 root 1.23 {
347     socket_ev.poll (0);
348     return;
349     }
350 elmex 1.1
351 root 1.23 if (got & PE_W)
352 root 1.5 {
353 root 1.23 write_outputbuffer ();
354    
355     if (!outputbuffer.len)
356     socket_ev.poll (socket_ev.poll () & ~PE_W);
357     }
358 root 1.12
359 root 1.23 if (got & PE_R)
360     {
361     //TODO: rate-limit tcp connection in better ways, important
362 pippijn 1.8
363 root 1.12 int amount = sizeof (inbuf) - inbuf_len;
364    
365 root 1.23 if (!amount)
366 root 1.5 {
367 root 1.23 // input buffer full
368     socket_ev.poll (socket_ev.poll () & ~PE_R);
369     return;
370 root 1.3 }
371 elmex 1.1
372 root 1.12 amount = read (fd, inbuf + inbuf_len, amount);
373    
374     if (!amount)
375 root 1.5 {
376 root 1.23 destroy ();
377     return;
378 root 1.5 }
379 root 1.12 else if (amount < 0)
380 root 1.5 {
381 root 1.12 if (errno != EAGAIN && errno != EINTR)
382 root 1.5 {
383 root 1.20 LOG (llevError, "read error: %s\n", strerror (errno));
384 root 1.23 destroy ();
385     return;
386 root 1.3 }
387 root 1.12
388 root 1.23 // should not be here, normally
389 root 1.3 }
390 root 1.23 else
391     {
392     inbuf_len += amount;
393 root 1.12
394 root 1.23 cmd_ev.start ();
395     }
396 root 1.5 }
397 root 1.12 }
398    
399 root 1.23 // called whenever we have additional commands to process
400 root 1.12 void
401 root 1.23 client::cmd_cb (iw &w)
402 root 1.12 {
403 root 1.26 if (handle_packet ())
404 root 1.23 w.start ();
405     else
406     flush ();
407 elmex 1.1 }
408    
409     /*******************************************************************************
410     *
411     * Start of write related routines.
412     *
413     ******************************************************************************/
414    
415     /**
416     * Adds data to a socket buffer for whatever reason.
417     *
418     * ns is the socket we are adding the data to, buf is the start of the
419     * data, and len is the number of bytes to add.
420     */
421 root 1.20 void
422 root 1.22 client::send (void *buf_, int len)
423 root 1.20 {
424     char *buf = (char *)buf_;
425 elmex 1.1
426 root 1.29 if (destroyed () || !buf)
427 root 1.23 return;
428 elmex 1.1
429 root 1.31 if (len + outputbuffer.len > SOCKETBUFSIZE)
430 root 1.5 {
431 root 1.23 LOG (llevDebug, "socket on fd %d has overrun internal buffer - marking as dead\n", fd);
432 root 1.36 // shutdown the socket, this is safer than destroying it immediately
433     // as lots of code in the callchain might still access the map etc.
434     shutdown (fd, SHUT_RDWR);
435 root 1.5 return;
436     }
437    
438 root 1.20 int avail, end;
439    
440 root 1.5 /* data + end is where we start putting the new data. The last byte
441     * currently in use is actually data + end -1
442     */
443 root 1.20 end = outputbuffer.start + outputbuffer.len;
444 root 1.5 /* The buffer is already in a wrapped state, so adjust end */
445     if (end >= SOCKETBUFSIZE)
446     end -= SOCKETBUFSIZE;
447 root 1.7
448 root 1.5 avail = SOCKETBUFSIZE - end;
449 elmex 1.1
450 root 1.5 /* We can all fit it behind the current data without wrapping */
451     if (avail >= len)
452 root 1.20 memcpy (outputbuffer.data + end, buf, len);
453 root 1.5 else
454     {
455 root 1.20 memcpy (outputbuffer.data + end, buf, avail);
456     memcpy (outputbuffer.data, buf + avail, len - avail);
457 elmex 1.1 }
458 root 1.7
459 root 1.20 outputbuffer.len += len;
460 elmex 1.1 }
461    
462     /**
463     * Takes a string of data, and writes it out to the socket. A very handy
464     * shortcut function.
465     */
466 root 1.13 void
467 root 1.22 client::send_packet (packet &sl)
468 root 1.13 {
469 root 1.29 if (destroyed ())
470 root 1.19 return;
471    
472     if (sl.length () >= MAXSOCKBUF)
473     {
474     LOG (llevError, "Trying to send a buffer beyond properly size, len =%d\n", sl.length ());
475     /* Almost certainly we've overflowed a buffer, so quit now to make
476     * it easier to debug.
477     */
478     abort ();
479     }
480    
481     if (!sl.length ())
482     return;
483    
484     assert (sl.hdrlen == 2);
485    
486     sl.buf_ [0] = sl.length () >> 8;
487     sl.buf_ [1] = sl.length () ;
488    
489     send (sl.buf_, sl.length () + sl.hdrlen);
490 root 1.13 }
491    
492 root 1.5 void
493 root 1.22 client::send_packet (const char *buf, int len)
494 elmex 1.1 {
495 root 1.14 packet sl;
496 elmex 1.1
497 root 1.13 sl << data (buf, len);
498     send_packet (sl);
499 elmex 1.1 }
500    
501 root 1.13 void
502 root 1.22 client::send_packet (const char *buf)
503 root 1.13 {
504     send_packet (buf, strlen (buf));
505     }
506 elmex 1.1
507 root 1.23 void
508     client::send_packet_printf (const char *format, ...)
509     {
510     packet sl;
511    
512     va_list ap;
513     va_start (ap, format);
514     sl.vprintf (format, ap);
515     va_end (ap);
516    
517     send_packet (sl);
518     }
519    
// Returns true when the message needs special (read: perl) treatment,
// i.e. when it contains any of '<', '[', '&' or a newline.
static bool
msg_is_special (const char *msg)
{
  return strpbrk (msg, "<[&\n") != 0;
}
526    
527 root 1.40 void
528     client::send_msg (int color, const char *type, const char *msg)
529     {
530 root 1.48 if (msg_is_special (msg))
531     cfperl_send_msg (this, color, type, msg);
532     else if (can_msg)
533 root 1.53 send_packet_printf ("msg %d %s %s", color & NDI_CLIENT_MASK, type, msg);
534 root 1.40 else if (color < 0)
535     return; // client cannot handle this
536     else
537 root 1.53 send_packet_printf ("drawinfo %d %s", color & NDI_COLOR_MASK, msg);
538 root 1.40 }
539    
540 root 1.48 void
541     client::send_drawinfo (const char *msg, int flags)
542     {
543     send_msg (flags, "log", msg);
544     }
545    
546 root 1.23 /***********************************************************************
547     *
548     * packet functions/utilities
549     *
550     **********************************************************************/
551    
552 root 1.27 packet::packet (const char *name)
553     {
554     reset ();
555    
556     int len = strlen (name);
557     memcpy (cur, name, len); cur += len;
558     *cur++ = ' ';
559     }
560    
561 root 1.37 packet &packet::operator <<(const ber32 v)
562     {
563     enum { maxlen = 32 / 7 + 1};
564     uint8 buf[maxlen];
565     uint8 *p = buf + maxlen;
566     uint32 val = v.val;
567    
568 root 1.38 *--p = val & 0x7F;
569    
570 root 1.37 while (val > 0x7F)
571     {
572 root 1.38 val >>= 7;
573 root 1.37 *--p = (val & 0x7F) | 0x80;
574     }
575    
576     return *this << data (p, buf + maxlen - p);
577     }
578    
579 root 1.23 packet &packet::operator <<(const data &v)
580     {
581     if (room () < v.len)
582     reset ();
583     else
584     {
585     if (v.len)
586     {
587     memcpy (cur, v.ptr, v.len);
588     cur += v.len;
589     }
590     }
591    
592     return *this;
593     }
594    
595     packet &packet::operator <<(const data8 &v)
596     {
597     unsigned int len = min (v.len, 0x00FF);
598     return *this << uint8 (len) << data (v.ptr, len);
599     }
600    
601     packet &packet::operator <<(const data16 &v)
602     {
603     unsigned int len = min (v.len, 0xFFFF);
604     return *this << uint16 (len) << data (v.ptr, len);
605     }
606    
607     packet &packet::operator <<(const char *v)
608     {
609     return *this << data (v, strlen (v ? v : 0));
610     }
611    
612     void
613     packet::vprintf (const char *format, va_list ap)
614     {
615     int size = room ();
616    
617     int len = vsnprintf ((char *)cur, size, format, ap);
618    
619     if (len >= size)
620     return reset ();
621    
622     cur += len;
623     }
624