ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.12 by root, Sat Aug 8 04:02:48 2015 UTC vs.
Revision 1.23 by root, Sun Jun 12 04:39:41 2016 UTC

61 61
62use common::sense; 62use common::sense;
63 63
64use Carp; 64use Carp;
65 65
66our $VERSION = '0.3'; 66our $VERSION = 0.5;
67 67
68use Scalar::Util (); 68use Scalar::Util ();
69 69
70use AnyEvent; 70use AnyEvent;
71use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
73 75
74sub touc($) { 76sub touc($) {
75 local $_ = shift; 77 local $_ = shift;
76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
77 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
85 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
86 lc 88 lc
87} 89}
88 90
89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
90 92
91Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
93 95
94If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
95(hopefully) unique client name for you. 97(hopefully) unique client name for you.
96 98
99The following keys can be specified (they are all optional):
100
101=over 4
102
103=item name => $string
104
105A unique name to identify this client. If none is specified, a randomly
106generated name will be used.
107
108=item host => $hostname
109
110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_eof => $callback->($fcp)
132
133Invoked when the underlying L<AnyEvent::Handle> signals EOF, currently
134regardless of whether the EOF was expected or not.
135
136=item on_error => $callback->($fcp, $message)
137
138Invoked on any (fatal) errors, such as unexpected connection close. The
139callback receives the FCP object and a textual error message.
140
141=item on_failure => $callback->($fcp, $type, $backtrace, $args, $error)
142
143Invoked when an FCP request fails that didn't have a failure callback. See
144L<FCP REQUESTS> for details.
145
146=back
147
97=cut 148=cut
98 149
99sub new { 150sub new {
100 my $class = shift; 151 my $class = shift;
152
153 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
154
101 my $self = bless { 155 my $self = bless {
102 host => $ENV{FREDHOST} || "127.0.0.1", 156 host => $ENV{FREDHOST} || "127.0.0.1",
103 port => $ENV{FREDPORT} || 9481, 157 port => $ENV{FREDPORT} || 9481,
104 timeout => 3600 * 2, 158 timeout => 3600 * 2,
159 keepalive => 9 * 60,
105 name => time.rand.rand.rand, # lame 160 name => time.rand.rand.rand, # lame
106 @_, 161 @_,
107 queue => [], 162 queue => [],
108 req => {}, 163 req => {},
164 prefix => "..:aefcpid:$rand:",
109 id => "a0", 165 idseq => "a0",
110 }, $class; 166 }, $class;
111 167
112 { 168 {
113 Scalar::Util::weaken (my $self = $self); 169 Scalar::Util::weaken (my $self = $self);
170
171 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
172 $self->{hdl}->push_write ("\n");
173 };
174
175 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
176
177 # these are declared here for performance reasons
178 my ($k, $v, $type);
179 my $rdata;
180
181 my $on_read = sub {
182 my ($hdl) = @_;
183
184 # we only carve out whole messages here
185 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
186 # remember end marker
187 $rdata = $1 eq "Data"
188 or $1 eq "EndMessage"
189 or return $self->fatal ("protocol error, expected message end, got $1\n");
190
191 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
192
193 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
194
195 $type = shift @lines;
196 $type = ($TOLC{$type} ||= tolc $type);
197
198 my %kv;
199
200 for (@lines) {
201 ($k, $v) = split /=/, $_, 2;
202 $k = ($TOLC{$k} ||= tolc $k);
203
204 if ($k =~ /\./) {
205 # generic, slow case
206 my @k = split /\./, $k;
207 my $ro = \\%kv;
208
209 while (@k) {
210 $k = shift @k;
211 if ($k =~ /^\d+$/) {
212 $ro = \$$ro->[$k];
213 } else {
214 $ro = \$$ro->{$k};
215 }
216 }
217
218 $$ro = $v;
219
220 next;
221 }
222
223 # special comon case, for performance only
224 $kv{$k} = $v;
225 }
226
227 if ($rdata) {
228 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
229 $rdata = \$_[1];
230 $self->recv ($type, \%kv, $rdata);
231 });
232
233 last; # do not tgry to parse more messages
234 } else {
235 $self->recv ($type, \%kv);
236 }
237 }
238 };
114 239
115 $self->{hdl} = new AnyEvent::Handle 240 $self->{hdl} = new AnyEvent::Handle
116 connect => [$self->{host} => $self->{port}], 241 connect => [$self->{host} => $self->{port}],
117 timeout => $self->{timeout}, 242 timeout => $self->{timeout},
243 on_read => $on_read,
244 on_eof => sub {
245 if ($self->{on_eof}) {
246 $self->{on_eof}($self);
247 } else {
248 $self->fatal ("EOF");
249 }
250 },
118 on_error => sub { 251 on_error => sub {
119 warn "@_\n";#d# 252 $self->fatal ($_[2]);
120 exit 1;
121 }, 253 },
122 on_read => sub { $self->on_read (@_) }, 254 ;
123 on_eof => $self->{on_eof} || sub { };
124 255
125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 256 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
126 } 257 }
127 258
128 $self->send_msg (client_hello => 259 $self->send_msg (client_hello =>
131 ); 262 );
132 263
133 $self 264 $self
134} 265}
135 266
267sub fatal {
268 my ($self, $msg) = @_;
269
270 $self->{hdl}->shutdown;
271 delete $self->{kw};
272
273 if ($self->{on_error}) {
274 $self->{on_error}->($self, $msg);
275 } else {
276 die $msg;
277 }
278}
279
280sub identifier {
281 $_[0]{prefix} . ++$_[0]{idseq}
282}
283
136sub send_msg { 284sub send_msg {
137 my ($self, $type, %kv) = @_; 285 my ($self, $type, %kv) = @_;
138 286
139 my $data = delete $kv{data}; 287 my $data = delete $kv{data};
140 288
141 if (exists $kv{id_cb}) { 289 if (exists $kv{id_cb}) {
142 my $id = $kv{identifier} ||= ++$self->{id}; 290 my $id = $kv{identifier} ||= $self->identifier;
143 $self->{id}{$id} = delete $kv{id_cb}; 291 $self->{id}{$id} = delete $kv{id_cb};
144 } 292 }
145 293
146 my $msg = (touc $type) . "\012" 294 my $msg = (touc $type) . "\012"
147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 295 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
224 372
225 if (my $cb = $PERSISTENT_TYPE{$type}) { 373 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier}; 374 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {}; 375 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv); 376 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra); 377 $self->recv (request_changed => $kv, $type, @extra);
230 } 378 }
231 379
232 my $on = $self->{on}; 380 my $on = $self->{on};
233 for (0 .. $#$on) { 381 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) { 382 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
243 } else { 391 } else {
244 $self->default_recv ($type, $kv, @extra); 392 $self->default_recv ($type, $kv, @extra);
245 } 393 }
246} 394}
247 395
248sub on_read {
249 my ($self) = @_;
250
251 my $type;
252 my %kv;
253 my $rdata;
254
255 my $hdr_cb; $hdr_cb = sub {
256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
257 my ($k, $v) = ($1, $2);
258 my @k = split /\./, tolc $k;
259 my $ro = \\%kv;
260
261 while (@k) {
262 my $k = shift @k;
263 if ($k =~ /^\d+$/) {
264 $ro = \$$ro->[$k];
265 } else {
266 $ro = \$$ro->{$k};
267 }
268 }
269
270 $$ro = $v;
271
272 $_[0]->push_read (line => $hdr_cb);
273 } elsif ($_[1] eq "Data") {
274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
275 $rdata = \$_[1];
276 $self->recv ($type, \%kv, $rdata);
277 });
278 } elsif ($_[1] eq "EndMessage") {
279 $self->recv ($type, \%kv);
280 } else {
281 die "protocol error, expected message end, got $_[1]\n";#d#
282 }
283 };
284
285 $self->{hdl}->push_read (line => sub {
286 $type = tolc $_[1];
287 $_[0]->push_read (line => $hdr_cb);
288 });
289}
290
291sub default_recv { 396sub default_recv {
292 my ($self, $type, $kv, $rdata) = @_; 397 my ($self, $type, $kv, $rdata) = @_;
293 398
294 if ($type eq "node_hello") { 399 if ($type eq "node_hello") {
295 $self->{node_hello} = $kv; 400 $self->{node_hello} = $kv;
320 425
321Also comes in this underscore variant: 426Also comes in this underscore variant:
322 427
323 $fcp->get_plugin_info_ ($name, $detailed, $cb); 428 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324 429
325You can thinbk of the underscore as a kind of continuation indicator - the 430You can think of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that 431normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked 432you pass the continuation yourself, and the continuation will be invoked
328with the results. 433with the results.
329 434
330This callback/continuation argument (C<$cb>) can come in three forms itself: 435This callback/continuation argument (C<$cb>) can come in three forms itself:
332=over 4 437=over 4
333 438
334=item A code reference (or rather anything not matching some other alternative) 439=item A code reference (or rather anything not matching some other alternative)
335 440
336This code reference will be invoked with the result on success. On an 441This code reference will be invoked with the result on success. On an
442error, it will invoke the C<on_failure> callback of the FCP object, or,
337error, it will die (in the event loop) with a backtrace of the call site. 443if none was defined, will die (in the event loop) with a backtrace of the
444call site.
338 445
339This is a popular choice, but it makes handling errors hard - make sure 446This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors! 447you never generate protocol errors!
448
449In the failure case, if an C<on_failure> hook exists, it will be invoked
450with the FCP object, the request type (the name of the method), a
451(textual) backtrace as generated by C<Carp::longmess>, and arrayref
452containing the arguments from the original request invocation and the
453error object from the server, in this order, e.g.:
454
455 on_failure => sub {
456 my ($fcp, $request_type, $backtrace, $orig_args, $error_object) = @_;
457
458 warn "FCP failure ($type), $error_object->{code_description} ($error_object->{extra_description})$backtrace";
459 exit 1;
460 },
341 461
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>) 462=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343 463
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the 464When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error 465results when the request has finished. Should an error occur, the error
350=item An array with two callbacks C<[$success, $failure]> 470=item An array with two callbacks C<[$success, $failure]>
351 471
352The C<$success> callback will be invoked with the results, while the 472The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors. 473C<$failure> callback will be invoked on any errors.
354 474
475The C<$failure> callback will be invoked with the error object from the
476server.
477
355=item C<undef> 478=item C<undef>
356 479
357This is the same thing as specifying C<sub { }> as callback, i.e. on 480This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies 481success, the results are ignored, while on failure, the C<on_failure> hook
359with a backtrace. 482is invoked or the module dies with a backtrace.
360 483
361This is good for quick scripts, or when you really aren't interested in 484This is good for quick scripts, or when you really aren't interested in
362the results. 485the results.
363 486
364=back 487=back
371 my ($name, $sub) = @_; 494 my ($name, $sub) = @_;
372 495
373 *{$name} = sub { 496 *{$name} = sub {
374 my $cv = AE::cv; 497 my $cv = AE::cv;
375 498
376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) }; 499 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
377 &$sub; 500 &$sub;
378 $cv->recv 501 $cv->recv
379 }; 502 };
380 503
381 *{"$name\_"} = sub { 504 *{"$name\_"} = sub {
382 my ($ok, $err) = pop; 505 my ($ok, $err) = pop;
383 506
384 if (ARRAY:: eq ref $ok) { 507 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok; 508 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) { 509 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) }; 510 $err = sub { $ok->croak ($_[0]{extra_description}) };
388 } else { 511 } else {
389 my $bt = Carp::longmess ""; 512 my $bt = Carp::longmess "AnyEvent::FCP request $name";
513 Scalar::Util::weaken (my $self = $_[0]);
514 my $args = [@_]; shift @$args;
390 $err = sub { 515 $err = sub {
516 if ($self->{on_failure}) {
517 $self->{on_failure}($self, $name, $args, $bt, $_[0]);
518 } else {
391 die "$_[0]{extra_description}$bt"; 519 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
520 }
392 }; 521 };
393 } 522 }
394 523
395 $ok ||= $NOP_CB; 524 $ok ||= $NOP_CB;
396 525
517 ); 646 );
518 647
519 $self->on (sub { 648 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_; 649 my ($self, $type, $kv, @extra) = @_;
521 650
651 $guard if 0;
652
522 if ($kv->{identifier} eq $identifier) { 653 if ($kv->{identifier} eq $identifier) {
523 if ($type eq "persistent_request_modified") { 654 if ($type eq "persistent_request_modified") {
524 $ok->($kv); 655 $ok->($kv);
525 return; 656 return;
526 } elsif ($type eq "protocol_error") { 657 } elsif ($type eq "protocol_error") {
539=cut 670=cut
540 671
541_txn get_plugin_info => sub { 672_txn get_plugin_info => sub {
542 my ($self, $ok, $err, $name, $detailed) = @_; 673 my ($self, $ok, $err, $name, $detailed) = @_;
543 674
675 my $id = $self->identifier;
676
544 $self->send_msg (get_plugin_info => 677 $self->send_msg (get_plugin_info =>
678 identifier => $id,
545 plugin_name => $name, 679 plugin_name => $name,
546 detailed => $detailed ? "true" : "false", 680 detailed => $detailed ? "true" : "false",
547 id_cb => sub {
548 my ($self, $type, $kv, $rdata) = @_;
549
550 $ok->($kv);
551 1
552 },
553 ); 681 );
682 $self->on (sub {
683 my ($self, $type, $kv) = @_;
684
685 if ($kv->{identifier} eq $id) {
686 if ($type eq "get_plugin_info") {
687 $ok->($kv);
688 } else {
689 $err->($kv, $type);
690 }
691 return;
692 }
693
694 1
695 });
554}; 696};
555 697
556=item $status = $fcp->client_get ($uri, $identifier, %kv) 698=item $status = $fcp->client_get ($uri, $identifier, %kv)
557 699
558%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 700%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
564=cut 706=cut
565 707
566_txn client_get => sub { 708_txn client_get => sub {
567 my ($self, $ok, $err, $uri, $identifier, %kv) = @_; 709 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
568 710
711 $self->serialise ($identifier => sub {
712 my ($self, $guard) = @_;
713
569 $self->send_msg (client_get => 714 $self->send_msg (client_get =>
570 %kv, 715 %kv,
571 uri => $uri, 716 uri => $uri,
572 identifier => $identifier, 717 identifier => $identifier,
718 );
719
720 $self->on (sub {
721 my ($self, $type, $kv, @extra) = @_;
722
723 $guard if 0;
724
725 if ($kv->{identifier} eq $identifier) {
726 if ($type eq "persistent_get") {
727 $ok->($kv);
728 return;
729 } elsif ($type eq "protocol_error") {
730 $err->($kv);
731 return;
732 }
733 }
734
735 1
736 });
573 ); 737 });
574
575 $ok->();
576}; 738};
577 739
578=item $status = $fcp->remove_request ($identifier[, $global]) 740=item $status = $fcp->remove_request ($identifier[, $global])
579 741
580Remove the request with the given isdentifier. Returns true if successful, 742Remove the request with the given isdentifier. Returns true if successful,
592 identifier => $identifier, 754 identifier => $identifier,
593 global => $global ? "true" : "false", 755 global => $global ? "true" : "false",
594 ); 756 );
595 $self->on (sub { 757 $self->on (sub {
596 my ($self, $type, $kv, @extra) = @_; 758 my ($self, $type, $kv, @extra) = @_;
759
760 $guard if 0;
597 761
598 if ($kv->{identifier} eq $identifier) { 762 if ($kv->{identifier} eq $identifier) {
599 if ($type eq "persistent_request_removed") { 763 if ($type eq "persistent_request_removed") {
600 $ok->(1); 764 $ok->(1);
601 return; 765 return;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines