ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.46
Committed: Mon Jul 20 23:46:28 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-5_16
Changes since 1.45: +1 -1 lines
Log Message:
5.16

File Contents

# Content
1 =head1 NAME
2
3 Coro::AnyEvent - integrate threads into AnyEvent
4
5 =head1 SYNOPSIS
6
7 use Coro;
8 use Coro::AnyEvent;
9
10 # use coro within an AnyEvent environment
11
12 =head1 DESCRIPTION
13
14 When one naively starts to use threads in Perl, one will quickly run
15 into the problem that threads that block on a syscall (sleeping,
16 reading from a socket etc.) will block all threads.
17
18 If one then uses an event loop, the problem is that the event loop has
19 no knowledge of threads and will not run them before it polls for new
20 events, again blocking the whole process.
21
22 This module integrates threads into any event loop supported by
23 AnyEvent, combining event-based programming with coroutine-based
24 programming in a natural way.
25
26 All you have to do is C<use Coro::AnyEvent>, run the event loop of your
27 choice in some thread and then you can run threads freely.
28
29 =head1 USAGE
30
31 This module autodetects the event loop used (by relying on L<AnyEvent>)
32 and will either automatically defer to the high-performance L<Coro::EV> or
33 L<Coro::Event> modules, or will use a generic integration into any event
34 loop supported by L<AnyEvent>.
35
36 Note that if you need to wait for a single event, the rouse functions will
37 come in handy (see the Coro manpage for details):
38
39 # wait for single SIGINT
40 {
41 my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
42 Coro::rouse_wait;
43 }
44
45 =head1 FUNCTIONS
46
47 Coro::AnyEvent offers a few functions that might be useful for
48 "background" threads:
49
50 =over 4
51
52 =cut
53
54 package Coro::AnyEvent;
55
56 no warnings;
57 use strict;
58
59 use Coro;
60 use AnyEvent ();
61
62 our $VERSION = 5.16;
63
64 #############################################################################
65 # idle handler
66
67 our $IDLE;
68
69 #############################################################################
70 # 0-timeout idle emulation watcher
71
72 our $ACTIVITY;
73
74 sub _activity {
75 $ACTIVITY ||= AnyEvent->timer (after => 0, cb => \&_schedule);
76 }
77
78 Coro::_set_readyhook (\&AnyEvent::detect);
79
80 AnyEvent::post_detect {
81 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
82
83 Coro::_set_readyhook undef;
84
85 my $model = $AnyEvent::MODEL;
86
87 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
88 # provider faster versions of some functions
89
90 eval '
91 *sleep = \&Coro::EV::timer_once;
92 *poll = \&Coro::EV::timer_once;
93 *idle = sub {
94 my $w = EV::idle Coro::rouse_cb;
95 Coro::rouse_wait;
96 };
97 *idle_upto = sub {
98 my $cb = Coro::rouse_cb;
99 my $t = EV::timer $_[0], 0, $cb;
100 my $w = EV::idle $cb;
101 Coro::rouse_wait;
102 };
103 *readable = sub {
104 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
105 };
106 *writable = sub {
107 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
108 };
109 ';
110 die if $@;
111
112 } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
113 # let Coro::Event do its thing
114 } else {
115 # do the inefficient thing ourselves
116 Coro::_set_readyhook \&_activity;
117
118 $IDLE = new Coro sub {
119 my $one_event = AnyEvent->can ("one_event");
120
121 while () {
122 $one_event->();
123 Coro::schedule;
124 }
125 };
126 $IDLE->{desc} = "[AnyEvent idle process]";
127
128 $Coro::idle = $IDLE;
129 }
130 };
131
132 =item Coro::AnyEvent::poll
133
134 This call will block the current thread until the event loop has polled
135 for new events and instructs the event loop to poll for new events once,
136 without blocking.
137
138 Note that this call will not actually execute the poll, just block until
139 new events have been polled, so other threads will have a chance to run.
140
141 This is useful when you have a thread that does some computations, but you
142 still want to poll for new events from time to time. Simply call C<poll>
143 from time to time:
144
145 my $long_calc = async {
146 for (1..10000) {
147 Coro::AnyEvent::poll:
148 # do some stuff, make sure it takes at least 0.001s or so
149 }
150 }
151
152 Although you should also consider C<idle> or C<idle_upto> in such cases.
153
154 =item Coro::AnyEvent::sleep $seconds
155
156 This blocks the current thread for at least the given number of seconds.
157
158 =item Coro::AnyEvent::idle
159
160 This call is similar to C<poll> in that it will also poll for
161 events. Unlike C<poll>, it will only resume the thread once there are no
162 events to handle anymore, i.e. when the process is otherwise idle.
163
164 =item Coro::AnyEvent::idle_upto $seconds
165
166 Like C<idle>, but with a maximum waiting time.
167
168 If your process is busy handling events, calling C<idle> can mean that
169 your thread will never be resumed. To avoid this, you can use C<idle_upto>
170 and specify a timeout, after which your thread will be resumed even if the
171 process is completely busy.
172
173 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
174
175 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
176
177 Blocks the current thread until the given file handle (or file descriptor)
178 becomes readable (or writable), or the given timeout has elapsed,
179 whichever happens first. No timeout counts as infinite timeout.
180
181 Returns true when the file handle became ready, false when a timeout
182 occured.
183
184 Note that these functions are quite inefficient as compared to using a
185 single watcher (they recreate watchers on every invocation) or compared to
186 using Coro::Handle.
187
188 Note also that they only work for sources that have reasonable
189 non-blocking behaviour (e.g. not files).
190
191 Example: wait until STDIN becomes readable, then quit the program.
192
193 use Coro::AnyEvent;
194 print "press enter to quit...\n";
195 Coro::AnyEvent::readable *STDIN;
196 exit 0;
197
198 =cut
199
200 sub poll() {
201 my $w = AnyEvent->timer (after => 0, cb => Coro::rouse_cb);
202 Coro::rouse_wait;
203 }
204
205 sub sleep($) {
206 my $w = AnyEvent->timer (after => $_[0], cb => Coro::rouse_cb);
207 Coro::rouse_wait;
208 }
209
210 sub idle() {
211 my $w = AnyEvent->idle (cb => Coro::rouse_cb);
212 Coro::rouse_wait;
213 }
214
215 sub idle_upto($) {
216 my $cb = Coro::rouse_cb;
217 my $t = AnyEvent->timer (after => shift, cb => $cb);
218 my $w = AnyEvent->idle (cb => $cb);
219 Coro::rouse_wait;
220 }
221
222 sub readable($;$) {
223 my $cb = Coro::rouse_cb;
224 my $w = AnyEvent->io (fh => $_[0], poll => "r", cb => sub { $cb->(1) });
225 my $t = defined $_[1] && AnyEvent->timer (after => $_[1], cb => sub { $cb->(0) });
226 Coro::rouse_wait
227 }
228
229 sub writable($;$) {
230 my $cb = Coro::rouse_cb;
231 my $w = AnyEvent->io (fh => $_[0], poll => "w", cb => sub { $cb->(1) });
232 my $t = defined $_[1] && AnyEvent->timer (after => $_[1], cb => sub { $cb->(0) });
233 Coro::rouse_wait
234 }
235
236 #############################################################################
237 # override condvars
238
239 package Coro::AnyEvent::CondVar;
240
241 sub _send {
242 (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
243 }
244
245 sub _wait {
246 while (!$_[0]{_ae_sent}) {
247 local $_[0]{_ae_coro} = $Coro::current;
248 Coro::schedule;
249 }
250 }
251
252 1;
253
254 =back
255
256 =head1 IMPLEMENTATION DETAILS
257
258 Unfortunately, few event loops (basically only L<EV> and L<Event>)
259 support the kind of integration required for smooth operations well, and
260 consequently, AnyEvent cannot completely offer the functionality required
261 by this module, so we need to improvise.
262
263 Here is what this module does when it has to work with other event loops:
264
265 =over 4
266
267 =item * run ready threads before blocking the process
268
269 Each time a thread is put into the ready queue (and there are no other
270 threads in the ready queue), a timer with an C<after> value of C<0> is
271 registered with AnyEvent.
272
273 This creates something similar to an I<idle> watcher, i.e. a watcher
274 that keeps the event loop from blocking but still polls for new
275 events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
276 don't support a timeout of C<0> and will always block for a bit).
277
278 The callback for that timer will C<cede> to other threads of the same or
279 higher priority for as long as such threads exists. This has the effect of
280 running all threads that have work to do until all threads block to wait
281 for external events.
282
283 If no threads of equal or higher priority are ready, it will cede to any
284 thread, but only once. This has the effect of running lower-priority
285 threads as well, but it will not keep higher priority threads from
286 receiving new events.
287
288 The priority used is simply the priority of the thread that runs the event
289 loop, usually the main program, which usually has a priority of C<0>. Note
290 that Coro::AnyEvent does I<not> run an event loop for you, so unless the
291 main program runs one, there will simply be no event loop to C<cede> to
292 (event handling will still work, somewhat inefficiently, but any thread
293 will have a higher priority than event handling in that case).
294
295 =item * provide a suitable idle callback.
296
297 In addition to hooking into C<ready>, this module will also provide a
298 C<$Coro::idle> handler that runs the event loop. It is best not to take
299 advantage of this too often, as this is rather inefficient, but it should
300 work perfectly fine.
301
302 =item * provide overrides for AnyEvent's condvars
303
304 This module installs overrides for AnyEvent's condvars. That is, when
305 the module is loaded it will provide its own condition variables. This
306 makes them coroutine-safe, i.e. you can safely block on them from within a
307 coroutine.
308
309 =item * lead to data corruption or worse
310
311 As C<unblock_sub> cannot be used by this module (as it is the module
312 that implements it, basically), you must not call into the event
313 loop recursively from any coroutine. This is not usually a difficult
314 restriction to live with, just use condvars, C<unblock_sub> or other means
315 of inter-coroutine-communications.
316
317 If you use a module that supports AnyEvent (or uses the same event loop
318 as AnyEvent, making the compatible), and it offers callbacks of any kind,
319 then you must not block in them, either (or use e.g. C<unblock_sub>), see
320 the description of C<unblock_sub> in the L<Coro> module.
321
322 This also means that you should load the module as early as possible,
323 as only condvars created after this module has been loaded will work
324 correctly.
325
326 =back
327
328 =head1 SEE ALSO
329
330 L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
331 L<Coro::Event> for more efficient and more correct solutions (they will be
332 used automatically if applicable).
333
334 =head1 AUTHOR
335
336 Marc Lehmann <schmorp@schmorp.de>
337 http://home.schmorp.de/
338
339 =cut
340