ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.76
Committed: Tue Jun 23 23:40:06 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-5_14
Changes since 1.75: +1 -1 lines
Log Message:
5.14

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking I/O with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 no warnings;
40 use strict;
41
42 use Carp ();
43 use Errno qw(EAGAIN EINTR EINPROGRESS);
44
45 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
46
47 use base 'Exporter';
48
49 our $VERSION = 5.14;
50 our @EXPORT = qw(unblock);
51
52 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53
54 Create a new non-blocking io-handle using the given
55 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56 other supported argument is "timeout", which sets a timeout for each
57 operation.
58
59 =cut
60
# Constructor: wrap a perl filehandle in a Coro::Handle object.
#
# $class - class name or an existing object (its class is reused)
# $fh    - the filehandle to wrap; a false value yields an empty return
# @args  - extra key => value pairs handed to Coro::Handle::FH's tie
#          constructor after "fh => $fh"
sub new_from_fh {
   my ($class, $fh, @args) = @_;

   return unless $fh;

   # a fresh anonymous glob that carries the tie magic
   my $glob = do { local *Coro::Handle };

   tie $glob, 'Coro::Handle::FH', fh => $fh, @args;

   return bless \$glob, (ref ($class) || $class);
}
70
71 =item $fh = unblock $fh
72
73 This is a convenience function that just calls C<new_from_fh> on the
74 given filehandle. Use it to replace a normal perl filehandle by a
75 non-(coroutine-)blocking equivalent.
76
77 =cut
78
# Exported shortcut: wrap the given filehandle via new_from_fh.
# The ($) prototype is kept so "unblock $fh" keeps parsing as before.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}
82
83 =item $fh->writable, $fh->readable
84
85 Wait until the filehandle is readable or writable (and return true) or
86 until an error condition happens (and return false).
87
88 =cut
89
# Delegate to the tied implementation object's readable check.
sub readable {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::readable ($impl)
}

# Delegate to the tied implementation object's writable check.
sub writable {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::writable ($impl)
}
92
93 =item $fh->readline ([$terminator])
94
95 Like the builtin of the same name, but allows you to specify the input
96 record separator in a coroutine-safe manner (i.e. not using a global
97 variable).
98
99 =cut
100
# Forward to the tied object's READLINE; any remaining arguments (the
# optional terminator) are passed through unchanged.
sub readline {
   my $self = shift;
   (tied $$self)->READLINE (@_)
}
102
103 =item $fh->autoflush ([...])
104
105 Always returns true, arguments are being ignored (exists for compatibility
106 only). Might change in the future.
107
108 =cut
109
# Compatibility stub: ignores its arguments and always reports true.
sub autoflush {
   return !0;
}
111
112 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113
114 Work like their function equivalents (except read, which works like
115 sysread. You should not use the read function with Coro::Handle's, it will
116 work but it's not efficient).
117
118 =cut
119
# Thin method wrappers around the tied implementation (Coro::Handle::FH).
# The buffer argument is always passed as $_[1] itself (never copied) so
# that the implementation can fill the caller's variable in place.

sub read {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::READ ($impl, $_[1], $_[2], $_[3])
}

# identical to read: both go through the same READ implementation
sub sysread {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::READ ($impl, $_[1], $_[2], $_[3])
}

sub syswrite {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::WRITE ($impl, $_[1], $_[2], $_[3])
}

# concatenates all arguments and writes them as one string
sub print {
   my $self = shift;
   Coro::Handle::FH::WRITE ((tied $$self), join "", @_)
}

sub printf {
   my $self = shift;
   Coro::Handle::FH::PRINTF ((tied $$self), @_)
}

sub fileno {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::FILENO ($impl)
}

sub close {
   my $impl = tied ${$_[0]};
   Coro::Handle::FH::CLOSE ($impl)
}

# this handler always blocks the caller
sub blocking {
   return !0;
}
128
# Get (and optionally set) the "partial" flag kept in slot 8 of the tied
# object. Always returns the value the flag had before the call.
sub partial {
   my ($self, @new) = @_;

   my $impl     = tied $$self;
   my $previous = $impl->[8];

   ($impl->[8]) = @new if @new;

   return $previous;
}
136
137 =item connect, listen, bind, getsockopt, setsockopt,
138 send, recv, peername, sockname, shutdown, peerport, peerhost
139
140 Do the same thing as the perl builtins or IO::Socket methods (but return
141 true on EINPROGRESS). Remember that these must be method calls.
142
143 =cut
144
# Socket method wrappers: each one applies the perl builtin of the same
# name to the real filehandle stored in slot 0 of the tied object.

# true on success and also when the connect is merely still in progress
sub connect {
   my $sock = tied(${$_[0]})->[0];
   CORE::connect ($sock, $_[1])
      || $! == EINPROGRESS
      || $! == WSAEINPROGRESS
}

sub bind {
   my $sock = tied(${$_[0]})->[0];
   CORE::bind ($sock, $_[1])
}

sub listen {
   my $sock = tied(${$_[0]})->[0];
   CORE::listen ($sock, $_[1])
}

sub getsockopt {
   my $sock = tied(${$_[0]})->[0];
   CORE::getsockopt ($sock, $_[1], $_[2])
}

sub setsockopt {
   my $sock = tied(${$_[0]})->[0];
   CORE::setsockopt ($sock, $_[1], $_[2], $_[3])
}
# $fh->send ($msg, $flags [, $to])
#
# Applies the builtin send to the real filehandle (slot 0 of the tied
# object). The destination address is only handed to the builtin when the
# caller actually supplied one: the builtin switches to sendto(2) as soon
# as it sees a fourth argument, even an undefined one, so an unconditional
# fourth argument would turn every plain send into a sendto with an empty
# address.
sub send {
   my $sock = tied(${$_[0]})->[0];

   @_ > 3
      ? CORE::send ($sock, $_[1], $_[2], $_[3])
      : CORE::send ($sock, $_[1], $_[2])
}
# More socket method wrappers around the builtins, operating on the real
# filehandle stored in slot 0 of the tied object.

# the receive buffer is passed as $_[1] itself so it is filled in place
sub recv {
   my $sock = tied(${$_[0]})->[0];
   CORE::recv ($sock, $_[1], $_[2], @_ > 2 ? $_[3] : ())
}

sub sockname {
   my $sock = tied(${$_[0]})->[0];
   CORE::getsockname ($sock)
}

sub peername {
   my $sock = tied(${$_[0]})->[0];
   CORE::getpeername ($sock)
}

sub shutdown {
   my $sock = tied(${$_[0]})->[0];
   CORE::shutdown ($sock, $_[1])
}
155
156 =item ($fh, $peername) = $listen_fh->accept
157
158 In scalar context, returns the newly accepted socket (or undef) and in
159 list context return the ($fh, $peername) pair (or nothing).
160
161 =cut
162
# Accept a connection on the listening handle.
#
# Scalar context: the new Coro::Handle (nothing on failure).
# List context:   ($new_handle, $peername) (empty list on failure).
#
# Retries after EAGAIN/EINTR/WSAEWOULDBLOCK once the listening socket
# becomes readable; gives up on any other error or when the readable wait
# itself fails.
sub accept {
   my ($addr, $client);

   while () {
      if ($addr = CORE::accept ($client, tied(${$_[0]})->[0])) {
         my $wrapped = $_[0]->new_from_fh ($client);
         return wantarray ? ($wrapped, $addr) : $wrapped;
      }

      return unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
      return unless $_[0]->readable;
   }
}
176
177 =item $fh->timeout ([...])
178
179 The optional argument sets the new timeout (in seconds) for this
180 handle. Returns the current (new) value.
181
182 C<0> is a valid timeout, use C<undef> to disable the timeout.
183
184 =cut
185
# Get or set the per-operation timeout (slot 2 of the tied object).
# When a new value is given it is also pushed to the read and write
# watchers (slots 5 and 6) if they exist. Returns the current value.
sub timeout {
   my ($self, @new) = @_;

   my $impl = tied $$self;

   if (@new) {
      my $seconds = $new[0];

      $impl->[2] = $seconds;

      for my $slot (5, 6) {
         my $watcher = $impl->[$slot]
            or next;
         $watcher->timeout ($seconds);
      }
   }

   $impl->[2]
}
195
196 =item $fh->fh
197
198 Returns the "real" (non-blocking) filehandle. Use this if you want to
199 do operations on the file handle you cannot do using the Coro::Handle
200 interface.
201
202 =item $fh->rbuf
203
204 Returns the current contents of the read buffer (this is an lvalue, so you
205 can change the read buffer if you like).
206
207 You can use this function to implement your own optimized reader when neither
208 readline nor sysread are viable candidates, like this:
209
210 # first get the _real_ non-blocking filehandle
211 # and fetch a reference to the read buffer
212 my $nb_fh = $fh->fh;
213 my $buf = \$fh->rbuf;
214
215 while () {
216 # now use buffer contents, modifying
217 # if necessary to reflect the removed data
218
219 last if $$buf ne ""; # we have leftover data
220
221 # read another buffer full of data
222 $fh->readable or die "end of file";
223 sysread $nb_fh, $$buf, 8192;
224 }
225
226 =cut
227
# Return the real filehandle (slot 0 of the tied object).
sub fh {
   my $impl = tied ${$_[0]};
   $impl->[0]
}

# Return the read buffer (slot 3 of the tied object) as an lvalue, so
# callers can take a reference to it or assign to it.
sub rbuf : lvalue {
   my $impl = tied ${$_[0]};
   $impl->[3]
}
235
# Defined (as a no-op) so that object destruction is never routed through
# AUTOLOAD below.
sub DESTROY {
   # nop
}

our $AUTOLOAD;

# Forward unknown methods to the class stored in slot 7 of the tied object
# (the "forward_class" constructor argument). Dies when that class cannot
# perform the requested method.
sub AUTOLOAD {
   my $impl = tied ${$_[0]};

   # strip the package qualifier, keeping only the method name
   (my $method = $AUTOLOAD) =~ s/^.*:://;

   my $target = UNIVERSAL::can ($impl->[7], $method);

   unless ($target) {
      die "Can't locate object method \"$method\" via package \"" . (ref $impl) . "\"";
   }

   # tail-call with the original argument list intact
   goto &$target;
}
254
255 package Coro::Handle::FH;
256
257 no warnings;
258 use strict;
259
260 use Carp 'croak';
261 use Errno qw(EAGAIN EINTR);
262
263 use AnyEvent ();
264 use AnyEvent::Util qw(WSAEWOULDBLOCK);
265
266 use Coro::AnyEvent;
267
268 # formerly a hash, but we are speed-critical, so try
269 # to be faster even if it hurts.
270 #
271 # 0 FH
272 # 1 desc
273 # 2 timeout
274 # 3 rb
275 # 4 wb # unused
276 # 5 read watcher, if Coro::Event|EV used
277 # 6 write watcher, if Coro::Event|EV used
278 # 7 forward class
279 # 8 partial flag (when true, READ returns after the first read attempt instead of waiting for more data)
280
# Tie constructor. Builds the array-based object from the named arguments
# (fh, desc, timeout, forward_class, partial) and switches the filehandle
# to non-blocking mode.
sub TIEHANDLE {
   my ($class, %arg) = @_;

   my $self = bless [
      $arg{fh},            # 0 FH
      $arg{desc},          # 1 desc
      $arg{timeout},       # 2 timeout
      "",                  # 3 rb
      "",                  # 4 wb
      undef,               # 5 read watcher, created on demand
      undef,               # 6 write watcher, created on demand
      $arg{forward_class}, # 7 forward class
      $arg{partial},       # 8 partial flag
   ], $class;

   AnyEvent::Util::fh_nonblocking ($self->[0], 1);

   return $self;
}
297
# Drop all state held by the object by emptying its array.
sub cleanup {
   # gets overridden for Coro::Event
   my ($self) = @_;
   @$self = ();
}
302
# Tied open: discard the previous state, (re)open the handle in slot 0 with
# the caller's arguments and put it into non-blocking mode.
#
# Arguments after the object are exactly what the caller gave to open()
# after the handle: either a single EXPR (two-argument open) or MODE, EXPR
# and an optional LIST. All of them are forwarded.
#
# Returns the result of open. Croaks if the file status flags cannot be
# read or changed.
sub OPEN {
   &cleanup;
   my $self = shift;

   my $r = @_ == 1
      ? open $self->[0], $_[0]
      : open $self->[0], $_[0], $_[1], @_[2 .. $#_];

   if ($r) {
      # this file does not load Fcntl anywhere else
      require Fcntl;

      # keep the existing status flags (e.g. O_APPEND from a ">>" open);
      # a bare F_SETFL with only O_NONBLOCK would silently clear them
      my $flags = fcntl $self->[0], Fcntl::F_GETFL (), 0
         or croak "fcntl(F_GETFL): $!";

      fcntl $self->[0], Fcntl::F_SETFL (), ($flags + 0) | Fcntl::O_NONBLOCK ()
         or croak "fcntl(O_NONBLOCK): $!";
   }

   $r
}
316
# Tied print: concatenate all arguments and write them as one string.
sub PRINT {
   my $self = shift;
   WRITE ($self, join "", @_)
}

# Tied printf: format the arguments and write the resulting string.
sub PRINTF {
   my ($self, $format, @args) = @_;
   WRITE ($self, sprintf $format, @args)
}
324
# Tied getc: read a single character through READ.
#
# Returns the character, or undef when nothing could be read (end of file,
# error, or a failed wait). READ leaves an empty string in the buffer at
# end of file, so the byte count it returns is what decides the result;
# returning the buffer unconditionally would hand "" to the caller and
# break the usual "while (defined (my $c = getc $fh))" loop.
sub GETC {
   my $buf;
   READ ($_[0], $buf, 1) ? $buf : undef
}
330
# Tied binmode: apply binmode to the real filehandle (slot 0).
#
# An optional layer argument (e.g. ":utf8") is forwarded; without one the
# handle is put into plain binary mode as before.
sub BINMODE {
   @_ > 1
      ? binmode ($_[0][0], $_[1])
      : binmode ($_[0][0])
}
334
# Positioning and end-of-file queries are not supported on these handles;
# each of the following tie hooks reports that to the caller.

sub TELL {
   croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   croak ("Coro::Handle's don't support eof()");
}
346
# Tied close: release all object state and close the real filehandle.
#
# The handle has to be fetched before cleanup runs, because cleanup empties
# the object array; looking up slot 0 afterwards yields undef, so the
# handle would never be closed and the call could never report success.
#
# Returns the result of the builtin close.
sub CLOSE {
   my $fh = $_[0][0];
   &cleanup;
   close $fh
}
351
# Release all object state when the tied object goes away.
sub DESTROY {
   cleanup (@_);
}

# Tied fileno: the descriptor number of the real filehandle (slot 0).
sub FILENO {
   my ($self) = @_;
   return fileno $self->[0];
}
359
# seems to be called for stringification (how weird), at least
# when DumpValue::dumpValue is used to print this.
#
# Yields the stringified object followed by its description (slot 1) in
# angle brackets.
sub FETCH {
   my ($self) = @_;
   join "", $self, "<", $self->[1], ">"
}
365
# Generic AnyEvent backend: suspend the current coroutine until the handle
# (slot 0) becomes readable or the timeout (slot 2, if defined) expires.
# Returns 1 when woken by the I/O watcher and 0 when the timer fired.
sub _readable_anyevent {
   my $wake  = Coro::rouse_cb ();
   my $ready = 1;

   # both watchers must stay alive until rouse_wait returns
   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'r',
      cb   => $wake,
   );

   my $timer;
   if (defined $_[0][2]) {
      $timer = AnyEvent->timer (
         after => $_[0][2],
         cb    => sub {
            $ready = 0;
            $wake->();
         },
      );
   }

   Coro::rouse_wait ();

   $ready
}
388
# Generic AnyEvent backend: suspend the current coroutine until the handle
# (slot 0) becomes writable or the timeout (slot 2, if defined) expires.
# Returns 1 when woken by the I/O watcher and 0 when the timer fired.
sub _writable_anyevent {
   my $wake  = Coro::rouse_cb ();
   my $ready = 1;

   # both watchers must stay alive until rouse_wait returns
   my $io_watcher = AnyEvent->io (
      fh   => $_[0][0],
      poll => 'w',
      cb   => $wake,
   );

   my $timer;
   if (defined $_[0][2]) {
      $timer = AnyEvent->timer (
         after => $_[0][2],
         cb    => sub {
            $ready = 0;
            $wake->();
         },
      );
   }

   Coro::rouse_wait ();

   $ready
}
411
# Coro::Event backend: lazily create the read watcher (cached in slot 5),
# wait for its next event and report whether that event has the R bit set.
sub _readable_coro {
   $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   my $event = $_[0][5]->next;

   $event->[4] & &Event::Watcher::R
}
420
# Coro::Event backend: lazily create the write watcher (cached in slot 6),
# wait for its next event and report whether that event has the W bit set.
sub _writable_coro {
   $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   my $event = $_[0][6]->next;

   $event->[4] & &Event::Watcher::W
}
429
430 #sub _readable_ev {
431 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
432 #}
433 #
434 #sub _writable_ev {
435 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
436 #}
437
# decide on event model at runtime
#
# "readable" and "writable" are first installed as one-shot stubs. On its
# first call a stub runs AnyEvent::detect, replaces itself in the symbol
# table with the implementation matching $AnyEvent::MODEL and then
# forwards the call it is currently handling to that implementation.
for my $rw (qw(readable writable)) {
   no strict 'refs';

   *$rw = sub {
      AnyEvent::detect;
      if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
         # Event backend with Coro::Event available: use the cached
         # Coro::Event watchers
         *$rw = \&{"_$rw\_coro"};
         # cleanup is replaced as well, so that the watchers kept in
         # slots 5 and 6 are cancelled before the object is emptied;
         # errors from cancelling are swallowed by the eval
         *cleanup = sub {
            eval {
               $_[0][5]->cancel if $_[0][5];
               $_[0][6]->cancel if $_[0][6];
            };
            @{$_[0]} = ();
         };

      } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
         # EV backend with Coro::EV available: use the functions that
         # Coro::EV provides under the names _readable_ev/_writable_ev
         *$rw = \&{"Coro::EV::_$rw\_ev"};
         return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only

      } else {
         # anything else: generic AnyEvent watcher implementation
         *$rw = \&{"_$rw\_anyevent"};
      }
      # forward the current call, with the same @_, to the implementation
      # that was just installed
      goto &$rw
   };
};
464
# WRITE ($self, $data [, $length [, $offset]])
#
# Write $length bytes of $data, starting at $offset, to the real handle
# (slot 0), waiting for writability whenever the handle would block.
#
# $length defaults to everything after $offset and is clamped to the data
# actually available there, matching the builtin syswrite. Without the
# clamp an oversized length (or an offset given without a length) could
# never reach zero: the loop would keep issuing zero-byte writes forever.
#
# Returns the number of bytes written, which is short of the request when
# a non-retryable error occurs or the writable wait fails.
sub WRITE {
   my $ofs   = $_[3];
   my $total = length $_[1];

   # bytes available after the offset; negative offsets count from the end
   my $avail = $ofs < 0 ? -$ofs : $total - $ofs;

   my $len = defined $_[2] ? $_[2] : $avail;

   # only clamp for offsets inside the string; invalid offsets are left
   # for syswrite itself to reject
   $len = $avail
      if $avail >= 0 && $avail <= $total && $len > $avail;

   my $res = 0;

   while () {
      my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
      if (defined $r) {
         $len -= $r;
         $ofs += $r;
         $res += $r;
         last unless $len;
      } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
         # hard error: give up and report what was written so far
         last;
      }
      # would block or short write: wait until the handle is writable
      last unless &writable;
   }

   return $res;
}
485
# READ ($self, $buffer, $length [, $offset])
#
# Fill the caller's buffer ($_[1], modified in place) starting at $offset,
# first from the internal read buffer (slot 3), then from the real handle
# (slot 0). Waits for readability between attempts unless the partial flag
# (slot 8) is set. Returns the number of bytes delivered.
sub READ {
   my ($want, $pos) = @_[2, 3];
   my $got = 0;

   # first deplete the read buffer
   if (my $buffered = length $_[0][3]) {
      if ($buffered > $want) {
         # the buffer alone satisfies the request: hand out the front
         # part and keep the rest buffered
         substr ($_[1], $pos) = substr $_[0][3], 0, $want, "";
         return $want;
      }

      # the whole buffer fits: move it over and continue with the handle
      substr ($_[1], $pos) = $_[0][3];
      $_[0][3] = "";

      $want -= $buffered;
      $pos  += $buffered;
      $got  += $buffered;

      return $got unless $want;
   }

   while () {
      my $r = sysread $_[0][0], $_[1], $want, $pos;

      if (defined $r) {
         $want -= $r;
         $pos  += $r;
         $got  += $r;
         # done when the request is satisfied or end of file was hit
         last if !$want || !$r;
      } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
         last;
      }

      last if $_[0][8] || !&readable;
   }

   return $got;
}
522
# READLINE ($self [, $terminator])
#
# Return the next record from the handle. The terminator defaults to $/;
# an undefined terminator means "read until no more data arrives".
# Data is accumulated in the read buffer (slot 3). When the handle hits
# end of file, a hard error occurs or the readable wait fails, whatever is
# buffered is returned (undef if nothing is).
sub READLINE {
   my $sep = @_ > 1 ? $_[1] : $/;
   my ($from, $got);

   while () {
      if (defined $sep) {
         my $hit = index $_[0][3], $sep, $from < 0 ? 0 : $from;

         if ($hit >= 0) {
            # cut the record, terminator included, off the buffer front
            return substr $_[0][3], 0, $hit + length $sep, "";
         }

         # next search resumes where a terminator straddling the old
         # buffer end could begin
         $from = (length $_[0][3]) - (length $sep);
      }

      # append to the buffer, asking for more each time around
      $got = sysread $_[0][0], $_[0][3], $got + 4096, length $_[0][3];

      if (defined $got) {
         next if $got;
      } elsif (($! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK) && &readable) {
         next;
      }

      # end of file, hard error or failed wait
      return length $_[0][3] ? delete $_[0][3] : undef;
   }
}
549
550 1;
551
552 =back
553
554 =head1 BUGS
555
556 - Perl's IO-Handle model is THE bug.
557
558 =head1 AUTHOR
559
560 Marc Lehmann <schmorp@schmorp.de>
561 http://home.schmorp.de/
562
563 =cut
564