/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.90
Committed: Tue Mar 4 12:32:02 2014 UTC (10 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-6_37
Changes since 1.89: +1 -1 lines
Log Message:
6.37

File Contents

=head1 NAME

Coro::AnyEvent - integrate threads into AnyEvent

=head1 SYNOPSIS

   use Coro;
   use AnyEvent;
   # using both Coro and AnyEvent will automatically load Coro::AnyEvent

   # or load it manually for its utility functions:
   use Coro::AnyEvent;

   Coro::AnyEvent::sleep 5;     # block current thread for 5s
   Coro::AnyEvent::poll;        # poll for new events once
   Coro::AnyEvent::idle;        # block until process no longer busy
   Coro::AnyEvent::idle_upto 5; # same, but only up to 5 seconds

   Coro::AnyEvent::readable $fh, 60
      or die "fh didn't become readable within 60 seconds\n";

=head1 DESCRIPTION

When one naively starts to use threads in Perl, one will quickly run
into the problem that threads which block on a syscall (sleeping,
reading from a socket etc.) will block all threads.

If one then uses an event loop, the problem is that the event loop has
no knowledge of threads and will not run them before it polls for new
events, again blocking the whole process.

This module integrates threads into any event loop supported by
AnyEvent, combining event-based programming with coroutine-based
programming in a natural way.

As of Coro 5.21, this module is loaded automatically when AnyEvent
initialises itself and Coro is used in the same process, so there is no
need to load it manually if you just want your threads to coexist with
AnyEvent.

If you want to use any functions from this module, you of course still
need to C<use Coro::AnyEvent>, just as with other perl modules.

Also, this module autodetects the event loop used (by relying on
L<AnyEvent>) and will either automatically defer to the high-performance
L<Coro::EV> or L<Coro::Event> modules, or will use a generic integration
method that should work with any event loop supported by L<AnyEvent>.

=head1 USAGE

=head2 RUN AN EVENT LOOP - OR NOT?

For performance reasons, it is recommended that the main program or
something else run the event loop of the event model you use, for example:

   use Gtk2; # <- the event model
   use AnyEvent;
   use Coro;

   # initialise stuff
   async { ... };

   # now run mainloop of Gtk2
   main Gtk2;

You can move the event loop into a thread as well, although this tends to
get confusing:

   use Gtk2;
   use AnyEvent;
   use Coro;
   use Coro::AnyEvent;

   async { main Gtk2 };

   # do other things...
   while () {
      Coro::AnyEvent::sleep 1;
      print "ping...\n";
   }

You can also do nothing, in which case Coro::AnyEvent will invoke the event
loop as needed, which is less efficient, but sometimes very convenient.

What you I<MUST NOT EVER DO> is to block inside an event loop
callback. The reason is that most event loops are not reentrant and
this can cause a deadlock at best and corrupt memory at worst.

Coro will try to catch you when you block in the event loop
("FATAL: $Coro::IDLE blocked itself"), but this is just best effort and
only works when you do not run your own event loop.

To avoid this problem, start a new thread (e.g. with C<Coro::async_pool>)
or use C<Coro::unblock_sub> to run blocking tasks.

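The two ways of avoiding a blocking callback can be sketched as follows.
This is only an illustration: the watcher names and the timings are made
up, and the final C<sleep> merely keeps the main thread around long enough
for both callbacks to finish.

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent;

   # WRONG: a plain callback runs inside the event loop and must not block
   # my $w = AE::timer 1, 0, sub { Coro::AnyEvent::sleep 2 };

   # OK: unblock_sub runs the callback body in a thread of its own
   my $w1 = AE::timer 1, 0, unblock_sub {
      Coro::AnyEvent::sleep 2;
      print "unblock_sub callback done\n";
   };

   # OK: the callback returns immediately, only the new thread blocks
   my $w2 = AE::timer 1, 0, sub {
      async_pool {
         Coro::AnyEvent::sleep 2;
         print "async_pool thread done\n";
      };
   };

   Coro::AnyEvent::sleep 5; # let the event loop and both threads run
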
=head2 INVERSION OF CONTROL

If you need to wait for a single event, the rouse functions will come in
handy (see the Coro manpage for details):

   # wait for single SIGINT
   {
      my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
      Coro::rouse_wait;
   }

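The same pattern extends to waiting for whichever of several events comes
first. The following sketch (the variable names and the 10 second limit
are illustrative) shares one rouse callback between a signal watcher and a
timer, and uses the argument passed to the callback to tell them apart,
much like C<readable> and C<writable> in this module do internally:

   # wait for SIGINT, but give up after 10 seconds
   {
      my $cb    = Coro::rouse_cb;
      my $int_w = AnyEvent->signal (signal => "INT", cb => sub { $cb->("INT") });
      my $tmo_w = AnyEvent->timer  (after  => 10,    cb => sub { $cb->("timeout") });

      # rouse_wait returns whatever was passed to the rouse callback
      my ($reason) = Coro::rouse_wait $cb;
      print "woken up by $reason\n";
   }
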
=head2 EVENT MODULES OTHER THAN ANYEVENT

Keep in mind that, as shipped, Coro and Coro::AnyEvent only work with
AnyEvent, and only when AnyEvent is actually used (i.e. initialised), so
this will not work:

   # does not work: EV without AnyEvent is not recognised
   use EV;
   use Coro;

   EV::loop;

And neither does this, unless you actually I<use> AnyEvent for something:

   # does not work: AnyEvent must be initialised (e.g. by creating watchers)
   use EV;
   use AnyEvent;
   use Coro;

   EV::loop;

This does work, however, because you create a watcher (condvars work,
too), thus forcing AnyEvent to initialise itself:

   # does work: AnyEvent is actually used
   use EV;
   use AnyEvent;
   use Coro;

   my $timer = AE::timer 1, 1, sub { };

   EV::loop;

And if you want to use AnyEvent just to bridge between Coro and your event
model of choice, you can simply force it to initialise itself, like this:

   # does work: AnyEvent is initialised manually
   use POE;
   use AnyEvent;
   use Coro;

   AnyEvent::detect; # force AnyEvent to integrate Coro into POE
   POE::Kernel->run;

=head1 FUNCTIONS

Coro::AnyEvent also offers a few functions that might be useful.

=over 4

=cut

package Coro::AnyEvent;

use common::sense;

use Coro;
use AnyEvent ();

our $VERSION = 6.37;

#############################################################################
# idle handler

our $IDLE;

#############################################################################
# 0-timeout idle emulation watcher

our $ACTIVITY;

sub _activity {
   $ACTIVITY ||= AE::timer 0, 0, \&_schedule;
}

Coro::_set_readyhook (\&AnyEvent::detect);

AnyEvent::post_detect {
   my $model = $AnyEvent::MODEL;

   if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
      # provide faster versions of some functions
      Coro::EV::_set_readyhook ();

      eval '
         *sleep = \&Coro::EV::timer_once;
         *poll  = \&Coro::EV::_poll;
         *idle  = sub() {
            my $w = EV::idle Coro::rouse_cb;
            Coro::rouse_wait;
         };
         *idle_upto = sub($) {
            my $cb = Coro::rouse_cb;
            my $t = EV::timer $_[0], 0, $cb;
            my $w = EV::idle $cb;
            Coro::rouse_wait;
         };
         *readable = sub($;$) {
            EV::READ  & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
         };
         *writable = sub($;$) {
            EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
         };
      ';
      die if $@;

   } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
      Coro::_set_readyhook undef;
      # let Coro::Event do its thing
   } else {
      # do the inefficient thing ourselves
      Coro::_set_readyhook \&_activity;

      $IDLE = new Coro sub {
         my $_poll = AnyEvent->can ("_poll")
                     || AnyEvent->can ("one_event"); # AnyEvent < 6.0

         while () {
            $_poll->();
            Coro::schedule if Coro::nready;
         }
      };
      $IDLE->{desc} = "[AnyEvent idle process]";

      $Coro::idle = $IDLE;

      # call the readyhook, in case coroutines were already readied
      _activity;
   }

   # augment condvars
   unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
};

=item Coro::AnyEvent::poll

This call instructs the event loop to poll for new events once, without
blocking, and blocks the current thread until the event loop has done so.

Note that this call will not actually execute the poll itself, nor will it
wait until there are some events; it just blocks until the event loop has
polled for new events, so other threads will have a chance to run.

This is useful when you have a thread that does some computations, but you
still want to poll for new events from time to time. Simply call C<poll>
from time to time:

   my $long_calc = async {
      for (1..10000) {
         Coro::AnyEvent::poll;
         # do some stuff, make sure it takes at least 0.001s or so
      }
   };

Although you should also consider C<idle> or C<idle_upto> in such cases.

=item Coro::AnyEvent::sleep $seconds

This blocks the current thread for at least the given number of seconds.

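Example: a small sketch of a thread that prints a message once per second
(the thread name and loop count are illustrative). Only the calling thread
is blocked by C<sleep>; the main program waits for it with C<join>, during
which the event loop keeps running.

   use Coro;
   use Coro::AnyEvent;

   my $ticker = async {
      for my $n (1..3) {
         Coro::AnyEvent::sleep 1; # blocks only this thread
         print "tick $n\n";
      }
   };

   $ticker->join; # block the main thread until the ticker has finished
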
=item Coro::AnyEvent::idle

This call is similar to C<poll> in that it will also poll for
events. Unlike C<poll>, it will only resume the thread once there are no
events to handle anymore, i.e. when the process is otherwise idle.

This is good for background threads that shouldn't use CPU time when
foreground jobs are ready to run.

=item Coro::AnyEvent::idle_upto $seconds

Like C<idle>, but with a maximum waiting time.

If your process is busy handling events, calling C<idle> can mean that
your thread will never be resumed. To avoid this, you can use C<idle_upto>
and specify a timeout, after which your thread will be resumed even if the
process is completely busy.

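Example: a sketch of a low-priority background thread that works through
its job in small pieces, waiting for the process to become idle before
each piece, but never for longer than two seconds. The chunk count and the
C<print> statement are placeholders for real work.

   use Coro;
   use Coro::AnyEvent;

   my $background = async {
      for my $chunk (1..10) {
         # resume when the process is idle, or after 2 seconds at the latest
         Coro::AnyEvent::idle_upto 2;
         print "processing chunk $chunk\n"; # one small unit of work
      }
   };

   $background->join;
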
=item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]

=item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]

Blocks the current thread until the given file handle (or file descriptor)
becomes readable (or writable), or the given timeout has elapsed,
whichever happens first. Omitting the timeout means waiting indefinitely.

Returns true when the file handle became ready, false when a timeout
occurred.

Note that these functions are quite inefficient as compared to using a
single watcher (they recreate watchers on every invocation) or compared to
using Coro::Handle.

Note also that they only work for sources that have reasonable
non-blocking behaviour (e.g. not files).

Example: wait until STDIN becomes readable, then quit the program.

   use Coro::AnyEvent;
   print "press enter to quit...\n";
   Coro::AnyEvent::readable *STDIN;
   exit 0;

=cut

sub poll() {
   my $w = AE::timer 0, 0, Coro::rouse_cb;
   Coro::rouse_wait;
}

sub sleep($) {
   my $w = AE::timer $_[0], 0, Coro::rouse_cb;
   Coro::rouse_wait;
}

sub idle() {
   my $w = AE::idle Coro::rouse_cb;
   Coro::rouse_wait;
}

sub idle_upto($) {
   my $cb = Coro::rouse_cb;
   my $t = AE::timer shift, 0, $cb;
   my $w = AE::idle $cb;
   Coro::rouse_wait;
}

sub readable($;$) {
   my $cb = Coro::rouse_cb;
   my $w = AE::io $_[0], 0, sub { $cb->(1) };
   my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
   Coro::rouse_wait
}

sub writable($;$) {
   my $cb = Coro::rouse_cb;
   my $w = AE::io $_[0], 1, sub { $cb->(1) };
   my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
   Coro::rouse_wait
}

sub Coro::AnyEvent::CondVar::send {
   (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};

   &AnyEvent::CondVar::Base::send;
};

sub Coro::AnyEvent::CondVar::recv {
   until ($_[0]{_ae_sent}) {
      local $_[0]{_ae_coro} = $Coro::current;
      Coro::schedule;
   }

   &AnyEvent::CondVar::Base::recv;
};

1;

=back

=head1 IMPLEMENTATION DETAILS

Unfortunately, few event loops (basically only L<EV> and L<Event>)
support the kind of integration required for smooth operation, and
consequently, AnyEvent cannot completely offer the functionality required
by this module, so we need to improvise.

Here is what this module does when it has to work with other event loops:

=over 4

=item * run ready threads before blocking the process

Each time a thread is put into the ready queue (and there are no other
threads in the ready queue), a timer with an C<after> value of C<0> is
registered with AnyEvent.

This creates something similar to an I<idle> watcher, i.e. a watcher
that keeps the event loop from blocking but still polls for new
events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
don't support a timeout of C<0> and will always block for a bit).

The callback for that timer will C<cede> to other threads of the same or
higher priority for as long as such threads exist. This has the effect of
running all threads that have work to do until all threads block to wait
for external events.

If no threads of equal or higher priority are ready, it will cede to any
thread, but only once. This has the effect of running lower-priority
threads as well, but it will not keep higher priority threads from
receiving new events.

The priority used is simply the priority of the thread that runs the event
loop, usually the main program, which usually has a priority of C<0>. Note
that Coro::AnyEvent does I<not> run an event loop for you, so unless the
main program runs one, there will simply be no event loop to C<cede> to
(event handling will still work, somewhat inefficiently, but any thread
will have a higher priority than event handling in that case).

=item * provide a suitable idle callback.

In addition to hooking into C<ready>, this module will also provide a
C<$Coro::idle> handler that runs the event loop. It is best not to take
advantage of this too often, as this is rather inefficient, but it should
work perfectly fine.

=item * provide overrides for AnyEvent's condvars

This module installs overrides for AnyEvent's condvars. That is, when
the module is loaded it will provide its own condition variables. This
makes them coroutine-safe, i.e. you can safely block on them from within a
coroutine.

=item * lead to data corruption or worse

As C<unblock_sub> cannot be used by this module (as it is the module
that implements it, basically), you must not call into the event
loop recursively from any coroutine. This is not usually a difficult
restriction to live with: just use condvars, C<unblock_sub> or other means
of inter-coroutine communication.

If you use a module that supports AnyEvent (or uses the same event
loop as AnyEvent, making it implicitly compatible), and it offers
callbacks of any kind, then you must not block in them, either (or use
e.g. C<unblock_sub>), see the description of C<unblock_sub> in the
L<Coro> module.

This also means that you should load the module as early as possible,
as only condvars created after this module has been loaded will work
correctly.

=back

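The effect of the condvar overrides described above can be illustrated
with a small sketch (variable names and the message are illustrative):
calling C<recv> inside a thread blocks only that thread, while the event
loop and all other threads continue to run.

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent;

   my $thread = async {
      my $cv = AE::cv;
      my $w  = AE::timer 1, 0, sub { $cv->send ("timer fired") };

      # blocks only this thread until the timer callback calls send
      my $msg = $cv->recv;
      print "$msg\n";
   };

   $thread->join;
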
=head1 SEE ALSO

L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
L<Coro::Event> for more efficient and more correct solutions (they will be
used automatically if applicable).

=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/

=cut
