ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.9
Committed: Tue Aug 4 00:35:16 2015 UTC (8 years, 9 months ago) by root
Branch: MAIN
Changes since 1.8: +132 -5 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::FCP - freenet client protocol 2.0
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::FCP;
8
9 my $fcp = new AnyEvent::FCP;
10
11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
17
18 =head1 DESCRIPTION
19
20 This module implements the freenet client protocol version 2.0, as used by
21 freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22
23 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24 description of what the messages do.
25
26 The module uses L<AnyEvent> to find a suitable event module.
27
28 Only very little is implemented, ask if you need more, and look at the
29 example program later in this section.
30
31 =head2 EXAMPLE
32
33 This example fetches the download list and sets the priority of all files
34 with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global_sync (1, 0);
41 my $req = $fcp->list_persistent_requests_sync;
42
43 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46 }
47 }
48
49 =head2 IMPORT TAGS
50
51 Nothing much can be "imported" from this module right now.
52
53 =head2 THE AnyEvent::FCP CLASS
54
55 =over 4
56
57 =cut
58
59 package AnyEvent::FCP;
60
61 use common::sense;
62
63 use Carp;
64
65 our $VERSION = '0.3';
66
67 use Scalar::Util ();
68
69 use AnyEvent;
70 use AnyEvent::Handle;
71 use AnyEvent::Util ();
72
73 sub touc($) {
74 local $_ = shift;
75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 s/(?:^|_)(.)/\U$1/g;
77 $_
78 }
79
80 sub tolc($) {
81 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i;
84 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc
86 }
87
88 =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
89
90 Create a new FCP connection to the given host and port (default
91 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92
93 If no C<name> was specified, then AnyEvent::FCP will generate a
94 (hopefully) unique client name for you.
95
96 You can install a progress callback that is being called with the AnyEvent::FCP
97 object, the type, a hashref with key-value pairs and a reference to any received data,
98 for all unsolicited messages.
99
100 Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110 =cut
111
112 sub new {
113 my $class = shift;
114 my $self = bless { @_ }, $class;
115
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
117 $self->{port} ||= $ENV{FREDPORT} || 9481;
118 $self->{name} ||= time.rand.rand.rand; # lame
119 $self->{timeout} ||= 3600*2;
120 $self->{progress} ||= sub { };
121
122 $self->{id} = "a0";
123
124 {
125 Scalar::Util::weaken (my $self = $self);
126
127 $self->{hdl} = new AnyEvent::Handle
128 connect => [$self->{host} => $self->{port}],
129 timeout => $self->{timeout},
130 on_error => sub {
131 warn "@_\n";#d#
132 exit 1;
133 },
134 on_read => sub { $self->on_read (@_) },
135 on_eof => $self->{on_eof} || sub { };
136
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 }
139
140 $self->send_msg (
141 client_hello =>
142 name => $self->{name},
143 expected_version => "2.0",
144 );
145
146 $self
147 }
148
149 sub send_msg {
150 my ($self, $type, %kv) = @_;
151
152 my $data = delete $kv{data};
153
154 if (exists $kv{id_cb}) {
155 my $id = $kv{identifier} ||= ++$self->{id};
156 $self->{id}{$id} = delete $kv{id_cb};
157 }
158
159 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161
162 sub id {
163 my ($self) = @_;
164
165
166 }
167
168 if (defined $data) {
169 $msg .= "DataLength=" . (length $data) . "\012"
170 . "Data\012$data";
171 } else {
172 $msg .= "EndMessage\012";
173 }
174
175 $self->{hdl}->push_write ($msg);
176 }
177
178 sub on {
179 my ($self, $cb) = @_;
180
181 # cb return undef - message eaten, remove cb
182 # cb return 0 - message eaten
183 # cb return 1 - pass to next
184
185 push @{ $self->{on} }, $cb;
186 }
187
188 sub _push_queue {
189 my ($self, $queue) = @_;
190
191 warn "oush @$queue\n";#d#
192 shift @$queue;
193 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
194 if @$queue;
195 }
196
197 # lock so only one $type (arbitrary string) is in flight,
198 # to work around horribly misdesigned protocol.
199 sub serialise {
200 my ($self, $type, $cb) = @_;
201
202 my $queue = $self->{serialise}{$type} ||= [];
203 push @$queue, $cb;
204 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
205 unless $#$queue;
206 }
207
208 sub on_read {
209 my ($self) = @_;
210
211 my $type;
212 my %kv;
213 my $rdata;
214
215 my $done_cb = sub {
216 $kv{pkt_type} = $type;
217
218 my $on = $self->{on};
219 for (0 .. $#$on) {
220 unless (my $res = $on->[$_]($type, \%kv, $rdata)) {
221 splice @$on, $_, 1 unless defined $res;
222 return;
223 }
224 }
225
226 if (my $cb = $self->{queue}[0]) {
227 $cb->($self, $type, \%kv, $rdata)
228 and shift @{ $self->{queue} };
229 } else {
230 $self->default_recv ($type, \%kv, $rdata);
231 }
232 };
233
234 my $hdr_cb; $hdr_cb = sub {
235 if ($_[1] =~ /^([^=]+)=(.*)$/) {
236 my ($k, $v) = ($1, $2);
237 my @k = split /\./, tolc $k;
238 my $ro = \\%kv;
239
240 while (@k) {
241 my $k = shift @k;
242 if ($k =~ /^\d+$/) {
243 $ro = \$$ro->[$k];
244 } else {
245 $ro = \$$ro->{$k};
246 }
247 }
248
249 $$ro = $v;
250
251 $_[0]->push_read (line => $hdr_cb);
252 } elsif ($_[1] eq "Data") {
253 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
254 $rdata = \$_[1];
255 $done_cb->();
256 });
257 } elsif ($_[1] eq "EndMessage") {
258 $done_cb->();
259 } else {
260 die "protocol error, expected message end, got $_[1]\n";#d#
261 }
262 };
263
264 $self->{hdl}->push_read (line => sub {
265 $type = tolc $_[1];
266 $_[0]->push_read (line => $hdr_cb);
267 });
268 }
269
270 sub default_recv {
271 my ($self, $type, $kv, $rdata) = @_;
272
273 if ($type eq "node_hello") {
274 $self->{node_hello} = $kv;
275 } elsif (exists $self->{id}{$kv->{identifier}}) {
276 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
277 and delete $self->{id}{$kv->{identifier}};
278 } else {
279 &{ $self->{progress} };
280 }
281 }
282
283 sub _txn {
284 my ($name, $sub) = @_;
285
286 *{$name} = sub {
287 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
288 &$sub;
289 $cv
290 };
291
292 *{"$name\_sync"} = sub {
293 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
294 &$sub;
295 $cv->recv
296 };
297 }
298
299 =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
300
301 =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
302
303 =cut
304
305 _txn list_peers => sub {
306 my ($self, $cv, $with_metadata, $with_volatile) = @_;
307
308 my @res;
309
310 $self->send_msg (list_peers =>
311 with_metadata => $with_metadata ? "true" : "false",
312 with_volatile => $with_volatile ? "true" : "false",
313 id_cb => sub {
314 my ($self, $type, $kv, $rdata) = @_;
315
316 if ($type eq "end_list_peers") {
317 $cv->(\@res);
318 1
319 } else {
320 push @res, $kv;
321 0
322 }
323 },
324 );
325 };
326
327 =item $cv = $fcp->list_peer_notes ($node_identifier)
328
329 =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
330
331 =cut
332
333 _txn list_peer_notes => sub {
334 my ($self, $cv, $node_identifier) = @_;
335
336 $self->send_msg (list_peer_notes =>
337 node_identifier => $node_identifier,
338 id_cb => sub {
339 my ($self, $type, $kv, $rdata) = @_;
340
341 $cv->($kv);
342 1
343 },
344 );
345 };
346
347 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
348
349 =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
350
351 =cut
352
353 _txn watch_global => sub {
354 my ($self, $cv, $enabled, $verbosity_mask) = @_;
355
356 $self->send_msg (watch_global =>
357 enabled => $enabled ? "true" : "false",
358 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
359 );
360
361 $cv->();
362 };
363
364 =item $cv = $fcp->list_persistent_requests
365
366 =item $reqs = $fcp->list_persistent_requests_sync
367
368 =cut
369
370 _txn list_persistent_requests => sub {
371 my ($self, $cv) = @_;
372
373 my %res;
374
375 $self->send_msg ("list_persistent_requests");
376
377 push @{ $self->{queue} }, sub {
378 my ($self, $type, $kv, $rdata) = @_;
379
380 if ($type eq "end_list_persistent_requests") {
381 $cv->(\%res);
382 1
383 } else {
384 my $id = $kv->{identifier};
385
386 if ($type =~ /^persistent_(get|put|put_dir)$/) {
387 $res{$id} = {
388 type => $1,
389 %{ $res{$id} },
390 %$kv,
391 };
392 } elsif ($type eq "simple_progress") {
393 delete $kv->{pkt_type}; # save memory
394 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
395 } else {
396 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
397 }
398 0
399 }
400 };
401 };
402
403 =item $cv = $fcp->remove_request ($global, $identifier)
404
405 =item $status = $fcp->remove_request_sync ($global, $identifier)
406
407 =cut
408
409 _txn remove_request => sub {
410 my ($self, $cv, $global, $identifier) = @_;
411
412 $self->send_msg (remove_request =>
413 global => $global ? "true" : "false",
414 identifier => $identifier,
415 id_cb => sub {
416 my ($self, $type, $kv, $rdata) = @_;
417
418 $cv->($kv);
419 1
420 },
421 );
422 };
423
424 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
425
426 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
427
428 =cut
429
430 _txn modify_persistent_request => sub {
431 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
432
433 $self->send_msg (modify_persistent_request =>
434 global => $global ? "true" : "false",
435 defined $client_token ? (client_token => $client_token ) : (),
436 defined $priority_class ? (priority_class => $priority_class) : (),
437 identifier => $identifier,
438 id_cb => sub {
439 my ($self, $type, $kv, $rdata) = @_;
440
441 $cv->($kv);
442 1
443 },
444 );
445 };
446
447 =item $cv = $fcp->get_plugin_info ($name, $detailed)
448
449 =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
450
451 =cut
452
453 _txn get_plugin_info => sub {
454 my ($self, $cv, $name, $detailed) = @_;
455
456 $self->send_msg (get_plugin_info =>
457 plugin_name => $name,
458 detailed => $detailed ? "true" : "false",
459 id_cb => sub {
460 my ($self, $type, $kv, $rdata) = @_;
461
462 $cv->($kv);
463 1
464 },
465 );
466 };
467
468 =item $cv = $fcp->client_get ($uri, $identifier, %kv)
469
470 =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
471
472 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
473
474 ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
475 priority_class, persistence, client_token, global, return_type,
476 binary_blob, allowed_mime_types, filename, temp_filename
477
478 =cut
479
480 _txn client_get => sub {
481 my ($self, $cv, $uri, $identifier, %kv) = @_;
482
483 $self->send_msg (client_get =>
484 %kv,
485 uri => $uri,
486 identifier => $identifier,
487 id_cb => sub {
488 my ($self, $type, $kv, $rdata) = @_;
489
490 $cv->($kv);
491 1
492 },
493 );
494 };
495
496 =item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)
497
498 =item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write))
499
500 The DDA test in FCP is probably the single most broken protocol - only
501 one directory test can be outstanding at any time, and some guessing and
502 heuristics are involved in mangling the paths.
503
504 This function combines C<TestDDARequest> and C<TestDDAResponse> in one
505 request, handling file reading and writing as well.
506
507 =cut
508
509 _txn test_dda => sub {
510 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;
511
512 $self->serialise (test_dda => sub {
513 my ($self, $guard) = @_;
514
515 $self->send_msg (test_dda_request =>
516 directory => $remote,
517 want_read_directory => $want_read ? "true" : "false",
518 want_write_directory => $want_write ? "true" : "false",
519 );
520 $self->on (sub {
521 my ($type, $kv) = @_;
522
523 if ($type eq "test_dda_reply") {
524 # the filenames are all relative to the server-side directory,
525 # which might or might not match $remote anymore, so we
526 # need to rewrite the paths to be relative to $local
527 for my $k (qw(read_filename write_filename)) {
528 my $f = $kv->{$k};
529 for my $dir ($kv->{directory}, $remote) {
530 if ($dir eq substr $f, 0, length $dir) {
531 substr $f, 0, 1 + length $dir, "";
532 $kv->{$k} = $f;
533 last;
534 }
535 }
536 }
537
538 my %response = (directory => $remote);
539
540 if (length $kv->{read_filename}) {
541 warn "$local/$kv->{read_filename}";#d#
542 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
543 sysread $fh, my $buf, -s $fh;
544 $response{read_content} = $buf;
545 }
546 }
547
548 if (length $kv->{write_filename}) {
549 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
550 syswrite $fh, $kv->{content_to_write};
551 }
552 }
553
554 $self->send_msg (test_dda_response => %response);
555
556 $self->on (sub {
557 my ($type, $kv) = @_;
558
559 $guard if 0; # reference
560
561 if ($type eq "test_dda_complete") {
562 $cv->(
563 $kv->{read_directory_allowed} eq "true",
564 $kv->{write_directory_allowed} eq "true",
565 );
566 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
567 $cv->croak ($kv->{extra_description});
568 return;
569 }
570
571 1
572 });
573
574 return;
575 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
576 $cv->croak ($kv->{extra_description});
577 return;
578 }
579
580 1
581 });
582 });
583 };
584
585 =back
586
587 =head1 EXAMPLE PROGRAM
588
589 use AnyEvent::FCP;
590
591 my $fcp = new AnyEvent::FCP;
592
593 # let us look at the global request list
594 $fcp->watch_global (1, 0);
595
596 # list them, synchronously
597 my $req = $fcp->list_persistent_requests_sync;
598
599 # go through all requests
600 for my $req (values %$req) {
601 # skip jobs not directly-to-disk
602 next unless $req->{return_type} eq "disk";
603 # skip jobs not issued by FProxy
604 next unless $req->{identifier} =~ /^FProxy:/;
605
606 if ($req->{data_found}) {
607 # file has been successfully downloaded
608
609 ... move the file away
610 (left as exercise)
611
612 # remove the request
613
614 $fcp->remove_request (1, $req->{identifier});
615 } elsif ($req->{get_failed}) {
616 # request has failed
617 if ($req->{get_failed}{code} == 11) {
618 # too many path components, should restart
619 } else {
620 # other failure
621 }
622 } else {
623 # modify priorities randomly, to improve download rates
624 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
625 if 0.1 > rand;
626 }
627 }
628
629 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
630 $fcp->get_plugin_info_sync ("dummy");
631
632 =head1 SEE ALSO
633
634 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
635
636 =head1 BUGS
637
638 =head1 AUTHOR
639
640 Marc Lehmann <schmorp@schmorp.de>
641 http://home.schmorp.de/
642
643 =cut
644
645 1
646