ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.7 by root, Thu May 1 16:35:40 2008 UTC vs.
Revision 1.8 by root, Fri May 2 15:36:10 2008 UTC

1package AnyEvent::Handle; 1package AnyEvent::Handle;
2 2
3no warnings; 3no warnings;
4use strict; 4use strict;
5 5
6use AnyEvent; 6use AnyEvent ();
7use IO::Handle; 7use AnyEvent::Util ();
8use Scalar::Util ();
9use Carp ();
10use Fcntl ();
8use Errno qw/EAGAIN EINTR/; 11use Errno qw/EAGAIN EINTR/;
9 12
10=head1 NAME 13=head1 NAME
11 14
12AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
13 16
14=head1 VERSION
15
16Version 0.01
17
18=cut 17=cut
19 18
20our $VERSION = '0.01'; 19our $VERSION = '0.02';
21 20
22=head1 SYNOPSIS 21=head1 SYNOPSIS
23 22
24 use AnyEvent; 23 use AnyEvent;
25 use AnyEvent::Handle; 24 use AnyEvent::Handle;
26 25
27 my $cv = AnyEvent->condvar; 26 my $cv = AnyEvent->condvar;
28 27
29 my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); 28 my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
30 29
31 $ae_fh->on_eof (sub { $cv->broadcast }); 30 #TODO
32
33 $ae_fh->readlines (sub {
34 my ($ae_fh, @lines) = @_;
35 for (@lines) {
36 chomp;
37 print "Line: $_";
38 }
39 });
40 31
41 # or use the constructor to pass the callback: 32 # or use the constructor to pass the callback:
42 33
43 my $ae_fh2 = 34 my $ae_fh2 =
44 AnyEvent::Handle->new ( 35 AnyEvent::Handle->new (
45 fh => \*STDIN, 36 fh => \*STDIN,
46 on_eof => sub { 37 on_eof => sub {
47 $cv->broadcast; 38 $cv->broadcast;
48 }, 39 },
49 on_readline => sub { 40 #TODO
50 my ($ae_fh, @lines) = @_; 41 );
51 for (@lines) { 42
52 chomp; 43 $cv->wait;
53 print "Line: $_"; 44
54 } 45=head1 DESCRIPTION
46
47This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make
49non-blocking resolves and connects).
50
51In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well.
54
55All callbacks will be invoked with the handle object as their first
56argument.
57
58=head1 METHODS
59
60=over 4
61
62=item B<new (%args)>
63
64The constructor supports these arguments (all as key => value pairs).
65
66=over 4
67
68=item fh => $filehandle [MANDATORY]
69
70The filehandle this L<AnyEvent::Handle> object will operate on.
71
72NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking).
74
75=item on_error => $cb->($self) [MANDATORY]
76
77This is the fatal error callback, that is called when a fatal error ocurs,
78such as not being able to resolve the hostname, failure to connect or a
79read error.
80
81The object will not be in a usable state when this callback has been
82called.
83
84On callback entrance, the value of C<$!> contains the opertaing system
85error (or C<ENOSPC> or C<EPIPE>).
86
87=item on_eof => $cb->($self) [MANDATORY]
88
89Set the callback to be called on EOF.
90
91=item on_read => $cb->($self)
92
93This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef>
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97
98To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly.
100
101When an EOF condition is detected then AnyEvent::Handle will first try to
102feed all the remaining data to the queued callbacks and C<on_read> before
103calling the C<on_eof> callback. If no progress can be made, then a fatal
104error will be raised (with C<$!> set to C<EPIPE>).
105
106=item on_drain => $cb->()
107
108This sets the callback that is called when the write buffer becomes empty
109(or when the callback is set and the buffer is empty already).
110
111To append to the write buffer, use the C<< ->push_write >> method.
112
113=item rbuf_max => <bytes>
114
115If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
116when the read buffer ever (strictly) exceeds this size. This is useful to
117avoid denial-of-service attacks.
118
119For example, a server accepting connections from untrusted sources should
120be configured to accept only so-and-so much data that it cannot act on
121(for example, when expecting a line, an attacker could send an unlimited
122amount of data without a callback ever being called as long as the line
123isn't finished).
124
125=item read_size => <bytes>
126
127The default read block size (the amount of bytes this module will try to read
128on each [loop iteration). Default: C<4096>.
129
130=item low_water_mark => <bytes>
131
132Sets the amount of bytes (default: C<0>) that make up an "empty" write
133buffer: If the write reaches this size or gets even samller it is
134considered empty.
135
136=back
137
138=cut
139
140sub new {
141 my $class = shift;
142
143 my $self = bless { @_ }, $class;
144
145 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing");
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing");
151
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
154
155 $self
156}
157
158sub _shutdown {
159 my ($self) = @_;
160
161 delete $self->{rw};
162 delete $self->{ww};
163 delete $self->{fh};
164}
165
166sub error {
167 my ($self) = @_;
168
169 {
170 local $!;
171 $self->_shutdown;
172 }
173
174 $self->{on_error}($self);
175}
176
177=item $fh = $handle->fh
178
179This method returns the filehandle of the L<AnyEvent::Handle> object.
180
181=cut
182
183sub fh { $_[0]->{fh} }
184
185=item $handle->on_error ($cb)
186
187Replace the current C<on_error> callback (see the C<on_error> constructor argument).
188
189=cut
190
191sub on_error {
192 $_[0]{on_error} = $_[1];
193}
194
195=item $handle->on_eof ($cb)
196
197Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
198
199=cut
200
201#############################################################################
202
203sub on_eof {
204 $_[0]{on_eof} = $_[1];
205}
206
207=item $handle->on_drain ($cb)
208
209Sets the C<on_drain> callback or clears it (see the description of
210C<on_drain> in the constructor).
211
212=cut
213
214sub on_drain {
215 my ($self, $cb) = @_;
216
217 $self->{on_drain} = $cb;
218
219 $cb->($self)
220 if $cb && $self->{low_water_mark} >= length $self->{wbuf};
221}
222
223=item $handle->push_write ($data)
224
225Queues the given scalar to be written. You can push as much data as you
226want (only limited by the available memory), as C<AnyEvent::Handle>
227buffers it independently of the kernel.
228
229=cut
230
231sub push_write {
232 my ($self, $data) = @_;
233
234 $self->{wbuf} .= $data;
235
236 unless ($self->{ww}) {
237 Scalar::Util::weaken $self;
238 my $cb = sub {
239 my $len = syswrite $self->{fh}, $self->{wbuf};
240
241 if ($len > 0) {
242 substr $self->{wbuf}, 0, $len, "";
243
244
245 $self->{on_drain}($self)
246 if $self->{low_water_mark} >= length $self->{wbuf}
247 && $self->{on_drain};
248
249 delete $self->{ww} unless length $self->{wbuf};
250 } elsif ($! != EAGAIN && $! != EINTR) {
251 $self->error;
55 } 252 }
56 ); 253 };
57 254
58 $cv->wait; 255 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
59 256
60=head1 DESCRIPTION 257 $cb->($self);
61
62This module is a helper module to make it easier to do non-blocking I/O
63on filehandles (and sockets, see L<AnyEvent::Socket>).
64
65The event loop is provided by L<AnyEvent>.
66
67=head1 METHODS
68
69=over 4
70
71=item B<new (%args)>
72
73The constructor has these arguments:
74
75=over 4
76
77=item fh => $filehandle
78
79The filehandle this L<AnyEvent::Handle> object will operate on.
80
81NOTE: The filehandle will be set to non-blocking.
82
83=item read_block_size => $size
84
85The default read block size use for reads via the C<on_read>
86method.
87
88=item on_read => $cb
89
90=item on_eof => $cb
91
92=item on_error => $cb
93
94These are shortcuts, that will call the corresponding method and set the callback to C<$cb>.
95
96=item on_readline => $cb
97
98The C<readlines> method is called with the default separated and C<$cb> as callback
99for you.
100
101=back
102
103=cut
104
105sub new {
106 my $this = shift;
107 my $class = ref($this) || $this;
108 my $self = {
109 read_block_size => 4096,
110 rbuf => '',
111 @_
112 }; 258 };
113 bless $self, $class; 259}
114 260
115 $self->{fh}->blocking (0) if $self->{fh}; 261#############################################################################
116 262
117 if ($self->{on_read}) { 263sub _drain_rbuf {
118 $self->on_read ($self->{on_read}); 264 my ($self) = @_;
119 265
266 return if exists $self->{in_drain};
267 local $self->{in_drain} = 1;
268
269 while (my $len = length $self->{rbuf}) {
270 no strict 'refs';
271 if (@{ $self->{queue} }) {
272 if ($self->{queue}[0]($self)) {
273 shift @{ $self->{queue} };
274 } elsif ($self->{eof}) {
275 # no progress can be made (not enough data and no data forthcoming)
276 $! = &Errno::EPIPE; return $self->error;
277 } else {
278 return;
279 }
120 } elsif ($self->{on_readline}) { 280 } elsif ($self->{on_read}) {
121 $self->readlines ($self->{on_readline}); 281 $self->{on_read}($self);
122 282
283 if (
284 $self->{eof} # if no further data will arrive
285 && $len == length $self->{rbuf} # and no data has been consumed
286 && !@{ $self->{queue} } # and the queue is still empty
287 && $self->{on_read} # and we still want to read data
288 ) {
289 # then no progress can be made
290 $! = &Errno::EPIPE; return $self->error;
291 }
292 } else {
293 # read side becomes idle
294 delete $self->{rw};
295 return;
296 }
297 }
298
123 } elsif ($self->{on_eof}) { 299 if ($self->{eof}) {
124 $self->on_eof ($self->{on_eof}); 300 $self->_shutdown;
125 301 $self->{on_eof}($self);
126 } elsif ($self->{on_error}) {
127 $self->on_eof ($self->{on_error});
128 } 302 }
129
130 return $self
131} 303}
132 304
133=item B<fh> 305=item $handle->on_read ($cb)
134 306
135This method returns the filehandle of the L<AnyEvent::Handle> object. 307This replaces the currently set C<on_read> callback, or clears it (when
136 308the new callback is C<undef>). See the description of C<on_read> in the
137=cut 309constructor.
138
139sub fh { $_[0]->{fh} }
140
141=item B<on_read ($callback)>
142
143This method installs a C<$callback> that will be called
144when new data arrived. You can access the read buffer via the C<rbuf>
145method (see below).
146
147The first argument of the C<$callback> will be the L<AnyEvent::Handle> object.
148 310
149=cut 311=cut
150 312
151sub on_read { 313sub on_read {
152 my ($self, $cb) = @_; 314 my ($self, $cb) = @_;
315
153 $self->{on_read} = $cb; 316 $self->{on_read} = $cb;
154 317
155 unless (defined $self->{on_read}) { 318 unless ($self->{rw} || $self->{eof}) {
156 delete $self->{on_read_w}; 319 Scalar::Util::weaken $self;
157 return; 320
158 } 321 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
159
160 $self->{on_read_w} =
161 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
162 #d# warn "READ:[$self->{read_size}] $self->{read_block_size} : ".length ($self->{rbuf})."\n";
163 my $rbuf_len = length $self->{rbuf};
164 my $l;
165 if (defined $self->{read_size}) {
166 $l = sysread $self->{fh}, $self->{rbuf},
167 ($self->{read_size} - $rbuf_len), $rbuf_len;
168 } else {
169 $l = sysread $self->{fh}, $self->{rbuf}, $self->{read_block_size}, $rbuf_len; 322 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
323
324 if ($len > 0) {
325 if (exists $self->{rbuf_max}) {
326 if ($self->{rbuf_max} < length $self->{rbuf}) {
327 $! = &Errno::ENOSPC; return $self->error;
328 }
329 }
330
331 } elsif (defined $len) {
332 $self->{eof} = 1;
333 delete $self->{rw};
334
335 } elsif ($! != EAGAIN && $! != EINTR) {
336 return $self->error;
170 } 337 }
171 #d# warn "READL $l [$self->{rbuf}]\n";
172 338
173 if (not defined $l) { 339 $self->_drain_rbuf;
174 return if $! == EAGAIN || $! == EINTR;
175 $self->{on_error}->($self) if $self->{on_error};
176 delete $self->{on_read_w};
177
178 } elsif ($l == 0) {
179 $self->{on_eof}->($self) if $self->{on_eof};
180 delete $self->{on_read_w};
181
182 } else {
183 $self->{on_read}->($self);
184 }
185 }); 340 });
341 }
186} 342}
187 343
188=item B<on_error ($callback)> 344=item $handle->rbuf
189 345
190Whenever a read or write operation resulted in an error the C<$callback> 346Returns the read buffer (as a modifiable lvalue).
191will be called.
192 347
193The first argument of C<$callback> will be the L<AnyEvent::Handle> object itself. 348You can access the read buffer directly as the C<< ->{rbuf} >> member, if
194The error is given as errno in C<$!>. 349you want.
195 350
196=cut
197
198sub on_error {
199 $_[0]->{on_error} = $_[1];
200}
201
202=item B<on_eof ($callback)>
203
204Installs the C<$callback> that will be called when the end of file is
205encountered in a read operation this C<$callback> will be called. The first
206argument will be the L<AnyEvent::Handle> object itself.
207
208=cut
209
210sub on_eof {
211 $_[0]->{on_eof} = $_[1];
212}
213
214=item B<rbuf>
215
216Returns a reference to the read buffer.
217
218NOTE: The read buffer should only be used or modified if the C<on_read> 351NOTE: The read buffer should only be used or modified if the C<on_read>,
219method is used directly. The C<read> and C<readlines> methods will provide 352C<push_read> or C<unshift_read> methods are used. The other read methods
220the read data to their callbacks. 353automatically manage the read buffer.
221 354
222=cut 355=cut
223 356
224sub rbuf : lvalue { 357sub rbuf : lvalue {
225 $_[0]->{rbuf} 358 $_[0]{rbuf}
226} 359}
227 360
228=item B<read ($len, $callback)> 361=item $handle->push_read ($cb)
229 362
230Will read exactly C<$len> bytes from the filehandle and call the C<$callback> 363=item $handle->unshift_read ($cb)
231if done so. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
232object itself and the second argument the read data.
233 364
234NOTE: This method will override any callbacks installed via the C<on_read> method. 365Append the given callback to the end of the queue (C<push_read>) or
366prepend it (C<unshift_read>).
235 367
236=cut 368The callback is called each time some additional read data arrives.
237 369
370It must check wether enough data is in the read buffer already.
371
372If not enough data is available, it must return the empty list or a false
373value, in which case it will be called repeatedly until enough data is
374available (or an error condition is detected).
375
376If enough data was available, then the callback must remove all data it is
377interested in (which can be none at all) and return a true value. After returning
378true, it will be removed from the queue.
379
380=cut
381
238sub read { 382sub push_read {
383 my ($self, $cb) = @_;
384
385 push @{ $self->{queue} }, $cb;
386 $self->_drain_rbuf;
387}
388
389sub unshift_read {
390 my ($self, $cb) = @_;
391
392 push @{ $self->{queue} }, $cb;
393 $self->_drain_rbuf;
394}
395
396=item $handle->push_read_chunk ($len, $cb->($self, $data))
397
398=item $handle->unshift_read_chunk ($len, $cb->($self, $data))
399
400Append the given callback to the end of the queue (C<push_read_chunk>) or
401prepend it (C<unshift_read_chunk>).
402
403The callback will be called only once C<$len> bytes have been read, and
404these C<$len> bytes will be passed to the callback.
405
406=cut
407
408sub _read_chunk($$) {
409 my ($len, $cb) = @_;
410
411 sub {
412 $len <= length $_[0]{rbuf} or return;
413 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
414 1
415 }
416}
417
418sub push_read_chunk {
239 my ($self, $len, $cb) = @_; 419 my ($self, $len, $cb) = @_;
240 420
241 $self->{read_cb} = $cb; 421 $self->push_read (_read_chunk $len, $cb);
242 my $old_blk_size = $self->{read_block_size};
243 $self->{read_block_size} = $len;
244
245 $self->on_read (sub {
246 #d# warn "OFOFO $len || ".length($_[0]->{rbuf})."||\n";
247
248 if ($len == length $_[0]->{rbuf}) {
249 $_[0]->{read_block_size} = $old_blk_size;
250 $_[0]->on_read (undef);
251 $_[0]->{read_cb}->($_[0], (substr $self->{rbuf}, 0, $len, ''));
252 }
253 });
254} 422}
255 423
256=item B<readlines ($callback)>
257 424
258=item B<readlines ($sep, $callback)> 425sub unshift_read_chunk {
259
260This method will read lines from the filehandle, separated by C<$sep> or C<"\n">
261if C<$sep> is not provided. C<$sep> will be used as "line" separated.
262
263The C<$callback> will be called when at least one
264line could be read. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
265object itself and the rest of the arguments will be the read lines.
266
267NOTE: This method will override any callbacks installed via the C<on_read> method.
268
269=cut
270
271sub readlines {
272 my ($self, $sep, $cb) = @_; 426 my ($self, $len, $cb) = @_;
273 427
274 if (ref $sep) { 428 $self->unshift_read (_read_chunk $len, $cb);
275 $cb = $sep; 429}
276 $sep = "\n";
277 430
278 } elsif (not defined $sep) { 431=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
279 $sep = "\n";
280 }
281 432
282 my $sep_len = length $sep; 433=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
283 434
284 $self->{on_readline} = $cb; 435Append the given callback to the end of the queue (C<push_read_line>) or
436prepend it (C<unshift_read_line>).
285 437
286 $self->on_read (sub { 438The callback will be called only once a full line (including the end of
287 my @lines; 439line marker, C<$eol>) has been read. This line (excluding the end of line
288 my $rb = \$_[0]->{rbuf}; 440marker) will be passed to the callback as second argument (C<$line>), and
441the end of line marker as the third argument (C<$eol>).
442
443The end of line marker, C<$eol>, can be either a string, in which case it
444will be interpreted as a fixed record end marker, or it can be a regex
445object (e.g. created by C<qr>), in which case it is interpreted as a
446regular expression.
447
448The end of line marker argument C<$eol> is optional, if it is missing (NOT
449undef), then C<qr|\015?\012|> is used (which is good for most internet
450protocols).
451
452Partial lines at the end of the stream will never be returned, as they are
453not marked by the end of line marker.
454
455=cut
456
457sub _read_line($$) {
458 my $cb = pop;
459 my $eol = @_ ? shift : qr|(\015?\012)|;
289 my $pos; 460 my $pos;
290 while (($pos = index ($$rb, $sep)) >= 0) { 461
291 push @lines, substr $$rb, 0, $pos + $sep_len, ''; 462 $eol = qr|(\Q$eol\E)| unless ref $eol;
463 $eol = qr|^(.*?)($eol)|;
464
465 sub {
466 $_[0]{rbuf} =~ s/$eol// or return;
467
468 $cb->($1, $2);
292 } 469 1
293 $self->{on_readline}->($_[0], @lines);
294 }); 470 }
295} 471}
296 472
297=item B<write ($data)> 473sub push_read_line {
298
299=item B<write ($callback)>
300
301=item B<write ($data, $callback)>
302
303This method will write C<$data> to the filehandle and call the C<$callback>
304afterwards. If only C<$callback> is provided it will be called when the
305write buffer becomes empty the next time (or immediately if it already is empty).
306
307=cut
308
309sub write {
310 my ($self, $data, $cb) = @_;
311 if (ref $data) { $cb = $data; undef $data }
312 push @{$self->{write_bufs}}, [$data, $cb];
313 $self->_check_writer;
314}
315
316sub _check_writer {
317 my ($self) = @_; 474 my $self = shift;
318 475
319 if ($self->{write_w}) { 476 $self->push_read (&_read_line);
320 unless ($self->{write_cb}) { 477}
321 while (@{$self->{write_bufs}} && not defined $self->{write_bufs}->[0]->[1]) {
322 my $wba = shift @{$self->{write_bufs}};
323 $self->{wbuf} .= $wba->[0];
324 }
325 }
326 return;
327 }
328 478
329 my $wba = shift @{$self->{write_bufs}} 479sub unshift_read_line {
330 or return; 480 my $self = shift;
331 481
332 unless (defined $wba->[0]) { 482 $self->unshift_read (&_read_line);
333 $wba->[1]->($self) if $wba->[1];
334 $self->_check_writer;
335 return;
336 }
337
338 $self->{wbuf} = $wba->[0];
339 $self->{write_cb} = $wba->[1];
340
341 $self->{write_w} =
342 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
343 my $l = syswrite $self->{fh}, $self->{wbuf}, length $self->{wbuf};
344
345 if (not defined $l) {
346 return if $! == EAGAIN || $! == EINTR;
347 delete $self->{write_w};
348 $self->{on_error}->($self) if $self->{on_error};
349
350 } else {
351 substr $self->{wbuf}, 0, $l, '';
352
353 if (length ($self->{wbuf}) == 0) {
354 $self->{write_cb}->($self) if $self->{write_cb};
355
356 delete $self->{write_w};
357 delete $self->{wbuf};
358 delete $self->{write_cb};
359
360 $self->_check_writer;
361 }
362 }
363 });
364} 483}
365 484
366=back 485=back
367 486
368=head1 AUTHOR 487=head1 AUTHOR
369 488
370Robin Redeker, C<< <elmex at ta-sa.org> >> 489Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
371 490
372=cut 491=cut
373 492
3741; # End of AnyEvent::Handle 4931; # End of AnyEvent::Handle

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines