ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.54
Committed: Fri Dec 11 18:32:23 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.53: +57 -7 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::AnyEvent - integrate threads into AnyEvent
4
5 =head1 SYNOPSIS
6
7 use Coro;
8 use AnyEvent;
9 # using both Coro and AnyEvent will automatically load Coro::AnyEvent
10
11 # use Coro within an AnyEvent environment
12
13 =head1 DESCRIPTION
14
15 When one naively starts to use threads in Perl, one will quickly run
16 into the problem that threads that block on a syscall (sleeping,
17 reading from a socket etc.) will block all threads.
18
19 If one then uses an event loop, the problem is that the event loop has
20 no knowledge of threads and will not run them before it polls for new
21 events, again blocking the whole process.
22
23 This module integrates threads into any event loop supported by
24 AnyEvent, combining event-based programming with coroutine-based
25 programming in a natural way.
26
27 As of Coro 5.21 and newer, this module gets loaded automatically when
28 AnyEvent initialises itself and Coro is used in the same process, thus
29 there is no need to load it manually if you just want your threads to
30 coexist with AnyEvent.
31
32 If you want to use any functions from this module, you of course still
33 need to C<use Coro::AnyEvent>, just as with other perl modules.
34
35 =head1 USAGE
36
37 This module autodetects the event loop used (by relying on L<AnyEvent>)
38 and will either automatically defer to the high-performance L<Coro::EV> or
39 L<Coro::Event> modules, or will use a generic integration method into any
40 event loop supported by L<AnyEvent>.
41
42 Note that if you need to wait for a single event, the rouse functions will
43 come in handy (see the Coro manpage for details):
44
45 # wait for single SIGINT
46 {
47 my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
48 Coro::rouse_wait;
49 }
50
51 =head2 EVENT MODULES OTHER THEN ANYEVENT
52
53 Keep in mind that, as shipped, Coro and Coro::AnyEvent only work with
54 AnyEvent, and only when AnyEvent is actually used (i.e. initialised), so
55 this will not work:
56
57 # does not work: only AnyEvent is supported
58 use EV;
59 use Coro;
60
61 EV::loop;
62
63 And neither does this, unless you actually I<use> AnyEvent for something:
64
65 # does not work: AnyEvent must be initialised
66 use EV;
67 use AnyEvent;
68 use Coro;
69
70 EV::loop;
71
72 This does work, however, because you create a watcher (condvars work,
73 too), thus forcing AnyEvent to initialise itself:
74
75 # does work: AnyEvent is actually used
76 use EV;
77 use AnyEvent;
78 use Coro;
79
80 my $timer = AE::timer 1, 1, sub { };
81
82 EV::loop;
83
84 And if you want to use AnyEvent just to bridge between Coro and your event
85 model of choice, you can simply force it to initialise itself, like this:
86
87 # does work: AnyEvent is initialised manually
88 use POE;
89 use AnyEvent;
90 use Coro;
91
92 AnyEvent::detect;
93 POE::Kernel->run;
94
95 =head1 FUNCTIONS
96
97 Coro::AnyEvent offers a few functions that might be useful.
98
99 =over 4
100
101 =cut
102
103 package Coro::AnyEvent;
104
105 use common::sense;
106
107 use Coro;
108 use AnyEvent ();
109
110 our $VERSION = 5.21;
111
112 #############################################################################
113 # idle handler
114
115 our $IDLE;
116
117 #############################################################################
118 # 0-timeout idle emulation watcher
119
120 our $ACTIVITY;
121
122 sub _activity {
123 $ACTIVITY ||= AE::timer 0, 0, \&_schedule;
124 }
125
126 Coro::_set_readyhook (\&AnyEvent::detect);
127
128 AnyEvent::post_detect {
129 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
130
131 Coro::_set_readyhook undef;
132
133 my $model = $AnyEvent::MODEL;
134
135 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
136 # provider faster versions of some functions
137
138 eval '
139 *sleep = \&Coro::EV::timer_once;
140 *poll = \&Coro::EV::_poll;
141 *idle = sub() {
142 my $w = EV::idle Coro::rouse_cb;
143 Coro::rouse_wait;
144 };
145 *idle_upto = sub($) {
146 my $cb = Coro::rouse_cb;
147 my $t = EV::timer $_[0], 0, $cb;
148 my $w = EV::idle $cb;
149 Coro::rouse_wait;
150 };
151 *readable = sub($;$) {
152 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
153 };
154 *writable = sub($;$) {
155 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
156 };
157 ';
158 die if $@;
159
160 } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
161 # let Coro::Event do its thing
162 } else {
163 # do the inefficient thing ourselves
164 Coro::_set_readyhook \&_activity;
165
166 $IDLE = new Coro sub {
167 my $one_event = AnyEvent->can ("one_event");
168
169 while () {
170 $one_event->();
171 Coro::schedule;
172 }
173 };
174 $IDLE->{desc} = "[AnyEvent idle process]";
175
176 $Coro::idle = $IDLE;
177
178 # call the readyhook, in case coroutines were already readied
179 _activity;
180 }
181 };
182
183 =item Coro::AnyEvent::poll
184
185 This call will block the current thread until the event loop has polled
186 for new events and instructs the event loop to poll for new events once,
187 without blocking.
188
189 Note that this call will not actually execute the poll, just block until
190 new events have been polled, so other threads will have a chance to run.
191
192 This is useful when you have a thread that does some computations, but you
193 still want to poll for new events from time to time. Simply call C<poll>
194 from time to time:
195
196 my $long_calc = async {
197 for (1..10000) {
198 Coro::AnyEvent::poll:
199 # do some stuff, make sure it takes at least 0.001s or so
200 }
201 }
202
203 Although you should also consider C<idle> or C<idle_upto> in such cases.
204
205 =item Coro::AnyEvent::sleep $seconds
206
207 This blocks the current thread for at least the given number of seconds.
208
209 =item Coro::AnyEvent::idle
210
211 This call is similar to C<poll> in that it will also poll for
212 events. Unlike C<poll>, it will only resume the thread once there are no
213 events to handle anymore, i.e. when the process is otherwise idle.
214
215 =item Coro::AnyEvent::idle_upto $seconds
216
217 Like C<idle>, but with a maximum waiting time.
218
219 If your process is busy handling events, calling C<idle> can mean that
220 your thread will never be resumed. To avoid this, you can use C<idle_upto>
221 and specify a timeout, after which your thread will be resumed even if the
222 process is completely busy.
223
224 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
225
226 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
227
228 Blocks the current thread until the given file handle (or file descriptor)
229 becomes readable (or writable), or the given timeout has elapsed,
230 whichever happens first. No timeout counts as infinite timeout.
231
232 Returns true when the file handle became ready, false when a timeout
233 occured.
234
235 Note that these functions are quite inefficient as compared to using a
236 single watcher (they recreate watchers on every invocation) or compared to
237 using Coro::Handle.
238
239 Note also that they only work for sources that have reasonable
240 non-blocking behaviour (e.g. not files).
241
242 Example: wait until STDIN becomes readable, then quit the program.
243
244 use Coro::AnyEvent;
245 print "press enter to quit...\n";
246 Coro::AnyEvent::readable *STDIN;
247 exit 0;
248
249 =cut
250
251 sub poll() {
252 my $w = AE::timer 0, 0, Coro::rouse_cb;
253 Coro::rouse_wait;
254 }
255
256 sub sleep($) {
257 my $w = AE::timer $_[0], 0, Coro::rouse_cb;
258 Coro::rouse_wait;
259 }
260
261 sub idle() {
262 my $w = AE::idle Coro::rouse_cb;
263 Coro::rouse_wait;
264 }
265
266 sub idle_upto($) {
267 my $cb = Coro::rouse_cb;
268 my $t = AE::timer shift, 0, $cb;
269 my $w = AE::idle $cb;
270 Coro::rouse_wait;
271 }
272
273 sub readable($;$) {
274 my $cb = Coro::rouse_cb;
275 my $w = AE::io $_[0], 0, sub { $cb->(1) };
276 my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
277 Coro::rouse_wait
278 }
279
280 sub writable($;$) {
281 my $cb = Coro::rouse_cb;
282 my $w = AE::io $_[0], 1, sub { $cb->(1) };
283 my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
284 Coro::rouse_wait
285 }
286
287 #############################################################################
288 # override condvars
289
290 package Coro::AnyEvent::CondVar;
291
292 sub _send {
293 (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
294 }
295
296 sub _wait {
297 while (!$_[0]{_ae_sent}) {
298 local $_[0]{_ae_coro} = $Coro::current;
299 Coro::schedule;
300 }
301 }
302
303 1;
304
305 =back
306
307 =head1 IMPLEMENTATION DETAILS
308
309 Unfortunately, few event loops (basically only L<EV> and L<Event>)
310 support the kind of integration required for smooth operations well, and
311 consequently, AnyEvent cannot completely offer the functionality required
312 by this module, so we need to improvise.
313
314 Here is what this module does when it has to work with other event loops:
315
316 =over 4
317
318 =item * run ready threads before blocking the process
319
320 Each time a thread is put into the ready queue (and there are no other
321 threads in the ready queue), a timer with an C<after> value of C<0> is
322 registered with AnyEvent.
323
324 This creates something similar to an I<idle> watcher, i.e. a watcher
325 that keeps the event loop from blocking but still polls for new
326 events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
327 don't support a timeout of C<0> and will always block for a bit).
328
329 The callback for that timer will C<cede> to other threads of the same or
330 higher priority for as long as such threads exists. This has the effect of
331 running all threads that have work to do until all threads block to wait
332 for external events.
333
334 If no threads of equal or higher priority are ready, it will cede to any
335 thread, but only once. This has the effect of running lower-priority
336 threads as well, but it will not keep higher priority threads from
337 receiving new events.
338
339 The priority used is simply the priority of the thread that runs the event
340 loop, usually the main program, which usually has a priority of C<0>. Note
341 that Coro::AnyEvent does I<not> run an event loop for you, so unless the
342 main program runs one, there will simply be no event loop to C<cede> to
343 (event handling will still work, somewhat inefficiently, but any thread
344 will have a higher priority than event handling in that case).
345
346 =item * provide a suitable idle callback.
347
348 In addition to hooking into C<ready>, this module will also provide a
349 C<$Coro::idle> handler that runs the event loop. It is best not to take
350 advantage of this too often, as this is rather inefficient, but it should
351 work perfectly fine.
352
353 =item * provide overrides for AnyEvent's condvars
354
355 This module installs overrides for AnyEvent's condvars. That is, when
356 the module is loaded it will provide its own condition variables. This
357 makes them coroutine-safe, i.e. you can safely block on them from within a
358 coroutine.
359
360 =item * lead to data corruption or worse
361
362 As C<unblock_sub> cannot be used by this module (as it is the module
363 that implements it, basically), you must not call into the event
364 loop recursively from any coroutine. This is not usually a difficult
365 restriction to live with, just use condvars, C<unblock_sub> or other means
366 of inter-coroutine-communications.
367
368 If you use a module that supports AnyEvent (or uses the same event loop
369 as AnyEvent, making the compatible), and it offers callbacks of any kind,
370 then you must not block in them, either (or use e.g. C<unblock_sub>), see
371 the description of C<unblock_sub> in the L<Coro> module.
372
373 This also means that you should load the module as early as possible,
374 as only condvars created after this module has been loaded will work
375 correctly.
376
377 =back
378
379 =head1 SEE ALSO
380
381 L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
382 L<Coro::Event> for more efficient and more correct solutions (they will be
383 used automatically if applicable).
384
385 =head1 AUTHOR
386
387 Marc Lehmann <schmorp@schmorp.de>
388 http://home.schmorp.de/
389
390 =cut
391