ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/docs/event.sdf
Revision: 1.1
Committed: Mon Mar 19 11:08:50 2001 UTC (23 years, 2 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Log Message:
*** empty log message ***

File Contents

# Content
1 !init OPT_STYLE="paper"
2
3 !define DOC_NAME "Einführung in Event::"
4 !define DOC_TYPE "[Vortrag]"
5 !define DOC_AUTHOR "(c) 2000 Marc Lehmann <pcg@goof.com>"
6 !build_title
7
8 !block abstract
9
10 Wenn viele Jobs parallel ausgeführt werden sollen, eignet sich das
11 bekannte fork-Paradigma von Unix nicht mehr: Die Interprozeßkommunikation
12 und der Mehraufwand an Speicher und Ressourcen überwiegt dir Vorteile der
13 einfacheren Programmstruktur bei weitem. Diese kurze Einführung in die
14 Ereignis-gesteuerte Programmierung in Perl zeigt an einem konkreten Beispiel
15 (News-Scanner), wie einfach sich selbst komplexe Strukturen in Perl
16 realisieren lassen.
17
18 !endblock
19
20 H1: C<Event> in der Praxis --- oder wie man 500 Newsserver gleichzeitig scannt.
21
22 H2: Ereignis-gesteuerte Programmierung?
23
24 Zur Lösung paralleler ablaufender Prozesse sind heute drei Ansätze
25 gebräuchlich:
26
27 * Prozesse mit getrenntem Adressraum (z.B. mit C<fork>)
28 * eng gekoppelte Prozesse mit gemeinsamen Adreßraum (z.B. mit {{1:pthreads}})
29 * Ereignis-gesteuerte Prozesse
30
31 Jeder dieser Ansätze hat verschiedene Vor- und Nachteile: Das
32 C<fork>-Modell ist sehr einfach zu programmieren und eignet sich
33 besonders für einfache Probleme, die sozusagen in kleine "Stückzahlen"
34 anfallen. Durch die Abschottung der Prozesse wird eine einfach
35 Parallelisierung möglich, da die Prozesse (z.B.) auf unterschiedlichen
36 Rechnern arbeiten können. Größter Nachteil ist die relative aufwendige
37 Interprozeßkommunikation, die einen großen Overhead nach sich ziehen
38 kann.
39
40 {{1:Threads}} werden vielfach als das Mittel der Wahl angesehen. Der größte
41 Vorteil von Threads ist das Vorhandensein mehrere Ablauf-Instanzen,
42 die getrennt blockieren können. Leider werden Threads in den meisten
43 Fällen nur dazu mißbraucht, das Blockieren des gesamten Prozesses zu
44 verhindern (z.B. wenn Daten nicht sofort zur Verfügung stehen), werden
45 also effektiv nur als Krücke für asynchrone-EA verwendet. Diesen Vorteil
46 erkauft man sich durch eine zwar schnelle aber dafür extrem komplizierte
47 Synchronisation innerhalb der Threads. {{1:Threads sind in in den seltensten
48 Fällen die richtige Wahl für ein Problem.}}
49
50 Ereignis-gesteuerte Programmierung beruht auf dem
51 {{1:Callback}}-Prinzip: Eine zentrale Anlaufstelle innerhalb des Prozesses
52 wartet auf Ereignisse (engl. "Events", also z.B. "Daten angekommen",
53 "Zeit abgelaufen" etc...). Je nach Ereignis werden entsprechende
54 Callback-Funktionen aufgerufen. Der Vorteil dieses Ansatzes ist eine
55 übersichtliche Programmstruktur, eine extreme schnelle Kommunikation
56 (nur ein Prozeß) und ein ressourcenschonendes Endprodukt. Auch dieser
57 Ansatz hat seine Nachteile. Der größte ist, daß man bei vielen Problemen
58 "Umdenken" muß, da sich Callbacks eben keine lineare Programmstruktur
59 verwirklichen läßt ({{1:Closures}} können dabei jedoch helfen). Außerdem
60 muß man sich bewußt sein, das ein blockierender Funktionsaufruf
61 (z.B. C<read>) das gesamte Programm anhält.
62
63 H2: Das Problem...
64
65 {{...}}ist oberflächlich betrachtet, recht einfach: Eine (kleine) Menge
66 von Usenet-Servern soll nach Newsgruppen abgesucht werden. Das kann
67 auf faire Weise geschehen: man öffnet eine NNTP-Verbindung und schickt
68 Requests. Dies läßt sich durch Pipelining (senden mehrere Befehle
69 gleichzeitig) beschleunigen. Durch die Zeiten, die der News-Server
70 benötigt um Artikel zu suchen, wird die Datenrate in der Praxis allerdings
71 drastisch beschränkt.
72
73 Also die unfaire Weise: statt einer öffnet man 5, 10 oder gleich mehrere
74 hundert Verbindungen zu einem (oder mehreren) Servern und verteilt so die
75 Verbindungslatenz und die Antwortzeit.
76
77 H2: Die Planung
78
79 Die (für mich) naheligende Idee, dies mit mehreren Scanprozessen zu
80 implementieren, scheiterte an zwei Problemen:
81
82 * Die Scanprozesse müssen sich untereinander absprechen, um Duplikate zu
83 vermeiden. Dies ist zwangsläufig Interprozeßkommunikation (z.B. über eine
84 SQL-Datenbank), die sehr aufwendig zu implementieren ist. Hinzu kommt, das
85 einzelne Jobs zuerst markiert werden müssen ("in Arbeit"), damit sie nicht
86 von mehreren Prozessen gleichzeitig bearbeitet werden, was jedoch sehr
87 schwierig ist, wenn man Wert darauf legt, Prozesse beliebig abbrechen zu
88 können, ohne Artikel zu verlieren.
89
90 * Das Zielsystem, ein Pentium-166-System, hat weder unendlich Rechen-
91 noch Speicherressourcen. Da Perl von beidem gerne viel nimmt, wäre die
92 Sättigung schon bei relativ wenigen Verbindungen erreicht. Stichwort Speicher:
93 jeder Prozeß benötigt einen Interpreter, eine Kopie der
94 libc-Variablen, eine eigene Kopie des Scanprogramms und seine eigene
95 SQL-Anbindung. Auch moderne Systeme mit einem effizienten C<fork()> leiden
96 darunter, da gerade Perl nicht zimperlich mit dem Speicher umgeht.
97
98 Die Lösung (klar!) lag im Event-Modul. Da alle Verbindungen von einem
99 Prozeß bearbeitet werden, gibt es keine Synchronisationsprobleme. Der
100 Overhead pro Verbindung beschränkt sich ebenfalls auf einen Hash, und das
101 umschalten von Prozessen entfällt ebenfalls (schneller).
102
103 H1: Die Implementation
104
105 Die folgenden Abschnitte stellen die wichtigsten "Knotenpunkte" des
106 Scanprogrammes vor. Jedesmal wird kurz das Problem erläutert und die
107 Lösung mit Hilfe des C<Event>-Moduls diskutiert.
108
109 H2: Der "Scheduler"
110
111 Der komplizierteste Teil des Programmes ist der Scheduler: Er verteilt
112 einzelne Jobs auf die Scanner, bzw. beendet das Programm, wenn alle Jobs
113 abgearbeitet wurden. Es gibt nur zwei Typen von "Jobs":
114
115 * 'S': {{1:S}}canne eine Gruppe. Der Scanner sucht eine bestimmte
116 Newsgruppe auf dem Server und stellt mit Hilfe einer SQL-Tabelle fest,
117 welche Artikel(-nummern) noch nicht gescannt wurden.
118
119 * 'A': {{1:A}}rtikel holen. Da Gruppen Tausende von Artikeln enthalten
120 können, wird nur ein "Job" pro Gruppe erzeugt. Ein Scanner sucht sich eine
121 Artikelnummer aus, bearbeitet sie und legt die restlichen wieder zurück in
122 die Warteschlange.
123
124 Ein "Scanner" ist dabei kein Prozeß, sondern nur eine Instanz der
125 C<Scanner>-Klasse, in der im wesentlichen der Zustand einer Verbindung
126 gespeichert wird (Server, Port, aktuelle Gruppe...). Für jede potentielle
127 Verbindung wird ein solches Objekt erzeugt. Für hundert Verbindungen sieht
128 das z.B. so aus:
129
130 >new Scanner for 1..100;
131
132 Die Objekte reihen sich automatisch in die C<idle>-Warteschlange ein.
133
134 Beim Programmstart werden alle Server- und Gruppen aus einer Datei gelesen und in die Job-Warteschlange eingefügt. Dann wird
135 in die Hauptschleife gesprungen:
136
137 >Scanner::loop(); # Hauptschleife
138
139 !block perl
140 sub loop {
141 while (@queue || @idle < $scanners) {
142 runq;
143 Event::loop;
144 }
145 }
146 !endblock
147
148 Dabei stehen die zu bearbeitenden Jobs in C<@queue> und die verfügbaren
149 Scanner-Objekte in C<@idle>. Solange noch Jobs vorhanden sind (C<@queue !=
150 0>) und nicht alle (C<$scanners>) Scanner idlen, wird C<runq> aufgerufen
151 und in die Hauptschleife von C<Event> gesprungen.
152
153 C<runq> (das steht für "run queue") nimmt Jobs aus der Warteschlange und
154 teilt sie verfügbaren Scannern zu. Der Algorithmus ist sehr primitiv
155 (FCFS) und könnte wesentlich verbessert werden. Wichtig ist, daß die
156 Lastverteilung in diesen wenigen Zeilen stattfindet und sehr gut
157 lokalisiert und damit sehr einfach änderbar ist.
158
159 !block perl
160 sub runq {
161 while (@queue && @idle) {
162 my $c = pop @queue;
163 my $s = pop @idle;
164 $s->run(@$c);
165 }
166 Event::unloop_all unless @queue || @idle < $scanners;
167 }
168 !endblock
169
170 Der Aufruf von C<unloop_all> beendet alle Event-Schleifen, wenn alle Jobs abgearbeitet wurden.
171
172 H2: Job Management & Rescheduling
173
174 Um neue Jobs in das System einzufügen, gibt die Funktion C<add_job>:
175
176 !block perl
177 sub add_job {
178 push @queue, [@_];
179 $reschedule->start if @idle;
180 }
181 !endblock
182
183 Die wichtigste Teil ist der Aufruf von {{C:$reschedule->start}}: Wenn
184 ein Scanner verfügbar ist (C<@idle> nicht leer ist), muß der Scheduler
185 aufgerufen werden. Da der Aufruf von C<add_job> sehr häufig ist, und
186 der Scheduler (C<loop>) eine Rekursion bedeutet, wird er nicht direkt
187 aufgerufen, sondern nur, wenn sonst keine Ereignisse anliegen. Dies wird
188 mit einem C<idle>-Event-Handler erreicht, der in der globalen Variable
189 C<$reschedule> steht:
190
191 !block perl
192 my $reschedule = Event->idle(
193 desc => "reschedule hook",
194 max => 5,
195 cb => sub {
196 $_[0]->w->stop;
197 Event::unloop;
198 }
199 );
200 $reschedule->stop;
201 !endblock
202
203 {{C:Event->idle}} ist der {{Konstruktor}}, der einen Ereignis-Handler
204 vom Typ "idle" erzeugt. Die einzelnen Attribute bedeuten:
205
206 !block table
207 Attribut Beschreibung
208 desc Eine Beschreibung, z.B. für das C<NetServer::ProcessTop>-Modul.
209 max Zeit (in Sekunden) nach dem der Callback {{auf jeden Fall}} ausgeführt wird.
210 cb Die Callback-Funktion, die aufgerufen wird.
211 !endblock
212
213 Übertragen auf den C<$rescheduler> bedeutet dies, daß aus der
214 Event-Schleife gesprungen wird, wenn gerade kein Datentransfer oder
215 sonstige Aufgaben anliegen, {{oder nach fünf Sekunden}}, je nachdem,
216 was früher eintrit. Diese Einschränkung verhindert, das ein schnell
217 eintreffender Artikel den gesamten Prozeß "am Laufen hält" und damit
218 verhindert, das freie (idle) Scanner nicht mit neuen Jobs versorgt werden.
219
220 Wenn der Callback angesprungen wird, bekommt er ein {{Ereignis-Objekt}}
221 übergeben (unter X entspricht dies einem C<XEvent>, bei Gtk ist es ein
222 C<Gdk::Event>). Als erstes sucht er über dieses Ereignis-Objekt (in
223 C<$_[0]>) den ursprünglichen {{Watcher}} ({{C:$_[0]->w}}, "w" steht für
224 "watcher") und ruft die C<stop>-Methode auf. Damit wird erreicht, daß
225 der Callback nicht mehr aufgerufen wird, bis er das nächste mal gestartet
226 wird (z.B. in C<add_job>). {{C:$_[0]->w->stop}} ist übrigens das gleiche
227 wie {{C:$rescheduler->stop}}, die Variable C<$rescheduler> ist wegen C<my>
228 jedoch erst {{nach}} dem Aufruf des Konstruktors sichtbar.
229
230 Das zweite (und wichtigste) was der Callback unternimmt, ist, den
231 eigentlichen Scheduler wieder anzuspringen C<loop>. In C<loop> wurde die
232 Hauptschleife des Event-Moduls aufgerufen (C<Event::loop>): C<unloop> ist
233 das Gegenstück dazu und springt aus dieser Schleife heraus, so daß der
234 Scheduler neue Jobs verteilen kann.
235
236 H3: Beendigung eines Jobs
237
238 Wenn ein Scanner-Objekt einen Job verarbeitet hat, muß es sich wieder in
239 die C<@idle>-Queue eintragen:
240
241 !block perl
242 sub idle {
243 my $self = shift;
244 push @idle, $self;
245 $reschedule->start;
246 }
247 !endblock
248
249 der Aufbau gleicht C<add_job>.
250
251 H2: Die Jobschleife
252
253 Für die Abarbeitung der Jobs ist die Methode C<run> zuständig. Sie hat
254 mindestens drei Parameter: C<self> (das Scanner-Objekt), C<host> (der
255 NNTP-Server, inkl. Port) und C<cmd> (der Jobtyp).
256
257 Da das NNTP-Protokoll "stateful" ist, muß der aktuelle NNTP-Server und
258 die aktuelle Gruppe gespeichert werden. Gilt der neue Job für denselben
259 Rechner und dieselbe Gruppe (der Normalfall) passiert nichts, ansonsten
260 wird die Verbindung zum NNTP-Server neu aufgebaut, bzw. die Gruppe
261 gewechselt.
262
263 Das Aufbauen der NNTP-Verbindung ist ein Problem für den Event-Ansatz: ein
264 C<connect>-Aufruf {{blockiert}} den Prozeß, bis entweder die Verbindung
265 steht oder ein Fehler passiert. Da ein solcher C<connect> einige Sekunden
266 benötigen kann (bei Netzwerkproblemen auch wesentlich länger), müssen sog.
267 "non-blocking-calls" verwendet werden.
268
269 Das ist auch der Grund, weshalb das Programm auf Standardmodule wie
270 C<IO::Socket> oder C<Net::NNTP> verzichten muß: Unterstützung für
271 nicht-blockierende Aufrufe ist kaum oder überhaupt nicht vorhanden. Das
272 C<Net::NNTP>-Modul ist in dieser Hinsicht besonders schlecht, dnen
273 man kann die entsprechende Methoden nicht einfach in einer Subklasse
274 überschreiben.
275
276 Der schwierigste Teil war der Aufruf von C<connect>, der ebenfalls nicht
277 blockieren sollte:
278
279 !block perl
280 if (socket $fd, PF_INET, SOCK_STREAM, getprotobyname 'tcp') {
281 sub TCP_NODELAY(){1} sub SOL_TCP(){6}; # linux-2.2
282 setsockopt $fd, SOL_TCP, TCP_NODELAY, 1;
283 fcntl $fd, F_SETFL, O_NONBLOCK;
284 connect $fd, sockaddr_in $port, inet_aton($ip);
285 fcntl $fd, F_SETFL, 0;
286 } else {
287 undef $fd;
288 }
289 !endblock
290
291 Einige Konstanten (z.B. C<SOL_TCP>) sind in Perl nicht einfach zu
292 bekommen. Da das Script mehr ein Hack als eine professionelle Anwendung
293 ist, wurden sie einfach hardcodiert.
294
295 Wenn der Server gewechselt wird, wechselt auch der Filehandle, so daß eine neuer
296 Event-Watcher erzeugt werden muß:
297
298 !block perl
299 ($self->{w} = Event->io(fd => fileno $fd, poll => 'r'))->stop;
300 !endblock
301
302 H2: NNTP-Befehle
303
304 Das NNTP-Protokoll ist sehr einfach: Kommandos bestehen aus einer
305 einzelnen Textzeile, Antworten aus einem Zifferncode und einer
306 beschreibenden Textzeile. Artikel werden als Textblock übertragen, wobei
307 die letzte Zeile einen einzelnen Punkt als Endekennung enthält.
308
309 Das Absetzen eines Befehls geschieht über die Methode C<rcb>. Ihr werden
310 zwei Argumente übergeben, das Kommando (ohne Zeilenende) und eine
311 {{1:Callback}}-Funktion. Das Kommando wird an den NNTP-Server geschickt,
312 die Callback-Funktion wird aufgerufen, wenn die erste Zeile der Antwort
313 angekommen ist (mit dem Statuscode).
314
315 Dies wird erreicht, indem der Event-Watcher für die NNTP-Verbindung
316 (C<$self->{w}>) gefüttert und gestartet wird:
317
318 !block perl
319 sub rcb {
320 my $self = shift;
321 my $cmd = shift;
322 my $cb = shift;
323 if ($cmd) {
324 $self->command($cmd);
325 } else {
326 $cmd = "<anonymous command>";
327 }
328
329 $self->{w}->desc($cmd);
330 $self->{w}->cb(sub {
331 $self->{w}->stop;
332 $cb->($self);
333 });
334 $self->{w}->start;
335 }
336 !endblock
337
338 Falls eine Befehl (C<$cmd>) übergeben wurde, wird dieser über die Leitung
339 gepustet (C<$self->command>) und als beschreibender Text verwendet. Mit
340 C<desc> wird diese Beschreibung gesetzt (hilfreich zum Debuggen oder
341 Tollfühlen, wenn es hinterher funktioniert).
342
343 Dann wird der Callback (C<cb>) gesetzt, der lediglich den Watcher
344 stoppt (Befehle sind einmalige Angelegenheiten) und die {{eigentliche}}
345 Callback-Funktion aufruft, und schließlich wird der Watcher gestartet.
346
347 H2: Lesen der Antwort
348
349 Der schwierigste Teil des Skriptes ist das zeilenweise Lesen, das vom
350 NNTP-Protokoll vorausgesetzt wird. Da Perl von sich aus (noch) keinerlei
351 Support dafür anbietet ({{C:<>}} blockiert den Prozeß oder liefert
352 keine ganzen Zeilen zurück), mußte das Zusammensetzen der Zeilen selbst
353 implementiert werden.
354
355 Grundlage dafür ist die Methode C<refill>, die alle Zeichen liest, die
356 angekommen sind (ohne zu blockieren) und sie in einem Puffer ablegt:
357
358 !block perl
359 sub refill {
360 my $self = shift;
361 my $wait = shift;
362 my $fd = $self->{fd};
363 fcntl $fd, F_SETFL, O_NONBLOCK;
364 for(;;) {
365 my $r = sysread $fd, $self->{buff}, 32768, length $self->{buff};
366 if ($r>0) {
367 last;
368 } elsif (!defined $r && $! == EAGAIN) {
369 last unless $wait;
370 $self->{w}->cb(sub { $self->{w}->stop; Event::unloop });
371 $self->{w}->start;
372 Event::loop();
373 } else {
374 $self->{buff} = "500 I/O error: $!\015\012.\015\012";
375 delete $self->{host};
376 last;
377 }
378 }
379 fcntl $fd, F_SETFL, 0;
380 }
381 !endblock
382
383 Das Argument C<$wait> bestimmt, ob auf jeden Fall gewartet werden soll,
384 oder ob C<refill> zurückkehren soll, auch wenn keine neuen Daten verfügbar
385 sind. Letzteres ist außerst selten der Fall und wurde entsprechend
386 ineffizient implementiert, indem ein "leerer" Watcher gestartet wird und
387 dann auf dessen Unloop gewartet wird.
388
389 Als nächstes in der Hierarchy steht C<getline>, das einfach die nächste
390 Zeile liefert, notfalls durch Warten:
391
392 !block perl
393 sub getline {
394 my $self = shift;
395 $self->refill(1) while $self->{buff} !~ s/^([^\015\012]*)\015\012//o;
396 $1;
397 }
398 !endblock
399
400 Sie ist sehr einfach: gibt es schon eine ganze Zeile im Puffer, dann
401 schneide sie heraus und gib sie zurück. Nicht sehr effizient, aber einfach
402 zu benutzen.
403
404 Sie wird benutzt von C<response>, wo die Zeile in ihre beiden Kompomenten
405 (Statuscode, Meldung) zerlegt wird, und die erste Ziffer des Statuscodes
406 zurückgegeben wird (der für das weitere Vorgehen am entscheidensten ist).
407
408 !block perl
409 sub response {
410 my $self = shift;
411 @{$self}{'code','message'} = split m/ /, $self->getline, 2;
412 substr $self->{code}, 0, 1;
413 }
414 !endblock
415
416 H2: Scannen einer Gruppe
417
418 Um herauszufinden, welche Artikel seit dem letzten Mal neu hinzugekommen sind,
419 wird die Statusmeldung ausgewertet, die der Server beim Wechsel in eine Gruppe liefert:
420
421 !block example
422 {{BEFEHL }} GROUP comp.lang.perl.moderated
423 {{ANTWORT}} 211 125 4886 5010 comp.lang.perl.moderated group selected
424 !endblock
425
426 C<211> ist der Statuscode für "O.K.", C<125> ist die Zahl der Artikel,
427 C<4886> ist die erste und C<5010> die letzte Artikelnummer.
428
429 Dies ist eine ideale Anwendung für C<rcb>:
430
431 !block perl
432 $self->rcb("GROUP $group", sub {
433 if ($self->response == 2 && $self->{message} =~ /(\d+)\s+(\d+)\s+(\d+)/) {
434 my($count, $first, $last, $name) = ($1, $2, $3, $3);
435 if ($count) {
436 $self->slog("selected group $group");
437 $self->{group} = $group;
438 $self->{first} = $first;
439 $self->{last} = $last;
440 $cb->($self);
441 return;
442 } else {
443 $self->slog("SKIPPED empty group $group: ", substr($self->{message},0,-1));
444 }
445 } else {
446 $self->slog("SKIPPED bogus group $group on ".$self->{host}[0].": ", substr($self->{message},0,-1));
447 }
448 $self->idle;
449 });
450 !endblock
451
452 C<rcb> bekommt zwei Argumente übergeben: C<"GROUP $group"> ist das
453 NNTP-Kommando zum Wechseln der (News-) Gruppe, das zweite Argument ist die
454 Callback-Funktion, die die NNTP-Anwtort als Argument bekommt.
455
456 Die Verwendung einer Closure erlaubt es, Befehl (C<rcb>) und die Reaktion
457 (das C<sub {}>) direkt hintereinander zu schreiben, so, als wäre C<rcb>
458 ein "normaler", blockierender Aufruf zum Lesen einer Zeile, mit dem einzigen Unterschied, daß
459 die Auswertung des Ergebnisses im einem eingerückten Block stattfindent. Anders gesagt, aus:
460
461 !block perl
462 $response = $self->rcb("GROUP $group");
463
464 if ($response....) {
465 }
466 !endblock
467
468 wird:
469
470 !block perl
471 $self->rcb(GROUP $group", sub {
472
473 if ($response....) {
474 }
475 });
476 !endblock
477
478 C<rcb> kehrt jedoch sofort zurück (ein C<sub>, daß C<rcb> verwendet, kann
479 deshalb nicht sofort ein Resultat an den Aufrufer zurückliefern.
480
481 Die Information über die Gruppe (C<first> und C<last>, wird aus der
482 NNTP-Antwort genommen) wird später mit den Daten aus der SQL-Datenbank
483 verglichen (das hat allerdings nichts mit C<Event> zu tun):
484
485 !block perl
486 sub group_scan {
487 my $self = shift;
488 my $group = $self->{group};
489 my $todo = new Set::IntSpan $self->{first}."-".$self->{last};
490 $todo = $todo->intersect($self->gs_done->complement);
491 if ($todo->empty) {
492 $self->slog("[no new articles in $group]");
493 } else {
494 $self->slog("scanning group $group: ", $todo->run_list);
495 add_job($self->{host},'A',$group,$todo);
496 }
497 $self->idle;
498 }
499 !endblock
500
501 Das C<Set::IntSpan>-Modul wird dazu benutzt, um aus der Menge der
502 vorhandenen Artikel die bereits gescannten (die von C<gs_done>
503 zurückgegeben werden) zu entfernen. Ist die resultierende Menge nicht
504 leer, wird ein neuer Job ("hole alle diese Artikel") erzeugt.
505
506 H2: Holen eines Artikels
507
508 Das Holen geschieht in zwei Stufen. Zuerst wird die {{Message-Id}} mit
509 einem C<STAT>-Befehl ausgewertet. Damit wird außerdem festgestellt, ob ein
510 bestimmter Artikel überhaupt existiert.
511
512 !block perl
513 $self->rcb("STAT ".$self->{num}, \&got_stat);
514 !endblock
515
516 Ein Protokollbeispiel:
517
518 !block example
519 {{BEFEHL }} STAT 5010
520 {{ANTWORT}} 223 5010 <85j7jc$68n@junior.apk.net> article retrieved - request text separately
521 {{BEFEHL }} STAT 4977
522 {{ANTWORT}} 430 No such article: 4977
523 !endblock
524
525 Der Callback C<got_stat> wertet diese Information aus:
526
527 !block perl
528 sub got_stat {
529 my $self = shift;
530 my $r = $self->response;
531 $self->mark_article_done;
532
533 ($self->{mid}) = $self->{message} =~ /<([^>]+)>/g;
534
535 if ($r == 2) {
536 my $aid = sql_fetch("select count(*) from art where mid=? limit 1", "".$self->{mid});
537 $self->mark_article_present;
538 if ($aid) {
539 sql_exec("replace into lnk values (?,?)", $self->gid, $aid);
540 $self->idle;
541 } else {
542 $busy{$self->{mid}}++;
543 $stat_article++;
544 $self->rcb_dot("ARTICLE ".$self->{num}, \&got_article);
545 }
546 } else {
547 $self->idle;
548 }
549 }
550 !endblock
551
552 Existiert der Artikel nicht, ist der Job beendet und es wird in den
553 idle-Modus gegangen. Wurde er schon einmal geholt (z.B. in einer anderen Gruppe)
554 wird er nicht noch einmal geholt, sondern lediglich in die Gruppe "gelinkt" (Artikel können sehr groß werden).
555
556 Ansonsten wird ein C<ARTICLE>-Befehl abgesetzt, mit dem der gesamte Artikel geholt wird.
557
558 !block example
559 {{BEFEHL }} ARTICLE 5010
560 {{ANTWORT}} 220 5010 <85j7jc$68n@junior.apk.net> article retrieved - text follows
561 {{ANTWORT}} From: allbery@apk.net (Brandon S. Allbery KF8NH)
562 {{ANTWORT}} Newsgroups: comp.lang.perl.moderated
563 {{ANTWORT}} Subject: Re: Usefulness of Pseudo Hashes
564 {{ANTWORT}} Message-ID: <85j7jc$68n@junior.apk.net>
565 {{ANTWORT}}
566 {{ANTWORT}} Also sprach Alex Rhomberg <rhomberg@ife.ee.ethz.ch> (<384E39B8.D8635949@ife.ee.ethz.ch>):
567 {{ANTWORT}} +-----
568 {{ANTWORT}} | I wonder why pseudo hashes were invented
569 {{ANTWORT}} +--->8
570 {{ANTWORT}}
571 {{ANTWORT}} Sometimes you need an ordered list (so you can't use hashes) with keyed access
572 {{ANTWORT}} to the list (so lists/arrays are slow and a pain in the butt to use). Pseudo
573 {{ANTWORT}} hashes are a better solution than the usual hack of maintaining duplicate
574 {{ANTWORT}} information in a hash and an array/list.
575 {{ANTWORT}}
576 {{ANTWORT}} --
577 {{ANTWORT}} brandon s. allbery [os/2][linux][solaris][japh] allbery@kf8nh.apk.net
578 {{ANTWORT}} system administrator [WAY too many hats] allbery@ece.cmu.edu
579 {{ANTWORT}} carnegie mellon / electrical and computer engineering KF8NH
580 {{ANTWORT}} Kiss my bits, Billy-boy.
581 {{ANTWORT}} .
582 !endblock
583
584 Hierbei tritt das Problem auf, daß nach der Statuszeile ein Artikel
585 folgt. Deshalb wird statt C<rcb> die Methode C<rcb_dot> benutzt (das steht
586 für "read callback + data read until dot"):
587
588 !block perl
589 sub rcb_dot {
590 my $self = shift;
591 my $cmd = shift;
592 $self->{rcb_cb} = shift;
593 delete $self->{body};
594 $self->rcb($cmd, sub {
595 if ($self->response == 2) {
596 $self->{w}->cb([$self, 'rcb_cb']);
597 $self->{w}->start;
598 $self->rcb_cb;
599 } else {
600 $self->{rcb_cb}->($self);
601 }
602 });
603 }
604
605 sub rcb_cb {
606 my $self = shift;
607 $self->refill(0);
608 if ($self->{buff} =~ s/^\.\015\012|^(.*?)\015\012\.\015\012//s) {
609 $self->{body} .= $1;
610 $self->{w}->stop;
611 $self->{body} =~ s/\015\012/\n/g;
612 $self->{rcb_cb}->($self, delete $self->{body});
613 } elsif ($self->{buff} =~ s/^(.*\015\012)//s) {
614 $self->{body} .= $1;
615 }
616 }
617 !endblock
618
619 Der komplizierteste Teil ist C<rcb_cb>, in der die Artikeldaten
620 akkumuliert werden, wozu furchtbare regexes benutzt wurden. Im Gegensatz
621 zu vielen anderen Stellen wurden die Callbacks nicht durch Closures
622 implementiert, da Event+Closures im allgemeinen ein großes Memory-Leak
623 ist (soll ab Event-0.59 besser sein, aber man kann sichs nicht immer
624 ausssuchen).
625
626 H2: Updaten von SQL-Tabellen
627
628 Die Aufrufe C<mark_article_done> und C<mark_article_present> markieren
629 einen Artikel in der Datenbank als bearbeitet bzw. vorhanden. Sie setzen
630 einfach ein Element in der entsprechenden C<Set::IntSpan>-Menge.
631
632 Diese Mengen werden in einer SQL-Tabelle gespeichert. Da sie relativ groß
633 sind (einige Kilobytes), serh häufig geändert werden (bis zu 100 mal pro
634 Sekunde) und der Zielrechner sehr langsam ist, sollten die Tabellen nicht
635 bei jeder Änderung gespeichert werden. Dies wird mit einem C<idle>-Watcher
636 erreicht, der jedesmal gestartet wird, wenn sich die Daten ändern:
637
638 !block perl
639 my $save_gs = Event->idle(
640 desc => "groupstatus saver",
641 max => 60,
642 cb => sub {
643 $_[0]->w->stop;
644 # zurückschreiben der Tabellen
645 }
646 );
647 $save_gs->stop;
648
649 sub mark_article_done {
650 my $self = shift;
651 $gs{$self->hid,$self->gid}[0]->insert($self->{num});
652 $save_gs->start;
653 }
654 !endblock
655
656 Sollte der Draht so richtig dampfen, sorgt der Timeout von 60 Sekunden
657 dafür, daß bei einem Absturz maximal die letzte Minute fehlt. In der
658 Praxis wird er viel häufiger aufgerufen, nämlich dann, wenn alle
659 einkommenden Verbindungen einmal bedient wurden und noch keine weiteren
660 Daten angekommen sind.
661
662 H2: Künstliche "Lastsimulation"
663
664 Da der Test-Server auf der lokalen Maschine lief, mußte künstlich Last
665 erzeugt werden, um einigermaßen wirklichkeitsnahe Ergebnisse zu erhalten.
666 Die größten Zeitfaktoren bei NNTP sind die Latenz zum Server (abhängig von
667 der Entfernung) und die Bandbreite.
668
669 Um eine künstliche Latenz einzuführen, wird die C<command>-Funktion leicht abgeändert:
670
671 !block perl
672 sub command {
673 my ($self, $cmd) = @_;
674 Event->timer(after => rand, cb => sub {
675 $_[0]->w->cancel;
676 syswrite $self->{fd}, "$cmd\015\012";
677 });
678 }
679 !endblock
680
681 Statt das Kommando sofort zu verschicken, wird ein kurzer Timer
682 gestartet. Die Verzögerung liegt zwischen 0 und 1 Sekunde (C<rand>) und
683 sorgt für eine Streuung. Ohne diese zufällige Verzögerung würde ein
684 unerwünschtes Bearbietungsmuster entstehen, bei dem effektiv nur ein
685 Scan-Vorgang gleichzeitig stattfindet.
686
687 Die obige Version von C<command> schneidet in ihrer Kürze recht gut gegen
688 die "normale" Version ab:
689
690 !block perl
691 sub command {
692 my ($self, $cmd) = @_;
693 syswrite $self->{fd}, "$cmd\015\012";
694 }
695 !endblock
696
697 H2: C<NetServer::ProcessTop>
698
699 Ein recht interessantes Modul ist C<NetServer::ProcessTop>. Wird es
700 benutzt, bindet es sich auf einen TCP-Port, den man per C<telnet>
701 ansprechen kann, um ein C<top>-artiges Listing der Event-Watcher zu
702 bekommen, Außerdem kann man die Watcher edieren.
703
704 Die Benutzung ist denkbar einfach:
705
706 !block perl
707 eval {
708 require NetServer::ProcessTop;
709 NetServer::ProcessTop->new(7000);
710 };
711 !endblock
712
713 Ein C<telnet localhost 7000> erzeugt dann dieses Bild:
714
715 !block verbatim
716 get PID=3407 @ cerebro | 14:26:46 [ 60s]
717 10 events; load averages: 0.75, 0.73, 0.00; lag 0%
718
719 EID PRI STATE RAN TIME CPU TYPE DESCRIPTION P1
720 0 7 912 0:00 26.6% sys idle
721 3 4 zomb 227 0:00 16.9% io ARTICLE 273573
722 6 4 zomb 236 0:00 16.6% io ARTICLE 273572
723 4 4 sleep 232 0:00 16.4% io ARTICLE 273575
724 5 4 sleep 221 0:00 16.0% io ARTICLE 273574
725 9 4 wait 117 0:00 7.3% idle groupstatus saver
726 10 4 wait 180 0:00 0.3% idle reschedule hook
727 2 3 sleep 1 0:00 0.0% time Event::Stats
728 1 3 cpu 0 0:00 0.0% io NetServer::ProcessTop::Client localhost
729 7 3 sleep 0 0:00 0.0% io NetServer::ProcessTop
730 8 4 sleep 0 0:00 0.0% io user input
731 0 -1 0 0:00 0.0% sys other processes
732
733 %
734 !endblock
735
736 Weil das Modul aber ein potentielles Sicherheitsproblem sein kann, sollte
737 es nur zum Debuggen/Erfreuen verwendet werden.
738
739 A1: Der Quellcode
740
741 !include "get"; perl
742
743 A2: Mehr!
744
745 Die folgenden Module/Programme/RFCs wurden für das Projekt verwendet.
746
747 * C<Event> - Event loop processing. {{URL:http://www.cpan.org/}}
748 * C<Set::IntSpan> - Manages sets of integers. {{URL:http://www.cpan.org/}}
749 * C<RFC-977> Network News Transfer Protocol. {{URL:ftp://ftp.isi.edu/in-notes/rfc977.txt}}
750 * C<NetServer::ProcessTop> - Make event loop statistics easily available. {{URL:http://www.cpan.org/}}
751 * C<Time::HiRes> - High resolution ualarm, usleep, and gettimeofday. {{URL:http://www.cpan.org/}}
752 * C<Socket> - load the C socket.h defines and structure manipulators. (Teil der Perl-Distribution).
753 * C<DBI> - Database independent interface for Perl
754 * C<MySQL> SQL-Datebank. {{URL:http://www.mysql.com}}.
755
756