/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.73
Committed: Wed Aug 3 14:52:19 2011 UTC (12 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-6_04
Changes since 1.72: +17 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.39 Coro::AnyEvent - integrate threads into AnyEvent
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8 root 1.54 use AnyEvent;
9     # using both Coro and AnyEvent will automatically load Coro::AnyEvent
10 root 1.1
11 root 1.62 # or load it manually for its utility functions:
12     use Coro::AnyEvent;
13    
14     Coro::AnyEvent::sleep 5; # block current thread for 5s
15     Coro::AnyEvent::poll; # poll for new events once
16     Coro::AnyEvent::idle; # block until process no longer busy
17     Coro::AnyEvent::idle_upto 5; # same, but only up to 5 seconds
18    
19     Coro::AnyEvent::readable $fh, 60
20 root 1.63 or die "fh didn't become readable within 60 seconds\n";
21 root 1.2
22 root 1.39 =head1 DESCRIPTION
23 root 1.2
24 root 1.39 When one naively starts to use threads in Perl, one will quickly run
25     into the problem that threads that block on a syscall (sleeping,
26     reading from a socket etc.) will block all threads.
27 root 1.2
28     If one then uses an event loop, the problem is that the event loop has
29 root 1.39 no knowledge of threads and will not run them before it polls for new
30 root 1.2 events, again blocking the whole process.
31    
32 root 1.39 This module integrates threads into any event loop supported by
33 root 1.2 AnyEvent, combining event-based programming with coroutine-based
34     programming in a natural way.
35    
36 root 1.54 Since Coro 5.21, this module gets loaded automatically when
37     AnyEvent initialises itself and Coro is used in the same process, thus
38     there is no need to load it manually if you just want your threads to
39     coexist with AnyEvent.
40    
41     If you want to use any functions from this module, you of course still
42     need to C<use Coro::AnyEvent>, just as with other perl modules.
43 root 1.1
44 root 1.55 Also, this module autodetects the event loop used (by relying on
45     L<AnyEvent>) and will either automatically defer to the high-performance
46     L<Coro::EV> or L<Coro::Event> modules, or will use a generic integration
47     method that should work with any event loop supported by L<AnyEvent>.
48    
49 root 1.39 =head1 USAGE
50 root 1.1
51 root 1.55 =head2 RUN AN EVENT LOOP - OR NOT?
52    
53     For performance reasons, it is recommended that the main program or
54     something else runs the event loop of the event model you use, i.e.
55    
56     use Gtk2; # <- the event model
57     use AnyEvent;
58     use Coro;
59    
60     # initialise stuff
61     async { ... };
62    
63     # now run mainloop of Gtk2
64     main Gtk2;
65    
66     You can move the event loop into a thread as well, although this tends to
67     get confusing:
68    
69     use Gtk2;
70     use AnyEvent;
71     use Coro;
72    
73     async { main Gtk2 };
74    
75     # do other things...
76     while () {
77     use Coro::AnyEvent;
78     Coro::AnyEvent::sleep 1;
79     print "ping...\n";
80     }
81    
82     You can also do nothing, in which case Coro::AnyEvent will invoke the event
83     loop as needed, which is less efficient, but sometimes very convenient.
84    
85     What you I<MUST NOT DO EVER> is to block inside an event loop
86     callback. The reason is that most event loops are not reentrant and this
87     can cause a deadlock at best and corrupt memory at worst.
88    
89     Coro will try to catch you when you block in the event loop
90     ("FATAL:$Coro::IDLE blocked itself"), but this is just best effort and
91     only works when you do not run your own event loop.
92    
93     To avoid this problem, simply do not block inside an event callback
94     - start a new thread (e.g. with C<Coro::async_pool>) or use
95     C<Coro::unblock_sub>.
96    
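As a minimal sketch of the advice above (not taken from the Coro distribution), the following wraps a timer callback in C<unblock_sub> so that the blocking call runs in a thread of its own rather than inside the event loop callback. The variable C<$finished> and the timings are illustrative assumptions.

```perl
use Coro;
use AnyEvent;
use Coro::AnyEvent;

my $finished = 0; # illustrative flag, not part of any API

# unblock_sub wraps the code block so that each invocation runs in a
# thread of its own instead of directly inside the event loop callback
my $timer = AE::timer 0.1, 0, unblock_sub {
   Coro::AnyEvent::sleep 0.2; # blocking is fine here, we are in a thread
   $finished = 1;
};

# the main program may block as well, as it is not an event callback
Coro::AnyEvent::sleep 1;

print "callback finished: $finished\n";
```

Without C<unblock_sub>, the C<sleep> call would block inside the timer callback itself, which is exactly the situation described above.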
97     =head2 INVERSION OF CONTROL
98 root 1.2
99 root 1.55 If you need to wait for a single event, the rouse functions will come in
100     handy (see the Coro manpage for details):
101 root 1.39
102 root 1.40 # wait for single SIGINT
103     {
104     my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
105     Coro::rouse_wait;
106     }
107 root 1.39
108 root 1.54 =head2 EVENT MODULES OTHER THAN ANYEVENT
109    
110     Keep in mind that, as shipped, Coro and Coro::AnyEvent only work with
111     AnyEvent, and only when AnyEvent is actually used (i.e. initialised), so
112     this will not work:
113    
114 root 1.55 # does not work: EV without AnyEvent is not recognised
115 root 1.54 use EV;
116     use Coro;
117    
118     EV::loop;
119    
120     And neither does this, unless you actually I<use> AnyEvent for something:
121    
122 root 1.55 # does not work: AnyEvent must be initialised (e.g. by creating watchers)
123 root 1.54 use EV;
124     use AnyEvent;
125     use Coro;
126    
127     EV::loop;
128    
129     This does work, however, because you create a watcher (condvars work,
130     too), thus forcing AnyEvent to initialise itself:
131    
132     # does work: AnyEvent is actually used
133     use EV;
134     use AnyEvent;
135     use Coro;
136    
137     my $timer = AE::timer 1, 1, sub { };
138    
139     EV::loop;
140    
141     And if you want to use AnyEvent just to bridge between Coro and your event
142     model of choice, you can simply force it to initialise itself, like this:
143    
144     # does work: AnyEvent is initialised manually
145     use POE;
146     use AnyEvent;
147     use Coro;
148    
149 root 1.55 AnyEvent::detect; # force AnyEvent to integrate Coro into POE
150 root 1.54 POE::Kernel->run;
151    
152 root 1.40 =head1 FUNCTIONS
153 root 1.2
154 root 1.55 Coro::AnyEvent also offers a few functions that might be useful.
155 root 1.2
156 root 1.4 =over 4
157    
158 root 1.1 =cut
159    
160     package Coro::AnyEvent;
161    
162 root 1.52 use common::sense;
163 root 1.1
164     use Coro;
165     use AnyEvent ();
166    
167 root 1.73 our $VERSION = 6.04;
168 root 1.1
169 root 1.5 #############################################################################
170     # idle handler
171    
172 root 1.9 our $IDLE;
173 root 1.1
174 root 1.5 #############################################################################
175     # 0-timeout idle emulation watcher
176    
177 root 1.1 our $ACTIVITY;
178    
179     sub _activity {
180 root 1.51 $ACTIVITY ||= AE::timer 0, 0, \&_schedule;
181 root 1.1 }
182    
183 root 1.14 Coro::_set_readyhook (\&AnyEvent::detect);
184 root 1.8
185     AnyEvent::post_detect {
186     my $model = $AnyEvent::MODEL;
187 root 1.1
188 root 1.40 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
189 root 1.58 # provide faster versions of some functions
190     Coro::EV::_set_readyhook ();
191 root 1.40
192     eval '
193     *sleep = \&Coro::EV::timer_once;
194 root 1.48 *poll = \&Coro::EV::_poll;
195 root 1.52 *idle = sub() {
196 root 1.40 my $w = EV::idle Coro::rouse_cb;
197     Coro::rouse_wait;
198     };
199 root 1.52 *idle_upto = sub($) {
200 root 1.40 my $cb = Coro::rouse_cb;
201     my $t = EV::timer $_[0], 0, $cb;
202     my $w = EV::idle $cb;
203     Coro::rouse_wait;
204     };
205 root 1.52 *readable = sub($;$) {
206 root 1.40 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
207     };
208 root 1.52 *writable = sub($;$) {
209 root 1.40 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
210     };
211     ';
212     die if $@;
213    
214     } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
215 root 1.58 Coro::_set_readyhook undef;
216 root 1.40 # let Coro::Event do its thing
217 root 1.1 } else {
218 root 1.40 # do the inefficient thing ourselves
219 root 1.1 Coro::_set_readyhook \&_activity;
220 root 1.9
221     $IDLE = new Coro sub {
222 root 1.72 my $_poll = AnyEvent->can ("_poll")
223     || AnyEvent->can ("one_event"); # AnyEvent < 6.0
224 root 1.40
225 root 1.9 while () {
226 root 1.72 $_poll->();
227 root 1.55 Coro::schedule if Coro::nready;
228 root 1.9 }
229     };
230     $IDLE->{desc} = "[AnyEvent idle process]";
231    
232 root 1.32 $Coro::idle = $IDLE;
233 root 1.47
234     # call the readyhook, in case coroutines were already readied
235     _activity;
236 root 1.1 }
237 root 1.72
238     # augment condvars
239 root 1.73 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
240 root 1.8 };
241 root 1.5
242 root 1.40 =item Coro::AnyEvent::poll
243    
244     This call instructs the event loop to poll for new events once, without
245 root 1.68 blocking, and blocks the current thread until the event loop has polled
246     for potential new events.
247 root 1.40
248 root 1.68 Note that this call will not actually execute the poll, nor will it wait
249     until there are some events, just block until the event loop has polled
250     for new events, so other threads will have a chance to run.
251 root 1.40
252     This is useful when you have a thread that does some computations, but you
253     still want to poll for new events from time to time. Simply call C<poll>
254     from time to time:
255    
256     my $long_calc = async {
257     for (1..10000) {
258     Coro::AnyEvent::poll;
259     # do some stuff, make sure it takes at least 0.001s or so
260     }
261     };
262    
263     Although you should also consider C<idle> or C<idle_upto> in such cases.
264    
265     =item Coro::AnyEvent::sleep $seconds
266    
267     This blocks the current thread for at least the given number of seconds.
268    
269     =item Coro::AnyEvent::idle
270    
271     This call is similar to C<poll> in that it will also poll for
272     events. Unlike C<poll>, it will only resume the thread once there are no
273     events to handle anymore, i.e. when the process is otherwise idle.
274    
275 root 1.56 This is good for background threads that shouldn't use CPU time when
276     foreground jobs are ready to run.
277    
278 root 1.40 =item Coro::AnyEvent::idle_upto $seconds
279    
280     Like C<idle>, but with a maximum waiting time.
281    
282     If your process is busy handling events, calling C<idle> can mean that
283     your thread will never be resumed. To avoid this, you can use C<idle_upto>
284     and specify a timeout, after which your thread will be resumed even if the
285     process is completely busy.
286    
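Example (a sketch only, with a hypothetical background job standing in for real work): process ten small units of work, waiting for the process to become idle before each one, but never for longer than 0.1 seconds. The names C<$background> and C<$chunks_done> are made up for this illustration.

```perl
use Coro;
use AnyEvent;
use Coro::AnyEvent;

my $chunks_done = 0; # illustrative counter

# hypothetical background job, split into ten small units of work
my $background = async {
   for my $chunk (1 .. 10) {
      # give way to event processing, but resume after at most 0.1s,
      # even if the process never becomes idle
      Coro::AnyEvent::idle_upto 0.1;

      ++$chunks_done; # stands in for one unit of real work
   }
};

$background->join; # wait for the background thread to finish
print "processed $chunks_done chunks\n";
```

The timeout is a trade-off: a larger value is more polite to foreground work, a smaller one bounds how long the background job can be starved.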
287 root 1.44 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
288 root 1.40
289 root 1.44 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
290 root 1.40
291 root 1.44 Blocks the current thread until the given file handle (or file descriptor)
292     becomes readable (or writable), or the given timeout has elapsed,
293     whichever happens first. No timeout counts as infinite timeout.
294 root 1.40
295     Returns true when the file handle became ready, false when a timeout
296     occurred.
297    
298     Note that these functions are quite inefficient as compared to using a
299     single watcher (they recreate watchers on every invocation) or compared to
300     using Coro::Handle.
301    
302     Note also that they only work for sources that have reasonable
303     non-blocking behaviour (e.g. not files).
304    
305     Example: wait until STDIN becomes readable, then quit the program.
306    
307     use Coro::AnyEvent;
308     print "press enter to quit...\n";
309     Coro::AnyEvent::readable *STDIN;
310     exit 0;
311    
312     =cut
313    
314     sub poll() {
315 root 1.51 my $w = AE::timer 0, 0, Coro::rouse_cb;
316 root 1.40 Coro::rouse_wait;
317     }
318    
319     sub sleep($) {
320 root 1.51 my $w = AE::timer $_[0], 0, Coro::rouse_cb;
321 root 1.40 Coro::rouse_wait;
322     }
323    
324     sub idle() {
325 root 1.51 my $w = AE::idle Coro::rouse_cb;
326 root 1.40 Coro::rouse_wait;
327     }
328    
329     sub idle_upto($) {
330     my $cb = Coro::rouse_cb;
331 root 1.51 my $t = AE::timer shift, 0, $cb;
332     my $w = AE::idle $cb;
333 root 1.40 Coro::rouse_wait;
334     }
335    
336     sub readable($;$) {
337     my $cb = Coro::rouse_cb;
338 root 1.51 my $w = AE::io $_[0], 0, sub { $cb->(1) };
339     my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
340 root 1.40 Coro::rouse_wait
341     }
342    
343     sub writable($;$) {
344     my $cb = Coro::rouse_cb;
345 root 1.51 my $w = AE::io $_[0], 1, sub { $cb->(1) };
346     my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
347 root 1.40 Coro::rouse_wait
348     }
349    
350 root 1.73 sub Coro::AnyEvent::CondVar::send {
351     (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
352    
353     &AnyEvent::CondVar::Base::send; # pass on all of @_, including any values sent
354     };
355    
356     sub Coro::AnyEvent::CondVar::recv {
357     until ($_[0]{_ae_sent}) {
358     local $_[0]{_ae_coro} = $Coro::current;
359     Coro::schedule;
360     }
361    
362     AnyEvent::CondVar::Base::recv $_[0];
363     };
364    
365 root 1.1 1;
366    
367 root 1.40 =back
368    
369     =head1 IMPLEMENTATION DETAILS
370    
371     Unfortunately, few event loops (basically only L<EV> and L<Event>)
372     support the kind of integration required for smooth operations well, and
373     consequently, AnyEvent cannot completely offer the functionality required
374     by this module, so we need to improvise.
375    
376     Here is what this module does when it has to work with other event loops:
377    
378     =over 4
379    
380     =item * run ready threads before blocking the process
381    
382     Each time a thread is put into the ready queue (and there are no other
383     threads in the ready queue), a timer with an C<after> value of C<0> is
384     registered with AnyEvent.
385    
386     This creates something similar to an I<idle> watcher, i.e. a watcher
387     that keeps the event loop from blocking but still polls for new
388     events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
389     don't support a timeout of C<0> and will always block for a bit).
390    
391     The callback for that timer will C<cede> to other threads of the same or
392     higher priority for as long as such threads exist. This has the effect of
393     running all threads that have work to do until all threads block to wait
394     for external events.
395    
396     If no threads of equal or higher priority are ready, it will cede to any
397     thread, but only once. This has the effect of running lower-priority
398     threads as well, but it will not keep higher priority threads from
399     receiving new events.
400    
401     The priority used is simply the priority of the thread that runs the event
402     loop, usually the main program, which usually has a priority of C<0>. Note
403     that Coro::AnyEvent does I<not> run an event loop for you, so unless the
404     main program runs one, there will simply be no event loop to C<cede> to
405     (event handling will still work, somewhat inefficiently, but any thread
406     will have a higher priority than event handling in that case).
407    
408     =item * provide a suitable idle callback.
409    
410     In addition to hooking into C<ready>, this module will also provide a
411     C<$Coro::idle> handler that runs the event loop. It is best not to take
412     advantage of this too often, as this is rather inefficient, but it should
413     work perfectly fine.
414    
415     =item * provide overrides for AnyEvent's condvars
416    
417     This module installs overrides for AnyEvent's condvars. That is, when
418     the module is loaded it will provide its own condition variables. This
419     makes them coroutine-safe, i.e. you can safely block on them from within a
420     coroutine.
421    
422     =item * lead to data corruption or worse
423    
424     As C<unblock_sub> cannot be used by this module (as it is the module
425     that implements it, basically), you must not call into the event
426     loop recursively from any coroutine. This is not usually a difficult
427     restriction to live with, just use condvars, C<unblock_sub> or other means
428     of inter-coroutine-communications.
429    
430     If you use a module that supports AnyEvent (or uses the same event loop
431     as AnyEvent, making them compatible), and it offers callbacks of any kind,
432     then you must not block in them, either (or use e.g. C<unblock_sub>), see
433     the description of C<unblock_sub> in the L<Coro> module.
434    
435     This also means that you should load the module as early as possible,
436     as only condvars created after this module has been loaded will work
437     correctly.
438    
439     =back
440    
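The condvar integration described above can be sketched as follows. This is an illustration under assumptions, not code from the distribution: the names C<$done>, C<$waiter> and C<$woke_up> are invented, and the condvar is deliberately created after the module has been loaded.

```perl
use Coro;
use AnyEvent;
use Coro::AnyEvent;

# creating the condvar initialises AnyEvent, which in turn lets
# Coro::AnyEvent install its condvar overrides
my $done  = AE::cv;
my $timer = AE::timer 0.1, 0, sub { $done->send };

my $woke_up = 0; # illustrative flag

my $waiter = async {
   $done->recv; # blocks this thread only, other threads keep running
   $woke_up = 1;
};

$waiter->join;
print "waiter woke up: $woke_up\n";
```

Note that C<recv> is called from a thread here, while C<send> is called from a plain event callback, which does not block.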
441 root 1.3 =head1 SEE ALSO
442    
443     L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
444     L<Coro::Event> for more efficient and more correct solutions (they will be
445     used automatically if applicable).
446    
447 root 1.1 =head1 AUTHOR
448    
449     Marc Lehmann <schmorp@schmorp.de>
450     http://home.schmorp.de/
451    
452     =cut
453