ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Handle.pm
Revision: 1.7
Committed: Thu May 1 16:35:40 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.6: +3 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 package AnyEvent::Handle;
2
3 no warnings;
4 use strict;
5
6 use AnyEvent;
7 use IO::Handle;
8 use Errno qw/EAGAIN EINTR/;
9
10 =head1 NAME
11
12 AnyEvent::Handle - non-blocking I/O on filehandles via AnyEvent
13
14 =head1 VERSION
15
16 Version 0.01
17
18 =cut
19
20 our $VERSION = '0.01';
21
22 =head1 SYNOPSIS
23
24 use AnyEvent;
25 use AnyEvent::Handle;
26
27 my $cv = AnyEvent->condvar;
28
29 my $ae_fh = AnyEvent::Handle->new (fh => \*STDIN);
30
31 $ae_fh->on_eof (sub { $cv->broadcast });
32
33 $ae_fh->readlines (sub {
34 my ($ae_fh, @lines) = @_;
35 for (@lines) {
36 chomp;
37 print "Line: $_";
38 }
39 });
40
41 # or use the constructor to pass the callback:
42
43 my $ae_fh2 =
44 AnyEvent::Handle->new (
45 fh => \*STDIN,
46 on_eof => sub {
47 $cv->broadcast;
48 },
49 on_readline => sub {
50 my ($ae_fh, @lines) = @_;
51 for (@lines) {
52 chomp;
53 print "Line: $_";
54 }
55 }
56 );
57
58 $cv->wait;
59
60 =head1 DESCRIPTION
61
62 This module is a helper module to make it easier to do non-blocking I/O
63 on filehandles (and sockets, see L<AnyEvent::Socket>).
64
65 The event loop is provided by L<AnyEvent>.
66
67 =head1 METHODS
68
69 =over 4
70
71 =item B<new (%args)>
72
73 The constructor has these arguments:
74
75 =over 4
76
77 =item fh => $filehandle
78
79 The filehandle this L<AnyEvent::Handle> object will operate on.
80
81 NOTE: The filehandle will be set to non-blocking.
82
83 =item read_block_size => $size
84
85 The default read block size use for reads via the C<on_read>
86 method.
87
88 =item on_read => $cb
89
90 =item on_eof => $cb
91
92 =item on_error => $cb
93
94 These are shortcuts, that will call the corresponding method and set the callback to C<$cb>.
95
96 =item on_readline => $cb
97
98 The C<readlines> method is called with the default separated and C<$cb> as callback
99 for you.
100
101 =back
102
103 =cut
104
105 sub new {
106 my $this = shift;
107 my $class = ref($this) || $this;
108 my $self = {
109 read_block_size => 4096,
110 rbuf => '',
111 @_
112 };
113 bless $self, $class;
114
115 $self->{fh}->blocking (0) if $self->{fh};
116
117 if ($self->{on_read}) {
118 $self->on_read ($self->{on_read});
119
120 } elsif ($self->{on_readline}) {
121 $self->readlines ($self->{on_readline});
122
123 } elsif ($self->{on_eof}) {
124 $self->on_eof ($self->{on_eof});
125
126 } elsif ($self->{on_error}) {
127 $self->on_eof ($self->{on_error});
128 }
129
130 return $self
131 }
132
133 =item B<fh>
134
135 This method returns the filehandle of the L<AnyEvent::Handle> object.
136
137 =cut
138
139 sub fh { $_[0]->{fh} }
140
141 =item B<on_read ($callback)>
142
143 This method installs a C<$callback> that will be called
144 when new data arrived. You can access the read buffer via the C<rbuf>
145 method (see below).
146
147 The first argument of the C<$callback> will be the L<AnyEvent::Handle> object.
148
149 =cut
150
151 sub on_read {
152 my ($self, $cb) = @_;
153 $self->{on_read} = $cb;
154
155 unless (defined $self->{on_read}) {
156 delete $self->{on_read_w};
157 return;
158 }
159
160 $self->{on_read_w} =
161 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
162 #d# warn "READ:[$self->{read_size}] $self->{read_block_size} : ".length ($self->{rbuf})."\n";
163 my $rbuf_len = length $self->{rbuf};
164 my $l;
165 if (defined $self->{read_size}) {
166 $l = sysread $self->{fh}, $self->{rbuf},
167 ($self->{read_size} - $rbuf_len), $rbuf_len;
168 } else {
169 $l = sysread $self->{fh}, $self->{rbuf}, $self->{read_block_size}, $rbuf_len;
170 }
171 #d# warn "READL $l [$self->{rbuf}]\n";
172
173 if (not defined $l) {
174 return if $! == EAGAIN || $! == EINTR;
175 $self->{on_error}->($self) if $self->{on_error};
176 delete $self->{on_read_w};
177
178 } elsif ($l == 0) {
179 $self->{on_eof}->($self) if $self->{on_eof};
180 delete $self->{on_read_w};
181
182 } else {
183 $self->{on_read}->($self);
184 }
185 });
186 }
187
188 =item B<on_error ($callback)>
189
190 Whenever a read or write operation resulted in an error the C<$callback>
191 will be called.
192
193 The first argument of C<$callback> will be the L<AnyEvent::Handle> object itself.
194 The error is given as errno in C<$!>.
195
196 =cut
197
198 sub on_error {
199 $_[0]->{on_error} = $_[1];
200 }
201
202 =item B<on_eof ($callback)>
203
204 Installs the C<$callback> that will be called when the end of file is
205 encountered in a read operation this C<$callback> will be called. The first
206 argument will be the L<AnyEvent::Handle> object itself.
207
208 =cut
209
210 sub on_eof {
211 $_[0]->{on_eof} = $_[1];
212 }
213
214 =item B<rbuf>
215
216 Returns a reference to the read buffer.
217
218 NOTE: The read buffer should only be used or modified if the C<on_read>
219 method is used directly. The C<read> and C<readlines> methods will provide
220 the read data to their callbacks.
221
222 =cut
223
224 sub rbuf : lvalue {
225 $_[0]->{rbuf}
226 }
227
228 =item B<read ($len, $callback)>
229
230 Will read exactly C<$len> bytes from the filehandle and call the C<$callback>
231 if done so. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
232 object itself and the second argument the read data.
233
234 NOTE: This method will override any callbacks installed via the C<on_read> method.
235
236 =cut
237
238 sub read {
239 my ($self, $len, $cb) = @_;
240
241 $self->{read_cb} = $cb;
242 my $old_blk_size = $self->{read_block_size};
243 $self->{read_block_size} = $len;
244
245 $self->on_read (sub {
246 #d# warn "OFOFO $len || ".length($_[0]->{rbuf})."||\n";
247
248 if ($len == length $_[0]->{rbuf}) {
249 $_[0]->{read_block_size} = $old_blk_size;
250 $_[0]->on_read (undef);
251 $_[0]->{read_cb}->($_[0], (substr $self->{rbuf}, 0, $len, ''));
252 }
253 });
254 }
255
256 =item B<readlines ($callback)>
257
258 =item B<readlines ($sep, $callback)>
259
260 This method will read lines from the filehandle, separated by C<$sep> or C<"\n">
261 if C<$sep> is not provided. C<$sep> will be used as "line" separated.
262
263 The C<$callback> will be called when at least one
264 line could be read. The first argument to the C<$callback> will be the L<AnyEvent::Handle>
265 object itself and the rest of the arguments will be the read lines.
266
267 NOTE: This method will override any callbacks installed via the C<on_read> method.
268
269 =cut
270
271 sub readlines {
272 my ($self, $sep, $cb) = @_;
273
274 if (ref $sep) {
275 $cb = $sep;
276 $sep = "\n";
277
278 } elsif (not defined $sep) {
279 $sep = "\n";
280 }
281
282 my $sep_len = length $sep;
283
284 $self->{on_readline} = $cb;
285
286 $self->on_read (sub {
287 my @lines;
288 my $rb = \$_[0]->{rbuf};
289 my $pos;
290 while (($pos = index ($$rb, $sep)) >= 0) {
291 push @lines, substr $$rb, 0, $pos + $sep_len, '';
292 }
293 $self->{on_readline}->($_[0], @lines);
294 });
295 }
296
297 =item B<write ($data)>
298
299 =item B<write ($callback)>
300
301 =item B<write ($data, $callback)>
302
303 This method will write C<$data> to the filehandle and call the C<$callback>
304 afterwards. If only C<$callback> is provided it will be called when the
305 write buffer becomes empty the next time (or immediately if it already is empty).
306
307 =cut
308
309 sub write {
310 my ($self, $data, $cb) = @_;
311 if (ref $data) { $cb = $data; undef $data }
312 push @{$self->{write_bufs}}, [$data, $cb];
313 $self->_check_writer;
314 }
315
316 sub _check_writer {
317 my ($self) = @_;
318
319 if ($self->{write_w}) {
320 unless ($self->{write_cb}) {
321 while (@{$self->{write_bufs}} && not defined $self->{write_bufs}->[0]->[1]) {
322 my $wba = shift @{$self->{write_bufs}};
323 $self->{wbuf} .= $wba->[0];
324 }
325 }
326 return;
327 }
328
329 my $wba = shift @{$self->{write_bufs}}
330 or return;
331
332 unless (defined $wba->[0]) {
333 $wba->[1]->($self) if $wba->[1];
334 $self->_check_writer;
335 return;
336 }
337
338 $self->{wbuf} = $wba->[0];
339 $self->{write_cb} = $wba->[1];
340
341 $self->{write_w} =
342 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
343 my $l = syswrite $self->{fh}, $self->{wbuf}, length $self->{wbuf};
344
345 if (not defined $l) {
346 return if $! == EAGAIN || $! == EINTR;
347 delete $self->{write_w};
348 $self->{on_error}->($self) if $self->{on_error};
349
350 } else {
351 substr $self->{wbuf}, 0, $l, '';
352
353 if (length ($self->{wbuf}) == 0) {
354 $self->{write_cb}->($self) if $self->{write_cb};
355
356 delete $self->{write_w};
357 delete $self->{wbuf};
358 delete $self->{write_cb};
359
360 $self->_check_writer;
361 }
362 }
363 });
364 }
365
366 =back
367
368 =head1 AUTHOR
369
370 Robin Redeker, C<< <elmex at ta-sa.org> >>
371
372 =cut
373
374 1; # End of AnyEvent::Handle