/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.48
Committed: Wed Jul 22 03:02:07 2009 UTC by root
Branch: MAIN
CVS Tags: rel-5_161
Changes since 1.47: +2 -2 lines
Log Message:
5.161

1 =head1 NAME
2
3 Coro::AnyEvent - integrate threads into AnyEvent
4
5 =head1 SYNOPSIS
6
7 use Coro;
8 use Coro::AnyEvent;
9
10 # use coro within an AnyEvent environment
11
12 =head1 DESCRIPTION
13
14 When one naively starts to use threads in Perl, one will quickly run
15 into the problem that threads that block on a syscall (sleeping,
16 reading from a socket etc.) will block all threads.
17
18 If one then uses an event loop, the problem is that the event loop has
19 no knowledge of threads and will not run them before it polls for new
20 events, again blocking the whole process.
21
22 This module integrates threads into any event loop supported by
23 AnyEvent, combining event-based programming with coroutine-based
24 programming in a natural way.
25
26 All you have to do is C<use Coro::AnyEvent>, run the event loop of your
27 choice in some thread and then you can run threads freely.
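
As a minimal sketch of what this can look like (the thread bodies, names
and delays are purely illustrative and not part of this module), the main
program below blocks on an AnyEvent condvar while two threads sleep
independently via C<Coro::AnyEvent::sleep>:

   use strict;
   use warnings;

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent;

   # condvar used to wait until both (hypothetical) threads are done
   my $done = AnyEvent->condvar;

   $done->begin;
   async {
      for my $i (1 .. 3) {
         print "ticker: $i\n";
         Coro::AnyEvent::sleep (0.1); # blocks only this thread
      }
      $done->end;
   };

   $done->begin;
   async {
      Coro::AnyEvent::sleep (0.25);
      print "worker: finished\n";
      $done->end;
   };

   # the main thread blocks here, other threads keep running
   $done->recv;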
28
29 =head1 USAGE
30
31 This module autodetects the event loop used (by relying on L<AnyEvent>)
32 and will either automatically defer to the high-performance L<Coro::EV> or
33 L<Coro::Event> modules, or will use a generic integration into any event
34 loop supported by L<AnyEvent>.
35
36 Note that if you need to wait for a single event, the rouse functions will
37 come in handy (see the Coro manpage for details):
38
39 # wait for single SIGINT
40 {
41 my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
42 Coro::rouse_wait;
43 }
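
The same pattern can be extended to wait for whichever of several events
happens first. The following is only a sketch: the choice of signal, the
five second timeout and the C<"signal">/C<"timeout"> labels are
assumptions made for illustration.

   use strict;
   use warnings;

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent;

   # one rouse callback shared by both watchers
   my $cb = Coro::rouse_cb;

   my $sig_w = AnyEvent->signal (signal => "TERM", cb => sub { $cb->("signal")  });
   my $tmr_w = AnyEvent->timer  (after  => 5,      cb => sub { $cb->("timeout") });

   # rouse_wait returns the arguments passed to the rouse callback
   my ($why) = Coro::rouse_wait;
   print "woke up because of: $why\n";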
44
45 =head1 FUNCTIONS
46
47 Coro::AnyEvent offers a few functions that might be useful for
48 "background" threads:
49
50 =over 4
51
52 =cut
53
54 package Coro::AnyEvent;
55
56 no warnings;
57 use strict;
58
59 use Coro;
60 use AnyEvent ();
61
62 our $VERSION = 5.161;
63
64 #############################################################################
65 # idle handler
66
67 our $IDLE;
68
69 #############################################################################
70 # 0-timeout idle emulation watcher
71
72 our $ACTIVITY;
73
74 sub _activity {
75 $ACTIVITY ||= AnyEvent->timer (after => 0, cb => \&_schedule);
76 }
77
78 Coro::_set_readyhook (\&AnyEvent::detect);
79
80 AnyEvent::post_detect {
81 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
82
83 Coro::_set_readyhook undef;
84
85 my $model = $AnyEvent::MODEL;
86
87 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
88 # provide faster versions of some functions
89
90 eval '
91 *sleep = \&Coro::EV::timer_once;
92 *poll = \&Coro::EV::_poll;
93 *idle = sub {
94 my $w = EV::idle Coro::rouse_cb;
95 Coro::rouse_wait;
96 };
97 *idle_upto = sub {
98 my $cb = Coro::rouse_cb;
99 my $t = EV::timer $_[0], 0, $cb;
100 my $w = EV::idle $cb;
101 Coro::rouse_wait;
102 };
103 *readable = sub {
104 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
105 };
106 *writable = sub {
107 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
108 };
109 ';
110 die if $@;
111
112 } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
113 # let Coro::Event do its thing
114 } else {
115 # do the inefficient thing ourselves
116 Coro::_set_readyhook \&_activity;
117
118 $IDLE = new Coro sub {
119 my $one_event = AnyEvent->can ("one_event");
120
121 while () {
122 $one_event->();
123 Coro::schedule;
124 }
125 };
126 $IDLE->{desc} = "[AnyEvent idle process]";
127
128 $Coro::idle = $IDLE;
129
130 # call the readyhook, in case coroutines were already readied
131 _activity;
132 }
133 };
134
135 =item Coro::AnyEvent::poll
136
137 This call instructs the event loop to poll for new events once,
138 without blocking, and blocks the current thread until that poll has
139 taken place.
140
141 Note that this call will not actually execute the poll, just block until
142 new events have been polled, so other threads will have a chance to run.
143
144 This is useful when you have a thread that does some computations, but you
145 still want to poll for new events from time to time. Simply call C<poll>
146 from time to time:
147
148 my $long_calc = async {
149 for (1..10000) {
150 Coro::AnyEvent::poll;
151 # do some stuff, make sure it takes at least 0.001s or so
152 }
153 };
154
155 You should also consider C<idle> or C<idle_upto> in such cases.
156
157 =item Coro::AnyEvent::sleep $seconds
158
159 This blocks the current thread for at least the given number of seconds.
160
161 =item Coro::AnyEvent::idle
162
163 This call is similar to C<poll> in that it will also poll for
164 events. Unlike C<poll>, it will only resume the thread once there are no
165 events to handle anymore, i.e. when the process is otherwise idle.
166
167 =item Coro::AnyEvent::idle_upto $seconds
168
169 Like C<idle>, but with a maximum waiting time.
170
171 If your process is busy handling events, calling C<idle> can mean that
172 your thread will never be resumed. To avoid this, you can use C<idle_upto>
173 and specify a timeout, after which your thread will be resumed even if the
174 process is completely busy.
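
A rough sketch of a low-priority background thread built on
C<idle_upto> follows. The C<@queue> of work items, the thread name and
the 0.5 second limit are hypothetical and only serve as an example.

   use strict;
   use warnings;

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent;

   my @queue = 1 .. 5; # hypothetical low-priority work items

   my $janitor = async {
      while (defined (my $job = shift @queue)) {
         # wait until the process is idle, but never longer than 0.5s
         Coro::AnyEvent::idle_upto (0.5);
         print "background job $job\n";
      }
   };

   # block the main thread until the background thread has finished
   $janitor->join;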
175
176 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
177
178 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
179
180 Blocks the current thread until the given file handle (or file descriptor)
181 becomes readable (or writable), or the given timeout has elapsed,
182 whichever happens first. Omitting the timeout means waiting indefinitely.
183
184 Returns true when the file handle became ready, false when a timeout
185 occurred.
186
187 Note that these functions are quite inefficient as compared to using a
188 single watcher (they recreate watchers on every invocation) or compared to
189 using Coro::Handle.
190
191 Note also that they only work for sources that have reasonable
192 non-blocking behaviour (e.g. not files).
193
194 Example: wait until STDIN becomes readable, then quit the program.
195
196 use Coro::AnyEvent;
197 print "press enter to quit...\n";
198 Coro::AnyEvent::readable *STDIN;
199 exit 0;
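
Example: the same idea with a timeout, using the return value to tell
the two outcomes apart. This is a sketch only, the five second limit is
an arbitrary choice.

   use strict;
   use warnings;

   use Coro::AnyEvent;

   print "type something within 5 seconds...\n";

   if (Coro::AnyEvent::readable (\*STDIN, 5)) {
      # true: the handle became readable before the timeout
      my $line = <STDIN>;
      print "got: $line";
   } else {
      # false: the timeout elapsed first
      print "timed out\n";
   }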
200
201 =cut
202
203 sub poll() {
204 my $w = AnyEvent->timer (after => 0, cb => Coro::rouse_cb);
205 Coro::rouse_wait;
206 }
207
208 sub sleep($) {
209 my $w = AnyEvent->timer (after => $_[0], cb => Coro::rouse_cb);
210 Coro::rouse_wait;
211 }
212
213 sub idle() {
214 my $w = AnyEvent->idle (cb => Coro::rouse_cb);
215 Coro::rouse_wait;
216 }
217
218 sub idle_upto($) {
219 my $cb = Coro::rouse_cb;
220 my $t = AnyEvent->timer (after => shift, cb => $cb);
221 my $w = AnyEvent->idle (cb => $cb);
222 Coro::rouse_wait;
223 }
224
225 sub readable($;$) {
226 my $cb = Coro::rouse_cb;
227 my $w = AnyEvent->io (fh => $_[0], poll => "r", cb => sub { $cb->(1) });
228 my $t = defined $_[1] && AnyEvent->timer (after => $_[1], cb => sub { $cb->(0) });
229 Coro::rouse_wait
230 }
231
232 sub writable($;$) {
233 my $cb = Coro::rouse_cb;
234 my $w = AnyEvent->io (fh => $_[0], poll => "w", cb => sub { $cb->(1) });
235 my $t = defined $_[1] && AnyEvent->timer (after => $_[1], cb => sub { $cb->(0) });
236 Coro::rouse_wait
237 }
238
239 #############################################################################
240 # override condvars
241
242 package Coro::AnyEvent::CondVar;
243
244 sub _send {
245 (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
246 }
247
248 sub _wait {
249 while (!$_[0]{_ae_sent}) {
250 local $_[0]{_ae_coro} = $Coro::current;
251 Coro::schedule;
252 }
253 }
254
255 1;
256
257 =back
258
259 =head1 IMPLEMENTATION DETAILS
260
261 Unfortunately, few event loops (basically only L<EV> and L<Event>)
262 support the kind of integration required for smooth operations well, and
263 consequently, AnyEvent cannot completely offer the functionality required
264 by this module, so we need to improvise.
265
266 Here is what this module does when it has to work with other event loops:
267
268 =over 4
269
270 =item * run ready threads before blocking the process
271
272 Each time a thread is put into the ready queue (and there are no other
273 threads in the ready queue), a timer with an C<after> value of C<0> is
274 registered with AnyEvent.
275
276 This creates something similar to an I<idle> watcher, i.e. a watcher
277 that keeps the event loop from blocking but still polls for new
278 events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
279 don't support a timeout of C<0> and will always block for a bit).
280
281 The callback for that timer will C<cede> to other threads of the same or
282 higher priority for as long as such threads exist. This has the effect of
283 running all threads that have work to do until all threads block to wait
284 for external events.
285
286 If no threads of equal or higher priority are ready, it will cede to any
287 thread, but only once. This has the effect of running lower-priority
288 threads as well, but it will not keep higher priority threads from
289 receiving new events.
290
291 The priority used is simply the priority of the thread that runs the event
292 loop, usually the main program, which usually has a priority of C<0>. Note
293 that Coro::AnyEvent does I<not> run an event loop for you, so unless the
294 main program runs one, there will simply be no event loop to C<cede> to
295 (event handling will still work, somewhat inefficiently, but any thread
296 will have a higher priority than event handling in that case).
297
298 =item * provide a suitable idle callback.
299
300 In addition to hooking into C<ready>, this module will also provide a
301 C<$Coro::idle> handler that runs the event loop. It is best not to take
302 advantage of this too often, as this is rather inefficient, but it should
303 work perfectly fine.
304
305 =item * provide overrides for AnyEvent's condvars
306
307 This module installs overrides for AnyEvent's condvars. That is, when
308 the module is loaded it will provide its own condition variables. This
309 makes them coroutine-safe, i.e. you can safely block on them from within a
310 coroutine.
311
312 =item * lead to data corruption or worse
313
314 As C<unblock_sub> cannot be used by this module (as it is the module
315 that implements it, basically), you must not call into the event
316 loop recursively from any coroutine. This is not usually a difficult
317 restriction to live with, just use condvars, C<unblock_sub> or other means
318 of inter-coroutine-communications.
319
320 If you use a module that supports AnyEvent (or uses the same event loop
321 as AnyEvent, making them compatible), and it offers callbacks of any kind,
322 then you must not block in them, either (or use e.g. C<unblock_sub>), see
323 the description of C<unblock_sub> in the L<Coro> module.
324
325 This also means that you should load the module as early as possible,
326 as only condvars created after this module has been loaded will work
327 correctly.
328
329 =back
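
To illustrate the condvar override described in the list above, here is
a small sketch in which a thread blocks on a condvar that is later
fulfilled by a timer. The variable names, the 0.2 second delay and the
value sent are assumptions for the sake of the example.

   use strict;
   use warnings;

   use Coro;
   use AnyEvent;
   use Coro::AnyEvent; # load early, before any condvars are created

   my $result = AnyEvent->condvar;

   # event-based producer: delivers a value a bit later
   my $timer = AnyEvent->timer (after => 0.2, cb => sub { $result->send (42) });

   my $consumer = async {
      # blocks only this thread, not the whole process
      my $value = $result->recv;
      print "consumer got $value\n";
   };

   $consumer->join;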
330
331 =head1 SEE ALSO
332
333 L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
334 L<Coro::Event> for more efficient and more correct solutions (they will be
335 used automatically if applicable).
336
337 =head1 AUTHOR
338
339 Marc Lehmann <schmorp@schmorp.de>
340 http://home.schmorp.de/
341
342 =cut
343