ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.95
Committed: Sun Feb 13 04:39:15 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-5_36
Changes since 1.94: +1 -1 lines
Log Message:
5.26

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking I/O with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user; you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 use common::sense;
40
41 use Carp ();
42 use Errno qw(EAGAIN EINTR EINPROGRESS);
43
44 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45
46 use base 'Exporter';
47
48 our $VERSION = 5.26;
49 our @EXPORT = qw(unblock);
50
51 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
52
53 Create a new non-blocking io-handle using the given
54 perl-filehandle. Returns C<undef> if no filehandle is given. The only
55 other supported argument is "timeout", which sets a timeout for each
56 operation.
57
58 =cut
59
# Constructor: wraps the perl filehandle $fh in a new handle object.
#
# Arguments: the class (or an existing object, whose class is reused), the
# filehandle, and optional key => value pairs that are passed through to
# Coro::Handle::FH's TIEHANDLE unchanged.
# Returns the blessed glob reference, or nothing if no filehandle was given.
sub new_from_fh {
   my $class = shift;
   my $fh    = shift or return;

   # a fresh anonymous typeglob is the handle that gets handed out
   my $self = do { local *Coro::Handle };

   # Tie the handle inside the glob - note the explicit "*". Since perl 5.16
   # "tie $scalar" always ties the scalar itself, even if it holds a glob,
   # which would look for TIESCALAR instead of TIEHANDLE.
   tie *$self, 'Coro::Handle::FH', fh => $fh, @_;

   bless \$self, ref $class ? ref $class : $class
}
69
70 =item $fh = unblock $fh
71
72 This is a convenience function that just calls C<new_from_fh> on the
73 given filehandle. Use it to replace a normal perl filehandle by a
74 non-(coroutine-)blocking equivalent.
75
76 =cut
77
# Exported convenience wrapper: hands its single argument to the
# new_from_fh constructor of Coro::Handle and returns the result.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
81
82 =item $fh->writable, $fh->readable
83
84 Wait until the filehandle is readable or writable (and return true) or
85 until an error condition happens (and return false).
86
87 =cut
88
# Both wrappers look up the tie object behind the handle and delegate to the
# corresponding waiting function in Coro::Handle::FH, returning its result.
# The explicit "*" makes tied() inspect the handle stored in the glob; since
# perl 5.16 "tied $scalar" only ever looks at the scalar itself.
sub readable { Coro::Handle::FH::readable (tied (*{$_[0]})) }
sub writable { Coro::Handle::FH::writable (tied (*{$_[0]})) }
91
92 =item $fh->readline ([$terminator])
93
94 Similar to the builtin of the same name, but allows you to specify the
95 input record separator in a coroutine-safe manner (i.e. not using a global
96 variable). Paragraph mode is not supported, use "\n\n" to achieve the same
97 effect.
98
99 =cut
100
# Method form of readline: forwards the remaining arguments (the optional
# record terminator) to the READLINE method of the tie object.
# The explicit "*" makes tied() inspect the handle stored in the glob; since
# perl 5.16 "tied $scalar" only ever looks at the scalar itself.
sub readline {
   my $self = shift;

   tied (*$self)->READLINE (@_)
}
102
103 =item $fh->autoflush ([...])
104
105 Always returns true, arguments are being ignored (exists for compatibility
106 only). Might change in the future.
107
108 =cut
109
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush {
   !!1
}
111
112 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113
114 Work like their function equivalents (except read, which works like
115 sysread. You should not use the read function with Coro::Handle's, it will
116 work but it's not efficient).
117
118 =cut
119
# Method wrappers around the tie class: each one fetches the tie object
# behind the handle and calls the matching Coro::Handle::FH function.
#
# tied() is applied to the glob ("*"), not to the scalar holding it: since
# perl 5.16 "tied $scalar" only ever looks at the scalar itself.
#
# The buffer argument is passed on as $_[1] (not copied) so that READ can
# fill the caller's variable in place.
sub read     { Coro::Handle::FH::READ   (tied (*{$_[0]}), $_[1], $_[2], $_[3]) }
sub sysread  { Coro::Handle::FH::READ   (tied (*{$_[0]}), $_[1], $_[2], $_[3]) }
sub syswrite { Coro::Handle::FH::WRITE  (tied (*{$_[0]}), $_[1], $_[2], $_[3]) }

# print joins its arguments with the empty string and writes the result
sub print {
   my $self = shift;
   Coro::Handle::FH::WRITE (tied (*$self), join "", @_)
}

sub printf {
   my $self = shift;
   Coro::Handle::FH::PRINTF (tied (*$self), @_)
}

sub fileno   { Coro::Handle::FH::FILENO (tied (*{$_[0]})) }
sub close    { Coro::Handle::FH::CLOSE  (tied (*{$_[0]})) }
sub blocking { !0 } # this handler always blocks the caller
128
# Accessor for slot 8 of the tie object (the "partial" flag).
# With an argument, stores it as the new value. Always returns the value
# that was stored before the call.
sub partial {
   # explicit "*": since perl 5.16 "tied $scalar" only looks at the scalar
   # itself, not at the handle inside the glob it holds
   my $obj = tied *{$_[0]};

   my $previous = $obj->[8];
   $obj->[8] = $_[1] if @_ > 1;

   $previous
}
136
137 =item connect, listen, bind, getsockopt, setsockopt,
138 send, recv, peername, sockname, shutdown, peerport, peerhost
139
140 Do the same thing as the perl builtins or IO::Socket methods (but return
141 true on EINPROGRESS). Remember that these must be method calls.
142
143 =cut
144
# Socket method wrappers: each applies the perl builtin of the same name to
# the real filehandle, which lives in slot 0 of the tie object.
#
# tied() is applied to the glob ("*"), not to the scalar holding it: since
# perl 5.16 "tied $scalar" only ever looks at the scalar itself.

# connect also reports success while the connection is still in progress
sub connect    { connect     tied (*{$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
sub bind       { bind        tied (*{$_[0]})->[0], $_[1] }
sub listen     { listen      tied (*{$_[0]})->[0], $_[1] }
sub getsockopt { getsockopt  tied (*{$_[0]})->[0], $_[1], $_[2] }
sub setsockopt { setsockopt  tied (*{$_[0]})->[0], $_[1], $_[2], $_[3] }

# send ($msg, $flags [, $to]): the destination is only handed to the builtin
# when the caller supplied one, so a plain send never becomes a sendto with
# an undefined address.
sub send {
   my $fh = tied (*{$_[0]})->[0];

   @_ > 3 ? send ($fh, $_[1], $_[2], $_[3])
          : send ($fh, $_[1], $_[2])
}

# recv ($buf, $length, $flags): $_[1] is passed on uncopied so the builtin
# fills the caller's buffer.
sub recv       { recv        tied (*{$_[0]})->[0], $_[1], $_[2], $_[3] }
sub sockname   { getsockname tied (*{$_[0]})->[0] }
sub peername   { getpeername tied (*{$_[0]})->[0] }
sub shutdown   { shutdown    tied (*{$_[0]})->[0], $_[1] }
155
156 =item ($fh, $peername) = $listen_fh->accept
157
158 In scalar context, returns the newly accepted socket (or undef); in
159 list context, returns the ($fh, $peername) pair (or nothing).
160
161 =cut
162
# Accepts a connection on the listening handle, waiting via ->readable
# while none is pending.
#
# Returns the new handle (created with new_from_fh on the invocant) in
# scalar context, or ($handle, $peername) in list context. Returns nothing
# when accept fails with an error other than EAGAIN/EINTR/WSAEWOULDBLOCK,
# or when ->readable returns false.
sub accept {
   my ($peername, $fh);

   while () {
      # explicit "*": since perl 5.16 "tied $scalar" only looks at the
      # scalar itself, not at the handle inside the glob it holds
      if ($peername = accept $fh, tied (*{$_[0]})->[0]) {
         return wantarray
            ? ($_[0]->new_from_fh ($fh), $peername)
            : $_[0]->new_from_fh ($fh);
      }

      # anything but "try again later" is a real error
      return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;

      $_[0]->readable or return;
   }
}
176
177 =item $fh->timeout ([...])
178
179 The optional argument sets the new timeout (in seconds) for this
180 handle. Returns the current (new) value.
181
182 C<0> is a valid timeout, use C<undef> to disable the timeout.
183
184 =cut
185
# Gets or sets the per-operation timeout (slot 2 of the tie object).
# When a value is passed it is stored and also pushed to the read and write
# watchers (slots 5 and 6), if they exist. Returns the current value.
sub timeout {
   # explicit "*": since perl 5.16 "tied $scalar" only looks at the scalar
   # itself, not at the handle inside the glob it holds
   my $self = tied *{$_[0]};

   if (@_ > 1) {
      $self->[2] = $_[1];
      $self->[5]->timeout ($_[1]) if $self->[5];
      $self->[6]->timeout ($_[1]) if $self->[6];
   }

   $self->[2]
}
195
196 =item $fh->fh
197
198 Returns the "real" (non-blocking) filehandle. Use this if you want to
199 do operations on the file handle you cannot do using the Coro::Handle
200 interface.
201
202 =item $fh->rbuf
203
204 Returns the current contents of the read buffer (this is an lvalue, so you
205 can change the read buffer if you like).
206
207 You can use this function to implement your own optimized reader when neither
208 readline nor sysread are viable candidates, like this:
209
210 # first get the _real_ non-blocking filehandle
211 # and fetch a reference to the read buffer
212 my $nb_fh = $fh->fh;
213 my $buf = \$fh->rbuf;
214
215 while () {
216 # now use buffer contents, modifying
217 # if necessary to reflect the removed data
218
219 last if $$buf ne ""; # we have leftover data
220
221 # read another buffer full of data
222 $fh->readable or die "end of file";
223 sysread $nb_fh, $$buf, 8192;
224 }
225
226 =cut
227
# Returns the underlying filehandle (slot 0 of the tie object).
sub fh {
   # explicit "*": since perl 5.16 "tied $scalar" only looks at the scalar
   # itself, not at the handle inside the glob it holds
   (tied *{$_[0]})->[0]
}
231
# Lvalue accessor for the read buffer (slot 3 of the tie object).
sub rbuf : lvalue {
   # explicit "*": since perl 5.16 "tied $scalar" only looks at the scalar
   # itself, not at the handle inside the glob it holds
   (tied *{$_[0]})->[3]
}
235
# Deliberately empty. Being defined explicitly, DESTROY is never routed
# through the AUTOLOAD handler of this package.
sub DESTROY { }
239
our $AUTOLOAD;

# Fallback for methods not defined in this package: looks the method name up
# in the forward class (slot 7 of the tie object) and jumps to it with the
# original arguments. Dies if the forward class cannot do it.
sub AUTOLOAD {
   # explicit "*": since perl 5.16 "tied $scalar" only looks at the scalar
   # itself, not at the handle inside the glob it holds
   my $self = tied *{$_[0]};

   # strip the package qualification from the requested name
   (my $func = $AUTOLOAD) =~ s/^(.*):://;

   my $forward = UNIVERSAL::can ($self->[7], $func);

   $forward or
      die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";

   # goto keeps @_ intact and removes this frame from the call stack
   goto &$forward;
}
254
255 package Coro::Handle::FH;
256
257 use common::sense;
258
259 use Carp 'croak';
260 use Errno qw(EAGAIN EINTR);
261
262 use AnyEvent::Util qw(WSAEWOULDBLOCK);
263
264 use Coro::AnyEvent;
265
266 # formerly a hash, but we are speed-critical, so try
267 # to be faster even if it hurts.
268 #
269 # 0 FH
270 # 1 desc
271 # 2 timeout
272 # 3 rb
273 # 4 wb # unused
274 # 5 read watcher, if Coro::Event|EV used
275 # 6 write watcher, if Coro::Event|EV used
276 # 7 forward class
277 # 8 partial (when true, READ returns after a single read attempt)
278
# Tie constructor. Takes key => value pairs (fh, desc, timeout,
# forward_class, partial), stores them in their array slots, starts with
# empty read and write buffers and calls AnyEvent::Util::fh_nonblocking on
# the filehandle.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [], $class;

   # slots 5 and 6 (the watchers) are deliberately left untouched here
   @$self[0 .. 4] = ($arg{fh}, $arg{desc}, $arg{timeout}, "", "");
   @$self[7, 8]   = ($arg{forward_class}, $arg{partial});

   AnyEvent::Util::fh_nonblocking ($self->[0], 1);

   $self
}
295
# Empties the tie object in place, dropping every slot.
# (This default is replaced at runtime when Coro::Event is in use.)
sub cleanup {
   my ($self) = @_;

   @$self = ();
}
300
# Tied open: resets the object via cleanup, then (re)opens the filehandle in
# slot 0 with the arguments given to open and switches it to non-blocking
# mode.
#
# Returns the result of the open call. Croaks if the handle was opened but
# could not be made non-blocking.
sub OPEN {
   &cleanup; # called with our own @_
   my $self = shift;

   # forward exactly the arguments we were given: two-argument open when
   # only an expression was passed, mode plus list otherwise
   my $r = @_ > 1 ? open ($self->[0], $_[0], @_[1 .. $#_])
                  : open ($self->[0], $_[0]);

   if ($r) {
      # same helper TIEHANDLE uses, so no Fcntl constants are needed here
      AnyEvent::Util::fh_nonblocking ($self->[0], 1)
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
314
# Tied print: concatenates all arguments with the empty string and hands
# the result to WRITE.
sub PRINT {
   my $self = shift;

   WRITE ($self, join "", @_)
}

# Tied printf: formats the arguments with sprintf and hands the result to
# WRITE.
sub PRINTF {
   my $self   = shift;
   my $format = shift;

   WRITE ($self, sprintf $format, @_)
}
322
# Tied getc: asks READ for a single character and returns the buffer it
# filled (which stays undefined if READ stored nothing).
sub GETC {
   my ($self) = @_;

   my $char;
   READ ($self, $char, 1);

   $char
}
328
# Tied binmode: applies binmode to the underlying filehandle (slot 0).
# The optional layer argument is passed on when the caller supplied one.
# Returns the result of the builtin.
sub BINMODE {
   my ($self, $layer) = @_;

   @_ > 1 ? binmode ($self->[0], $layer)
          : binmode ($self->[0])
}
332
# Positioning and end-of-file queries are not available on these handles;
# each of the three tie methods below croaks unconditionally.

sub TELL {
   croak "Coro::Handle's don't support tell()";
}

sub SEEK {
   croak "Coro::Handle's don't support seek()";
}

sub EOF {
   croak "Coro::Handle's don't support eof()";
}
344
# Tied close: remembers the filehandle, wipes the object via cleanup and
# then closes the filehandle, returning the result of close.
sub CLOSE {
   my ($self) = @_;

   # fetch the handle first, cleanup empties the object
   my $fh = $self->[0];
   cleanup ($self);

   close $fh
}
350
# Destructor of the tie object: runs cleanup on it.
sub DESTROY {
   cleanup (@_);
}
354
# Tied fileno: returns the file descriptor number of the filehandle in
# slot 0.
sub FILENO {
   my ($self) = @_;

   fileno $self->[0]
}
358
359 # seems to be called for stringification (how weird), at least
360 # when DumpValue::dumpValue is used to print this.
# Returns a descriptive string: the stringified tie object followed by the
# description (slot 1) in angle brackets.
sub FETCH {
   my ($self) = @_;

   $self . "<" . $self->[1] . ">"
}
364
# Generic AnyEvent implementation of readable: registers an I/O watcher for
# reading on the filehandle (slot 0) and, if a timeout (slot 2) is defined,
# a timer. Whichever fires first rouses us; the I/O watcher passes 1 and the
# timer passes 0 to the rouse callback. Returns what rouse_wait returns.
sub _readable_anyevent {
   my $rouse = Coro::rouse_cb ();

   # both watchers must stay alive until rouse_wait has returned
   my $io_watcher = AE::io ($_[0][0], 0, sub { $rouse->(1) });
   my $timer      = (defined $_[0][2]) && AE::timer ($_[0][2], 0, sub { $rouse->(0) });

   Coro::rouse_wait ()
}

# Same as above, but watches the filehandle for writing.
sub _writable_anyevent {
   my $rouse = Coro::rouse_cb ();

   # both watchers must stay alive until rouse_wait has returned
   my $io_watcher = AE::io ($_[0][0], 1, sub { $rouse->(1) });
   my $timer      = (defined $_[0][2]) && AE::timer ($_[0][2], 0, sub { $rouse->(0) });

   Coro::rouse_wait ()
}
382
# Coro::Event implementation of readable: creates the read watcher on first
# use, caches it in slot 5, waits for its next event and tests element 4 of
# that event for the R flag.
sub _readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::R
}

# Coro::Event implementation of writable: same scheme, using slot 6 and the
# W flag.
sub _writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   $watcher->next->[4] & &Event::Watcher::W
}
400
401 #sub _readable_ev {
402 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
403 #}
404 #
405 #sub _writable_ev {
406 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
407 #}
408
409 # decide on event model at runtime
# Install self-replacing stubs for "readable" and "writable". On first call
# a stub lets AnyEvent detect the event model, replaces itself with the
# matching implementation and then continues in it.
foreach my $kind ("readable", "writable") {
   *$kind = sub {
      AnyEvent::detect ();

      my $model = $AnyEvent::MODEL;

      if ($model eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
         *$kind = \&{"_${kind}_coro"};

         # with Coro::Event the cached watchers must be cancelled as well
         *cleanup = sub {
            eval {
               $_[0][5]->cancel if $_[0][5];
               $_[0][6]->cancel if $_[0][6];
            };
            @{$_[0]} = ();
         };

      } elsif ($model eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
         *$kind = \&{"Coro::EV::_${kind}_ev"};
         return &$kind; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only

      } else {
         *$kind = \&{"_${kind}_anyevent"};
      }

      # continue in the implementation just installed, with the same @_
      goto &$kind
   };
}
433
# WRITE ($self, $data [, $length [, $offset]])
#
# Writes $length bytes of $data (default: all of it) starting at $offset to
# the filehandle in slot 0, retrying until everything has been written.
# After a short write or an EAGAIN/EINTR/WSAEWOULDBLOCK failure it waits via
# writable; any other error, or a false result from writable, ends the loop.
# Returns the number of bytes written (undef if nothing could be written).
sub WRITE {
   my $remaining = defined $_[2] ? $_[2] : length $_[1];
   my $offset    = $_[3];
   my $written;

   for (;;) {
      my $count = syswrite ($_[0][0], $_[1], $remaining, $offset);

      if (defined $count) {
         $written   += $count;
         $offset    += $count;
         $remaining -= $count;

         last unless $remaining;
      } elsif (!($! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK)) {
         # a real error, not just "try again later"
         last;
      }

      last unless &writable; # called with our own @_
   }

   $written
}
454
# READ ($self, $buffer, $length [, $offset])
#
# Fills the caller's $buffer (at $offset) with up to $length bytes. Data
# still sitting in the read buffer (slot 3) is consumed first, the rest
# comes from sysread on the filehandle in slot 0. The loop ends when the
# request is satisfied, on end of file, on an error other than
# EAGAIN/EINTR/WSAEWOULDBLOCK, when the partial flag (slot 8) is set, or
# when readable returns false.
# Returns the number of bytes stored (undef if there were none at all).
sub READ {
   my $wanted = $_[2];
   my $offset = $_[3];
   my $total;

   # first deplete the read buffer
   if (my $buffered = length $_[0][3]) {
      if ($buffered > $wanted) {
         # the buffer alone satisfies the request
         substr ($_[1], $offset) = substr ($_[0][3], 0, $wanted);
         substr ($_[0][3], 0, $wanted) = "";
         return $wanted;
      }

      substr ($_[1], $offset) = $_[0][3];
      $_[0][3] = "";

      $wanted -= $buffered;
      $offset += $buffered;
      $total  += $buffered;

      return $total unless $wanted;
   }

   for (;;) {
      my $count = sysread $_[0][0], $_[1], $wanted, $offset;

      if (defined $count) {
         $wanted -= $count;
         $offset += $count;
         $total  += $count;

         # done when the request is satisfied or on end of file
         last unless $wanted && $count;
      } elsif (!($! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK)) {
         last;
      }

      last if $_[0][8] || !&readable; # readable is called with our own @_
   }

   $total
}
491
# READLINE ($self [, $terminator])
#
# Returns the next record from the read buffer (slot 3), refilling it with
# sysread on the filehandle (slot 0) as needed. The terminator defaults to
# $/. A non-empty terminator ends a record; the empty string selects the
# "\n\n"-separated branch below, and an undefined terminator reads until
# end of file.
#
# At end of file, on an error other than EAGAIN/EINTR/WSAEWOULDBLOCK, or
# when readable returns false, whatever is left in the buffer is returned,
# or undef if the buffer is empty.
sub READLINE {
   my $sep = @_ > 1 ? $_[1] : $/;
   my ($from, $got, $hit);

   for (;;) {
      if (length $sep) {
         # look for the terminator, skipping what was already searched
         $hit = index $_[0][3], $sep, $from < 0 ? 0 : $from;

         return substr ($_[0][3], 0, $hit + length ($sep), "")
            if $hit >= 0;

         $from = (length $_[0][3]) - (length $sep);
      } elsif (defined $sep) {
         # empty terminator: records end at an empty line
         $hit = index $_[0][3], "\n\n", $from < 1 ? 1 : $from;

         if ($hit >= 0) {
            my $record = substr ($_[0][3], 0, $hit + 2, "");
            $record =~ s/\A\n+//;
            return $record;
         }

         $from = (length $_[0][3]) - 1;
      }

      # append more data; the request grows by the size of the last read
      $got = sysread $_[0][0], $_[0][3], $got + 4096, length $_[0][3];

      next if $got;

      if (defined $got) {
         # EOF
         return undef unless length $_[0][3];

         $_[0][3] =~ s/\A\n+//
            if ! length $sep && defined $sep;

         return delete $_[0][3];
      } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
         return length $_[0][3] ? delete $_[0][3] : undef;
      }
   }
}
533
534 1;
535
536 =back
537
538 =head1 BUGS
539
540 - Perl's IO-Handle model is THE bug.
541
542 =head1 AUTHOR
543
544 Marc Lehmann <schmorp@schmorp.de>
545 http://home.schmorp.de/
546
547 =cut
548