ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.7
Committed: Sat Jun 5 14:49:25 2010 UTC (13 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-0_3
Changes since 1.6: +20 -2 lines
Log Message:
0.3

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::FCP - freenet client protocol 2.0
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::FCP;
8
9 my $fcp = new AnyEvent::FCP;
10
11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
17
18 =head1 DESCRIPTION
19
20 This module implements the freenet client protocol version 2.0, as used by
21 freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22
23 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24 description of what the messages do.
25
26 The module uses L<AnyEvent> to find a suitable event module.
27
28 Only very little is implemented; ask if you need more, and look at the
29 example program later in this document.
30
31 =head2 EXAMPLE
32
33 This example fetches the download list and sets the priority of all files
34 with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global_sync (1, 0);
41 my $req = $fcp->list_persistent_requests_sync;
42
43 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46 }
47 }
48
49 =head2 IMPORT TAGS
50
51 Nothing much can be "imported" from this module right now.
52
53 =head2 THE AnyEvent::FCP CLASS
54
55 =over 4
56
57 =cut
58
59 package AnyEvent::FCP;
60
61 use common::sense;
62
63 use Carp;
64
65 our $VERSION = '0.3';
66
67 use Scalar::Util ();
68
69 use AnyEvent;
70 use AnyEvent::Handle;
71
# Convert a lower_snake_case name into the CamelCase spelling used on the
# wire, e.g. "client_hello" -> "ClientHello".  The words svk, chk, uri, fcp,
# ds and mime are upper-cased as a whole when they form a complete
# underscore-delimited word ("ignore_ds" -> "IgnoreDS").
sub touc($) {
    my $name = shift;

    # Upper-case one whole-word acronym per pass until none is left.
    while ($name =~ s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/) {
        # nothing to do, the substitution is the loop body
    }

    # Drop every separator and capitalise the character that follows it
    # (this also capitalises the very first character).
    $name =~ s/(?:^|_)(.)/\U$1/g;

    return $name;
}
78
# Convert a CamelCase wire name into lower_snake_case, e.g.
# "ClientHello" -> "client_hello".  The acronyms SVK, CHK, URI, FCP, DS and
# MIME (matched case-insensitively) are fenced off with underscores first,
# so "IgnoreDS" becomes "ignore_ds" rather than "ignore_d_s".
sub tolc($) {
    my $name = shift;

    # Insert "_" after an acronym that is directly followed by another character.
    while ($name =~ s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i) {
        # repeat until stable
    }

    # Insert "_" before an acronym that is directly preceded by another character.
    while ($name =~ s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i) {
        # repeat until stable
    }

    # Split the remaining lower-case/upper-case word boundaries.
    $name =~ s/(?<=[a-z])(?=[A-Z])/_/g;

    return lc $name;
}
86
87 =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
88
89 Create a new FCP connection to the given host and port (default
90 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
91
92 If no C<name> was specified, then AnyEvent::FCP will generate a
93 (hopefully) unique client name for you.
94
95 You can install a progress callback that is called with the AnyEvent::FCP
96 object, the type, a hashref with key-value pairs and a reference to any received data,
97 for all unsolicited messages.
98
99 Example:
100
101 sub progress_cb {
102 my ($self, $type, $kv, $rdata) = @_;
103
104 if ($type eq "simple_progress") {
105 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
106 }
107 }
108
109 =cut
110
# Constructor.  Accepts key => value pairs that become the object's fields:
#
#   host, port  - node address (defaults: $ENV{FREDHOST} / 127.0.0.1 and
#                 $ENV{FREDPORT} / 9481)
#   name        - client name sent in ClientHello (default: generated)
#   timeout     - AnyEvent::Handle inactivity timeout in seconds (default 600)
#   progress    - callback for unsolicited messages (default: no-op)
#   on_eof      - passed through to the handle (default: no-op)
#   on_error    - optional; called as $cb->($fcp, $fatal, $message) when the
#                 handle reports an error.  Without it the historical
#                 behaviour is kept: the error is printed with warn and the
#                 process exits with status 1.
#
# Connects to the node and queues the ClientHello message, then returns the
# new object.
sub new {
    my $class = shift;
    my $self  = bless { @_ }, $class;

    $self->{host}     ||= $ENV{FREDHOST} || "127.0.0.1";
    $self->{port}     ||= $ENV{FREDPORT} || 9481;
    $self->{name}     ||= time.rand.rand.rand; # lame
    $self->{timeout}  ||= 600;
    $self->{progress} ||= sub { };

    # seed for the auto-generated request identifiers handed out by send_msg
    $self->{id} = "a0";

    {
        # The handle's callbacks must not keep the object alive, so they
        # close over a weakened copy of $self.
        Scalar::Util::weaken (my $self = $self);

        $self->{hdl} = AnyEvent::Handle->new (
            connect  => [$self->{host} => $self->{port}],
            timeout  => $self->{timeout},
            on_error => sub {
                my (undef, $fatal, $message) = @_;

                # Let the application decide what to do, if it asked to.
                if ($self && $self->{on_error}) {
                    return $self->{on_error}->($self, $fatal, $message);
                }

                # historical default: report and terminate the process
                warn "<@_>\n";
                exit 1;
            },
            on_read  => sub { $self->on_read (@_) },
            on_eof   => $self->{on_eof} || sub { },
        );

        # back-reference from the handle to the connection object, weak as well
        Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
    }

    $self->send_msg (
        client_hello =>
        name             => $self->{name},
        expected_version => "2.0",
    );

    $self
}
147
# Serialise one FCP message and queue it on the connection handle.
#
#   $type  - message name in lower_snake_case (converted with touc)
#   %kv    - header fields in lower_snake_case, plus two special keys:
#              data  => payload; sent after a DataLength field and a "Data"
#                       line instead of the "EndMessage" terminator
#              id_cb => callback registered under the message identifier;
#                       default_recv looks it up in $self->{id} when a reply
#                       carrying that identifier arrives.  If no identifier
#                       was passed, a fresh one is generated.
sub send_msg {
    my ($self, $type, %kv) = @_;

    my $data = delete $kv{data};

    if (exists $kv{id_cb}) {
        # The constructor seeds {id} with the counter string "a0", but {id}
        # is also used as the identifier => callback table.  Dereferencing
        # the string is a symbolic reference to a package hash named after
        # the *current* counter value, so callbacks registered earlier become
        # unreachable as soon as another identifier is generated.  Keep the
        # counter in {id_seq} and make {id} a real hash instead.
        $self->{id_seq} = (defined $self->{id} && !ref $self->{id}) ? $self->{id} : "a0"
            unless defined $self->{id_seq};
        $self->{id} = {}
            unless ref $self->{id};

        # string increment: "a0" -> "a1" -> ... -> "a9" -> "b0"
        my $id = $kv{identifier} || ++$self->{id_seq};
        $self->{id}{$id} = delete $kv{id_cb};
        $kv{identifier} = $id;
    }

    my $msg = touc ($type) . "\012"
            . join "", map +(touc ($_) . "=$kv{$_}\012"), keys %kv;

    if (defined $data) {
        # NOTE(review): length counts characters; this assumes $data is a
        # byte string -- confirm with callers that pass a payload.
        $msg .= "DataLength=" . (length $data) . "\012"
              . "Data\012$data";
    } else {
        $msg .= "EndMessage\012";
    }

    $self->{hdl}->push_write ($msg);
}

# Unimplemented placeholder.  In the original source this named sub was
# declared in the middle of send_msg's body; named subs are package-level
# regardless of where they appear, so it is kept here to leave ->id callable.
sub id {
    my ($self) = @_;


}
177
# Read callback of the connection handle: parses exactly one FCP message and
# dispatches it.
#
# A message is a name line, followed by Key=Value lines, terminated either by
# "EndMessage" or by "Data" plus a payload of DataLength bytes.  Names are
# normalised with tolc; dotted keys become nested structures, numeric
# components index arrays and all others index hashes.
#
# The finished message is offered to the first callback in $self->{queue}
# (which is removed when it returns true); if the queue is empty it goes to
# default_recv.  Callbacks receive ($self, $type, \%kv, $rdata), where $rdata
# is a reference to the payload or undef.
sub on_read {
    my ($self) = @_;

    my $type;
    my %kv;
    my $rdata;
    my $hdr_cb;   # declared up front so that $done_cb can release it

    my $done_cb = sub {
        # $hdr_cb is a closure that captures its own variable, i.e. a
        # reference cycle.  Break it once the message is complete, otherwise
        # every received message leaks its closures and data.
        undef $hdr_cb;

        $kv{pkt_type} = $type;

        if (my $cb = $self->{queue}[0]) {
            $cb->($self, $type, \%kv, $rdata)
                and shift @{ $self->{queue} };
        } else {
            $self->default_recv ($type, \%kv, $rdata);
        }
    };

    $hdr_cb = sub {
        if ($_[1] =~ /^([^=]+)=(.*)$/) {
            my ($k, $v) = ($1, $2);
            my @k = split /\./, tolc $k;

            # $ro always points at the slot the next key component lives in;
            # it starts as a reference to a reference to the top-level hash.
            my $ro = \\%kv;

            while (@k) {
                my $k = shift @k;
                if ($k =~ /^\d+$/) {
                    $ro = \$$ro->[$k];
                } else {
                    $ro = \$$ro->{$k};
                }
            }

            $$ro = $v;

            # more header lines (or the terminator) follow
            $_[0]->push_read (line => $hdr_cb);
        } elsif ($_[1] eq "Data") {
            # payload follows; its size was announced in DataLength
            $_[0]->push_read (chunk => delete $kv{data_length}, sub {
                $rdata = \$_[1];
                $done_cb->();
            });
        } elsif ($_[1] eq "EndMessage") {
            $done_cb->();
        } else {
            undef $hdr_cb; # do not leak on the error path either
            die "protocol error, expected message end, got $_[1]\n";#d#
        }
    };

    # first line of a message is its name
    $self->{hdl}->push_read (line => sub {
        $type = tolc $_[1];
        $_[0]->push_read (line => $hdr_cb);
    });
}
231
# Dispatch a message that no queue callback claimed.
#
#   node_hello                    - remembered in $self->{node_hello}
#   message with a known
#   identifier                    - passed to the callback registered in
#                                   $self->{id}; the registration is dropped
#                                   when the callback returns true
#   anything else                 - passed to the progress callback
#
# Callbacks are invoked with ($self, $type, $kv, $rdata).
sub default_recv {
    my ($self, $type, $kv, $rdata) = @_;

    # the node's greeting is simply kept around
    return $self->{node_hello} = $kv
        if $type eq "node_hello";

    # no transaction is waiting for this identifier: unsolicited message
    return $self->{progress}->(@_)
        unless exists $self->{id}{$kv->{identifier}};

    my $finished = $self->{id}{$kv->{identifier}}->($self, $type, $kv, $rdata);

    return $finished && delete $self->{id}{$kv->{identifier}};
}
244
# Install a transaction under two names in the current package:
#
#   $name       - starts the transaction and returns its condvar
#   ${name}_sync - starts the transaction, waits for the condvar and returns
#                  whatever ->recv returns
#
# $sub is the implementation; it is called with the caller's arguments, with
# a freshly created condvar inserted as the second argument (right after the
# invocant).
sub _txn {
    my ($name, $sub) = @_;

    # asynchronous flavour
    my $async = sub {
        my $cv = AnyEvent->condvar;
        splice @_, 1, 0, $cv;
        $sub->(@_);
        return $cv;
    };

    # blocking flavour
    my $sync = sub {
        my $cv = AnyEvent->condvar;
        splice @_, 1, 0, $cv;
        $sub->(@_);
        return $cv->recv;
    };

    *{$name}          = $async;
    *{"$name\_sync"}  = $sync;
}
260
261 =item $cv = $fcp->list_peers ([$with_metadata[, $with_volatile]])
262
263 =item $peers = $fcp->list_peers_sync ([$with_metadata[, $with_volatile]])
264
265 =cut
266
# Sends ListPeers and collects every reply carrying the request identifier
# until the end_list_peers message arrives; the condvar then receives an
# array reference holding the collected key-value hashes.
_txn list_peers => sub {
    my ($self, $cv, $with_metadata, $with_volatile) = @_;

    my @peers;

    # reply handler: returns true once the transaction is complete
    my $on_reply = sub {
        my (undef, $type, $kv) = @_;

        unless ($type eq "end_list_peers") {
            push @peers, $kv;
            return 0;
        }

        $cv->(\@peers);
        return 1;
    };

    my $metadata = $with_metadata ? "true" : "false";
    my $volatile = $with_volatile ? "true" : "false";

    $self->send_msg (
        list_peers    =>
        with_metadata => $metadata,
        with_volatile => $volatile,
        id_cb         => $on_reply,
    );
};
288
289 =item $cv = $fcp->list_peer_notes ($node_identifier)
290
291 =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
292
293 =cut
294
# Sends ListPeerNotes for the given node; the first reply carrying the
# request identifier completes the transaction and its key-value hash is
# handed to the condvar.
_txn list_peer_notes => sub {
    my ($self, $cv, $node_identifier) = @_;

    # single-reply handler
    my $on_reply = sub {
        my (undef, undef, $kv) = @_;

        $cv->($kv);
        return 1;
    };

    $self->send_msg (
        list_peer_notes =>
        node_identifier => $node_identifier,
        id_cb           => $on_reply,
    );
};
308
309 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310
311 =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
312
313 =cut
314
# Sends WatchGlobal.  No reply is awaited: the condvar is signalled as soon
# as the message has been queued.  The verbosity mask is only sent when one
# was given, and is forced to a number.
_txn watch_global => sub {
    my ($self, $cv, $enabled, $verbosity_mask) = @_;

    my @fields = (enabled => $enabled ? "true" : "false");

    push @fields, verbosity_mask => $verbosity_mask+0
        if defined $verbosity_mask;

    $self->send_msg (watch_global => @fields);

    $cv->();
};
325
326 =item $cv = $fcp->list_persistent_requests
327
328 =item $reqs = $fcp->list_persistent_requests_sync
329
330 =cut
331
# Sends ListPersistentRequests and gathers the answer into a hash keyed by
# request identifier, which is handed to the condvar when the
# end_list_persistent_requests message arrives.
#
# The replies are taken from the head of $self->{queue} rather than being
# matched by identifier, so this handler sees every message until the end
# marker.  Per identifier the result holds:
#
#   - the fields of the persistent_get/put/put_dir message, plus
#     type => "get" | "put" | "put_dir"
#   - simple_progress => [ all simple_progress messages, in arrival order ]
#   - one entry per other message type, keyed by that type
_txn list_persistent_requests => sub {
    my ($self, $cv) = @_;

    my %res;

    $self->send_msg ("list_persistent_requests");

    push @{ $self->{queue} }, sub {
        my ($self, $type, $kv, $rdata) = @_;

        if ($type eq "end_list_persistent_requests") {
            $cv->(\%res);
            1
        } else {
            my $id = $kv->{identifier};

            if ($type =~ /^persistent_(get|put|put_dir)$/) {
                $res{$id} = {
                    type => $1,
                    # Nothing may have been stored for this identifier yet;
                    # do not rely on dereferencing undef being tolerated.
                    %{ $res{$id} || {} },
                    %$kv,
                };
            } elsif ($type eq "simple_progress") {
                delete $kv->{pkt_type}; # save memory
                push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
            } else {
                $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
            }
            0
        }
    };
};
364
365 =item $cv = $fcp->remove_request ($global, $identifier)
366
367 =item $status = $fcp->remove_request_sync ($global, $identifier)
368
369 =cut
370
# Sends RemoveRequest for the given identifier; the first reply carrying
# that identifier completes the transaction and its key-value hash is handed
# to the condvar.
_txn remove_request => sub {
    my ($self, $cv, $global, $identifier) = @_;

    # single-reply handler
    my $on_reply = sub {
        my (undef, undef, $kv) = @_;

        $cv->($kv);
        return 1;
    };

    $self->send_msg (
        remove_request =>
        global         => ($global ? "true" : "false"),
        identifier     => $identifier,
        id_cb          => $on_reply,
    );
};
385
386 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387
388 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389
390 =cut
391
# Sends ModifyPersistentRequest.  The client token and the priority class
# are only included when defined.  The first reply carrying the request
# identifier completes the transaction and its key-value hash is handed to
# the condvar.
_txn modify_persistent_request => sub {
    my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;

    my @fields = (global => $global ? "true" : "false");

    push @fields, client_token => $client_token
        if defined $client_token;
    push @fields, priority_class => $priority_class
        if defined $priority_class;

    # single-reply handler
    my $on_reply = sub {
        my (undef, undef, $kv) = @_;

        $cv->($kv);
        return 1;
    };

    $self->send_msg (
        modify_persistent_request =>
        @fields,
        identifier => $identifier,
        id_cb      => $on_reply,
    );
};
408
409 =item $cv = $fcp->get_plugin_info ($name, $detailed)
410
411 =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
412
413 =cut
414
# Sends GetPluginInfo for the named plugin; the first reply carrying the
# request identifier completes the transaction and its key-value hash is
# handed to the condvar.
_txn get_plugin_info => sub {
    my ($self, $cv, $name, $detailed) = @_;

    # single-reply handler
    my $on_reply = sub {
        my (undef, undef, $kv) = @_;

        $cv->($kv);
        return 1;
    };

    $self->send_msg (
        get_plugin_info =>
        plugin_name     => $name,
        detailed        => ($detailed ? "true" : "false"),
        id_cb           => $on_reply,
    );
};
429
430 =item $cv = $fcp->client_get ($uri, $identifier, %kv)
431
432 =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
433
434 %kv can contain the following keys (see L<http://wiki.freenetproject.org/FCP2p0ClientGet>):
435
436 ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437 priority_class, persistence, client_token, global, return_type,
438 binary_blob, allowed_mime_types, filename, temp_filename
439
440 =cut
441
# Sends ClientGet for the given URI under the given identifier.  Additional
# header fields may be passed in %kv; the explicit uri and identifier
# arguments take precedence over same-named entries in %kv.  The first reply
# carrying the identifier completes the transaction and its key-value hash
# is handed to the condvar.
_txn client_get => sub {
    my ($self, $cv, $uri, $identifier, %kv) = @_;

    # single-reply handler
    my $on_reply = sub {
        my (undef, undef, $reply) = @_;

        $cv->($reply);
        return 1;
    };

    # caller-supplied fields go first so the fixed ones below override them
    $self->send_msg (
        client_get =>
        %kv,
        uri        => $uri,
        identifier => $identifier,
        id_cb      => $on_reply,
    );
};
457
458 =back
459
460 =head1 EXAMPLE PROGRAM
461
462 use AnyEvent::FCP;
463
464 my $fcp = new AnyEvent::FCP;
465
466 # let us look at the global request list
467 $fcp->watch_global (1, 0);
468
469 # list them, synchronously
470 my $req = $fcp->list_persistent_requests_sync;
471
472 # go through all requests
473 for my $req (values %$req) {
474 # skip jobs not directly-to-disk
475 next unless $req->{return_type} eq "disk";
476 # skip jobs not issued by FProxy
477 next unless $req->{identifier} =~ /^FProxy:/;
478
479 if ($req->{data_found}) {
480 # file has been successfully downloaded
481
482 ... move the file away
483 (left as exercise)
484
485 # remove the request
486
487 $fcp->remove_request (1, $req->{identifier});
488 } elsif ($req->{get_failed}) {
489 # request has failed
490 if ($req->{get_failed}{code} == 11) {
491 # too many path components, should restart
492 } else {
493 # other failure
494 }
495 } else {
496 # modify priorities randomly, to improve download rates
497 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
498 if 0.1 > rand;
499 }
500 }
501
502 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
503 $fcp->get_plugin_info_sync ("dummy");
504
505 =head1 SEE ALSO
506
507 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
508
509 =head1 BUGS
510
511 =head1 AUTHOR
512
513 Marc Lehmann <schmorp@schmorp.de>
514 http://home.schmorp.de/
515
516 =cut
517
518 1
519