ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Handle.pm (file contents):
Revision 1.7 by root, Thu May 1 16:35:40 2008 UTC vs.
Revision 1.9 by root, Fri May 2 16:07:46 2008 UTC

1package AnyEvent::Handle; 1package AnyEvent::Handle;
2 2
3no warnings; 3no warnings;
4use strict; 4use strict;
5 5
6use AnyEvent; 6use AnyEvent ();
7use IO::Handle; 7use AnyEvent::Util ();
8use Scalar::Util ();
9use Carp ();
10use Fcntl ();
8use Errno qw/EAGAIN EINTR/; 11use Errno qw/EAGAIN EINTR/;
9 12
10=head1 NAME 13=head1 NAME
11 14
12AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent 15AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
13 16
14=head1 VERSION
15
16Version 0.01
17
18=cut 17=cut
19 18
20our $VERSION = '0.01'; 19our $VERSION = '0.02';
21 20
22=head1 SYNOPSIS 21=head1 SYNOPSIS
23 22
24 use AnyEvent; 23 use AnyEvent;
25 use AnyEvent::Handle; 24 use AnyEvent::Handle;
26 25
27 my $cv = AnyEvent->condvar; 26 my $cv = AnyEvent->condvar;
28 27
29 my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN); 28 my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
30 29
31 $ae_fh->on_eof (sub { $cv->broadcast }); 30 #TODO
32
33 $ae_fh->readlines (sub {
34 my ($ae_fh, @lines) = @_;
35 for (@lines) {
36 chomp;
37 print "Line: $_";
38 }
39 });
40 31
41 # or use the constructor to pass the callback: 32 # or use the constructor to pass the callback:
42 33
43 my $ae_fh2 = 34 my $ae_fh2 =
44 AnyEvent::Handle->new ( 35 AnyEvent::Handle->new (
45 fh => \*STDIN, 36 fh => \*STDIN,
46 on_eof => sub { 37 on_eof => sub {
47 $cv->broadcast; 38 $cv->broadcast;
48 }, 39 },
49 on_readline => sub { 40 #TODO
50 my ($ae_fh, @lines) = @_; 41 );
51 for (@lines) { 42
52 chomp; 43 $cv->wait;
53 print "Line: $_"; 44
54 } 45=head1 DESCRIPTION
46
47This module is a helper module to make it easier to do event-based I/O on
48filehandles (and sockets, see L<AnyEvent::Socket> for an easy way to make
49non-blocking resolves and connects).
50
51In the following, when the documentation refers to of "bytes" then this
52means characters. As sysread and syswrite are used for all I/O, their
53treatment of characters applies to this module as well.
54
55All callbacks will be invoked with the handle object as their first
56argument.
57
58=head1 METHODS
59
60=over 4
61
62=item B<new (%args)>
63
64The constructor supports these arguments (all as key => value pairs).
65
66=over 4
67
68=item fh => $filehandle [MANDATORY]
69
70The filehandle this L<AnyEvent::Handle> object will operate on.
71
72NOTE: The filehandle will be set to non-blocking (using
73AnyEvent::Util::fh_nonblocking).
74
75=item on_error => $cb->($self) [MANDATORY]
76
77This is the fatal error callback, that is called when a fatal error ocurs,
78such as not being able to resolve the hostname, failure to connect or a
79read error.
80
81The object will not be in a usable state when this callback has been
82called.
83
84On callback entrance, the value of C<$!> contains the opertaing system
85error (or C<ENOSPC> or C<EPIPE>).
86
87=item on_eof => $cb->($self) [MANDATORY]
88
89Set the callback to be called on EOF.
90
91=item on_read => $cb->($self)
92
93This sets the default read callback, which is called when data arrives
94and no read request is in the queue. If the read callback is C<undef>
95or has never been set, than AnyEvent::Handle will cease reading from the
96filehandle.
97
98To access (and remove data from) the read buffer, use the C<< ->rbuf >>
99method or acces sthe C<$self->{rbuf}> member directly.
100
101When an EOF condition is detected then AnyEvent::Handle will first try to
102feed all the remaining data to the queued callbacks and C<on_read> before
103calling the C<on_eof> callback. If no progress can be made, then a fatal
104error will be raised (with C<$!> set to C<EPIPE>).
105
106=item on_drain => $cb->()
107
108This sets the callback that is called when the write buffer becomes empty
109(or when the callback is set and the buffer is empty already).
110
111To append to the write buffer, use the C<< ->push_write >> method.
112
113=item rbuf_max => <bytes>
114
115If defined, then a fatal error will be raised (with C<$!> set to C<ENOSPC>)
116when the read buffer ever (strictly) exceeds this size. This is useful to
117avoid denial-of-service attacks.
118
119For example, a server accepting connections from untrusted sources should
120be configured to accept only so-and-so much data that it cannot act on
121(for example, when expecting a line, an attacker could send an unlimited
122amount of data without a callback ever being called as long as the line
123isn't finished).
124
125=item read_size => <bytes>
126
127The default read block size (the amount of bytes this module will try to read
128on each [loop iteration). Default: C<4096>.
129
130=item low_water_mark => <bytes>
131
132Sets the amount of bytes (default: C<0>) that make up an "empty" write
133buffer: If the write reaches this size or gets even samller it is
134considered empty.
135
136=back
137
138=cut
139
140sub new {
141 my $class = shift;
142
143 my $self = bless { @_ }, $class;
144
145 $self->{fh} or Carp::croak "mandatory argument fh is missing";
146
147 AnyEvent::Util::fh_nonblocking $self->{fh}, 1;
148
149 $self->on_error ((delete $self->{on_error}) or Carp::croak "mandatory argument on_error is missing");
150 $self->on_eof ((delete $self->{on_eof} ) or Carp::croak "mandatory argument on_eof is missing");
151
152 $self->on_drain (delete $self->{on_drain}) if $self->{on_drain};
153 $self->on_read (delete $self->{on_read} ) if $self->{on_read};
154
155 $self
156}
157
158sub _shutdown {
159 my ($self) = @_;
160
161 delete $self->{rw};
162 delete $self->{ww};
163 delete $self->{fh};
164}
165
166sub error {
167 my ($self) = @_;
168
169 {
170 local $!;
171 $self->_shutdown;
172 }
173
174 $self->{on_error}($self);
175}
176
177=item $fh = $handle->fh
178
179This method returns the filehandle of the L<AnyEvent::Handle> object.
180
181=cut
182
183sub fh { $_[0]->{fh} }
184
185=item $handle->on_error ($cb)
186
187Replace the current C<on_error> callback (see the C<on_error> constructor argument).
188
189=cut
190
191sub on_error {
192 $_[0]{on_error} = $_[1];
193}
194
195=item $handle->on_eof ($cb)
196
197Replace the current C<on_eof> callback (see the C<on_eof> constructor argument).
198
199=cut
200
201sub on_eof {
202 $_[0]{on_eof} = $_[1];
203}
204
205#############################################################################
206
207=back
208
209=head2 WRITE QUEUE
210
211AnyEvent::Handle manages two queues per handle, one for writing and one
212for reading.
213
214The write queue is very simple: you can add data to its end, and
215AnyEvent::Handle will automatically try to get rid of it for you.
216
217When data could be writtena nd the write buffer is shorter then the low
218water mark, the C<on_drain> callback will be invoked.
219
220=over 4
221
222=item $handle->on_drain ($cb)
223
224Sets the C<on_drain> callback or clears it (see the description of
225C<on_drain> in the constructor).
226
227=cut
228
229sub on_drain {
230 my ($self, $cb) = @_;
231
232 $self->{on_drain} = $cb;
233
234 $cb->($self)
235 if $cb && $self->{low_water_mark} >= length $self->{wbuf};
236}
237
238=item $handle->push_write ($data)
239
240Queues the given scalar to be written. You can push as much data as you
241want (only limited by the available memory), as C<AnyEvent::Handle>
242buffers it independently of the kernel.
243
244=cut
245
246sub push_write {
247 my ($self, $data) = @_;
248
249 $self->{wbuf} .= $data;
250
251 unless ($self->{ww}) {
252 Scalar::Util::weaken $self;
253 my $cb = sub {
254 my $len = syswrite $self->{fh}, $self->{wbuf};
255
256 if ($len > 0) {
257 substr $self->{wbuf}, 0, $len, "";
258
259
260 $self->{on_drain}($self)
261 if $self->{low_water_mark} >= length $self->{wbuf}
262 && $self->{on_drain};
263
264 delete $self->{ww} unless length $self->{wbuf};
265 } elsif ($! != EAGAIN && $! != EINTR) {
266 $self->error;
55 } 267 }
268 };
269
270 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $cb);
271
272 $cb->($self);
273 };
274}
275
276#############################################################################
277
278=back
279
280=head2 READ QUEUE
281
282AnyEvent::Handle manages two queues per handle, one for writing and one
283for reading.
284
285The read queue is more complex than the write queue. It can be used in two
286ways, the "simple" way, using only C<on_read> and the "complex" way, using
287a queue.
288
289In the simple case, you just install an C<on_read> callback and whenever
290new data arrives, it will be called. You can then remove some data (if
291enough is there) from the read buffer (C<< $handle->rbuf >>) if you want
292or not.
293
294In the more complex case, you want to queue multiple callbacks. In this
295case, AnyEvent::Handle will call the first queued callback each time new
296data arrives and removes it when it has done its job (see C<push_read>,
297below).
298
299This way you can, for example, push three line-reads, followed by reading
300a chunk of data, and AnyEvent::Handle will execute them in order.
301
302Example 1: EPP protocol parser. EPP sends 4 byte length info, followed by
303the specified number of bytes which give an XML datagram.
304
305 # in the default state, expect some header bytes
306 $handle->on_read (sub {
307 # some data is here, now queue the length-header-read (4 octets)
308 shift->unshift_read_chunk (4, sub {
309 # header arrived, decode
310 my $len = unpack "N", $_[1];
311
312 # now read the payload
313 shift->unshift_read_chunk ($len, sub {
314 my $xml = $_[1];
315 # handle xml
316 });
56 ); 317 });
318 });
57 319
58 $cv->wait; 320Example 2: Implement a client for a protocol that replies either with
321"OK" and another line or "ERROR" for one request, and 64 bytes for the
322second request. Due tot he availability of a full queue, we can just
323pipeline sending both requests and manipulate the queue as necessary in
324the callbacks:
59 325
60=head1 DESCRIPTION 326 # request one
327 $handle->push_write ("request 1\015\012");
61 328
62This module is a helper module to make it easier to do non-blocking I/O 329 # we expect "ERROR" or "OK" as response, so push a line read
63on filehandles (and sockets, see L<AnyEvent::Socket>). 330 $handle->push_read_line (sub {
331 # if we got an "OK", we have to _prepend_ another line,
332 # so it will be read before the second request reads its 64 bytes
333 # which are already in the queue when this callback is called
334 # we don't do this in case we got an error
335 if ($_[1] eq "OK") {
336 $_[0]->unshift_read_line (sub {
337 my $response = $_[1];
338 ...
339 });
340 }
341 });
64 342
65The event loop is provided by L<AnyEvent>. 343 # request two
344 $handle->push_write ("request 2\015\012");
66 345
67=head1 METHODS 346 # simply read 64 bytes, always
347 $handle->push_read_chunk (64, sub {
348 my $response = $_[1];
349 ...
350 });
68 351
69=over 4 352=over 4
70 353
71=item B<new (%args)> 354sub _drain_rbuf {
72
73The constructor has these arguments:
74
75=over 4
76
77=item fh => $filehandle
78
79The filehandle this L<AnyEvent::Handle> object will operate on.
80
81NOTE: The filehandle will be set to non-blocking.
82
83=item read_block_size => $size
84
85The default read block size use for reads via the C<on_read>
86method.
87
88=item on_read => $cb
89
90=item on_eof => $cb
91
92=item on_error => $cb
93
94These are shortcuts, that will call the corresponding method and set the callback to C<$cb>.
95
96=item on_readline => $cb
97
98The C<readlines> method is called with the default separated and C<$cb> as callback
99for you.
100
101=back
102
103=cut
104
105sub new {
106 my $this = shift;
107 my $class = ref($this) || $this;
108 my $self = { 355 my ($self) = @_;
109 read_block_size => 4096,
110 rbuf => '',
111 @_
112 };
113 bless $self, $class;
114 356
115 $self->{fh}->blocking (0) if $self->{fh}; 357 return if exists $self->{in_drain};
358 local $self->{in_drain} = 1;
116 359
117 if ($self->{on_read}) { 360 while (my $len = length $self->{rbuf}) {
118 $self->on_read ($self->{on_read}); 361 no strict 'refs';
119 362 if (@{ $self->{queue} }) {
363 if ($self->{queue}[0]($self)) {
364 shift @{ $self->{queue} };
365 } elsif ($self->{eof}) {
366 # no progress can be made (not enough data and no data forthcoming)
367 $! = &Errno::EPIPE; return $self->error;
368 } else {
369 return;
370 }
120 } elsif ($self->{on_readline}) { 371 } elsif ($self->{on_read}) {
121 $self->readlines ($self->{on_readline}); 372 $self->{on_read}($self);
122 373
123 } elsif ($self->{on_eof}) { 374 if (
124 $self->on_eof ($self->{on_eof}); 375 $self->{eof} # if no further data will arrive
125 376 && $len == length $self->{rbuf} # and no data has been consumed
126 } elsif ($self->{on_error}) { 377 && !@{ $self->{queue} } # and the queue is still empty
127 $self->on_eof ($self->{on_error}); 378 && $self->{on_read} # and we still want to read data
379 ) {
380 # then no progress can be made
381 $! = &Errno::EPIPE; return $self->error;
382 }
383 } else {
384 # read side becomes idle
385 delete $self->{rw};
386 return;
387 }
128 } 388 }
129 389
130 return $self 390 if ($self->{eof}) {
391 $self->_shutdown;
392 $self->{on_eof}($self);
393 }
131} 394}
132 395
133=item B<fh> 396=item $handle->on_read ($cb)
134 397
135This method returns the filehandle of the L<AnyEvent::Handle> object. 398This replaces the currently set C<on_read> callback, or clears it (when
136 399the new callback is C<undef>). See the description of C<on_read> in the
137=cut 400constructor.
138
139sub fh { $_[0]->{fh} }
140
141=item B<on_read ($callback)>
142
143This method installs a C<$callback> that will be called
144when new data arrived. You can access the read buffer via the C<rbuf>
145method (see below).
146
147The first argument of the C<$callback> will be the L<AnyEvent::Handle> object.
148 401
149=cut 402=cut
150 403
151sub on_read { 404sub on_read {
152 my ($self, $cb) = @_; 405 my ($self, $cb) = @_;
406
153 $self->{on_read} = $cb; 407 $self->{on_read} = $cb;
154 408
155 unless (defined $self->{on_read}) { 409 unless ($self->{rw} || $self->{eof}) {
410 Scalar::Util::weaken $self;
411
412 $self->{rw} = AnyEvent->io (fh => $self->{fh}, poll => "r", cb => sub {
413 my $len = sysread $self->{fh}, $self->{rbuf}, $self->{read_size} || 8192, length $self->{rbuf};
414
415 if ($len > 0) {
416 if (exists $self->{rbuf_max}) {
417 if ($self->{rbuf_max} < length $self->{rbuf}) {
418 $! = &Errno::ENOSPC; return $self->error;
419 }
420 }
421
422 } elsif (defined $len) {
423 $self->{eof} = 1;
156 delete $self->{on_read_w}; 424 delete $self->{rw};
157 return; 425
426 } elsif ($! != EAGAIN && $! != EINTR) {
427 return $self->error;
428 }
429
430 $self->_drain_rbuf;
431 });
158 } 432 }
159
160 $self->{on_read_w} =
161 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
162 #d# warn "READ:[$self->{read_size}] $self->{read_block_size} : ".length ($self->{rbuf})."\n";
163 my $rbuf_len = length $self->{rbuf};
164 my $l;
165 if (defined $self->{read_size}) {
166 $l = sysread $self->{fh}, $self->{rbuf},
167 ($self->{read_size} - $rbuf_len), $rbuf_len;
168 } else {
169 $l = sysread $self->{fh}, $self->{rbuf}, $self->{read_block_size}, $rbuf_len;
170 }
171 #d# warn "READL $l [$self->{rbuf}]\n";
172
173 if (not defined $l) {
174 return if $! == EAGAIN || $! == EINTR;
175 $self->{on_error}->($self) if $self->{on_error};
176 delete $self->{on_read_w};
177
178 } elsif ($l == 0) {
179 $self->{on_eof}->($self) if $self->{on_eof};
180 delete $self->{on_read_w};
181
182 } else {
183 $self->{on_read}->($self);
184 }
185 });
186} 433}
187 434
188=item B<on_error ($callback)> 435=item $handle->rbuf
189 436
190Whenever a read or write operation resulted in an error the C<$callback> 437Returns the read buffer (as a modifiable lvalue).
191will be called.
192 438
193The first argument of C<$callback> will be the L<AnyEvent::Handle> object itself. 439You can access the read buffer directly as the C<< ->{rbuf} >> member, if
194The error is given as errno in C<$!>. 440you want.
195 441
196=cut
197
198sub on_error {
199 $_[0]->{on_error} = $_[1];
200}
201
202=item B<on_eof ($callback)>
203
204Installs the C<$callback> that will be called when the end of file is
205encountered in a read operation this C<$callback> will be called. The first
206argument will be the L<AnyEvent::Handle> object itself.
207
208=cut
209
210sub on_eof {
211 $_[0]->{on_eof} = $_[1];
212}
213
214=item B<rbuf>
215
216Returns a reference to the read buffer.
217
218NOTE: The read buffer should only be used or modified if the C<on_read> 442NOTE: The read buffer should only be used or modified if the C<on_read>,
219method is used directly. The C<read> and C<readlines> methods will provide 443C<push_read> or C<unshift_read> methods are used. The other read methods
220the read data to their callbacks. 444automatically manage the read buffer.
221 445
222=cut 446=cut
223 447
224sub rbuf : lvalue { 448sub rbuf : lvalue {
225 $_[0]->{rbuf} 449 $_[0]{rbuf}
226} 450}
227 451
228=item B<read ($len, $callback)> 452=item $handle->push_read ($cb)
229 453
230Will read exactly C<$len> bytes from the filehandle and call the C<$callback> 454=item $handle->unshift_read ($cb)
231if done so. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
232object itself and the second argument the read data.
233 455
234NOTE: This method will override any callbacks installed via the C<on_read> method. 456Append the given callback to the end of the queue (C<push_read>) or
457prepend it (C<unshift_read>).
235 458
236=cut 459The callback is called each time some additional read data arrives.
237 460
461It must check wether enough data is in the read buffer already.
462
463If not enough data is available, it must return the empty list or a false
464value, in which case it will be called repeatedly until enough data is
465available (or an error condition is detected).
466
467If enough data was available, then the callback must remove all data it is
468interested in (which can be none at all) and return a true value. After returning
469true, it will be removed from the queue.
470
471=cut
472
238sub read { 473sub push_read {
474 my ($self, $cb) = @_;
475
476 push @{ $self->{queue} }, $cb;
477 $self->_drain_rbuf;
478}
479
480sub unshift_read {
481 my ($self, $cb) = @_;
482
483 push @{ $self->{queue} }, $cb;
484 $self->_drain_rbuf;
485}
486
487=item $handle->push_read_chunk ($len, $cb->($self, $data))
488
489=item $handle->unshift_read_chunk ($len, $cb->($self, $data))
490
491Append the given callback to the end of the queue (C<push_read_chunk>) or
492prepend it (C<unshift_read_chunk>).
493
494The callback will be called only once C<$len> bytes have been read, and
495these C<$len> bytes will be passed to the callback.
496
497=cut
498
499sub _read_chunk($$) {
500 my ($len, $cb) = @_;
501
502 sub {
503 $len <= length $_[0]{rbuf} or return;
504 $cb->($_[0], substr $_[0]{rbuf}, 0, $len, "");
505 1
506 }
507}
508
509sub push_read_chunk {
239 my ($self, $len, $cb) = @_; 510 my ($self, $len, $cb) = @_;
240 511
241 $self->{read_cb} = $cb; 512 $self->push_read (_read_chunk $len, $cb);
242 my $old_blk_size = $self->{read_block_size};
243 $self->{read_block_size} = $len;
244
245 $self->on_read (sub {
246 #d# warn "OFOFO $len || ".length($_[0]->{rbuf})."||\n";
247
248 if ($len == length $_[0]->{rbuf}) {
249 $_[0]->{read_block_size} = $old_blk_size;
250 $_[0]->on_read (undef);
251 $_[0]->{read_cb}->($_[0], (substr $self->{rbuf}, 0, $len, ''));
252 }
253 });
254} 513}
255 514
256=item B<readlines ($callback)>
257 515
258=item B<readlines ($sep, $callback)> 516sub unshift_read_chunk {
259
260This method will read lines from the filehandle, separated by C<$sep> or C<"\n">
261if C<$sep> is not provided. C<$sep> will be used as "line" separated.
262
263The C<$callback> will be called when at least one
264line could be read. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
265object itself and the rest of the arguments will be the read lines.
266
267NOTE: This method will override any callbacks installed via the C<on_read> method.
268
269=cut
270
271sub readlines {
272 my ($self, $sep, $cb) = @_; 517 my ($self, $len, $cb) = @_;
273 518
274 if (ref $sep) { 519 $self->unshift_read (_read_chunk $len, $cb);
275 $cb = $sep; 520}
276 $sep = "\n";
277 521
278 } elsif (not defined $sep) { 522=item $handle->push_read_line ([$eol, ]$cb->($self, $line, $eol))
279 $sep = "\n"; 523
524=item $handle->unshift_read_line ([$eol, ]$cb->($self, $line, $eol))
525
526Append the given callback to the end of the queue (C<push_read_line>) or
527prepend it (C<unshift_read_line>).
528
529The callback will be called only once a full line (including the end of
530line marker, C<$eol>) has been read. This line (excluding the end of line
531marker) will be passed to the callback as second argument (C<$line>), and
532the end of line marker as the third argument (C<$eol>).
533
534The end of line marker, C<$eol>, can be either a string, in which case it
535will be interpreted as a fixed record end marker, or it can be a regex
536object (e.g. created by C<qr>), in which case it is interpreted as a
537regular expression.
538
539The end of line marker argument C<$eol> is optional, if it is missing (NOT
540undef), then C<qr|\015?\012|> is used (which is good for most internet
541protocols).
542
543Partial lines at the end of the stream will never be returned, as they are
544not marked by the end of line marker.
545
546=cut
547
548sub _read_line($$) {
549 my $cb = pop;
550 my $eol = @_ ? shift : qr|(\015?\012)|;
551 my $pos;
552
553 $eol = qr|(\Q$eol\E)| unless ref $eol;
554 $eol = qr|^(.*?)($eol)|;
555
556 sub {
557 $_[0]{rbuf} =~ s/$eol// or return;
558
559 $cb->($1, $2);
560 1
280 } 561 }
281
282 my $sep_len = length $sep;
283
284 $self->{on_readline} = $cb;
285
286 $self->on_read (sub {
287 my @lines;
288 my $rb = \$_[0]->{rbuf};
289 my $pos;
290 while (($pos = index ($$rb, $sep)) >= 0) {
291 push @lines, substr $$rb, 0, $pos + $sep_len, '';
292 }
293 $self->{on_readline}->($_[0], @lines);
294 });
295} 562}
296 563
297=item B<write ($data)> 564sub push_read_line {
298
299=item B<write ($callback)>
300
301=item B<write ($data, $callback)>
302
303This method will write C<$data> to the filehandle and call the C<$callback>
304afterwards. If only C<$callback> is provided it will be called when the
305write buffer becomes empty the next time (or immediately if it already is empty).
306
307=cut
308
309sub write {
310 my ($self, $data, $cb) = @_;
311 if (ref $data) { $cb = $data; undef $data }
312 push @{$self->{write_bufs}}, [$data, $cb];
313 $self->_check_writer;
314}
315
316sub _check_writer {
317 my ($self) = @_; 565 my $self = shift;
318 566
319 if ($self->{write_w}) { 567 $self->push_read (&_read_line);
320 unless ($self->{write_cb}) { 568}
321 while (@{$self->{write_bufs}} && not defined $self->{write_bufs}->[0]->[1]) {
322 my $wba = shift @{$self->{write_bufs}};
323 $self->{wbuf} .= $wba->[0];
324 }
325 }
326 return;
327 }
328 569
329 my $wba = shift @{$self->{write_bufs}} 570sub unshift_read_line {
330 or return; 571 my $self = shift;
331 572
332 unless (defined $wba->[0]) { 573 $self->unshift_read (&_read_line);
333 $wba->[1]->($self) if $wba->[1];
334 $self->_check_writer;
335 return;
336 }
337
338 $self->{wbuf} = $wba->[0];
339 $self->{write_cb} = $wba->[1];
340
341 $self->{write_w} =
342 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
343 my $l = syswrite $self->{fh}, $self->{wbuf}, length $self->{wbuf};
344
345 if (not defined $l) {
346 return if $! == EAGAIN || $! == EINTR;
347 delete $self->{write_w};
348 $self->{on_error}->($self) if $self->{on_error};
349
350 } else {
351 substr $self->{wbuf}, 0, $l, '';
352
353 if (length ($self->{wbuf}) == 0) {
354 $self->{write_cb}->($self) if $self->{write_cb};
355
356 delete $self->{write_w};
357 delete $self->{wbuf};
358 delete $self->{write_cb};
359
360 $self->_check_writer;
361 }
362 }
363 });
364} 574}
365 575
366=back 576=back
367 577
368=head1 AUTHOR 578=head1 AUTHOR
369 579
370Robin Redeker, C<< <elmex at ta-sa.org> >> 580Robin Redeker C<< <elmex at ta-sa.org> >>, Marc Lehmann <schmorp@schmorp.de>.
371 581
372=cut 582=cut
373 583
3741; # End of AnyEvent::Handle 5841; # End of AnyEvent::Handle

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines