ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.10
Committed: Tue Aug 4 00:50:25 2015 UTC (8 years, 9 months ago) by root
Branch: MAIN
Changes since 1.9: +32 -26 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::FCP - freenet client protocol 2.0
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::FCP;
8
9 my $fcp = new AnyEvent::FCP;
10
11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
17
18 =head1 DESCRIPTION
19
20 This module implements the freenet client protocol version 2.0, as used by
21 freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22
23 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24 description of what the messages do.
25
26 The module uses L<AnyEvent> to find a suitable event module.
27
28 Only very little is implemented, ask if you need more, and look at the
29 example program later in this section.
30
31 =head2 EXAMPLE
32
33 This example fetches the download list and sets the priority of all files
34 with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global_sync (1, 0);
41 my $req = $fcp->list_persistent_requests_sync;
42
43 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46 }
47 }
48
49 =head2 IMPORT TAGS
50
51 Nothing much can be "imported" from this module right now.
52
53 =head2 THE AnyEvent::FCP CLASS
54
55 =over 4
56
57 =cut
58
59 package AnyEvent::FCP;
60
61 use common::sense;
62
63 use Carp;
64
65 our $VERSION = '0.3';
66
67 use Scalar::Util ();
68
69 use AnyEvent;
70 use AnyEvent::Handle;
71 use AnyEvent::Util ();
72
# Convert an internal lower_snake_case name (e.g. "test_dda_request") into
# the CamelCase spelling used on the wire ("TestDDARequest").
sub touc($) {
    my $name = shift;

    # known acronyms are upper-cased as a whole, together with the
    # underscores delimiting them; repeat until no lower-case one is left
    1 while $name =~ s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;

    # every word start is capitalised and its leading underscore dropped
    $name =~ s/(?:^|_)(.)/\U$1/g;

    $name
}
79
# Convert a CamelCase wire name (e.g. "TestDDAReply") into the internal
# lower_snake_case spelling ("test_dda_reply").
sub tolc($) {
    my $name = shift;

    # acronyms that count as one word; matched case-insensitively
    my $acronym = qr/SVK|CHK|URI|FCP|DS|MIME|DDA/i;

    # separate each acronym from whatever follows and precedes it
    1 while $name =~ s/($acronym)([^_])/${1}_$2/;
    1 while $name =~ s/([^_])($acronym)/${1}_$2/;

    # remaining word boundaries are lower-case => upper-case transitions
    $name =~ s/(?<=[a-z])(?=[A-Z])/_/g;

    lc $name
}
87
88 =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
89
90 Create a new FCP connection to the given host and port (default
91 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92
93 If no C<name> was specified, then AnyEvent::FCP will generate a
94 (hopefully) unique client name for you.
95
96 You can install a progress callback that is being called with the AnyEvent::FCP
97 object, the type, a hashref with key-value pairs and a reference to any received data,
98 for all unsolicited messages.
99
100 Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110 =cut
111
# Constructor: takes key => value pairs (host, port, name, timeout,
# progress, on_eof), connects to the node and sends the ClientHello.
sub new {
    my ($class, @args) = @_;
    my $self = bless { @args }, $class;

    # defaults for everything the caller left unset (or false)
    $self->{host}     ||= $ENV{FREDHOST} || "127.0.0.1";
    $self->{port}     ||= $ENV{FREDPORT} || 9481;
    $self->{name}     ||= time.rand.rand.rand; # lame
    $self->{timeout}  ||= 3600*2;
    $self->{progress} ||= sub { };

    $self->{id} = "a0";

    {
        # the handle callbacks only see a weak reference, so the handle
        # does not keep the object alive
        Scalar::Util::weaken (my $weak = $self);

        $weak->{hdl} = AnyEvent::Handle->new (
            connect  => [$weak->{host} => $weak->{port}],
            timeout  => $weak->{timeout},
            on_error => sub {
                warn "@_\n";#d#
                exit 1;
            },
            on_read  => sub { $weak->on_read (@_) },
            on_eof   => $weak->{on_eof} || sub { },
        );

        # back-reference from the handle to us, weak as well
        Scalar::Util::weaken ($weak->{hdl}{fcp} = $weak);
    }

    # the protocol requires a ClientHello as the very first message
    $self->send_msg (
        client_hello =>
        name             => $self->{name},
        expected_version => "2.0",
    );

    $self
}
148
# Serialise one FCP message and queue it for writing.
#
#   $self->send_msg ($type, key => value, ...)
#
# $type and all keys are given in lower_snake_case and converted with touc.
# Two keys are special and never sent as ordinary fields:
#
#   data  - payload; sent after a DataLength field and a "Data" line
#           instead of the usual "EndMessage" terminator
#   id_cb - callback to register for replies carrying this message's
#           identifier; an identifier is generated when none was given
#
# Returns whatever the handle's push_write returns.
sub send_msg {
    my ($self, $type, %kv) = @_;

    my $data = delete $kv{data};

    if (exists $kv{id_cb}) {
        # new () stores the *string* the identifier counter starts at in
        # {id}, while default_recv looks callbacks up in {id}{$identifier}.
        # Sharing the slot made the callback table a symbolic reference
        # named after the current counter value, so callbacks got lost as
        # soon as the counter advanced. Move the counter to its own slot
        # and make {id} a real hash.
        unless (ref $self->{id}) {
            $self->{id_seq} = defined $self->{id} && length $self->{id}
                ? $self->{id}
                : "a0";
            $self->{id} = {};
        }

        my $id = $kv{identifier} ||= ++$self->{id_seq};
        $self->{id}{$id} = delete $kv{id_cb};
    }

    my $msg = touc ($type) . "\012"
            . join "", map { touc ($_) . "=$kv{$_}\012" } keys %kv;

    if (defined $data) {
        $msg .= "DataLength=" . (length $data) . "\012"
              . "Data\012$data";
    } else {
        $msg .= "EndMessage\012";
    }

    $self->{hdl}->push_write ($msg);
}

# Empty placeholder; it used to be declared in the middle of send_msg and
# is kept so that the method still exists.
sub id {
    my ($self) = @_;


}
177
# Register a callback that gets to see every incoming message before the
# regular dispatch. It is called as $cb->($self, $type, $kv, $rdata) and
# its return value decides what happens next:
#
#   undef - message eaten, callback is removed
#   0     - message eaten, callback stays registered
#   1     - message is passed on to the next handler
sub on {
    my $self = shift;
    my ($cb) = @_;

    push @{ $self->{on} }, $cb;
}
187
# Called when the job at the head of a serialise queue has finished (its
# guard went away): drop it and start the next one, if any, handing it a
# fresh guard that will call us again.
sub _push_queue {
    my ($self, $queue) = @_;

    shift @$queue;

    if (@$queue) {
        my $next = $queue->[0];
        $next->($self, AnyEvent::Util::guard (sub { $self->_push_queue ($queue) }));
    }
}
195
# lock so only one $type (arbitrary string) is in flight,
# to work around horribly misdesigned protocol.
#
# $cb is called as $cb->($self, $guard) as soon as it reaches the head of
# the queue for $type; the next queued job starts when $guard is destroyed.
sub serialise {
    my ($self, $type, $cb) = @_;

    my $pending = ($self->{serialise}{$type} ||= []);
    push @$pending, $cb;

    # only start right away when nothing else of this type is in flight
    unless ($#$pending) {
        $cb->($self, AnyEvent::Util::guard (sub { $self->_push_queue ($pending) }));
    }
}
206
# Read handler of the connection: parses exactly one FCP message (name
# line, Key=Value header lines, then either "EndMessage" or "Data" plus a
# payload) and dispatches it.
#
# Dispatch order: the callbacks registered with ->on, then the head of
# $self->{queue}, and finally ->default_recv. All of them receive
# ($self, $type, \%kv, $rdata), where $type and the keys are in
# lower_snake_case, dotted keys become nested hashes/arrays and $rdata is
# a reference to the payload (undef without one).
sub on_read {
    my ($self) = @_;

    my $type;
    my %kv;
    my $rdata;

    my $done_cb = sub {
        $kv{pkt_type} = $type;

        my $on = $self->{on};
        for (0 .. $#$on) {
            # false: message eaten; undef additionally unregisters the callback
            unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) {
                splice @$on, $_, 1 unless defined $res;
                return;
            }
        }

        if (my $cb = $self->{queue}[0]) {
            # a true return value means this queue entry is done
            $cb->($self, $type, \%kv, $rdata)
                and shift @{ $self->{queue} };
        } else {
            $self->default_recv ($type, \%kv, $rdata);
        }
    };

    my $hdr_cb; $hdr_cb = sub {
        if ($_[1] =~ /^([^=]+)=(.*)$/) {
            my ($k, $v) = ($1, $2);
            my @k = split /\./, tolc ($k);
            my $ro = \\%kv;

            # walk (and autovivify) the path given by the dotted key:
            # numeric components index arrays, all others hashes
            while (@k) {
                my $k = shift @k;
                if ($k =~ /^\d+$/) {
                    $ro = \$$ro->[$k];
                } else {
                    $ro = \$$ro->{$k};
                }
            }

            $$ro = $v;

            $_[0]->push_read (line => $hdr_cb);
        } else {
            # The header section is over. $hdr_cb refers to itself (and,
            # via $done_cb, to $self), so drop that reference now - it
            # would otherwise leak one closure cycle per message and keep
            # the object alive forever.
            undef $hdr_cb;

            if ($_[1] eq "Data") {
                $_[0]->push_read (chunk => delete $kv{data_length}, sub {
                    $rdata = \$_[1];
                    $done_cb->();
                });
            } elsif ($_[1] eq "EndMessage") {
                $done_cb->();
            } else {
                die "protocol error, expected message end, got $_[1]\n";#d#
            }
        }
    };

    # the first line is the message name, header lines follow
    $self->{hdl}->push_read (line => sub {
        $type = tolc ($_[1]);
        $_[0]->push_read (line => $hdr_cb);
    });
}
268
# Fallback dispatch for messages that neither an ->on callback nor a queue
# entry has consumed.
sub default_recv {
    my ($self, $type, $kv, $rdata) = @_;

    # the node's greeting is simply remembered
    return $self->{node_hello} = $kv
        if $type eq "node_hello";

    # replies to one of our requests go to the callback registered for
    # their identifier; a true return value unregisters that callback
    if (exists $self->{id}{$kv->{identifier}}) {
        return $self->{id}{$kv->{identifier}}->($self, $type, $kv, $rdata)
            && delete $self->{id}{$kv->{identifier}};
    }

    # everything else is unsolicited and handed to the progress callback
    $self->{progress}->(@_)
}
281
# Install the two public methods for a transaction:
#
#   $name        - starts the transaction and returns its condvar
#   ${name}_sync - starts the transaction and returns the result of ->recv
#
# $sub implements the transaction and is called with the object, a fresh
# condvar and the caller's remaining arguments.
sub _txn {
    my ($name, $sub) = @_;

    no strict 'refs'; # the methods are installed via symbolic glob names

    *{$name} = sub {
        my ($self, @args) = @_;
        my $cv = AnyEvent->condvar;
        $sub->($self, $cv, @args);
        $cv
    };

    *{"$name\_sync"} = sub {
        my ($self, @args) = @_;
        my $cv = AnyEvent->condvar;
        $sub->($self, $cv, @args);
        $cv->recv
    };
}
297
298 =item $cv = $fcp->list_peers ([$with_metadata[, $with_volatile]])
299
300 =item $peers = $fcp->list_peers_sync ([$with_metadata[, $with_volatile]])
301
302 =cut
303
# ListPeers: collects every reply into an array and delivers a reference
# to it once the end_list_peers terminator arrives.
_txn (list_peers => sub {
    my ($self, $cv, $with_metadata, $with_volatile) = @_;

    my @peers;

    my $collect = sub {
        my (undef, $type, $kv) = @_;

        # every reply before the terminator is stored as one entry
        unless ($type eq "end_list_peers") {
            push @peers, $kv;
            return 0;
        }

        $cv->(\@peers);
        1
    };

    $self->send_msg (
        "list_peers",
        with_metadata => ($with_metadata ? "true" : "false"),
        with_volatile => ($with_volatile ? "true" : "false"),
        id_cb         => $collect,
    );
});
325
326 =item $cv = $fcp->list_peer_notes ($node_identifier)
327
328 =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
329
330 =cut
331
# ListPeerNotes: the first reply for the request is the result.
_txn (list_peer_notes => sub {
    my ($self, $cv, $node_identifier) = @_;

    my $deliver = sub {
        my (undef, undef, $kv) = @_;

        $cv->($kv);
        1
    };

    $self->send_msg (
        "list_peer_notes",
        node_identifier => $node_identifier,
        id_cb           => $deliver,
    );
});
345
346 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347
348 =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
349
350 =cut
351
# WatchGlobal: no reply is waited for, so the condvar is signalled as
# soon as the message has been queued.
_txn (watch_global => sub {
    my ($self, $cv, $enabled, $verbosity_mask) = @_;

    my @fields = (enabled => ($enabled ? "true" : "false"));

    # the mask is optional and always sent as a number
    push @fields, verbosity_mask => $verbosity_mask + 0
        if defined $verbosity_mask;

    $self->send_msg (watch_global => @fields);

    $cv->();
});
362
363 =item $cv = $fcp->list_persistent_requests
364
365 =item $reqs = $fcp->list_persistent_requests_sync
366
367 =cut
368
# ListPersistentRequests: the replies carry no marker tying them to the
# listing, so the transaction is serialised and simply swallows every
# message up to end_list_persistent_requests.
#
# The result is a hash reference indexed by request identifier. Each entry
# merges the fields of the persistent_get/put/put_dir message (plus a
# "type" key of get, put or put_dir), collects simple_progress messages in
# an array under "simple_progress" and stores every other message under
# its message type.
_txn (list_persistent_requests => sub {
    my ($self, $cv) = @_;

    $self->serialise (list_persistent_requests => sub {
        my ($self, $guard) = @_;

        my %res;

        $self->send_msg ("list_persistent_requests");

        $self->on (sub {
            my ($self, $type, $kv, $rdata) = @_;

            # never executed: only mentions $guard so that this closure
            # holds on to it until the callback is unregistered
            $guard if 0;

            if ($type eq "end_list_persistent_requests") {
                $cv->(\%res);
                return; # eaten, and we are done: unregister
            } else {
                my $id = $kv->{identifier};

                if ($type =~ /^persistent_(get|put|put_dir)$/) {
                    $res{$id} = {
                        type => $1,
                        # there may be no entry for this request yet;
                        # dereferencing undef only worked via symbolic refs
                        %{ $res{$id} || {} },
                        %$kv,
                    };
                } elsif ($type eq "simple_progress") {
                    delete $kv->{pkt_type}; # save memory
                    push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
                } else {
                    $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
                }
            }

            1
        });
    });
});
408
409 =item $cv = $fcp->remove_request ($global, $identifier)
410
411 =item $status = $fcp->remove_request_sync ($global, $identifier)
412
413 =cut
414
# RemoveRequest: the first reply for the identifier is the result.
_txn (remove_request => sub {
    my ($self, $cv, $global, $identifier) = @_;

    my %msg = (
        global     => ($global ? "true" : "false"),
        identifier => $identifier,
        id_cb      => sub {
            my (undef, undef, $kv) = @_;

            $cv->($kv);
            1
        },
    );

    $self->send_msg (remove_request => %msg);
});
429
430 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431
432 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433
434 =cut
435
# ModifyPersistentRequest: client token and priority class are only sent
# when defined; the first reply for the identifier is the result.
_txn (modify_persistent_request => sub {
    my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;

    my @fields = (global => ($global ? "true" : "false"));

    push @fields, client_token   => $client_token   if defined $client_token;
    push @fields, priority_class => $priority_class if defined $priority_class;

    $self->send_msg (
        "modify_persistent_request",
        @fields,
        identifier => $identifier,
        id_cb      => sub {
            my (undef, undef, $kv) = @_;

            $cv->($kv);
            1
        },
    );
});
452
453 =item $cv = $fcp->get_plugin_info ($name, $detailed)
454
455 =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
456
457 =cut
458
# GetPluginInfo: the first reply for the request is the result.
_txn (get_plugin_info => sub {
    my ($self, $cv, $name, $detailed) = @_;

    my $want_details = $detailed ? "true" : "false";

    $self->send_msg (
        "get_plugin_info",
        plugin_name => $name,
        detailed    => $want_details,
        id_cb       => sub {
            my (undef, undef, $kv) = @_;

            $cv->($kv);
            1
        },
    );
});
473
474 =item $cv = $fcp->client_get ($uri, $identifier, %kv)
475
476 =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
477
478 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479
480 ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481 priority_class, persistence, client_token, global, return_type,
482 binary_blob, allowed_mime_types, filename, temp_filename
483
484 =cut
485
# ClientGet: extra fields from %kv are passed through, but uri, identifier
# and id_cb always win over entries of the same name. The first reply for
# the identifier is the result.
_txn (client_get => sub {
    my ($self, $cv, $uri, $identifier, %kv) = @_;

    my $deliver = sub {
        my (undef, undef, $kv) = @_;

        $cv->($kv);
        1
    };

    $self->send_msg (
        "client_get",
        %kv,
        uri        => $uri,
        identifier => $identifier,
        id_cb      => $deliver,
    );
});
501
502 =item $cv = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write)
503
504 =item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)
505
506 The DDA test in FCP is probably the single most broken protocol - only
507 one directory test can be outstanding at any time, and some guessing and
508 heuristics are involved in mangling the paths.
509
510 This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511 request, handling file reading and writing as well.
512
513 =cut
514
# TestDDARequest/TestDDAResponse in one transaction.
#
# $local is the directory as seen by this process, $remote the same
# directory as seen by the node. The transaction is serialised, so only
# one DDA test is in flight at a time. The condvar receives
# ($can_read, $can_write), or croaks with the node's description when a
# protocol_error for $remote arrives.
_txn (test_dda => sub {
    my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;

    $self->serialise (test_dda => sub {
        my ($self, $guard) = @_;

        $self->send_msg (test_dda_request =>
            directory            => $remote,
            want_read_directory  => $want_read  ? "true" : "false",
            want_write_directory => $want_write ? "true" : "false",
        );

        # stage 1: wait for the node to tell us which files to read/write
        $self->on (sub {
            my ($self, $type, $kv) = @_;

            if ($type eq "test_dda_reply") {
                # the filenames are all relative to the server-side directory,
                # which might or might not match $remote anymore, so we
                # need to rewrite the paths to be relative to $local
                for my $k (qw(read_filename write_filename)) {
                    my $f = $kv->{$k};
                    for my $dir ($kv->{directory}, $remote) {
                        if ($dir eq substr $f, 0, length $dir) {
                            # strip the directory and the separator after it
                            substr $f, 0, 1 + length $dir, "";
                            $kv->{$k} = $f;
                            last;
                        }
                    }
                }

                my %response = (directory => $remote);

                # an unreadable/unwritable file is not an error here: the
                # node simply sees the missing content and denies access
                if (length $kv->{read_filename}) {
                    if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
                        sysread $fh, my $buf, -s $fh;
                        $response{read_content} = $buf;
                    }
                }

                if (length $kv->{write_filename}) {
                    if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
                        syswrite $fh, $kv->{content_to_write};
                    }
                }

                $self->send_msg (test_dda_response => %response);

                # stage 2: wait for the verdict
                $self->on (sub {
                    my ($self, $type, $kv) = @_;

                    # never executed: only mentions $guard so that this
                    # closure holds on to it until it is unregistered
                    $guard if 0; # reference

                    if ($type eq "test_dda_complete") {
                        $cv->(
                            $kv->{read_directory_allowed}  eq "true",
                            $kv->{write_directory_allowed} eq "true",
                        );
                        # eaten, and we are done: unregister. Without this
                        # the callback (and with it the guard) stayed
                        # around forever, so no later test_dda could start.
                        return;
                    } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
                        $cv->croak ($kv->{extra_description});
                        return;
                    }

                    1
                });

                # stage 1 is over: unregister this callback
                return;
            } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
                $cv->croak ($kv->{extra_description});
                return;
            }

            1
        });
    });
});
590
591 =back
592
593 =head1 EXAMPLE PROGRAM
594
595 use AnyEvent::FCP;
596
597 my $fcp = new AnyEvent::FCP;
598
599 # let us look at the global request list
600 $fcp->watch_global (1, 0);
601
602 # list them, synchronously
603 my $req = $fcp->list_persistent_requests_sync;
604
605 # go through all requests
606 for my $req (values %$req) {
607 # skip jobs not directly-to-disk
608 next unless $req->{return_type} eq "disk";
609 # skip jobs not issued by FProxy
610 next unless $req->{identifier} =~ /^FProxy:/;
611
612 if ($req->{data_found}) {
613 # file has been successfully downloaded
614
615 ... move the file away
616 (left as exercise)
617
618 # remove the request
619
620 $fcp->remove_request (1, $req->{identifier});
621 } elsif ($req->{get_failed}) {
622 # request has failed
623 if ($req->{get_failed}{code} == 11) {
624 # too many path components, should restart
625 } else {
626 # other failure
627 }
628 } else {
629 # modify priorities randomly, to improve download rates
630 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
631 if 0.1 > rand;
632 }
633 }
634
635 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
636 $fcp->get_plugin_info_sync ("dummy");
637
638 =head1 SEE ALSO
639
640 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
641
642 =head1 BUGS
643
644 =head1 AUTHOR
645
646 Marc Lehmann <schmorp@schmorp.de>
647 http://home.schmorp.de/
648
649 =cut
650
651 1
652