ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.3
Committed: Tue Jul 28 02:20:51 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_2
Changes since 1.2: +111 -35 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::FCP - freenet client protocol 2.0
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::FCP;
8
9 my $fcp = new AnyEvent::FCP;
10
11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
17
18 =head1 DESCRIPTION
19
20 This module implements the freenet client protocol version 2.0, as used by
21 freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22
23 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24 description of what the messages do.
25
26 The module uses L<AnyEvent> to find a suitable event module.
27
28 Only very little is implemented, ask if you need more, and look at the
29 example program later in this section.
30
31 =head2 IMPORT TAGS
32
33 Nothing much can be "imported" from this module right now.
34
35 =head2 THE AnyEvent::FCP CLASS
36
37 =over 4
38
39 =cut
40
41 package AnyEvent::FCP;
42
43 use common::sense;
44
45 use Carp;
46
47 our $VERSION = '0.2';
48
49 use Scalar::Util ();
50
51 use AnyEvent;
52 use AnyEvent::Handle;
53
54 sub touc($) {
55 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g;
58 $_
59 }
60
61 sub tolc($) {
62 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i;
64 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i;
65 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc
67 }
68
69 =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
70
71 Create a new FCP connection to the given host and port (default
72 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73
74 If no C<name> was specified, then AnyEvent::FCP will generate a
75 (hopefully) unique client name for you.
76
77 =cut
78
79 #TODO
80 #You can install a progress callback that is being called with the AnyEvent::FCP
81 #object, a txn object, the type of the transaction and the attributes. Use
82 #it like this:
83 #
84 # sub progress_cb {
85 # my ($self, $txn, $type, $attr) = @_;
86 #
87 # warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88 # }
89
90 sub new {
91 my $class = shift;
92 my $self = bless { @_ }, $class;
93
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
95 $self->{port} ||= $ENV{FREDPORT} || 9481;
96 $self->{name} ||= time.rand.rand.rand; # lame
97 $self->{timeout} ||= 600;
98
99 $self->{id} = "a0";
100
101 {
102 Scalar::Util::weaken (my $self = $self);
103
104 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout},
107 on_error => sub {
108 warn "<@_>\n";
109 exit 1;
110 },
111 on_read => sub { $self->on_read (@_) },
112 on_eof => $self->{on_eof} || sub { };
113
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 }
116
117 $self->send_msg (
118 client_hello =>
119 name => $self->{name},
120 expected_version => "2.0",
121 );
122
123 $self
124 }
125
126 #sub progress {
127 # my ($self, $txn, $type, $attr) = @_;
128 #
129 # $self->{progress}->($self, $txn, $type, $attr)
130 # if $self->{progress};
131 #}
132
133 sub send_msg {
134 my ($self, $type, %kv) = @_;
135
136 my $data = delete $kv{data};
137
138 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id};
140 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 }
143
144 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146
147 sub id {
148 my ($self) = @_;
149
150
151 }
152
153 if (defined $data) {
154 $msg .= "DataLength=" . (length $data) . "\012"
155 . "Data\012$data";
156 } else {
157 $msg .= "EndMessage\012";
158 }
159
160 $self->{hdl}->push_write ($msg);
161 }
162
163 sub on_read {
164 my ($self) = @_;
165
166 my $type;
167 my %kv;
168 my $rdata;
169
170 my $done_cb = sub {
171 $kv{pkt_type} = $type;
172
173 if (my $cb = $self->{queue}[0]) {
174 $cb->($self, $type, \%kv, $rdata)
175 and shift @{ $self->{queue} };
176 } else {
177 $self->default_recv ($type, \%kv, $rdata);
178 }
179 };
180
181 my $hdr_cb; $hdr_cb = sub {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) {
183 my ($k, $v) = ($1, $2);
184 my @k = split /\./, tolc $k;
185 my $ro = \\%kv;
186
187 while (@k) {
188 my $k = shift @k;
189 if ($k =~ /^\d+$/) {
190 $ro = \$$ro->[$k];
191 } else {
192 $ro = \$$ro->{$k};
193 }
194 }
195
196 $$ro = $v;
197
198 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1];
202 $done_cb->();
203 });
204 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->();
206 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d#
208 }
209 };
210
211 $self->{hdl}->push_read (line => sub {
212 $type = tolc $_[1];
213 $_[0]->push_read (line => $hdr_cb);
214 });
215 }
216
217 sub default_recv {
218 my ($self, $type, $kv, $rdata) = @_;
219
220 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 }
229 }
230
231 sub _txn {
232 my ($name, $sub) = @_;
233
234 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
236 &$sub;
237 $cv
238 };
239
240 *{"$name\_sync"} = sub {
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub;
243 $cv->recv
244 };
245 }
246
247 =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
248
249 =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
250
251 =cut
252
253 _txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_;
255
256 my @res;
257
258 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_;
263
264 if ($type eq "end_list_peers") {
265 $cv->(\@res);
266 1
267 } else {
268 push @res, $kv;
269 0
270 }
271 },
272 );
273 };
274
275 =item $cv = $fcp->list_peer_notes ($node_identifier)
276
277 =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
278
279 =cut
280
281 _txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_;
283
284 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier,
286 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_;
288
289 $cv->($kv);
290 1
291 },
292 );
293 };
294
295 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297 =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
298
299 =cut
300
301 _txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_;
303
304 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 );
308
309 $cv->();
310 };
311
312 =item $cv = $fcp->list_persistent_requests
313
314 =item $reqs = $fcp->list_persistent_requests_sync
315
316 =cut
317
318 _txn list_persistent_requests => sub {
319 my ($self, $cv) = @_;
320
321 my %res;
322
323 $self->send_msg ("list_persistent_requests");
324
325 push @{ $self->{queue} }, sub {
326 my ($self, $type, $kv, $rdata) = @_;
327
328 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res);
330 1
331 } else {
332 my $id = $kv->{identifier};
333
334 if ($type =~ /^persistent_(get|put|put_dir)$/) {
335 $res{$id} = {
336 type => $1,
337 %{ $res{$id} },
338 %$kv,
339 };
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 }
346 0
347 }
348 };
349 };
350
351 =item $cv = $fcp->remove_request ($global, $identifier)
352
353 =item $status = $fcp->remove_request_sync ($global, $identifier)
354
355 =cut
356
357 _txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372 };
373
374 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
375
376 =item $status = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
377
378 =cut
379
380 _txn modify_persistent_request => sub {
381 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
382
383 $self->send_msg (modify_persistent_request =>
384 global => $global ? "true" : "false",
385 identifier => $identifier,
386 defined $client_token ? (client_token => $client_token ) : (),
387 defined $priority_class ? (priority_class => $priority_class) : (),
388 id_cb => sub {
389 my ($self, $type, $kv, $rdata) = @_;
390
391 $cv->($kv);
392 1
393 },
394 );
395
396 $cv->();
397 };
398
399 =item $cv = $fcp->get_plugin_info ($name, $detailed)
400
401 =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
402
403 =cut
404
405 _txn get_plugin_info => sub {
406 my ($self, $cv, $name, $detailed) = @_;
407
408 $self->send_msg (get_plugin_info =>
409 plugin_name => $name,
410 detailed => $detailed ? "true" : "false",
411 id_cb => sub {
412 my ($self, $type, $kv, $rdata) = @_;
413
414 $cv->($kv);
415 1
416 },
417 );
418
419 $cv->();
420 };
421
422 =back
423
424 =head1 EXAMPLE PROGRAM
425
426 use AnyEvent::FCP;
427
428 my $fcp = new AnyEvent::FCP;
429
430 # let us look at the global request list
431 $fcp->watch_global (1, 0);
432
433 # list them, synchronously
434 my $req = $fcp->list_persistent_requests_sync;
435
436 # go through all requests
437 for my $req (values %$req) {
438 # skip jobs not directly-to-disk
439 next unless $req->{return_type} eq "disk";
440 # skip jobs not issued by FProxy
441 next unless $req->{identifier} =~ /^FProxy:/;
442
443 if ($req->{data_found}) {
444 # file has been successfully downloaded
445
446 ... move the file away
447 (left as exercise)
448
449 # remove the request
450
451 $fcp->remove_request (1, $req->{identifier});
452 } elsif ($req->{get_failed}) {
453 # request has failed
454 if ($req->{get_failed}{code} == 11) {
455 # too many path components, should restart
456 } else {
457 # other failure
458 }
459 } else {
460 # modify priorities randomly, to improve download rates
461 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
462 if 0.1 > rand;
463 }
464 }
465
466 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
467 $fcp->get_plugin_info_sync ("dummy");
468
469 =head1 SEE ALSO
470
471 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
472
473 =head1 BUGS
474
475 =head1 AUTHOR
476
477 Marc Lehmann <schmorp@schmorp.de>
478 http://home.schmorp.de/
479
480 =cut
481
482 1
483