ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.32
Committed: Sun Nov 25 13:53:48 2007 UTC (16 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-4_22, rel-4_3, rel-4_31
Changes since 1.31: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking io with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module implements IO-handles in a coroutine-compatible way, that is,
12 other coroutines can run while reads or writes block on the handle.
13
14 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
15 data, allowing other coroutines to run while one coroutine waits for I/O.
16
17 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
18
19 If at all possible, you should I<always> prefer method calls on the handle object over invoking
20 tied methods, i.e.:
21
22 $fh->print ($str); # NOT print $fh $str;
23 my $line = $fh->readline; # NOT my $line = <$fh>;
24
25 The reason is that perl recurses within the interpreter when invoking tie
26 magic, forcing the (temporary) allocation of a (big) stack. If you have
27 lots of socket connections and they happen to wait in e.g. <$fh>, then
28 they would all have a costly C coroutine associated with them.
29
30 =over 4
31
32 =cut
33
34 package Coro::Handle;
35
36 no warnings;
37 use strict;
38
39 use Carp ();
40 use Errno ();
41 use base 'Exporter';
42
43 our $VERSION = '3.0';
44 our @EXPORT = qw(unblock);
45
46 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
47
48 Create a new non-blocking io-handle using the given
49 perl-filehandle. Returns C<undef> if no filehandle is given. The only
50 other supported argument is "timeout", which sets a timeout for each
51 operation.
52
53 =cut
54
# Constructor: wrap a perl filehandle in a fresh glob tied to
# Coro::Handle::FH. Extra key/value arguments are handed to the tie
# constructor untouched. Returns nothing when no filehandle is given.
sub new_from_fh {
    my $class = shift;
    my $fh    = shift;
    return unless $fh;

    # every object gets its own anonymous copy of a typeglob
    my $self = do { local *Coro::Handle };
    tie $self, 'Coro::Handle::FH', fh => $fh, @_;

    # usable both as a class method and as an instance method
    my $package = ref $class ? ref $class : $class;
    bless \$self, $package
}
64
65 =item $fh = unblock $fh
66
67 This is a convenience function that just calls C<new_from_fh> on the
68 given filehandle. Use it to replace a normal perl filehandle by a
69 non-(coroutine-)blocking equivalent.
70
71 =cut
72
# Exported shortcut: turn a plain filehandle into a Coro::Handle object.
sub unblock($) {
    Coro::Handle->new_from_fh($_[0])
}
76
77 =item $fh->writable, $fh->readable
78
79 Wait until the filehandle is readable or writable (and return true) or
80 until an error condition happens (and return false).
81
82 =cut
83
# Both methods delegate to the tied implementation object and return
# whatever the corresponding Coro::Handle::FH function returns.
sub readable {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::readable($impl)
}

sub writable {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::writable($impl)
}
86
87 =item $fh->readline ([$terminator])
88
89 Like the builtin of the same name, but allows you to specify the input
90 record separator in a coroutine-safe manner (i.e. not using a global
91 variable).
92
93 =cut
94
# Method form of <$fh>; an optional argument replaces $/ for this call only.
sub readline {
    my $self = shift;
    tied($$self)->READLINE(@_)
}
96
97 =item $fh->autoflush ([...])
98
99 Always returns true, arguments are being ignored (exists for compatibility
100 only). Might change in the future.
101
102 =cut
103
# Compatibility stub: arguments are ignored and the result is always true.
sub autoflush {
    return !0;
}
105
106 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
107
108 Work like their function equivalents (except read, which works like
109 sysread. You should not use the read function with Coro::Handle's, it will
110 work but it's not efficient).
111
112 =cut
113
# Thin method wrappers around the tied-handle implementation. The buffer
# argument is passed through as $_[1] so the caller's variable is filled
# in place.
sub read {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::READ($impl, $_[1], $_[2], $_[3])
}

sub sysread {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::READ($impl, $_[1], $_[2], $_[3])
}

sub syswrite {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::WRITE($impl, $_[1], $_[2], $_[3])
}

sub print {
    my $self = shift;
    Coro::Handle::FH::WRITE(tied($$self), join "", @_)
}

sub printf {
    my $self = shift;
    Coro::Handle::FH::PRINTF(tied($$self), @_)
}

sub fileno {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::FILENO($impl)
}

sub close {
    my $impl = tied ${$_[0]};
    Coro::Handle::FH::CLOSE($impl)
}

# this handler always blocks the caller
sub blocking {
    return !0;
}
122
# Accessor for slot 8 of the tied object (the "partial" flag consulted by
# READ). Always returns the value the flag had before the call; when an
# argument is supplied the flag is set to it.
sub partial {
    my $impl     = tied ${$_[0]};
    my $previous = $impl->[8];

    if (@_ > 1) {
        $impl->[8] = $_[1];
    }

    return $previous;
}
130
131 =item connect, listen, bind, getsockopt, setsockopt,
132 send, recv, peername, sockname, shutdown, peerport, peerhost
133
134 Do the same thing as the perl builtins or IO::Socket methods (but return
135 true on EINPROGRESS). Remember that these must be method calls.
136
137 =cut
138
# Socket method wrappers. Each one applies the perl builtin of the same name
# to the real filehandle stored in slot 0 of the tied object.

# connect additionally reports success when the non-blocking connect is
# merely still in progress (EINPROGRESS).
sub connect {
    connect(tied(${$_[0]})->[0], $_[1]) or $! == Errno::EINPROGRESS()
}

sub bind       { bind       tied(${$_[0]})->[0], $_[1] }
sub listen     { listen     tied(${$_[0]})->[0], $_[1] }
sub getsockopt { getsockopt tied(${$_[0]})->[0], $_[1], $_[2] }
sub setsockopt { setsockopt tied(${$_[0]})->[0], $_[1], $_[2], $_[3] }

# $fh->send ($msg, $flags [, $to])
# Only pass a destination address when the caller actually supplied one;
# otherwise the builtin would always take the sendto() path with an empty
# address.
sub send {
    my $fh = tied(${$_[0]})->[0];

    defined $_[3]
        ? send($fh, $_[1], $_[2], $_[3])
        : send($fh, $_[1], $_[2])
}

# $fh->recv ($buffer, $length [, $flags])
# The builtin requires a FLAGS argument, so default it to 0 when omitted.
sub recv {
    recv(tied(${$_[0]})->[0], $_[1], $_[2], defined $_[3] ? $_[3] : 0)
}

sub sockname   { getsockname tied(${$_[0]})->[0] }
sub peername   { getpeername tied(${$_[0]})->[0] }
sub shutdown   { shutdown    tied(${$_[0]})->[0], $_[1] }
149
150 =item ($fh, $peername) = $listen_fh->accept
151
152 In scalar context, returns the newly accepted socket (or undef), and in
153 list context returns the ($fh, $peername) pair (or nothing).
154
155 =cut
156
# Accept a connection on a listening handle, waiting via ->readable while
# the non-blocking accept reports EAGAIN. Returns the new handle (scalar
# context) or the (handle, peername) pair (list context); returns nothing
# on any other error or when waiting for readability fails.
sub accept {
    my ($peername, $fh);

    while () {
        if ($peername = accept $fh, tied(${$_[0]})->[0]) {
            return wantarray
                ? ($_[0]->new_from_fh($fh), $peername)
                : $_[0]->new_from_fh($fh);
        }

        # anything but "would block" is a real failure
        return unless $!{EAGAIN};

        return unless $_[0]->readable;
    }
}
170
171 =item $fh->timeout ([...])
172
173 The optional argument sets the new timeout (in seconds) for this
174 handle. Returns the current (new) value.
175
176 C<0> is a valid timeout, use C<undef> to disable the timeout.
177
178 =cut
179
# Get or set the per-operation timeout stored in slot 2 of the tied object.
# When a new value is given it is also pushed to the cached read and write
# watchers (slots 5 and 6), if they exist. Returns the current value.
sub timeout {
    my $impl = tied ${$_[0]};

    if (@_ > 1) {
        my $secs = $_[1];
        $impl->[2] = $secs;

        for my $slot (5, 6) {
            my $watcher = $impl->[$slot] or next;
            $watcher->timeout($secs);
        }
    }

    $impl->[2]
}
189
190 =item $fh->fh
191
192 Returns the "real" (non-blocking) filehandle. Use this if you want to
193 do operations on the file handle you cannot do using the Coro::Handle
194 interface.
195
196 =item $fh->rbuf
197
198 Returns the current contents of the read buffer (this is an lvalue, so you
199 can change the read buffer if you like).
200
201 You can use this function to implement your own optimized reader when neither
202 readline nor sysread are viable candidates, like this:
203
204 # first get the _real_ non-blocking filehandle
205 # and fetch a reference to the read buffer
206 my $nb_fh = $fh->fh;
207 my $buf = \$fh->rbuf;
208
209 while () {
210 # now use buffer contents, modifying
211 # if necessary to reflect the removed data
212
213 last if $$buf ne ""; # we have leftover data
214
215 # read another buffer full of data
216 $fh->readable or die "end of file";
217 sysread $nb_fh, $$buf, 8192;
218 }
219
220 =cut
221
# The real filehandle kept in slot 0 of the tied object.
sub fh {
    my $impl = tied ${$_[0]};
    $impl->[0]
}

# The read buffer (slot 3), returned as an lvalue so callers can modify it.
sub rbuf : lvalue {
    (tied ${$_[0]})->[3]
}

# Defined explicitly so destruction is never routed through AUTOLOAD.
sub DESTROY {
    # nop
}
233
our $AUTOLOAD;

# Forward unknown methods to the class stored in slot 7 of the tied object
# (set from the forward_class argument). The target is entered with goto, so
# it sees the original argument list. Dies when that class cannot handle the
# method.
sub AUTOLOAD {
    my $impl = tied ${$_[0]};

    # strip the package qualifier from the requested method name
    my $func = $AUTOLOAD;
    $func =~ s/^(.*):://;

    my $forward = UNIVERSAL::can($impl->[7], $func);

    unless ($forward) {
        die "Can't locate object method \"$func\" via package \"" . (ref $impl) . "\"";
    }

    goto &$forward;
}
248
249 package Coro::Handle::FH;
250
251 no warnings;
252 use strict;
253
254 use Fcntl ();
255 use Errno ();
256 use Carp 'croak';
257
258 use AnyEvent;
259
260 # formerly a hash, but we are speed-critical, so try
261 # to be faster even if it hurts.
262 #
263 # 0 FH
264 # 1 desc
265 # 2 timeout
266 # 3 rb
267 # 4 wb # unused
268 # 5 read watcher, if Coro::Event used
269 # 6 write watcher, if Coro::Event used
270 # 7 forward class
271 # 8 partial (when true, READ returns after the first read attempt instead of waiting for more data)
272
# Tie constructor: builds the array-based handle object and switches the
# underlying filehandle to non-blocking mode.
#
# Recognised named arguments: fh, desc, timeout, forward_class, partial.
# Croaks when the descriptor's status flags cannot be read or updated.
sub TIEHANDLE {
    my ($class, %arg) = @_;

    my $self = bless [], $class;
    $self->[0] = $arg{fh};
    $self->[1] = $arg{desc};
    $self->[2] = $arg{timeout};
    $self->[3] = "";                    # read buffer
    $self->[4] = "";                    # write buffer (unused)
    $self->[7] = $arg{forward_class};
    $self->[8] = $arg{partial};

    # Add O_NONBLOCK to the existing status flags instead of replacing them,
    # so flags such as O_APPEND survive.
    my $flags = fcntl $self->[0], Fcntl::F_GETFL(), 0;
    defined $flags
        and fcntl($self->[0], Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK())
        or croak "fcntl(O_NONBLOCK): $!";

    $self
}
290
# Cancel the cached read/write watchers (slots 5 and 6), if any, and then
# empty the object completely.
sub cleanup {
    my $self = $_[0];

    for my $slot (5, 6) {
        my $watcher = $self->[$slot] or next;
        $watcher->cancel;
    }

    @$self = ();
}
296
# Tied open(): discards the current state, opens a new filehandle into
# slot 0 and switches it to non-blocking mode.
#
# After the object itself, the arguments are those given to open() after
# the handle: either a single 2-arg style expression, or a mode followed by
# one or more further arguments.
#
# Returns the result of open(); croaks if the new handle cannot be made
# non-blocking.
#
# NOTE: cleanup empties the whole object, so desc, timeout, forward class
# and the partial flag are not carried over to the reopened handle.
sub OPEN {
    &cleanup;
    my $self = shift;

    # Forward exactly the arguments we were given: the 2-arg form must not
    # gain extra (undef) arguments, and list forms must not be truncated.
    my $r = @_ <= 1
        ? open($self->[0], $_[0])
        : open($self->[0], $_[0], @_[1 .. $#_]);

    if ($r) {
        # keep the flags open() established, only add O_NONBLOCK
        my $flags = fcntl $self->[0], Fcntl::F_GETFL(), 0;
        defined $flags
            and fcntl($self->[0], Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK())
            or croak "fcntl(O_NONBLOCK): $!";
    }

    $r
}
310
# Tied print: concatenate the arguments and hand them to WRITE.
sub PRINT {
    my $self = shift;
    WRITE($self, join "", @_)
}

# Tied printf: format with sprintf, then hand the result to WRITE.
sub PRINTF {
    my $self   = shift;
    my $format = shift;
    WRITE($self, sprintf $format, @_)
}
318
# Tied getc: read a single byte through READ.
#
# Returns the byte, or undef when READ delivered nothing (end of file,
# error or failed wait), so callers can detect end of input the same way as
# with the builtin getc.
sub GETC {
    my $buf;

    # READ returns the number of bytes it delivered; without that check an
    # empty read would be handed back as "" instead of undef
    READ($_[0], $buf, 1) ? $buf : undef
}
324
# Tied binmode: apply binmode to the real filehandle in slot 0.
#
# An optional layer argument (e.g. ":utf8") is forwarded instead of being
# dropped; without one the handle is simply put into binary mode.
# Returns the result of the builtin.
sub BINMODE {
    @_ > 1
        ? binmode($_[0][0], $_[1])
        : binmode($_[0][0])
}
328
# Positioning and end-of-file queries are not supported on Coro::Handle
# objects; each of these tied methods croaks unconditionally.
sub TELL {
    Carp::croak("Coro::Handle's don't support tell()");
}

sub SEEK {
    Carp::croak("Coro::Handle's don't support seek()");
}

sub EOF {
    Carp::croak("Coro::Handle's don't support eof()");
}
340
# Tied close: cancel any watchers, drop the object's state and close the
# real filehandle. Returns the result of close().
sub CLOSE {
    # cleanup empties the object, so fetch the handle before calling it;
    # otherwise close would be applied to undef and always fail
    my $fh = $_[0][0];
    &cleanup;
    close $fh
}
345
# Destruction releases watchers and state exactly like an explicit cleanup.
sub DESTROY {
    cleanup(@_);
}

# File descriptor number of the real filehandle in slot 0.
sub FILENO {
    my $fh = $_[0][0];
    fileno $fh
}

# seems to be called for stringification (how weird), at least
# when DumpValue::dumpValue is used to print this.
# Yields the object's own string form followed by its description (slot 1)
# in angle brackets.
sub FETCH {
    return "$_[0]<$_[0][1]>";
}
359
# Generic AnyEvent implementation of "wait until readable".
#
# Registers an I/O watcher on the filehandle (slot 0) and, when a timeout
# (slot 2) is defined, a timer. Whichever fires first readies the waiting
# coroutine. Returns 1 when the I/O watcher fired and 0 on timeout.
sub readable_anyevent {
    my $current = $Coro::current;
    my $io = 1;

    my $w = AnyEvent->io (
        desc => "fh $_[0][1] read watcher",   # same wording as the write watcher
        fh   => $_[0][0],
        poll => 'r',
        cb   => sub {
            $current->ready if $current;
            undef $current;
        },
    );

    my $t = (defined $_[0][2]) && AnyEvent->timer (
        desc  => "fh $_[0][1] read timeout",
        after => $_[0][2],
        cb    => sub {
            $io = 0;
            $current->ready if $current;
            undef $current;
        },
    );

    # Sleep until one of the callbacks has cleared $current. Guarding every
    # schedule call (as writable_anyevent does) means we never go to sleep
    # once the wakeup has already happened.
    &Coro::schedule while $current;

    $io
}
389
# Generic AnyEvent implementation of "wait until writable".
# Returns 1 when the I/O watcher fired and 0 when the timeout (slot 2) hit.
sub writable_anyevent {
    my $current = $Coro::current;
    my $io = 1;

    # ready the waiting coroutine once, whichever event comes first
    my $wake = sub {
        $current->ready if $current;
        undef $current;
    };

    my $w = AnyEvent->io (
        desc => "fh $_[0][1] write watcher",
        fh   => $_[0][0],
        poll => 'w',
        cb   => sub { $wake->() },
    );

    my $t = (defined $_[0][2]) && AnyEvent->timer (
        desc  => "fh $_[0][1] write timeout",
        after => $_[0][2],
        cb    => sub {
            $io = 0;
            $wake->();
        },
    );

    while ($current) {
        &Coro::schedule;
    }

    $io
}
418
# Coro::Event implementation: the watcher is created on first use, cached in
# slot 5 (read) or slot 6 (write) and reused afterwards. The result is the
# bitwise AND of the event's flags ([4]) with the R or W flag respectively.
sub readable_coro {
    my $self = $_[0];

    $self->[5] ||= "Coro::Event"->io (
        fd      => $self->[0],
        desc    => "fh $self->[1] read watcher",
        timeout => $self->[2],
        poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
    );

    $self->[5]->next->[4] & &Event::Watcher::R
}

sub writable_coro {
    my $self = $_[0];

    $self->[6] ||= "Coro::Event"->io (
        fd      => $self->[0],
        desc    => "fh $self->[1] write watcher",
        timeout => $self->[2],
        poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
    );

    $self->[6]->next->[4] & &Event::Watcher::W
}
436
# EV implementation: one-shot timed wait on the descriptor via Coro::EV.
# True only when the reported event equals the one that was requested.
sub readable_ev {
    my $fd = fileno $_[0][0];
    &EV::READ == Coro::EV::timed_io_once ($fd, &EV::READ, $_[0][2])
}

sub writable_ev {
    my $fd = fileno $_[0][0];
    &EV::WRITE == Coro::EV::timed_io_once ($fd, &EV::WRITE, $_[0][2])
}
444
445 # decide on event model at runtime
# Install self-replacing stubs for readable/writable: the first call detects
# the AnyEvent model, rebinds the name to the matching implementation and
# then re-enters it with the original arguments.
for my $rw (qw(readable writable)) {
    no strict 'refs';

    *$rw = sub {
        AnyEvent::detect ();

        my $model = $AnyEvent::MODEL;
        my $flavour;

        if ($model eq "AnyEvent::Impl::Coro" or $model eq "AnyEvent::Impl::Event") {
            require Coro::Event;
            $flavour = "coro";
        } elsif ($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV") {
            require Coro::EV;
            $flavour = "ev";
        } else {
            $flavour = "anyevent";
        }

        *$rw = \&{"$rw\_$flavour"};
        goto &$rw
    };
}
463
# Write data to the real filehandle (slot 0), waiting via writable whenever
# the descriptor would block.
#
#   $_[1]  data to write
#   $_[2]  optional number of bytes to write (defaults to the data's length)
#   $_[3]  optional offset into the data
#
# Returns the number of bytes written. This is less than requested when an
# error occurs, when waiting for writability fails, or when no further
# progress is possible.
sub WRITE {
    my $len = defined $_[2] ? $_[2] : length $_[1];
    my $ofs = $_[3];
    my $res = 0;

    while () {
        my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
        if (defined $r) {
            $len -= $r;
            $ofs += $r;
            $res += $r;
            # Done, or syswrite made no progress (e.g. the requested length
            # exceeds the available data): stop instead of spinning forever.
            last unless $len && $r;
        } elsif ($! != Errno::EAGAIN() && $! != Errno::EINTR()) {
            # a real error; EAGAIN and EINTR just mean "try again"
            last;
        }
        last unless &writable;
    }

    return $res;
}
484
# Read up to $_[2] bytes into $_[1], starting at offset $_[3].
#
# Data already sitting in the read buffer (slot 3) is delivered first; the
# rest comes from the real filehandle (slot 0), waiting via readable when
# nothing is available. With the partial flag (slot 8) set, the loop stops
# after the first read attempt instead of waiting for the full length.
#
# Returns the number of bytes delivered.
sub READ {
    my $len = $_[2];
    my $ofs = $_[3];
    my $res = 0;

    # first deplete the read buffer
    if (length $_[0][3]) {
        my $buffered = length $_[0][3];
        if ($buffered <= $len) {
            # the whole buffer fits into the request
            substr ($_[1], $ofs) = $_[0][3];
            $_[0][3] = "";
            $len -= $buffered;
            $ofs += $buffered;
            $res += $buffered;
            return $res unless $len;
        } else {
            # the buffer alone satisfies the request
            substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
            substr ($_[0][3], 0, $len) = "";
            return $len;
        }
    }

    while () {
        my $r = sysread $_[0][0], $_[1], $len, $ofs;
        if (defined $r) {
            $len -= $r;
            $ofs += $r;
            $res += $r;
            # stop when satisfied or at end of file
            last unless $len && $r;
        } elsif ($! != Errno::EAGAIN() && $! != Errno::EINTR()) {
            # a real error; EAGAIN and EINTR just mean "try again"
            last;
        }
        last if $_[0][8] || !&readable;
    }

    return $res;
}
521
# Read one record from the handle.
#
# The record separator is $_[1] when given, else $/:
#   non-empty string  - the record ends with that separator
#   empty string      - paragraph mode: the record ends at a run of two or
#                       more newlines, returned as exactly "\n\n"; leading
#                       newlines are skipped
#   undef             - everything up to end of file
#
# Data is collected in the read buffer (slot 3). At end of file, or on an
# error or failed wait, the remaining buffer is returned once, then undef.
sub READLINE {
    my $irs = @_ > 1 ? $_[1] : $/;

    while () {
        if (length $irs) {
            my $pos = index $_[0][3], $irs;
            if ($pos >= 0) {
                $pos += length $irs;
                my $res = substr $_[0][3], 0, $pos;
                substr ($_[0][3], 0, $pos) = "";
                return $res;
            }
        } elsif (defined $irs) {
            # paragraph mode
            $_[0][3] =~ s/\A\n+//;
            return "$1\n\n"
                if $_[0][3] =~ s/\A(.*?)\n\n+//s;
        }

        my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];

        # keep reading after data arrived, or after a successful wait
        # following EAGAIN/EINTR
        next if $r;
        next if !defined $r
            && ($! == Errno::EAGAIN() || $! == Errno::EINTR())
            && &readable;

        # end of file, hard error or failed wait: hand out what is left
        my $rest = length $_[0][3] ? $_[0][3] : undef;
        $_[0][3] = "";
        return $rest;
    }
}
544
545 1;
546
547 =back
548
549 =head1 BUGS
550
551 - Perl's IO-Handle model is THE bug.
552
553 =head1 AUTHOR
554
555 Marc Lehmann <schmorp@schmorp.de>
556 http://home.schmorp.de/
557
558 =cut
559