ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Handle.pm
Revision: 1.13
Committed: Tue Nov 7 11:59:54 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-2_5
Changes since 1.12: +235 -79 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Handle - non-blocking io with a blocking interface.
4
5 =head1 SYNOPSIS
6
7 use Coro::Handle;
8
9 =head1 DESCRIPTION
10
11 This module implements IO-handles in a coroutine-compatible way, that is,
12 other coroutines can run while reads or writes block on the handle. It
13 does NOT inherit from IO::Handle but uses tied objects.
14
15 =over 4
16
17 =cut
18
19 package Coro::Handle;
20
21 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
22
23 use Errno ();
24 use base 'Exporter';
25
26 $VERSION = 2.5;
27
28 @EXPORT = qw(unblock);
29
30 =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
31
32 Create a new non-blocking io-handle using the given
33 perl-filehandle. Returns undef if no fhandle is given. The only other
34 supported argument is "timeout", which sets a timeout for each operation.
35
36 =cut
37
38 sub new_from_fh {
39 my $class = shift;
40 my $fh = shift or return; # no (or false) filehandle given: return nothing
41 my $self = do { local *Coro::Handle }; # glob value that gets tied and blessed below
42
43 my ($package, $filename, $line) = caller;
44 $filename =~ s/^.*[\/\\]//; # keep only the part after the last / or \
45
46 tie $self, Coro::Handle::FH, fh => $fh, desc => "$filename:$line", @_; # remaining @_ (arg => value pairs) go to TIEHANDLE
47
48 my $_fh = select bless \$self, ref $class ? ref $class : $class; $| = 1; select $_fh; # set $| on the new handle; the value of the final select (the handle selected just before it, i.e. the new object) is the sub's return value
49 }
50
51 =item $fh = unblock $fh
52
53 This is a convenience function that just calls C<new_from_fh> on the given
54 filehandle. Use it to replace a normal perl filehandle by a non-blocking
55 equivalent.
56
57 =cut
58
59 sub unblock($) {
60    Coro::Handle->new_from_fh ($_[0]); # thin wrapper around the constructor
61 }
62
63 =item $fh->writable, $fh->readable
64
65 Wait until the filehandle is readable or writable (and return true) or
66 until an error condition happens (and return false).
67
68 =cut
69
70 sub readable { my $tie = tied ${$_[0]}; Coro::Handle::FH::readable ($tie) } # wait on the tied implementation object
71 sub writable { my $tie = tied ${$_[0]}; Coro::Handle::FH::writable ($tie) } # same, for writability
72
73 =item $fh->readline([$terminator])
74
75 Like the builtin of the same name, but allows you to specify the input
76 record separator in a coroutine-safe manner (i.e. not using a global
77 variable).
78
79 =cut
80
81 sub readline { my $fh = shift; (tied $$fh)->READLINE (@_) } # remaining arguments (the optional terminator) are passed on
82
83 =item $fh->autoflush([...])
84
85 Always returns true; arguments are ignored (exists for compatibility
86 only). Might change in the future.
87
88 =cut
89
90 sub autoflush { !0 } # constant true value; the arguments are never looked at
91
92 =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
93
94 Work like their function equivalents (except read, which works like
95 sysread. You should not use the read function with Coro::Handles, it will
96 work but it's not efficient).
97
98 =cut
99
100 sub read { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) } # same code path as sysread; $_[1] is handed through so READ fills the caller's buffer
101 sub sysread { Coro::Handle::FH::READ (tied ${$_[0]}, $_[1], $_[2], $_[3]) } # (buffer, length, offset)
102 sub syswrite { Coro::Handle::FH::WRITE (tied ${$_[0]}, $_[1], $_[2], $_[3]) } # (data, length, offset)
103 sub print { Coro::Handle::FH::WRITE (tied ${+shift}, join "", @_) } # arguments are joined with "" and written in a single WRITE call
104 sub printf { Coro::Handle::FH::PRINTF (tied ${+shift}, @_) } # format string plus arguments, formatted by PRINTF
105 sub fileno { Coro::Handle::FH::FILENO (tied ${$_[0]}) }
106 sub close { Coro::Handle::FH::CLOSE (tied ${$_[0]}) }
107 sub blocking { !0 } # this handler always blocks the caller
108
109 sub partial {
110    my $tie = tied ${$_[0]};
111    my $previous = $tie->[8]; # slot 8 is the partial flag consulted by READ
112
113    $tie->[8] = $_[1] if @_ >= 2; # optional argument installs a new value
114    $previous # always the value that was in effect before this call
115 }
116
117 =item $fh->timeout([...])
118
119 The optional argument sets the new timeout (in seconds) for this
120 handle. Returns the current (new) value.
121
122 C<0> is a valid timeout, use C<undef> to disable the timeout.
123
124 =cut
125
126 sub timeout {
127    my $tie = tied ${$_[0]};
128    if (@_ >= 2) {
129       $tie->[2] = $_[1]; # slot 2 is the per-operation timeout
130       # forward the new value to whatever objects sit in slots 5 and 6, if any
131       for my $slot (5, 6) { $tie->[$slot]->timeout ($_[1]) if $tie->[$slot] }
132    }
133    $tie->[2]
134 }
135
136 =item $fh->fh
137
138 Returns the "real" (non-blocking) filehandle. Use this if you want to
139 do operations on the file handle you cannot do using the Coro::Handle
140 interface.
141
142 =item $fh->rbuf
143
144 Returns the current contents of the read buffer (this is an lvalue, so you
145 can change the read buffer if you like).
146
147 You can use this function to implement your own optimized reader when neither
148 readline nor sysread are viable candidates, like this:
149
150 # first get the _real_ non-blocking filehandle
151 # and fetch a reference to the read buffer
152 my $nb_fh = $fh->fh;
153 my $buf = \$fh->rbuf;
154
155 for(;;) {
156 # now use buffer contents, modifying
157 # if necessary to reflect the removed data
158
159 last if $$buf ne ""; # we have leftover data
160
161 # read another buffer full of data
162 $fh->readable or die "end of file";
163 sysread $nb_fh, $$buf, 8192;
164 }
165
166 =cut
167
168 sub fh {
169    my $tie = tied ${$_[0]}; $tie->[0] # slot 0 holds the underlying perl filehandle
170 }
171
172 sub rbuf : lvalue {
173 (tied ${$_[0]})->[3]; # slot 3 is the read buffer; being the final expression, it is what the lvalue sub exposes
174 }
175
176 sub DESTROY {
177 # nop -- NOTE(review): presumably defined so that object destruction is not routed to AUTOLOAD below; confirm
178 }
179
180 sub AUTOLOAD {
181 my $self = tied ${$_[0]};
182
183 (my $func = $AUTOLOAD) =~ s/^(.*):://; # strip the package qualifier, leaving the bare method name
184
185 my $forward = UNIVERSAL::can $self->[7], $func; # look the method up in the forward class stored in slot 7
186
187 $forward or
188 die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\""; # note: the package reported is the class of the tied object
189
190 goto &$forward; # jump into the found method with the current @_ untouched
191 }
192
193 package Coro::Handle::FH;
194
195 BEGIN { eval { require warnings } && warnings->unimport ("uninitialized") }
196
197 use Fcntl ();
198 use Errno ();
199 use Carp 'croak';
200
201 use AnyEvent;
202
203 # formerly a hash, but we are speed-critical, so try
204 # to be faster even if it hurts.
205 #
206 # 0 FH
207 # 1 desc
208 # 2 timeout
209 # 3 rb
210 # 4 wb # unused
211 # 5 unused
212 # 6 unused
213 # 7 forward class
214 # 8 partial
215
216 sub TIEHANDLE {
217 my ($class, %arg) = @_; # arguments after the class are key => value pairs
218
219 my $self = bless [], $class;
220 $self->[0] = $arg{fh};
221 $self->[1] = $arg{desc};
222 $self->[2] = $arg{timeout};
223 $self->[3] = ""; # read buffer starts out empty
224 $self->[4] = "";
225 $self->[7] = $arg{forward_class};
226 $self->[8] = $arg{partial};
227
228 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK # F_SETFL is given O_NONBLOCK only, no other status flags
229 or croak "fcntl(O_NONBLOCK): $!";
230
231 $self
232 }
233
234 sub cleanup {
235 $_[0][3] = ""; # discard any buffered read data
236 $_[0][4] = ""; # slot 4 ("wb") is reset as well
237 }
238
239 sub OPEN {
240 &cleanup; # called with the current @_, so it resets this handle's buffers
241 my $self = shift;
242 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1] # NOTE(review): every argument count other than 2 takes the four-argument form on the next line; confirm that is intended
243 : open $self->[0], $_[0], $_[1], $_[2];
244 if ($r) {
245 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK # switch the freshly opened handle to non-blocking mode
246 or croak "fcntl(O_NONBLOCK): $!";
247 }
248 $r; # result of open
249 }
250
251 sub PRINT {
252    my $self = shift; WRITE ($self, join "", @_); # one WRITE call for the concatenated arguments
253 }
254
255 sub PRINTF {
256    my $self = shift; my $format = shift; WRITE ($self, sprintf $format, @_); # format first, then write the result
257 }
258
259 sub GETC {
260    my $char;
261    READ ($_[0], $char, 1); # READ stores into its (aliased) second argument
262    $char
263 }
264
265 sub BINMODE {
266 binmode $_[0][0]; # one-argument binmode on the real handle; any further argument to BINMODE is not forwarded
267 }
268
269 sub TELL {
270    croak "Coro::Handle's don't support tell()"; # unsupported operation; croak is already imported into this package
271 }
272
273 sub SEEK {
274    croak "Coro::Handle's don't support seek()"; # unsupported operation; croak is already imported into this package
275 }
276
277 sub EOF {
278    croak "Coro::Handle's don't support eof()"; # unsupported operation; croak is already imported into this package
279 }
280
281 sub CLOSE {
282    cleanup ($_[0]); # throw away buffered data before closing
283    close $_[0][0] # result of close on the real handle is the return value
284 }
285
286 sub DESTROY {
287    cleanup ($_[0]); # only the buffers are reset here
288 }
289
290 sub FILENO {
291 fileno $_[0][0]; # file descriptor number of the real handle in slot 0
292 }
293
294 # seems to be called for stringification (how weird), at least
295 # when DumpValue::dumpValue is used to print this.
296 sub FETCH {
297 "$_[0]<$_[0][1]>"; # stringified object followed by the description (slot 1) in angle brackets
298 }
299
300 sub readable {
301    my $current = $Coro::current; # the coroutine that has to be woken up again
302    my $io = 1; # return value: stays true unless the timeout callback runs
303
304    my $w = AnyEvent->io (
305       fh => $_[0][0],
306       desc => "$_[0][1] readable",
307       poll => 'r',
308       cb => sub {
309          $current->ready;
310       },
311    );
312    # 0 is a valid timeout and only undef disables it, so test for definedness
313    my $t = (defined $_[0][2]) && AnyEvent->timer (
314       after => $_[0][2],
315       cb => sub {
316          $io = 0;
317          $current->ready;
318       },
319    );
320    # give up the CPU until one of the callbacks above readies this coroutine
321    &Coro::schedule;
322    $io
323 }
324
325 sub writable {
326    my $current = $Coro::current; # the coroutine that has to be woken up again
327    my $io = 1; # return value: stays true unless the timeout callback runs
328
329    my $w = AnyEvent->io (
330       fh => $_[0][0],
331       desc => "$_[0][1] writable",
332       poll => 'w',
333       cb => sub {
334          $current->ready;
335       },
336    );
337    # 0 is a valid timeout and only undef disables it, so test for definedness
338    my $t = (defined $_[0][2]) && AnyEvent->timer (
339       after => $_[0][2],
340       cb => sub {
341          $io = 0;
342          $current->ready;
343       },
344    );
345    # give up the CPU until one of the callbacks above readies this coroutine
346    &Coro::schedule;
347    $io
348 }
349
350 sub WRITE {
351 my $len = defined $_[2] ? $_[2] : length $_[1]; # no length given: write the whole string
352 my $ofs = $_[3];
353 my $res = 0; # total number of bytes written so far
354
355 while() {
356 my $r = syswrite $_[0][0], $_[1], $len, $ofs;
357 if (defined $r) {
358 $len -= $r;
359 $ofs += $r;
360 $res += $r;
361 last unless $len; # everything has been written
362 } elsif ($! != Errno::EAGAIN) {
363 last; # error other than EAGAIN: stop and report what was written so far
364 }
365 last unless &writable; # wait (with the current @_) until writable; stop when writable returns false
366 }
367
368 return $res;
369 }
370
371 sub READ {
372 my $len = $_[2];
373 my $ofs = $_[3];
374 my $res = 0; # number of bytes delivered into the caller's buffer so far
375
376 # first deplete the read buffer
377 if (length $_[0][3]) {
378 my $l = length $_[0][3];
379 if ($l <= $len) {
380 substr($_[1], $ofs) = $_[0][3]; $_[0][3] = ""; # the whole buffer fits into the request
381 $len -= $l;
382 $ofs += $l;
383 $res += $l;
384 return $res unless $len; # request exactly satisfied from the buffer
385 } else {
386 substr($_[1], $ofs) = substr($_[0][3], 0, $len); # the buffer alone satisfies the request
387 substr($_[0][3], 0, $len) = ""; # keep the remainder buffered
388 return $len;
389 }
390 }
391
392 while() {
393 my $r = sysread $_[0][0], $_[1], $len, $ofs;
394 if (defined $r) {
395 $len -= $r;
396 $ofs += $r;
397 $res += $r;
398 last unless $len && $r; # request satisfied, or sysread returned 0
399 } elsif ($! != Errno::EAGAIN) {
400 last; # error other than EAGAIN: stop with what has been read
401 }
402 last if $_[0][8] || !&readable; # with the partial flag (slot 8) set, stop after one attempt; otherwise wait until readable
403 }
404
405 return $res;
406 }
407
408 sub READLINE {
409    my $irs = @_ > 1 ? $_[1] : $/; # an explicit terminator overrides the global $/
410
411    while() {
412       my $pos = index $_[0][3], $irs;
413       if ($pos >= 0) {
414          $pos += length $irs; # the terminator is part of the returned record
415          my $res = substr $_[0][3], 0, $pos;
416          substr ($_[0][3], 0, $pos) = "";
417          return $res;
418       }
419       # no complete record buffered yet: append more data to the read buffer
420       my $r = sysread $_[0][0], $_[0][3], 8192, length $_[0][3];
421       if (defined $r) {
422          unless ($r) { my $rest = $_[0][3]; $_[0][3] = ""; return length ($rest) ? $rest : undef } # EOF: hand out a final unterminated record instead of dropping it
423       } elsif ($! != Errno::EAGAIN || !&readable) {
424          return undef; # error other than EAGAIN, or readable returned false
425       }
426    }
427 }
428
429 1;
430
431 =back
432
433 =head1 BUGS
434
435 - Perl's IO-Handle model is THE bug.
436
437 =head1 AUTHOR
438
439 Marc Lehmann <schmorp@schmorp.de>
440 http://home.schmorp.de/
441
442 =cut
443