ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.3
Committed: Tue Jul 28 02:20:51 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_2
Changes since 1.2: +111 -35 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::FCP - freenet client protocol 2.0
4    
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::FCP;
8 root 1.1
9 root 1.3 my $fcp = new AnyEvent::FCP;
10 root 1.1
11 root 1.3 # transactions return condvars
12     my $lp_cv = $fcp->list_peers;
13     my $pr_cv = $fcp->list_persistent_requests;
14    
15     my $peers = $lp_cv->recv;
16     my $reqs = $pr_cv->recv;
17 root 1.1
18     =head1 DESCRIPTION
19    
20     This module implements the freenet client protocol version 2.0, as used by
21     freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22    
23 root 1.3 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24     description of what the messages do.
25 root 1.1
26     The module uses L<AnyEvent> to find a suitable event module.
27    
28 root 1.3 Only very little is implemented, ask if you need more, and look at the
29     example program later in this section.
30    
31 root 1.1 =head2 IMPORT TAGS
32    
33     Nothing much can be "imported" from this module right now.
34    
35     =head2 THE AnyEvent::FCP CLASS
36    
37     =over 4
38    
39     =cut
40    
41     package AnyEvent::FCP;
42    
43 root 1.2 use common::sense;
44    
45 root 1.1 use Carp;
46    
47 root 1.3 our $VERSION = '0.2';
48 root 1.1
49 root 1.2 use Scalar::Util ();
50 root 1.1
51     use AnyEvent;
52 root 1.2 use AnyEvent::Handle;
53 root 1.1
# Convert a lower_snake_case name into the CamelCase spelling used for
# message and field names on the wire. The components svk, chk, uri and
# fcp are upper-cased as a whole.
sub touc($) {
   my $name = shift;

   # upper-case complete acronym components; the lookarounds leave the
   # separating underscores unconsumed, so neighbouring components are all
   # found in a single pass
   $name =~ s/(?:^|(?<=_))(svk|chk|uri|fcp)(?=_|$)/\U$1/g;

   # drop each separator and upper-case the character following it
   $name =~ s/(?:^|_)(.)/\U$1/g;

   $name
}
60    
# Convert a CamelCase wire name into lower_snake_case. The letter
# sequences SVK, CHK, URI and FCP (matched case-insensitively) are always
# set off from their neighbours by underscores.
sub tolc($) {
   my $name = shift;

   # acronym directly followed by something other than an underscore
   $name =~ s/(SVK|CHK|URI|FCP)(?=[^_])/${1}_/gi;

   # acronym directly preceded by something other than an underscore
   $name =~ s/(?<=[^_])(SVK|CHK|URI|FCP)/_$1/gi;

   # ordinary lower-to-upper word boundary
   $name =~ s/(?<=[a-z])(?=[A-Z])/_/g;

   lc $name
}
68    
69     =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
70    
71     Create a new FCP connection to the given host and port (default
72 root 1.2 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 root 1.1
74 root 1.3 If no C<name> was specified, then AnyEvent::FCP will generate a
75     (hopefully) unique client name for you.
76    
77     =cut
78 root 1.1
79     #TODO
80 root 1.2 #You can install a progress callback that is being called with the AnyEvent::FCP
81 root 1.1 #object, a txn object, the type of the transaction and the attributes. Use
82     #it like this:
83     #
84     # sub progress_cb {
85     # my ($self, $txn, $type, $attr) = @_;
86     #
87     # warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88     # }
89    
# Constructor: blesses the given key/value pairs, fills in defaults for
# host, port, name and timeout, opens the AnyEvent::Handle connection and
# queues the initial ClientHello message.
sub new {
   my ($class, @args) = @_;
   my $self = bless { @args }, $class;

   # explicit arguments win, then the environment, then built-in defaults
   $self->{host}    = $self->{host}    || $ENV{FREDHOST} || "127.0.0.1";
   $self->{port}    = $self->{port}    || $ENV{FREDPORT} || 9481;
   $self->{name}    = $self->{name}    || time.rand.rand.rand; # lame
   $self->{timeout} = $self->{timeout} || 600;

   # seed for generated identifiers, advanced with ++ in send_msg
   $self->{id} = "a0";

   # the handle callbacks must not keep the object alive, so they only
   # see a weakened alias
   my $weak_self = $self;
   Scalar::Util::weaken ($weak_self);

   $self->{hdl} = AnyEvent::Handle->new (
      connect  => [$self->{host} => $self->{port}],
      timeout  => $self->{timeout},
      on_error => sub {
         warn "<@_>\n";
         exit 1;
      },
      on_read  => sub { $weak_self->on_read (@_) },
      on_eof   => $self->{on_eof} || sub { },
   );

   # weak back-reference from the handle to this object
   $self->{hdl}{fcp} = $self;
   Scalar::Util::weaken ($self->{hdl}{fcp});

   $self->send_msg (
      client_hello     =>
      name             => $self->{name},
      expected_version => "2.0",
   );

   $self
}
125    
126 root 1.3 #sub progress {
127     # my ($self, $txn, $type, $attr) = @_;
128     #
129     # $self->{progress}->($self, $txn, $type, $attr)
130     # if $self->{progress};
131     #}
132 root 1.1
# Serialise one message and queue it for writing on the handle.
#
#   $type - message name in lower_snake_case (converted with touc)
#   %kv   - message fields, also in lower_snake_case. Two keys are special:
#           data  - payload; when defined the message is terminated with
#                   DataLength/Data followed by the payload instead of
#                   EndMessage
#           id_cb - callback to register for replies carrying this
#                   message's identifier; an identifier is generated when
#                   none is supplied
sub send_msg {
   my ($self, $type, %kv) = @_;

   my $data = delete $kv{data};

   if (exists $kv{id_cb}) {
      my $id = $kv{identifier} || ++$self->{id};

      # callbacks live in their own table: $self->{id} holds the string
      # counter, so it cannot double as a hash without turning into a
      # symbolic reference that differs for every generated identifier
      $self->{id_cb}{$id} = delete $kv{id_cb};
      $kv{identifier}     = $id;
   }

   my $msg = touc ($type) . "\012"
           . join "", map { touc ($_) . "=$kv{$_}\012" } keys %kv;

   if (defined $data) {
      $msg .= "DataLength=" . (length $data) . "\012"
            . "Data\012$data";
   } else {
      $msg .= "EndMessage\012";
   }

   $self->{hdl}->push_write ($msg);
}

# Stub method; it used to be declared inside send_msg, which defines the
# same package-level sub. It unpacks its invocant and does nothing else.
sub id {
   my ($self) = @_;
}

# Handle read callback: parses one complete message (type line, Key=Value
# header lines, optional data block) and hands it to the first callback in
# $self->{queue}, or to default_recv when the queue is empty.
sub on_read {
   my ($self) = @_;

   my $type;
   my %kv;
   my $rdata;

   # invoked once the whole message has been read
   my $done_cb = sub {
      $kv{pkt_type} = $type;

      if (my $cb = $self->{queue}[0]) {
         # a queued callback stays at the head until it returns true
         $cb->($self, $type, \%kv, $rdata)
            and shift @{ $self->{queue} };
      } else {
         $self->default_recv ($type, \%kv, $rdata);
      }
   };

   my $hdr_cb; $hdr_cb = sub {
      if ($_[1] =~ /^([^=]+)=(.*)$/) {
         my ($k, $v) = ($1, $2);
         my @k = split /\./, tolc $k;
         my $ro = \\%kv;

         # dotted keys build nested structures: numeric components index
         # arrays, all others index hashes
         while (@k) {
            my $k = shift @k;
            if ($k =~ /^\d+$/) {
               $ro = \$$ro->[$k];
            } else {
               $ro = \$$ro->{$k};
            }
         }

         $$ro = $v;

         $_[0]->push_read (line => $hdr_cb);
      } else {
         # header section is over: break the closure's reference to itself
         # so that it, and the %kv it captures, can be freed
         undef $hdr_cb;

         if ($_[1] eq "Data") {
            $_[0]->push_read (chunk => delete $kv{data_length}, sub {
               $rdata = \$_[1];
               $done_cb->();
            });
         } elsif ($_[1] eq "EndMessage") {
            $done_cb->();
         } else {
            die "protocol error, expected message end, got $_[1]\n";#d#
         }
      }
   };

   $self->{hdl}->push_read (line => sub {
      $type = tolc $_[1];
      $_[0]->push_read (line => $hdr_cb);
   });
}

# Dispatch a message that no queued callback claimed: node_hello is stored
# on the object, messages whose identifier has a callback registered by
# send_msg are passed to that callback (which is removed once it returns
# true), and anything else is ignored.
sub default_recv {
   my ($self, $type, $kv, $rdata) = @_;

   # copy first, so the lookup below does not depend on what the callback
   # does to $kv
   my $id = $kv->{identifier};

   if ($type eq "node_hello") {
      $self->{node_hello} = $kv;
   } elsif (defined $id && exists $self->{id_cb}{$id}) {
      $self->{id_cb}{$id}($self, $type, $kv, $rdata)
         and delete $self->{id_cb}{$id};
   } else {
      # on_warn
      #warn "protocol warning (unexpected $type message)\n";
   }
}
230    
# Install two methods for a transaction: "$name" starts the transaction and
# returns its condvar, "${name}_sync" starts it and returns the result of
# ->recv on that condvar. $sub is called with the condvar inserted as the
# second argument, directly after the invocant.
sub _txn {
   my ($name, $sub) = @_;

   my $start = sub {
      splice @_, 1, 0, (my $cv = AnyEvent->condvar);
      $sub->(@_);
      $cv
   };

   no strict 'refs'; # the methods are installed by name

   *{$name}          = sub { $start->(@_) };
   *{"$name\_sync"}  = sub { $start->(@_)->recv };
}
246    
247 root 1.3 =item $cv = $fcp->list_peers ([$with_metadata[, $with_volatile]])
248    
249     =item $peers = $fcp->list_peers_sync ([$with_metadata[, $with_volatile]])
250    
251     =cut
252    
# Sends list_peers and collects every reply carrying its identifier until
# end_list_peers arrives; the condvar then receives an array reference of
# the collected messages.
_txn list_peers => sub {
   my ($self, $cv, $with_metadata, $with_volatile) = @_;

   my @peers;

   my $collect = sub {
      my ($self, $type, $kv, $rdata) = @_;

      unless ($type eq "end_list_peers") {
         push @peers, $kv;
         return 0; # keep the callback registered
      }

      $cv->(\@peers);
      1
   };

   $self->send_msg (
      list_peers    =>
      with_metadata => ($with_metadata ? "true" : "false"),
      with_volatile => ($with_volatile ? "true" : "false"),
      id_cb         => $collect,
   );
};
274 root 1.1
275 root 1.3 =item $cv = $fcp->list_peer_notes ($node_identifier)
276    
277     =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
278    
279     =cut
280    
# Sends list_peer_notes for the given node; the first reply carrying the
# request's identifier is delivered to the condvar.
_txn list_peer_notes => sub {
   my ($self, $cv, $node_identifier) = @_;

   my $on_reply = sub {
      my ($self, $type, $kv, $rdata) = @_;

      $cv->($kv);
      1 # one reply is enough, unregister
   };

   $self->send_msg (
      list_peer_notes =>
      node_identifier => $node_identifier,
      id_cb           => $on_reply,
   );
};
294 root 1.1
295 root 1.3 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296    
297     =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
298    
299     =cut
300    
# Sends watch_global with the enabled flag and, when given, the numeric
# verbosity mask. No reply callback is registered, so the condvar is
# signalled as soon as the message has been queued.
_txn watch_global => sub {
   my ($self, $cv, $enabled, $verbosity_mask) = @_;

   my @fields = (enabled => ($enabled ? "true" : "false"));

   push @fields, verbosity_mask => $verbosity_mask+0
      if defined $verbosity_mask;

   $self->send_msg (watch_global => @fields);

   $cv->();
};
311 root 1.1
312 root 1.3 =item $cv = $fcp->list_persistent_requests
313    
314     =item $reqs = $fcp->list_persistent_requests_sync
315    
316     =cut
317    
# Sends list_persistent_requests and installs a queue callback that
# receives every following message until end_list_persistent_requests
# arrives. The condvar receives a hash reference keyed by request
# identifier:
#   - persistent_get/put/put_dir messages are merged into the entry, with
#     "type" set to get, put or put_dir
#   - simple_progress messages are appended to the entry's simple_progress
#     list
#   - any other message is stored in the entry under its packet type
_txn list_persistent_requests => sub {
   my ($self, $cv) = @_;

   my %res;

   $self->send_msg ("list_persistent_requests");

   push @{ $self->{queue} }, sub {
      my ($self, $type, $kv, $rdata) = @_;

      if ($type eq "end_list_persistent_requests") {
         $cv->(\%res);
         return 1; # done, leave the queue
      }

      if ($type =~ /^persistent_(get|put|put_dir)$/) {
         my $id = $kv->{identifier};

         $res{$id} = {
            type => $1,
            # the entry does not exist yet the first time an identifier is
            # seen; fall back to an empty hash instead of dereferencing undef
            %{ $res{$id} || {} },
            %$kv,
         };
      } elsif ($type eq "simple_progress") {
         delete $kv->{pkt_type}; # save memory
         push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
      } else {
         $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
      }

      0 # stay at the head of the queue
   };
};
350 root 1.1
351 root 1.3 =item $cv = $fcp->remove_request ($global, $identifier)
352    
353     =item $status = $fcp->remove_request_sync ($global, $identifier)
354    
355     =cut
356    
# Sends remove_request for the given request identifier. The condvar
# receives the first reply carrying that identifier.
_txn remove_request => sub {
   my ($self, $cv, $global, $identifier) = @_;

   $self->send_msg (remove_request =>
      global     => $global ? "true" : "false",
      identifier => $identifier,
      id_cb      => sub {
         my ($self, $type, $kv, $rdata) = @_;

         $cv->($kv);
         1
      },
   );

   # the condvar is signalled by the reply callback only; signalling it
   # here as well would make the _sync variant return before the reply
   # has arrived
};
373 root 1.1
374 root 1.3 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
375    
376     =item $status = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
377    
378     =cut
379    
# Sends modify_persistent_request for the given request identifier;
# client_token and priority_class are only included when defined. The
# condvar receives the first reply carrying that identifier.
_txn modify_persistent_request => sub {
   my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;

   $self->send_msg (modify_persistent_request =>
      global     => $global ? "true" : "false",
      identifier => $identifier,
      defined $client_token   ? (client_token   => $client_token  ) : (),
      defined $priority_class ? (priority_class => $priority_class) : (),
      id_cb      => sub {
         my ($self, $type, $kv, $rdata) = @_;

         $cv->($kv);
         1
      },
   );

   # the condvar is signalled by the reply callback only; signalling it
   # here as well would make the _sync variant return before the reply
   # has arrived
};
398 root 1.1
399 root 1.3 =item $cv = $fcp->get_plugin_info ($name, $detailed)
400    
401     =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
402    
403     =cut
404    
# Sends get_plugin_info for the named plugin. The condvar receives the
# first reply carrying the request's generated identifier.
_txn get_plugin_info => sub {
   my ($self, $cv, $name, $detailed) = @_;

   $self->send_msg (get_plugin_info =>
      plugin_name => $name,
      detailed    => $detailed ? "true" : "false",
      id_cb       => sub {
         my ($self, $type, $kv, $rdata) = @_;

         $cv->($kv);
         1
      },
   );

   # the condvar is signalled by the reply callback only; signalling it
   # here as well would make the _sync variant return before the reply
   # has arrived
};
421 root 1.1
422     =back
423    
424 root 1.3 =head1 EXAMPLE PROGRAM
425    
426     use AnyEvent::FCP;
427    
428     my $fcp = new AnyEvent::FCP;
429    
430     # let us look at the global request list
431     $fcp->watch_global (1, 0);
432    
433     # list them, synchronously
434     my $req = $fcp->list_persistent_requests_sync;
435    
436     # go through all requests
437     for my $req (values %$req) {
438     # skip jobs not directly-to-disk
439     next unless $req->{return_type} eq "disk";
440     # skip jobs not issued by FProxy
441     next unless $req->{identifier} =~ /^FProxy:/;
442    
443     if ($req->{data_found}) {
444     # file has been successfully downloaded
445    
446     ... move the file away
447     (left as exercise)
448    
449     # remove the request
450    
451     $fcp->remove_request (1, $req->{identifier});
452     } elsif ($req->{get_failed}) {
453     # request has failed
454     if ($req->{get_failed}{code} == 11) {
455     # too many path components, should restart
456     } else {
457     # other failure
458     }
459     } else {
460     # modify priorities randomly, to improve download rates
461     $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
462     if 0.1 > rand;
463     }
464     }
465    
466     # see if the dummy plugin is loaded, to ensure all previous requests have finished.
467     $fcp->get_plugin_info_sync ("dummy");
468    
469 root 1.1 =head1 SEE ALSO
470    
471 root 1.2 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
472 root 1.1
473     =head1 BUGS
474    
475     =head1 AUTHOR
476    
477     Marc Lehmann <schmorp@schmorp.de>
478     http://home.schmorp.de/
479    
480     =cut
481    
482     1
483