ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.8
Committed: Fri Jun 18 16:59:13 2010 UTC (13 years, 11 months ago) by root
Branch: MAIN
Changes since 1.7: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::FCP - freenet client protocol 2.0
4    
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::FCP;
8 root 1.1
9 root 1.3 my $fcp = new AnyEvent::FCP;
10 root 1.1
11 root 1.7 # transactions return condvars
12 root 1.3 my $lp_cv = $fcp->list_peers;
13     my $pr_cv = $fcp->list_persistent_requests;
14    
15     my $peers = $lp_cv->recv;
16     my $reqs = $pr_cv->recv;
17 root 1.1
18     =head1 DESCRIPTION
19    
20     This module implements the freenet client protocol version 2.0, as used by
21     freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22    
23 root 1.3 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24     description of what the messages do.
25 root 1.1
26     The module uses L<AnyEvent> to find a suitable event module.
27    
28 root 1.3 Only very little is implemented, ask if you need more, and look at the
29     example program later in this section.
30    
31 root 1.7 =head2 EXAMPLE
32    
33     This example fetches the download list and sets the priority of all files
34     with "a" in their name to "emergency":
35    
36     use AnyEvent::FCP;
37    
38     my $fcp = new AnyEvent::FCP;
39    
40     $fcp->watch_global_sync (1, 0);
41     my $req = $fcp->list_persistent_requests_sync;
42    
43     for my $req (values %$req) {
44     if ($req->{filename} =~ /a/) {
45     $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46     }
47     }
48    
49 root 1.1 =head2 IMPORT TAGS
50    
51     Nothing much can be "imported" from this module right now.
52    
53     =head2 THE AnyEvent::FCP CLASS
54    
55     =over 4
56    
57     =cut
58    
59     package AnyEvent::FCP;
60    
61 root 1.2 use common::sense;
62    
63 root 1.1 use Carp;
64    
65 root 1.7 our $VERSION = '0.3';
66 root 1.1
67 root 1.2 use Scalar::Util ();
68 root 1.1
69     use AnyEvent;
70 root 1.2 use AnyEvent::Handle;
71 root 1.1
72     sub touc($) {
73     local $_ = shift;
74 root 1.4 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/;
75 root 1.1 s/(?:^|_)(.)/\U$1/g;
76     $_
77     }
78    
79     sub tolc($) {
80     local $_ = shift;
81 root 1.4 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i;
82     1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i;
83 root 1.1 s/(?<=[a-z])(?=[A-Z])/_/g;
84     lc
85     }
86    
87     =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
88    
89     Create a new FCP connection to the given host and port (default
90 root 1.2 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
91 root 1.1
92 root 1.3 If no C<name> was specified, then AnyEvent::FCP will generate a
93     (hopefully) unique client name for you.
94    
95 root 1.6 You can install a progress callback that is being called with the AnyEvent::FCP
96     object, the type, a hashref with key-value pairs and a reference to any received data,
97     for all unsolicited messages.
98    
99     Example:
100    
101     sub progress_cb {
102     my ($self, $type, $kv, $rdata) = @_;
103    
104     if ($type eq "simple_progress") {
105     warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
106     }
107     }
108    
109 root 1.3 =cut
110 root 1.1
111     sub new {
112     my $class = shift;
113     my $self = bless { @_ }, $class;
114    
115 root 1.6 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
116     $self->{port} ||= $ENV{FREDPORT} || 9481;
117     $self->{name} ||= time.rand.rand.rand; # lame
118 root 1.8 $self->{timeout} ||= 3600*2;
119 root 1.6 $self->{progress} ||= sub { };
120 root 1.2
121     $self->{id} = "a0";
122    
123     {
124     Scalar::Util::weaken (my $self = $self);
125    
126     $self->{hdl} = new AnyEvent::Handle
127     connect => [$self->{host} => $self->{port}],
128     timeout => $self->{timeout},
129     on_error => sub {
130 root 1.8 warn "@_\n";#d#
131 root 1.2 exit 1;
132     },
133     on_read => sub { $self->on_read (@_) },
134     on_eof => $self->{on_eof} || sub { };
135    
136     Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137     }
138    
139     $self->send_msg (
140     client_hello =>
141     name => $self->{name},
142     expected_version => "2.0",
143     );
144 root 1.1
145     $self
146     }
147    
148 root 1.2 sub send_msg {
149     my ($self, $type, %kv) = @_;
150 root 1.1
151 root 1.2 my $data = delete $kv{data};
152 root 1.1
153 root 1.2 if (exists $kv{id_cb}) {
154     my $id = $kv{identifier} || ++$self->{id};
155     $self->{id}{$id} = delete $kv{id_cb};
156     $kv{identifier} = $id;
157 root 1.1 }
158    
159 root 1.2 my $msg = (touc $type) . "\012"
160     . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161 root 1.1
162 root 1.2 sub id {
163     my ($self) = @_;
164 root 1.1
165    
166 root 1.2 }
167 root 1.1
168     if (defined $data) {
169 root 1.2 $msg .= "DataLength=" . (length $data) . "\012"
170     . "Data\012$data";
171 root 1.1 } else {
172 root 1.2 $msg .= "EndMessage\012";
173 root 1.1 }
174    
175 root 1.2 $self->{hdl}->push_write ($msg);
176 root 1.1 }
177    
178 root 1.2 sub on_read {
179 root 1.1 my ($self) = @_;
180    
181 root 1.2 my $type;
182     my %kv;
183     my $rdata;
184    
185     my $done_cb = sub {
186     $kv{pkt_type} = $type;
187    
188     if (my $cb = $self->{queue}[0]) {
189     $cb->($self, $type, \%kv, $rdata)
190     and shift @{ $self->{queue} };
191     } else {
192     $self->default_recv ($type, \%kv, $rdata);
193 root 1.1 }
194 root 1.2 };
195 root 1.1
196 root 1.2 my $hdr_cb; $hdr_cb = sub {
197     if ($_[1] =~ /^([^=]+)=(.*)$/) {
198     my ($k, $v) = ($1, $2);
199     my @k = split /\./, tolc $k;
200     my $ro = \\%kv;
201    
202     while (@k) {
203     my $k = shift @k;
204     if ($k =~ /^\d+$/) {
205     $ro = \$$ro->[$k];
206 root 1.1 } else {
207 root 1.2 $ro = \$$ro->{$k};
208 root 1.1 }
209     }
210    
211 root 1.2 $$ro = $v;
212 root 1.1
213 root 1.2 $_[0]->push_read (line => $hdr_cb);
214     } elsif ($_[1] eq "Data") {
215     $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216     $rdata = \$_[1];
217     $done_cb->();
218     });
219     } elsif ($_[1] eq "EndMessage") {
220     $done_cb->();
221     } else {
222     die "protocol error, expected message end, got $_[1]\n";#d#
223     }
224     };
225 root 1.1
226 root 1.2 $self->{hdl}->push_read (line => sub {
227     $type = tolc $_[1];
228     $_[0]->push_read (line => $hdr_cb);
229     });
230     }
231 root 1.1
232 root 1.2 sub default_recv {
233     my ($self, $type, $kv, $rdata) = @_;
234    
235     if ($type eq "node_hello") {
236     $self->{node_hello} = $kv;
237     } elsif (exists $self->{id}{$kv->{identifier}}) {
238     $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239     and delete $self->{id}{$kv->{identifier}};
240 root 1.1 } else {
241 root 1.6 &{ $self->{progress} };
242 root 1.1 }
243     }
244    
245 root 1.2 sub _txn {
246     my ($name, $sub) = @_;
247 root 1.1
248 root 1.2 *{$name} = sub {
249     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
250     &$sub;
251     $cv
252     };
253 root 1.1
254 root 1.2 *{"$name\_sync"} = sub {
255     splice @_, 1, 0, (my $cv = AnyEvent->condvar);
256     &$sub;
257     $cv->recv
258     };
259 root 1.1 }
260    
261 root 1.3 =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
262    
263     =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
264    
265     =cut
266    
267 root 1.2 _txn list_peers => sub {
268     my ($self, $cv, $with_metadata, $with_volatile) = @_;
269 root 1.1
270 root 1.2 my @res;
271 root 1.1
272 root 1.2 $self->send_msg (list_peers =>
273     with_metadata => $with_metadata ? "true" : "false",
274     with_volatile => $with_volatile ? "true" : "false",
275     id_cb => sub {
276     my ($self, $type, $kv, $rdata) = @_;
277    
278     if ($type eq "end_list_peers") {
279     $cv->(\@res);
280     1
281     } else {
282     push @res, $kv;
283     0
284     }
285     },
286     );
287     };
288 root 1.1
289 root 1.3 =item $cv = $fcp->list_peer_notes ($node_identifier)
290    
291     =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
292    
293     =cut
294    
295 root 1.2 _txn list_peer_notes => sub {
296     my ($self, $cv, $node_identifier) = @_;
297 root 1.1
298 root 1.2 $self->send_msg (list_peer_notes =>
299     node_identifier => $node_identifier,
300     id_cb => sub {
301     my ($self, $type, $kv, $rdata) = @_;
302    
303     $cv->($kv);
304     1
305     },
306     );
307     };
308 root 1.1
309 root 1.3 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310    
311     =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
312    
313     =cut
314    
315 root 1.2 _txn watch_global => sub {
316     my ($self, $cv, $enabled, $verbosity_mask) = @_;
317 root 1.1
318 root 1.2 $self->send_msg (watch_global =>
319     enabled => $enabled ? "true" : "false",
320     defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321     );
322 root 1.1
323 root 1.2 $cv->();
324     };
325 root 1.1
326 root 1.3 =item $cv = $fcp->list_persistent_requests
327    
328     =item $reqs = $fcp->list_persistent_requests_sync
329    
330     =cut
331    
332 root 1.2 _txn list_persistent_requests => sub {
333     my ($self, $cv) = @_;
334 root 1.1
335 root 1.2 my %res;
336 root 1.1
337 root 1.2 $self->send_msg ("list_persistent_requests");
338 root 1.1
339 root 1.2 push @{ $self->{queue} }, sub {
340     my ($self, $type, $kv, $rdata) = @_;
341    
342     if ($type eq "end_list_persistent_requests") {
343     $cv->(\%res);
344     1
345     } else {
346     my $id = $kv->{identifier};
347    
348     if ($type =~ /^persistent_(get|put|put_dir)$/) {
349     $res{$id} = {
350     type => $1,
351     %{ $res{$id} },
352     %$kv,
353     };
354     } elsif ($type eq "simple_progress") {
355     delete $kv->{pkt_type}; # save memory
356     push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357     } else {
358     $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359     }
360     0
361     }
362 root 1.1 };
363 root 1.2 };
364 root 1.1
365 root 1.3 =item $cv = $fcp->remove_request ($global, $identifier)
366    
367     =item $status = $fcp->remove_request_sync ($global, $identifier)
368    
369     =cut
370    
371 root 1.2 _txn remove_request => sub {
372     my ($self, $cv, $global, $identifier) = @_;
373 root 1.1
374 root 1.2 $self->send_msg (remove_request =>
375     global => $global ? "true" : "false",
376     identifier => $identifier,
377     id_cb => sub {
378     my ($self, $type, $kv, $rdata) = @_;
379    
380     $cv->($kv);
381     1
382     },
383     );
384     };
385 root 1.1
386 root 1.3 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387    
388 root 1.4 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389 root 1.3
390     =cut
391    
392 root 1.2 _txn modify_persistent_request => sub {
393     my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394 root 1.1
395 root 1.2 $self->send_msg (modify_persistent_request =>
396     global => $global ? "true" : "false",
397     defined $client_token ? (client_token => $client_token ) : (),
398     defined $priority_class ? (priority_class => $priority_class) : (),
399 root 1.4 identifier => $identifier,
400 root 1.2 id_cb => sub {
401     my ($self, $type, $kv, $rdata) = @_;
402    
403     $cv->($kv);
404     1
405     },
406     );
407     };
408 root 1.1
409 root 1.3 =item $cv = $fcp->get_plugin_info ($name, $detailed)
410    
411     =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
412    
413     =cut
414    
415 root 1.2 _txn get_plugin_info => sub {
416     my ($self, $cv, $name, $detailed) = @_;
417 root 1.1
418 root 1.2 $self->send_msg (get_plugin_info =>
419     plugin_name => $name,
420     detailed => $detailed ? "true" : "false",
421     id_cb => sub {
422     my ($self, $type, $kv, $rdata) = @_;
423    
424     $cv->($kv);
425     1
426     },
427     );
428 root 1.4 };
429    
430     =item $cv = $fcp->client_get ($uri, $identifier, %kv)
431    
432     =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
433 root 1.1
434 root 1.4 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
435    
436     ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437     priority_class, persistence, client_token, global, return_type,
438     binary_blob, allowed_mime_types, filename, temp_filename
439    
440     =cut
441    
442     _txn client_get => sub {
443     my ($self, $cv, $uri, $identifier, %kv) = @_;
444    
445     $self->send_msg (client_get =>
446     %kv,
447     uri => $uri,
448     identifier => $identifier,
449     id_cb => sub {
450     my ($self, $type, $kv, $rdata) = @_;
451    
452     $cv->($kv);
453     1
454     },
455     );
456 root 1.2 };
457 root 1.1
458     =back
459    
460 root 1.3 =head1 EXAMPLE PROGRAM
461    
462     use AnyEvent::FCP;
463    
464     my $fcp = new AnyEvent::FCP;
465    
466     # let us look at the global request list
467     $fcp->watch_global (1, 0);
468    
469     # list them, synchronously
470     my $req = $fcp->list_persistent_requests_sync;
471    
472     # go through all requests
473     for my $req (values %$req) {
474     # skip jobs not directly-to-disk
475     next unless $req->{return_type} eq "disk";
476     # skip jobs not issued by FProxy
477     next unless $req->{identifier} =~ /^FProxy:/;
478    
479     if ($req->{data_found}) {
480     # file has been successfully downloaded
481    
482     ... move the file away
483     (left as exercise)
484    
485     # remove the request
486    
487     $fcp->remove_request (1, $req->{identifier});
488     } elsif ($req->{get_failed}) {
489     # request has failed
490     if ($req->{get_failed}{code} == 11) {
491     # too many path components, should restart
492     } else {
493     # other failure
494     }
495     } else {
496     # modify priorities randomly, to improve download rates
497     $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
498     if 0.1 > rand;
499     }
500     }
501    
502     # see if the dummy plugin is loaded, to ensure all previous requests have finished.
503     $fcp->get_plugin_info_sync ("dummy");
504    
505 root 1.1 =head1 SEE ALSO
506    
507 root 1.2 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
508 root 1.1
509     =head1 BUGS
510    
511     =head1 AUTHOR
512    
513     Marc Lehmann <schmorp@schmorp.de>
514     http://home.schmorp.de/
515    
516     =cut
517    
518     1
519