ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.10
Committed: Tue Aug 4 00:50:25 2015 UTC (8 years, 9 months ago) by root
Branch: MAIN
Changes since 1.9: +32 -26 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::FCP - freenet client protocol 2.0
4    
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::FCP;
8 root 1.1
9 root 1.3 my $fcp = new AnyEvent::FCP;
10 root 1.1
11 root 1.7 # transactions return condvars
12 root 1.3 my $lp_cv = $fcp->list_peers;
13     my $pr_cv = $fcp->list_persistent_requests;
14    
15     my $peers = $lp_cv->recv;
16     my $reqs = $pr_cv->recv;
17 root 1.1
18     =head1 DESCRIPTION
19    
20     This module implements the freenet client protocol version 2.0, as used by
21     freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22    
23 root 1.3 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24     description of what the messages do.
25 root 1.1
26     The module uses L<AnyEvent> to find a suitable event module.
27    
28 root 1.3 Only very little is implemented, ask if you need more, and look at the
29     example program later in this section.
30    
31 root 1.7 =head2 EXAMPLE
32    
33     This example fetches the download list and sets the priority of all files
34     with "a" in their name to "emergency":
35    
36     use AnyEvent::FCP;
37    
38     my $fcp = new AnyEvent::FCP;
39    
40     $fcp->watch_global_sync (1, 0);
41     my $req = $fcp->list_persistent_requests_sync;
42    
43     for my $req (values %$req) {
44     if ($req->{filename} =~ /a/) {
45     $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46     }
47     }
48    
49 root 1.1 =head2 IMPORT TAGS
50    
51     Nothing much can be "imported" from this module right now.
52    
53     =head2 THE AnyEvent::FCP CLASS
54    
55     =over 4
56    
57     =cut
58    
59     package AnyEvent::FCP;
60    
61 root 1.2 use common::sense;
62    
63 root 1.1 use Carp;
64    
65 root 1.7 our $VERSION = '0.3';
66 root 1.1
67 root 1.2 use Scalar::Util ();
68 root 1.1
69     use AnyEvent;
70 root 1.2 use AnyEvent::Handle;
71 root 1.9 use AnyEvent::Util ();
72 root 1.1
73     sub touc($) {
74     local $_ = shift;
75 root 1.9 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 root 1.1 s/(?:^|_)(.)/\U$1/g;
77     $_
78     }
79    
80     sub tolc($) {
81     local $_ = shift;
82 root 1.9 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i;
83     1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i;
84 root 1.1 s/(?<=[a-z])(?=[A-Z])/_/g;
85     lc
86     }
87    
88     =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
89    
90     Create a new FCP connection to the given host and port (default
91 root 1.2 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 root 1.1
93 root 1.3 If no C<name> was specified, then AnyEvent::FCP will generate a
94     (hopefully) unique client name for you.
95    
96 root 1.6 You can install a progress callback that is being called with the AnyEvent::FCP
97     object, the type, a hashref with key-value pairs and a reference to any received data,
98     for all unsolicited messages.
99    
100     Example:
101    
102     sub progress_cb {
103     my ($self, $type, $kv, $rdata) = @_;
104    
105     if ($type eq "simple_progress") {
106     warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107     }
108     }
109    
110 root 1.3 =cut
111 root 1.1
112     sub new {
113     my $class = shift;
114     my $self = bless { @_ }, $class;
115    
116 root 1.6 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
117     $self->{port} ||= $ENV{FREDPORT} || 9481;
118     $self->{name} ||= time.rand.rand.rand; # lame
119 root 1.8 $self->{timeout} ||= 3600*2;
120 root 1.6 $self->{progress} ||= sub { };
121 root 1.2
122     $self->{id} = "a0";
123    
124     {
125     Scalar::Util::weaken (my $self = $self);
126    
127     $self->{hdl} = new AnyEvent::Handle
128     connect => [$self->{host} => $self->{port}],
129     timeout => $self->{timeout},
130     on_error => sub {
131 root 1.8 warn "@_\n";#d#
132 root 1.2 exit 1;
133     },
134     on_read => sub { $self->on_read (@_) },
135     on_eof => $self->{on_eof} || sub { };
136    
137     Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138     }
139    
140     $self->send_msg (
141     client_hello =>
142     name => $self->{name},
143     expected_version => "2.0",
144     );
145 root 1.1
146     $self
147     }
148    
149 root 1.2 sub send_msg {
150     my ($self, $type, %kv) = @_;
151 root 1.1
152 root 1.2 my $data = delete $kv{data};
153 root 1.1
154 root 1.2 if (exists $kv{id_cb}) {
155 root 1.9 my $id = $kv{identifier} ||= ++$self->{id};
156 root 1.2 $self->{id}{$id} = delete $kv{id_cb};
157 root 1.1 }
158    
159 root 1.2 my $msg = (touc $type) . "\012"
160     . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161 root 1.1
162 root 1.2 sub id {
163     my ($self) = @_;
164 root 1.1
165    
166 root 1.2 }
167 root 1.1
168     if (defined $data) {
169 root 1.2 $msg .= "DataLength=" . (length $data) . "\012"
170     . "Data\012$data";
171 root 1.1 } else {
172 root 1.2 $msg .= "EndMessage\012";
173 root 1.1 }
174    
175 root 1.2 $self->{hdl}->push_write ($msg);
176 root 1.1 }
177    
178 root 1.9 sub on {
179     my ($self, $cb) = @_;
180    
181     # cb return undef - message eaten, remove cb
182     # cb return 0 - message eaten
183     # cb return 1 - pass to next
184    
185     push @{ $self->{on} }, $cb;
186     }
187    
188     sub _push_queue {
189     my ($self, $queue) = @_;
190    
191     shift @$queue;
192     $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
193     if @$queue;
194     }
195    
196     # lock so only one $type (arbitrary string) is in flight,
197     # to work around horribly misdesigned protocol.
198     sub serialise {
199     my ($self, $type, $cb) = @_;
200    
201     my $queue = $self->{serialise}{$type} ||= [];
202     push @$queue, $cb;
203     $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204     unless $#$queue;
205     }
206    
207 root 1.2 sub on_read {
208 root 1.1 my ($self) = @_;
209    
210 root 1.2 my $type;
211     my %kv;
212     my $rdata;
213    
214     my $done_cb = sub {
215     $kv{pkt_type} = $type;
216    
217 root 1.9 my $on = $self->{on};
218     for (0 .. $#$on) {
219 root 1.10 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) {
220 root 1.9 splice @$on, $_, 1 unless defined $res;
221     return;
222     }
223     }
224    
225 root 1.2 if (my $cb = $self->{queue}[0]) {
226     $cb->($self, $type, \%kv, $rdata)
227     and shift @{ $self->{queue} };
228     } else {
229     $self->default_recv ($type, \%kv, $rdata);
230 root 1.1 }
231 root 1.2 };
232 root 1.1
233 root 1.2 my $hdr_cb; $hdr_cb = sub {
234     if ($_[1] =~ /^([^=]+)=(.*)$/) {
235     my ($k, $v) = ($1, $2);
236     my @k = split /\./, tolc $k;
237     my $ro = \\%kv;
238    
239     while (@k) {
240     my $k = shift @k;
241     if ($k =~ /^\d+$/) {
242     $ro = \$$ro->[$k];
243 root 1.1 } else {
244 root 1.2 $ro = \$$ro->{$k};
245 root 1.1 }
246     }
247    
248 root 1.2 $$ro = $v;
249 root 1.1
250 root 1.2 $_[0]->push_read (line => $hdr_cb);
251     } elsif ($_[1] eq "Data") {
252     $_[0]->push_read (chunk => delete $kv{data_length}, sub {
253     $rdata = \$_[1];
254     $done_cb->();
255     });
256     } elsif ($_[1] eq "EndMessage") {
257     $done_cb->();
258     } else {
259     die "protocol error, expected message end, got $_[1]\n";#d#
260     }
261     };
262 root 1.1
263 root 1.2 $self->{hdl}->push_read (line => sub {
264     $type = tolc $_[1];
265     $_[0]->push_read (line => $hdr_cb);
266     });
267     }
268 root 1.1
269 root 1.2 sub default_recv {
270     my ($self, $type, $kv, $rdata) = @_;
271    
272     if ($type eq "node_hello") {
273     $self->{node_hello} = $kv;
274     } elsif (exists $self->{id}{$kv->{identifier}}) {
275     $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
276     and delete $self->{id}{$kv->{identifier}};
277 root 1.1 } else {
278 root 1.6 &{ $self->{progress} };
279 root 1.1 }
280     }
281    
282 root 1.2 sub _txn {
283     my ($name, $sub) = @_;
284 root 1.1
285 root 1.2 *{$name} = sub {
286     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
287     &$sub;
288     $cv
289     };
290 root 1.1
291 root 1.2 *{"$name\_sync"} = sub {
292     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
293     &$sub;
294     $cv->recv
295     };
296 root 1.1 }
297    
298 root 1.3 =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
299    
300     =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
301    
302     =cut
303    
304 root 1.2 _txn list_peers => sub {
305     my ($self, $cv, $with_metadata, $with_volatile) = @_;
306 root 1.1
307 root 1.2 my @res;
308 root 1.1
309 root 1.2 $self->send_msg (list_peers =>
310     with_metadata => $with_metadata ? "true" : "false",
311     with_volatile => $with_volatile ? "true" : "false",
312     id_cb => sub {
313     my ($self, $type, $kv, $rdata) = @_;
314    
315     if ($type eq "end_list_peers") {
316     $cv->(\@res);
317     1
318     } else {
319     push @res, $kv;
320     0
321     }
322     },
323     );
324     };
325 root 1.1
326 root 1.3 =item $cv = $fcp->list_peer_notes ($node_identifier)
327    
328     =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
329    
330     =cut
331    
332 root 1.2 _txn list_peer_notes => sub {
333     my ($self, $cv, $node_identifier) = @_;
334 root 1.1
335 root 1.2 $self->send_msg (list_peer_notes =>
336     node_identifier => $node_identifier,
337     id_cb => sub {
338     my ($self, $type, $kv, $rdata) = @_;
339    
340     $cv->($kv);
341     1
342     },
343     );
344     };
345 root 1.1
346 root 1.3 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347    
348     =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
349    
350     =cut
351    
352 root 1.2 _txn watch_global => sub {
353     my ($self, $cv, $enabled, $verbosity_mask) = @_;
354 root 1.1
355 root 1.2 $self->send_msg (watch_global =>
356     enabled => $enabled ? "true" : "false",
357     defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
358     );
359 root 1.1
360 root 1.2 $cv->();
361     };
362 root 1.1
363 root 1.3 =item $cv = $fcp->list_persistent_requests
364    
365     =item $reqs = $fcp->list_persistent_requests_sync
366    
367     =cut
368    
369 root 1.2 _txn list_persistent_requests => sub {
370     my ($self, $cv) = @_;
371 root 1.1
372 root 1.10 $self->serialise (list_persistent_requests => sub {
373     my ($self, $guard) = @_;
374    
375     my %res;
376 root 1.1
377 root 1.10 $self->send_msg ("list_persistent_requests");
378 root 1.1
379 root 1.10 $self->on (sub {
380     my ($self, $type, $kv, $rdata) = @_;
381 root 1.2
382 root 1.10 $guard if 0;
383 root 1.2
384 root 1.10 if ($type eq "end_list_persistent_requests") {
385     $cv->(\%res);
386     return;
387 root 1.2 } else {
388 root 1.10 my $id = $kv->{identifier};
389    
390     if ($type =~ /^persistent_(get|put|put_dir)$/) {
391     $res{$id} = {
392     type => $1,
393     %{ $res{$id} },
394     %$kv,
395     };
396     } elsif ($type eq "simple_progress") {
397     delete $kv->{pkt_type}; # save memory
398     push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399     } else {
400     $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401     }
402 root 1.2 }
403 root 1.10
404     1
405     });
406     });
407 root 1.2 };
408 root 1.1
409 root 1.3 =item $cv = $fcp->remove_request ($global, $identifier)
410    
411     =item $status = $fcp->remove_request_sync ($global, $identifier)
412    
413     =cut
414    
415 root 1.2 _txn remove_request => sub {
416     my ($self, $cv, $global, $identifier) = @_;
417 root 1.1
418 root 1.2 $self->send_msg (remove_request =>
419     global => $global ? "true" : "false",
420     identifier => $identifier,
421     id_cb => sub {
422     my ($self, $type, $kv, $rdata) = @_;
423    
424     $cv->($kv);
425     1
426     },
427     );
428     };
429 root 1.1
430 root 1.3 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431    
432 root 1.4 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433 root 1.3
434     =cut
435    
436 root 1.2 _txn modify_persistent_request => sub {
437     my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
438 root 1.1
439 root 1.2 $self->send_msg (modify_persistent_request =>
440     global => $global ? "true" : "false",
441     defined $client_token ? (client_token => $client_token ) : (),
442     defined $priority_class ? (priority_class => $priority_class) : (),
443 root 1.4 identifier => $identifier,
444 root 1.2 id_cb => sub {
445     my ($self, $type, $kv, $rdata) = @_;
446    
447     $cv->($kv);
448     1
449     },
450     );
451     };
452 root 1.1
453 root 1.3 =item $cv = $fcp->get_plugin_info ($name, $detailed)
454    
455     =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
456    
457     =cut
458    
459 root 1.2 _txn get_plugin_info => sub {
460     my ($self, $cv, $name, $detailed) = @_;
461 root 1.1
462 root 1.2 $self->send_msg (get_plugin_info =>
463     plugin_name => $name,
464     detailed => $detailed ? "true" : "false",
465     id_cb => sub {
466     my ($self, $type, $kv, $rdata) = @_;
467    
468     $cv->($kv);
469     1
470     },
471     );
472 root 1.4 };
473    
474     =item $cv = $fcp->client_get ($uri, $identifier, %kv)
475    
476     =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
477 root 1.1
478 root 1.4 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479    
480     ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481     priority_class, persistence, client_token, global, return_type,
482     binary_blob, allowed_mime_types, filename, temp_filename
483    
484     =cut
485    
486     _txn client_get => sub {
487     my ($self, $cv, $uri, $identifier, %kv) = @_;
488    
489     $self->send_msg (client_get =>
490     %kv,
491     uri => $uri,
492     identifier => $identifier,
493     id_cb => sub {
494     my ($self, $type, $kv, $rdata) = @_;
495    
496     $cv->($kv);
497     1
498     },
499     );
500 root 1.2 };
501 root 1.1
502 root 1.9 =item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)
503    
504     =item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write))
505    
506     The DDA test in FCP is probably the single most broken protocol - only
507     one directory test can be outstanding at any time, and some guessing and
508     heuristics are involved in mangling the paths.
509    
510     This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511     request, handling file reading and writing as well.
512    
513     =cut
514    
515     _txn test_dda => sub {
516     my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;
517    
518     $self->serialise (test_dda => sub {
519     my ($self, $guard) = @_;
520    
521     $self->send_msg (test_dda_request =>
522     directory => $remote,
523     want_read_directory => $want_read ? "true" : "false",
524     want_write_directory => $want_write ? "true" : "false",
525     );
526     $self->on (sub {
527 root 1.10 my ($self, $type, $kv) = @_;
528 root 1.9
529     if ($type eq "test_dda_reply") {
530     # the filenames are all relative to the server-side directory,
531     # which might or might not match $remote anymore, so we
532     # need to rewrite the paths to be relative to $local
533     for my $k (qw(read_filename write_filename)) {
534     my $f = $kv->{$k};
535     for my $dir ($kv->{directory}, $remote) {
536     if ($dir eq substr $f, 0, length $dir) {
537     substr $f, 0, 1 + length $dir, "";
538     $kv->{$k} = $f;
539     last;
540     }
541     }
542     }
543    
544     my %response = (directory => $remote);
545    
546     if (length $kv->{read_filename}) {
547     warn "$local/$kv->{read_filename}";#d#
548     if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549     sysread $fh, my $buf, -s $fh;
550     $response{read_content} = $buf;
551     }
552     }
553    
554     if (length $kv->{write_filename}) {
555     if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
556     syswrite $fh, $kv->{content_to_write};
557     }
558     }
559    
560     $self->send_msg (test_dda_response => %response);
561    
562     $self->on (sub {
563 root 1.10 my ($self, $type, $kv) = @_;
564 root 1.9
565     $guard if 0; # reference
566    
567     if ($type eq "test_dda_complete") {
568     $cv->(
569     $kv->{read_directory_allowed} eq "true",
570     $kv->{write_directory_allowed} eq "true",
571     );
572     } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573     $cv->croak ($kv->{extra_description});
574     return;
575     }
576    
577     1
578     });
579    
580     return;
581     } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582     $cv->croak ($kv->{extra_description});
583     return;
584     }
585    
586     1
587     });
588     });
589     };
590    
591 root 1.1 =back
592    
593 root 1.3 =head1 EXAMPLE PROGRAM
594    
595     use AnyEvent::FCP;
596    
597     my $fcp = new AnyEvent::FCP;
598    
599     # let us look at the global request list
600     $fcp->watch_global (1, 0);
601    
602     # list them, synchronously
603     my $req = $fcp->list_persistent_requests_sync;
604    
605     # go through all requests
606     for my $req (values %$req) {
607     # skip jobs not directly-to-disk
608     next unless $req->{return_type} eq "disk";
609     # skip jobs not issued by FProxy
610     next unless $req->{identifier} =~ /^FProxy:/;
611    
612     if ($req->{data_found}) {
613     # file has been successfully downloaded
614    
615     ... move the file away
616     (left as exercise)
617    
618     # remove the request
619    
620     $fcp->remove_request (1, $req->{identifier});
621     } elsif ($req->{get_failed}) {
622     # request has failed
623     if ($req->{get_failed}{code} == 11) {
624     # too many path components, should restart
625     } else {
626     # other failure
627     }
628     } else {
629     # modify priorities randomly, to improve download rates
630     $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
631     if 0.1 > rand;
632     }
633     }
634    
635     # see if the dummy plugin is loaded, to ensure all previous requests have finished.
636     $fcp->get_plugin_info_sync ("dummy");
637    
638 root 1.1 =head1 SEE ALSO
639    
640 root 1.2 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
641 root 1.1
642     =head1 BUGS
643    
644     =head1 AUTHOR
645    
646     Marc Lehmann <schmorp@schmorp.de>
647     http://home.schmorp.de/
648    
649     =cut
650    
651     1
652