/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.44
Committed: Thu Jul 16 02:19:17 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.43: +7 -5 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

Coro::AnyEvent - integrate threads into AnyEvent

=head1 SYNOPSIS

   use Coro;
   use Coro::AnyEvent;

   # use coro within an AnyEvent environment

=head1 DESCRIPTION

When one naively starts to use threads in Perl, one will quickly run
into the problem that threads that block on a syscall (sleeping,
reading from a socket etc.) will block all threads.

If one then uses an event loop, the problem is that the event loop has
no knowledge of threads and will not run them before it polls for new
events, again blocking the whole process.

This module integrates threads into any event loop supported by
AnyEvent, combining event-based programming with coroutine-based
programming in a natural way.

All you have to do is C<use Coro::AnyEvent>, run the event loop of your
choice in some thread and then you can run threads freely.
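
A minimal sketch of this setup, assuming the main program is the thread
that runs the event loop (here simply by blocking on a condvar). The thread
names, the sleep interval and the C<@log> array are illustrative and not
part of this module:

```perl
use strict;
use warnings;

use Coro;
use AnyEvent;
use Coro::AnyEvent;

my @log;
my $done = AnyEvent->condvar;

for my $name ("a", "b") {
   $done->begin;

   async {
      for my $i (1 .. 3) {
         # blocks only this thread, the other one keeps running
         Coro::AnyEvent::sleep 0.1;
         push @log, "$name$i";
      }

      $done->end;
   };
}

# the main program blocks here, which lets the event loop and both threads run
$done->recv;

print "finished after ", scalar @log, " steps\n";
```

Both threads sleep concurrently, so the whole run should take roughly as
long as one thread alone.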

=head1 USAGE

This module autodetects the event loop used (by relying on L<AnyEvent>)
and will either automatically defer to the high-performance L<Coro::EV> or
L<Coro::Event> modules, or will use a generic integration into any event
loop supported by L<AnyEvent>.

Note that if you need to wait for a single event, the rouse functions will
come in handy (see the Coro manpage for details):

   # wait for single SIGINT
   {
      my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
      Coro::rouse_wait;
   }

=head1 FUNCTIONS

Coro::AnyEvent offers a few functions that might be useful for
"background" threads:

=over 4

=cut

package Coro::AnyEvent;

no warnings;
use strict;

use Coro;
use AnyEvent ();

our $VERSION = 5.151;

#############################################################################
# idle handler

our $IDLE;

#############################################################################
# 0-timeout idle emulation watcher

our $ACTIVITY;

sub _activity {
   $ACTIVITY ||= AnyEvent->timer (after => 0, cb => \&_schedule);
}

Coro::_set_readyhook (\&AnyEvent::detect);

AnyEvent::post_detect {
   unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";

   Coro::_set_readyhook undef;

   my $model = $AnyEvent::MODEL;

   if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
      # provide faster versions of some functions

      eval '
         *sleep = \&Coro::EV::timer_once;
         *poll = \&Coro::EV::timer_once;
         *idle = sub {
            my $w = EV::idle Coro::rouse_cb;
            Coro::rouse_wait;
         };
         *idle_upto = sub {
            my $cb = Coro::rouse_cb;
            my $t = EV::timer $_[0], 0, $cb;
            my $w = EV::idle $cb;
            Coro::rouse_wait;
         };
         *readable = sub {
            EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
         };
         *writable = sub {
            EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
         };
      ';
      die if $@;

   } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
      # let Coro::Event do its thing
   } else {
      # do the inefficient thing ourselves
      Coro::_set_readyhook \&_activity;

      $IDLE = new Coro sub {
         my $one_event = AnyEvent->can ("one_event");

         while () {
            $one_event->();
            Coro::schedule;
         }
      };
      $IDLE->{desc} = "[AnyEvent idle process]";

      $Coro::idle = $IDLE;
   }

   _activity;
};

=item Coro::AnyEvent::poll

This call blocks the current thread until the event loop has polled for
new events, and instructs the event loop to poll for new events once,
without blocking.

Note that this call does not execute the poll itself, it merely blocks
until new events have been polled, so other threads will have a chance to
run.

This is useful when you have a thread that does some computations, but you
still want to poll for new events from time to time. Simply call C<poll>
from time to time:

   my $long_calc = async {
      for (1..10000) {
         Coro::AnyEvent::poll;
         # do some stuff, make sure it takes at least 0.001s or so
      }
   };

You should also consider C<idle> or C<idle_upto> in such cases.

=item Coro::AnyEvent::sleep $seconds

This blocks the current thread for at least the given number of seconds.

=item Coro::AnyEvent::idle

This call is similar to C<poll> in that it will also poll for
events. Unlike C<poll>, it will only resume the thread once there are no
events to handle anymore, i.e. when the process is otherwise idle.

=item Coro::AnyEvent::idle_upto $seconds

Like C<idle>, but with a maximum waiting time.

If your process is busy handling events, calling C<idle> can mean that
your thread will never be resumed. To avoid this, you can use C<idle_upto>
and specify a timeout, after which your thread will be resumed even if the
process is completely busy.
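
As a rough sketch of how C<idle_upto> might be used for low-priority
background work: the housekeeping job, the C<@cleaned> array and the
0.5 second limit are hypothetical and chosen only for illustration.

```perl
use strict;
use warnings;

use Coro;
use Coro::AnyEvent;

my @cleaned;

# hypothetical low-priority job: handle one item whenever the process is idle
my $housekeeper = async {
   for my $item (1 .. 3) {
      # resume once the process is idle, or after 0.5 seconds at the latest
      Coro::AnyEvent::idle_upto 0.5;
      push @cleaned, $item;
   }
};

# block the main program until the housekeeping thread has finished
$housekeeper->join;

print "cleaned items: @cleaned\n";
```

Replacing C<idle_upto 0.5> with C<idle> would give the same result in an
otherwise idle process, but offers no upper bound on the wait in a busy one.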

=item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]

=item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]

Blocks the current thread until the given file handle (or file descriptor)
becomes readable (or writable), or the given timeout has elapsed,
whichever happens first. If no timeout is given, the call waits
indefinitely.

Returns true when the file handle became ready, false when a timeout
occurred.

Note that these functions are quite inefficient compared to using a
single watcher (they recreate watchers on every invocation) or compared to
using Coro::Handle.

Note also that they only work for sources that have reasonable
non-blocking behaviour (e.g. not files).

Example: wait until STDIN becomes readable, then quit the program.

   use Coro::AnyEvent;
   print "press enter to quit...\n";
   Coro::AnyEvent::readable *STDIN;
   exit 0;
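
The timeout and the return value can be illustrated with a pipe. This is
only a sketch: the pipe, the variable names and the 0.2 second timeout are
assumptions made for the example.

```perl
use strict;
use warnings;

use Coro;
use Coro::AnyEvent;

pipe (my $rd, my $wr)
   or die "pipe failed: $!";

# nothing has been written yet, so this should time out and return false
my $first = Coro::AnyEvent::readable $rd, 0.2;

syswrite $wr, "x";

# now one byte is pending, so this should return true before the timeout
my $second = Coro::AnyEvent::readable $rd, 0.2;

print "first wait:  ", ($first  ? "readable" : "timed out"), "\n";
print "second wait: ", ($second ? "readable" : "timed out"), "\n";
```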

=cut

sub poll() {
   my $w = AnyEvent->timer (after => 0, cb => Coro::rouse_cb);
   Coro::rouse_wait;
}

sub sleep($) {
   my $w = AnyEvent->timer (after => $_[0], cb => Coro::rouse_cb);
   Coro::rouse_wait;
}

sub idle() {
   my $w = AnyEvent->idle (cb => Coro::rouse_cb);
   Coro::rouse_wait;
}

sub idle_upto($) {
   my $cb = Coro::rouse_cb;
   my $t = AnyEvent->timer (after => shift, cb => $cb);
   my $w = AnyEvent->idle (cb => $cb);
   Coro::rouse_wait;
}

sub readable($;$) {
   my $cb = Coro::rouse_cb;
   my $w = AnyEvent->io (fh => $_[0], poll => "r", cb => sub { $cb->(1) });
   my $t = defined $_[1] && AnyEvent->timer (after => $_[1], cb => sub { $cb->(0) });
   Coro::rouse_wait
}

sub writable($;$) {
   my $cb = Coro::rouse_cb;
   my $w = AnyEvent->io (fh => $_[0], poll => "w", cb => sub { $cb->(1) });
   my $t = defined $_[1] && AnyEvent->timer (after => $_[1], cb => sub { $cb->(0) });
   Coro::rouse_wait
}

#############################################################################
# override condvars

package Coro::AnyEvent::CondVar;

sub _send {
   (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
}

sub _wait {
   while (!$_[0]{_ae_sent}) {
      local $_[0]{_ae_coro} = $Coro::current;
      Coro::schedule;
   }
}

1;

=back

=head1 IMPLEMENTATION DETAILS

Unfortunately, few event loops (basically only L<EV> and L<Event>)
support the kind of integration required for smooth operation well, and
consequently, AnyEvent cannot completely offer the functionality required
by this module, so we need to improvise.

Here is what this module does when it has to work with other event loops:

=over 4

=item * run ready threads before blocking the process

Each time a thread is put into the ready queue (and there are no other
threads in the ready queue), a timer with an C<after> value of C<0> is
registered with AnyEvent.

This creates something similar to an I<idle> watcher, i.e. a watcher
that keeps the event loop from blocking but still polls for new
events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
don't support a timeout of C<0> and will always block for a bit).

The callback for that timer will C<cede> to other threads of the same or
higher priority for as long as such threads exist. This has the effect of
running all threads that have work to do until all threads block to wait
for external events.

If no threads of equal or higher priority are ready, it will cede to any
thread, but only once. This has the effect of running lower-priority
threads as well, but it will not keep higher priority threads from
receiving new events.

The priority used is simply the priority of the thread that runs the event
loop, usually the main program, which usually has a priority of C<0>. Note
that Coro::AnyEvent does I<not> run an event loop for you, so unless the
main program runs one, there will simply be no event loop to C<cede> to
(event handling will still work, somewhat inefficiently, but any thread
will have a higher priority than event handling in that case).

=item * provide a suitable idle callback.

In addition to hooking into C<ready>, this module will also provide a
C<$Coro::idle> handler that runs the event loop. It is best not to take
advantage of this too often, as this is rather inefficient, but it should
work perfectly fine.

=item * provide overrides for AnyEvent's condvars

This module installs overrides for AnyEvent's condvars. That is, when
the module is loaded it will provide its own condition variables. This
makes them coroutine-safe, i.e. you can safely block on them from within a
coroutine.

=item * lead to data corruption or worse

As C<unblock_sub> cannot be used by this module (as it is the module
that implements it, basically), you must not call into the event
loop recursively from any coroutine. This is not usually a difficult
restriction to live with, just use condvars, C<unblock_sub> or other means
of inter-coroutine communication.

If you use a module that supports AnyEvent (or uses the same event loop
as AnyEvent, making them compatible), and it offers callbacks of any kind,
then you must not block in them, either (or use e.g. C<unblock_sub>), see
the description of C<unblock_sub> in the L<Coro> module.

This also means that you should load the module as early as possible,
as only condvars created after this module has been loaded will work
correctly.

=back
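
The condvar override described above can be sketched as follows. The
timer, the value C<42> and the variable names are hypothetical; the point
is only that C<recv> is called inside a thread while the value is
delivered from an ordinary event callback that does not block.

```perl
use strict;
use warnings;

use Coro;
use AnyEvent;
use Coro::AnyEvent; # loaded before any condvar is created

my $answer = AnyEvent->condvar;

# hypothetical event source: deliver a value from a plain callback after 0.1s
my $timer = AnyEvent->timer (after => 0.1, cb => sub { $answer->send (42) });

my $worker = async {
   # blocks only this thread until the callback above calls send
   my $value = $answer->recv;
   $value * 2
};

# wait for the thread to finish and fetch its return value
my ($result) = $worker->join;

print "result: $result\n";
```

Note that the callback only calls C<send> and returns immediately, in line
with the rule that event callbacks must not block.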

=head1 SEE ALSO

L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
L<Coro::Event> for more efficient and more correct solutions (they will be
used automatically if applicable).

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut