ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.2 by root, Sat Jul 25 06:28:49 2009 UTC vs.
Revision 1.20 by root, Sun Jun 12 01:32:37 2016 UTC

2 2
3AnyEvent::FCP - freenet client protocol 2.0 3AnyEvent::FCP - freenet client protocol 2.0
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11 my $ni = $fcp->txn_node_info->result; 11 # transactions return condvars
12 my $ni = $fcp->node_info; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
13 17
14=head1 DESCRIPTION 18=head1 DESCRIPTION
15 19
16This module implements the freenet client protocol version 2.0, as used by 20This module implements the freenet client protocol version 2.0, as used by
17freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version. 21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
18 22
19See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a description 23See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
20of what the messages do. 24description of what the messages do.
21 25
22The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
23 27
28Only very little is implemented, ask if you need more, and look at the
29example program later in this section.
30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
24=head2 IMPORT TAGS 50=head2 IMPORT TAGS
25 51
26Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
27 53
28=head2 FREENET BASICS 54=head1 THE AnyEvent::FCP CLASS
29
30Ok, this section will not explain any freenet basics to you, just some
31problems I found that you might want to avoid:
32 55
33=over 4 56=over 4
34 57
35=item freenet URIs are _NOT_ URIs
36
37Whenever a "uri" is required by the protocol, freenet expects a kind of
38URI prefixed with the "freenet:" scheme, e.g. "freenet:CHK...". However,
39these are not URIs, as freeent fails to parse them correctly, that is, you
40must unescape an escaped characters ("%2c" => ",") yourself. Maybe in the
41future this library will do it for you, so watch out for this incompatible
42change.
43
44=back
45
46=head2 THE AnyEvent::FCP CLASS
47
48=over 4
49
50=cut 58=cut
51 59
52package AnyEvent::FCP; 60package AnyEvent::FCP;
53 61
54use common::sense; 62use common::sense;
55 63
56use Carp; 64use Carp;
57 65
58our $VERSION = '0.1'; 66our $VERSION = 0.4;
59 67
60use Scalar::Util (); 68use Scalar::Util ();
61 69
62use AnyEvent; 70use AnyEvent;
63use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
64 75
65sub touc($) { 76sub touc($) {
66 local $_ = shift; 77 local $_ = shift;
67 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
68 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
69 $_ 80 $_
70} 81}
71 82
72sub tolc($) { 83sub tolc($) {
73 local $_ = shift; 84 local $_ = shift;
74 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
75 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
76 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
77 lc 88 lc
78} 89}
79 90
80=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
81 92
82Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
83127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
84 95
85If no C<name> was specified, then AnyEvent::FCP will generate a (hopefully) 96If no C<name> was specified, then AnyEvent::FCP will generate a
86unique client name for you. 97(hopefully) unique client name for you.
87 98
88#TODO 99The following keys can be specified (they are all optional):
89#You can install a progress callback that is being called with the AnyEvent::FCP 100
90#object, a txn object, the type of the transaction and the attributes. Use 101=over 4
91#it like this: 102
92# 103=item name => $string
93# sub progress_cb { 104
94# my ($self, $txn, $type, $attr) = @_; 105A unique name to identify this client. If none is specified, a randomly
95# 106generated name will be used.
96# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; 107
97# } 108=item host => $hostname
109
110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_error => $callback->($fcp, $message)
132
133Invoked on any (fatal) errors, such as unexpected connection close. The
134callback receives the FCP object and a textual error message.
135
136=item on_failure => $callback->($fcp, $backtrace, $args, $error)
137
138Invoked when an FCP request fails that didn't have a failure callback. See
139L<FCP REQUESTS> for details.
140
141=back
98 142
99=cut 143=cut
100 144
101sub new { 145sub new {
102 my $class = shift; 146 my $class = shift;
147
148 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
149
103 my $self = bless { @_ }, $class; 150 my $self = bless {
104 151 host => $ENV{FREDHOST} || "127.0.0.1",
105 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 152 port => $ENV{FREDPORT} || 9481,
106 $self->{port} ||= $ENV{FREDPORT} || 9481; 153 timeout => 3600 * 2,
107 $self->{name} ||= time.rand.rand.rand; # lame 154 keepalive => 9 * 60,
108 $self->{timeout} ||= 600; 155 name => time.rand.rand.rand, # lame
109 156 @_,
110 $self->{id} = "a0"; 157 queue => [],
158 req => {},
159 prefix => "..:aefcpid:$rand:",
160 idseq => "a0",
161 }, $class;
111 162
112 { 163 {
113 Scalar::Util::weaken (my $self = $self); 164 Scalar::Util::weaken (my $self = $self);
165
166 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
167 $self->{hdl}->push_write ("\n");
168 };
169
170 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
171
172 # these are declared here for performance reasons
173 my ($k, $v, $type);
174 my $rdata;
175
176 my $on_read = sub {
177 my ($hdl) = @_;
178
179 # we only carve out whole messages here
180 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
181 # remember end marker
182 $rdata = $1 eq "Data"
183 or $1 eq "EndMessage"
184 or return $self->fatal ("protocol error, expected message end, got $1\n");
185
186 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
187
188 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
189
190 $type = shift @lines;
191 $type = ($TOLC{$type} ||= tolc $type);
192
193 my %kv;
194
195 for (@lines) {
196 ($k, $v) = split /=/, $_, 2;
197 $k = ($TOLC{$k} ||= tolc $k);
198
199 if ($k =~ /\./) {
200 # generic, slow case
201 my @k = split /\./, $k;
202 my $ro = \\%kv;
203
204 while (@k) {
205 $k = shift @k;
206 if ($k =~ /^\d+$/) {
207 $ro = \$$ro->[$k];
208 } else {
209 $ro = \$$ro->{$k};
210 }
211 }
212
213 $$ro = $v;
214
215 next;
216 }
217
218 # special comon case, for performance only
219 $kv{$k} = $v;
220 }
221
222 if ($rdata) {
223 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
224 $rdata = \$_[1];
225 $self->recv ($type, \%kv, $rdata);
226 });
227
228 last; # do not tgry to parse more messages
229 } else {
230 $self->recv ($type, \%kv);
231 }
232 }
233 };
114 234
115 $self->{hdl} = new AnyEvent::Handle 235 $self->{hdl} = new AnyEvent::Handle
116 connect => [$self->{host} => $self->{port}], 236 connect => [$self->{host} => $self->{port}],
117 timeout => $self->{timeout}, 237 timeout => $self->{timeout},
238 on_read => $on_read,
239 on_eof => $self->{on_eof},
118 on_error => sub { 240 on_error => sub {
119 warn "<@_>\n"; 241 $self->fatal ($_[2]);
120 exit 1;
121 }, 242 },
122 on_read => sub { $self->on_read (@_) }, 243 ;
123 on_eof => $self->{on_eof} || sub { };
124 244
125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 245 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
126 } 246 }
127 247
128 $self->send_msg ( 248 $self->send_msg (client_hello =>
129 client_hello =>
130 name => $self->{name}, 249 name => $self->{name},
131 expected_version => "2.0", 250 expected_version => "2.0",
132 ); 251 );
133 252
134 $self 253 $self
135} 254}
136 255
137sub progress { 256sub fatal {
138 my ($self, $txn, $type, $attr) = @_; 257 my ($self, $msg) = @_;
139 258
140 $self->{progress}->($self, $txn, $type, $attr) 259 $self->{hdl}->shutdown;
141 if $self->{progress}; 260 delete $self->{kw};
261
262 if ($self->{on_error}) {
263 $self->{on_error}->($self, $msg);
264 } else {
265 die $msg;
266 }
267}
268
269sub identifier {
270 $_[0]{prefix} . ++$_[0]{idseq}
142} 271}
143 272
144sub send_msg { 273sub send_msg {
145 my ($self, $type, %kv) = @_; 274 my ($self, $type, %kv) = @_;
146 275
147 my $data = delete $kv{data}; 276 my $data = delete $kv{data};
148 277
149 if (exists $kv{id_cb}) { 278 if (exists $kv{id_cb}) {
150 my $id = $kv{identifier} || ++$self->{id}; 279 my $id = $kv{identifier} ||= $self->identifier;
151 $self->{id}{$id} = delete $kv{id_cb}; 280 $self->{id}{$id} = delete $kv{id_cb};
152 $kv{identifier} = $id;
153 } 281 }
154 282
155 my $msg = (touc $type) . "\012" 283 my $msg = (touc $type) . "\012"
156 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 284 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
157 285
169 } 297 }
170 298
171 $self->{hdl}->push_write ($msg); 299 $self->{hdl}->push_write ($msg);
172} 300}
173 301
174sub on_read { 302sub on {
175 my ($self) = @_; 303 my ($self, $cb) = @_;
176 304
177 my $type; 305 # cb return undef - message eaten, remove cb
178 my %kv; 306 # cb return 0 - message eaten
179 my $rdata; 307 # cb return 1 - pass to next
180 308
181 my $done_cb = sub { 309 push @{ $self->{on} }, $cb;
182 $kv{pkt_type} = $type; 310}
183 311
184 if (my $cb = $self->{queue}[0]) { 312sub _push_queue {
185 $cb->($self, $type, \%kv, $rdata) 313 my ($self, $queue) = @_;
186 and shift @{ $self->{queue} }; 314
187 } else { 315 shift @$queue;
188 $self->default_recv ($type, \%kv, $rdata); 316 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
317 if @$queue;
318}
319
320# lock so only one $type (arbitrary string) is in flight,
321# to work around horribly misdesigned protocol.
322sub serialise {
323 my ($self, $type, $cb) = @_;
324
325 my $queue = $self->{serialise}{$type} ||= [];
326 push @$queue, $cb;
327 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
328 unless $#$queue;
329}
330
331# how to merge these types into $self->{persistent}
332our %PERSISTENT_TYPE = (
333 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
334 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
335 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
336 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
337 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
338
339 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
340
341 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
342 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
343 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
344 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
345 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
346 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
347 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
348
349 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
350 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
351 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
352 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
353 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
354 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
355 data_found => sub { $_[1]{data_found} = $_[2] }, # get
356 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
357);
358
359sub recv {
360 my ($self, $type, $kv, @extra) = @_;
361
362 if (my $cb = $PERSISTENT_TYPE{$type}) {
363 my $id = $kv->{identifier};
364 my $req = $_[0]{req}{$id} ||= {};
365 $cb->($self, $req, $kv);
366 $self->recv (request_changed => $kv, $type, @extra);
367 }
368
369 my $on = $self->{on};
370 for (0 .. $#$on) {
371 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
372 splice @$on, $_, 1 unless defined $res;
373 return;
189 } 374 }
190 }; 375 }
191 376
192 my $hdr_cb; $hdr_cb = sub { 377 if (my $cb = $self->{queue}[0]) {
193 if ($_[1] =~ /^([^=]+)=(.*)$/) { 378 $cb->($self, $type, $kv, @extra)
194 my ($k, $v) = ($1, $2); 379 and shift @{ $self->{queue} };
195 my @k = split /\./, tolc $k;
196 my $ro = \\%kv;
197
198 while (@k) {
199 my $k = shift @k;
200 if ($k =~ /^\d+$/) {
201 $ro = \$$ro->[$k];
202 } else {
203 $ro = \$$ro->{$k};
204 }
205 }
206
207 $$ro = $v;
208
209 $_[0]->push_read (line => $hdr_cb);
210 } elsif ($_[1] eq "Data") {
211 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
212 $rdata = \$_[1];
213 $done_cb->();
214 });
215 } elsif ($_[1] eq "EndMessage") {
216 $done_cb->();
217 } else { 380 } else {
218 die "protocol error, expected message end, got $_[1]\n";#d# 381 $self->default_recv ($type, $kv, @extra);
219 }
220 }; 382 }
221
222 $self->{hdl}->push_read (line => sub {
223 $type = tolc $_[1];
224 $_[0]->push_read (line => $hdr_cb);
225 });
226} 383}
227 384
228sub default_recv { 385sub default_recv {
229 my ($self, $type, $kv, $rdata) = @_; 386 my ($self, $type, $kv, $rdata) = @_;
230 387
231 if ($type eq "node_hello") { 388 if ($type eq "node_hello") {
232 $self->{node_hello} = $kv; 389 $self->{node_hello} = $kv;
233 } elsif (exists $self->{id}{$kv->{identifier}}) { 390 } elsif (exists $self->{id}{$kv->{identifier}}) {
234 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 391 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
235 and delete $self->{id}{$kv->{identifier}}; 392 and delete $self->{id}{$kv->{identifier}};
236 } else {
237 # on_warn
238 #warn "protocol warning (unexpected $type message)\n";
239 } 393 }
240} 394}
395
396=back
397
398=head2 FCP REQUESTS
399
400The following methods implement various requests. Most of them map
401directory to the FCP message of the same name. The added benefit of
402these over sending requests yourself is that they handle the necessary
403serialisation, protocol quirks, and replies.
404
405All of them exist in two versions, the variant shown in this manpage, and
406a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
407version as shown is I<synchronous> - it will wait for any replies, and
408either return the reply, or croak with an error. The underscore variant
409returns immediately and invokes one or more callbacks or condvars later.
410
411For example, the call
412
413 $info = $fcp->get_plugin_info ($name, $detailed);
414
415Also comes in this underscore variant:
416
417 $fcp->get_plugin_info_ ($name, $detailed, $cb);
418
419You can thinbk of the underscore as a kind of continuation indicator - the
420normal function waits and returns with the data, the C<_> indicates that
421you pass the continuation yourself, and the continuation will be invoked
422with the results.
423
424This callback/continuation argument (C<$cb>) can come in three forms itself:
425
426=over 4
427
428=item A code reference (or rather anything not matching some other alternative)
429
430This code reference will be invoked with the result on success. On an
431error, it will invoke the C<on_failure> callback of the FCP object, or,
432if none was defined, will die (in the event loop) with a backtrace of the
433call site.
434
435This is a popular choice, but it makes handling errors hard - make sure
436you never generate protocol errors!
437
438If an C<on_failure> hook exists, it will be invoked with the FCP object,
439a (textual) backtrace as generated by C<Carp::longmess>, and arrayref
440containing the arguments from the original request invocation and the
441error object from the server, in this order, e.g.:
442
443 on_failure => sub {
444 my ($fcp, $backtrace, $orig_args, $error_object) = @_;
445 ...
446 },
447
448=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
449
450When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
451results when the request has finished. Should an error occur, the error
452will instead result in C<< $cv->croak ($error) >>.
453
454This is also a popular choice.
455
456=item An array with two callbacks C<[$success, $failure]>
457
458The C<$success> callback will be invoked with the results, while the
459C<$failure> callback will be invoked on any errors.
460
461The C<$failure> callback will be invoked with the error object from the
462server.
463
464=item C<undef>
465
466This is the same thing as specifying C<sub { }> as callback, i.e. on
467success, the results are ignored, while on failure, the C<on_failure> hook
468is invoked or the module dies with a backtrace.
469
470This is good for quick scripts, or when you really aren't interested in
471the results.
472
473=back
474
475=cut
476
477our $NOP_CB = sub { };
241 478
242sub _txn { 479sub _txn {
243 my ($name, $sub) = @_; 480 my ($name, $sub) = @_;
244 481
245 *{$name} = sub { 482 *{$name} = sub {
246 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 483 my $cv = AE::cv;
247 &$sub;
248 $cv
249 };
250 484
251 *{"$name\_sync"} = sub { 485 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
252 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
253 &$sub; 486 &$sub;
254 $cv->recv 487 $cv->recv
255 }; 488 };
489
490 *{"$name\_"} = sub {
491 my ($ok, $err) = pop;
492
493 if (ARRAY:: eq ref $ok) {
494 ($ok, $err) = @$ok;
495 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
496 $err = sub { $ok->croak ($_[0]{extra_description}) };
497 } else {
498 my $bt = Carp::longmess "AnyEvent::FCP request $name";
499 Scalar::Util::weaken (my $self = $_[0]);
500 my $args = [@_]; shift @$args;
501 $err = sub {
502 if ($self->{on_failure}) {
503 $self->{on_failure}($self, $args, $bt, $_[0]);
504 } else {
505 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
506 }
507 };
508 }
509
510 $ok ||= $NOP_CB;
511
512 splice @_, 1, 0, $ok, $err;
513 &$sub;
514 };
256} 515}
516
517=over 4
518
519=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
520
521=cut
257 522
258_txn list_peers => sub { 523_txn list_peers => sub {
259 my ($self, $cv, $with_metadata, $with_volatile) = @_; 524 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
260 525
261 my @res; 526 my @res;
262 527
263 $self->send_msg (list_peers => 528 $self->send_msg (list_peers =>
264 with_metadata => $with_metadata ? "true" : "false", 529 with_metadata => $with_metadata ? "true" : "false",
265 with_volatile => $with_volatile ? "true" : "false", 530 with_volatile => $with_volatile ? "true" : "false",
266 id_cb => sub { 531 id_cb => sub {
267 my ($self, $type, $kv, $rdata) = @_; 532 my ($self, $type, $kv, $rdata) = @_;
268 533
269 if ($type eq "end_list_peers") { 534 if ($type eq "end_list_peers") {
270 $cv->(\@res); 535 $ok->(\@res);
271 1 536 1
272 } else { 537 } else {
273 push @res, $kv; 538 push @res, $kv;
274 0 539 0
275 } 540 }
276 }, 541 },
277 ); 542 );
278}; 543};
279 544
545=item $notes = $fcp->list_peer_notes ($node_identifier)
546
547=cut
548
280_txn list_peer_notes => sub { 549_txn list_peer_notes => sub {
281 my ($self, $cv, $node_identifier) = @_; 550 my ($self, $ok, undef, $node_identifier) = @_;
282 551
283 $self->send_msg (list_peer_notes => 552 $self->send_msg (list_peer_notes =>
284 node_identifier => $node_identifier, 553 node_identifier => $node_identifier,
285 id_cb => sub { 554 id_cb => sub {
286 my ($self, $type, $kv, $rdata) = @_; 555 my ($self, $type, $kv, $rdata) = @_;
287 556
288 $cv->($kv); 557 $ok->($kv);
289 1 558 1
290 }, 559 },
291 ); 560 );
292}; 561};
293 562
563=item $fcp->watch_global ($enabled[, $verbosity_mask])
564
565=cut
566
294_txn watch_global => sub { 567_txn watch_global => sub {
295 my ($self, $cv, $enabled, $verbosity_mask) = @_; 568 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
296 569
297 $self->send_msg (watch_global => 570 $self->send_msg (watch_global =>
298 enabled => $enabled ? "true" : "false", 571 enabled => $enabled ? "true" : "false",
299 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 572 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
300 ); 573 );
301 574
302 $cv->(); 575 $ok->();
303}; 576};
304 577
578=item $reqs = $fcp->list_persistent_requests
579
580=cut
581
305_txn list_persistent_requests => sub { 582_txn list_persistent_requests => sub {
306 my ($self, $cv) = @_; 583 my ($self, $ok, $err) = @_;
307 584
585 $self->serialise (list_persistent_requests => sub {
586 my ($self, $guard) = @_;
587
308 my %res; 588 my @res;
309 589
310 $self->send_msg ("list_persistent_requests"); 590 $self->send_msg ("list_persistent_requests");
311 591
312 push @{ $self->{queue} }, sub { 592 $self->on (sub {
313 my ($self, $type, $kv, $rdata) = @_; 593 my ($self, $type, $kv, $rdata) = @_;
314 594
595 $guard if 0;
596
315 if ($type eq "end_list_persistent_requests") { 597 if ($type eq "end_list_persistent_requests") {
316 $cv->(\%res); 598 $ok->(\@res);
599 return;
600 } else {
601 my $id = $kv->{identifier};
602
603 if ($type =~ /^persistent_(get|put|put_dir)$/) {
604 push @res, [$type, $kv];
605 }
606 }
607
317 1 608 1
318 } else { 609 });
319 my $id = $kv->{identifier}; 610 });
611};
320 612
321 if ($type =~ /^persistent_(get|put|put_dir)$/) { 613=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
322 $res{$id} = { 614
323 type => $1, 615Update either the C<client_token> or C<priority_class> of a request
324 %{ $res{$id} }, 616identified by C<$global> and C<$identifier>, depending on which of
617C<$client_token> and C<$priority_class> are not C<undef>.
618
619=cut
620
621_txn modify_persistent_request => sub {
622 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
623
624 $self->serialise ($identifier => sub {
625 my ($self, $guard) = @_;
626
627 $self->send_msg (modify_persistent_request =>
628 global => $global ? "true" : "false",
629 identifier => $identifier,
630 defined $client_token ? (client_token => $client_token ) : (),
631 defined $priority_class ? (priority_class => $priority_class) : (),
632 );
633
634 $self->on (sub {
635 my ($self, $type, $kv, @extra) = @_;
636
637 $guard if 0;
638
639 if ($kv->{identifier} eq $identifier) {
640 if ($type eq "persistent_request_modified") {
325 %$kv, 641 $ok->($kv);
642 return;
643 } elsif ($type eq "protocol_error") {
644 $err->($kv);
645 return;
326 }; 646 }
327 } elsif ($type eq "simple_progress") {
328 delete $kv->{pkt_type}; # save memory
329 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
330 } else {
331 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
332 } 647 }
648
333 0 649 1
334 } 650 });
335 }; 651 });
336}; 652};
337 653
338_txn remove_request => sub { 654=item $info = $fcp->get_plugin_info ($name, $detailed)
339 my ($self, $cv, $global, $identifier) = @_;
340 655
341 $self->send_msg (remove_request => 656=cut
342 global => $global ? "true" : "false",
343 identifier => $identifier,
344 id_cb => sub {
345 my ($self, $type, $kv, $rdata) = @_;
346
347 $cv->($kv);
348 1
349 },
350 );
351
352 $cv->();
353};
354
355_txn modify_persistent_request => sub {
356 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
357
358 $self->send_msg (modify_persistent_request =>
359 global => $global ? "true" : "false",
360 identifier => $identifier,
361 defined $client_token ? (client_token => $client_token ) : (),
362 defined $priority_class ? (priority_class => $priority_class) : (),
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372};
373 657
374_txn get_plugin_info => sub { 658_txn get_plugin_info => sub {
375 my ($self, $cv, $name, $detailed) = @_; 659 my ($self, $ok, $err, $name, $detailed) = @_;
660
661 my $id = $self->identifier;
376 662
377 $self->send_msg (get_plugin_info => 663 $self->send_msg (get_plugin_info =>
664 identifier => $id,
378 plugin_name => $name, 665 plugin_name => $name,
379 detailed => $detailed ? "true" : "false", 666 detailed => $detailed ? "true" : "false",
380 id_cb => sub { 667 );
668 $self->on (sub {
669 my ($self, $type, $kv) = @_;
670
671 if ($kv->{identifier} eq $id) {
672 if ($type eq "get_plugin_info") {
673 $ok->($kv);
674 } else {
675 $err->($kv, $type);
676 }
677 return;
678 }
679
680 1
681 });
682};
683
684=item $status = $fcp->client_get ($uri, $identifier, %kv)
685
686%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
687
688ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
689priority_class, persistence, client_token, global, return_type,
690binary_blob, allowed_mime_types, filename, temp_filename
691
692=cut
693
694_txn client_get => sub {
695 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
696
697 $self->serialise ($identifier => sub {
698 my ($self, $guard) = @_;
699
700 $self->send_msg (client_get =>
701 %kv,
702 uri => $uri,
703 identifier => $identifier,
704 );
705
706 $self->on (sub {
381 my ($self, $type, $kv, $rdata) = @_; 707 my ($self, $type, $kv, @extra) = @_;
382 708
709 $guard if 0;
710
711 if ($kv->{identifier} eq $identifier) {
712 if ($type eq "persistent_get") {
383 $cv->($kv); 713 $ok->($kv);
714 return;
715 } elsif ($type eq "protocol_error") {
716 $err->($kv);
717 return;
718 }
719 }
720
384 1 721 1
722 });
723 });
724};
725
726=item $status = $fcp->remove_request ($identifier[, $global])
727
728Remove the request with the given isdentifier. Returns true if successful,
729false on error.
730
731=cut
732
733_txn remove_request => sub {
734 my ($self, $ok, $err, $identifier, $global) = @_;
735
736 $self->serialise ($identifier => sub {
737 my ($self, $guard) = @_;
738
739 $self->send_msg (remove_request =>
740 identifier => $identifier,
741 global => $global ? "true" : "false",
742 );
743 $self->on (sub {
744 my ($self, $type, $kv, @extra) = @_;
745
746 $guard if 0;
747
748 if ($kv->{identifier} eq $identifier) {
749 if ($type eq "persistent_request_removed") {
750 $ok->(1);
751 return;
752 } elsif ($type eq "protocol_error") {
753 $err->($kv);
754 return;
755 }
756 }
757
758 1
759 });
760 });
761};
762
763=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
764
765The DDA test in FCP is probably the single most broken protocol - only
766one directory test can be outstanding at any time, and some guessing and
767heuristics are involved in mangling the paths.
768
769This function combines C<TestDDARequest> and C<TestDDAResponse> in one
770request, handling file reading and writing as well, and tries very hard to
771do the right thing.
772
773Both C<$local_directory> and C<$remote_directory> must specify the same
774directory - C<$local_directory> is the directory path on the client (where
775L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
776the server (where the freenet node runs). When both are running on the
777same node, the paths are generally identical.
778
779C<$want_read> and C<$want_write> should be set to a true value when you
780want to read (get) files or write (put) files, respectively.
781
782On error, an exception is thrown. Otherwise, C<$can_read> and
783C<$can_write> indicate whether you can reaqd or write to freenet via the
784directory.
785
786=cut
787
788_txn test_dda => sub {
789 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
790
791 $self->serialise (test_dda => sub {
792 my ($self, $guard) = @_;
793
794 $self->send_msg (test_dda_request =>
795 directory => $remote,
796 want_read_directory => $want_read ? "true" : "false",
797 want_write_directory => $want_write ? "true" : "false",
798 );
799 $self->on (sub {
800 my ($self, $type, $kv) = @_;
801
802 if ($type eq "test_dda_reply") {
803 # the filenames are all relative to the server-side directory,
804 # which might or might not match $remote anymore, so we
805 # need to rewrite the paths to be relative to $local
806 for my $k (qw(read_filename write_filename)) {
807 my $f = $kv->{$k};
808 for my $dir ($kv->{directory}, $remote) {
809 if ($dir eq substr $f, 0, length $dir) {
810 substr $f, 0, 1 + length $dir, "";
811 $kv->{$k} = $f;
812 last;
813 }
814 }
815 }
816
817 my %response = (directory => $remote);
818
819 if (length $kv->{read_filename}) {
820 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
821 sysread $fh, my $buf, -s $fh;
822 $response{read_content} = $buf;
823 }
824 }
825
826 if (length $kv->{write_filename}) {
827 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
828 syswrite $fh, $kv->{content_to_write};
829 }
830 }
831
832 $self->send_msg (test_dda_response => %response);
833
834 $self->on (sub {
835 my ($self, $type, $kv) = @_;
836
837 $guard if 0; # reference
838
839 if ($type eq "test_dda_complete") {
840 $ok->(
841 $kv->{read_directory_allowed} eq "true",
842 $kv->{write_directory_allowed} eq "true",
843 );
844 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
845 $err->($kv->{extra_description});
846 return;
847 }
848
849 1
850 });
851
852 return;
853 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
854 $err->($kv);
855 return;
856 }
857
858 1
859 });
860 });
861};
862
863=back
864
865=head2 REQUEST CACHE
866
867The C<AnyEvent::FCP> class keeps a request cache, where it caches all
868information from requests.
869
870For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
871in C<< $fcp->{req}{$identifier} >>:
872
873 persistent_get
874 persistent_put
875 persistent_put_dir
876
877This message updates the stored data:
878
879 persistent_request_modified
880
881This message will remove this entry:
882
883 persistent_request_removed
884
885These messages get merged into the cache entry, under their
886type, i.e. a C<simple_progress> message will be stored in C<<
887$fcp->{req}{$identifier}{simple_progress} >>:
888
889 simple_progress # get/put
890
891 uri_generated # put
892 generated_metadata # put
893 started_compression # put
894 finished_compression # put
895 put_failed # put
896 put_fetchable # put
897 put_successful # put
898
899 sending_to_network # get
900 compatibility_mode # get
901 expected_hashes # get
902 expected_mime # get
903 expected_data_length # get
904 get_failed # get
905 data_found # get
906 enter_finite_cooldown # get
907
908In addition, an event (basically a fake message) of type C<request_changed> is generated
909on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
910is the type of the original message triggering the change,
911
912To fill this cache with the global queue and keep it updated,
913call C<watch_global> to subscribe to updates, followed by
914C<list_persistent_requests_sync>.
915
916 $fcp->watch_global_sync_; # do not wait
917 $fcp->list_persistent_requests; # wait
918
919To get a better idea of what is stored in the cache, here is an example of
920what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
921
922 {
923 identifier => "Frost-gpl.txt",
924 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
925 binary_blob => "false",
926 global => "true",
927 max_retries => -1,
928 max_size => 9223372036854775807,
929 persistence => "forever",
930 priority_class => 3,
931 real_time => "false",
932 return_type => "direct",
933 started => "true",
934 type => "persistent_get",
935 verbosity => 2147483647,
936 sending_to_network => {
937 identifier => "Frost-gpl.txt",
938 global => "true",
385 }, 939 },
386 ); 940 compatibility_mode => {
941 identifier => "Frost-gpl.txt",
942 definitive => "true",
943 dont_compress => "false",
944 global => "true",
945 max => "COMPAT_1255",
946 min => "COMPAT_1255",
947 },
948 expected_hashes => {
949 identifier => "Frost-gpl.txt",
950 global => "true",
951 hashes => {
952 ed2k => "d83596f5ee3b7...",
953 md5 => "e0894e4a2a6...",
954 sha1 => "...",
955 sha256 => "...",
956 sha512 => "...",
957 tth => "...",
958 },
959 },
960 expected_mime => {
961 identifier => "Frost-gpl.txt",
962 global => "true",
963 metadata => { content_type => "application/rar" },
964 },
965 expected_data_length => {
966 identifier => "Frost-gpl.txt",
967 data_length => 37576,
968 global => "true",
969 },
970 simple_progress => {
971 identifier => "Frost-gpl.txt",
972 failed => 0,
973 fatally_failed => 0,
974 finalized_total => "true",
975 global => "true",
976 last_progress => 1438639282628,
977 required => 372,
978 succeeded => 102,
979 total => 747,
980 },
981 data_found => {
982 identifier => "Frost-gpl.txt",
983 completion_time => 1438663354026,
984 data_length => 37576,
985 global => "true",
986 metadata => { content_type => "image/jpeg" },
987 startup_time => 1438657196167,
988 },
989 }
387 990
388 $cv->(); 991=head1 EXAMPLE PROGRAM
389};
390 992
391=back 993 use AnyEvent::FCP;
994
995 my $fcp = new AnyEvent::FCP;
996
997 # let us look at the global request list
998 $fcp->watch_global_ (1);
999
1000 # list them, synchronously
1001 my $req = $fcp->list_persistent_requests;
1002
1003 # go through all requests
1004TODO
1005 for my $req (values %$req) {
1006 # skip jobs not directly-to-disk
1007 next unless $req->{return_type} eq "disk";
1008 # skip jobs not issued by FProxy
1009 next unless $req->{identifier} =~ /^FProxy:/;
1010
1011 if ($req->{data_found}) {
1012 # file has been successfully downloaded
1013
1014 ... move the file away
1015 (left as exercise)
1016
1017 # remove the request
1018
1019 $fcp->remove_request (1, $req->{identifier});
1020 } elsif ($req->{get_failed}) {
1021 # request has failed
1022 if ($req->{get_failed}{code} == 11) {
1023 # too many path components, should restart
1024 } else {
1025 # other failure
1026 }
1027 } else {
1028 # modify priorities randomly, to improve download rates
1029 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
1030 if 0.1 > rand;
1031 }
1032 }
1033
1034 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
1035 $fcp->get_plugin_info_sync ("dummy");
392 1036
393=head1 SEE ALSO 1037=head1 SEE ALSO
394 1038
395L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>. 1039L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
396 1040

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines