ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.47
Committed: Tue Sep 23 00:02:20 2008 UTC (15 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-4_747
Changes since 1.46: +1 -1 lines
Log Message:
4.747

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking io with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 no warnings;
40 use strict;
41
42 use Carp ();
43 use Errno qw(EAGAIN EINTR EINPROGRESS);
44
45 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
46
47 use base 'Exporter';
48
49 our $VERSION = 4.747;
50 our @EXPORT = qw(unblock);
51
52 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53
54 Create a new non-blocking io-handle using the given
55 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56 other supported argument is "timeout", which sets a timeout for each
57 operation.
58
59 =cut
60
# Constructor: wraps a perl filehandle in a tied, coroutine-aware handle.
#
#   $class_or_object  class name, or an existing object whose class is reused
#   $fh               the perl filehandle to wrap
#   @options          key/value pairs handed on to Coro::Handle::FH's TIEHANDLE
#
# Returns the new blessed handle, or nothing when no (true) filehandle is given.
sub new_from_fh {
    my ($class_or_object, $fh, @options) = @_;

    return unless $fh;

    # a fresh anonymous glob acts as the container the tie is attached to
    my $glob = do { local *Coro::Handle };

    tie $glob, 'Coro::Handle::FH', fh => $fh, @options;

    my $class = ref $class_or_object ? ref $class_or_object : $class_or_object;

    return bless \$glob, $class;
}
70
71 =item $fh = unblock $fh
72
73 This is a convenience function that just calls C<new_from_fh> on the
74 given filehandle. Use it to replace a normal perl filehandle by a
75 non-(coroutine-)blocking equivalent.
76
77 =cut
78
# Exported shortcut: wraps the given filehandle via Coro::Handle->new_from_fh
# and returns whatever that constructor returns.
sub unblock($) {
    my ($fh) = @_;

    return Coro::Handle->new_from_fh($fh);
}
82
83 =item $fh->writable, $fh->readable
84
85 Wait until the filehandle is readable or writable (and return true) or
86 until an error condition happens (and return false).
87
88 =cut
89
# Both methods look up the tie object behind the handle and delegate to the
# Coro::Handle::FH function of the same name, returning its result.
sub readable {
    my $state = tied ${ $_[0] };

    return Coro::Handle::FH::readable($state);
}

sub writable {
    my $state = tied ${ $_[0] };

    return Coro::Handle::FH::writable($state);
}
92
93 =item $fh->readline ([$terminator])
94
95 Like the builtin of the same name, but allows you to specify the input
96 record separator in a coroutine-safe manner (i.e. not using a global
97 variable).
98
99 =cut
100
# Forwards to READLINE on the tie object; any further arguments (the optional
# terminator) are passed through unchanged.
sub readline {
    my $self = shift;

    return tied($$self)->READLINE(@_);
}
102
103 =item $fh->autoflush ([...])
104
105 Always returns true, arguments are being ignored (exists for compatibility
106 only). Might change in the future.
107
108 =cut
109
# Compatibility stub: ignores its arguments and always reports true.
sub autoflush {
    return !0;
}
111
112 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
113
114 Work like their function equivalents (except read, which works like
115 sysread. You should not use the read function with Coro::Handle's, it will
116 work but it's not efficient).
117
118 =cut
119
# Method front-ends for the tied-handle implementation in Coro::Handle::FH.
# Each one fetches the tie object behind the handle and calls the matching
# FH function directly. Buffer arguments are passed as $_[1] so that the
# callee still operates on the caller's variable.

# read and sysread are the same operation here: both map to FH::READ.
sub read {
    my $state = tied ${ $_[0] };
    Coro::Handle::FH::READ($state, $_[1], $_[2], $_[3])
}

sub sysread {
    my $state = tied ${ $_[0] };
    Coro::Handle::FH::READ($state, $_[1], $_[2], $_[3])
}

sub syswrite {
    my $state = tied ${ $_[0] };
    Coro::Handle::FH::WRITE($state, $_[1], $_[2], $_[3])
}

# print concatenates its arguments without any separator before writing.
sub print {
    my $self = shift;
    Coro::Handle::FH::WRITE(tied($$self), join "", @_)
}

sub printf {
    my $self = shift;
    Coro::Handle::FH::PRINTF(tied($$self), @_)
}

sub fileno {
    my $state = tied ${ $_[0] };
    Coro::Handle::FH::FILENO($state)
}

sub close {
    my $state = tied ${ $_[0] };
    Coro::Handle::FH::CLOSE($state)
}

# this handler always blocks the caller
sub blocking {
    return !0;
}
128
# Accessor for slot 8 of the tie object (the "partial" flag that READ checks
# before waiting for more data).
#
# With an argument, stores it as the new value. Always returns the value the
# slot held *before* the call.
sub partial {
    my ($handle) = @_;

    my $state    = tied $$handle;
    my $previous = $state->[8];

    if (@_ > 1) {
        $state->[8] = $_[1];
    }

    return $previous;
}
136
137 =item connect, listen, bind, getsockopt, setsockopt,
138 send, recv, peername, sockname, shutdown, peerport, peerhost
139
140 Do the same thing as the perl builtins or IO::Socket methods (but return
141 true on EINPROGRESS). Remember that these must be method calls.
142
143 =cut
144
# Starts a connect on the underlying (non-blocking) socket.
#
#   $_[1]  packed socket address to connect to
#
# Returns true when the connect succeeded immediately or is merely still in
# progress. Besides EINPROGRESS/WSAEINPROGRESS this also accepts
# WSAEWOULDBLOCK, which is what Winsock reports for a non-blocking connect
# that could not complete at once. Any other error yields false with $! set.
sub connect {
    CORE::connect(tied(${ $_[0] })->[0], $_[1])
        or $! == EINPROGRESS
        or $! == WSAEINPROGRESS
        or $! == WSAEWOULDBLOCK
}
# Thin wrappers: each applies the perl builtin of the same name to the
# underlying filehandle (slot 0 of the tie object) and returns its result.

sub bind {
    my ($self, $address) = @_;
    CORE::bind(tied($$self)->[0], $address)
}

sub listen {
    my ($self, $backlog) = @_;
    CORE::listen(tied($$self)->[0], $backlog)
}

sub getsockopt {
    my ($self, $level, $option) = @_;
    CORE::getsockopt(tied($$self)->[0], $level, $option)
}

sub setsockopt {
    my ($self, $level, $option, $value) = @_;
    CORE::setsockopt(tied($$self)->[0], $level, $option, $value)
}
# Sends a message on the underlying socket using the perl builtin.
#
#   $_[1]  message
#   $_[2]  flags
#   $_[3]  optional destination address (TO)
#
# The destination is only handed to the builtin when the caller actually
# supplied one. @_ includes the invocant, so a TO argument is present only
# when @_ > 3; otherwise the three-argument form is used and no (empty)
# address is passed along.
#
# Returns what the builtin returns: the number of characters sent, or undef
# on error.
sub send {
    @_ > 3
        ? CORE::send(tied(${ $_[0] })->[0], $_[1], $_[2], $_[3])
        : CORE::send(tied(${ $_[0] })->[0], $_[1], $_[2])
}
# Thin wrappers: each applies the matching perl builtin to the underlying
# filehandle (slot 0 of the tie object) and returns its result.

# The receive buffer is passed as $_[1] so the builtin fills the caller's
# own variable.
sub recv {
    my $self = $_[0];
    CORE::recv(tied($$self)->[0], $_[1], $_[2], @_ > 2 ? $_[3] : ())
}

sub sockname {
    my ($self) = @_;
    getsockname tied($$self)->[0]
}

sub peername {
    my ($self) = @_;
    getpeername tied($$self)->[0]
}

sub shutdown {
    my ($self, $how) = @_;
    CORE::shutdown(tied($$self)->[0], $how)
}
155
156 =item ($fh, $peername) = $listen_fh->accept
157
158 In scalar context, returns the newly accepted socket (or undef), and in
159 list context returns the ($fh, $peername) pair (or nothing).
160
161 =cut
162
# Accepts a connection on the listening handle without blocking other
# coroutines.
#
# Returns the new handle (scalar context) or ($handle, $peername) (list
# context). The new handle is created through new_from_fh on the invocant.
# Returns nothing on a hard error or when waiting for readability fails.
sub accept {
    my $self = $_[0];
    my ($peer, $sock);

    for (;;) {
        if ($peer = CORE::accept($sock, tied($$self)->[0])) {
            my $accepted = $self->new_from_fh($sock);
            return wantarray ? ($accepted, $peer) : $accepted;
        }

        # only "try again" style errors are worth waiting for
        return unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;

        # let other coroutines run until the listening socket becomes readable
        return unless $self->readable;
    }
}
176
177 =item $fh->timeout ([...])
178
179 The optional argument sets the new timeout (in seconds) for this
180 handle. Returns the current (new) value.
181
182 C<0> is a valid timeout, use C<undef> to disable the timeout.
183
184 =cut
185
# Gets or sets the per-operation timeout stored in slot 2 of the tie object.
#
# When an argument is given (undef included), it becomes the new timeout and
# is also pushed to the read/write watchers in slots 5 and 6 if they exist.
# Returns the current (possibly just updated) timeout.
sub timeout {
    my $state = tied ${ $_[0] };

    if (@_ > 1) {
        my $seconds = $_[1];

        $state->[2] = $seconds;

        for my $slot (5, 6) {
            my $watcher = $state->[$slot] or next;
            $watcher->timeout($seconds);
        }
    }

    return $state->[2];
}
195
196 =item $fh->fh
197
198 Returns the "real" (non-blocking) filehandle. Use this if you want to
199 do operations on the file handle you cannot do using the Coro::Handle
200 interface.
201
202 =item $fh->rbuf
203
204 Returns the current contents of the read buffer (this is an lvalue, so you
205 can change the read buffer if you like).
206
207 You can use this function to implement your own optimized reader when neither
208 readline nor sysread are viable candidates, like this:
209
210 # first get the _real_ non-blocking filehandle
211 # and fetch a reference to the read buffer
212 my $nb_fh = $fh->fh;
213 my $buf = \$fh->rbuf;
214
215 while () {
216 # now use buffer contents, modifying
217 # if necessary to reflect the removed data
218
219 last if $$buf ne ""; # we have leftover data
220
221 # read another buffer full of data
222 $fh->readable or die "end of file";
223 sysread $nb_fh, $$buf, 8192;
224 }
225
226 =cut
227
# Returns the underlying filehandle kept in slot 0 of the tie object.
sub fh {
    my $state = tied ${ $_[0] };

    $state->[0];
}
231
# Lvalue accessor for the read buffer (slot 3 of the tie object); the buffer
# element itself must stay the final expression so callers can assign to it.
sub rbuf : lvalue {
    my $state = tied ${ $_[0] };

    $state->[3];
}
235
# Intentionally empty. Having an explicit DESTROY keeps object destruction
# from being routed through the AUTOLOAD defined in this package.
sub DESTROY { }
239
our $AUTOLOAD;

# Fallback for methods this package does not define: looks the method up in
# the forward class (slot 7 of the tie object) and jumps to it with the
# original argument list intact. Dies when the forward class cannot do it.
sub AUTOLOAD {
    my $state = tied ${ $_[0] };

    # reduce the fully qualified name in $AUTOLOAD to the bare method name
    my $method = $AUTOLOAD;
    $method =~ s/^(.*):://;

    my $target = UNIVERSAL::can($state->[7], $method)
        or die "Can't locate object method \"$method\" via package \"" . (ref $state) . "\"";

    goto &$target;
}
254
255 package Coro::Handle::FH;
256
257 no warnings;
258 use strict;
259
260 use Carp 'croak';
261 use Errno qw(EAGAIN EINTR);
262
263 use AnyEvent ();
264 use AnyEvent::Util qw(WSAEWOULDBLOCK);
265
266 # formerly a hash, but we are speed-critical, so try
267 # to be faster even if it hurts.
268 #
269 # 0 FH
270 # 1 desc
271 # 2 timeout
272 # 3 rb
273 # 4 wb # unused
274 # 5 read watcher, if Coro::Event used
275 # 6 write watcher, if Coro::Event used
276 # 7 forward class
277 # 8 partial (READ returns what it has instead of waiting for more data)
278
# Builds the array-based tie object from key/value arguments and switches
# the wrapped filehandle to non-blocking mode.
#
# Recognised keys: fh, desc, timeout, forward_class, partial.
sub TIEHANDLE {
    my ($class, %arg) = @_;

    # slots 0..4: filehandle, description, timeout, read buffer, write buffer
    my $self = bless [ @arg{qw(fh desc timeout)}, "", "" ], $class;

    # slots 5 and 6 (the watchers) are left unset here
    @$self[7, 8] = @arg{qw(forward_class partial)};

    AnyEvent::Util::fh_nonblocking($self->[0], 1);

    return $self;
}
295
# Cancels the read and write watchers (slots 5 and 6) if present, then wipes
# every slot of the tie object.
sub cleanup {
    my ($self) = @_;

    for my $slot (5, 6) {
        my $watcher = $self->[$slot] or next;
        $watcher->cancel;
    }

    @$self = ();
}
301
# Tied open: discards the current state (watchers, buffers, options) and
# opens a new file into slot 0, then switches it to non-blocking mode.
#
# The argument count selects the matching form of the builtin:
#   1 argument   -> two-argument open  (mode and path in one expression)
#   2 arguments  -> three-argument open (mode, path)
#   otherwise    -> list form (mode, command, further arguments)
#
# Returns the result of open. Croaks when the descriptor flags cannot be
# read or changed after a successful open.
sub OPEN {
    &cleanup;
    my $self = shift;

    my $r = @_ == 1 ? open($self->[0], $_[0])
          : @_ == 2 ? open($self->[0], $_[0], $_[1])
          :           open($self->[0], $_[0], $_[1], @_[2 .. $#_]);

    if ($r) {
        # Fcntl is not loaded at package level, so make sure it is available
        require Fcntl;

        # F_SETFL replaces the whole status-flag set; fetch the current flags
        # first so that e.g. O_APPEND from an append-mode open survives
        my $flags = fcntl $self->[0], Fcntl::F_GETFL(), 0
            or croak "fcntl(F_GETFL): $!";

        fcntl $self->[0], Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK()
            or croak "fcntl(O_NONBLOCK): $!";
    }

    $r
}
315
# Tied print: joins all arguments without a separator and hands the result
# to WRITE.
sub PRINT {
    my $self = shift;

    WRITE($self, join "", @_)
}

# Tied printf: the first argument is the format, the remainder its values;
# the formatted string is handed to WRITE.
sub PRINTF {
    my $self   = shift;
    my $format = shift;

    WRITE($self, sprintf $format, @_)
}
323
# Tied getc: reads a single character through READ.
#
# Returns the character, or undef when nothing could be read. READ reports
# 0 both at end of file and on error; at end of file the underlying sysread
# leaves the buffer as an empty (but defined) string, so the count rather
# than the buffer decides whether a character is returned.
sub GETC {
    my $buf;

    READ($_[0], $buf, 1) ? $buf : undef
}
329
# Tied binmode: applies plain binmode (no layer argument) to the underlying
# filehandle.
sub BINMODE {
    my ($self) = @_;

    binmode $self->[0];
}

# Positioning and end-of-file queries are not supported on these handles;
# each of the following croaks unconditionally.

sub TELL {
    Carp::croak("Coro::Handle's don't support tell()");
}

sub SEEK {
    Carp::croak("Coro::Handle's don't support seek()");
}

sub EOF {
    Carp::croak("Coro::Handle's don't support eof()");
}
345
# Tied close: releases the watchers and state, then closes the filehandle.
#
# cleanup empties the state array, so the filehandle has to be copied out of
# slot 0 first; otherwise close would be applied to an undefined value
# instead of the real handle.
#
# Returns the result of close on the underlying filehandle.
sub CLOSE {
    my $fh = $_[0][0];

    &cleanup;

    close $fh
}
350
# Runs cleanup on the tie object when it is destroyed.
sub DESTROY {
    cleanup($_[0]);
}

# Tied fileno: file descriptor number of the underlying filehandle.
sub FILENO {
    my ($self) = @_;

    fileno $self->[0]
}
358
# seems to be called for stringification (how weird), at least
# when DumpValue::dumpValue is used to print this.
#
# Returns the stringified tie object followed by its description (slot 1)
# in angle brackets.
sub FETCH {
    my ($self) = @_;

    sprintf "%s<%s>", $self, $self->[1]
}
364
# Generic AnyEvent implementation of "wait until readable".
#
# Installs an io watcher (and a timer when a timeout is set in slot 2), then
# yields until one of them wakes this coroutine. Returns 1 when no timer
# callback ran and 0 when the timer fired.
sub readable_anyevent {
    my ($self) = @_;

    my $waiter = $Coro::current;    # reset by whichever callback runs first
    my $got_io = 1;

    # readies the waiting coroutine at most once
    my $wake = sub {
        $waiter->ready if $waiter;
        undef $waiter;
    };

    my $io_watcher = AnyEvent->io (
        fh   => $self->[0],
        poll => 'r',
        cb   => sub { $wake->() },
    );

    my $timer = (defined $self->[2]) && AnyEvent->timer (
        after => $self->[2],
        cb    => sub {
            $got_io = 0;
            $wake->();
        },
    );

    # yield at least once; keep yielding while no callback has run yet
    do { &Coro::schedule } while $waiter;

    $got_io
}
392
# Generic AnyEvent implementation of "wait until writable".
#
# Installs an io watcher (and a timer when a timeout is set in slot 2), then
# yields until one of them wakes this coroutine. Returns 1 when no timer
# callback ran and 0 when the timer fired.
sub writable_anyevent {
    my ($self) = @_;

    my $waiter = $Coro::current;    # reset by whichever callback runs first
    my $got_io = 1;

    # readies the waiting coroutine at most once
    my $wake = sub {
        $waiter->ready if $waiter;
        undef $waiter;
    };

    my $io_watcher = AnyEvent->io (
        fh   => $self->[0],
        poll => 'w',
        cb   => sub { $wake->() },
    );

    my $timer = (defined $self->[2]) && AnyEvent->timer (
        after => $self->[2],
        cb    => sub {
            $got_io = 0;
            $wake->();
        },
    );

    # keep yielding while no callback has run yet
    &Coro::schedule while $waiter;

    $got_io
}
419
# Coro::Event implementation of "wait until readable".
#
# Creates the read watcher on first use and caches it in slot 5, waits for
# its next event and returns that event's slot 4 masked with the R flag.
sub readable_coro {
    my ($self) = @_;

    $self->[5] ||= "Coro::Event"->io (
        fd      => $self->[0],
        desc    => "fh $self->[1] read watcher",
        timeout => $self->[2],
        poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
    );

    my $event = $self->[5]->next;

    $event->[4] & &Event::Watcher::R
}
428
# Coro::Event implementation of "wait until writable".
#
# Creates the write watcher on first use and caches it in slot 6, waits for
# its next event and returns that event's slot 4 masked with the W flag.
sub writable_coro {
    my ($self) = @_;

    $self->[6] ||= "Coro::Event"->io (
        fd      => $self->[0],
        desc    => "fh $self->[1] write watcher",
        timeout => $self->[2],
        poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
    );

    my $event = $self->[6]->next;

    $event->[4] & &Event::Watcher::W
}
437
438 #sub readable_ev {
439 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
440 #}
441 #
442 #sub writable_ev {
443 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
444 #}
445
# decide on event model at runtime
#
# "readable" and "writable" start out as stubs. On the first call a stub asks
# AnyEvent which event model is in use, replaces itself with the matching
# implementation and then re-dispatches the call (same @_) to it.
for my $method (qw(readable writable)) {
    no strict 'refs';

    *$method = sub {
        AnyEvent::detect;

        my $model = $AnyEvent::MODEL;
        my $implementation;

        if ($model eq "AnyEvent::Impl::Coro" or $model eq "AnyEvent::Impl::Event") {
            require Coro::Event;
            $implementation = "${method}_coro";
        }
        elsif ($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV") {
            require Coro::EV;
            $implementation = "Coro::EV::${method}_ev";
        }
        else {
            $implementation = "${method}_anyevent";
        }

        # install the chosen implementation under the public name
        *$method = \&{$implementation};

        goto &$method
    };
}
464
# Writes data to the underlying filehandle, yielding to other coroutines
# whenever the handle is not writable.
#
#   $_[1]  data
#   $_[2]  optional length (defaults to the length of the data)
#   $_[3]  optional offset into the data
#
# Returns the number of bytes written; this is less than requested when a
# hard error occurs or when waiting for writability fails.
sub WRITE {
    my $remaining = defined $_[2] ? $_[2] : length $_[1];
    my $offset    = $_[3];
    my $written   = 0;

    for (;;) {
        my $count = syswrite ($_[0][0], $_[1], $remaining, $offset);

        if (defined $count) {
            $remaining -= $count;
            $offset    += $count;
            $written   += $count;

            return $written unless $remaining;
        }
        elsif (!($! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK)) {
            # not a "try again" condition: give up
            return $written;
        }

        return $written unless &writable;
    }
}
485
# Reads up to $_[2] bytes into the caller's buffer $_[1] at offset $_[3],
# yielding to other coroutines while no data is available.
#
# Data already sitting in the read buffer (slot 3) is handed out first. When
# the partial flag (slot 8) is set, the function returns after one read
# attempt instead of waiting for the full amount.
#
# Returns the number of bytes placed into the buffer.
sub READ {
    my $wanted = $_[2];
    my $offset = $_[3];
    my $total  = 0;

    # first deplete the read buffer
    my $buffered = length $_[0][3];

    if ($buffered) {
        if ($buffered > $wanted) {
            # more buffered than requested: hand out a slice, keep the rest
            substr ($_[1], $offset) = substr ($_[0][3], 0, $wanted);
            substr ($_[0][3], 0, $wanted) = "";
            return $wanted;
        }

        # the whole buffer fits into the request
        substr ($_[1], $offset) = $_[0][3];
        $_[0][3] = "";

        $wanted -= $buffered;
        $offset += $buffered;
        $total  += $buffered;

        return $total unless $wanted;
    }

    for (;;) {
        my $count = sysread $_[0][0], $_[1], $wanted, $offset;

        if (defined $count) {
            $wanted -= $count;
            $offset += $count;
            $total  += $count;

            # done when the request is satisfied or end of file was hit
            last unless $wanted && $count;
        }
        elsif (!($! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK)) {
            last;
        }

        last if $_[0][8] || !&readable;
    }

    return $total;
}
522
# Reads one record from the handle.
#
#   $_[1]  optional record separator; defaults to the current value of $/.
#          An undefined separator reads until end of file.
#
# Returns the record including its separator. At end of file, on a hard
# error, or when waiting for readability fails, returns whatever is left in
# the read buffer (slot 3), or undef when the buffer is empty.
sub READLINE {
    my $separator = @_ > 1 ? $_[1] : $/;

    for (;;) {
        if (defined $separator) {
            my $end = index $_[0][3], $separator;

            if ($end >= 0) {
                $end += length $separator;

                # cut the complete record off the front of the buffer
                return substr $_[0][3], 0, $end, "";
            }
        }

        my $count = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];

        my $finished;
        if (defined $count) {
            # a zero-length read means end of file
            $finished = !$count;
        }
        else {
            $finished = ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable;
        }

        if ($finished) {
            return length $_[0][3] ? delete $_[0][3] : undef;
        }
    }
}
545
546 1;
547
548 =back
549
550 =head1 BUGS
551
552 - Perl's IO-Handle model is THE bug.
553
554 =head1 AUTHOR
555
556 Marc Lehmann <schmorp@schmorp.de>
557 http://home.schmorp.de/
558
559 =cut
560