ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.115
Committed: Wed Mar 6 06:00:08 2013 UTC (11 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-6_28
Changes since 1.114: +1 -1 lines
Log Message:
6.28

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking I/O with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 use common::sense;
40
41 use Carp ();
42 use Errno qw(EAGAIN EINTR EINPROGRESS);
43
44 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 use AnyEvent::Socket ();
46
47 use base 'Exporter';
48
49 our $VERSION = 6.28;
50 our @EXPORT = qw(unblock);
51
52 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53
54 Create a new non-blocking io-handle using the given
55 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56 other supported argument is "timeout", which sets a timeout for each
57 operation.
58
59 =cut
60
61 sub new_from_fh {
62 my $class = shift;
63 my $fh = shift or return;
64 my $self = do { local *Coro::Handle };
65
66 tie *$self, 'Coro::Handle::FH', fh => $fh, @_;
67
68 bless \$self, ref $class ? ref $class : $class
69 }
70
71 =item $fh = unblock $fh
72
73 This is a convenience function that just calls C<new_from_fh> on the
74 given filehandle. Use it to replace a normal perl filehandle by a
75 non-(coroutine-)blocking equivalent.
76
77 =cut
78
79 sub unblock($) {
80 new_from_fh Coro::Handle $_[0]
81 }
82
83 =item $fh->writable, $fh->readable
84
85 Wait until the filehandle is readable or writable (and return true) or
86 until an error condition happens (and return false).
87
88 =cut
89
90 sub readable { Coro::Handle::FH::readable (tied *${$_[0]}) }
91 sub writable { Coro::Handle::FH::writable (tied *${$_[0]}) }
92
93 =item $fh->readline ([$terminator])
94
95 Similar to the builtin of the same name, but allows you to specify the
96 input record separator in a coroutine-safe manner (i.e. not using a global
97 variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98 effect.
99
100 =cut
101
102 sub readline { tied(*${+shift})->READLINE (@_) }
103
104 =item $fh->autoflush ([...])
105
106 Always returns true, arguments are being ignored (exists for compatibility
107 only). Might change in the future.
108
109 =cut
110
111 sub autoflush { !0 }
112
113 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114
115 Work like their function equivalents (except read, which works like
116 sysread. You should not use the read function with Coro::Handle's, it will
117 work but it's not efficient).
118
119 =cut
120
121 sub read { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
122 sub sysread { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
123 sub syswrite { Coro::Handle::FH::WRITE (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
124 sub print { Coro::Handle::FH::WRITE (tied *${+shift}, join "", @_) }
125 sub printf { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
126 sub fileno { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
127 sub close { Coro::Handle::FH::CLOSE (tied *${$_[0]}) }
128 sub blocking { !0 } # this handler always blocks the caller
129
130 sub partial {
131 my $obj = tied *${$_[0]};
132
133 my $retval = $obj->[8];
134 $obj->[8] = $_[1] if @_ > 1;
135 $retval
136 }
137
138 =item connect, listen, bind, getsockopt, setsockopt,
139 send, recv, peername, sockname, shutdown, peerport, peerhost
140
141 Do the same thing as the perl builtins or IO::Socket methods (but return
142 true on EINPROGRESS). Remember that these must be method calls.
143
144 =cut
145
146 sub connect { connect tied(*${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
147 sub bind { bind tied(*${$_[0]})->[0], $_[1] }
148 sub listen { listen tied(*${$_[0]})->[0], $_[1] }
149 sub getsockopt { getsockopt tied(*${$_[0]})->[0], $_[1], $_[2] }
150 sub setsockopt { setsockopt tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }
151 sub send { send tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
152 sub recv { recv tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
153 sub sockname { getsockname tied(*${$_[0]})->[0] }
154 sub peername { getpeername tied(*${$_[0]})->[0] }
155 sub shutdown { shutdown tied(*${$_[0]})->[0], $_[1] }
156
157 =item peeraddr, peerhost, peerport
158
159 Return the peer host (as numericla IP address) and peer port (as integer).
160
161 =cut
162
163 sub peeraddr {
164 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[1]
165 }
166
167 sub peerport {
168 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[0]
169 }
170
171 sub peerhost {
172 AnyEvent::Socket::format_address &peeraddr
173 }
174
175 =item ($fh, $peername) = $listen_fh->accept
176
177 In scalar context, returns the newly accepted socket (or undef) and in
178 list context return the ($fh, $peername) pair (or nothing).
179
180 =cut
181
182 sub accept {
183 my ($peername, $fh);
184 while () {
185 $peername = accept $fh, tied(*${$_[0]})->[0]
186 and return wantarray
187 ? ($_[0]->new_from_fh($fh), $peername)
188 : $_[0]->new_from_fh($fh);
189
190 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
191
192 $_[0]->readable or return;
193 }
194 }
195
196 =item $fh->timeout ([...])
197
198 The optional argument sets the new timeout (in seconds) for this
199 handle. Returns the current (new) value.
200
201 C<0> is a valid timeout, use C<undef> to disable the timeout.
202
203 =cut
204
205 sub timeout {
206 my $self = tied *${$_[0]};
207 if (@_ > 1) {
208 $self->[2] = $_[1];
209 $self->[5]->timeout ($_[1]) if $self->[5];
210 $self->[6]->timeout ($_[1]) if $self->[6];
211 }
212 $self->[2]
213 }
214
215 =item $fh->fh
216
217 Returns the "real" (non-blocking) filehandle. Use this if you want to
218 do operations on the file handle you cannot do using the Coro::Handle
219 interface.
220
221 =item $fh->rbuf
222
223 Returns the current contents of the read buffer (this is an lvalue, so you
224 can change the read buffer if you like).
225
226 You can use this function to implement your own optimized reader when neither
227 readline nor sysread are viable candidates, like this:
228
229 # first get the _real_ non-blocking filehandle
230 # and fetch a reference to the read buffer
231 my $nb_fh = $fh->fh;
232 my $buf = \$fh->rbuf;
233
234 while () {
235 # now use buffer contents, modifying
236 # if necessary to reflect the removed data
237
238 last if $$buf ne ""; # we have leftover data
239
240 # read another buffer full of data
241 $fh->readable or die "end of file";
242 sysread $nb_fh, $$buf, 8192;
243 }
244
245 =cut
246
247 sub fh {
248 (tied *${$_[0]})->[0];
249 }
250
251 sub rbuf : lvalue {
252 (tied *${$_[0]})->[3];
253 }
254
255 sub DESTROY {
256 # nop
257 }
258
259 our $AUTOLOAD;
260
261 sub AUTOLOAD {
262 my $self = tied *${$_[0]};
263
264 (my $func = $AUTOLOAD) =~ s/^(.*):://;
265
266 my $forward = UNIVERSAL::can $self->[7], $func;
267
268 $forward or
269 die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
270
271 goto &$forward;
272 }
273
274 package Coro::Handle::FH;
275
276 use common::sense;
277
278 use Carp 'croak';
279 use Errno qw(EAGAIN EINTR);
280
281 use AnyEvent::Util qw(WSAEWOULDBLOCK);
282
283 use Coro::AnyEvent;
284
285 # formerly a hash, but we are speed-critical, so try
286 # to be faster even if it hurts.
287 #
288 # 0 FH
289 # 1 desc
290 # 2 timeout
291 # 3 rb
292 # 4 wb # unused
293 # 5 read watcher, if Coro::Event|EV used
294 # 6 write watcher, if Coro::Event|EV used
295 # 7 forward class
296 # 8 blocking
297
298 sub TIEHANDLE {
299 my ($class, %arg) = @_;
300
301 my $self = bless [], $class;
302 $self->[0] = $arg{fh};
303 $self->[1] = $arg{desc};
304 $self->[2] = $arg{timeout};
305 $self->[3] = "";
306 $self->[4] = "";
307 $self->[7] = $arg{forward_class};
308 $self->[8] = $arg{partial};
309
310 AnyEvent::Util::fh_nonblocking $self->[0], 1;
311
312 $self
313 }
314
315 sub cleanup {
316 # gets overriden for Coro::Event
317 @{$_[0]} = ();
318 }
319
320 sub OPEN {
321 &cleanup;
322 my $self = shift;
323 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
324 : open $self->[0], $_[0], $_[1], $_[2];
325
326 if ($r) {
327 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
328 or croak "fcntl(O_NONBLOCK): $!";
329 }
330
331 $r
332 }
333
334 sub PRINT {
335 WRITE (shift, join "", @_)
336 }
337
338 sub PRINTF {
339 WRITE (shift, sprintf shift, @_)
340 }
341
342 sub GETC {
343 my $buf;
344 READ ($_[0], $buf, 1);
345 $buf
346 }
347
348 sub BINMODE {
349 binmode $_[0][0];
350 }
351
352 sub TELL {
353 Carp::croak "Coro::Handle's don't support tell()";
354 }
355
356 sub SEEK {
357 Carp::croak "Coro::Handle's don't support seek()";
358 }
359
360 sub EOF {
361 Carp::croak "Coro::Handle's don't support eof()";
362 }
363
364 sub CLOSE {
365 my $fh = $_[0][0];
366 &cleanup;
367 close $fh
368 }
369
370 sub DESTROY {
371 &cleanup;
372 }
373
374 sub FILENO {
375 fileno $_[0][0]
376 }
377
378 # seems to be called for stringification (how weird), at least
379 # when DumpValue::dumpValue is used to print this.
380 sub FETCH {
381 "$_[0]<$_[0][1]>"
382 }
383
384 sub _readable_anyevent {
385 my $cb = Coro::rouse_cb;
386
387 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
388 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
389
390 Coro::rouse_wait
391 }
392
393 sub _writable_anyevent {
394 my $cb = Coro::rouse_cb;
395
396 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
397 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
398
399 Coro::rouse_wait
400 }
401
402 sub _readable_coro {
403 ($_[0][5] ||= "Coro::Event"->io (
404 fd => $_[0][0],
405 desc => "fh $_[0][1] read watcher",
406 timeout => $_[0][2],
407 poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
408 ))->next->[4] & &Event::Watcher::R
409 }
410
411 sub _writable_coro {
412 ($_[0][6] ||= "Coro::Event"->io (
413 fd => $_[0][0],
414 desc => "fh $_[0][1] write watcher",
415 timeout => $_[0][2],
416 poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
417 ))->next->[4] & &Event::Watcher::W
418 }
419
420 #sub _readable_ev {
421 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
422 #}
423 #
424 #sub _writable_ev {
425 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
426 #}
427
428 # decide on event model at runtime
429 for my $rw (qw(readable writable)) {
430 *$rw = sub {
431 AnyEvent::detect;
432 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
433 *$rw = \&{"_$rw\_coro"};
434 *cleanup = sub {
435 eval {
436 $_[0][5]->cancel if $_[0][5];
437 $_[0][6]->cancel if $_[0][6];
438 };
439 @{$_[0]} = ();
440 };
441
442 } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
443 *$rw = \&{"Coro::EV::_$rw\_ev"};
444 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
445
446 } else {
447 *$rw = \&{"_$rw\_anyevent"};
448 }
449 goto &$rw
450 };
451 };
452
453 sub WRITE {
454 my $len = defined $_[2] ? $_[2] : length $_[1];
455 my $ofs = $_[3];
456 my $res;
457
458 while () {
459 my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
460 if (defined $r) {
461 $len -= $r;
462 $ofs += $r;
463 $res += $r;
464 last unless $len;
465 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
466 last;
467 }
468 last unless &writable;
469 }
470
471 $res
472 }
473
474 sub READ {
475 my $len = $_[2];
476 my $ofs = $_[3];
477 my $res;
478
479 # first deplete the read buffer
480 if (length $_[0][3]) {
481 my $l = length $_[0][3];
482 if ($l <= $len) {
483 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
484 $len -= $l;
485 $ofs += $l;
486 $res += $l;
487 return $res unless $len;
488 } else {
489 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
490 substr ($_[0][3], 0, $len) = "";
491 return $len;
492 }
493 }
494
495 while() {
496 my $r = sysread $_[0][0], $_[1], $len, $ofs;
497 if (defined $r) {
498 $len -= $r;
499 $ofs += $r;
500 $res += $r;
501 last unless $len && $r;
502 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
503 last;
504 }
505 last if $_[0][8] || !&readable;
506 }
507
508 $res
509 }
510
511 sub READLINE {
512 my $irs = @_ > 1 ? $_[1] : $/;
513 my ($ofs, $len, $pos);
514 my $bufsize = 1020;
515
516 while () {
517 if (length $irs) {
518 $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
519
520 return substr $_[0][3], 0, $pos + length $irs, ""
521 if $pos >= 0;
522
523 $ofs = (length $_[0][3]) - (length $irs);
524 } elsif (defined $irs) {
525 $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
526
527 if ($pos >= 0) {
528 my $res = substr $_[0][3], 0, $pos + 2, "";
529 $res =~ s/\A\n+//;
530 return $res;
531 }
532
533 $ofs = (length $_[0][3]) - 1;
534 }
535
536 $len = $bufsize - length $_[0][3];
537 $len = $bufsize *= 2 if $len < $bufsize * 0.5;
538 $len = sysread $_[0][0], $_[0][3], $len, length $_[0][3];
539
540 unless ($len) {
541 if (defined $len) {
542 # EOF
543 return undef unless length $_[0][3];
544
545 $_[0][3] =~ s/\A\n+//
546 if ! length $irs && defined $irs;
547
548 return delete $_[0][3];
549 } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
550 return length $_[0][3] ? delete $_[0][3] : undef;
551 }
552 }
553 }
554 }
555
556 1;
557
558 =back
559
560 =head1 BUGS
561
562 - Perl's IO-Handle model is THE bug.
563
564 =head1 AUTHOR
565
566 Marc Lehmann <schmorp@schmorp.de>
567 http://home.schmorp.de/
568
569 =cut
570