ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/AnyEvent.pm
Revision: 1.52
Committed: Thu Oct 1 23:16:27 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.51: +5 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::AnyEvent - integrate threads into AnyEvent
4
5 =head1 SYNOPSIS
6
7 use Coro;
8 use Coro::AnyEvent;
9
10 # use coro within an AnyEvent environment
11
12 =head1 DESCRIPTION
13
14 When one naively starts to use threads in Perl, one will quickly run
15 into the problem that threads that block on a syscall (sleeping,
16 reading from a socket etc.) will block all threads.
17
18 If one then uses an event loop, the problem is that the event loop has
19 no knowledge of threads and will not run them before it polls for new
20 events, again blocking the whole process.
21
22 This module integrates threads into any event loop supported by
23 AnyEvent, combining event-based programming with coroutine-based
24 programming in a natural way.
25
26 All you have to do is C<use Coro::AnyEvent>, run the event loop of your
27 choice in some thread and then you can run threads freely.
28
29 =head1 USAGE
30
31 This module autodetects the event loop used (by relying on L<AnyEvent>)
32 and will either automatically defer to the high-performance L<Coro::EV> or
33 L<Coro::Event> modules, or will use a generic integration into any event
34 loop supported by L<AnyEvent>.
35
36 Note that if you need to wait for a single event, the rouse functions will
37 come in handy (see the Coro manpage for details):
38
39 # wait for single SIGINT
40 {
41 my $int_w = AnyEvent->signal (signal => "INT", cb => Coro::rouse_cb);
42 Coro::rouse_wait;
43 }
44
45 =head1 FUNCTIONS
46
47 Coro::AnyEvent offers a few functions that might be useful.
48
49 =over 4
50
51 =cut
52
53 package Coro::AnyEvent;
54
55 use common::sense;
56
57 use Coro;
58 use AnyEvent ();
59
60 our $VERSION = 5.17;
61
62 #############################################################################
63 # idle handler
64
65 our $IDLE;
66
67 #############################################################################
68 # 0-timeout idle emulation watcher
69
70 our $ACTIVITY;
71
72 sub _activity {
73 $ACTIVITY ||= AE::timer 0, 0, \&_schedule;
74 }
75
76 Coro::_set_readyhook (\&AnyEvent::detect);
77
78 AnyEvent::post_detect {
79 unshift @AnyEvent::CondVar::ISA, "Coro::AnyEvent::CondVar";
80
81 Coro::_set_readyhook undef;
82
83 my $model = $AnyEvent::MODEL;
84
85 if ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
86 # provider faster versions of some functions
87
88 eval '
89 *sleep = \&Coro::EV::timer_once;
90 *poll = \&Coro::EV::_poll;
91 *idle = sub() {
92 my $w = EV::idle Coro::rouse_cb;
93 Coro::rouse_wait;
94 };
95 *idle_upto = sub($) {
96 my $cb = Coro::rouse_cb;
97 my $t = EV::timer $_[0], 0, $cb;
98 my $w = EV::idle $cb;
99 Coro::rouse_wait;
100 };
101 *readable = sub($;$) {
102 EV::READ & Coro::EV::timed_io_once $_[0], EV::READ , $_[1]
103 };
104 *writable = sub($;$) {
105 EV::WRITE & Coro::EV::timed_io_once $_[0], EV::WRITE, $_[1]
106 };
107 ';
108 die if $@;
109
110 } elsif ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
111 # let Coro::Event do its thing
112 } else {
113 # do the inefficient thing ourselves
114 Coro::_set_readyhook \&_activity;
115
116 $IDLE = new Coro sub {
117 my $one_event = AnyEvent->can ("one_event");
118
119 while () {
120 $one_event->();
121 Coro::schedule;
122 }
123 };
124 $IDLE->{desc} = "[AnyEvent idle process]";
125
126 $Coro::idle = $IDLE;
127
128 # call the readyhook, in case coroutines were already readied
129 _activity;
130 }
131 };
132
133 =item Coro::AnyEvent::poll
134
135 This call will block the current thread until the event loop has polled
136 for new events and instructs the event loop to poll for new events once,
137 without blocking.
138
139 Note that this call will not actually execute the poll, just block until
140 new events have been polled, so other threads will have a chance to run.
141
142 This is useful when you have a thread that does some computations, but you
143 still want to poll for new events from time to time. Simply call C<poll>
144 from time to time:
145
146 my $long_calc = async {
147 for (1..10000) {
148 Coro::AnyEvent::poll:
149 # do some stuff, make sure it takes at least 0.001s or so
150 }
151 }
152
153 Although you should also consider C<idle> or C<idle_upto> in such cases.
154
155 =item Coro::AnyEvent::sleep $seconds
156
157 This blocks the current thread for at least the given number of seconds.
158
159 =item Coro::AnyEvent::idle
160
161 This call is similar to C<poll> in that it will also poll for
162 events. Unlike C<poll>, it will only resume the thread once there are no
163 events to handle anymore, i.e. when the process is otherwise idle.
164
165 =item Coro::AnyEvent::idle_upto $seconds
166
167 Like C<idle>, but with a maximum waiting time.
168
169 If your process is busy handling events, calling C<idle> can mean that
170 your thread will never be resumed. To avoid this, you can use C<idle_upto>
171 and specify a timeout, after which your thread will be resumed even if the
172 process is completely busy.
173
174 =item Coro::AnyEvent::readable $fh_or_fileno[, $timeout]
175
176 =item Coro::AnyEvent::writable $fh_or_fileno[, $timeout]
177
178 Blocks the current thread until the given file handle (or file descriptor)
179 becomes readable (or writable), or the given timeout has elapsed,
180 whichever happens first. No timeout counts as infinite timeout.
181
182 Returns true when the file handle became ready, false when a timeout
183 occured.
184
185 Note that these functions are quite inefficient as compared to using a
186 single watcher (they recreate watchers on every invocation) or compared to
187 using Coro::Handle.
188
189 Note also that they only work for sources that have reasonable
190 non-blocking behaviour (e.g. not files).
191
192 Example: wait until STDIN becomes readable, then quit the program.
193
194 use Coro::AnyEvent;
195 print "press enter to quit...\n";
196 Coro::AnyEvent::readable *STDIN;
197 exit 0;
198
199 =cut
200
201 sub poll() {
202 my $w = AE::timer 0, 0, Coro::rouse_cb;
203 Coro::rouse_wait;
204 }
205
206 sub sleep($) {
207 my $w = AE::timer $_[0], 0, Coro::rouse_cb;
208 Coro::rouse_wait;
209 }
210
211 sub idle() {
212 my $w = AE::idle Coro::rouse_cb;
213 Coro::rouse_wait;
214 }
215
216 sub idle_upto($) {
217 my $cb = Coro::rouse_cb;
218 my $t = AE::timer shift, 0, $cb;
219 my $w = AE::idle $cb;
220 Coro::rouse_wait;
221 }
222
223 sub readable($;$) {
224 my $cb = Coro::rouse_cb;
225 my $w = AE::io $_[0], 0, sub { $cb->(1) };
226 my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
227 Coro::rouse_wait
228 }
229
230 sub writable($;$) {
231 my $cb = Coro::rouse_cb;
232 my $w = AE::io $_[0], 1, sub { $cb->(1) };
233 my $t = defined $_[1] && AE::timer $_[1], 0, sub { $cb->(0) };
234 Coro::rouse_wait
235 }
236
237 #############################################################################
238 # override condvars
239
240 package Coro::AnyEvent::CondVar;
241
242 sub _send {
243 (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};
244 }
245
246 sub _wait {
247 while (!$_[0]{_ae_sent}) {
248 local $_[0]{_ae_coro} = $Coro::current;
249 Coro::schedule;
250 }
251 }
252
253 1;
254
255 =back
256
257 =head1 IMPLEMENTATION DETAILS
258
259 Unfortunately, few event loops (basically only L<EV> and L<Event>)
260 support the kind of integration required for smooth operations well, and
261 consequently, AnyEvent cannot completely offer the functionality required
262 by this module, so we need to improvise.
263
264 Here is what this module does when it has to work with other event loops:
265
266 =over 4
267
268 =item * run ready threads before blocking the process
269
270 Each time a thread is put into the ready queue (and there are no other
271 threads in the ready queue), a timer with an C<after> value of C<0> is
272 registered with AnyEvent.
273
274 This creates something similar to an I<idle> watcher, i.e. a watcher
275 that keeps the event loop from blocking but still polls for new
276 events. (Unfortunately, some badly designed event loops (e.g. Event::Lib)
277 don't support a timeout of C<0> and will always block for a bit).
278
279 The callback for that timer will C<cede> to other threads of the same or
280 higher priority for as long as such threads exists. This has the effect of
281 running all threads that have work to do until all threads block to wait
282 for external events.
283
284 If no threads of equal or higher priority are ready, it will cede to any
285 thread, but only once. This has the effect of running lower-priority
286 threads as well, but it will not keep higher priority threads from
287 receiving new events.
288
289 The priority used is simply the priority of the thread that runs the event
290 loop, usually the main program, which usually has a priority of C<0>. Note
291 that Coro::AnyEvent does I<not> run an event loop for you, so unless the
292 main program runs one, there will simply be no event loop to C<cede> to
293 (event handling will still work, somewhat inefficiently, but any thread
294 will have a higher priority than event handling in that case).
295
296 =item * provide a suitable idle callback.
297
298 In addition to hooking into C<ready>, this module will also provide a
299 C<$Coro::idle> handler that runs the event loop. It is best not to take
300 advantage of this too often, as this is rather inefficient, but it should
301 work perfectly fine.
302
303 =item * provide overrides for AnyEvent's condvars
304
305 This module installs overrides for AnyEvent's condvars. That is, when
306 the module is loaded it will provide its own condition variables. This
307 makes them coroutine-safe, i.e. you can safely block on them from within a
308 coroutine.
309
310 =item * lead to data corruption or worse
311
312 As C<unblock_sub> cannot be used by this module (as it is the module
313 that implements it, basically), you must not call into the event
314 loop recursively from any coroutine. This is not usually a difficult
315 restriction to live with, just use condvars, C<unblock_sub> or other means
316 of inter-coroutine-communications.
317
318 If you use a module that supports AnyEvent (or uses the same event loop
319 as AnyEvent, making the compatible), and it offers callbacks of any kind,
320 then you must not block in them, either (or use e.g. C<unblock_sub>), see
321 the description of C<unblock_sub> in the L<Coro> module.
322
323 This also means that you should load the module as early as possible,
324 as only condvars created after this module has been loaded will work
325 correctly.
326
327 =back
328
329 =head1 SEE ALSO
330
331 L<AnyEvent>, to see which event loops are supported, L<Coro::EV> and
332 L<Coro::Event> for more efficient and more correct solutions (they will be
333 used automatically if applicable).
334
335 =head1 AUTHOR
336
337 Marc Lehmann <schmorp@schmorp.de>
338 http://home.schmorp.de/
339
340 =cut
341