ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.119
Committed: Sun Jul 26 10:30:56 2020 UTC (3 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-6_57, HEAD
Changes since 1.118: +1 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.39 Coro::AnyEvent - integrate threads into AnyEvent
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use Coro;
8 root 1.54 use AnyEvent;
9     # using both Coro and AnyEvent will automatically load Coro::AnyEvent
10 root 1.1
11 root 1.77 # or load it manually for its utility functions:
12 root 1.62 use Coro::AnyEvent;
13    
14     Coro::AnyEvent::sleep 5; # block current thread for 5s
15     Coro::AnyEvent::poll; # poll for new events once
16     Coro::AnyEvent::idle; # block until process no longer busy
17     Coro::AnyEvent::idle_upto 5; # same, but only up to 5 seconds
18    
19     Coro::AnyEvent::readable $fh, 60
20 root 1.63 or die "fh didn't become readable within 60 seconds\n";
21 root 1.2
22 root 1.39 =head1 DESCRIPTION
23 root 1.2
24 root 1.39 When one naively starts to use threads in Perl, one will quickly run
25 root 1.77 into the problem that threads which block on a syscall (sleeping,
26 root 1.39 reading from a socket etc.) will block all threads.
27 root 1.2
28     If one then uses an event loop, the problem is that the event loop has
29 root 1.39 no knowledge of threads and will not run them before it polls for new
30 root 1.2 events, again blocking the whole process.
31    
32 root 1.39 This module integrates threads into any event loop supported by
33 root 1.2 AnyEvent, combining event-based programming with coroutine-based
34     programming in a natural way.
35    
36 root 1.54 As of Coro 5.21 and newer, this module gets loaded automatically when
37     AnyEvent initialises itself and Coro is used in the same process, thus
38     there is no need to load it manually if you just want your threads to
39     coexist with AnyEvent.
40    
41     If you want to use any functions from this module, you of course still
42     need to C<use Coro::AnyEvent>, just as with other perl modules.
43 root 1.1
44 root 1.55 Also, this module autodetects the event loop used (by relying on
45     L<AnyEvent>) and will either automatically defer to the high-performance
46     L<Coro::EV> or L<Coro::Event> modules, or will use a generic integration
47     method that should work with any event loop supported by L<AnyEvent>.
48    
49 root 1.39 =head1 USAGE
50 root 1.1
51 root 1.55 =head2 RUN AN EVENT LOOP - OR NOT?
52    
53     For performance reasons, it is recommended that the main program or
54 root 1.77 something else run the event loop of the event model you use, i.e.
55 root 1.55
56     use Gtk2; # <- the event model
57     use AnyEvent;
58     use Coro:
59    
60     # initialise stuff
61     async { ... };
62    
63     # now run mainloop of Gtk2
64     main Gtk2;
65    
66     You can move the event loop into a thread as well, although this tends to
67     get confusing:
68    
69     use Gtk2;
70     use AnyEvent;
71     use Coro:
72    
73     async { main Gtk2 };
74    
75     # do other things...
76     while () {
77     use Coro::AnyEvent;
78     Coro::AnyEvent::sleep 1;
79     print "ping...\n";
80     }
81    
82 root 1.77 You can also do nothing, in which case Coro::AnyEvent will invoke the event
83 root 1.55 loop as needed, which is less efficient, but sometimes very convenient.
84    
85 root 1.77 What you I<MUST NOT EVER DO> is to block inside an event loop
86     callback. The reason is that most event loops are not reentrant and
87     this can cause a deadlock at best and corrupt memory at worst.
88 root 1.55
89     Coro will try to catch you when you block in the event loop
90 root 1.82 ("FATAL: $Coro::IDLE blocked itself"), but this is just best effort and
91 root 1.55 only works when you do not run your own event loop.
92    
93 root 1.83 To avoid this problem, start a new thread (e.g. with C<Coro::async_pool>)
94     or use C<Coro::unblock_sub> to run blocking tasks.
95 root 1.55
96     =head2 INVERSION OF CONTROL
97 root 1.2
98 root 1.55 If you need to wait for a single event, the rouse functions will come in
99     handy (see the Coro manpage for details):
100 root 1.39
101 root 1.40 # wait for single SIGINT
102     {
103     my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
104     Coro::rouse_wait;
105     }
106 root 1.39
107 root 1.77 =head2 EVENT MODULES OTHER THAN ANYEVENT
108 root 1.54
109     Keep in mind that, as shipped, Coro and Coro::AnyEvent only work with
110     AnyEvent, and only when AnyEvent is actually used (i.e. initialised), so
111     this will not work:
112    
113 root 1.55 # does not work: EV without AnyEvent is not recognised
114 root 1.54 use EV;
115     use Coro;
116    
117     EV::loop;
118    
119     And neither does this, unless you actually I<use> AnyEvent for something:
120    
121 root 1.55 # does not work: AnyEvent must be initialised (e.g. by creating watchers)
122 root 1.54 use EV;
123     use AnyEvent;
124     use Coro;
125    
126     EV::loop;
127    
128     This does work, however, because you create a watcher (condvars work,
129     too), thus forcing AnyEvent to initialise itself:
130    
131     # does work: AnyEvent is actually used
132     use EV;
133     use AnyEvent;
134     use Coro;
135    
136     my $timer = AE::timer 1, 1, sub { };
137    
138     EV::loop;
139    
140     And if you want to use AnyEvent just to bridge between Coro and your event
141     model of choice, you can simply force it to initialise itself, like this:
142    
143     # does work: AnyEvent is initialised manually
144     use POE;
145     use AnyEvent;
146     use Coro;
147    
148 root 1.55 AnyEvent::detect; # force AnyEvent to integrate Coro into POE
149 root 1.54 POE::Kernel->run;
150    
151 root 1.40 =head1 FUNCTIONS
152 root 1.2
153 root 1.55 Coro::AnyEvent also offers a few functions that might be useful.
154 root 1.2
155 root 1.4 =over 4
156    
157 root 1.1 =cut
158    
159     package Coro::AnyEvent;
160    
161 root 1.52 use common::sense;
162 root 1.1
163     use Coro;
164     use AnyEvent ();
165    
166 root 1.116 our $VERSION = 6.57;
167 root 1.1
168 root 1.5 #############################################################################
169     # idle handler
170    
171 root 1.9 our $IDLE;
172 root 1.1
173 root 1.5 #############################################################################
174     # 0-timeout idle emulation watcher
175    
176 root 1.1 our $ACTIVITY;
177    
178     sub _activity {
179 root 1.51 $ACTIVITY ||= AE::timer 0, 0, \&_schedule;
180 root 1.1 }
181    
182 root 1.14 Coro::_set_readyhook (\&AnyEvent::detect);
183 root 1.8
184     AnyEvent::post_detect {
185     my $model = $AnyEvent::MODEL;
186 root 1.1
187 root 1.40 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
188 root 1.58 # provide faster versions of some functions
189     Coro::EV::_set_readyhook ();
190 root 1.40
191     eval '
192     *sleep = \&Coro::EV::timer_once;
193 root 1.48 *poll = \&Coro::EV::_poll;
194 root 1.52 *idle = sub() {
195 root 1.40 my $w = EV::idle Coro::rouse_cb;
196     Coro::rouse_wait;
197     };
198 root 1.52 *idle_upto = sub($) {
199 root 1.40 my $cb = Coro::rouse_cb;
200     my $t = EV::timer $_[0], 0, $cb;
201     my $w = EV::idle $cb;
202     Coro::rouse_wait;
203     };
204 root 1.52 *readable = sub($;$) {
205 root 1.40 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
206     };
207 root 1.52 *writable = sub($;$) {
208 root 1.40 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
209     };
210     ';
211     die if $@;
212    
213     } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
214 root 1.58 Coro::_set_readyhook undef;
215 root 1.40 # let Coro::Event do its thing
216 root 1.1 } else {
217 root 1.40 # do the inefficient thing ourselves
218 root 1.1 Coro::_set_readyhook \&_activity;
219 root 1.9
220     $IDLE = new Coro sub {
221 root 1.72 my $_poll = AnyEvent->can ("_poll")
222     || AnyEvent->can ("one_event"); # AnyEvent < 6.0
223 root 1.40
224 root 1.9 while () {
225 root 1.72 $_poll->();
226 root 1.55 Coro::schedule if Coro::nready;
227 root 1.9 }
228     };
229     $IDLE->{desc} = "[AnyEvent idle process]";
230    
231 root 1.32 $Coro::idle = $IDLE;
232 root 1.47
233     # call the readyhook, in case coroutines were already readied
234     _activity;
235 root 1.1 }
236 root 1.72
237     # augment condvars
238 root 1.73 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
239 root 1.8 };
240 root 1.5
241 root 1.40 =item Coro::AnyEvent::poll
242    
243     This call will block the current thread until the event loop has polled
244 root 1.68 for potential new events and instructs the event loop to poll for new
245     events once, without blocking.
246 root 1.40
247 root 1.68 Note that this call will not actually execute the poll, nor will it wait
248     until there are some events, just block until the event loop has polled
249     for new events, so other threads will have a chance to run.
250 root 1.40
251     This is useful when you have a thread that does some computations, but you
252     still want to poll for new events from time to time. Simply call C<poll>
253     from time to time:
254    
255     my $long_calc = async {
256     for (1..10000) {
257 root 1.88 Coro::AnyEvent::poll;
258 root 1.40 # do some stuff, make sure it takes at least 0.001s or so
259     }
260     }
261    
262     Although you should also consider C<idle> or C<idle_upto> in such cases.
263    
264     =item Coro::AnyEvent::sleep $seconds
265    
266     This blocks the current thread for at least the given number of seconds.
267    
268     =item Coro::AnyEvent::idle
269    
270     This call is similar to C<poll> in that it will also poll for
271     events. Unlike C<poll>, it will only resume the thread once there are no
272     events to handle anymore, i.e. when the process is otherwise idle.
273    
274 root 1.56 This is good for background threads that shouldn't use CPU time when
275     foreground jobs are ready to run.
276    
277 root 1.40 =item Coro::AnyEvent::idle_upto $seconds
278    
279     Like C<idle>, but with a maximum waiting time.
280    
281     If your process is busy handling events, calling C<idle> can mean that
282     your thread will never be resumed. To avoid this, you can use C<idle_upto>
283     and specify a timeout, after which your thread will be resumed even if the
284     process is completely busy.
285    
286 root 1.44 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
287 root 1.40
288 root 1.44 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
289 root 1.40
290 root 1.44 Blocks the current thread until the given file handle (or file descriptor)
291     becomes readable (or writable), or the given timeout has elapsed,
292     whichever happens first. No timeout counts as infinite timeout.
293 root 1.40
294     Returns true when the file handle became ready, false when a timeout
295 root 1.107 occurred.
296 root 1.40
297     Note that these functions are quite inefficient as compared to using a
298     single watcher (they recreate watchers on every invocation) or compared to
299     using Coro::Handle.
300    
301     Note also that they only work for sources that have reasonable
302     non-blocking behaviour (e.g. not files).
303    
304     Example: wait until STDIN becomes readable, then quit the program.
305    
306     use Coro::AnyEvent;
307     print "press enter to quit...\n";
308     Coro::AnyEvent::readable *STDIN;
309     exit 0;
310    
311     =cut
312    
313     sub poll() {
314 root 1.51 my $w = AE::timer 0, 0, Coro::rouse_cb;
315 root 1.40 Coro::rouse_wait;
316     }
317    
318     sub sleep($) {
319 root 1.51 my $w = AE::timer $_[0], 0, Coro::rouse_cb;
320 root 1.40 Coro::rouse_wait;
321     }
322    
323     sub idle() {
324 root 1.51 my $w = AE::idle Coro::rouse_cb;
325 root 1.40 Coro::rouse_wait;
326     }
327    
328     sub idle_upto($) {
329     my $cb = Coro::rouse_cb;
330 root 1.51 my $t = AE::timer shift, 0, $cb;
331     my $w = AE::idle $cb;
332 root 1.40 Coro::rouse_wait;
333     }
334    
335     sub readable($;$) {
336     my $cb = Coro::rouse_cb;
337 root 1.51 my $w = AE::io $_[0], 0, sub { $cb->(1) };
338     my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
339 root 1.40 Coro::rouse_wait
340     }
341    
342     sub writable($;$) {
343     my $cb = Coro::rouse_cb;
344 root 1.51 my $w = AE::io $_[0], 1, sub { $cb->(1) };
345     my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
346 root 1.40 Coro::rouse_wait
347     }
348    
349 root 1.118 sub Coro::AnyEvent::CondVar::_send {
350 root 1.73 (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
351     };
352    
353 root 1.118 sub Coro::AnyEvent::CondVar::_wait {
354 root 1.73 until ($_[0]{_ae_sent}) {
355 root 1.119 $AnyEvent::CondVar::Base::WAITING = 0; # avoid recursive check by AnyEvent
356 root 1.73 local $_[0]{_ae_coro} = $Coro::current;
357     Coro::schedule;
358     }
359     };
360    
361 root 1.1 1;
362    
363 root 1.40 =back
364    
365     =head1 IMPLEMENTATION DETAILS
366    
367     Unfortunately, few event loops (basically only L<EV> and L<Event>)
368     support the kind of integration required for smooth operations well, and
369     consequently, AnyEvent cannot completely offer the functionality required
370     by this module, so we need to improvise.
371    
372     Here is what this module does when it has to work with other event loops:
373    
374     =over 4
375    
376     =item * run ready threads before blocking the process
377    
378     Each time a thread is put into the ready queue (and there are no other
379     threads in the ready queue), a timer with an C<after> value of C<0> is
380     registered with AnyEvent.
381    
382     This creates something similar to an I<idle> watcher, i.e. a watcher
383     that keeps the event loop from blocking but still polls for new
384     events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
385     don't support a timeout of C<0> and will always block for a bit).
386    
387     The callback for that timer will C<cede> to other threads of the same or
388     higher priority for as long as such threads exists. This has the effect of
389     running all threads that have work to do until all threads block to wait
390     for external events.
391    
392     If no threads of equal or higher priority are ready, it will cede to any
393     thread, but only once. This has the effect of running lower-priority
394     threads as well, but it will not keep higher priority threads from
395     receiving new events.
396    
397     The priority used is simply the priority of the thread that runs the event
398     loop, usually the main program, which usually has a priority of C<0>. Note
399     that Coro::AnyEvent does I<not> run an event loop for you, so unless the
400     main program runs one, there will simply be no event loop to C<cede> to
401     (event handling will still work, somewhat inefficiently, but any thread
402     will have a higher priority than event handling in that case).
403    
404     =item * provide a suitable idle callback.
405    
406     In addition to hooking into C<ready>, this module will also provide a
407     C<$Coro::idle> handler that runs the event loop. It is best not to take
408     advantage of this too often, as this is rather inefficient, but it should
409     work perfectly fine.
410    
411     =item * provide overrides for AnyEvent's condvars
412    
413     This module installs overrides for AnyEvent's condvars. That is, when
414     the module is loaded it will provide its own condition variables. This
415     makes them coroutine-safe, i.e. you can safely block on them from within a
416     coroutine.
417    
418     =item * lead to data corruption or worse
419    
420     As C<unblock_sub> cannot be used by this module (as it is the module
421     that implements it, basically), you must not call into the event
422     loop recursively from any coroutine. This is not usually a difficult
423     restriction to live with, just use condvars, C<unblock_sub> or other means
424     of inter-coroutine-communications.
425    
426 root 1.77 If you use a module that supports AnyEvent (or uses the same event
427     loop as AnyEvent, making it implicitly compatible), and it offers
428     callbacks of any kind, then you must not block in them, either (or use
429     e.g. C<unblock_sub>), see the description of C<unblock_sub> in the
430     L<Coro> module.
431 root 1.40
432     This also means that you should load the module as early as possible,
433     as only condvars created after this module has been loaded will work
434     correctly.
435    
436     =back
437    
438 root 1.3 =head1 SEE ALSO
439    
440     L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
441     L<Coro::Event> for more efficient and more correct solutions (they will be
442     used automatically if applicable).
443    
444 root 1.96 =head1 AUTHOR/SUPPORT/CONTACT
445 root 1.1
446 root 1.96 Marc A. Lehmann <schmorp@schmorp.de>
447     http://software.schmorp.de/pkg/Coro.html
448 root 1.1
449     =cut
450