ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.146
Committed: Sat May 21 17:09:22 2022 UTC (23 months, 4 weeks ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.145: +2 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking I/O with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements IO-handles in a coroutine-compatible way, that is,
15 other coroutines can run while reads or writes block on the handle.
16
17 It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
18 data, allowing other coroutines to run while one coroutine waits for I/O.
19
20 Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21
22 If at all possible, you should I<always> prefer method calls on the handle object over invoking
23 tied methods, i.e.:
24
25 $fh->print ($str); # NOT print $fh $str;
26 my $line = $fh->readline; # NOT my $line = <$fh>;
27
28 The reason is that perl recurses within the interpreter when invoking tie
29 magic, forcing the (temporary) allocation of a (big) stack. If you have
30 lots of socket connections and they happen to wait in e.g. <$fh>, then
31 they would all have a costly C coroutine associated with them.
32
33 =over 4
34
35 =cut
36
37 package Coro::Handle;
38
39 use common::sense;
40
41 use Carp ();
42 use Errno qw(EAGAIN EINTR EINPROGRESS);
43
44 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
45 use AnyEvent::Socket ();
46
47 use base 'Exporter';
48
49 our $VERSION = 6.57;
50 our @EXPORT = qw(unblock);
51
52 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53
54 Create a new non-blocking io-handle using the given
55 perl-filehandle. Returns C<undef> if no filehandle is given. The only
56 other supported argument is "timeout", which sets a timeout for each
57 operation.
58
59 =cut
60
61 sub new_from_fh {
62 my $class = shift;
63 my $fh = shift or return;
64 my $self = do { local *Coro::Handle };
65
66 tie *$self, 'Coro::Handle::FH', fh => $fh, @_;
67
68 bless \$self, ref $class ? ref $class : $class
69 }
70
71 =item $fh = unblock $fh
72
73 This is a convenience function that just calls C<new_from_fh> on the
74 given filehandle. Use it to replace a normal perl filehandle by a
75 non-(coroutine-)blocking equivalent.
76
77 =cut
78
79 sub unblock($) {
80 new_from_fh Coro::Handle $_[0]
81 }
82
83 =item $fh->writable, $fh->readable
84
85 Wait until the filehandle is readable or writable (and return true) or
86 until an error condition happens (and return false).
87
88 =cut
89
90 sub readable { Coro::Handle::FH::readable (tied *${$_[0]}) }
91 sub writable { Coro::Handle::FH::writable (tied *${$_[0]}) }
92
93 =item $fh->readline ([$terminator])
94
95 Similar to the builtin of the same name, but allows you to specify the
96 input record separator in a coroutine-safe manner (i.e. not using a global
97 variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98 effect.
99
100 =cut
101
102 sub readline { tied(*${+shift})->READLINE (@_) }
103
104 =item $fh->autoflush ([...])
105
106 Always returns true, arguments are being ignored (exists for compatibility
107 only). Might change in the future.
108
109 =cut
110
111 sub autoflush { !0 }
112
113 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114
115 Work like their function equivalents (except read, which works like
116 sysread. You should not use the read function with Coro::Handle's, it will
117 work but it's not efficient).
118
119 =cut
120
121 sub read { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
122 sub sysread { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
123 sub syswrite { Coro::Handle::FH::WRITE (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
124 sub print { Coro::Handle::FH::WRITE (tied *${+shift}, join "", @_) }
125 sub printf { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
126 sub fileno { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
127 sub close { Coro::Handle::FH::CLOSE (tied *${$_[0]}) }
128 sub blocking { !0 } # this handler always blocks the caller
129
130 sub partial {
131 my $obj = tied *${$_[0]};
132
133 my $retval = $obj->[8];
134 $obj->[8] = $_[1] if @_ > 1;
135 $retval
136 }
137
138 =item connect, listen, bind, getsockopt, setsockopt,
139 send, recv, peername, sockname, shutdown, peerport, peerhost
140
141 Do the same thing as the perl builtins or IO::Socket methods (but return
142 true on EINPROGRESS). Remember that these must be method calls.
143
144 =cut
145
146 sub connect { connect tied(*${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
147 sub bind { bind tied(*${$_[0]})->[0], $_[1] }
148 sub listen { listen tied(*${$_[0]})->[0], $_[1] }
149 sub getsockopt { getsockopt tied(*${$_[0]})->[0], $_[1], $_[2] }
150 sub setsockopt { setsockopt tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }
151 sub send { send tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
152 sub recv { recv tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
153 sub sockname { getsockname tied(*${$_[0]})->[0] }
154 sub peername { getpeername tied(*${$_[0]})->[0] }
155 sub shutdown { shutdown tied(*${$_[0]})->[0], $_[1] }
156
157 *connected = \&peername;
158
159 =item peeraddr, peerhost, peerport
160
161 Return the peer host (as numericla IP address) and peer port (as integer).
162
163 =cut
164
165 sub peeraddr {
166 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[1]
167 }
168
169 sub peerport {
170 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[0]
171 }
172
173 sub peerhost {
174 AnyEvent::Socket::format_address &peeraddr
175 }
176
177 =item ($fh, $peername) = $listen_fh->accept
178
179 In scalar context, returns the newly accepted socket (or undef) and in
180 list context return the ($fh, $peername) pair (or nothing).
181
182 =cut
183
184 sub accept {
185 my ($peername, $fh);
186 while () {
187 $peername = accept $fh, tied(*${$_[0]})->[0]
188 and return wantarray
189 ? ($_[0]->new_from_fh($fh), $peername)
190 : $_[0]->new_from_fh($fh);
191
192 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
193
194 $_[0]->readable or return;
195 }
196 }
197
198 =item $fh->timeout ([...])
199
200 The optional argument sets the new timeout (in seconds) for this
201 handle. Returns the current (new) value.
202
203 C<0> is a valid timeout, use C<undef> to disable the timeout.
204
205 =cut
206
207 sub timeout {
208 my $self = tied *${$_[0]};
209 if (@_ > 1) {
210 $self->[2] = $_[1];
211 $self->[5]->timeout ($_[1]) if $self->[5];
212 $self->[6]->timeout ($_[1]) if $self->[6];
213 }
214 $self->[2]
215 }
216
217 =item $fh->fh
218
219 Returns the "real" (non-blocking) filehandle. Use this if you want to
220 do operations on the file handle you cannot do using the Coro::Handle
221 interface.
222
223 =item $fh->rbuf
224
225 Returns the current contents of the read buffer (this is an lvalue, so you
226 can change the read buffer if you like).
227
228 You can use this function to implement your own optimized reader when neither
229 readline nor sysread are viable candidates, like this:
230
231 # first get the _real_ non-blocking filehandle
232 # and fetch a reference to the read buffer
233 my $nb_fh = $fh->fh;
234 my $buf = \$fh->rbuf;
235
236 while () {
237 # now use buffer contents, modifying
238 # if necessary to reflect the removed data
239
240 last if $$buf ne ""; # we have leftover data
241
242 # read another buffer full of data
243 $fh->readable or die "end of file";
244 sysread $nb_fh, $$buf, 8192;
245 }
246
247 =cut
248
249 sub fh {
250 (tied *${$_[0]})->[0];
251 }
252
253 sub rbuf : lvalue {
254 (tied *${$_[0]})->[3];
255 }
256
257 sub DESTROY {
258 # nop
259 }
260
261 our $AUTOLOAD;
262
263 sub AUTOLOAD {
264 my $self = tied *${$_[0]};
265
266 (my $func = $AUTOLOAD) =~ s/^(.*):://;
267
268 my $forward = UNIVERSAL::can $self->[7], $func;
269
270 $forward or
271 die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
272
273 goto &$forward;
274 }
275
276 package Coro::Handle::FH;
277
278 use common::sense;
279
280 use Carp 'croak';
281 use Errno qw(EAGAIN EINTR);
282
283 use AnyEvent::Util qw(WSAEWOULDBLOCK);
284
285 use Coro::AnyEvent;
286
287 # formerly a hash, but we are speed-critical, so try
288 # to be faster even if it hurts.
289 #
290 # 0 FH
291 # 1 desc
292 # 2 timeout
293 # 3 rb
294 # 4 wb # unused
295 # 5 read watcher, if Coro::Event|EV used
296 # 6 write watcher, if Coro::Event|EV used
297 # 7 forward class
298 # 8 blocking
299
300 sub TIEHANDLE {
301 my ($class, %arg) = @_;
302
303 my $self = bless [], $class;
304 $self->[0] = $arg{fh};
305 $self->[1] = $arg{desc};
306 $self->[2] = $arg{timeout};
307 $self->[3] = "";
308 $self->[4] = "";
309 $self->[5] = undef; # work around changes in 5.20, which requires initialisation
310 $self->[6] = undef; # work around changes in 5.20, which requires initialisation
311 $self->[7] = $arg{forward_class};
312 $self->[8] = $arg{partial};
313
314 AnyEvent::Util::fh_nonblocking $self->[0], 1;
315
316 $self
317 }
318
319 sub cleanup {
320 # gets overriden for Coro::Event
321 @{$_[0]} = ();
322 }
323
324 sub OPEN {
325 &cleanup;
326 my $self = shift;
327 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
328 : open $self->[0], $_[0], $_[1], $_[2];
329
330 if ($r) {
331 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
332 or croak "fcntl(O_NONBLOCK): $!";
333 }
334
335 $r
336 }
337
338 sub PRINT {
339 WRITE (shift, join "", @_)
340 }
341
342 sub PRINTF {
343 WRITE (shift, sprintf shift, @_)
344 }
345
346 sub GETC {
347 my $buf;
348 READ ($_[0], $buf, 1);
349 $buf
350 }
351
352 sub BINMODE {
353 binmode $_[0][0];
354 }
355
356 sub TELL {
357 Carp::croak "Coro::Handle's don't support tell()";
358 }
359
360 sub SEEK {
361 Carp::croak "Coro::Handle's don't support seek()";
362 }
363
364 sub EOF {
365 Carp::croak "Coro::Handle's don't support eof()";
366 }
367
368 sub CLOSE {
369 my $fh = $_[0][0];
370 &cleanup;
371 close $fh
372 }
373
374 sub DESTROY {
375 &cleanup;
376 }
377
378 sub FILENO {
379 fileno $_[0][0]
380 }
381
382 # seems to be called for stringification (how weird), at least
383 # when DumpValue::dumpValue is used to print this.
384 sub FETCH {
385 "$_[0]<$_[0][1]>"
386 }
387
388 sub _readable_anyevent {
389 my $cb = Coro::rouse_cb;
390
391 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
392 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
393
394 Coro::rouse_wait
395 }
396
397 sub _writable_anyevent {
398 my $cb = Coro::rouse_cb;
399
400 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
401 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
402
403 Coro::rouse_wait
404 }
405
406 sub _readable_coro {
407 ($_[0][5] ||= "Coro::Event"->io (
408 fd => $_[0][0],
409 desc => "fh $_[0][1] read watcher",
410 timeout => $_[0][2],
411 poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
412 ))->next->[4] & &Event::Watcher::R
413 }
414
415 sub _writable_coro {
416 ($_[0][6] ||= "Coro::Event"->io (
417 fd => $_[0][0],
418 desc => "fh $_[0][1] write watcher",
419 timeout => $_[0][2],
420 poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
421 ))->next->[4] & &Event::Watcher::W
422 }
423
424 #sub _readable_ev {
425 # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
426 #}
427 #
428 #sub _writable_ev {
429 # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
430 #}
431
432 # decide on event model at runtime
433 for my $rw (qw(readable writable)) {
434 *$rw = sub {
435 AnyEvent::detect;
436 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
437 *$rw = \&{"_$rw\_coro"};
438 *cleanup = sub {
439 eval {
440 $_[0][5]->cancel if $_[0][5];
441 $_[0][6]->cancel if $_[0][6];
442 };
443 @{$_[0]} = ();
444 };
445
446 } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
447 *$rw = \&{"Coro::EV::_$rw\_ev"};
448 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
449
450 } else {
451 *$rw = \&{"_$rw\_anyevent"};
452 }
453 goto &$rw
454 };
455 };
456
457 sub WRITE {
458 my $len = defined $_[2] ? $_[2] : length $_[1];
459 my $ofs = $_[3];
460 my $res;
461
462 while () {
463 my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
464 if (defined $r) {
465 $len -= $r;
466 $ofs += $r;
467 $res += $r;
468 last unless $len;
469 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
470 last;
471 }
472 last unless &writable;
473 }
474
475 $res
476 }
477
478 sub READ {
479 my $len = $_[2];
480 my $ofs = $_[3];
481 my $res;
482
483 # first deplete the read buffer
484 if (length $_[0][3]) {
485 my $l = length $_[0][3];
486 if ($l <= $len) {
487 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
488 $len -= $l;
489 $ofs += $l;
490 $res += $l;
491 return $res unless $len;
492 } else {
493 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
494 substr ($_[0][3], 0, $len) = "";
495 return $len;
496 }
497 }
498
499 while() {
500 my $r = sysread $_[0][0], $_[1], $len, $ofs;
501 if (defined $r) {
502 $len -= $r;
503 $ofs += $r;
504 $res += $r;
505 last unless $len && $r;
506 } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
507 last;
508 }
509 last if $_[0][8] || !&readable;
510 }
511
512 $res
513 }
514
515 sub READLINE {
516 my $irs = @_ > 1 ? $_[1] : $/;
517 my ($ofs, $len, $pos);
518 my $bufsize = 1020;
519
520 while () {
521 if (length $irs) {
522 $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
523
524 return substr $_[0][3], 0, $pos + length $irs, ""
525 if $pos >= 0;
526
527 $ofs = (length $_[0][3]) - (length $irs);
528 } elsif (defined $irs) {
529 $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
530
531 if ($pos >= 0) {
532 my $res = substr $_[0][3], 0, $pos + 2, "";
533 $res =~ s/\A\n+//;
534 return $res;
535 }
536
537 $ofs = (length $_[0][3]) - 1;
538 }
539
540 $len = $bufsize - length $_[0][3];
541 $len = $bufsize *= 2 if $len < $bufsize * 0.5;
542 $len = sysread $_[0][0], $_[0][3], $len, length $_[0][3];
543
544 unless ($len) {
545 if (defined $len) {
546 # EOF
547 return undef unless length $_[0][3];
548
549 $_[0][3] =~ s/\A\n+//
550 if ! length $irs && defined $irs;
551
552 return delete $_[0][3];
553 } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
554 return length $_[0][3] ? delete $_[0][3] : undef;
555 }
556 }
557 }
558 }
559
560 1;
561
562 =back
563
564 =head1 BUGS
565
566 - Perl's IO-Handle model is THE bug.
567
568 =head1 AUTHOR/SUPPORT/CONTACT
569
570 Marc A. Lehmann <schmorp@schmorp.de>
571 http://software.schmorp.de/pkg/Coro.html
572
573 =cut
574