1 |
root |
1.1 |
!init OPT_STYLE="paper" |
2 |
|
|
|
3 |
|
|
!define DOC_NAME "Einführung in Event::" |
4 |
|
|
!define DOC_TYPE "[Vortrag]" |
5 |
|
|
!define DOC_AUTHOR "(c) 2000 Marc Lehmann <pcg@goof.com>" |
6 |
|
|
!build_title |
7 |
|
|
|
8 |
|
|
!block abstract |
9 |
|
|
|
10 |
|
|
Wenn viele Jobs parallel ausgeführt werden sollen, eignet sich das |
11 |
|
|
bekannte fork-Paradigma von Unix nicht mehr: Die Interprozeßkommunikation |
12 |
|
|
und der Mehraufwand an Speicher und Ressourcen überwiegt dir Vorteile der |
13 |
|
|
einfacheren Programmstruktur bei weitem. Diese kurze Einführung in die |
14 |
|
|
Ereignis-gesteuerte Programmierung in Perl zeigt an einem konkreten Beispiel |
15 |
|
|
(News-Scanner), wie einfach sich selbst komplexe Strukturen in Perl |
16 |
|
|
realisieren lassen. |
17 |
|
|
|
18 |
|
|
!endblock |
19 |
|
|
|
20 |
|
|
H1: C<Event> in der Praxis --- oder wie man 500 Newsserver gleichzeitig scannt. |
21 |
|
|
|
22 |
|
|
H2: Ereignis-gesteuerte Programmierung? |
23 |
|
|
|
24 |
|
|
Zur Lösung paralleler ablaufender Prozesse sind heute drei Ansätze |
25 |
|
|
gebräuchlich: |
26 |
|
|
|
27 |
|
|
* Prozesse mit getrenntem Adressraum (z.B. mit C<fork>) |
28 |
|
|
* eng gekoppelte Prozesse mit gemeinsamen Adreßraum (z.B. mit {{1:pthreads}}) |
29 |
|
|
* Ereignis-gesteuerte Prozesse |
30 |
|
|
|
31 |
|
|
Jeder dieser Ansätze hat verschiedene Vor- und Nachteile: Das |
32 |
|
|
C<fork>-Modell ist sehr einfach zu programmieren und eignet sich |
33 |
|
|
besonders für einfache Probleme, die sozusagen in kleine "Stückzahlen" |
34 |
|
|
anfallen. Durch die Abschottung der Prozesse wird eine einfach |
35 |
|
|
Parallelisierung möglich, da die Prozesse (z.B.) auf unterschiedlichen |
36 |
|
|
Rechnern arbeiten können. Größter Nachteil ist die relative aufwendige |
37 |
|
|
Interprozeßkommunikation, die einen großen Overhead nach sich ziehen |
38 |
|
|
kann. |
39 |
|
|
|
40 |
|
|
{{1:Threads}} werden vielfach als das Mittel der Wahl angesehen. Der größte |
41 |
|
|
Vorteil von Threads ist das Vorhandensein mehrere Ablauf-Instanzen, |
42 |
|
|
die getrennt blockieren können. Leider werden Threads in den meisten |
43 |
|
|
Fällen nur dazu mißbraucht, das Blockieren des gesamten Prozesses zu |
44 |
|
|
verhindern (z.B. wenn Daten nicht sofort zur Verfügung stehen), werden |
45 |
|
|
also effektiv nur als Krücke für asynchrone-EA verwendet. Diesen Vorteil |
46 |
|
|
erkauft man sich durch eine zwar schnelle aber dafür extrem komplizierte |
47 |
|
|
Synchronisation innerhalb der Threads. {{1:Threads sind in in den seltensten |
48 |
|
|
Fällen die richtige Wahl für ein Problem.}} |
49 |
|
|
|
50 |
|
|
Ereignis-gesteuerte Programmierung beruht auf dem |
51 |
|
|
{{1:Callback}}-Prinzip: Eine zentrale Anlaufstelle innerhalb des Prozesses |
52 |
|
|
wartet auf Ereignisse (engl. "Events", also z.B. "Daten angekommen", |
53 |
|
|
"Zeit abgelaufen" etc...). Je nach Ereignis werden entsprechende |
54 |
|
|
Callback-Funktionen aufgerufen. Der Vorteil dieses Ansatzes ist eine |
55 |
|
|
übersichtliche Programmstruktur, eine extreme schnelle Kommunikation |
56 |
|
|
(nur ein Prozeß) und ein ressourcenschonendes Endprodukt. Auch dieser |
57 |
|
|
Ansatz hat seine Nachteile. Der größte ist, daß man bei vielen Problemen |
58 |
|
|
"Umdenken" muß, da sich Callbacks eben keine lineare Programmstruktur |
59 |
|
|
verwirklichen läßt ({{1:Closures}} können dabei jedoch helfen). Außerdem |
60 |
|
|
muß man sich bewußt sein, das ein blockierender Funktionsaufruf |
61 |
|
|
(z.B. C<read>) das gesamte Programm anhält. |
62 |
|
|
|
63 |
|
|
H2: Das Problem... |
64 |
|
|
|
65 |
|
|
{{...}}ist oberflächlich betrachtet, recht einfach: Eine (kleine) Menge |
66 |
|
|
von Usenet-Servern soll nach Newsgruppen abgesucht werden. Das kann |
67 |
|
|
auf faire Weise geschehen: man öffnet eine NNTP-Verbindung und schickt |
68 |
|
|
Requests. Dies läßt sich durch Pipelining (senden mehrere Befehle |
69 |
|
|
gleichzeitig) beschleunigen. Durch die Zeiten, die der News-Server |
70 |
|
|
benötigt um Artikel zu suchen, wird die Datenrate in der Praxis allerdings |
71 |
|
|
drastisch beschränkt. |
72 |
|
|
|
73 |
|
|
Also die unfaire Weise: statt einer öffnet man 5, 10 oder gleich mehrere |
74 |
|
|
hundert Verbindungen zu einem (oder mehreren) Servern und verteilt so die |
75 |
|
|
Verbindungslatenz und die Antwortzeit. |
76 |
|
|
|
77 |
|
|
H2: Die Planung |
78 |
|
|
|
79 |
|
|
Die (für mich) naheligende Idee, dies mit mehreren Scanprozessen zu |
80 |
|
|
implementieren, scheiterte an zwei Problemen: |
81 |
|
|
|
82 |
|
|
* Die Scanprozesse müssen sich untereinander absprechen, um Duplikate zu |
83 |
|
|
vermeiden. Dies ist zwangsläufig Interprozeßkommunikation (z.B. über eine |
84 |
|
|
SQL-Datenbank), die sehr aufwendig zu implementieren ist. Hinzu kommt, das |
85 |
|
|
einzelne Jobs zuerst markiert werden müssen ("in Arbeit"), damit sie nicht |
86 |
|
|
von mehreren Prozessen gleichzeitig bearbeitet werden, was jedoch sehr |
87 |
|
|
schwierig ist, wenn man Wert darauf legt, Prozesse beliebig abbrechen zu |
88 |
|
|
können, ohne Artikel zu verlieren. |
89 |
|
|
|
90 |
|
|
* Das Zielsystem, ein Pentium-166-System, hat weder unendlich Rechen- |
91 |
|
|
noch Speicherressourcen. Da Perl von beidem gerne viel nimmt, wäre die |
92 |
|
|
Sättigung schon bei relativ wenigen Verbindungen erreicht. Stichwort Speicher: |
93 |
|
|
jeder Prozeß benötigt einen Interpreter, eine Kopie der |
94 |
|
|
libc-Variablen, eine eigene Kopie des Scanprogramms und seine eigene |
95 |
|
|
SQL-Anbindung. Auch moderne Systeme mit einem effizienten C<fork()> leiden |
96 |
|
|
darunter, da gerade Perl nicht zimperlich mit dem Speicher umgeht. |
97 |
|
|
|
98 |
|
|
Die Lösung (klar!) lag im Event-Modul. Da alle Verbindungen von einem |
99 |
|
|
Prozeß bearbeitet werden, gibt es keine Synchronisationsprobleme. Der |
100 |
|
|
Overhead pro Verbindung beschränkt sich ebenfalls auf einen Hash, und das |
101 |
|
|
umschalten von Prozessen entfällt ebenfalls (schneller). |
102 |
|
|
|
103 |
|
|
H1: Die Implementation |
104 |
|
|
|
105 |
|
|
Die folgenden Abschnitte stellen die wichtigsten "Knotenpunkte" des |
106 |
|
|
Scanprogrammes vor. Jedesmal wird kurz das Problem erläutert und die |
107 |
|
|
Lösung mit Hilfe des C<Event>-Moduls diskutiert. |
108 |
|
|
|
109 |
|
|
H2: Der "Scheduler" |
110 |
|
|
|
111 |
|
|
Der komplizierteste Teil des Programmes ist der Scheduler: Er verteilt |
112 |
|
|
einzelne Jobs auf die Scanner, bzw. beendet das Programm, wenn alle Jobs |
113 |
|
|
abgearbeitet wurden. Es gibt nur zwei Typen von "Jobs": |
114 |
|
|
|
115 |
|
|
* 'S': {{1:S}}canne eine Gruppe. Der Scanner sucht eine bestimmte |
116 |
|
|
Newsgruppe auf dem Server und stellt mit Hilfe einer SQL-Tabelle fest, |
117 |
|
|
welche Artikel(-nummern) noch nicht gescannt wurden. |
118 |
|
|
|
119 |
|
|
* 'A': {{1:A}}rtikel holen. Da Gruppen Tausende von Artikeln enthalten |
120 |
|
|
können, wird nur ein "Job" pro Gruppe erzeugt. Ein Scanner sucht sich eine |
121 |
|
|
Artikelnummer aus, bearbeitet sie und legt die restlichen wieder zurück in |
122 |
|
|
die Warteschlange. |
123 |
|
|
|
124 |
|
|
Ein "Scanner" ist dabei kein Prozeß, sondern nur eine Instanz der |
125 |
|
|
C<Scanner>-Klasse, in der im wesentlichen der Zustand einer Verbindung |
126 |
|
|
gespeichert wird (Server, Port, aktuelle Gruppe...). Für jede potentielle |
127 |
|
|
Verbindung wird ein solches Objekt erzeugt. Für hundert Verbindungen sieht |
128 |
|
|
das z.B. so aus: |
129 |
|
|
|
130 |
|
|
>new Scanner for 1..100; |
131 |
|
|
|
132 |
|
|
Die Objekte reihen sich automatisch in die C<idle>-Warteschlange ein. |
133 |
|
|
|
134 |
|
|
Beim Programmstart werden alle Server- und Gruppen aus einer Datei gelesen und in die Job-Warteschlange eingefügt. Dann wird |
135 |
|
|
in die Hauptschleife gesprungen: |
136 |
|
|
|
137 |
|
|
>Scanner::loop(); # Hauptschleife |
138 |
|
|
|
139 |
|
|
!block perl |
140 |
|
|
sub loop { |
141 |
|
|
while (@queue || @idle < $scanners) { |
142 |
|
|
runq; |
143 |
|
|
Event::loop; |
144 |
|
|
} |
145 |
|
|
} |
146 |
|
|
!endblock |
147 |
|
|
|
148 |
|
|
Dabei stehen die zu bearbeitenden Jobs in C<@queue> und die verfügbaren |
149 |
|
|
Scanner-Objekte in C<@idle>. Solange noch Jobs vorhanden sind (C<@queue != |
150 |
|
|
0>) und nicht alle (C<$scanners>) Scanner idlen, wird C<runq> aufgerufen |
151 |
|
|
und in die Hauptschleife von C<Event> gesprungen. |
152 |
|
|
|
153 |
|
|
C<runq> (das steht für "run queue") nimmt Jobs aus der Warteschlange und |
154 |
|
|
teilt sie verfügbaren Scannern zu. Der Algorithmus ist sehr primitiv |
155 |
|
|
(FCFS) und könnte wesentlich verbessert werden. Wichtig ist, daß die |
156 |
|
|
Lastverteilung in diesen wenigen Zeilen stattfindet und sehr gut |
157 |
|
|
lokalisiert und damit sehr einfach änderbar ist. |
158 |
|
|
|
159 |
|
|
!block perl |
160 |
|
|
sub runq { |
161 |
|
|
while (@queue && @idle) { |
162 |
|
|
my $c = pop @queue; |
163 |
|
|
my $s = pop @idle; |
164 |
|
|
$s->run(@$c); |
165 |
|
|
} |
166 |
|
|
Event::unloop_all unless @queue || @idle < $scanners; |
167 |
|
|
} |
168 |
|
|
!endblock |
169 |
|
|
|
170 |
|
|
Der Aufruf von C<unloop_all> beendet alle Event-Schleifen, wenn alle Jobs abgearbeitet wurden. |
171 |
|
|
|
172 |
|
|
H2: Job Management & Rescheduling |
173 |
|
|
|
174 |
|
|
Um neue Jobs in das System einzufügen, gibt die Funktion C<add_job>: |
175 |
|
|
|
176 |
|
|
!block perl |
177 |
|
|
sub add_job { |
178 |
|
|
push @queue, [@_]; |
179 |
|
|
$reschedule->start if @idle; |
180 |
|
|
} |
181 |
|
|
!endblock |
182 |
|
|
|
183 |
|
|
Die wichtigste Teil ist der Aufruf von {{C:$reschedule->start}}: Wenn |
184 |
|
|
ein Scanner verfügbar ist (C<@idle> nicht leer ist), muß der Scheduler |
185 |
|
|
aufgerufen werden. Da der Aufruf von C<add_job> sehr häufig ist, und |
186 |
|
|
der Scheduler (C<loop>) eine Rekursion bedeutet, wird er nicht direkt |
187 |
|
|
aufgerufen, sondern nur, wenn sonst keine Ereignisse anliegen. Dies wird |
188 |
|
|
mit einem C<idle>-Event-Handler erreicht, der in der globalen Variable |
189 |
|
|
C<$reschedule> steht: |
190 |
|
|
|
191 |
|
|
!block perl |
192 |
|
|
my $reschedule = Event->idle( |
193 |
|
|
desc => "reschedule hook", |
194 |
|
|
max => 5, |
195 |
|
|
cb => sub { |
196 |
|
|
$_[0]->w->stop; |
197 |
|
|
Event::unloop; |
198 |
|
|
} |
199 |
|
|
); |
200 |
|
|
$reschedule->stop; |
201 |
|
|
!endblock |
202 |
|
|
|
203 |
|
|
{{C:Event->idle}} ist der {{Konstruktor}}, der einen Ereignis-Handler |
204 |
|
|
vom Typ "idle" erzeugt. Die einzelnen Attribute bedeuten: |
205 |
|
|
|
206 |
|
|
!block table |
207 |
|
|
Attribut Beschreibung |
208 |
|
|
desc Eine Beschreibung, z.B. für das C<NetServer::ProcessTop>-Modul. |
209 |
|
|
max Zeit (in Sekunden) nach dem der Callback {{auf jeden Fall}} ausgeführt wird. |
210 |
|
|
cb Die Callback-Funktion, die aufgerufen wird. |
211 |
|
|
!endblock |
212 |
|
|
|
213 |
|
|
Übertragen auf den C<$rescheduler> bedeutet dies, daß aus der |
214 |
|
|
Event-Schleife gesprungen wird, wenn gerade kein Datentransfer oder |
215 |
|
|
sonstige Aufgaben anliegen, {{oder nach fünf Sekunden}}, je nachdem, |
216 |
|
|
was früher eintrit. Diese Einschränkung verhindert, das ein schnell |
217 |
|
|
eintreffender Artikel den gesamten Prozeß "am Laufen hält" und damit |
218 |
|
|
verhindert, das freie (idle) Scanner nicht mit neuen Jobs versorgt werden. |
219 |
|
|
|
220 |
|
|
Wenn der Callback angesprungen wird, bekommt er ein {{Ereignis-Objekt}} |
221 |
|
|
übergeben (unter X entspricht dies einem C<XEvent>, bei Gtk ist es ein |
222 |
|
|
C<Gdk::Event>). Als erstes sucht er über dieses Ereignis-Objekt (in |
223 |
|
|
C<$_[0]>) den ursprünglichen {{Watcher}} ({{C:$_[0]->w}}, "w" steht für |
224 |
|
|
"watcher") und ruft die C<stop>-Methode auf. Damit wird erreicht, daß |
225 |
|
|
der Callback nicht mehr aufgerufen wird, bis er das nächste mal gestartet |
226 |
|
|
wird (z.B. in C<add_job>). {{C:$_[0]->w->stop}} ist übrigens das gleiche |
227 |
|
|
wie {{C:$rescheduler->stop}}, die Variable C<$rescheduler> ist wegen C<my> |
228 |
|
|
jedoch erst {{nach}} dem Aufruf des Konstruktors sichtbar. |
229 |
|
|
|
230 |
|
|
Das zweite (und wichtigste) was der Callback unternimmt, ist, den |
231 |
|
|
eigentlichen Scheduler wieder anzuspringen C<loop>. In C<loop> wurde die |
232 |
|
|
Hauptschleife des Event-Moduls aufgerufen (C<Event::loop>): C<unloop> ist |
233 |
|
|
das Gegenstück dazu und springt aus dieser Schleife heraus, so daß der |
234 |
|
|
Scheduler neue Jobs verteilen kann. |
235 |
|
|
|
236 |
|
|
H3: Beendigung eines Jobs |
237 |
|
|
|
238 |
|
|
Wenn ein Scanner-Objekt einen Job verarbeitet hat, muß es sich wieder in |
239 |
|
|
die C<@idle>-Queue eintragen: |
240 |
|
|
|
241 |
|
|
!block perl |
242 |
|
|
sub idle { |
243 |
|
|
my $self = shift; |
244 |
|
|
push @idle, $self; |
245 |
|
|
$reschedule->start; |
246 |
|
|
} |
247 |
|
|
!endblock |
248 |
|
|
|
249 |
|
|
der Aufbau gleicht C<add_job>. |
250 |
|
|
|
251 |
|
|
H2: Die Jobschleife |
252 |
|
|
|
253 |
|
|
Für die Abarbeitung der Jobs ist die Methode C<run> zuständig. Sie hat |
254 |
|
|
mindestens drei Parameter: C<self> (das Scanner-Objekt), C<host> (der |
255 |
|
|
NNTP-Server, inkl. Port) und C<cmd> (der Jobtyp). |
256 |
|
|
|
257 |
|
|
Da das NNTP-Protokoll "stateful" ist, muß der aktuelle NNTP-Server und |
258 |
|
|
die aktuelle Gruppe gespeichert werden. Gilt der neue Job für denselben |
259 |
|
|
Rechner und dieselbe Gruppe (der Normalfall) passiert nichts, ansonsten |
260 |
|
|
wird die Verbindung zum NNTP-Server neu aufgebaut, bzw. die Gruppe |
261 |
|
|
gewechselt. |
262 |
|
|
|
263 |
|
|
Das Aufbauen der NNTP-Verbindung ist ein Problem für den Event-Ansatz: ein |
264 |
|
|
C<connect>-Aufruf {{blockiert}} den Prozeß, bis entweder die Verbindung |
265 |
|
|
steht oder ein Fehler passiert. Da ein solcher C<connect> einige Sekunden |
266 |
|
|
benötigen kann (bei Netzwerkproblemen auch wesentlich länger), müssen sog. |
267 |
|
|
"non-blocking-calls" verwendet werden. |
268 |
|
|
|
269 |
|
|
Das ist auch der Grund, weshalb das Programm auf Standardmodule wie |
270 |
|
|
C<IO::Socket> oder C<Net::NNTP> verzichten muß: Unterstützung für |
271 |
|
|
nicht-blockierende Aufrufe ist kaum oder überhaupt nicht vorhanden. Das |
272 |
|
|
C<Net::NNTP>-Modul ist in dieser Hinsicht besonders schlecht, dnen |
273 |
|
|
man kann die entsprechende Methoden nicht einfach in einer Subklasse |
274 |
|
|
überschreiben. |
275 |
|
|
|
276 |
|
|
Der schwierigste Teil war der Aufruf von C<connect>, der ebenfalls nicht |
277 |
|
|
blockieren sollte: |
278 |
|
|
|
279 |
|
|
!block perl |
280 |
|
|
if (socket $fd, PF_INET, SOCK_STREAM, getprotobyname 'tcp') { |
281 |
|
|
sub TCP_NODELAY(){1} sub SOL_TCP(){6}; # linux-2.2 |
282 |
|
|
setsockopt $fd, SOL_TCP, TCP_NODELAY, 1; |
283 |
|
|
fcntl $fd, F_SETFL, O_NONBLOCK; |
284 |
|
|
connect $fd, sockaddr_in $port, inet_aton($ip); |
285 |
|
|
fcntl $fd, F_SETFL, 0; |
286 |
|
|
} else { |
287 |
|
|
undef $fd; |
288 |
|
|
} |
289 |
|
|
!endblock |
290 |
|
|
|
291 |
|
|
Einige Konstanten (z.B. C<SOL_TCP>) sind in Perl nicht einfach zu |
292 |
|
|
bekommen. Da das Script mehr ein Hack als eine professionelle Anwendung |
293 |
|
|
ist, wurden sie einfach hardcodiert. |
294 |
|
|
|
295 |
|
|
Wenn der Server gewechselt wird, wechselt auch der Filehandle, so daß eine neuer |
296 |
|
|
Event-Watcher erzeugt werden muß: |
297 |
|
|
|
298 |
|
|
!block perl |
299 |
|
|
($self->{w} = Event->io(fd => fileno $fd, poll => 'r'))->stop; |
300 |
|
|
!endblock |
301 |
|
|
|
302 |
|
|
H2: NNTP-Befehle |
303 |
|
|
|
304 |
|
|
Das NNTP-Protokoll ist sehr einfach: Kommandos bestehen aus einer |
305 |
|
|
einzelnen Textzeile, Antworten aus einem Zifferncode und einer |
306 |
|
|
beschreibenden Textzeile. Artikel werden als Textblock übertragen, wobei |
307 |
|
|
die letzte Zeile einen einzelnen Punkt als Endekennung enthält. |
308 |
|
|
|
309 |
|
|
Das Absetzen eines Befehls geschieht über die Methode C<rcb>. Ihr werden |
310 |
|
|
zwei Argumente übergeben, das Kommando (ohne Zeilenende) und eine |
311 |
|
|
{{1:Callback}}-Funktion. Das Kommando wird an den NNTP-Server geschickt, |
312 |
|
|
die Callback-Funktion wird aufgerufen, wenn die erste Zeile der Antwort |
313 |
|
|
angekommen ist (mit dem Statuscode). |
314 |
|
|
|
315 |
|
|
Dies wird erreicht, indem der Event-Watcher für die NNTP-Verbindung |
316 |
|
|
(C<$self->{w}>) gefüttert und gestartet wird: |
317 |
|
|
|
318 |
|
|
!block perl |
319 |
|
|
sub rcb { |
320 |
|
|
my $self = shift; |
321 |
|
|
my $cmd = shift; |
322 |
|
|
my $cb = shift; |
323 |
|
|
if ($cmd) { |
324 |
|
|
$self->command($cmd); |
325 |
|
|
} else { |
326 |
|
|
$cmd = "<anonymous command>"; |
327 |
|
|
} |
328 |
|
|
|
329 |
|
|
$self->{w}->desc($cmd); |
330 |
|
|
$self->{w}->cb(sub { |
331 |
|
|
$self->{w}->stop; |
332 |
|
|
$cb->($self); |
333 |
|
|
}); |
334 |
|
|
$self->{w}->start; |
335 |
|
|
} |
336 |
|
|
!endblock |
337 |
|
|
|
338 |
|
|
Falls eine Befehl (C<$cmd>) übergeben wurde, wird dieser über die Leitung |
339 |
|
|
gepustet (C<$self->command>) und als beschreibender Text verwendet. Mit |
340 |
|
|
C<desc> wird diese Beschreibung gesetzt (hilfreich zum Debuggen oder |
341 |
|
|
Tollfühlen, wenn es hinterher funktioniert). |
342 |
|
|
|
343 |
|
|
Dann wird der Callback (C<cb>) gesetzt, der lediglich den Watcher |
344 |
|
|
stoppt (Befehle sind einmalige Angelegenheiten) und die {{eigentliche}} |
345 |
|
|
Callback-Funktion aufruft, und schließlich wird der Watcher gestartet. |
346 |
|
|
|
347 |
|
|
H2: Lesen der Antwort |
348 |
|
|
|
349 |
|
|
Der schwierigste Teil des Skriptes ist das zeilenweise Lesen, das vom |
350 |
|
|
NNTP-Protokoll vorausgesetzt wird. Da Perl von sich aus (noch) keinerlei |
351 |
|
|
Support dafür anbietet ({{C:<>}} blockiert den Prozeß oder liefert |
352 |
|
|
keine ganzen Zeilen zurück), mußte das Zusammensetzen der Zeilen selbst |
353 |
|
|
implementiert werden. |
354 |
|
|
|
355 |
|
|
Grundlage dafür ist die Methode C<refill>, die alle Zeichen liest, die |
356 |
|
|
angekommen sind (ohne zu blockieren) und sie in einem Puffer ablegt: |
357 |
|
|
|
358 |
|
|
!block perl |
359 |
|
|
sub refill { |
360 |
|
|
my $self = shift; |
361 |
|
|
my $wait = shift; |
362 |
|
|
my $fd = $self->{fd}; |
363 |
|
|
fcntl $fd, F_SETFL, O_NONBLOCK; |
364 |
|
|
for(;;) { |
365 |
|
|
my $r = sysread $fd, $self->{buff}, 32768, length $self->{buff}; |
366 |
|
|
if ($r>0) { |
367 |
|
|
last; |
368 |
|
|
} elsif (!defined $r && $! == EAGAIN) { |
369 |
|
|
last unless $wait; |
370 |
|
|
$self->{w}->cb(sub { $self->{w}->stop; Event::unloop }); |
371 |
|
|
$self->{w}->start; |
372 |
|
|
Event::loop(); |
373 |
|
|
} else { |
374 |
|
|
$self->{buff} = "500 I/O error: $!\015\012.\015\012"; |
375 |
|
|
delete $self->{host}; |
376 |
|
|
last; |
377 |
|
|
} |
378 |
|
|
} |
379 |
|
|
fcntl $fd, F_SETFL, 0; |
380 |
|
|
} |
381 |
|
|
!endblock |
382 |
|
|
|
383 |
|
|
Das Argument C<$wait> bestimmt, ob auf jeden Fall gewartet werden soll, |
384 |
|
|
oder ob C<refill> zurückkehren soll, auch wenn keine neuen Daten verfügbar |
385 |
|
|
sind. Letzteres ist außerst selten der Fall und wurde entsprechend |
386 |
|
|
ineffizient implementiert, indem ein "leerer" Watcher gestartet wird und |
387 |
|
|
dann auf dessen Unloop gewartet wird. |
388 |
|
|
|
389 |
|
|
Als nächstes in der Hierarchy steht C<getline>, das einfach die nächste |
390 |
|
|
Zeile liefert, notfalls durch Warten: |
391 |
|
|
|
392 |
|
|
!block perl |
393 |
|
|
sub getline { |
394 |
|
|
my $self = shift; |
395 |
|
|
$self->refill(1) while $self->{buff} !~ s/^([^\015\012]*)\015\012//o; |
396 |
|
|
$1; |
397 |
|
|
} |
398 |
|
|
!endblock |
399 |
|
|
|
400 |
|
|
Sie ist sehr einfach: gibt es schon eine ganze Zeile im Puffer, dann |
401 |
|
|
schneide sie heraus und gib sie zurück. Nicht sehr effizient, aber einfach |
402 |
|
|
zu benutzen. |
403 |
|
|
|
404 |
|
|
Sie wird benutzt von C<response>, wo die Zeile in ihre beiden Kompomenten |
405 |
|
|
(Statuscode, Meldung) zerlegt wird, und die erste Ziffer des Statuscodes |
406 |
|
|
zurückgegeben wird (der für das weitere Vorgehen am entscheidensten ist). |
407 |
|
|
|
408 |
|
|
!block perl |
409 |
|
|
sub response { |
410 |
|
|
my $self = shift; |
411 |
|
|
@{$self}{'code','message'} = split m/ /, $self->getline, 2; |
412 |
|
|
substr $self->{code}, 0, 1; |
413 |
|
|
} |
414 |
|
|
!endblock |
415 |
|
|
|
416 |
|
|
H2: Scannen einer Gruppe |
417 |
|
|
|
418 |
|
|
Um herauszufinden, welche Artikel seit dem letzten Mal neu hinzugekommen sind, |
419 |
|
|
wird die Statusmeldung ausgewertet, die der Server beim Wechsel in eine Gruppe liefert: |
420 |
|
|
|
421 |
|
|
!block example |
422 |
|
|
{{BEFEHL }} GROUP comp.lang.perl.moderated |
423 |
|
|
{{ANTWORT}} 211 125 4886 5010 comp.lang.perl.moderated group selected |
424 |
|
|
!endblock |
425 |
|
|
|
426 |
|
|
C<211> ist der Statuscode für "O.K.", C<125> ist die Zahl der Artikel, |
427 |
|
|
C<4886> ist die erste und C<5010> die letzte Artikelnummer. |
428 |
|
|
|
429 |
|
|
Dies ist eine ideale Anwendung für C<rcb>: |
430 |
|
|
|
431 |
|
|
!block perl |
432 |
|
|
$self->rcb("GROUP $group", sub { |
433 |
|
|
if ($self->response == 2 && $self->{message} =~ /(\d+)\s+(\d+)\s+(\d+)/) { |
434 |
|
|
my($count, $first, $last, $name) = ($1, $2, $3, $3); |
435 |
|
|
if ($count) { |
436 |
|
|
$self->slog("selected group $group"); |
437 |
|
|
$self->{group} = $group; |
438 |
|
|
$self->{first} = $first; |
439 |
|
|
$self->{last} = $last; |
440 |
|
|
$cb->($self); |
441 |
|
|
return; |
442 |
|
|
} else { |
443 |
|
|
$self->slog("SKIPPED empty group $group: ", substr($self->{message},0,-1)); |
444 |
|
|
} |
445 |
|
|
} else { |
446 |
|
|
$self->slog("SKIPPED bogus group $group on ".$self->{host}[0].": ", substr($self->{message},0,-1)); |
447 |
|
|
} |
448 |
|
|
$self->idle; |
449 |
|
|
}); |
450 |
|
|
!endblock |
451 |
|
|
|
452 |
|
|
C<rcb> bekommt zwei Argumente übergeben: C<"GROUP $group"> ist das |
453 |
|
|
NNTP-Kommando zum Wechseln der (News-) Gruppe, das zweite Argument ist die |
454 |
|
|
Callback-Funktion, die die NNTP-Anwtort als Argument bekommt. |
455 |
|
|
|
456 |
|
|
Die Verwendung einer Closure erlaubt es, Befehl (C<rcb>) und die Reaktion |
457 |
|
|
(das C<sub {}>) direkt hintereinander zu schreiben, so, als wäre C<rcb> |
458 |
|
|
ein "normaler", blockierender Aufruf zum Lesen einer Zeile, mit dem einzigen Unterschied, daß |
459 |
|
|
die Auswertung des Ergebnisses im einem eingerückten Block stattfindent. Anders gesagt, aus: |
460 |
|
|
|
461 |
|
|
!block perl |
462 |
|
|
$response = $self->rcb("GROUP $group"); |
463 |
|
|
|
464 |
|
|
if ($response....) { |
465 |
|
|
} |
466 |
|
|
!endblock |
467 |
|
|
|
468 |
|
|
wird: |
469 |
|
|
|
470 |
|
|
!block perl |
471 |
|
|
$self->rcb(GROUP $group", sub { |
472 |
|
|
|
473 |
|
|
if ($response....) { |
474 |
|
|
} |
475 |
|
|
}); |
476 |
|
|
!endblock |
477 |
|
|
|
478 |
|
|
C<rcb> kehrt jedoch sofort zurück (ein C<sub>, daß C<rcb> verwendet, kann |
479 |
|
|
deshalb nicht sofort ein Resultat an den Aufrufer zurückliefern. |
480 |
|
|
|
481 |
|
|
Die Information über die Gruppe (C<first> und C<last>, wird aus der |
482 |
|
|
NNTP-Antwort genommen) wird später mit den Daten aus der SQL-Datenbank |
483 |
|
|
verglichen (das hat allerdings nichts mit C<Event> zu tun): |
484 |
|
|
|
485 |
|
|
!block perl |
486 |
|
|
sub group_scan { |
487 |
|
|
my $self = shift; |
488 |
|
|
my $group = $self->{group}; |
489 |
|
|
my $todo = new Set::IntSpan $self->{first}."-".$self->{last}; |
490 |
|
|
$todo = $todo->intersect($self->gs_done->complement); |
491 |
|
|
if ($todo->empty) { |
492 |
|
|
$self->slog("[no new articles in $group]"); |
493 |
|
|
} else { |
494 |
|
|
$self->slog("scanning group $group: ", $todo->run_list); |
495 |
|
|
add_job($self->{host},'A',$group,$todo); |
496 |
|
|
} |
497 |
|
|
$self->idle; |
498 |
|
|
} |
499 |
|
|
!endblock |
500 |
|
|
|
501 |
|
|
Das C<Set::IntSpan>-Modul wird dazu benutzt, um aus der Menge der |
502 |
|
|
vorhandenen Artikel die bereits gescannten (die von C<gs_done> |
503 |
|
|
zurückgegeben werden) zu entfernen. Ist die resultierende Menge nicht |
504 |
|
|
leer, wird ein neuer Job ("hole alle diese Artikel") erzeugt. |
505 |
|
|
|
506 |
|
|
H2: Holen eines Artikels |
507 |
|
|
|
508 |
|
|
Das Holen geschieht in zwei Stufen. Zuerst wird die {{Message-Id}} mit |
509 |
|
|
einem C<STAT>-Befehl ausgewertet. Damit wird außerdem festgestellt, ob ein |
510 |
|
|
bestimmter Artikel überhaupt existiert. |
511 |
|
|
|
512 |
|
|
!block perl |
513 |
|
|
$self->rcb("STAT ".$self->{num}, \&got_stat); |
514 |
|
|
!endblock |
515 |
|
|
|
516 |
|
|
Ein Protokollbeispiel: |
517 |
|
|
|
518 |
|
|
!block example |
519 |
|
|
{{BEFEHL }} STAT 5010 |
520 |
|
|
{{ANTWORT}} 223 5010 <85j7jc$68n@junior.apk.net> article retrieved - request text separately |
521 |
|
|
{{BEFEHL }} STAT 4977 |
522 |
|
|
{{ANTWORT}} 430 No such article: 4977 |
523 |
|
|
!endblock |
524 |
|
|
|
525 |
|
|
Der Callback C<got_stat> wertet diese Information aus: |
526 |
|
|
|
527 |
|
|
!block perl |
528 |
|
|
sub got_stat { |
529 |
|
|
my $self = shift; |
530 |
|
|
my $r = $self->response; |
531 |
|
|
$self->mark_article_done; |
532 |
|
|
|
533 |
|
|
($self->{mid}) = $self->{message} =~ /<([^>]+)>/g; |
534 |
|
|
|
535 |
|
|
if ($r == 2) { |
536 |
|
|
my $aid = sql_fetch("select count(*) from art where mid=? limit 1", "".$self->{mid}); |
537 |
|
|
$self->mark_article_present; |
538 |
|
|
if ($aid) { |
539 |
|
|
sql_exec("replace into lnk values (?,?)", $self->gid, $aid); |
540 |
|
|
$self->idle; |
541 |
|
|
} else { |
542 |
|
|
$busy{$self->{mid}}++; |
543 |
|
|
$stat_article++; |
544 |
|
|
$self->rcb_dot("ARTICLE ".$self->{num}, \&got_article); |
545 |
|
|
} |
546 |
|
|
} else { |
547 |
|
|
$self->idle; |
548 |
|
|
} |
549 |
|
|
} |
550 |
|
|
!endblock |
551 |
|
|
|
552 |
|
|
Existiert der Artikel nicht, ist der Job beendet und es wird in den |
553 |
|
|
idle-Modus gegangen. Wurde er schon einmal geholt (z.B. in einer anderen Gruppe) |
554 |
|
|
wird er nicht noch einmal geholt, sondern lediglich in die Gruppe "gelinkt" (Artikel können sehr groß werden). |
555 |
|
|
|
556 |
|
|
Ansonsten wird ein C<ARTICLE>-Befehl abgesetzt, mit dem der gesamte Artikel geholt wird. |
557 |
|
|
|
558 |
|
|
!block example |
559 |
|
|
{{BEFEHL }} ARTICLE 5010 |
560 |
|
|
{{ANTWORT}} 220 5010 <85j7jc$68n@junior.apk.net> article retrieved - text follows |
561 |
|
|
{{ANTWORT}} From: allbery@apk.net (Brandon S. Allbery KF8NH) |
562 |
|
|
{{ANTWORT}} Newsgroups: comp.lang.perl.moderated |
563 |
|
|
{{ANTWORT}} Subject: Re: Usefulness of Pseudo Hashes |
564 |
|
|
{{ANTWORT}} Message-ID: <85j7jc$68n@junior.apk.net> |
565 |
|
|
{{ANTWORT}} |
566 |
|
|
{{ANTWORT}} Also sprach Alex Rhomberg <rhomberg@ife.ee.ethz.ch> (<384E39B8.D8635949@ife.ee.ethz.ch>): |
567 |
|
|
{{ANTWORT}} +----- |
568 |
|
|
{{ANTWORT}} | I wonder why pseudo hashes were invented |
569 |
|
|
{{ANTWORT}} +--->8 |
570 |
|
|
{{ANTWORT}} |
571 |
|
|
{{ANTWORT}} Sometimes you need an ordered list (so you can't use hashes) with keyed access |
572 |
|
|
{{ANTWORT}} to the list (so lists/arrays are slow and a pain in the butt to use). Pseudo |
573 |
|
|
{{ANTWORT}} hashes are a better solution than the usual hack of maintaining duplicate |
574 |
|
|
{{ANTWORT}} information in a hash and an array/list. |
575 |
|
|
{{ANTWORT}} |
576 |
|
|
{{ANTWORT}} -- |
577 |
|
|
{{ANTWORT}} brandon s. allbery [os/2][linux][solaris][japh] allbery@kf8nh.apk.net |
578 |
|
|
{{ANTWORT}} system administrator [WAY too many hats] allbery@ece.cmu.edu |
579 |
|
|
{{ANTWORT}} carnegie mellon / electrical and computer engineering KF8NH |
580 |
|
|
{{ANTWORT}} Kiss my bits, Billy-boy. |
581 |
|
|
{{ANTWORT}} . |
582 |
|
|
!endblock |
583 |
|
|
|
584 |
|
|
Hierbei tritt das Problem auf, daß nach der Statuszeile ein Artikel |
585 |
|
|
folgt. Deshalb wird statt C<rcb> die Methode C<rcb_dot> benutzt (das steht |
586 |
|
|
für "read callback + data read until dot"): |
587 |
|
|
|
588 |
|
|
!block perl |
589 |
|
|
sub rcb_dot { |
590 |
|
|
my $self = shift; |
591 |
|
|
my $cmd = shift; |
592 |
|
|
$self->{rcb_cb} = shift; |
593 |
|
|
delete $self->{body}; |
594 |
|
|
$self->rcb($cmd, sub { |
595 |
|
|
if ($self->response == 2) { |
596 |
|
|
$self->{w}->cb([$self, 'rcb_cb']); |
597 |
|
|
$self->{w}->start; |
598 |
|
|
$self->rcb_cb; |
599 |
|
|
} else { |
600 |
|
|
$self->{rcb_cb}->($self); |
601 |
|
|
} |
602 |
|
|
}); |
603 |
|
|
} |
604 |
|
|
|
605 |
|
|
sub rcb_cb { |
606 |
|
|
my $self = shift; |
607 |
|
|
$self->refill(0); |
608 |
|
|
if ($self->{buff} =~ s/^\.\015\012|^(.*?)\015\012\.\015\012//s) { |
609 |
|
|
$self->{body} .= $1; |
610 |
|
|
$self->{w}->stop; |
611 |
|
|
$self->{body} =~ s/\015\012/\n/g; |
612 |
|
|
$self->{rcb_cb}->($self, delete $self->{body}); |
613 |
|
|
} elsif ($self->{buff} =~ s/^(.*\015\012)//s) { |
614 |
|
|
$self->{body} .= $1; |
615 |
|
|
} |
616 |
|
|
} |
617 |
|
|
!endblock |
618 |
|
|
|
619 |
|
|
Der komplizierteste Teil ist C<rcb_cb>, in der die Artikeldaten |
620 |
|
|
akkumuliert werden, wozu furchtbare regexes benutzt wurden. Im Gegensatz |
621 |
|
|
zu vielen anderen Stellen wurden die Callbacks nicht durch Closures |
622 |
|
|
implementiert, da Event+Closures im allgemeinen ein großes Memory-Leak |
623 |
|
|
ist (soll ab Event-0.59 besser sein, aber man kann sichs nicht immer |
624 |
|
|
ausssuchen). |
625 |
|
|
|
626 |
|
|
H2: Updaten von SQL-Tabellen |
627 |
|
|
|
628 |
|
|
Die Aufrufe C<mark_article_done> und C<mark_article_present> markieren |
629 |
|
|
einen Artikel in der Datenbank als bearbeitet bzw. vorhanden. Sie setzen |
630 |
|
|
einfach ein Element in der entsprechenden C<Set::IntSpan>-Menge. |
631 |
|
|
|
632 |
|
|
Diese Mengen werden in einer SQL-Tabelle gespeichert. Da sie relativ groß |
633 |
|
|
sind (einige Kilobytes), serh häufig geändert werden (bis zu 100 mal pro |
634 |
|
|
Sekunde) und der Zielrechner sehr langsam ist, sollten die Tabellen nicht |
635 |
|
|
bei jeder Änderung gespeichert werden. Dies wird mit einem C<idle>-Watcher |
636 |
|
|
erreicht, der jedesmal gestartet wird, wenn sich die Daten ändern: |
637 |
|
|
|
638 |
|
|
!block perl |
639 |
|
|
my $save_gs = Event->idle( |
640 |
|
|
desc => "groupstatus saver", |
641 |
|
|
max => 60, |
642 |
|
|
cb => sub { |
643 |
|
|
$_[0]->w->stop; |
644 |
|
|
# zurückschreiben der Tabellen |
645 |
|
|
} |
646 |
|
|
); |
647 |
|
|
$save_gs->stop; |
648 |
|
|
|
649 |
|
|
sub mark_article_done { |
650 |
|
|
my $self = shift; |
651 |
|
|
$gs{$self->hid,$self->gid}[0]->insert($self->{num}); |
652 |
|
|
$save_gs->start; |
653 |
|
|
} |
654 |
|
|
!endblock |
655 |
|
|
|
656 |
|
|
Sollte der Draht so richtig dampfen, sorgt der Timeout von 60 Sekunden |
657 |
|
|
dafür, daß bei einem Absturz maximal die letzte Minute fehlt. In der |
658 |
|
|
Praxis wird er viel häufiger aufgerufen, nämlich dann, wenn alle |
659 |
|
|
einkommenden Verbindungen einmal bedient wurden und noch keine weiteren |
660 |
|
|
Daten angekommen sind. |
661 |
|
|
|
662 |
|
|
H2: Künstliche "Lastsimulation" |
663 |
|
|
|
664 |
|
|
Da der Test-Server auf der lokalen Maschine lief, mußte künstlich Last |
665 |
|
|
erzeugt werden, um einigermaßen wirklichkeitsnahe Ergebnisse zu erhalten. |
666 |
|
|
Die größten Zeitfaktoren bei NNTP sind die Latenz zum Server (abhängig von |
667 |
|
|
der Entfernung) und die Bandbreite. |
668 |
|
|
|
669 |
|
|
Um eine künstliche Latenz einzuführen, wird die C<command>-Funktion leicht abgeändert: |
670 |
|
|
|
671 |
|
|
!block perl |
672 |
|
|
sub command { |
673 |
|
|
my ($self, $cmd) = @_; |
674 |
|
|
Event->timer(after => rand, cb => sub { |
675 |
|
|
$_[0]->w->cancel; |
676 |
|
|
syswrite $self->{fd}, "$cmd\015\012"; |
677 |
|
|
}); |
678 |
|
|
} |
679 |
|
|
!endblock |
680 |
|
|
|
681 |
|
|
Statt das Kommando sofort zu verschicken, wird ein kurzer Timer |
682 |
|
|
gestartet. Die Verzögerung liegt zwischen 0 und 1 Sekunde (C<rand>) und |
683 |
|
|
sorgt für eine Streuung. Ohne diese zufällige Verzögerung würde ein |
684 |
|
|
unerwünschtes Bearbietungsmuster entstehen, bei dem effektiv nur ein |
685 |
|
|
Scan-Vorgang gleichzeitig stattfindet. |
686 |
|
|
|
687 |
|
|
Die obige Version von C<command> schneidet in ihrer Kürze recht gut gegen |
688 |
|
|
die "normale" Version ab: |
689 |
|
|
|
690 |
|
|
!block perl |
691 |
|
|
sub command { |
692 |
|
|
my ($self, $cmd) = @_; |
693 |
|
|
syswrite $self->{fd}, "$cmd\015\012"; |
694 |
|
|
} |
695 |
|
|
!endblock |
696 |
|
|
|
697 |
|
|
H2: C<NetServer::ProcessTop> |
698 |
|
|
|
699 |
|
|
Ein recht interessantes Modul ist C<NetServer::ProcessTop>. Wird es |
700 |
|
|
benutzt, bindet es sich auf einen TCP-Port, den man per C<telnet> |
701 |
|
|
ansprechen kann, um ein C<top>-artiges Listing der Event-Watcher zu |
702 |
|
|
bekommen, Außerdem kann man die Watcher edieren. |
703 |
|
|
|
704 |
|
|
Die Benutzung ist denkbar einfach: |
705 |
|
|
|
706 |
|
|
!block perl |
707 |
|
|
eval { |
708 |
|
|
require NetServer::ProcessTop; |
709 |
|
|
NetServer::ProcessTop->new(7000); |
710 |
|
|
}; |
711 |
|
|
!endblock |
712 |
|
|
|
713 |
|
|
Ein C<telnet localhost 7000> erzeugt dann dieses Bild: |
714 |
|
|
|
715 |
|
|
!block verbatim |
716 |
|
|
get PID=3407 @ cerebro | 14:26:46 [ 60s] |
717 |
|
|
10 events; load averages: 0.75, 0.73, 0.00; lag 0% |
718 |
|
|
|
719 |
|
|
EID PRI STATE RAN TIME CPU TYPE DESCRIPTION P1 |
720 |
|
|
0 7 912 0:00 26.6% sys idle |
721 |
|
|
3 4 zomb 227 0:00 16.9% io ARTICLE 273573 |
722 |
|
|
6 4 zomb 236 0:00 16.6% io ARTICLE 273572 |
723 |
|
|
4 4 sleep 232 0:00 16.4% io ARTICLE 273575 |
724 |
|
|
5 4 sleep 221 0:00 16.0% io ARTICLE 273574 |
725 |
|
|
9 4 wait 117 0:00 7.3% idle groupstatus saver |
726 |
|
|
10 4 wait 180 0:00 0.3% idle reschedule hook |
727 |
|
|
2 3 sleep 1 0:00 0.0% time Event::Stats |
728 |
|
|
1 3 cpu 0 0:00 0.0% io NetServer::ProcessTop::Client localhost |
729 |
|
|
7 3 sleep 0 0:00 0.0% io NetServer::ProcessTop |
730 |
|
|
8 4 sleep 0 0:00 0.0% io user input |
731 |
|
|
0 -1 0 0:00 0.0% sys other processes |
732 |
|
|
|
733 |
|
|
% |
734 |
|
|
!endblock |
735 |
|
|
|
736 |
|
|
Weil das Modul aber ein potentielles Sicherheitsproblem sein kann, sollte |
737 |
|
|
es nur zum Debuggen/Erfreuen verwendet werden. |
738 |
|
|
|
739 |
|
|
A1: Der Quellcode |
740 |
|
|
|
741 |
|
|
!include "get"; perl |
742 |
|
|
|
743 |
|
|
A2: Mehr! |
744 |
|
|
|
745 |
|
|
Die folgenden Module/Programme/RFCs wurden für das Projekt verwendet. |
746 |
|
|
|
747 |
|
|
* C<Event> - Event loop processing. {{URL:http://www.cpan.org/}} |
748 |
|
|
* C<Set::IntSpan> - Manages sets of integers. {{URL:http://www.cpan.org/}} |
749 |
|
|
* C<RFC-977> Network News Transfer Protocol. {{URL:ftp://ftp.isi.edu/in-notes/rfc977.txt}} |
750 |
|
|
* C<NetServer::ProcessTop> - Make event loop statistics easily available. {{URL:http://www.cpan.org/}} |
751 |
|
|
* C<Time::HiRes> - High resolution ualarm, usleep, and gettimeofday. {{URL:http://www.cpan.org/}} |
752 |
|
|
* C<Socket> - load the C socket.h defines and structure manipulators. (Teil der Perl-Distribution). |
753 |
|
|
* C<DBI> - Database independent interface for Perl |
754 |
|
|
* C<MySQL> SQL-Datebank. {{URL:http://www.mysql.com}}. |
755 |
|
|
|
756 |
|
|
|