ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.5
Committed: Tue Dec 1 13:49:56 2009 UTC (14 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-0_21
Changes since 1.4: +1 -1 lines
Log Message:
0.21

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::FCP - freenet client protocol 2.0
4    
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::FCP;
8 root 1.1
9 root 1.3 my $fcp = new AnyEvent::FCP;
10 root 1.1
11 root 1.3 # transactions return condvars
12     my $lp_cv = $fcp->list_peers;
13     my $pr_cv = $fcp->list_persistent_requests;
14    
15     my $peers = $lp_cv->recv;
16     my $reqs = $pr_cv->recv;
17 root 1.1
18     =head1 DESCRIPTION
19    
20     This module implements the freenet client protocol version 2.0, as used by
21     freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22    
23 root 1.3 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24     description of what the messages do.
25 root 1.1
26     The module uses L<AnyEvent> to find a suitable event module.
27    
28 root 1.3 Only very little is implemented, ask if you need more, and look at the
29     example program later in this section.
30    
31 root 1.1 =head2 IMPORT TAGS
32    
33     Nothing much can be "imported" from this module right now.
34    
35     =head2 THE AnyEvent::FCP CLASS
36    
37     =over 4
38    
39     =cut
40    
41     package AnyEvent::FCP;
42    
43 root 1.2 use common::sense;
44    
45 root 1.1 use Carp;
46    
47 root 1.5 our $VERSION = '0.21';
48 root 1.1
49 root 1.2 use Scalar::Util ();
50 root 1.1
51     use AnyEvent;
52 root 1.2 use AnyEvent::Handle;
53 root 1.1
# Convert a lower_snake_case name into the CamelCase spelling used on the
# wire, e.g. "client_hello" => "ClientHello".
#
# The components svk, chk, uri, fcp, ds and mime are upper-cased as a whole
# ("ignore_ds" => "IgnoreDS").
sub touc($) {
   my ($name) = @_;

   # Upper-case whole acronym components first. The substitution is repeated
   # because two neighbouring acronyms share one underscore, so a single
   # pass can only convert one of them.
   1 while $name =~ s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/;

   # Then capitalise the first character of every component, dropping the
   # underscore that introduced it.
   $name =~ s/(?:^|_)(.)/\U$1/g;

   $name
}
60    
# Convert a CamelCase wire name into lower_snake_case, e.g.
# "ClientHello" => "client_hello" and "AllowedMIMETypes" =>
# "allowed_mime_types".
#
# NOTE(review): the acronym patterns are matched case-insensitively, so the
# letter runs svk/chk/uri/fcp/ds/mime are also split off inside ordinary
# words ("Seeds" => "see_ds"). Kept as is, because the resulting key names
# are visible to callers.
sub tolc($) {
   my ($name) = @_;

   # Put an underscore after an acronym that is directly followed by
   # another character ...
   1 while $name =~ s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/${1}_$2/i;

   # ... and before an acronym that is directly preceded by one.
   1 while $name =~ s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/${1}_$2/i;

   # Ordinary camel-case boundaries: lower-case letter followed by an
   # upper-case one.
   $name =~ s/(?<=[a-z])(?=[A-Z])/_/g;

   lc $name
}
68    
69     =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
70    
71     Create a new FCP connection to the given host and port (default
72 root 1.2 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 root 1.1
74 root 1.3 If no C<name> was specified, then AnyEvent::FCP will generate a
75     (hopefully) unique client name for you.
76    
77     =cut
78 root 1.1
79     #TODO
80 root 1.2 #You can install a progress callback that is being called with the AnyEvent::FCP
81 root 1.1 #object, a txn object, the type of the transaction and the attributes. Use
82     #it like this:
83     #
84     # sub progress_cb {
85     # my ($self, $txn, $type, $attr) = @_;
86     #
87     # warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88     # }
89    
# Constructor: creates the connection object, starts connecting to the node
# and queues the ClientHello message.
#
# Arguments are key => value pairs stored verbatim in the object. The ones
# read here are (all optional):
#
#   host      node host, default $ENV{FREDHOST} or 127.0.0.1
#   port      node port, default $ENV{FREDPORT} or 9481
#   name      client name, default is a time/random based string
#   timeout   handle timeout passed to AnyEvent::Handle, default 600
#   on_eof    callback passed to the handle, default is a no-op
#   on_error  callback passed to the handle; when absent the previous
#             built-in behaviour is kept (warn with the arguments, then
#             exit 1)
#
# Returns the new object.
sub new {
   my $class = shift;
   my $self  = bless { @_ }, $class;

   $self->{host}    ||= $ENV{FREDHOST} || "127.0.0.1";
   $self->{port}    ||= $ENV{FREDPORT} || 9481;
   $self->{name}    ||= time.rand.rand.rand; # lame
   $self->{timeout} ||= 600;

   # counter that send_msg increments to generate request identifiers
   $self->{id} = "a0";

   {
      # The closures handed to the handle must not keep the object alive,
      # so they only see a weakened copy of $self.
      Scalar::Util::weaken (my $self = $self);

      # Let the caller decide what a connection error means, the same way
      # on_eof is handled; fall back to the historic warn-and-exit.
      my $on_error = $self->{on_error} || sub {
         warn "<@_>\n";
         exit 1;
      };

      $self->{hdl} = AnyEvent::Handle->new (
         connect  => [$self->{host} => $self->{port}],
         timeout  => $self->{timeout},
         on_error => $on_error,
         on_read  => sub { $self->on_read (@_) },
         on_eof   => $self->{on_eof} || sub { },
      );

      # weak back-reference from the handle to its owner
      Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
   }

   $self->send_msg (
      client_hello =>
      name             => $self->{name},
      expected_version => "2.0",
   );

   $self
}
125    
126 root 1.3 #sub progress {
127     # my ($self, $txn, $type, $attr) = @_;
128     #
129     # $self->{progress}->($self, $txn, $type, $attr)
130     # if $self->{progress};
131     #}
132 root 1.1
# Serialise one FCP message and queue it for writing on the handle.
#
#   $type  message name in lower_snake_case (converted with touc)
#   %kv    header fields, keys in lower_snake_case; two keys are special
#          and are not sent as headers:
#
#            data   payload; when defined the message ends with a
#                   DataLength header, a "Data" line and the payload
#                   instead of "EndMessage"
#            id_cb  callback registered for replies carrying this message's
#                   identifier; called as $cb->($self, $type, \%kv, $rdata)
#                   and removed once it returns true (see default_recv)
#
# When id_cb is given without an identifier, a fresh one is generated by
# incrementing the $self->{id} counter.
sub send_msg {
   my ($self, $type, %kv) = @_;

   my $data = delete $kv{data};

   if (exists $kv{id_cb}) {
      my $id = $kv{identifier} || ++$self->{id};

      # Callbacks live in their own hash. $self->{id} is a plain string
      # counter, so indexing it like a hash is a symbolic dereference: it
      # ends up in a package-global hash named after the current counter
      # value and is no longer found once the counter has moved on.
      $self->{id_cb}{$id} = delete $kv{id_cb};
      $kv{identifier} = $id;
   }

   my $msg = touc ($type) . "\012";

   # sorted only to make the generated message deterministic
   $msg .= touc ($_) . "=$kv{$_}\012"
      for sort keys %kv;

   if (defined $data) {
      $msg .= "DataLength=" . (length $data) . "\012"
            . "Data\012$data";
   } else {
      $msg .= "EndMessage\012";
   }

   $self->{hdl}->push_write ($msg);
}

# Unfinished stub (it used to be declared in the middle of send_msg); kept
# so that the method continues to exist.
sub id {
   my ($self) = @_;
}

# Read callback of the handle (installed in the constructor): queues the
# readers that parse exactly one message and then dispatches it.
#
# A message is a type line, followed by "Key=Value" header lines, followed
# by either "EndMessage" or "Data" plus a payload of data_length bytes.
# The finished message goes to the first callback in $self->{queue} if
# there is one (which is removed when it returns true), otherwise to
# default_recv.
sub on_read {
   my ($self) = @_;

   my $type;   # message type, lower_snake_case
   my %kv;     # parsed headers
   my $rdata;  # reference to the payload, if the message carried one

   my $hdr_cb;

   my $done_cb = sub {
      # $hdr_cb refers to itself; drop that reference now that the message
      # is complete, otherwise every message leaks its closures and
      # everything they captured (including $self).
      undef $hdr_cb;

      $kv{pkt_type} = $type;

      if (my $cb = $self->{queue}[0]) {
         $cb->($self, $type, \%kv, $rdata)
            and shift @{ $self->{queue} };
      } else {
         $self->default_recv ($type, \%kv, $rdata);
      }
   };

   $hdr_cb = sub {
      if ($_[1] =~ /^([^=]+)=(.*)$/) {
         my ($k, $v) = ($1, $2);
         my @k = split /\./, tolc ($k);

         # Walk (and create) the nested structure named by the dotted key:
         # numeric components index arrays, all others index hashes. $ro
         # always points at the slot holding the current container.
         my $ro = \\%kv;

         while (@k) {
            my $k = shift @k;
            if ($k =~ /^\d+$/) {
               $ro = \$$ro->[$k];
            } else {
               $ro = \$$ro->{$k};
            }
         }

         $$ro = $v;

         # more header lines may follow
         $_[0]->push_read (line => $hdr_cb);
      } elsif ($_[1] eq "Data") {
         $_[0]->push_read (chunk => delete $kv{data_length}, sub {
            $rdata = \$_[1];
            $done_cb->();
         });
      } elsif ($_[1] eq "EndMessage") {
         $done_cb->();
      } else {
         die "protocol error, expected message end, got $_[1]\n";#d#
      }
   };

   # the first line of a message is its type
   $self->{hdl}->push_read (line => sub {
      $type = tolc ($_[1]);
      $_[0]->push_read (line => $hdr_cb);
   });
}

# Dispatch a message that no queue callback claimed.
#
#   node_hello          is remembered in $self->{node_hello}
#   known identifier    the callback registered by send_msg is invoked with
#                       ($self, $type, $kv, $rdata) and unregistered when it
#                       returns true
#   anything else       is silently ignored
sub default_recv {
   my ($self, $type, $kv, $rdata) = @_;

   my $id = $kv->{identifier};
   my $cb = defined $id ? $self->{id_cb}{$id} : undef;

   if ($type eq "node_hello") {
      $self->{node_hello} = $kv;
   } elsif ($cb) {
      $cb->($self, $type, $kv, $rdata)
         and delete $self->{id_cb}{$id};
   } else {
      # on_warn
      #warn "protocol warning (unexpected $type message)\n";
   }
}
230    
# Install two methods for one transaction:
#
#   $name        creates a condvar, runs $sub and returns the condvar
#   ${name}_sync creates a condvar, runs $sub and returns $cv->recv
#
# In both cases $sub is called with the condvar inserted as second
# argument, i.e. as $sub->($self, $cv, @original_arguments).
sub _txn {
   my ($name, $sub) = @_;

   # asynchronous variant
   *{$name} = sub {
      my $cv = AnyEvent->condvar;
      splice @_, 1, 0, $cv;
      $sub->(@_);
      $cv
   };

   # synchronous variant
   *{"$name\_sync"} = sub {
      my $cv = AnyEvent->condvar;
      splice @_, 1, 0, $cv;
      $sub->(@_);
      $cv->recv
   };
}
246    
247 root 1.3 =item $cv = $fcp->list_peers ([$with_metadata[, $with_volatile]])
248    
249     =item $peers = $fcp->list_peers_sync ([$with_metadata[, $with_volatile]])
250    
251     =cut
252    
# list_peers / list_peers_sync: send ListPeers and collect every reply for
# this request into an array; the condvar receives a reference to that
# array when "end_list_peers" arrives.
_txn (list_peers => sub {
   my ($self, $cv, $with_metadata, $with_volatile) = @_;

   my @peers;

   # Reply handler: a true return value tells the dispatcher that the
   # transaction is finished.
   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      unless ($type eq "end_list_peers") {
         push @peers, $kv;
         return 0;
      }

      $cv->(\@peers);
      return 1;
   };

   $self->send_msg (
      "list_peers",
      with_metadata => ($with_metadata ? "true" : "false"),
      with_volatile => ($with_volatile ? "true" : "false"),
      id_cb         => $on_reply,
   );
});
274 root 1.1
275 root 1.3 =item $cv = $fcp->list_peer_notes ($node_identifier)
276    
277     =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
278    
279     =cut
280    
# list_peer_notes / list_peer_notes_sync: send ListPeerNotes for the given
# node; the first reply's headers are handed to the condvar.
_txn (list_peer_notes => sub {
   my ($self, $cv, $node_identifier) = @_;

   # the first reply finishes the transaction
   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      $cv->($kv);
      return 1;
   };

   $self->send_msg (
      "list_peer_notes",
      node_identifier => $node_identifier,
      id_cb           => $on_reply,
   );
});
294 root 1.1
295 root 1.3 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296    
297     =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
298    
299     =cut
300    
# watch_global / watch_global_sync: send WatchGlobal. No reply is waited
# for; the condvar is triggered right after the message has been queued.
_txn (watch_global => sub {
   my ($self, $cv, $enabled, $verbosity_mask) = @_;

   my @fields = (enabled => ($enabled ? "true" : "false"));

   # the mask is optional and forced to a number when present
   push @fields, verbosity_mask => $verbosity_mask + 0
      if defined $verbosity_mask;

   $self->send_msg ("watch_global", @fields);

   $cv->();
});
311 root 1.1
312 root 1.3 =item $cv = $fcp->list_persistent_requests
313    
314     =item $reqs = $fcp->list_persistent_requests_sync
315    
316     =cut
317    
# list_persistent_requests / list_persistent_requests_sync: send
# ListPersistentRequests and gather all messages up to
# "end_list_persistent_requests" into a hash keyed by request identifier,
# which is then handed to the condvar as a hash reference.
#
# Per identifier the entry contains:
#   - the headers of the persistent_get/put/put_dir message plus a "type"
#     key (get, put or put_dir)
#   - simple_progress => [ headers of every SimpleProgress message ]
#   - <message type> => headers, for every other message type
_txn (list_persistent_requests => sub {
   my ($self, $cv) = @_;

   my %res;

   $self->send_msg ("list_persistent_requests");

   # These replies are taken from the message queue rather than matched by
   # identifier; returning true removes the handler from the queue.
   push @{ $self->{queue} }, sub {
      my ($self, $type, $kv, $rdata) = @_;

      if ($type eq "end_list_persistent_requests") {
         $cv->(\%res);
         return 1;
      }

      my $id = $kv->{identifier};

      if ($type =~ /^persistent_(get|put|put_dir)$/) {
         $res{$id} = {
            type => $1,
            # There may be no entry for this identifier yet; do not
            # dereference undef (that only works without strict refs).
            %{ $res{$id} || {} },
            %$kv,
         };
      } elsif ($type eq "simple_progress") {
         delete $kv->{pkt_type}; # save memory
         push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
      } else {
         $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
      }

      0
   };
});
350 root 1.1
351 root 1.3 =item $cv = $fcp->remove_request ($global, $identifier)
352    
353     =item $status = $fcp->remove_request_sync ($global, $identifier)
354    
355     =cut
356    
# remove_request / remove_request_sync: send RemoveRequest for the given
# identifier; the first reply's headers are handed to the condvar.
_txn (remove_request => sub {
   my ($self, $cv, $global, $identifier) = @_;

   # the first reply finishes the transaction
   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      $cv->($kv);
      return 1;
   };

   $self->send_msg (
      "remove_request",
      global     => ($global ? "true" : "false"),
      identifier => $identifier,
      id_cb      => $on_reply,
   );
});
371 root 1.1
372 root 1.3 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
373    
374 root 1.4 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
375 root 1.3
376     =cut
377    
# modify_persistent_request / modify_persistent_request_sync: send
# ModifyPersistentRequest; client_token and priority_class are only sent
# when defined. The first reply's headers are handed to the condvar.
_txn (modify_persistent_request => sub {
   my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;

   my @fields = (global => ($global ? "true" : "false"));

   push @fields, client_token => $client_token
      if defined $client_token;
   push @fields, priority_class => $priority_class
      if defined $priority_class;

   # the first reply finishes the transaction
   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      $cv->($kv);
      return 1;
   };

   $self->send_msg (
      "modify_persistent_request",
      @fields,
      identifier => $identifier,
      id_cb      => $on_reply,
   );
});
394 root 1.1
395 root 1.3 =item $cv = $fcp->get_plugin_info ($name, $detailed)
396    
397     =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
398    
399     =cut
400    
# get_plugin_info / get_plugin_info_sync: send GetPluginInfo for the named
# plugin; the first reply's headers are handed to the condvar.
_txn (get_plugin_info => sub {
   my ($self, $cv, $name, $detailed) = @_;

   # the first reply finishes the transaction
   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      $cv->($kv);
      return 1;
   };

   $self->send_msg (
      "get_plugin_info",
      plugin_name => $name,
      detailed    => ($detailed ? "true" : "false"),
      id_cb       => $on_reply,
   );
});
415    
416     =item $cv = $fcp->client_get ($uri, $identifier, %kv)
417    
418     =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
419 root 1.1
420 root 1.4 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
421    
422     ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
423     priority_class, persistence, client_token, global, return_type,
424     binary_blob, allowed_mime_types, filename, temp_filename
425    
426     =cut
427    
# client_get / client_get_sync: send ClientGet for $uri under the given
# identifier. Additional header fields are taken from %kv; uri and
# identifier always override same-named entries in %kv. The first reply's
# headers are handed to the condvar.
_txn (client_get => sub {
   my ($self, $cv, $uri, $identifier, %kv) = @_;

   # the first reply finishes the transaction
   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      $cv->($kv);
      return 1;
   };

   $self->send_msg (
      "client_get",
      %kv,
      uri        => $uri,
      identifier => $identifier,
      id_cb      => $on_reply,
   );
});
443 root 1.1
444     =back
445    
446 root 1.3 =head1 EXAMPLE PROGRAM
447    
448     use AnyEvent::FCP;
449    
450     my $fcp = new AnyEvent::FCP;
451    
452     # let us look at the global request list
453     $fcp->watch_global (1, 0);
454    
455     # list them, synchronously
456     my $req = $fcp->list_persistent_requests_sync;
457    
458     # go through all requests
459     for my $req (values %$req) {
460     # skip jobs not directly-to-disk
461     next unless $req->{return_type} eq "disk";
462     # skip jobs not issued by FProxy
463     next unless $req->{identifier} =~ /^FProxy:/;
464    
465     if ($req->{data_found}) {
466     # file has been successfully downloaded
467    
468     ... move the file away
469     (left as exercise)
470    
471     # remove the request
472    
473     $fcp->remove_request (1, $req->{identifier});
474     } elsif ($req->{get_failed}) {
475     # request has failed
476     if ($req->{get_failed}{code} == 11) {
477     # too many path components, should restart
478     } else {
479     # other failure
480     }
481     } else {
482     # modify priorities randomly, to improve download rates
483     $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
484     if 0.1 > rand;
485     }
486     }
487    
488     # see if the dummy plugin is loaded, to ensure all previous requests have finished.
489     $fcp->get_plugin_info_sync ("dummy");
490    
491 root 1.1 =head1 SEE ALSO
492    
493 root 1.2 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
494 root 1.1
495     =head1 BUGS
496    
497     =head1 AUTHOR
498    
499     Marc Lehmann <schmorp@schmorp.de>
500     http://home.schmorp.de/
501    
502     =cut
503    
504     1
505