ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.67
Committed: Mon Nov 24 04:56:38 2008 UTC (15 years, 6 months ago) by root
Branch: MAIN
Changes since 1.66: +14 -21 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking I/O with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user; you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 no warnings;
40 use strict;
41
42 use Carp ();
43 use Errno qw(EAGAIN EINTR EINPROGRESS);
44
45 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
46
47 use base 'Exporter';
48
49 our $VERSION = "5.0";
50 our @EXPORT = qw(unblock);
51
52 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53
54 Create a new non-blocking io-handle using the given
55 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56 other supported argument is "timeout", which sets a timeout for each
57 operation.
58
59 =cut
60
# Constructor: wrap $fhandle in a fresh glob tied to Coro::Handle::FH.
#
#   $class   - class name or an existing object (its class is reused)
#   $fhandle - the perl filehandle to wrap; a false value yields an empty return
#   @args    - extra key/value pairs handed to the tie constructor unchanged
#
# Returns a reference to the tied glob, blessed into the requested class.
#
# NOTE(review): perl 5.14 changed tie/tied on scalars that hold a typeglob
# copy (see perl5140delta); confirm this still ties the handle on the perl
# versions you target.
sub new_from_fh {
    my ($class, $fhandle, @args) = @_;

    return unless $fhandle;

    # a private copy of a localised glob gives every object its own handle slot
    my $glob = do { local *Coro::Handle };

    tie $glob, 'Coro::Handle::FH', fh => $fhandle, @args;

    return bless \$glob, (ref $class || $class);
}
70
71 =item $fh = unblock $fh
72
73 This is a convenience function that just calls C<new_from_fh> on the
74 given filehandle. Use it to replace a normal perl filehandle by a
75 non-(coroutine-)blocking equivalent.
76
77 =cut
78
# Exported shortcut: wrap a single filehandle via Coro::Handle->new_from_fh.
sub unblock($) {
    return Coro::Handle->new_from_fh ($_[0]);
}
82
83 =item $fh->writable, $fh->readable
84
85 Wait until the filehandle is readable or writable (and return true) or
86 until an error condition happens (and return false).
87
88 =cut
89
# Both methods look up the tie object behind the handle and delegate to the
# function of the same name in Coro::Handle::FH, returning its result.
sub readable {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::readable ($state);
}

sub writable {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::writable ($state);
}
92
93 =item $fh->readline ([$terminator])
94
95 Like the builtin of the same name, but allows you to specify the input
96 record separator in a coroutine-safe manner (i.e. not using a global
97 variable).
98
99 =cut
100
# Method form of READLINE: any arguments after the handle (the optional
# terminator) are forwarded untouched to the tie object.
sub readline {
    my $handle = shift;
    return tied(${$handle})->READLINE (@_);
}
102
103 =item $fh->autoflush ([...])
104
105 Always returns true, arguments are being ignored (exists for compatibility
106 only). Might change in the future.
107
108 =cut
109
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush {
    return !!1;
}
111
112 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113
114 Work like their function equivalents (except read, which works like
115 sysread. You should not use the read function with Coro::Handle's, it will
116 work but it's not efficient).
117
118 =cut
119
# Thin method wrappers around the tie implementation in Coro::Handle::FH.
# Each one fetches the tie object behind the handle and forwards its
# arguments. Buffers are passed as $_[1] (an alias), never as a copy, so the
# implementation can fill the caller's variable in place.

# read and sysread are the same operation here: both call READ.
sub read {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::READ ($state, $_[1], $_[2], $_[3]);
}

sub sysread {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::READ ($state, $_[1], $_[2], $_[3]);
}

sub syswrite {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::WRITE ($state, $_[1], $_[2], $_[3]);
}

# print concatenates its arguments with no separator and writes the result
sub print {
    my $handle = shift;
    return Coro::Handle::FH::WRITE (tied ${$handle}, join "", @_);
}

sub printf {
    my $handle = shift;
    return Coro::Handle::FH::PRINTF (tied ${$handle}, @_);
}

sub fileno {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::FILENO ($state);
}

sub close {
    my $state = tied ${$_[0]};
    return Coro::Handle::FH::CLOSE ($state);
}

# this handler always blocks the caller
sub blocking {
    return !!1;
}
128
# Get, and optionally set, the value stored in slot 8 of the tie object
# (filled from the "partial" constructor argument).
#
# Returns the value that was stored BEFORE any assignment made by this call.
sub partial {
    my ($handle, @new) = @_;

    my $state    = tied ${$handle};
    my $previous = $state->[8];

    $state->[8] = $new[0] if @new;

    return $previous;
}
136
137 =item connect, listen, bind, getsockopt, setsockopt,
138 send, recv, peername, sockname, shutdown, peerport, peerhost
139
140 Do the same thing as the perl builtins or IO::Socket methods (but return
141 true on EINPROGRESS). Remember that these must be method calls.
142
143 =cut
144
# Socket method wrappers: each applies the builtin of the same name to the
# underlying filehandle (slot 0 of the tie object).

# connect additionally reports success when the attempt is merely in progress
# (EINPROGRESS / WSAEINPROGRESS).
sub connect {
    my ($handle, $addr) = @_;

    my $ok = connect (tied(${$handle})->[0], $addr);

    return $ok || $! == EINPROGRESS || $! == WSAEINPROGRESS;
}

sub bind {
    my ($handle, $addr) = @_;
    return bind (tied(${$handle})->[0], $addr);
}

sub listen {
    my ($handle, $backlog) = @_;
    return listen (tied(${$handle})->[0], $backlog);
}

sub getsockopt {
    my ($handle, $level, $optname) = @_;
    return getsockopt (tied(${$handle})->[0], $level, $optname);
}

sub setsockopt {
    my ($handle, $level, $optname, $optval) = @_;
    return setsockopt (tied(${$handle})->[0], $level, $optname, $optval);
}
# $fh->send ($msg, $flags [, $to])
#
# Like the builtin send on the underlying filehandle. The destination
# address is only handed to the builtin when the caller actually supplied
# one. Previously the guard tested "@_ > 2" (true as soon as flags were
# given), and because the whole ternary was a single scalar argument the
# builtin was always compiled in its four-argument (sendto) form, receiving
# an undefined address when none was passed.
sub send {
    my $sock = tied(${$_[0]})->[0];

    return @_ > 3
        ? send ($sock, $_[1], $_[2], $_[3])
        : send ($sock, $_[1], $_[2]);
}

# $fh->recv ($buffer, $length [, $flags])
#
# Like the builtin recv, which always takes four arguments; a missing
# $flags is passed as undef, exactly as before. $_[1] is passed as an alias
# so the caller's buffer is filled in place.
sub recv {
    return recv (tied(${$_[0]})->[0], $_[1], $_[2], $_[3]);
}
# More socket wrappers operating on the underlying filehandle (tie slot 0).

sub sockname {
    my ($handle) = @_;
    return getsockname (tied(${$handle})->[0]);
}

sub peername {
    my ($handle) = @_;
    return getpeername (tied(${$handle})->[0]);
}

sub shutdown {
    my ($handle, $how) = @_;
    return shutdown (tied(${$handle})->[0], $how);
}
155
156 =item ($fh, $peername) = $listen_fh->accept
157
158 In scalar context, returns the newly accepted socket (or undef) and in
159 list context returns the ($fh, $peername) pair (or nothing).
160
161 =cut
162
# Accept a connection on the listening handle, waiting (via ->readable)
# whenever the builtin accept reports EAGAIN, EINTR or WSAEWOULDBLOCK.
#
# Returns the new handle (wrapped by new_from_fh on the listener) in scalar
# context, or ($handle, $peername) in list context. Returns nothing on any
# other error or when ->readable returns false.
sub accept {
    my $listener = $_[0];
    my ($peer, $sock);

    for (;;) {
        if ($peer = accept ($sock, tied(${$listener})->[0])) {
            my $client = $listener->new_from_fh ($sock);
            return wantarray ? ($client, $peer) : $client;
        }

        # $! is inspected immediately after the failed accept
        my $retryable = $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
        return unless $retryable;

        return unless $listener->readable;
    }
}
176
177 =item $fh->timeout ([...])
178
179 The optional argument sets the new timeout (in seconds) for this
180 handle. Returns the current (new) value.
181
182 C<0> is a valid timeout, use C<undef> to disable the timeout.
183
184 =cut
185
# Get or set the per-operation timeout stored in slot 2 of the tie object.
# When a new value is supplied it is also pushed to the cached read/write
# watchers (slots 5 and 6), if they exist.
#
# Returns the current value (i.e. the new value after a set).
sub timeout {
    my $state = tied ${$_[0]};

    return $state->[2] unless @_ > 1;

    $state->[2] = $_[1];

    for my $slot (5, 6) {
        my $watcher = $state->[$slot] or next;
        $watcher->timeout ($_[1]);
    }

    return $state->[2];
}
195
196 =item $fh->fh
197
198 Returns the "real" (non-blocking) filehandle. Use this if you want to
199 do operations on the file handle you cannot do using the Coro::Handle
200 interface.
201
202 =item $fh->rbuf
203
204 Returns the current contents of the read buffer (this is an lvalue, so you
205 can change the read buffer if you like).
206
207 You can use this function to implement your own optimized reader when neither
208 readline nor sysread are viable candidates, like this:
209
210 # first get the _real_ non-blocking filehandle
211 # and fetch a reference to the read buffer
212 my $nb_fh = $fh->fh;
213 my $buf = \$fh->rbuf;
214
215 while () {
216 # now use buffer contents, modifying
217 # if necessary to reflect the removed data
218
219 last if $$buf ne ""; # we have leftover data
220
221 # read another buffer full of data
222 $fh->readable or die "end of file";
223 sysread $nb_fh, $$buf, 8192;
224 }
225
226 =cut
227
# Return the underlying filehandle kept in slot 0 of the tie object.
sub fh {
    my ($handle) = @_;
    return tied(${$handle})->[0];
}
231
# Lvalue accessor for the read buffer (slot 3 of the tie object). The array
# element itself must be the final expression so callers can assign to it or
# take a reference to it.
sub rbuf : lvalue {
    my $state = tied ${$_[0]};
    $state->[3]
}
235
# Intentionally empty. Defining it here means object destruction is handled
# by this sub and is not routed through AUTOLOAD.
sub DESTROY { }
239
our $AUTOLOAD;

# Fallback dispatcher: an unknown method is looked up in the "forward class"
# stored in slot 7 of the tie object. If that class can do it, control is
# transferred there with the original @_; otherwise we die with a message
# similar to perl's own.
sub AUTOLOAD {
    my $state = tied ${$_[0]};

    # strip the package qualifier from the requested name
    my $method = $AUTOLOAD;
    $method =~ s/^(.*):://;

    my $target = UNIVERSAL::can ($state->[7], $method)
        or die "Can't locate object method \"$method\" via package \"" . (ref $state) . "\"";

    goto &$target;
}
254
255 package Coro::Handle::FH;
256
257 no warnings;
258 use strict;
259
260 use Carp 'croak';
261 use Errno qw(EAGAIN EINTR);
262
263 use AnyEvent ();
264 use AnyEvent::Util qw(WSAEWOULDBLOCK);
265
266 use Coro::AnyEvent;
267
268 # formerly a hash, but we are speed-critical, so try
269 # to be faster even if it hurts.
270 #
271 # 0 FH
272 # 1 desc
273 # 2 timeout
274 # 3 rb
275 # 4 wb # unused
276 # 5 read watcher, if Coro::Event|EV used
277 # 6 write watcher, if Coro::Event|EV used
278 # 7 forward class
279 # 8 partial (when true, READ returns after its first sysread attempt)
280
# Tie constructor. Recognised arguments: fh, desc, timeout, forward_class
# and partial. Builds the array-based state object (see the slot table
# above), switches the filehandle to non-blocking mode and returns the object.
# Slots 5 and 6 (the watchers) are left unset.
sub TIEHANDLE {
    my $class = shift;
    my %opt   = @_;

    my $self = bless [], $class;

    @$self[0 .. 4] = ($opt{fh}, $opt{desc}, $opt{timeout}, "", "");
    @$self[7, 8]   = @opt{qw(forward_class partial)};

    AnyEvent::Util::fh_nonblocking ($self->[0], 1);

    return $self;
}
297
# Cancel the cached read/write watchers (slots 5 and 6), ignoring any error
# they raise, then empty the state array.
#
# This runs from DESTROY (and OPEN/CLOSE). The eval below would otherwise
# reset the global $@, which can wipe out an exception the caller is about to
# inspect when the object is destroyed during unwinding, so $@ is localised.
sub cleanup {
    local $@;

    eval {
        $_[0][5]->cancel if $_[0][5];
        $_[0][6]->cancel if $_[0][6];
    };

    @{$_[0]} = ();
}
305
# Tied open: discard the previous state, (re)open the handle in slot 0 with
# the arguments the caller gave to open(), and switch it to non-blocking
# mode.
#
# Fixes over the previous version:
#  - the Fcntl constants were used without Fcntl ever being loaded here;
#  - F_SETFL was called with O_NONBLOCK alone, silently dropping other
#    status flags the open had just established (e.g. O_APPEND for ">>");
#  - only the two- and three-argument forms were dispatched correctly; a
#    single argument or extra list arguments are now passed through as given.
#
# Returns the result of open; croaks if the handle cannot be made
# non-blocking.
sub OPEN {
    &cleanup;
    my $self = shift;

    # mirror whichever form of open() the caller used
    my $r = @_ == 1 ? open ($self->[0], $_[0])
          : @_ == 2 ? open ($self->[0], $_[0], $_[1])
          :           open ($self->[0], $_[0], $_[1], @_[2 .. $#_]);

    if ($r) {
        require Fcntl;

        # read-modify-write so existing status flags survive
        my $flags = fcntl ($self->[0], Fcntl::F_GETFL (), 0);

        defined $flags
            and fcntl ($self->[0], Fcntl::F_SETFL (), ($flags + 0) | Fcntl::O_NONBLOCK ())
            or croak "fcntl(O_NONBLOCK): $!";
    }

    $r
}
319
# Tied print: concatenate all arguments (no separator) and WRITE the result.
sub PRINT {
    my $self = shift;
    return WRITE ($self, join "", @_);
}
323
# Tied printf: format the arguments with sprintf and WRITE the result.
sub PRINTF {
    my ($self, $format, @args) = @_;
    return WRITE ($self, sprintf $format, @args);
}
327
# Tied getc: READ a single character and return it (undef if nothing was
# read, since the buffer variable then stays untouched).
sub GETC {
    my ($self) = @_;

    READ ($self, my $char, 1);

    return $char;
}
333
# Tied binmode: applies plain binmode to the underlying filehandle. Any
# layer argument supplied by the caller is not used.
sub BINMODE {
    my ($self) = @_;
    return binmode $self->[0];
}
337
# Positioning and end-of-file queries are not available on these handles;
# each of the following simply croaks with a fixed message.

sub TELL {
    croak "Coro::Handle's don't support tell()";
}

sub SEEK {
    croak "Coro::Handle's don't support seek()";
}

sub EOF {
    croak "Coro::Handle's don't support eof()";
}
349
# Tied close: tear down the watchers/state and close the underlying
# filehandle, returning the result of close.
#
# The filehandle must be fetched BEFORE cleanup runs: cleanup empties the
# state array, so reading slot 0 afterwards yields undef and the real handle
# would never be closed.
sub CLOSE {
    my $fh = $_[0][0];

    &cleanup;

    close $fh
}
354
# Destructor: release watchers and state via cleanup.
sub DESTROY {
    cleanup ($_[0]);
}
358
# Tied fileno: the descriptor number of the underlying filehandle.
sub FILENO {
    my ($self) = @_;
    return fileno $self->[0];
}
362
# Apparently invoked when the tied handle is stringified (observed when
# DumpValue::dumpValue prints it). Produces the object's own string form
# followed by its description (slot 1) in angle brackets.
sub FETCH {
    my ($self) = @_;
    return sprintf "%s<%s>", $self, $self->[1];
}
368
# Generic AnyEvent implementation of "wait until readable".
#
# Registers an I/O watcher for read readiness on the filehandle (slot 0)
# and, when a timeout (slot 2) is defined, a timer as well. Both wake the
# current coroutine through the same rouse callback.
#
# Returns 1 when woken by the I/O watcher, 0 when the timer fired.
sub readable_anyevent {
    my ($self) = @_;

    my $ready = 1;
    my $wake  = Coro::rouse_cb;

    # both watchers only need to stay alive until this sub returns
    my $io_watcher = AnyEvent->io (
        fh   => $self->[0],
        poll => 'r',
        cb   => $wake,
    );

    my $timer;
    if (defined $self->[2]) {
        $timer = AnyEvent->timer (
            after => $self->[2],
            cb    => sub {
                $ready = 0;
                $wake->();
            },
        );
    }

    Coro::rouse_wait;

    return $ready;
}
391
# Generic AnyEvent implementation of "wait until writable".
#
# Registers an I/O watcher for write readiness on the filehandle (slot 0)
# and, when a timeout (slot 2) is defined, a timer as well. Both wake the
# current coroutine through the same rouse callback.
#
# Returns 1 when woken by the I/O watcher, 0 when the timer fired.
sub writable_anyevent {
    my ($self) = @_;

    my $ready = 1;
    my $wake  = Coro::rouse_cb;

    # both watchers only need to stay alive until this sub returns
    my $io_watcher = AnyEvent->io (
        fh   => $self->[0],
        poll => 'w',
        cb   => $wake,
    );

    my $timer;
    if (defined $self->[2]) {
        $timer = AnyEvent->timer (
            after => $self->[2],
            cb    => sub {
                $ready = 0;
                $wake->();
            },
        );
    }

    Coro::rouse_wait;

    return $ready;
}
414
# Coro::Event implementation of "wait until readable".
#
# The watcher is created on first use and cached in slot 5. Each call waits
# for its next event and returns that event's slot 4 masked with the
# read bit, so the result is true only if the R bit was set.
sub readable_coro {
    my $self = $_[0];

    unless ($self->[5]) {
        $self->[5] = "Coro::Event"->io (
            fd      => $self->[0],
            desc    => "fh $self->[1] read watcher",
            timeout => $self->[2],
            poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
        );
    }

    return $self->[5]->next->[4] & &Event::Watcher::R;
}
423
# Coro::Event implementation of "wait until writable".
#
# The watcher is created on first use and cached in slot 6. Each call waits
# for its next event and returns that event's slot 4 masked with the
# write bit, so the result is true only if the W bit was set.
sub writable_coro {
    my $self = $_[0];

    unless ($self->[6]) {
        $self->[6] = "Coro::Event"->io (
            fd      => $self->[0],
            desc    => "fh $self->[1] write watcher",
            timeout => $self->[2],
            poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
        );
    }

    return $self->[6]->next->[4] & &Event::Watcher::W;
}
432
433 #sub readable_ev {
434 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
435 #}
436 #
437 #sub writable_ev {
438 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
439 #}
440
# Decide on the event model at runtime.
#
# "readable" and "writable" start out as bootstrap stubs. On first call a
# stub asks AnyEvent which backend is active, replaces itself in the symbol
# table with the matching implementation and then runs that implementation
# with the original arguments.
for my $rw (qw(readable writable)) {
    no strict 'refs';

    *$rw = sub {
        AnyEvent::detect;

        my $model = $AnyEvent::MODEL;

        if ($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV") {
            require Coro::EV;
            *$rw = \&{"Coro::EV::$rw\_ev"};
            return &$rw; # Coro 5.0+ doesn't support goto &SLF
        }

        if ($model eq "AnyEvent::Impl::Coro" or $model eq "AnyEvent::Impl::Event") {
            require Coro::Event;
            *$rw = \&{"$rw\_coro"};
        } else {
            # any other backend: plain AnyEvent watchers
            *$rw = \&{"$rw\_anyevent"};
        }

        goto &$rw
    };
};
460
# Tied syswrite: write $_[1] to the filehandle (slot 0), waiting for
# writability whenever the descriptor would block.
#
#   $_[1] - data
#   $_[2] - optional length (defaults to the length of the data)
#   $_[3] - optional offset into the data
#
# Returns the number of bytes written; this is less than requested if a
# non-retryable error occurs or waiting for writability fails.
#
# The requested length is clamped to the data actually available from the
# offset. Without the clamp, a length larger than the remaining data (for
# instance an offset given without a length) meant $len never reached zero
# and the loop kept issuing zero-byte writes forever.
sub WRITE {
    my $ofs = $_[3];
    my $len = defined $_[2] ? $_[2] : length $_[1];
    my $res = 0;

    # a negative offset counts from the end of the string
    my $avail = $ofs < 0 ? -$ofs : length ($_[1]) - $ofs;

    # only clamp sane ranges; out-of-range offsets are left for syswrite
    # itself to reject, as before
    $len = $avail if $avail >= 0 && $len > $avail;

    while () {
        my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
        if (defined $r) {
            $len -= $r;
            $ofs += $r;
            $res += $r;
            last unless $len;
        } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
            last;
        }
        last unless &writable;
    }

    return $res;
}
481
# Tied read/sysread: fill the caller's buffer ($_[1], written in place) with
# up to $_[2] bytes at offset $_[3].
#
# Data already sitting in the read buffer (slot 3) is handed out first.
# After that the filehandle is read directly, waiting for readability when
# it would block. The loop stops at end of file, on a non-retryable error,
# when the request is satisfied, when slot 8 (partial) is true, or when
# waiting for readability fails.
#
# Returns the number of bytes stored.
sub READ {
    my ($len, $ofs) = @_[2, 3];
    my $total = 0;

    # first deplete the read buffer
    if (my $buffered = length $_[0][3]) {
        if ($buffered > $len) {
            # the buffer alone satisfies the request; keep the remainder
            substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
            substr ($_[0][3], 0, $len) = "";
            return $len;
        }

        # hand out everything buffered, then continue with the filehandle
        substr ($_[1], $ofs) = $_[0][3];
        $_[0][3] = "";

        $len   -= $buffered;
        $ofs   += $buffered;
        $total += $buffered;

        return $total unless $len;
    }

    for (;;) {
        my $got = sysread $_[0][0], $_[1], $len, $ofs;

        if (defined $got) {
            $len   -= $got;
            $ofs   += $got;
            $total += $got;

            # done when the request is satisfied or at end of file
            last if !$len || !$got;
        } elsif (!($! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK)) {
            last;
        }

        last if $_[0][8];
        last unless &readable;
    }

    return $total;
}
518
# Tied readline: return the next record from the handle.
#
#   $_[1] - optional record separator; defaults to $/ when not passed.
#           An undefined separator means "read until end of file or error".
#
# Data is accumulated in the read buffer (slot 3). Once the separator shows
# up, everything up to and including it is removed from the buffer and
# returned. At end of file, on a non-retryable error, or when waiting for
# readability fails, whatever is left in the buffer is returned (undef if it
# is empty).
sub READLINE {
    my $sep = @_ > 1 ? $_[1] : $/;
    my ($from, $got);

    for (;;) {
        if (defined $sep) {
            my $start = $from < 0 ? 0 : $from;
            my $hit   = index $_[0][3], $sep, $start;

            unless ($hit < 0) {
                # cut the record, separator included, out of the buffer
                my $end = $hit + length $sep;
                return substr $_[0][3], 0, $end, "";
            }

            # next time, resume the search where a separator straddling the
            # old and new data could begin
            $from = length ($_[0][3]) - length ($sep);
        }

        # append to the buffer; the request grows by the size of the last read
        $got = sysread $_[0][0], $_[0][3], $got + 4096, length $_[0][3];

        my $finished;
        if (defined $got) {
            $finished = !$got; # zero bytes: end of file
        } else {
            $finished = ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable;
        }

        next unless $finished;

        return length $_[0][3] ? delete $_[0][3] : undef;
    }
}
545
546 1;
547
548 =back
549
550 =head1 BUGS
551
552 - Perl's IO-Handle model is THE bug.
553
554 =head1 AUTHOR
555
556 Marc Lehmann <schmorp@schmorp.de>
557 http://home.schmorp.de/
558
559 =cut
560