ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.20
Committed: Sun Jun 12 01:32:37 2016 UTC (7 years, 11 months ago) by root
Branch: MAIN
Changes since 1.19: +72 -7 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::FCP - freenet client protocol 2.0
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::FCP;
8
9 my $fcp = new AnyEvent::FCP;
10
11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
17
18 =head1 DESCRIPTION
19
20 This module implements the freenet client protocol version 2.0, as used by
21 freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22
23 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24 description of what the messages do.
25
26 The module uses L<AnyEvent> to find a suitable event module.
27
28 Only very little is implemented, ask if you need more, and look at the
29 example program later in this section.
30
31 =head2 EXAMPLE
32
33 This example fetches the download list and sets the priority of all files
34 with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43 TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
50 =head2 IMPORT TAGS
51
52 Nothing much can be "imported" from this module right now.
53
54 =head1 THE AnyEvent::FCP CLASS
55
56 =over 4
57
58 =cut
59
60 package AnyEvent::FCP;
61
62 use common::sense;
63
64 use Carp;
65
66 our $VERSION = 0.4;
67
68 use Scalar::Util ();
69
70 use AnyEvent;
71 use AnyEvent::Handle;
72 use AnyEvent::Util ();
73
74 our %TOLC; # tolc cache
75
76 sub touc($) {
77 local $_ = shift;
78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
79 s/(?:^|_)(.)/\U$1/g;
80 $_
81 }
82
83 sub tolc($) {
84 local $_ = shift;
85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
87 s/(?<=[a-z])(?=[A-Z])/_/g;
88 lc
89 }
90
91 =item $fcp = new AnyEvent::FCP key => value...;
92
93 Create a new FCP connection to the given host and port (default
94 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
95
96 If no C<name> was specified, then AnyEvent::FCP will generate a
97 (hopefully) unique client name for you.
98
99 The following keys can be specified (they are all optional):
100
101 =over 4
102
103 =item name => $string
104
105 A unique name to identify this client. If none is specified, a randomly
106 generated name will be used.
107
108 =item host => $hostname
109
110 The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111 or C<127.0.0.1>.
112
113 =item port => $portnumber
114
115 The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117 =item timeout => $seconds
118
119 The timeout, in seconds, after which a connection error is assumed when
120 there is no activity. Default is C<7200>, i.e. two hours.
121
122 =item keepalive => $seconds
123
124 The interval, in seconds, at which keepalive messages will be
125 sent. Default is C<540>, i.e. nine minutes.
126
127 These keepalive messages are useful both to detect that a connection is
128 no longer working and to keep any (home) routers from expiring their
129 masquerading entry.
130
131 =item on_error => $callback->($fcp, $message)
132
133 Invoked on any (fatal) errors, such as unexpected connection close. The
134 callback receives the FCP object and a textual error message.
135
136 =item on_failure => $callback->($fcp, $backtrace, $args, $error)
137
138 Invoked when an FCP request fails that didn't have a failure callback. See
139 L<FCP REQUESTS> for details.
140
141 =back
142
143 =cut
144
145 sub new {
146 my $class = shift;
147
148 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
149
150 my $self = bless {
151 host => $ENV{FREDHOST} || "127.0.0.1",
152 port => $ENV{FREDPORT} || 9481,
153 timeout => 3600 * 2,
154 keepalive => 9 * 60,
155 name => time.rand.rand.rand, # lame
156 @_,
157 queue => [],
158 req => {},
159 prefix => "..:aefcpid:$rand:",
160 idseq => "a0",
161 }, $class;
162
163 {
164 Scalar::Util::weaken (my $self = $self);
165
166 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
167 $self->{hdl}->push_write ("\n");
168 };
169
170 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
171
172 # these are declared here for performance reasons
173 my ($k, $v, $type);
174 my $rdata;
175
176 my $on_read = sub {
177 my ($hdl) = @_;
178
179 # we only carve out whole messages here
180 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
181 # remember end marker
182 $rdata = $1 eq "Data"
183 or $1 eq "EndMessage"
184 or return $self->fatal ("protocol error, expected message end, got $1\n");
185
186 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
187
188 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
189
190 $type = shift @lines;
191 $type = ($TOLC{$type} ||= tolc $type);
192
193 my %kv;
194
195 for (@lines) {
196 ($k, $v) = split /=/, $_, 2;
197 $k = ($TOLC{$k} ||= tolc $k);
198
199 if ($k =~ /\./) {
200 # generic, slow case
201 my @k = split /\./, $k;
202 my $ro = \\%kv;
203
204 while (@k) {
205 $k = shift @k;
206 if ($k =~ /^\d+$/) {
207 $ro = \$$ro->[$k];
208 } else {
209 $ro = \$$ro->{$k};
210 }
211 }
212
213 $$ro = $v;
214
215 next;
216 }
217
218 # special comon case, for performance only
219 $kv{$k} = $v;
220 }
221
222 if ($rdata) {
223 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
224 $rdata = \$_[1];
225 $self->recv ($type, \%kv, $rdata);
226 });
227
228 last; # do not tgry to parse more messages
229 } else {
230 $self->recv ($type, \%kv);
231 }
232 }
233 };
234
235 $self->{hdl} = new AnyEvent::Handle
236 connect => [$self->{host} => $self->{port}],
237 timeout => $self->{timeout},
238 on_read => $on_read,
239 on_eof => $self->{on_eof},
240 on_error => sub {
241 $self->fatal ($_[2]);
242 },
243 ;
244
245 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
246 }
247
248 $self->send_msg (client_hello =>
249 name => $self->{name},
250 expected_version => "2.0",
251 );
252
253 $self
254 }
255
256 sub fatal {
257 my ($self, $msg) = @_;
258
259 $self->{hdl}->shutdown;
260 delete $self->{kw};
261
262 if ($self->{on_error}) {
263 $self->{on_error}->($self, $msg);
264 } else {
265 die $msg;
266 }
267 }
268
269 sub identifier {
270 $_[0]{prefix} . ++$_[0]{idseq}
271 }
272
273 sub send_msg {
274 my ($self, $type, %kv) = @_;
275
276 my $data = delete $kv{data};
277
278 if (exists $kv{id_cb}) {
279 my $id = $kv{identifier} ||= $self->identifier;
280 $self->{id}{$id} = delete $kv{id_cb};
281 }
282
283 my $msg = (touc $type) . "\012"
284 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
285
286 sub id {
287 my ($self) = @_;
288
289
290 }
291
292 if (defined $data) {
293 $msg .= "DataLength=" . (length $data) . "\012"
294 . "Data\012$data";
295 } else {
296 $msg .= "EndMessage\012";
297 }
298
299 $self->{hdl}->push_write ($msg);
300 }
301
302 sub on {
303 my ($self, $cb) = @_;
304
305 # cb return undef - message eaten, remove cb
306 # cb return 0 - message eaten
307 # cb return 1 - pass to next
308
309 push @{ $self->{on} }, $cb;
310 }
311
312 sub _push_queue {
313 my ($self, $queue) = @_;
314
315 shift @$queue;
316 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
317 if @$queue;
318 }
319
320 # lock so only one $type (arbitrary string) is in flight,
321 # to work around horribly misdesigned protocol.
322 sub serialise {
323 my ($self, $type, $cb) = @_;
324
325 my $queue = $self->{serialise}{$type} ||= [];
326 push @$queue, $cb;
327 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
328 unless $#$queue;
329 }
330
331 # how to merge these types into $self->{persistent}
332 our %PERSISTENT_TYPE = (
333 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
334 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
335 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
336 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
337 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
338
339 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
340
341 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
342 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
343 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
344 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
345 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
346 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
347 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
348
349 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
350 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
351 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
352 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
353 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
354 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
355 data_found => sub { $_[1]{data_found} = $_[2] }, # get
356 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
357 );
358
359 sub recv {
360 my ($self, $type, $kv, @extra) = @_;
361
362 if (my $cb = $PERSISTENT_TYPE{$type}) {
363 my $id = $kv->{identifier};
364 my $req = $_[0]{req}{$id} ||= {};
365 $cb->($self, $req, $kv);
366 $self->recv (request_changed => $kv, $type, @extra);
367 }
368
369 my $on = $self->{on};
370 for (0 .. $#$on) {
371 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
372 splice @$on, $_, 1 unless defined $res;
373 return;
374 }
375 }
376
377 if (my $cb = $self->{queue}[0]) {
378 $cb->($self, $type, $kv, @extra)
379 and shift @{ $self->{queue} };
380 } else {
381 $self->default_recv ($type, $kv, @extra);
382 }
383 }
384
385 sub default_recv {
386 my ($self, $type, $kv, $rdata) = @_;
387
388 if ($type eq "node_hello") {
389 $self->{node_hello} = $kv;
390 } elsif (exists $self->{id}{$kv->{identifier}}) {
391 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
392 and delete $self->{id}{$kv->{identifier}};
393 }
394 }
395
396 =back
397
398 =head2 FCP REQUESTS
399
400 The following methods implement various requests. Most of them map
401 directory to the FCP message of the same name. The added benefit of
402 these over sending requests yourself is that they handle the necessary
403 serialisation, protocol quirks, and replies.
404
405 All of them exist in two versions, the variant shown in this manpage, and
406 a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
407 version as shown is I<synchronous> - it will wait for any replies, and
408 either return the reply, or croak with an error. The underscore variant
409 returns immediately and invokes one or more callbacks or condvars later.
410
411 For example, the call
412
413 $info = $fcp->get_plugin_info ($name, $detailed);
414
415 Also comes in this underscore variant:
416
417 $fcp->get_plugin_info_ ($name, $detailed, $cb);
418
419 You can thinbk of the underscore as a kind of continuation indicator - the
420 normal function waits and returns with the data, the C<_> indicates that
421 you pass the continuation yourself, and the continuation will be invoked
422 with the results.
423
424 This callback/continuation argument (C<$cb>) can come in three forms itself:
425
426 =over 4
427
428 =item A code reference (or rather anything not matching some other alternative)
429
430 This code reference will be invoked with the result on success. On an
431 error, it will invoke the C<on_failure> callback of the FCP object, or,
432 if none was defined, will die (in the event loop) with a backtrace of the
433 call site.
434
435 This is a popular choice, but it makes handling errors hard - make sure
436 you never generate protocol errors!
437
438 If an C<on_failure> hook exists, it will be invoked with the FCP object,
439 a (textual) backtrace as generated by C<Carp::longmess>, and arrayref
440 containing the arguments from the original request invocation and the
441 error object from the server, in this order, e.g.:
442
443 on_failure => sub {
444 my ($fcp, $backtrace, $orig_args, $error_object) = @_;
445 ...
446 },
447
448 =item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
449
450 When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
451 results when the request has finished. Should an error occur, the error
452 will instead result in C<< $cv->croak ($error) >>.
453
454 This is also a popular choice.
455
456 =item An array with two callbacks C<[$success, $failure]>
457
458 The C<$success> callback will be invoked with the results, while the
459 C<$failure> callback will be invoked on any errors.
460
461 The C<$failure> callback will be invoked with the error object from the
462 server.
463
464 =item C<undef>
465
466 This is the same thing as specifying C<sub { }> as callback, i.e. on
467 success, the results are ignored, while on failure, the C<on_failure> hook
468 is invoked or the module dies with a backtrace.
469
470 This is good for quick scripts, or when you really aren't interested in
471 the results.
472
473 =back
474
475 =cut
476
477 our $NOP_CB = sub { };
478
479 sub _txn {
480 my ($name, $sub) = @_;
481
482 *{$name} = sub {
483 my $cv = AE::cv;
484
485 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
486 &$sub;
487 $cv->recv
488 };
489
490 *{"$name\_"} = sub {
491 my ($ok, $err) = pop;
492
493 if (ARRAY:: eq ref $ok) {
494 ($ok, $err) = @$ok;
495 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
496 $err = sub { $ok->croak ($_[0]{extra_description}) };
497 } else {
498 my $bt = Carp::longmess "AnyEvent::FCP request $name";
499 Scalar::Util::weaken (my $self = $_[0]);
500 my $args = [@_]; shift @$args;
501 $err = sub {
502 if ($self->{on_failure}) {
503 $self->{on_failure}($self, $args, $bt, $_[0]);
504 } else {
505 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
506 }
507 };
508 }
509
510 $ok ||= $NOP_CB;
511
512 splice @_, 1, 0, $ok, $err;
513 &$sub;
514 };
515 }
516
517 =over 4
518
519 =item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
520
521 =cut
522
523 _txn list_peers => sub {
524 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
525
526 my @res;
527
528 $self->send_msg (list_peers =>
529 with_metadata => $with_metadata ? "true" : "false",
530 with_volatile => $with_volatile ? "true" : "false",
531 id_cb => sub {
532 my ($self, $type, $kv, $rdata) = @_;
533
534 if ($type eq "end_list_peers") {
535 $ok->(\@res);
536 1
537 } else {
538 push @res, $kv;
539 0
540 }
541 },
542 );
543 };
544
545 =item $notes = $fcp->list_peer_notes ($node_identifier)
546
547 =cut
548
549 _txn list_peer_notes => sub {
550 my ($self, $ok, undef, $node_identifier) = @_;
551
552 $self->send_msg (list_peer_notes =>
553 node_identifier => $node_identifier,
554 id_cb => sub {
555 my ($self, $type, $kv, $rdata) = @_;
556
557 $ok->($kv);
558 1
559 },
560 );
561 };
562
563 =item $fcp->watch_global ($enabled[, $verbosity_mask])
564
565 =cut
566
567 _txn watch_global => sub {
568 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
569
570 $self->send_msg (watch_global =>
571 enabled => $enabled ? "true" : "false",
572 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
573 );
574
575 $ok->();
576 };
577
578 =item $reqs = $fcp->list_persistent_requests
579
580 =cut
581
582 _txn list_persistent_requests => sub {
583 my ($self, $ok, $err) = @_;
584
585 $self->serialise (list_persistent_requests => sub {
586 my ($self, $guard) = @_;
587
588 my @res;
589
590 $self->send_msg ("list_persistent_requests");
591
592 $self->on (sub {
593 my ($self, $type, $kv, $rdata) = @_;
594
595 $guard if 0;
596
597 if ($type eq "end_list_persistent_requests") {
598 $ok->(\@res);
599 return;
600 } else {
601 my $id = $kv->{identifier};
602
603 if ($type =~ /^persistent_(get|put|put_dir)$/) {
604 push @res, [$type, $kv];
605 }
606 }
607
608 1
609 });
610 });
611 };
612
613 =item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
614
615 Update either the C<client_token> or C<priority_class> of a request
616 identified by C<$global> and C<$identifier>, depending on which of
617 C<$client_token> and C<$priority_class> are not C<undef>.
618
619 =cut
620
621 _txn modify_persistent_request => sub {
622 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
623
624 $self->serialise ($identifier => sub {
625 my ($self, $guard) = @_;
626
627 $self->send_msg (modify_persistent_request =>
628 global => $global ? "true" : "false",
629 identifier => $identifier,
630 defined $client_token ? (client_token => $client_token ) : (),
631 defined $priority_class ? (priority_class => $priority_class) : (),
632 );
633
634 $self->on (sub {
635 my ($self, $type, $kv, @extra) = @_;
636
637 $guard if 0;
638
639 if ($kv->{identifier} eq $identifier) {
640 if ($type eq "persistent_request_modified") {
641 $ok->($kv);
642 return;
643 } elsif ($type eq "protocol_error") {
644 $err->($kv);
645 return;
646 }
647 }
648
649 1
650 });
651 });
652 };
653
654 =item $info = $fcp->get_plugin_info ($name, $detailed)
655
656 =cut
657
658 _txn get_plugin_info => sub {
659 my ($self, $ok, $err, $name, $detailed) = @_;
660
661 my $id = $self->identifier;
662
663 $self->send_msg (get_plugin_info =>
664 identifier => $id,
665 plugin_name => $name,
666 detailed => $detailed ? "true" : "false",
667 );
668 $self->on (sub {
669 my ($self, $type, $kv) = @_;
670
671 if ($kv->{identifier} eq $id) {
672 if ($type eq "get_plugin_info") {
673 $ok->($kv);
674 } else {
675 $err->($kv, $type);
676 }
677 return;
678 }
679
680 1
681 });
682 };
683
684 =item $status = $fcp->client_get ($uri, $identifier, %kv)
685
686 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
687
688 ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
689 priority_class, persistence, client_token, global, return_type,
690 binary_blob, allowed_mime_types, filename, temp_filename
691
692 =cut
693
694 _txn client_get => sub {
695 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
696
697 $self->serialise ($identifier => sub {
698 my ($self, $guard) = @_;
699
700 $self->send_msg (client_get =>
701 %kv,
702 uri => $uri,
703 identifier => $identifier,
704 );
705
706 $self->on (sub {
707 my ($self, $type, $kv, @extra) = @_;
708
709 $guard if 0;
710
711 if ($kv->{identifier} eq $identifier) {
712 if ($type eq "persistent_get") {
713 $ok->($kv);
714 return;
715 } elsif ($type eq "protocol_error") {
716 $err->($kv);
717 return;
718 }
719 }
720
721 1
722 });
723 });
724 };
725
726 =item $status = $fcp->remove_request ($identifier[, $global])
727
728 Remove the request with the given isdentifier. Returns true if successful,
729 false on error.
730
731 =cut
732
733 _txn remove_request => sub {
734 my ($self, $ok, $err, $identifier, $global) = @_;
735
736 $self->serialise ($identifier => sub {
737 my ($self, $guard) = @_;
738
739 $self->send_msg (remove_request =>
740 identifier => $identifier,
741 global => $global ? "true" : "false",
742 );
743 $self->on (sub {
744 my ($self, $type, $kv, @extra) = @_;
745
746 $guard if 0;
747
748 if ($kv->{identifier} eq $identifier) {
749 if ($type eq "persistent_request_removed") {
750 $ok->(1);
751 return;
752 } elsif ($type eq "protocol_error") {
753 $err->($kv);
754 return;
755 }
756 }
757
758 1
759 });
760 });
761 };
762
763 =item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
764
765 The DDA test in FCP is probably the single most broken protocol - only
766 one directory test can be outstanding at any time, and some guessing and
767 heuristics are involved in mangling the paths.
768
769 This function combines C<TestDDARequest> and C<TestDDAResponse> in one
770 request, handling file reading and writing as well, and tries very hard to
771 do the right thing.
772
773 Both C<$local_directory> and C<$remote_directory> must specify the same
774 directory - C<$local_directory> is the directory path on the client (where
775 L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
776 the server (where the freenet node runs). When both are running on the
777 same node, the paths are generally identical.
778
779 C<$want_read> and C<$want_write> should be set to a true value when you
780 want to read (get) files or write (put) files, respectively.
781
782 On error, an exception is thrown. Otherwise, C<$can_read> and
783 C<$can_write> indicate whether you can reaqd or write to freenet via the
784 directory.
785
786 =cut
787
788 _txn test_dda => sub {
789 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
790
791 $self->serialise (test_dda => sub {
792 my ($self, $guard) = @_;
793
794 $self->send_msg (test_dda_request =>
795 directory => $remote,
796 want_read_directory => $want_read ? "true" : "false",
797 want_write_directory => $want_write ? "true" : "false",
798 );
799 $self->on (sub {
800 my ($self, $type, $kv) = @_;
801
802 if ($type eq "test_dda_reply") {
803 # the filenames are all relative to the server-side directory,
804 # which might or might not match $remote anymore, so we
805 # need to rewrite the paths to be relative to $local
806 for my $k (qw(read_filename write_filename)) {
807 my $f = $kv->{$k};
808 for my $dir ($kv->{directory}, $remote) {
809 if ($dir eq substr $f, 0, length $dir) {
810 substr $f, 0, 1 + length $dir, "";
811 $kv->{$k} = $f;
812 last;
813 }
814 }
815 }
816
817 my %response = (directory => $remote);
818
819 if (length $kv->{read_filename}) {
820 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
821 sysread $fh, my $buf, -s $fh;
822 $response{read_content} = $buf;
823 }
824 }
825
826 if (length $kv->{write_filename}) {
827 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
828 syswrite $fh, $kv->{content_to_write};
829 }
830 }
831
832 $self->send_msg (test_dda_response => %response);
833
834 $self->on (sub {
835 my ($self, $type, $kv) = @_;
836
837 $guard if 0; # reference
838
839 if ($type eq "test_dda_complete") {
840 $ok->(
841 $kv->{read_directory_allowed} eq "true",
842 $kv->{write_directory_allowed} eq "true",
843 );
844 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
845 $err->($kv->{extra_description});
846 return;
847 }
848
849 1
850 });
851
852 return;
853 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
854 $err->($kv);
855 return;
856 }
857
858 1
859 });
860 });
861 };
862
863 =back
864
865 =head2 REQUEST CACHE
866
867 The C<AnyEvent::FCP> class keeps a request cache, where it caches all
868 information from requests.
869
870 For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
871 in C<< $fcp->{req}{$identifier} >>:
872
873 persistent_get
874 persistent_put
875 persistent_put_dir
876
877 This message updates the stored data:
878
879 persistent_request_modified
880
881 This message will remove this entry:
882
883 persistent_request_removed
884
885 These messages get merged into the cache entry, under their
886 type, i.e. a C<simple_progress> message will be stored in C<<
887 $fcp->{req}{$identifier}{simple_progress} >>:
888
889 simple_progress # get/put
890
891 uri_generated # put
892 generated_metadata # put
893 started_compression # put
894 finished_compression # put
895 put_failed # put
896 put_fetchable # put
897 put_successful # put
898
899 sending_to_network # get
900 compatibility_mode # get
901 expected_hashes # get
902 expected_mime # get
903 expected_data_length # get
904 get_failed # get
905 data_found # get
906 enter_finite_cooldown # get
907
908 In addition, an event (basically a fake message) of type C<request_changed> is generated
909 on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
910 is the type of the original message triggering the change,
911
912 To fill this cache with the global queue and keep it updated,
913 call C<watch_global> to subscribe to updates, followed by
914 C<list_persistent_requests_sync>.
915
916 $fcp->watch_global_sync_; # do not wait
917 $fcp->list_persistent_requests; # wait
918
919 To get a better idea of what is stored in the cache, here is an example of
920 what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
921
922 {
923 identifier => "Frost-gpl.txt",
924 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
925 binary_blob => "false",
926 global => "true",
927 max_retries => -1,
928 max_size => 9223372036854775807,
929 persistence => "forever",
930 priority_class => 3,
931 real_time => "false",
932 return_type => "direct",
933 started => "true",
934 type => "persistent_get",
935 verbosity => 2147483647,
936 sending_to_network => {
937 identifier => "Frost-gpl.txt",
938 global => "true",
939 },
940 compatibility_mode => {
941 identifier => "Frost-gpl.txt",
942 definitive => "true",
943 dont_compress => "false",
944 global => "true",
945 max => "COMPAT_1255",
946 min => "COMPAT_1255",
947 },
948 expected_hashes => {
949 identifier => "Frost-gpl.txt",
950 global => "true",
951 hashes => {
952 ed2k => "d83596f5ee3b7...",
953 md5 => "e0894e4a2a6...",
954 sha1 => "...",
955 sha256 => "...",
956 sha512 => "...",
957 tth => "...",
958 },
959 },
960 expected_mime => {
961 identifier => "Frost-gpl.txt",
962 global => "true",
963 metadata => { content_type => "application/rar" },
964 },
965 expected_data_length => {
966 identifier => "Frost-gpl.txt",
967 data_length => 37576,
968 global => "true",
969 },
970 simple_progress => {
971 identifier => "Frost-gpl.txt",
972 failed => 0,
973 fatally_failed => 0,
974 finalized_total => "true",
975 global => "true",
976 last_progress => 1438639282628,
977 required => 372,
978 succeeded => 102,
979 total => 747,
980 },
981 data_found => {
982 identifier => "Frost-gpl.txt",
983 completion_time => 1438663354026,
984 data_length => 37576,
985 global => "true",
986 metadata => { content_type => "image/jpeg" },
987 startup_time => 1438657196167,
988 },
989 }
990
991 =head1 EXAMPLE PROGRAM
992
993 use AnyEvent::FCP;
994
995 my $fcp = new AnyEvent::FCP;
996
997 # let us look at the global request list
998 $fcp->watch_global_ (1);
999
1000 # list them, synchronously
1001 my $req = $fcp->list_persistent_requests;
1002
1003 # go through all requests
1004 TODO
1005 for my $req (values %$req) {
1006 # skip jobs not directly-to-disk
1007 next unless $req->{return_type} eq "disk";
1008 # skip jobs not issued by FProxy
1009 next unless $req->{identifier} =~ /^FProxy:/;
1010
1011 if ($req->{data_found}) {
1012 # file has been successfully downloaded
1013
1014 ... move the file away
1015 (left as exercise)
1016
1017 # remove the request
1018
1019 $fcp->remove_request (1, $req->{identifier});
1020 } elsif ($req->{get_failed}) {
1021 # request has failed
1022 if ($req->{get_failed}{code} == 11) {
1023 # too many path components, should restart
1024 } else {
1025 # other failure
1026 }
1027 } else {
1028 # modify priorities randomly, to improve download rates
1029 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
1030 if 0.1 > rand;
1031 }
1032 }
1033
1034 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
1035 $fcp->get_plugin_info_sync ("dummy");
1036
1037 =head1 SEE ALSO
1038
1039 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
1040
1041 =head1 BUGS
1042
1043 =head1 AUTHOR
1044
1045 Marc Lehmann <schmorp@schmorp.de>
1046 http://home.schmorp.de/
1047
1048 =cut
1049
1050 1
1051