ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.9
Committed: Tue Aug 4 00:35:16 2015 UTC (8 years, 9 months ago) by root
Branch: MAIN
Changes since 1.8: +132 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::FCP - freenet client protocol 2.0
4    
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::FCP;
8 root 1.1
9 root 1.3 my $fcp = new AnyEvent::FCP;
10 root 1.1
11 root 1.7 # transactions return condvars
12 root 1.3 my $lp_cv = $fcp->list_peers;
13     my $pr_cv = $fcp->list_persistent_requests;
14    
15     my $peers = $lp_cv->recv;
16     my $reqs = $pr_cv->recv;
17 root 1.1
18     =head1 DESCRIPTION
19    
20     This module implements the freenet client protocol version 2.0, as used by
21     freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22    
23 root 1.3 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24     description of what the messages do.
25 root 1.1
26     The module uses L<AnyEvent> to find a suitable event module.
27    
28 root 1.3 Only very little is implemented, ask if you need more, and look at the
29     example program later in this section.
30    
31 root 1.7 =head2 EXAMPLE
32    
33     This example fetches the download list and sets the priority of all files
34     with "a" in their name to "emergency":
35    
36     use AnyEvent::FCP;
37    
38     my $fcp = new AnyEvent::FCP;
39    
40     $fcp->watch_global_sync (1, 0);
41     my $req = $fcp->list_persistent_requests_sync;
42    
43     for my $req (values %$req) {
44     if ($req->{filename} =~ /a/) {
45     $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46     }
47     }
48    
49 root 1.1 =head2 IMPORT TAGS
50    
51     Nothing much can be "imported" from this module right now.
52    
53     =head2 THE AnyEvent::FCP CLASS
54    
55     =over 4
56    
57     =cut
58    
59     package AnyEvent::FCP;
60    
61 root 1.2 use common::sense;
62    
63 root 1.1 use Carp;
64    
65 root 1.7 our $VERSION = '0.3';
66 root 1.1
67 root 1.2 use Scalar::Util ();
68 root 1.1
69     use AnyEvent;
70 root 1.2 use AnyEvent::Handle;
71 root 1.9 use AnyEvent::Util ();
72 root 1.1
73     sub touc($) {
74     local $_ = shift;
75 root 1.9 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 root 1.1 s/(?:^|_)(.)/\U$1/g;
77     $_
78     }
79    
80     sub tolc($) {
81     local $_ = shift;
82 root 1.9 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i;
83     1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i;
84 root 1.1 s/(?<=[a-z])(?=[A-Z])/_/g;
85     lc
86     }
87    
88     =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
89    
90     Create a new FCP connection to the given host and port (default
91 root 1.2 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 root 1.1
93 root 1.3 If no C<name> was specified, then AnyEvent::FCP will generate a
94     (hopefully) unique client name for you.
95    
96 root 1.6 You can install a progress callback that is being called with the AnyEvent::FCP
97     object, the type, a hashref with key-value pairs and a reference to any received data,
98     for all unsolicited messages.
99    
100     Example:
101    
102     sub progress_cb {
103     my ($self, $type, $kv, $rdata) = @_;
104    
105     if ($type eq "simple_progress") {
106     warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107     }
108     }
109    
110 root 1.3 =cut
111 root 1.1
112     sub new {
113     my $class = shift;
114     my $self = bless { @_ }, $class;
115    
116 root 1.6 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
117     $self->{port} ||= $ENV{FREDPORT} || 9481;
118     $self->{name} ||= time.rand.rand.rand; # lame
119 root 1.8 $self->{timeout} ||= 3600*2;
120 root 1.6 $self->{progress} ||= sub { };
121 root 1.2
122     $self->{id} = "a0";
123    
124     {
125     Scalar::Util::weaken (my $self = $self);
126    
127     $self->{hdl} = new AnyEvent::Handle
128     connect => [$self->{host} => $self->{port}],
129     timeout => $self->{timeout},
130     on_error => sub {
131 root 1.8 warn "@_\n";#d#
132 root 1.2 exit 1;
133     },
134     on_read => sub { $self->on_read (@_) },
135     on_eof => $self->{on_eof} || sub { };
136    
137     Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138     }
139    
140     $self->send_msg (
141     client_hello =>
142     name => $self->{name},
143     expected_version => "2.0",
144     );
145 root 1.1
146     $self
147     }
148    
149 root 1.2 sub send_msg {
150     my ($self, $type, %kv) = @_;
151 root 1.1
152 root 1.2 my $data = delete $kv{data};
153 root 1.1
154 root 1.2 if (exists $kv{id_cb}) {
155 root 1.9 my $id = $kv{identifier} ||= ++$self->{id};
156 root 1.2 $self->{id}{$id} = delete $kv{id_cb};
157 root 1.1 }
158    
159 root 1.2 my $msg = (touc $type) . "\012"
160     . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161 root 1.1
162 root 1.2 sub id {
163     my ($self) = @_;
164 root 1.1
165    
166 root 1.2 }
167 root 1.1
168     if (defined $data) {
169 root 1.2 $msg .= "DataLength=" . (length $data) . "\012"
170     . "Data\012$data";
171 root 1.1 } else {
172 root 1.2 $msg .= "EndMessage\012";
173 root 1.1 }
174    
175 root 1.2 $self->{hdl}->push_write ($msg);
176 root 1.1 }
177    
178 root 1.9 sub on {
179     my ($self, $cb) = @_;
180    
181     # cb return undef - message eaten, remove cb
182     # cb return 0 - message eaten
183     # cb return 1 - pass to next
184    
185     push @{ $self->{on} }, $cb;
186     }
187    
188     sub _push_queue {
189     my ($self, $queue) = @_;
190    
191     warn "oush @$queue\n";#d#
192     shift @$queue;
193     $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
194     if @$queue;
195     }
196    
197     # lock so only one $type (arbitrary string) is in flight,
198     # to work around horribly misdesigned protocol.
199     sub serialise {
200     my ($self, $type, $cb) = @_;
201    
202     my $queue = $self->{serialise}{$type} ||= [];
203     push @$queue, $cb;
204     $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
205     unless $#$queue;
206     }
207    
208 root 1.2 sub on_read {
209 root 1.1 my ($self) = @_;
210    
211 root 1.2 my $type;
212     my %kv;
213     my $rdata;
214    
215     my $done_cb = sub {
216     $kv{pkt_type} = $type;
217    
218 root 1.9 my $on = $self->{on};
219     for (0 .. $#$on) {
220     unless (my $res = $on->[$_]($type, \%kv, $rdata)) {
221     splice @$on, $_, 1 unless defined $res;
222     return;
223     }
224     }
225    
226 root 1.2 if (my $cb = $self->{queue}[0]) {
227     $cb->($self, $type, \%kv, $rdata)
228     and shift @{ $self->{queue} };
229     } else {
230     $self->default_recv ($type, \%kv, $rdata);
231 root 1.1 }
232 root 1.2 };
233 root 1.1
234 root 1.2 my $hdr_cb; $hdr_cb = sub {
235     if ($_[1] =~ /^([^=]+)=(.*)$/) {
236     my ($k, $v) = ($1, $2);
237     my @k = split /\./, tolc $k;
238     my $ro = \\%kv;
239    
240     while (@k) {
241     my $k = shift @k;
242     if ($k =~ /^\d+$/) {
243     $ro = \$$ro->[$k];
244 root 1.1 } else {
245 root 1.2 $ro = \$$ro->{$k};
246 root 1.1 }
247     }
248    
249 root 1.2 $$ro = $v;
250 root 1.1
251 root 1.2 $_[0]->push_read (line => $hdr_cb);
252     } elsif ($_[1] eq "Data") {
253     $_[0]->push_read (chunk => delete $kv{data_length}, sub {
254     $rdata = \$_[1];
255     $done_cb->();
256     });
257     } elsif ($_[1] eq "EndMessage") {
258     $done_cb->();
259     } else {
260     die "protocol error, expected message end, got $_[1]\n";#d#
261     }
262     };
263 root 1.1
264 root 1.2 $self->{hdl}->push_read (line => sub {
265     $type = tolc $_[1];
266     $_[0]->push_read (line => $hdr_cb);
267     });
268     }
269 root 1.1
270 root 1.2 sub default_recv {
271     my ($self, $type, $kv, $rdata) = @_;
272    
273     if ($type eq "node_hello") {
274     $self->{node_hello} = $kv;
275     } elsif (exists $self->{id}{$kv->{identifier}}) {
276     $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
277     and delete $self->{id}{$kv->{identifier}};
278 root 1.1 } else {
279 root 1.6 &{ $self->{progress} };
280 root 1.1 }
281     }
282    
283 root 1.2 sub _txn {
284     my ($name, $sub) = @_;
285 root 1.1
286 root 1.2 *{$name} = sub {
287     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
288     &$sub;
289     $cv
290     };
291 root 1.1
292 root 1.2 *{"$name\_sync"} = sub {
293     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
294     &$sub;
295     $cv->recv
296     };
297 root 1.1 }
298    
299 root 1.3 =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
300    
301     =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
302    
303     =cut
304    
305 root 1.2 _txn list_peers => sub {
306     my ($self, $cv, $with_metadata, $with_volatile) = @_;
307 root 1.1
308 root 1.2 my @res;
309 root 1.1
310 root 1.2 $self->send_msg (list_peers =>
311     with_metadata => $with_metadata ? "true" : "false",
312     with_volatile => $with_volatile ? "true" : "false",
313     id_cb => sub {
314     my ($self, $type, $kv, $rdata) = @_;
315    
316     if ($type eq "end_list_peers") {
317     $cv->(\@res);
318     1
319     } else {
320     push @res, $kv;
321     0
322     }
323     },
324     );
325     };
326 root 1.1
327 root 1.3 =item $cv = $fcp->list_peer_notes ($node_identifier)
328    
329     =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
330    
331     =cut
332    
333 root 1.2 _txn list_peer_notes => sub {
334     my ($self, $cv, $node_identifier) = @_;
335 root 1.1
336 root 1.2 $self->send_msg (list_peer_notes =>
337     node_identifier => $node_identifier,
338     id_cb => sub {
339     my ($self, $type, $kv, $rdata) = @_;
340    
341     $cv->($kv);
342     1
343     },
344     );
345     };
346 root 1.1
347 root 1.3 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
348    
349     =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
350    
351     =cut
352    
353 root 1.2 _txn watch_global => sub {
354     my ($self, $cv, $enabled, $verbosity_mask) = @_;
355 root 1.1
356 root 1.2 $self->send_msg (watch_global =>
357     enabled => $enabled ? "true" : "false",
358     defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
359     );
360 root 1.1
361 root 1.2 $cv->();
362     };
363 root 1.1
364 root 1.3 =item $cv = $fcp->list_persistent_requests
365    
366     =item $reqs = $fcp->list_persistent_requests_sync
367    
368     =cut
369    
370 root 1.2 _txn list_persistent_requests => sub {
371     my ($self, $cv) = @_;
372 root 1.1
373 root 1.2 my %res;
374 root 1.1
375 root 1.2 $self->send_msg ("list_persistent_requests");
376 root 1.1
377 root 1.2 push @{ $self->{queue} }, sub {
378     my ($self, $type, $kv, $rdata) = @_;
379    
380     if ($type eq "end_list_persistent_requests") {
381     $cv->(\%res);
382     1
383     } else {
384     my $id = $kv->{identifier};
385    
386     if ($type =~ /^persistent_(get|put|put_dir)$/) {
387     $res{$id} = {
388     type => $1,
389     %{ $res{$id} },
390     %$kv,
391     };
392     } elsif ($type eq "simple_progress") {
393     delete $kv->{pkt_type}; # save memory
394     push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
395     } else {
396     $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
397     }
398     0
399     }
400 root 1.1 };
401 root 1.2 };
402 root 1.1
403 root 1.3 =item $cv = $fcp->remove_request ($global, $identifier)
404    
405     =item $status = $fcp->remove_request_sync ($global, $identifier)
406    
407     =cut
408    
409 root 1.2 _txn remove_request => sub {
410     my ($self, $cv, $global, $identifier) = @_;
411 root 1.1
412 root 1.2 $self->send_msg (remove_request =>
413     global => $global ? "true" : "false",
414     identifier => $identifier,
415     id_cb => sub {
416     my ($self, $type, $kv, $rdata) = @_;
417    
418     $cv->($kv);
419     1
420     },
421     );
422     };
423 root 1.1
424 root 1.3 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
425    
426 root 1.4 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
427 root 1.3
428     =cut
429    
430 root 1.2 _txn modify_persistent_request => sub {
431     my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
432 root 1.1
433 root 1.2 $self->send_msg (modify_persistent_request =>
434     global => $global ? "true" : "false",
435     defined $client_token ? (client_token => $client_token ) : (),
436     defined $priority_class ? (priority_class => $priority_class) : (),
437 root 1.4 identifier => $identifier,
438 root 1.2 id_cb => sub {
439     my ($self, $type, $kv, $rdata) = @_;
440    
441     $cv->($kv);
442     1
443     },
444     );
445     };
446 root 1.1
447 root 1.3 =item $cv = $fcp->get_plugin_info ($name, $detailed)
448    
449     =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
450    
451     =cut
452    
453 root 1.2 _txn get_plugin_info => sub {
454     my ($self, $cv, $name, $detailed) = @_;
455 root 1.1
456 root 1.2 $self->send_msg (get_plugin_info =>
457     plugin_name => $name,
458     detailed => $detailed ? "true" : "false",
459     id_cb => sub {
460     my ($self, $type, $kv, $rdata) = @_;
461    
462     $cv->($kv);
463     1
464     },
465     );
466 root 1.4 };
467    
468     =item $cv = $fcp->client_get ($uri, $identifier, %kv)
469    
470     =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
471 root 1.1
472 root 1.4 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
473    
474     ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
475     priority_class, persistence, client_token, global, return_type,
476     binary_blob, allowed_mime_types, filename, temp_filename
477    
478     =cut
479    
480     _txn client_get => sub {
481     my ($self, $cv, $uri, $identifier, %kv) = @_;
482    
483     $self->send_msg (client_get =>
484     %kv,
485     uri => $uri,
486     identifier => $identifier,
487     id_cb => sub {
488     my ($self, $type, $kv, $rdata) = @_;
489    
490     $cv->($kv);
491     1
492     },
493     );
494 root 1.2 };
495 root 1.1
496 root 1.9 =item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)
497    
498     =item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write))
499    
500     The DDA test in FCP is probably the single most broken protocol - only
501     one directory test can be outstanding at any time, and some guessing and
502     heuristics are involved in mangling the paths.
503    
504     This function combines C<TestDDARequest> and C<TestDDAResponse> in one
505     request, handling file reading and writing as well.
506    
507     =cut
508    
509     _txn test_dda => sub {
510     my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;
511    
512     $self->serialise (test_dda => sub {
513     my ($self, $guard) = @_;
514    
515     $self->send_msg (test_dda_request =>
516     directory => $remote,
517     want_read_directory => $want_read ? "true" : "false",
518     want_write_directory => $want_write ? "true" : "false",
519     );
520     $self->on (sub {
521     my ($type, $kv) = @_;
522    
523     if ($type eq "test_dda_reply") {
524     # the filenames are all relative to the server-side directory,
525     # which might or might not match $remote anymore, so we
526     # need to rewrite the paths to be relative to $local
527     for my $k (qw(read_filename write_filename)) {
528     my $f = $kv->{$k};
529     for my $dir ($kv->{directory}, $remote) {
530     if ($dir eq substr $f, 0, length $dir) {
531     substr $f, 0, 1 + length $dir, "";
532     $kv->{$k} = $f;
533     last;
534     }
535     }
536     }
537    
538     my %response = (directory => $remote);
539    
540     if (length $kv->{read_filename}) {
541     warn "$local/$kv->{read_filename}";#d#
542     if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
543     sysread $fh, my $buf, -s $fh;
544     $response{read_content} = $buf;
545     }
546     }
547    
548     if (length $kv->{write_filename}) {
549     if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
550     syswrite $fh, $kv->{content_to_write};
551     }
552     }
553    
554     $self->send_msg (test_dda_response => %response);
555    
556     $self->on (sub {
557     my ($type, $kv) = @_;
558    
559     $guard if 0; # reference
560    
561     if ($type eq "test_dda_complete") {
562     $cv->(
563     $kv->{read_directory_allowed} eq "true",
564     $kv->{write_directory_allowed} eq "true",
565     );
566     } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
567     $cv->croak ($kv->{extra_description});
568     return;
569     }
570    
571     1
572     });
573    
574     return;
575     } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
576     $cv->croak ($kv->{extra_description});
577     return;
578     }
579    
580     1
581     });
582     });
583     };
584    
585 root 1.1 =back
586    
587 root 1.3 =head1 EXAMPLE PROGRAM
588    
589     use AnyEvent::FCP;
590    
591     my $fcp = new AnyEvent::FCP;
592    
593     # let us look at the global request list
594     $fcp->watch_global (1, 0);
595    
596     # list them, synchronously
597     my $req = $fcp->list_persistent_requests_sync;
598    
599     # go through all requests
600     for my $req (values %$req) {
601     # skip jobs not directly-to-disk
602     next unless $req->{return_type} eq "disk";
603     # skip jobs not issued by FProxy
604     next unless $req->{identifier} =~ /^FProxy:/;
605    
606     if ($req->{data_found}) {
607     # file has been successfully downloaded
608    
609     ... move the file away
610     (left as exercise)
611    
612     # remove the request
613    
614     $fcp->remove_request (1, $req->{identifier});
615     } elsif ($req->{get_failed}) {
616     # request has failed
617     if ($req->{get_failed}{code} == 11) {
618     # too many path components, should restart
619     } else {
620     # other failure
621     }
622     } else {
623     # modify priorities randomly, to improve download rates
624     $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
625     if 0.1 > rand;
626     }
627     }
628    
629     # see if the dummy plugin is loaded, to ensure all previous requests have finished.
630     $fcp->get_plugin_info_sync ("dummy");
631    
632 root 1.1 =head1 SEE ALSO
633    
634 root 1.2 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
635 root 1.1
636     =head1 BUGS
637    
638     =head1 AUTHOR
639    
640     Marc Lehmann <schmorp@schmorp.de>
641     http://home.schmorp.de/
642    
643     =cut
644    
645     1
646