1 |
!init OPT_STYLE="paper" |
2 |
|
3 |
!define DOC_NAME "Einführung in Event::" |
4 |
!define DOC_TYPE "[Vortrag]" |
5 |
!define DOC_AUTHOR "(c) 2000 Marc Lehmann <pcg@goof.com>" |
6 |
!build_title |
7 |
|
8 |
!block abstract |
9 |
|
10 |
Wenn viele Jobs parallel ausgeführt werden sollen, eignet sich das |
11 |
bekannte fork-Paradigma von Unix nicht mehr: Die Interprozeßkommunikation |
12 |
und der Mehraufwand an Speicher und Ressourcen überwiegt dir Vorteile der |
13 |
einfacheren Programmstruktur bei weitem. Diese kurze Einführung in die |
14 |
Ereignis-gesteuerte Programmierung in Perl zeigt an einem konkreten Beispiel |
15 |
(News-Scanner), wie einfach sich selbst komplexe Strukturen in Perl |
16 |
realisieren lassen. |
17 |
|
18 |
!endblock |
19 |
|
20 |
H1: C<Event> in der Praxis --- oder wie man 500 Newsserver gleichzeitig scannt. |
21 |
|
22 |
H2: Ereignis-gesteuerte Programmierung? |
23 |
|
24 |
Zur Lösung paralleler ablaufender Prozesse sind heute drei Ansätze |
25 |
gebräuchlich: |
26 |
|
27 |
* Prozesse mit getrenntem Adressraum (z.B. mit C<fork>) |
28 |
* eng gekoppelte Prozesse mit gemeinsamen Adreßraum (z.B. mit {{1:pthreads}}) |
29 |
* Ereignis-gesteuerte Prozesse |
30 |
|
31 |
Jeder dieser Ansätze hat verschiedene Vor- und Nachteile: Das |
32 |
C<fork>-Modell ist sehr einfach zu programmieren und eignet sich |
33 |
besonders für einfache Probleme, die sozusagen in kleine "Stückzahlen" |
34 |
anfallen. Durch die Abschottung der Prozesse wird eine einfach |
35 |
Parallelisierung möglich, da die Prozesse (z.B.) auf unterschiedlichen |
36 |
Rechnern arbeiten können. Größter Nachteil ist die relative aufwendige |
37 |
Interprozeßkommunikation, die einen großen Overhead nach sich ziehen |
38 |
kann. |
39 |
|
40 |
{{1:Threads}} werden vielfach als das Mittel der Wahl angesehen. Der größte |
41 |
Vorteil von Threads ist das Vorhandensein mehrere Ablauf-Instanzen, |
42 |
die getrennt blockieren können. Leider werden Threads in den meisten |
43 |
Fällen nur dazu mißbraucht, das Blockieren des gesamten Prozesses zu |
44 |
verhindern (z.B. wenn Daten nicht sofort zur Verfügung stehen), werden |
45 |
also effektiv nur als Krücke für asynchrone-EA verwendet. Diesen Vorteil |
46 |
erkauft man sich durch eine zwar schnelle aber dafür extrem komplizierte |
47 |
Synchronisation innerhalb der Threads. {{1:Threads sind in in den seltensten |
48 |
Fällen die richtige Wahl für ein Problem.}} |
49 |
|
50 |
Ereignis-gesteuerte Programmierung beruht auf dem |
51 |
{{1:Callback}}-Prinzip: Eine zentrale Anlaufstelle innerhalb des Prozesses |
52 |
wartet auf Ereignisse (engl. "Events", also z.B. "Daten angekommen", |
53 |
"Zeit abgelaufen" etc...). Je nach Ereignis werden entsprechende |
54 |
Callback-Funktionen aufgerufen. Der Vorteil dieses Ansatzes ist eine |
55 |
übersichtliche Programmstruktur, eine extreme schnelle Kommunikation |
56 |
(nur ein Prozeß) und ein ressourcenschonendes Endprodukt. Auch dieser |
57 |
Ansatz hat seine Nachteile. Der größte ist, daß man bei vielen Problemen |
58 |
"Umdenken" muß, da sich Callbacks eben keine lineare Programmstruktur |
59 |
verwirklichen läßt ({{1:Closures}} können dabei jedoch helfen). Außerdem |
60 |
muß man sich bewußt sein, das ein blockierender Funktionsaufruf |
61 |
(z.B. C<read>) das gesamte Programm anhält. |
62 |
|
63 |
H2: Das Problem... |
64 |
|
65 |
{{...}}ist oberflächlich betrachtet, recht einfach: Eine (kleine) Menge |
66 |
von Usenet-Servern soll nach Newsgruppen abgesucht werden. Das kann |
67 |
auf faire Weise geschehen: man öffnet eine NNTP-Verbindung und schickt |
68 |
Requests. Dies läßt sich durch Pipelining (senden mehrere Befehle |
69 |
gleichzeitig) beschleunigen. Durch die Zeiten, die der News-Server |
70 |
benötigt um Artikel zu suchen, wird die Datenrate in der Praxis allerdings |
71 |
drastisch beschränkt. |
72 |
|
73 |
Also die unfaire Weise: statt einer öffnet man 5, 10 oder gleich mehrere |
74 |
hundert Verbindungen zu einem (oder mehreren) Servern und verteilt so die |
75 |
Verbindungslatenz und die Antwortzeit. |
76 |
|
77 |
H2: Die Planung |
78 |
|
79 |
Die (für mich) naheligende Idee, dies mit mehreren Scanprozessen zu |
80 |
implementieren, scheiterte an zwei Problemen: |
81 |
|
82 |
* Die Scanprozesse müssen sich untereinander absprechen, um Duplikate zu |
83 |
vermeiden. Dies ist zwangsläufig Interprozeßkommunikation (z.B. über eine |
84 |
SQL-Datenbank), die sehr aufwendig zu implementieren ist. Hinzu kommt, das |
85 |
einzelne Jobs zuerst markiert werden müssen ("in Arbeit"), damit sie nicht |
86 |
von mehreren Prozessen gleichzeitig bearbeitet werden, was jedoch sehr |
87 |
schwierig ist, wenn man Wert darauf legt, Prozesse beliebig abbrechen zu |
88 |
können, ohne Artikel zu verlieren. |
89 |
|
90 |
* Das Zielsystem, ein Pentium-166-System, hat weder unendlich Rechen- |
91 |
noch Speicherressourcen. Da Perl von beidem gerne viel nimmt, wäre die |
92 |
Sättigung schon bei relativ wenigen Verbindungen erreicht. Stichwort Speicher: |
93 |
jeder Prozeß benötigt einen Interpreter, eine Kopie der |
94 |
libc-Variablen, eine eigene Kopie des Scanprogramms und seine eigene |
95 |
SQL-Anbindung. Auch moderne Systeme mit einem effizienten C<fork()> leiden |
96 |
darunter, da gerade Perl nicht zimperlich mit dem Speicher umgeht. |
97 |
|
98 |
Die Lösung (klar!) lag im Event-Modul. Da alle Verbindungen von einem |
99 |
Prozeß bearbeitet werden, gibt es keine Synchronisationsprobleme. Der |
100 |
Overhead pro Verbindung beschränkt sich ebenfalls auf einen Hash, und das |
101 |
umschalten von Prozessen entfällt ebenfalls (schneller). |
102 |
|
103 |
H1: Die Implementation |
104 |
|
105 |
Die folgenden Abschnitte stellen die wichtigsten "Knotenpunkte" des |
106 |
Scanprogrammes vor. Jedesmal wird kurz das Problem erläutert und die |
107 |
Lösung mit Hilfe des C<Event>-Moduls diskutiert. |
108 |
|
109 |
H2: Der "Scheduler" |
110 |
|
111 |
Der komplizierteste Teil des Programmes ist der Scheduler: Er verteilt |
112 |
einzelne Jobs auf die Scanner, bzw. beendet das Programm, wenn alle Jobs |
113 |
abgearbeitet wurden. Es gibt nur zwei Typen von "Jobs": |
114 |
|
115 |
* 'S': {{1:S}}canne eine Gruppe. Der Scanner sucht eine bestimmte |
116 |
Newsgruppe auf dem Server und stellt mit Hilfe einer SQL-Tabelle fest, |
117 |
welche Artikel(-nummern) noch nicht gescannt wurden. |
118 |
|
119 |
* 'A': {{1:A}}rtikel holen. Da Gruppen Tausende von Artikeln enthalten |
120 |
können, wird nur ein "Job" pro Gruppe erzeugt. Ein Scanner sucht sich eine |
121 |
Artikelnummer aus, bearbeitet sie und legt die restlichen wieder zurück in |
122 |
die Warteschlange. |
123 |
|
124 |
Ein "Scanner" ist dabei kein Prozeß, sondern nur eine Instanz der |
125 |
C<Scanner>-Klasse, in der im wesentlichen der Zustand einer Verbindung |
126 |
gespeichert wird (Server, Port, aktuelle Gruppe...). Für jede potentielle |
127 |
Verbindung wird ein solches Objekt erzeugt. Für hundert Verbindungen sieht |
128 |
das z.B. so aus: |
129 |
|
130 |
>new Scanner for 1..100; |
131 |
|
132 |
Die Objekte reihen sich automatisch in die C<idle>-Warteschlange ein. |
133 |
|
134 |
Beim Programmstart werden alle Server- und Gruppen aus einer Datei gelesen und in die Job-Warteschlange eingefügt. Dann wird |
135 |
in die Hauptschleife gesprungen: |
136 |
|
137 |
>Scanner::loop(); # Hauptschleife |
138 |
|
139 |
!block perl |
140 |
sub loop { |
141 |
while (@queue || @idle < $scanners) { |
142 |
runq; |
143 |
Event::loop; |
144 |
} |
145 |
} |
146 |
!endblock |
147 |
|
148 |
Dabei stehen die zu bearbeitenden Jobs in C<@queue> und die verfügbaren |
149 |
Scanner-Objekte in C<@idle>. Solange noch Jobs vorhanden sind (C<@queue != |
150 |
0>) und nicht alle (C<$scanners>) Scanner idlen, wird C<runq> aufgerufen |
151 |
und in die Hauptschleife von C<Event> gesprungen. |
152 |
|
153 |
C<runq> (das steht für "run queue") nimmt Jobs aus der Warteschlange und |
154 |
teilt sie verfügbaren Scannern zu. Der Algorithmus ist sehr primitiv |
155 |
(FCFS) und könnte wesentlich verbessert werden. Wichtig ist, daß die |
156 |
Lastverteilung in diesen wenigen Zeilen stattfindet und sehr gut |
157 |
lokalisiert und damit sehr einfach änderbar ist. |
158 |
|
159 |
!block perl |
160 |
sub runq { |
161 |
while (@queue && @idle) { |
162 |
my $c = pop @queue; |
163 |
my $s = pop @idle; |
164 |
$s->run(@$c); |
165 |
} |
166 |
Event::unloop_all unless @queue || @idle < $scanners; |
167 |
} |
168 |
!endblock |
169 |
|
170 |
Der Aufruf von C<unloop_all> beendet alle Event-Schleifen, wenn alle Jobs abgearbeitet wurden. |
171 |
|
172 |
H2: Job Management & Rescheduling |
173 |
|
174 |
Um neue Jobs in das System einzufügen, gibt die Funktion C<add_job>: |
175 |
|
176 |
!block perl |
177 |
sub add_job { |
178 |
push @queue, [@_]; |
179 |
$reschedule->start if @idle; |
180 |
} |
181 |
!endblock |
182 |
|
183 |
Die wichtigste Teil ist der Aufruf von {{C:$reschedule->start}}: Wenn |
184 |
ein Scanner verfügbar ist (C<@idle> nicht leer ist), muß der Scheduler |
185 |
aufgerufen werden. Da der Aufruf von C<add_job> sehr häufig ist, und |
186 |
der Scheduler (C<loop>) eine Rekursion bedeutet, wird er nicht direkt |
187 |
aufgerufen, sondern nur, wenn sonst keine Ereignisse anliegen. Dies wird |
188 |
mit einem C<idle>-Event-Handler erreicht, der in der globalen Variable |
189 |
C<$reschedule> steht: |
190 |
|
191 |
!block perl |
192 |
my $reschedule = Event->idle( |
193 |
desc => "reschedule hook", |
194 |
max => 5, |
195 |
cb => sub { |
196 |
$_[0]->w->stop; |
197 |
Event::unloop; |
198 |
} |
199 |
); |
200 |
$reschedule->stop; |
201 |
!endblock |
202 |
|
203 |
{{C:Event->idle}} ist der {{Konstruktor}}, der einen Ereignis-Handler |
204 |
vom Typ "idle" erzeugt. Die einzelnen Attribute bedeuten: |
205 |
|
206 |
!block table |
207 |
Attribut Beschreibung |
208 |
desc Eine Beschreibung, z.B. für das C<NetServer::ProcessTop>-Modul. |
209 |
max Zeit (in Sekunden) nach dem der Callback {{auf jeden Fall}} ausgeführt wird. |
210 |
cb Die Callback-Funktion, die aufgerufen wird. |
211 |
!endblock |
212 |
|
213 |
Übertragen auf den C<$rescheduler> bedeutet dies, daß aus der |
214 |
Event-Schleife gesprungen wird, wenn gerade kein Datentransfer oder |
215 |
sonstige Aufgaben anliegen, {{oder nach fünf Sekunden}}, je nachdem, |
216 |
was früher eintrit. Diese Einschränkung verhindert, das ein schnell |
217 |
eintreffender Artikel den gesamten Prozeß "am Laufen hält" und damit |
218 |
verhindert, das freie (idle) Scanner nicht mit neuen Jobs versorgt werden. |
219 |
|
220 |
Wenn der Callback angesprungen wird, bekommt er ein {{Ereignis-Objekt}} |
221 |
übergeben (unter X entspricht dies einem C<XEvent>, bei Gtk ist es ein |
222 |
C<Gdk::Event>). Als erstes sucht er über dieses Ereignis-Objekt (in |
223 |
C<$_[0]>) den ursprünglichen {{Watcher}} ({{C:$_[0]->w}}, "w" steht für |
224 |
"watcher") und ruft die C<stop>-Methode auf. Damit wird erreicht, daß |
225 |
der Callback nicht mehr aufgerufen wird, bis er das nächste mal gestartet |
226 |
wird (z.B. in C<add_job>). {{C:$_[0]->w->stop}} ist übrigens das gleiche |
227 |
wie {{C:$rescheduler->stop}}, die Variable C<$rescheduler> ist wegen C<my> |
228 |
jedoch erst {{nach}} dem Aufruf des Konstruktors sichtbar. |
229 |
|
230 |
Das zweite (und wichtigste) was der Callback unternimmt, ist, den |
231 |
eigentlichen Scheduler wieder anzuspringen C<loop>. In C<loop> wurde die |
232 |
Hauptschleife des Event-Moduls aufgerufen (C<Event::loop>): C<unloop> ist |
233 |
das Gegenstück dazu und springt aus dieser Schleife heraus, so daß der |
234 |
Scheduler neue Jobs verteilen kann. |
235 |
|
236 |
H3: Beendigung eines Jobs |
237 |
|
238 |
Wenn ein Scanner-Objekt einen Job verarbeitet hat, muß es sich wieder in |
239 |
die C<@idle>-Queue eintragen: |
240 |
|
241 |
!block perl |
242 |
sub idle { |
243 |
my $self = shift; |
244 |
push @idle, $self; |
245 |
$reschedule->start; |
246 |
} |
247 |
!endblock |
248 |
|
249 |
der Aufbau gleicht C<add_job>. |
250 |
|
251 |
H2: Die Jobschleife |
252 |
|
253 |
Für die Abarbeitung der Jobs ist die Methode C<run> zuständig. Sie hat |
254 |
mindestens drei Parameter: C<self> (das Scanner-Objekt), C<host> (der |
255 |
NNTP-Server, inkl. Port) und C<cmd> (der Jobtyp). |
256 |
|
257 |
Da das NNTP-Protokoll "stateful" ist, muß der aktuelle NNTP-Server und |
258 |
die aktuelle Gruppe gespeichert werden. Gilt der neue Job für denselben |
259 |
Rechner und dieselbe Gruppe (der Normalfall) passiert nichts, ansonsten |
260 |
wird die Verbindung zum NNTP-Server neu aufgebaut, bzw. die Gruppe |
261 |
gewechselt. |
262 |
|
263 |
Das Aufbauen der NNTP-Verbindung ist ein Problem für den Event-Ansatz: ein |
264 |
C<connect>-Aufruf {{blockiert}} den Prozeß, bis entweder die Verbindung |
265 |
steht oder ein Fehler passiert. Da ein solcher C<connect> einige Sekunden |
266 |
benötigen kann (bei Netzwerkproblemen auch wesentlich länger), müssen sog. |
267 |
"non-blocking-calls" verwendet werden. |
268 |
|
269 |
Das ist auch der Grund, weshalb das Programm auf Standardmodule wie |
270 |
C<IO::Socket> oder C<Net::NNTP> verzichten muß: Unterstützung für |
271 |
nicht-blockierende Aufrufe ist kaum oder überhaupt nicht vorhanden. Das |
272 |
C<Net::NNTP>-Modul ist in dieser Hinsicht besonders schlecht, dnen |
273 |
man kann die entsprechende Methoden nicht einfach in einer Subklasse |
274 |
überschreiben. |
275 |
|
276 |
Der schwierigste Teil war der Aufruf von C<connect>, der ebenfalls nicht |
277 |
blockieren sollte: |
278 |
|
279 |
!block perl |
280 |
if (socket $fd, PF_INET, SOCK_STREAM, getprotobyname 'tcp') { |
281 |
sub TCP_NODELAY(){1} sub SOL_TCP(){6}; # linux-2.2 |
282 |
setsockopt $fd, SOL_TCP, TCP_NODELAY, 1; |
283 |
fcntl $fd, F_SETFL, O_NONBLOCK; |
284 |
connect $fd, sockaddr_in $port, inet_aton($ip); |
285 |
fcntl $fd, F_SETFL, 0; |
286 |
} else { |
287 |
undef $fd; |
288 |
} |
289 |
!endblock |
290 |
|
291 |
Einige Konstanten (z.B. C<SOL_TCP>) sind in Perl nicht einfach zu |
292 |
bekommen. Da das Script mehr ein Hack als eine professionelle Anwendung |
293 |
ist, wurden sie einfach hardcodiert. |
294 |
|
295 |
Wenn der Server gewechselt wird, wechselt auch der Filehandle, so daß eine neuer |
296 |
Event-Watcher erzeugt werden muß: |
297 |
|
298 |
!block perl |
299 |
($self->{w} = Event->io(fd => fileno $fd, poll => 'r'))->stop; |
300 |
!endblock |
301 |
|
302 |
H2: NNTP-Befehle |
303 |
|
304 |
Das NNTP-Protokoll ist sehr einfach: Kommandos bestehen aus einer |
305 |
einzelnen Textzeile, Antworten aus einem Zifferncode und einer |
306 |
beschreibenden Textzeile. Artikel werden als Textblock übertragen, wobei |
307 |
die letzte Zeile einen einzelnen Punkt als Endekennung enthält. |
308 |
|
309 |
Das Absetzen eines Befehls geschieht über die Methode C<rcb>. Ihr werden |
310 |
zwei Argumente übergeben, das Kommando (ohne Zeilenende) und eine |
311 |
{{1:Callback}}-Funktion. Das Kommando wird an den NNTP-Server geschickt, |
312 |
die Callback-Funktion wird aufgerufen, wenn die erste Zeile der Antwort |
313 |
angekommen ist (mit dem Statuscode). |
314 |
|
315 |
Dies wird erreicht, indem der Event-Watcher für die NNTP-Verbindung |
316 |
(C<$self->{w}>) gefüttert und gestartet wird: |
317 |
|
318 |
!block perl |
319 |
sub rcb { |
320 |
my $self = shift; |
321 |
my $cmd = shift; |
322 |
my $cb = shift; |
323 |
if ($cmd) { |
324 |
$self->command($cmd); |
325 |
} else { |
326 |
$cmd = "<anonymous command>"; |
327 |
} |
328 |
|
329 |
$self->{w}->desc($cmd); |
330 |
$self->{w}->cb(sub { |
331 |
$self->{w}->stop; |
332 |
$cb->($self); |
333 |
}); |
334 |
$self->{w}->start; |
335 |
} |
336 |
!endblock |
337 |
|
338 |
Falls eine Befehl (C<$cmd>) übergeben wurde, wird dieser über die Leitung |
339 |
gepustet (C<$self->command>) und als beschreibender Text verwendet. Mit |
340 |
C<desc> wird diese Beschreibung gesetzt (hilfreich zum Debuggen oder |
341 |
Tollfühlen, wenn es hinterher funktioniert). |
342 |
|
343 |
Dann wird der Callback (C<cb>) gesetzt, der lediglich den Watcher |
344 |
stoppt (Befehle sind einmalige Angelegenheiten) und die {{eigentliche}} |
345 |
Callback-Funktion aufruft, und schließlich wird der Watcher gestartet. |
346 |
|
347 |
H2: Lesen der Antwort |
348 |
|
349 |
Der schwierigste Teil des Skriptes ist das zeilenweise Lesen, das vom |
350 |
NNTP-Protokoll vorausgesetzt wird. Da Perl von sich aus (noch) keinerlei |
351 |
Support dafür anbietet ({{C:<>}} blockiert den Prozeß oder liefert |
352 |
keine ganzen Zeilen zurück), mußte das Zusammensetzen der Zeilen selbst |
353 |
implementiert werden. |
354 |
|
355 |
Grundlage dafür ist die Methode C<refill>, die alle Zeichen liest, die |
356 |
angekommen sind (ohne zu blockieren) und sie in einem Puffer ablegt: |
357 |
|
358 |
!block perl |
359 |
sub refill { |
360 |
my $self = shift; |
361 |
my $wait = shift; |
362 |
my $fd = $self->{fd}; |
363 |
fcntl $fd, F_SETFL, O_NONBLOCK; |
364 |
for(;;) { |
365 |
my $r = sysread $fd, $self->{buff}, 32768, length $self->{buff}; |
366 |
if ($r>0) { |
367 |
last; |
368 |
} elsif (!defined $r && $! == EAGAIN) { |
369 |
last unless $wait; |
370 |
$self->{w}->cb(sub { $self->{w}->stop; Event::unloop }); |
371 |
$self->{w}->start; |
372 |
Event::loop(); |
373 |
} else { |
374 |
$self->{buff} = "500 I/O error: $!\015\012.\015\012"; |
375 |
delete $self->{host}; |
376 |
last; |
377 |
} |
378 |
} |
379 |
fcntl $fd, F_SETFL, 0; |
380 |
} |
381 |
!endblock |
382 |
|
383 |
Das Argument C<$wait> bestimmt, ob auf jeden Fall gewartet werden soll, |
384 |
oder ob C<refill> zurückkehren soll, auch wenn keine neuen Daten verfügbar |
385 |
sind. Letzteres ist außerst selten der Fall und wurde entsprechend |
386 |
ineffizient implementiert, indem ein "leerer" Watcher gestartet wird und |
387 |
dann auf dessen Unloop gewartet wird. |
388 |
|
389 |
Als nächstes in der Hierarchy steht C<getline>, das einfach die nächste |
390 |
Zeile liefert, notfalls durch Warten: |
391 |
|
392 |
!block perl |
393 |
sub getline { |
394 |
my $self = shift; |
395 |
$self->refill(1) while $self->{buff} !~ s/^([^\015\012]*)\015\012//o; |
396 |
$1; |
397 |
} |
398 |
!endblock |
399 |
|
400 |
Sie ist sehr einfach: gibt es schon eine ganze Zeile im Puffer, dann |
401 |
schneide sie heraus und gib sie zurück. Nicht sehr effizient, aber einfach |
402 |
zu benutzen. |
403 |
|
404 |
Sie wird benutzt von C<response>, wo die Zeile in ihre beiden Kompomenten |
405 |
(Statuscode, Meldung) zerlegt wird, und die erste Ziffer des Statuscodes |
406 |
zurückgegeben wird (der für das weitere Vorgehen am entscheidensten ist). |
407 |
|
408 |
!block perl |
409 |
sub response { |
410 |
my $self = shift; |
411 |
@{$self}{'code','message'} = split m/ /, $self->getline, 2; |
412 |
substr $self->{code}, 0, 1; |
413 |
} |
414 |
!endblock |
415 |
|
416 |
H2: Scannen einer Gruppe |
417 |
|
418 |
Um herauszufinden, welche Artikel seit dem letzten Mal neu hinzugekommen sind, |
419 |
wird die Statusmeldung ausgewertet, die der Server beim Wechsel in eine Gruppe liefert: |
420 |
|
421 |
!block example |
422 |
{{BEFEHL }} GROUP comp.lang.perl.moderated |
423 |
{{ANTWORT}} 211 125 4886 5010 comp.lang.perl.moderated group selected |
424 |
!endblock |
425 |
|
426 |
C<211> ist der Statuscode für "O.K.", C<125> ist die Zahl der Artikel, |
427 |
C<4886> ist die erste und C<5010> die letzte Artikelnummer. |
428 |
|
429 |
Dies ist eine ideale Anwendung für C<rcb>: |
430 |
|
431 |
!block perl |
432 |
$self->rcb("GROUP $group", sub { |
433 |
if ($self->response == 2 && $self->{message} =~ /(\d+)\s+(\d+)\s+(\d+)/) { |
434 |
my($count, $first, $last, $name) = ($1, $2, $3, $3); |
435 |
if ($count) { |
436 |
$self->slog("selected group $group"); |
437 |
$self->{group} = $group; |
438 |
$self->{first} = $first; |
439 |
$self->{last} = $last; |
440 |
$cb->($self); |
441 |
return; |
442 |
} else { |
443 |
$self->slog("SKIPPED empty group $group: ", substr($self->{message},0,-1)); |
444 |
} |
445 |
} else { |
446 |
$self->slog("SKIPPED bogus group $group on ".$self->{host}[0].": ", substr($self->{message},0,-1)); |
447 |
} |
448 |
$self->idle; |
449 |
}); |
450 |
!endblock |
451 |
|
452 |
C<rcb> bekommt zwei Argumente übergeben: C<"GROUP $group"> ist das |
453 |
NNTP-Kommando zum Wechseln der (News-) Gruppe, das zweite Argument ist die |
454 |
Callback-Funktion, die die NNTP-Anwtort als Argument bekommt. |
455 |
|
456 |
Die Verwendung einer Closure erlaubt es, Befehl (C<rcb>) und die Reaktion |
457 |
(das C<sub {}>) direkt hintereinander zu schreiben, so, als wäre C<rcb> |
458 |
ein "normaler", blockierender Aufruf zum Lesen einer Zeile, mit dem einzigen Unterschied, daß |
459 |
die Auswertung des Ergebnisses im einem eingerückten Block stattfindent. Anders gesagt, aus: |
460 |
|
461 |
!block perl |
462 |
$response = $self->rcb("GROUP $group"); |
463 |
|
464 |
if ($response....) { |
465 |
} |
466 |
!endblock |
467 |
|
468 |
wird: |
469 |
|
470 |
!block perl |
471 |
$self->rcb(GROUP $group", sub { |
472 |
|
473 |
if ($response....) { |
474 |
} |
475 |
}); |
476 |
!endblock |
477 |
|
478 |
C<rcb> kehrt jedoch sofort zurück (ein C<sub>, daß C<rcb> verwendet, kann |
479 |
deshalb nicht sofort ein Resultat an den Aufrufer zurückliefern. |
480 |
|
481 |
Die Information über die Gruppe (C<first> und C<last>, wird aus der |
482 |
NNTP-Antwort genommen) wird später mit den Daten aus der SQL-Datenbank |
483 |
verglichen (das hat allerdings nichts mit C<Event> zu tun): |
484 |
|
485 |
!block perl |
486 |
sub group_scan { |
487 |
my $self = shift; |
488 |
my $group = $self->{group}; |
489 |
my $todo = new Set::IntSpan $self->{first}."-".$self->{last}; |
490 |
$todo = $todo->intersect($self->gs_done->complement); |
491 |
if ($todo->empty) { |
492 |
$self->slog("[no new articles in $group]"); |
493 |
} else { |
494 |
$self->slog("scanning group $group: ", $todo->run_list); |
495 |
add_job($self->{host},'A',$group,$todo); |
496 |
} |
497 |
$self->idle; |
498 |
} |
499 |
!endblock |
500 |
|
501 |
Das C<Set::IntSpan>-Modul wird dazu benutzt, um aus der Menge der |
502 |
vorhandenen Artikel die bereits gescannten (die von C<gs_done> |
503 |
zurückgegeben werden) zu entfernen. Ist die resultierende Menge nicht |
504 |
leer, wird ein neuer Job ("hole alle diese Artikel") erzeugt. |
505 |
|
506 |
H2: Holen eines Artikels |
507 |
|
508 |
Das Holen geschieht in zwei Stufen. Zuerst wird die {{Message-Id}} mit |
509 |
einem C<STAT>-Befehl ausgewertet. Damit wird außerdem festgestellt, ob ein |
510 |
bestimmter Artikel überhaupt existiert. |
511 |
|
512 |
!block perl |
513 |
$self->rcb("STAT ".$self->{num}, \&got_stat); |
514 |
!endblock |
515 |
|
516 |
Ein Protokollbeispiel: |
517 |
|
518 |
!block example |
519 |
{{BEFEHL }} STAT 5010 |
520 |
{{ANTWORT}} 223 5010 <85j7jc$68n@junior.apk.net> article retrieved - request text separately |
521 |
{{BEFEHL }} STAT 4977 |
522 |
{{ANTWORT}} 430 No such article: 4977 |
523 |
!endblock |
524 |
|
525 |
Der Callback C<got_stat> wertet diese Information aus: |
526 |
|
527 |
!block perl |
528 |
sub got_stat { |
529 |
my $self = shift; |
530 |
my $r = $self->response; |
531 |
$self->mark_article_done; |
532 |
|
533 |
($self->{mid}) = $self->{message} =~ /<([^>]+)>/g; |
534 |
|
535 |
if ($r == 2) { |
536 |
my $aid = sql_fetch("select count(*) from art where mid=? limit 1", "".$self->{mid}); |
537 |
$self->mark_article_present; |
538 |
if ($aid) { |
539 |
sql_exec("replace into lnk values (?,?)", $self->gid, $aid); |
540 |
$self->idle; |
541 |
} else { |
542 |
$busy{$self->{mid}}++; |
543 |
$stat_article++; |
544 |
$self->rcb_dot("ARTICLE ".$self->{num}, \&got_article); |
545 |
} |
546 |
} else { |
547 |
$self->idle; |
548 |
} |
549 |
} |
550 |
!endblock |
551 |
|
552 |
Existiert der Artikel nicht, ist der Job beendet und es wird in den |
553 |
idle-Modus gegangen. Wurde er schon einmal geholt (z.B. in einer anderen Gruppe) |
554 |
wird er nicht noch einmal geholt, sondern lediglich in die Gruppe "gelinkt" (Artikel können sehr groß werden). |
555 |
|
556 |
Ansonsten wird ein C<ARTICLE>-Befehl abgesetzt, mit dem der gesamte Artikel geholt wird. |
557 |
|
558 |
!block example |
559 |
{{BEFEHL }} ARTICLE 5010 |
560 |
{{ANTWORT}} 220 5010 <85j7jc$68n@junior.apk.net> article retrieved - text follows |
561 |
{{ANTWORT}} From: allbery@apk.net (Brandon S. Allbery KF8NH) |
562 |
{{ANTWORT}} Newsgroups: comp.lang.perl.moderated |
563 |
{{ANTWORT}} Subject: Re: Usefulness of Pseudo Hashes |
564 |
{{ANTWORT}} Message-ID: <85j7jc$68n@junior.apk.net> |
565 |
{{ANTWORT}} |
566 |
{{ANTWORT}} Also sprach Alex Rhomberg <rhomberg@ife.ee.ethz.ch> (<384E39B8.D8635949@ife.ee.ethz.ch>): |
567 |
{{ANTWORT}} +----- |
568 |
{{ANTWORT}} | I wonder why pseudo hashes were invented |
569 |
{{ANTWORT}} +--->8 |
570 |
{{ANTWORT}} |
571 |
{{ANTWORT}} Sometimes you need an ordered list (so you can't use hashes) with keyed access |
572 |
{{ANTWORT}} to the list (so lists/arrays are slow and a pain in the butt to use). Pseudo |
573 |
{{ANTWORT}} hashes are a better solution than the usual hack of maintaining duplicate |
574 |
{{ANTWORT}} information in a hash and an array/list. |
575 |
{{ANTWORT}} |
576 |
{{ANTWORT}} -- |
577 |
{{ANTWORT}} brandon s. allbery [os/2][linux][solaris][japh] allbery@kf8nh.apk.net |
578 |
{{ANTWORT}} system administrator [WAY too many hats] allbery@ece.cmu.edu |
579 |
{{ANTWORT}} carnegie mellon / electrical and computer engineering KF8NH |
580 |
{{ANTWORT}} Kiss my bits, Billy-boy. |
581 |
{{ANTWORT}} . |
582 |
!endblock |
583 |
|
584 |
Hierbei tritt das Problem auf, daß nach der Statuszeile ein Artikel |
585 |
folgt. Deshalb wird statt C<rcb> die Methode C<rcb_dot> benutzt (das steht |
586 |
für "read callback + data read until dot"): |
587 |
|
588 |
!block perl |
589 |
sub rcb_dot { |
590 |
my $self = shift; |
591 |
my $cmd = shift; |
592 |
$self->{rcb_cb} = shift; |
593 |
delete $self->{body}; |
594 |
$self->rcb($cmd, sub { |
595 |
if ($self->response == 2) { |
596 |
$self->{w}->cb([$self, 'rcb_cb']); |
597 |
$self->{w}->start; |
598 |
$self->rcb_cb; |
599 |
} else { |
600 |
$self->{rcb_cb}->($self); |
601 |
} |
602 |
}); |
603 |
} |
604 |
|
605 |
sub rcb_cb { |
606 |
my $self = shift; |
607 |
$self->refill(0); |
608 |
if ($self->{buff} =~ s/^\.\015\012|^(.*?)\015\012\.\015\012//s) { |
609 |
$self->{body} .= $1; |
610 |
$self->{w}->stop; |
611 |
$self->{body} =~ s/\015\012/\n/g; |
612 |
$self->{rcb_cb}->($self, delete $self->{body}); |
613 |
} elsif ($self->{buff} =~ s/^(.*\015\012)//s) { |
614 |
$self->{body} .= $1; |
615 |
} |
616 |
} |
617 |
!endblock |
618 |
|
619 |
Der komplizierteste Teil ist C<rcb_cb>, in der die Artikeldaten |
620 |
akkumuliert werden, wozu furchtbare regexes benutzt wurden. Im Gegensatz |
621 |
zu vielen anderen Stellen wurden die Callbacks nicht durch Closures |
622 |
implementiert, da Event+Closures im allgemeinen ein großes Memory-Leak |
623 |
ist (soll ab Event-0.59 besser sein, aber man kann sichs nicht immer |
624 |
ausssuchen). |
625 |
|
626 |
H2: Updaten von SQL-Tabellen |
627 |
|
628 |
Die Aufrufe C<mark_article_done> und C<mark_article_present> markieren |
629 |
einen Artikel in der Datenbank als bearbeitet bzw. vorhanden. Sie setzen |
630 |
einfach ein Element in der entsprechenden C<Set::IntSpan>-Menge. |
631 |
|
632 |
Diese Mengen werden in einer SQL-Tabelle gespeichert. Da sie relativ groß |
633 |
sind (einige Kilobytes), serh häufig geändert werden (bis zu 100 mal pro |
634 |
Sekunde) und der Zielrechner sehr langsam ist, sollten die Tabellen nicht |
635 |
bei jeder Änderung gespeichert werden. Dies wird mit einem C<idle>-Watcher |
636 |
erreicht, der jedesmal gestartet wird, wenn sich die Daten ändern: |
637 |
|
638 |
!block perl |
639 |
my $save_gs = Event->idle( |
640 |
desc => "groupstatus saver", |
641 |
max => 60, |
642 |
cb => sub { |
643 |
$_[0]->w->stop; |
644 |
# zurückschreiben der Tabellen |
645 |
} |
646 |
); |
647 |
$save_gs->stop; |
648 |
|
649 |
sub mark_article_done { |
650 |
my $self = shift; |
651 |
$gs{$self->hid,$self->gid}[0]->insert($self->{num}); |
652 |
$save_gs->start; |
653 |
} |
654 |
!endblock |
655 |
|
656 |
Sollte der Draht so richtig dampfen, sorgt der Timeout von 60 Sekunden |
657 |
dafür, daß bei einem Absturz maximal die letzte Minute fehlt. In der |
658 |
Praxis wird er viel häufiger aufgerufen, nämlich dann, wenn alle |
659 |
einkommenden Verbindungen einmal bedient wurden und noch keine weiteren |
660 |
Daten angekommen sind. |
661 |
|
662 |
H2: Künstliche "Lastsimulation" |
663 |
|
664 |
Da der Test-Server auf der lokalen Maschine lief, mußte künstlich Last |
665 |
erzeugt werden, um einigermaßen wirklichkeitsnahe Ergebnisse zu erhalten. |
666 |
Die größten Zeitfaktoren bei NNTP sind die Latenz zum Server (abhängig von |
667 |
der Entfernung) und die Bandbreite. |
668 |
|
669 |
Um eine künstliche Latenz einzuführen, wird die C<command>-Funktion leicht abgeändert: |
670 |
|
671 |
!block perl |
672 |
sub command { |
673 |
my ($self, $cmd) = @_; |
674 |
Event->timer(after => rand, cb => sub { |
675 |
$_[0]->w->cancel; |
676 |
syswrite $self->{fd}, "$cmd\015\012"; |
677 |
}); |
678 |
} |
679 |
!endblock |
680 |
|
681 |
Statt das Kommando sofort zu verschicken, wird ein kurzer Timer |
682 |
gestartet. Die Verzögerung liegt zwischen 0 und 1 Sekunde (C<rand>) und |
683 |
sorgt für eine Streuung. Ohne diese zufällige Verzögerung würde ein |
684 |
unerwünschtes Bearbietungsmuster entstehen, bei dem effektiv nur ein |
685 |
Scan-Vorgang gleichzeitig stattfindet. |
686 |
|
687 |
Die obige Version von C<command> schneidet in ihrer Kürze recht gut gegen |
688 |
die "normale" Version ab: |
689 |
|
690 |
!block perl |
691 |
sub command { |
692 |
my ($self, $cmd) = @_; |
693 |
syswrite $self->{fd}, "$cmd\015\012"; |
694 |
} |
695 |
!endblock |
696 |
|
697 |
H2: C<NetServer::ProcessTop> |
698 |
|
699 |
Ein recht interessantes Modul ist C<NetServer::ProcessTop>. Wird es |
700 |
benutzt, bindet es sich auf einen TCP-Port, den man per C<telnet> |
701 |
ansprechen kann, um ein C<top>-artiges Listing der Event-Watcher zu |
702 |
bekommen, Außerdem kann man die Watcher edieren. |
703 |
|
704 |
Die Benutzung ist denkbar einfach: |
705 |
|
706 |
!block perl |
707 |
eval { |
708 |
require NetServer::ProcessTop; |
709 |
NetServer::ProcessTop->new(7000); |
710 |
}; |
711 |
!endblock |
712 |
|
713 |
Ein C<telnet localhost 7000> erzeugt dann dieses Bild: |
714 |
|
715 |
!block verbatim |
716 |
get PID=3407 @ cerebro | 14:26:46 [ 60s] |
717 |
10 events; load averages: 0.75, 0.73, 0.00; lag 0% |
718 |
|
719 |
EID PRI STATE RAN TIME CPU TYPE DESCRIPTION P1 |
720 |
0 7 912 0:00 26.6% sys idle |
721 |
3 4 zomb 227 0:00 16.9% io ARTICLE 273573 |
722 |
6 4 zomb 236 0:00 16.6% io ARTICLE 273572 |
723 |
4 4 sleep 232 0:00 16.4% io ARTICLE 273575 |
724 |
5 4 sleep 221 0:00 16.0% io ARTICLE 273574 |
725 |
9 4 wait 117 0:00 7.3% idle groupstatus saver |
726 |
10 4 wait 180 0:00 0.3% idle reschedule hook |
727 |
2 3 sleep 1 0:00 0.0% time Event::Stats |
728 |
1 3 cpu 0 0:00 0.0% io NetServer::ProcessTop::Client localhost |
729 |
7 3 sleep 0 0:00 0.0% io NetServer::ProcessTop |
730 |
8 4 sleep 0 0:00 0.0% io user input |
731 |
0 -1 0 0:00 0.0% sys other processes |
732 |
|
733 |
% |
734 |
!endblock |
735 |
|
736 |
Weil das Modul aber ein potentielles Sicherheitsproblem sein kann, sollte |
737 |
es nur zum Debuggen/Erfreuen verwendet werden. |
738 |
|
739 |
A1: Der Quellcode |
740 |
|
741 |
!include "get"; perl |
742 |
|
743 |
A2: Mehr! |
744 |
|
745 |
Die folgenden Module/Programme/RFCs wurden für das Projekt verwendet. |
746 |
|
747 |
* C<Event> - Event loop processing. {{URL:http://www.cpan.org/}} |
748 |
* C<Set::IntSpan> - Manages sets of integers. {{URL:http://www.cpan.org/}} |
749 |
* C<RFC-977> Network News Transfer Protocol. {{URL:ftp://ftp.isi.edu/in-notes/rfc977.txt}} |
750 |
* C<NetServer::ProcessTop> - Make event loop statistics easily available. {{URL:http://www.cpan.org/}} |
751 |
* C<Time::HiRes> - High resolution ualarm, usleep, and gettimeofday. {{URL:http://www.cpan.org/}} |
752 |
* C<Socket> - load the C socket.h defines and structure manipulators. (Teil der Perl-Distribution). |
753 |
* C<DBI> - Database independent interface for Perl |
754 |
* C<MySQL> SQL-Datebank. {{URL:http://www.mysql.com}}. |
755 |
|
756 |
|