ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.63
Committed: Thu Feb 10 08:06:27 2011 UTC (13 years, 4 months ago) by root
Branch: MAIN
Changes since 1.62: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.39 Coro::AnyEvent - integrate threads into AnyEvent
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8 root 1.54 use AnyEvent;
9     # using both Coro and AnyEvent will automatically load Coro::AnyEvent
10 root 1.1
11 root 1.62 # or load it manually for it's utility functions:
12     use Coro::AnyEvent;
13    
14     Coro::AnyEvent::sleep 5; # block current thread for 5s
15     Coro::AnyEvent::poll; # poll for new events once
16     Coro::AnyEvent::idle; # block until process no longer busy
17     Coro::AnyEvent::idle_upto 5; # same, but only up to 5 seconds
18    
19     Coro::AnyEvent::readable $fh, 60
20 root 1.63 or die "fh didn't become readable within 60 seconds\n";
21 root 1.2
22 root 1.39 =head1 DESCRIPTION
23 root 1.2
24 root 1.39 When one naively starts to use threads in Perl, one will quickly run
25     into the problem that threads that block on a syscall (sleeping,
26     reading from a socket etc.) will block all threads.
27 root 1.2
28     If one then uses an event loop, the problem is that the event loop has
29 root 1.39 no knowledge of threads and will not run them before it polls for new
30 root 1.2 events, again blocking the whole process.
31    
32 root 1.39 This module integrates threads into any event loop supported by
33 root 1.2 AnyEvent, combining event-based programming with coroutine-based
34     programming in a natural way.
35    
36 root 1.54 As of Coro 5.21 and newer, this module gets loaded automatically when
37     AnyEvent initialises itself and Coro is used in the same process, thus
38     there is no need to load it manually if you just want your threads to
39     coexist with AnyEvent.
40    
41     If you want to use any functions from this module, you of course still
42     need to C<use Coro::AnyEvent>, just as with other perl modules.
43 root 1.1
44 root 1.55 Also, this module autodetects the event loop used (by relying on
45     L<AnyEvent>) and will either automatically defer to the high-performance
46     L<Coro::EV> or L<Coro::Event> modules, or will use a generic integration
47     method that should work with any event loop supported by L<AnyEvent>.
48    
49 root 1.39 =head1 USAGE
50 root 1.1
51 root 1.55 =head2 RUN AN EVENT LOOP - OR NOT?
52    
53     For performance reasons, it is recommended that the main program or
54     something else runs the event loop of the event model you use, i.e.
55    
56     use Gtk2; # <- the event model
57     use AnyEvent;
58     use Coro:
59    
60     # initialise stuff
61     async { ... };
62    
63     # now run mainloop of Gtk2
64     main Gtk2;
65    
66     You can move the event loop into a thread as well, although this tends to
67     get confusing:
68    
69     use Gtk2;
70     use AnyEvent;
71     use Coro:
72    
73     async { main Gtk2 };
74    
75     # do other things...
76     while () {
77     use Coro::AnyEvent;
78     Coro::AnyEvent::sleep 1;
79     print "ping...\n";
80     }
81    
82     You can also do nothing, in which case Coro:AnyEvent will invoke the event
83     loop as needed, which is less efficient, but sometimes very convenient.
84    
85     What you I<MUST NOT DO EVER> is to block inside an event loop
86     callback. The reason is that most event loops are not reentrant and this
87     can cause a deadlock at best and corrupt memory at worst.
88    
89     Coro will try to catch you when you block in the event loop
90     ("FATAL:$Coro::IDLE blocked itself"), but this is just best effort and
91     only works when you do not run your own event loop.
92    
93     To avoid this problem, simply do not block inside an event callback
94     - start a new thread (e.g. with C<Coro:async_pool>) or use
95     C<Coro::unblock_sub>.
96    
97     =head2 INVERSION OF CONTROL
98 root 1.2
99 root 1.55 If you need to wait for a single event, the rouse functions will come in
100     handy (see the Coro manpage for details):
101 root 1.39
102 root 1.40 # wait for single SIGINT
103     {
104     my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
105     Coro::rouse_wait;
106     }
107 root 1.39
108 root 1.54 =head2 EVENT MODULES OTHER THEN ANYEVENT
109    
110     Keep in mind that, as shipped, Coro and Coro::AnyEvent only work with
111     AnyEvent, and only when AnyEvent is actually used (i.e. initialised), so
112     this will not work:
113    
114 root 1.55 # does not work: EV without AnyEvent is not recognised
115 root 1.54 use EV;
116     use Coro;
117    
118     EV::loop;
119    
120     And neither does this, unless you actually I<use> AnyEvent for something:
121    
122 root 1.55 # does not work: AnyEvent must be initialised (e.g. by creating watchers)
123 root 1.54 use EV;
124     use AnyEvent;
125     use Coro;
126    
127     EV::loop;
128    
129     This does work, however, because you create a watcher (condvars work,
130     too), thus forcing AnyEvent to initialise itself:
131    
132     # does work: AnyEvent is actually used
133     use EV;
134     use AnyEvent;
135     use Coro;
136    
137     my $timer = AE::timer 1, 1, sub { };
138    
139     EV::loop;
140    
141     And if you want to use AnyEvent just to bridge between Coro and your event
142     model of choice, you can simply force it to initialise itself, like this:
143    
144     # does work: AnyEvent is initialised manually
145     use POE;
146     use AnyEvent;
147     use Coro;
148    
149 root 1.55 AnyEvent::detect; # force AnyEvent to integrate Coro into POE
150 root 1.54 POE::Kernel->run;
151    
152 root 1.40 =head1 FUNCTIONS
153 root 1.2
154 root 1.55 Coro::AnyEvent also offers a few functions that might be useful.
155 root 1.2
156 root 1.4 =over 4
157    
158 root 1.1 =cut
159    
160     package Coro::AnyEvent;
161    
162 root 1.52 use common::sense;
163 root 1.1
164     use Coro;
165     use AnyEvent ();
166    
167 root 1.61 our $VERSION = 5.25;
168 root 1.1
169 root 1.5 #############################################################################
170     # idle handler
171    
172 root 1.9 our $IDLE;
173 root 1.1
174 root 1.5 #############################################################################
175     # 0-timeout idle emulation watcher
176    
177 root 1.1 our $ACTIVITY;
178    
179     sub _activity {
180 root 1.51 $ACTIVITY ||= AE::timer 0, 0, \&_schedule;
181 root 1.1 }
182    
183 root 1.14 Coro::_set_readyhook (\&AnyEvent::detect);
184 root 1.8
185     AnyEvent::post_detect {
186 root 1.10 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
187 root 1.5
188 root 1.8 my $model = $AnyEvent::MODEL;
189 root 1.1
190 root 1.40 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
191 root 1.58 # provide faster versions of some functions
192     Coro::EV::_set_readyhook ();
193 root 1.40
194     eval '
195     *sleep = \&Coro::EV::timer_once;
196 root 1.48 *poll = \&Coro::EV::_poll;
197 root 1.52 *idle = sub() {
198 root 1.40 my $w = EV::idle Coro::rouse_cb;
199     Coro::rouse_wait;
200     };
201 root 1.52 *idle_upto = sub($) {
202 root 1.40 my $cb = Coro::rouse_cb;
203     my $t = EV::timer $_[0], 0, $cb;
204     my $w = EV::idle $cb;
205     Coro::rouse_wait;
206     };
207 root 1.52 *readable = sub($;$) {
208 root 1.40 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
209     };
210 root 1.52 *writable = sub($;$) {
211 root 1.40 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
212     };
213     ';
214     die if $@;
215    
216     } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
217 root 1.58 Coro::_set_readyhook undef;
218 root 1.40 # let Coro::Event do its thing
219 root 1.1 } else {
220 root 1.40 # do the inefficient thing ourselves
221 root 1.1 Coro::_set_readyhook \&_activity;
222 root 1.9
223     $IDLE = new Coro sub {
224 root 1.32 my $one_event = AnyEvent->can ("one_event");
225 root 1.40
226 root 1.9 while () {
227 root 1.32 $one_event->();
228 root 1.55 Coro::schedule if Coro::nready;
229 root 1.9 }
230     };
231     $IDLE->{desc} = "[AnyEvent idle process]";
232    
233 root 1.32 $Coro::idle = $IDLE;
234 root 1.47
235     # call the readyhook, in case coroutines were already readied
236     _activity;
237 root 1.1 }
238 root 1.8 };
239 root 1.5
240 root 1.40 =item Coro::AnyEvent::poll
241    
242     This call will block the current thread until the event loop has polled
243     for new events and instructs the event loop to poll for new events once,
244     without blocking.
245    
246     Note that this call will not actually execute the poll, just block until
247     new events have been polled, so other threads will have a chance to run.
248    
249     This is useful when you have a thread that does some computations, but you
250     still want to poll for new events from time to time. Simply call C<poll>
251     from time to time:
252    
253     my $long_calc = async {
254     for (1..10000) {
255     Coro::AnyEvent::poll:
256     # do some stuff, make sure it takes at least 0.001s or so
257     }
258     }
259    
260     Although you should also consider C<idle> or C<idle_upto> in such cases.
261    
262     =item Coro::AnyEvent::sleep $seconds
263    
264     This blocks the current thread for at least the given number of seconds.
265    
266     =item Coro::AnyEvent::idle
267    
268     This call is similar to C<poll> in that it will also poll for
269     events. Unlike C<poll>, it will only resume the thread once there are no
270     events to handle anymore, i.e. when the process is otherwise idle.
271    
272 root 1.56 This is good for background threads that shouldn't use CPU time when
273     foreground jobs are ready to run.
274    
275 root 1.40 =item Coro::AnyEvent::idle_upto $seconds
276    
277     Like C<idle>, but with a maximum waiting time.
278    
279     If your process is busy handling events, calling C<idle> can mean that
280     your thread will never be resumed. To avoid this, you can use C<idle_upto>
281     and specify a timeout, after which your thread will be resumed even if the
282     process is completely busy.
283    
284 root 1.44 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
285 root 1.40
286 root 1.44 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
287 root 1.40
288 root 1.44 Blocks the current thread until the given file handle (or file descriptor)
289     becomes readable (or writable), or the given timeout has elapsed,
290     whichever happens first. No timeout counts as infinite timeout.
291 root 1.40
292     Returns true when the file handle became ready, false when a timeout
293     occured.
294    
295     Note that these functions are quite inefficient as compared to using a
296     single watcher (they recreate watchers on every invocation) or compared to
297     using Coro::Handle.
298    
299     Note also that they only work for sources that have reasonable
300     non-blocking behaviour (e.g. not files).
301    
302     Example: wait until STDIN becomes readable, then quit the program.
303    
304     use Coro::AnyEvent;
305     print "press enter to quit...\n";
306     Coro::AnyEvent::readable *STDIN;
307     exit 0;
308    
309     =cut
310    
311     sub poll() {
312 root 1.51 my $w = AE::timer 0, 0, Coro::rouse_cb;
313 root 1.40 Coro::rouse_wait;
314     }
315    
316     sub sleep($) {
317 root 1.51 my $w = AE::timer $_[0], 0, Coro::rouse_cb;
318 root 1.40 Coro::rouse_wait;
319     }
320    
321     sub idle() {
322 root 1.51 my $w = AE::idle Coro::rouse_cb;
323 root 1.40 Coro::rouse_wait;
324     }
325    
326     sub idle_upto($) {
327     my $cb = Coro::rouse_cb;
328 root 1.51 my $t = AE::timer shift, 0, $cb;
329     my $w = AE::idle $cb;
330 root 1.40 Coro::rouse_wait;
331     }
332    
333     sub readable($;$) {
334     my $cb = Coro::rouse_cb;
335 root 1.51 my $w = AE::io $_[0], 0, sub { $cb->(1) };
336     my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
337 root 1.40 Coro::rouse_wait
338     }
339    
340     sub writable($;$) {
341     my $cb = Coro::rouse_cb;
342 root 1.51 my $w = AE::io $_[0], 1, sub { $cb->(1) };
343     my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
344 root 1.40 Coro::rouse_wait
345     }
346    
347 root 1.5 #############################################################################
348     # override condvars
349    
350 root 1.10 package Coro::AnyEvent::CondVar;
351 root 1.5
352 root 1.10 sub _send {
353     (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
354 root 1.5 }
355    
356 root 1.10 sub _wait {
357     while (!$_[0]{_ae_sent}) {
358     local $_[0]{_ae_coro} = $Coro::current;
359 root 1.5 Coro::schedule;
360     }
361     }
362 root 1.1
363     1;
364    
365 root 1.40 =back
366    
367     =head1 IMPLEMENTATION DETAILS
368    
369     Unfortunately, few event loops (basically only L<EV> and L<Event>)
370     support the kind of integration required for smooth operations well, and
371     consequently, AnyEvent cannot completely offer the functionality required
372     by this module, so we need to improvise.
373    
374     Here is what this module does when it has to work with other event loops:
375    
376     =over 4
377    
378     =item * run ready threads before blocking the process
379    
380     Each time a thread is put into the ready queue (and there are no other
381     threads in the ready queue), a timer with an C<after> value of C<0> is
382     registered with AnyEvent.
383    
384     This creates something similar to an I<idle> watcher, i.e. a watcher
385     that keeps the event loop from blocking but still polls for new
386     events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
387     don't support a timeout of C<0> and will always block for a bit).
388    
389     The callback for that timer will C<cede> to other threads of the same or
390     higher priority for as long as such threads exists. This has the effect of
391     running all threads that have work to do until all threads block to wait
392     for external events.
393    
394     If no threads of equal or higher priority are ready, it will cede to any
395     thread, but only once. This has the effect of running lower-priority
396     threads as well, but it will not keep higher priority threads from
397     receiving new events.
398    
399     The priority used is simply the priority of the thread that runs the event
400     loop, usually the main program, which usually has a priority of C<0>. Note
401     that Coro::AnyEvent does I<not> run an event loop for you, so unless the
402     main program runs one, there will simply be no event loop to C<cede> to
403     (event handling will still work, somewhat inefficiently, but any thread
404     will have a higher priority than event handling in that case).
405    
406     =item * provide a suitable idle callback.
407    
408     In addition to hooking into C<ready>, this module will also provide a
409     C<$Coro::idle> handler that runs the event loop. It is best not to take
410     advantage of this too often, as this is rather inefficient, but it should
411     work perfectly fine.
412    
413     =item * provide overrides for AnyEvent's condvars
414    
415     This module installs overrides for AnyEvent's condvars. That is, when
416     the module is loaded it will provide its own condition variables. This
417     makes them coroutine-safe, i.e. you can safely block on them from within a
418     coroutine.
419    
420     =item * lead to data corruption or worse
421    
422     As C<unblock_sub> cannot be used by this module (as it is the module
423     that implements it, basically), you must not call into the event
424     loop recursively from any coroutine. This is not usually a difficult
425     restriction to live with, just use condvars, C<unblock_sub> or other means
426     of inter-coroutine-communications.
427    
428     If you use a module that supports AnyEvent (or uses the same event loop
429     as AnyEvent, making the compatible), and it offers callbacks of any kind,
430     then you must not block in them, either (or use e.g. C<unblock_sub>), see
431     the description of C<unblock_sub> in the L<Coro> module.
432    
433     This also means that you should load the module as early as possible,
434     as only condvars created after this module has been loaded will work
435     correctly.
436    
437     =back
438    
439 root 1.3 =head1 SEE ALSO
440    
441     L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
442     L<Coro::Event> for more efficient and more correct solutions (they will be
443     used automatically if applicable).
444    
445 root 1.1 =head1 AUTHOR
446    
447     Marc Lehmann <schmorp@schmorp.de>
448     http://home.schmorp.de/
449    
450     =cut
451