| 1 |
=head1 NAME |
| 2 |
|
| 3 |
Coro::Handle - non-blocking io with a blocking interface. |
| 4 |
|
| 5 |
=head1 SYNOPSIS |
| 6 |
|
| 7 |
use Coro::Handle; |
| 8 |
|
| 9 |
=head1 DESCRIPTION |
| 10 |
|
| 11 |
This module implements IO-handles in a coroutine-compatible way, that is, |
| 12 |
other coroutines can run while reads or writes block on the handle. It |
| 13 |
does NOT inherit from IO::Handle but uses tied objects. |
| 14 |
|
| 15 |
=over 4 |
| 16 |
|
| 17 |
=cut |
| 18 |
|
| 19 |
package Coro::Handle; |
| 20 |
|
| 21 |
BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") } |
| 22 |
|
| 23 |
use Errno (); |
| 24 |
use base 'Exporter'; |
| 25 |
|
| 26 |
$VERSION = 1.9; |
| 27 |
|
| 28 |
@EXPORT = qw(unblock); |
| 29 |
|
| 30 |
=item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...] |
| 31 |
|
| 32 |
Create a new non-blocking io-handle using the given |
| 33 |
perl-filehandle. Returns undef if no fhandle is given. The only other |
| 34 |
supported argument is "timeout", which sets a timeout for each operation. |
| 35 |
|
| 36 |
=cut |
| 37 |
|
| 38 |
# Wrap an existing perl filehandle in a new Coro::Handle object.
#
# Arguments: the class (or an object, whose class is reused), the filehandle
# to wrap, and optional key => value pairs that are handed on unchanged to
# Coro::Handle::FH's TIEHANDLE (e.g. timeout => $seconds).
#
# Returns the new handle object, or nothing if no filehandle was given.
sub new_from_fh {
    my $class = shift;
    my $fh    = shift or return;

    # A detached glob whose only job is to carry the tied handle.
    my $self = do { local *Coro::Handle };

    # Describe the handle by the caller's file (basename only) and line.
    my (undef, $filename, $line) = caller;
    $filename =~ s/^.*[\/\\]//;

    # Tie the handle inside the glob explicitly (note the "*"): from
    # perl 5.16 on, "tie $self" ties the scalar that holds the glob
    # instead of the filehandle.
    tie *$self, 'Coro::Handle::FH', fh => $fh, desc => "$filename:$line", @_;

    my $obj = bless \$self, ref $class ? ref $class : $class;

    # Turn on autoflush for the new handle, then restore the previously
    # selected output handle.
    my $previous = select $obj;
    $| = 1;
    select $previous;

    # Hand back the object itself instead of relying on select()'s result.
    return $obj;
}
| 50 |
|
| 51 |
=item $fh = unblock $fh |
| 52 |
|
| 53 |
This is a convenience function that just calls C<new_from_fh> on the given
| 54 |
filehandle. Use it to replace a normal perl filehandle by a non-blocking |
| 55 |
equivalent. |
| 56 |
|
| 57 |
=cut |
| 58 |
|
| 59 |
# Exported shortcut: unblock($fh) is the same as
# Coro::Handle->new_from_fh($fh), with no extra arguments.
sub unblock($) {
    my ($fh) = @_;
    return Coro::Handle->new_from_fh($fh);
}
| 62 |
|
| 63 |
=item $fh->writable, $fh->readable |
| 64 |
|
| 65 |
Wait until the filehandle is readable or writable (and return true) or |
| 66 |
until an error condition happens (and return false). |
| 67 |
|
| 68 |
=cut |
| 69 |
|
| 70 |
# Method forms of Coro::Handle::FH::readable / ::writable.
#
# "tied *..." names the handle inside the glob explicitly; from perl 5.16
# on, plain "tied $scalar" looks at the scalar holding the glob and would
# find nothing.
sub readable { Coro::Handle::FH::readable(tied *{ ${ $_[0] } }) }
sub writable { Coro::Handle::FH::writable(tied *{ ${ $_[0] } }) }
| 72 |
|
| 73 |
=item $fh->readline([$terminator]) |
| 74 |
|
| 75 |
Like the builtin of the same name, but allows you to specify the input |
| 76 |
record separator in a coroutine-safe manner (i.e. not using a global |
| 77 |
variable). |
| 78 |
|
| 79 |
=cut |
| 80 |
|
| 81 |
# Read one record via the tied object's READLINE; any extra argument (the
# record terminator) is passed along unchanged.
sub readline {
    my $self = shift;

    # "*" selects the handle inside the glob; without it perl >= 5.16
    # checks the scalar holding the glob and finds no tie.
    tied(*$$self)->READLINE(@_);
}
| 82 |
|
| 83 |
=item $fh->autoflush([...]) |
| 84 |
|
| 85 |
Always returns true, arguments are being ignored (exists for compatibility |
| 86 |
only). Might change in the future. |
| 87 |
|
| 88 |
=cut |
| 89 |
|
| 90 |
# Compatibility stub: ignores its arguments and always reports success.
sub autoflush {
    return 1 == 1;    # perl's canonical true value, same as !0
}
| 91 |
|
| 92 |
=item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf |
| 93 |
|
| 94 |
Work like their function equivalents (except read, which works like |
| 95 |
sysread. You should not use the read function with Coro::Handles, it will |
| 96 |
work but it's not efficient). |
| 97 |
|
| 98 |
=cut |
| 99 |
|
| 100 |
# Method forms of the tied-handle operations. Each one looks up the
# Coro::Handle::FH object tied to the handle inside the glob and forwards
# to the matching implementation.
#
# "tied *..." names the handle explicitly: from perl 5.16 on, plain
# "tied $scalar" inspects the scalar holding the glob and finds no tie.
#
# read/sysread/syswrite pass $_[1] through as-is, so the callee works on
# the caller's own buffer rather than on a copy.
sub read     { Coro::Handle::FH::READ  (tied *{ ${ $_[0] } }, $_[1], $_[2], $_[3]) }
sub sysread  { Coro::Handle::FH::READ  (tied *{ ${ $_[0] } }, $_[1], $_[2], $_[3]) }
sub syswrite { Coro::Handle::FH::WRITE (tied *{ ${ $_[0] } }, $_[1], $_[2], $_[3]) }
sub print    { my $self = shift; Coro::Handle::FH::WRITE (tied *$$self, join "", @_) }
sub printf   { my $self = shift; Coro::Handle::FH::PRINTF(tied *$$self, @_) }
sub fileno   { Coro::Handle::FH::FILENO(tied *{ ${ $_[0] } }) }
sub close    { Coro::Handle::FH::CLOSE (tied *{ ${ $_[0] } }) }
sub blocking { !0 } # this handler always blocks the caller
| 108 |
|
| 109 |
# Get or set the handle's "partial" flag (slot 8 of the tied object).
# With an argument the flag is replaced. Either way the value the flag had
# before the call is returned.
sub partial {
    # "*" selects the handle inside the glob; plain "tied $scalar" finds
    # no tie on perl >= 5.16.
    my $obj = tied *{ ${ $_[0] } };

    my $retval = $obj->[8];
    $obj->[8] = $_[1] if @_ > 1;
    $retval
}
| 116 |
|
| 117 |
=item $fh->timeout([...]) |
| 118 |
|
| 119 |
The optional argument sets the new timeout (in seconds) for this |
| 120 |
handle. Returns the current (new) value. |
| 121 |
|
| 122 |
C<0> is a valid timeout, use C<undef> to disable the timeout. |
| 123 |
|
| 124 |
=cut |
| 125 |
|
| 126 |
# Get or set the per-operation timeout of the handle.
# With an argument (undef is allowed) the value is stored in slot 2 of the
# tied object and pushed to the read (slot 5) and write (slot 6) watchers
# that already exist. Returns the current timeout value.
sub timeout {
    # "*" selects the handle inside the glob; plain "tied $scalar" finds
    # no tie on perl >= 5.16.
    my $self = tied *{ ${ $_[0] } };

    if (@_ > 1) {
        $self->[2] = $_[1];
        $self->[5]->timeout($_[1]) if $self->[5];
        $self->[6]->timeout($_[1]) if $self->[6];
    }

    $self->[2];
}
| 135 |
|
| 136 |
=item $fh->fh |
| 137 |
|
| 138 |
Returns the "real" (non-blocking) filehandle. Use this if you want to |
| 139 |
do operations on the file handle you cannot do using the Coro::Handle |
| 140 |
interface. |
| 141 |
|
| 142 |
=item $fh->rbuf |
| 143 |
|
| 144 |
Returns the current contents of the read buffer (this is an lvalue, so you |
| 145 |
can change the read buffer if you like). |
| 146 |
|
| 147 |
You can use this function to implement your own optimized reader when neither |
| 148 |
readline nor sysread are viable candidates, like this: |
| 149 |
|
| 150 |
# first get the _real_ non-blocking filehandle |
| 151 |
# and fetch a reference to the read buffer |
| 152 |
my $nb_fh = $fh->fh; |
| 153 |
my $buf = \$fh->rbuf; |
| 154 |
|
| 155 |
for(;;) { |
| 156 |
# now use buffer contents, modifying |
| 157 |
# if necessary to reflect the removed data |
| 158 |
|
| 159 |
last if $$buf ne ""; # we have leftover data |
| 160 |
|
| 161 |
# read another buffer full of data |
| 162 |
$fh->readable or die "end of file"; |
| 163 |
sysread $nb_fh, $$buf, 8192; |
| 164 |
} |
| 165 |
|
| 166 |
=cut |
| 167 |
|
| 168 |
# Return the underlying filehandle (slot 0 of the tied object).
sub fh {
    # "*" selects the handle inside the glob; plain "tied $scalar" finds
    # no tie on perl >= 5.16.
    (tied *{ ${ $_[0] } })->[0];
}

# Lvalue accessor for the read buffer (slot 3 of the tied object).
sub rbuf : lvalue {
    (tied *{ ${ $_[0] } })->[3];
}
| 175 |
|
| 176 |
sub DESTROY {
    # Intentionally empty: with a real DESTROY in place, object destruction
    # never reaches AUTOLOAD below.
}

# Forward any method not defined here to the "forward class" stored in
# slot 7 of the tied object. The call is re-dispatched with the original
# argument list. Dies if the forward class cannot handle the method.
sub AUTOLOAD {
    # "*" selects the handle inside the glob; plain "tied $scalar" finds
    # no tie on perl >= 5.16.
    my $self = tied *{ ${ $_[0] } };

    # strip the package qualifier from the requested method name
    (my $func = $AUTOLOAD) =~ s/^(.*):://;

    my $forward = UNIVERSAL::can($self->[7], $func);

    $forward or
        die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";

    goto &$forward;
}
| 192 |
|
| 193 |
package Coro::Handle::FH; |
| 194 |
|
| 195 |
BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") } |
| 196 |
|
| 197 |
use Fcntl (); |
| 198 |
use Errno (); |
| 199 |
use Carp 'croak'; |
| 200 |
|
| 201 |
use Coro::Event; |
| 202 |
use Event::Watcher qw(R W E); |
| 203 |
|
| 204 |
# formerly a hash, but we are speed-critical, so try |
| 205 |
# to be faster even if it hurts. |
| 206 |
# |
| 207 |
# 0 FH |
| 208 |
# 1 desc |
| 209 |
# 2 timeout |
| 210 |
# 3 rb |
| 211 |
# 4 wb # unused |
| 212 |
# 5 rw |
| 213 |
# 6 ww |
| 214 |
# 7 forward class |
| 215 |
# 8 partial (when true, READ stops after its first sysread attempt)
| 216 |
|
| 217 |
# Constructor used by tie(). Arguments after the class are key => value
# pairs: fh, desc, timeout, forward_class and partial.
#
# Switches the filehandle to non-blocking mode and croaks if that fails.
# Returns the new tied object (a blessed array, see the slot table above).
sub TIEHANDLE {
    my ($class, %arg) = @_;

    my $self = bless [], $class;
    $self->[0] = $arg{fh};
    $self->[1] = $arg{desc};
    $self->[2] = $arg{timeout};
    $self->[3] = "";
    $self->[4] = "";
    $self->[7] = $arg{forward_class};
    $self->[8] = $arg{partial};

    # Add O_NONBLOCK to the current status flags instead of replacing
    # them, so flags such as O_APPEND survive.
    my $flags = fcntl($self->[0], Fcntl::F_GETFL(), 0);
    defined $flags
        and fcntl($self->[0], Fcntl::F_SETFL(), (0 + $flags) | Fcntl::O_NONBLOCK())
        or croak "fcntl(O_NONBLOCK): $!";

    $self
}
| 234 |
|
| 235 |
# Reset the per-file state of a tied object: empty both buffers
# (slots 3 and 4), cancel the read and write watchers (slots 5 and 6)
# if they exist, and forget them.
sub cleanup {
    my $self = $_[0];

    # read side
    $self->[3] = "";
    if (defined(my $read_watcher = $self->[5])) {
        $read_watcher->cancel;
    }
    $self->[5] = undef;

    # write side
    $self->[4] = "";
    if (defined(my $write_watcher = $self->[6])) {
        $write_watcher->cancel;
    }
    $self->[6] = undef;
}
| 242 |
|
| 243 |
# tie() hook for open() on the tied handle.
# Discards the state of the previous file, (re)opens the underlying
# filehandle with the arguments given to open(), and switches it to
# non-blocking mode. Returns open()'s result; croaks if the handle cannot
# be made non-blocking.
sub OPEN {
    &cleanup;    # same @_: reset buffers and watchers first
    my $self = shift;

    # Forward exactly the arguments the caller supplied: a single argument
    # is a two-argument open, anything else is mode plus list.
    my $r = @_ == 1
        ? open($self->[0], $_[0])
        : open($self->[0], $_[0], @_[1 .. $#_]);

    if ($r) {
        # Add O_NONBLOCK to the flags open() established (e.g. O_APPEND)
        # instead of overwriting them.
        my $flags = fcntl($self->[0], Fcntl::F_GETFL(), 0);
        defined $flags
            and fcntl($self->[0], Fcntl::F_SETFL(), (0 + $flags) | Fcntl::O_NONBLOCK())
            or croak "fcntl(O_NONBLOCK): $!";
    }

    $r;
}
| 254 |
|
| 255 |
# tie() hook for print: concatenate the arguments and write them out.
# Returns whatever WRITE returns.
sub PRINT {
    my $self = shift;
    return WRITE($self, join "", @_);
}

# tie() hook for printf: format the arguments, then write the result.
# Returns whatever WRITE returns.
sub PRINTF {
    my ($self, $format, @args) = @_;
    return WRITE($self, sprintf $format, @args);
}
| 262 |
|
| 263 |
# tie() hook for getc: read a single character through READ.
# Returns the character, or undef when READ delivered nothing (end of
# file or error), matching the builtin getc. Returning the empty buffer
# there would leave "while (defined(getc))" loops spinning at end of file.
sub GETC {
    my $buf;
    return READ($_[0], $buf, 1) ? $buf : undef;
}
| 268 |
|
| 269 |
# tie() hook for binmode: applied to the underlying filehandle (slot 0).
# Only the one-argument form is issued; any layer argument is ignored.
sub BINMODE {
    my ($self) = @_;
    return binmode $self->[0];
}
| 272 |
|
| 273 |
# Positioning and end-of-file queries are not available on Coro::Handles:
# each of the three hooks below just croaks.

sub TELL {
    use Carp ();
    Carp::croak("Coro::Handle's don't support tell()");
}

sub SEEK {
    use Carp ();
    Carp::croak("Coro::Handle's don't support seek()");
}

sub EOF {
    use Carp ();
    Carp::croak("Coro::Handle's don't support eof()");
}
| 284 |
|
| 285 |
# tie() hook for close: drop buffers and watchers, then close the
# underlying filehandle. Returns close()'s result.
sub CLOSE {
    my $self = $_[0];
    cleanup($self);
    return close($self->[0]);
}

# Release buffers and watchers when the tied object goes away.
sub DESTROY {
    cleanup($_[0]);
}

# tie() hook for fileno: file descriptor of the underlying filehandle.
sub FILENO {
    my ($self) = @_;
    return fileno $self->[0];
}
| 297 |
|
| 298 |
# Seems to be called for stringification (how weird), at least when
# DumpValue::dumpValue is used to print this.
# Yields the object's default string form followed by its description
# (slot 1) in angle brackets.
sub FETCH {
    my ($self) = @_;
    return $self . "<" . $self->[1] . ">";
}
| 303 |
|
| 304 |
# Wait for the next event on the read watcher and report whether it
# signalled readability.
#
# The watcher lives in slot 5. It is created on first use, from the
# filehandle (slot 0), the description (slot 1) and the timeout (slot 2)
# as they are at that moment, and reused on later calls.
# NOTE(review): element 5 of the event's {Coro::Event} array is presumably
# the mask of events that fired - confirm against Coro::Event.
sub readable {
    ($_[0][5] ||= Coro::Event->io(
        fd      => $_[0][0],
        desc    => "$_[0][1] R",
        timeout => $_[0][2],
        poll    => R+E,
    ))->next->{Coro::Event}[5] & R;
}

# Same as readable, but for the write watcher kept in slot 6: the result
# is masked with W instead of R.
sub writable {
    ($_[0][6] ||= Coro::Event->io(
        fd      => $_[0][0],
        desc    => "$_[0][1] W",
        timeout => $_[0][2],
        poll    => W+E,
    ))->next->{Coro::Event}[5] & W;
}
| 321 |
|
| 322 |
# tie() hook for syswrite, also the back end of PRINT and PRINTF.
#
# Arguments: the tied object, the data, an optional length (default: the
# whole string) and an optional offset into the data.
#
# Keeps writing until everything requested has gone out, waiting for the
# handle to become writable whenever the write would block.
#
# Returns the number of bytes written, or undef if an error occurred
# before anything was written (like the builtin syswrite).
sub WRITE {
    my $len = defined $_[2] ? $_[2] : length $_[1];
    my $ofs = $_[3];
    my $res;    # stays undef until a write succeeds

    while (1) {
        my $r = syswrite $_[0][0], $_[1], $len, $ofs;
        if (defined $r) {
            $len -= $r;
            $ofs += $r;
            $res += $r;
            # Done when everything is out. Also stop when no progress was
            # made (e.g. a length larger than the data), as otherwise this
            # loop would never end.
            last unless $len && $r;
        } elsif ($! != Errno::EAGAIN() && $! != Errno::EINTR()) {
            last;    # hard error; EAGAIN and EINTR are simply retried
        }
        last unless &writable;
    }

    return $res;
}
| 342 |
|
| 343 |
# tie() hook for read/sysread.
#
# Arguments: the tied object, the destination buffer (filled in place),
# the number of bytes wanted and an optional offset into the buffer.
#
# Data left in the read buffer (slot 3) is handed out first. After that
# the handle is read until the request is complete or end of file is
# reached, waiting for readability whenever the read would block. If the
# partial flag (slot 8) is set, only a single read attempt is made.
#
# Returns the number of bytes stored (0 at end of file), or undef if
# nothing could be read because of an error - this keeps errors apart
# from end of file, like the builtin sysread.
sub READ {
    my $len = $_[2];
    my $ofs = $_[3];
    my $res;    # stays undef until some data has been delivered

    # first deplete the read buffer
    if (length $_[0][3]) {
        my $buffered = length $_[0][3];
        if ($buffered <= $len) {
            # the whole buffer fits into the request
            substr($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
            $len -= $buffered;
            $ofs += $buffered;
            $res += $buffered;
            return $res unless $len;
        } else {
            # the buffer alone satisfies the request
            substr($_[1], $ofs) = substr($_[0][3], 0, $len);
            substr($_[0][3], 0, $len) = "";
            return $len;
        }
    }

    while (1) {
        my $r = sysread $_[0][0], $_[1], $len, $ofs;
        if (defined $r) {
            $len -= $r;
            $ofs += $r;
            $res += $r;
            last unless $len && $r;    # request complete, or end of file
        } elsif ($! != Errno::EAGAIN() && $! != Errno::EINTR()) {
            last;    # hard error; EAGAIN and EINTR are simply retried
        }
        last if $_[0][8] || !&readable;
    }

    return $res;
}
| 379 |
|
| 380 |
# tie() hook for readline / <$fh>.
#
# The optional second argument is the record separator; without it the
# current value of $/ is used. An undefined separator reads everything up
# to end of file, an empty one selects paragraph mode (records end at the
# first run of empty lines).
#
# Returns the next record including its separator. At end of file the
# remaining unterminated data is returned as the final record. Returns
# undef once nothing is left, or on a read error.
sub READLINE {
    my $irs = @_ > 1 ? $_[1] : $/;

    while (1) {
        if (defined $irs && length $irs) {
            my $pos = index $_[0][3], $irs;
            if ($pos >= 0) {
                $pos += length $irs;
                my $res = substr $_[0][3], 0, $pos;
                substr ($_[0][3], 0, $pos) = "";
                return $res;
            }
        } elsif (defined $irs) {
            # paragraph mode: skip leading empty lines, then cut after the
            # first "\n\n"; surplus newlines are skipped on the next call
            $_[0][3] =~ s/^\n+//;
            if ($_[0][3] =~ /\n\n/) {
                my $end = $+[0];
                my $res = substr $_[0][3], 0, $end;
                substr ($_[0][3], 0, $end) = "";
                return $res;
            }
        }
        # (undefined separator: nothing to search for, read until EOF)

        # no complete record buffered yet: append more data to the buffer
        my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
        if (defined $r) {
            unless ($r) {
                # end of file: hand out what is left, if anything
                return undef unless length $_[0][3];
                my $res = $_[0][3];
                $_[0][3] = "";
                return $res;
            }
        } elsif (($! != Errno::EAGAIN() && $! != Errno::EINTR()) || !&readable) {
            return undef;
        }
    }
}
| 400 |
|
| 401 |
1; |
| 402 |
|
| 403 |
=back |
| 404 |
|
| 405 |
=head1 BUGS |
| 406 |
|
| 407 |
- Perl's IO-Handle model is THE bug. |
| 408 |
|
| 409 |
=head1 AUTHOR |
| 410 |
|
| 411 |
Marc Lehmann <schmorp@schmorp.de> |
| 412 |
http://home.schmorp.de/ |
| 413 |
|
| 414 |
=cut |
| 415 |
|