ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.125
Committed: Thu Jun 4 22:58:28 2015 UTC (9 years ago) by root
Branch: MAIN
CVS Tags: rel-6_43
Changes since 1.124: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking I/O with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 use common::sense;
40
41 use Carp ();
42 use Errno qw(EAGAIN EINTR EINPROGRESS);
43
44 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 use AnyEvent::Socket ();
46
47 use base 'Exporter';
48
49 our $VERSION = 6.43;
50 our @EXPORT = qw(unblock);
51
52 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53
54 Create a new non-blocking io-handle using the given
55 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56 other supported argument is "timeout", which sets a timeout for each
57 operation.
58
59 =cut
60
61 sub new_from_fh {
62 my $class = shift;
63 my $fh = shift or return;
64 my $self = do { local *Coro::Handle };
65
66 tie *$self, 'Coro::Handle::FH', fh => $fh, @_;
67
68 bless \$self, ref $class ? ref $class : $class
69 }
70
71 =item $fh = unblock $fh
72
73 This is a convenience function that just calls C<new_from_fh> on the
74 given filehandle. Use it to replace a normal perl filehandle by a
75 non-(coroutine-)blocking equivalent.
76
77 =cut
78
79 sub unblock($) {
80 new_from_fh Coro::Handle $_[0]
81 }
82
83 =item $fh->writable, $fh->readable
84
85 Wait until the filehandle is readable or writable (and return true) or
86 until an error condition happens (and return false).
87
88 =cut
89
90 sub readable { Coro::Handle::FH::readable (tied *${$_[0]}) }
91 sub writable { Coro::Handle::FH::writable (tied *${$_[0]}) }
92
93 =item $fh->readline ([$terminator])
94
95 Similar to the builtin of the same name, but allows you to specify the
96 input record separator in a coroutine-safe manner (i.e. not using a global
97 variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98 effect.
99
100 =cut
101
102 sub readline { tied(*${+shift})->READLINE (@_) }
103
104 =item $fh->autoflush ([...])
105
106 Always returns true, arguments are being ignored (exists for compatibility
107 only). Might change in the future.
108
109 =cut
110
111 sub autoflush { !0 }
112
113 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114
115 Work like their function equivalents (except read, which works like
116 sysread. You should not use the read function with Coro::Handle's, it will
117 work but it's not efficient).
118
119 =cut
120
121 sub read { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
122 sub sysread { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
123 sub syswrite { Coro::Handle::FH::WRITE (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
124 sub print { Coro::Handle::FH::WRITE (tied *${+shift}, join "", @_) }
125 sub printf { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
126 sub fileno { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
127 sub close { Coro::Handle::FH::CLOSE (tied *${$_[0]}) }
128 sub blocking { !0 } # this handler always blocks the caller
129
130 sub partial {
131 my $obj = tied *${$_[0]};
132
133 my $retval = $obj->[8];
134 $obj->[8] = $_[1] if @_ > 1;
135 $retval
136 }
137
138 =item connect, listen, bind, getsockopt, setsockopt,
139 send, recv, peername, sockname, shutdown, peerport, peerhost
140
141 Do the same thing as the perl builtins or IO::Socket methods (but return
142 true on EINPROGRESS). Remember that these must be method calls.
143
144 =cut
145
146 sub connect { connect tied(*${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
147 sub bind { bind tied(*${$_[0]})->[0], $_[1] }
148 sub listen { listen tied(*${$_[0]})->[0], $_[1] }
149 sub getsockopt { getsockopt tied(*${$_[0]})->[0], $_[1], $_[2] }
150 sub setsockopt { setsockopt tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }
151 sub send { send tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
152 sub recv { recv tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
153 sub sockname { getsockname tied(*${$_[0]})->[0] }
154 sub peername { getpeername tied(*${$_[0]})->[0] }
155 sub shutdown { shutdown tied(*${$_[0]})->[0], $_[1] }
156
157 =item peeraddr, peerhost, peerport
158
159 Return the peer host (as numericla IP address) and peer port (as integer).
160
161 =cut
162
163 sub peeraddr {
164 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[1]
165 }
166
167 sub peerport {
168 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[0]
169 }
170
171 sub peerhost {
172 AnyEvent::Socket::format_address &peeraddr
173 }
174
175 =item ($fh, $peername) = $listen_fh->accept
176
177 In scalar context, returns the newly accepted socket (or undef) and in
178 list context return the ($fh, $peername) pair (or nothing).
179
180 =cut
181
182 sub accept {
183 my ($peername, $fh);
184 while () {
185 $peername = accept $fh, tied(*${$_[0]})->[0]
186 and return wantarray
187 ? ($_[0]->new_from_fh($fh), $peername)
188 : $_[0]->new_from_fh($fh);
189
190 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
191
192 $_[0]->readable or return;
193 }
194 }
195
196 =item $fh->timeout ([...])
197
198 The optional argument sets the new timeout (in seconds) for this
199 handle. Returns the current (new) value.
200
201 C<0> is a valid timeout, use C<undef> to disable the timeout.
202
203 =cut
204
205 sub timeout {
206 my $self = tied *${$_[0]};
207 if (@_ > 1) {
208 $self->[2] = $_[1];
209 $self->[5]->timeout ($_[1]) if $self->[5];
210 $self->[6]->timeout ($_[1]) if $self->[6];
211 }
212 $self->[2]
213 }
214
215 =item $fh->fh
216
217 Returns the "real" (non-blocking) filehandle. Use this if you want to
218 do operations on the file handle you cannot do using the Coro::Handle
219 interface.
220
221 =item $fh->rbuf
222
223 Returns the current contents of the read buffer (this is an lvalue, so you
224 can change the read buffer if you like).
225
226 You can use this function to implement your own optimized reader when neither
227 readline nor sysread are viable candidates, like this:
228
229 # first get the _real_ non-blocking filehandle
230 # and fetch a reference to the read buffer
231 my $nb_fh = $fh->fh;
232 my $buf = \$fh->rbuf;
233
234 while () {
235 # now use buffer contents, modifying
236 # if necessary to reflect the removed data
237
238 last if $$buf ne ""; # we have leftover data
239
240 # read another buffer full of data
241 $fh->readable or die "end of file";
242 sysread $nb_fh, $$buf, 8192;
243 }
244
245 =cut
246
247 sub fh {
248 (tied *${$_[0]})->[0];
249 }
250
251 sub rbuf : lvalue {
252 (tied *${$_[0]})->[3];
253 }
254
255 sub DESTROY {
256 # nop
257 }
258
259 our $AUTOLOAD;
260
261 sub AUTOLOAD {
262 my $self = tied *${$_[0]};
263
264 (my $func = $AUTOLOAD) =~ s/^(.*):://;
265
266 my $forward = UNIVERSAL::can $self->[7], $func;
267
268 $forward or
269 die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
270
271 goto &$forward;
272 }
273
274 package Coro::Handle::FH;
275
276 use common::sense;
277
278 use Carp 'croak';
279 use Errno qw(EAGAIN EINTR);
280
281 use AnyEvent::Util qw(WSAEWOULDBLOCK);
282
283 use Coro::AnyEvent;
284
285 # formerly a hash, but we are speed-critical, so try
286 # to be faster even if it hurts.
287 #
288 # 0 FH
289 # 1 desc
290 # 2 timeout
291 # 3 rb
292 # 4 wb # unused
293 # 5 read watcher, if Coro::Event|EV used
294 # 6 write watcher, if Coro::Event|EV used
295 # 7 forward class
296 # 8 blocking
297
298 sub TIEHANDLE {
299 my ($class, %arg) = @_;
300
301 my $self = bless [], $class;
302 $self->[0] = $arg{fh};
303 $self->[1] = $arg{desc};
304 $self->[2] = $arg{timeout};
305 $self->[3] = "";
306 $self->[4] = "";
307 $self->[5] = undef; # work around changes in 5.20, which requires initialisation
308 $self->[6] = undef; # work around changes in 5.20, which requires initialisation
309 $self->[7] = $arg{forward_class};
310 $self->[8] = $arg{partial};
311
312 AnyEvent::Util::fh_nonblocking $self->[0], 1;
313
314 $self
315 }
316
317 sub cleanup {
318 # gets overriden for Coro::Event
319 @{$_[0]} = ();
320 }
321
322 sub OPEN {
323 &cleanup;
324 my $self = shift;
325 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
326 : open $self->[0], $_[0], $_[1], $_[2];
327
328 if ($r) {
329 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
330 or croak "fcntl(O_NONBLOCK): $!";
331 }
332
333 $r
334 }
335
336 sub PRINT {
337 WRITE (shift, join "", @_)
338 }
339
340 sub PRINTF {
341 WRITE (shift, sprintf shift, @_)
342 }
343
344 sub GETC {
345 my $buf;
346 READ ($_[0], $buf, 1);
347 $buf
348 }
349
350 sub BINMODE {
351 binmode $_[0][0];
352 }
353
354 sub TELL {
355 Carp::croak "Coro::Handle's don't support tell()";
356 }
357
358 sub SEEK {
359 Carp::croak "Coro::Handle's don't support seek()";
360 }
361
362 sub EOF {
363 Carp::croak "Coro::Handle's don't support eof()";
364 }
365
366 sub CLOSE {
367 my $fh = $_[0][0];
368 &cleanup;
369 close $fh
370 }
371
372 sub DESTROY {
373 &cleanup;
374 }
375
376 sub FILENO {
377 fileno $_[0][0]
378 }
379
380 # seems to be called for stringification (how weird), at least
381 # when DumpValue::dumpValue is used to print this.
382 sub FETCH {
383 "$_[0]<$_[0][1]>"
384 }
385
386 sub _readable_anyevent {
387 my $cb = Coro::rouse_cb;
388
389 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
390 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
391
392 Coro::rouse_wait
393 }
394
395 sub _writable_anyevent {
396 my $cb = Coro::rouse_cb;
397
398 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
399 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
400
401 Coro::rouse_wait
402 }
403
404 sub _readable_coro {
405 ($_[0][5] ||= "Coro::Event"->io (
406 fd => $_[0][0],
407 desc => "fh $_[0][1] read watcher",
408 timeout => $_[0][2],
409 poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
410 ))->next->[4] & &Event::Watcher::R
411 }
412
413 sub _writable_coro {
414 ($_[0][6] ||= "Coro::Event"->io (
415 fd => $_[0][0],
416 desc => "fh $_[0][1] write watcher",
417 timeout => $_[0][2],
418 poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
419 ))->next->[4] & &Event::Watcher::W
420 }
421
422 #sub _readable_ev {
423 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
424 #}
425 #
426 #sub _writable_ev {
427 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
428 #}
429
430 # decide on event model at runtime
431 for my $rw (qw(readable writable)) {
432 *$rw = sub {
433 AnyEvent::detect;
434 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
435 *$rw = \&{"_$rw\_coro"};
436 *cleanup = sub {
437 eval {
438 $_[0][5]->cancel if $_[0][5];
439 $_[0][6]->cancel if $_[0][6];
440 };
441 @{$_[0]} = ();
442 };
443
444 } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
445 *$rw = \&{"Coro::EV::_$rw\_ev"};
446 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
447
448 } else {
449 *$rw = \&{"_$rw\_anyevent"};
450 }
451 goto &$rw
452 };
453 };
454
455 sub WRITE {
456 my $len = defined $_[2] ? $_[2] : length $_[1];
457 my $ofs = $_[3];
458 my $res;
459
460 while () {
461 my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
462 if (defined $r) {
463 $len -= $r;
464 $ofs += $r;
465 $res += $r;
466 last unless $len;
467 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
468 last;
469 }
470 last unless &writable;
471 }
472
473 $res
474 }
475
476 sub READ {
477 my $len = $_[2];
478 my $ofs = $_[3];
479 my $res;
480
481 # first deplete the read buffer
482 if (length $_[0][3]) {
483 my $l = length $_[0][3];
484 if ($l <= $len) {
485 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
486 $len -= $l;
487 $ofs += $l;
488 $res += $l;
489 return $res unless $len;
490 } else {
491 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
492 substr ($_[0][3], 0, $len) = "";
493 return $len;
494 }
495 }
496
497 while() {
498 my $r = sysread $_[0][0], $_[1], $len, $ofs;
499 if (defined $r) {
500 $len -= $r;
501 $ofs += $r;
502 $res += $r;
503 last unless $len && $r;
504 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
505 last;
506 }
507 last if $_[0][8] || !&readable;
508 }
509
510 $res
511 }
512
513 sub READLINE {
514 my $irs = @_ > 1 ? $_[1] : $/;
515 my ($ofs, $len, $pos);
516 my $bufsize = 1020;
517
518 while () {
519 if (length $irs) {
520 $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
521
522 return substr $_[0][3], 0, $pos + length $irs, ""
523 if $pos >= 0;
524
525 $ofs = (length $_[0][3]) - (length $irs);
526 } elsif (defined $irs) {
527 $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
528
529 if ($pos >= 0) {
530 my $res = substr $_[0][3], 0, $pos + 2, "";
531 $res =~ s/\A\n+//;
532 return $res;
533 }
534
535 $ofs = (length $_[0][3]) - 1;
536 }
537
538 $len = $bufsize - length $_[0][3];
539 $len = $bufsize *= 2 if $len < $bufsize * 0.5;
540 $len = sysread $_[0][0], $_[0][3], $len, length $_[0][3];
541
542 unless ($len) {
543 if (defined $len) {
544 # EOF
545 return undef unless length $_[0][3];
546
547 $_[0][3] =~ s/\A\n+//
548 if ! length $irs && defined $irs;
549
550 return delete $_[0][3];
551 } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
552 return length $_[0][3] ? delete $_[0][3] : undef;
553 }
554 }
555 }
556 }
557
558 1;
559
560 =back
561
562 =head1 BUGS
563
564 - Perl's IO-Handle model is THE bug.
565
566 =head1 AUTHOR
567
568 Marc Lehmann <schmorp@schmorp.de>
569 http://home.schmorp.de/
570
571 =cut
572