ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.2 by root, Sat Jul 25 06:28:49 2009 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

2 2
3AnyEvent::FCP - freenet client protocol 2.0 3AnyEvent::FCP - freenet client protocol 2.0
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11 my $ni = $fcp->txn_node_info->result; 11 # transactions return condvars
12 my $ni = $fcp->node_info; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
13 17
14=head1 DESCRIPTION 18=head1 DESCRIPTION
15 19
16This module implements the freenet client protocol version 2.0, as used by 20This module implements the freenet client protocol version 2.0, as used by
17freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version. 21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
18 22
19See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a description 23See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
20of what the messages do. 24description of what the messages do.
21 25
22The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
23 27
28Only very little is implemented, ask if you need more, and look at the
29example program later in this section.
30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
24=head2 IMPORT TAGS 50=head2 IMPORT TAGS
25 51
26Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
27 53
28=head2 FREENET BASICS 54=head1 THE AnyEvent::FCP CLASS
29
30Ok, this section will not explain any freenet basics to you, just some
31problems I found that you might want to avoid:
32 55
33=over 4 56=over 4
34 57
35=item freenet URIs are _NOT_ URIs
36
37Whenever a "uri" is required by the protocol, freenet expects a kind of
38URI prefixed with the "freenet:" scheme, e.g. "freenet:CHK...". However,
39these are not URIs, as freeent fails to parse them correctly, that is, you
40must unescape an escaped characters ("%2c" => ",") yourself. Maybe in the
41future this library will do it for you, so watch out for this incompatible
42change.
43
44=back
45
46=head2 THE AnyEvent::FCP CLASS
47
48=over 4
49
50=cut 58=cut
51 59
52package AnyEvent::FCP; 60package AnyEvent::FCP;
53 61
54use common::sense; 62use common::sense;
55 63
56use Carp; 64use Carp;
57 65
58our $VERSION = '0.1'; 66our $VERSION = 0.4;
59 67
60use Scalar::Util (); 68use Scalar::Util ();
61 69
62use AnyEvent; 70use AnyEvent;
63use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
64 75
65sub touc($) { 76sub touc($) {
66 local $_ = shift; 77 local $_ = shift;
67 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
68 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
69 $_ 80 $_
70} 81}
71 82
72sub tolc($) { 83sub tolc($) {
73 local $_ = shift; 84 local $_ = shift;
74 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
75 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
76 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
77 lc 88 lc
78} 89}
79 90
80=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
81 92
82Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
83127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
84 95
85If no C<name> was specified, then AnyEvent::FCP will generate a (hopefully) 96If no C<name> was specified, then AnyEvent::FCP will generate a
86unique client name for you. 97(hopefully) unique client name for you.
87
88#TODO
89#You can install a progress callback that is being called with the AnyEvent::FCP
90#object, a txn object, the type of the transaction and the attributes. Use
91#it like this:
92#
93# sub progress_cb {
94# my ($self, $txn, $type, $attr) = @_;
95#
96# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
97# }
98 98
99=cut 99=cut
100 100
101sub new { 101sub new {
102 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
103 my $self = bless { @_ }, $class; 106 my $self = bless {
104 107 host => $ENV{FREDHOST} || "127.0.0.1",
105 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
106 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
107 $self->{name} ||= time.rand.rand.rand; # lame 110 keepalive => 9 * 60,
108 $self->{timeout} ||= 600; 111 name => time.rand.rand.rand, # lame
109 112 @_,
110 $self->{id} = "a0"; 113 queue => [],
114 req => {},
115 prefix => "..:aefcpid:$rand:",
116 idseq => "a0",
117 }, $class;
111 118
112 { 119 {
113 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
114 190
115 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
116 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
117 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
118 on_error => sub { 196 on_error => sub {
119 warn "<@_>\n"; 197 $self->fatal ($_[2]);
120 exit 1;
121 }, 198 },
122 on_read => sub { $self->on_read (@_) }, 199 ;
123 on_eof => $self->{on_eof} || sub { };
124 200
125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
126 } 202 }
127 203
128 $self->send_msg ( 204 $self->send_msg (client_hello =>
129 client_hello =>
130 name => $self->{name}, 205 name => $self->{name},
131 expected_version => "2.0", 206 expected_version => "2.0",
132 ); 207 );
133 208
134 $self 209 $self
135} 210}
136 211
137sub progress { 212sub fatal {
138 my ($self, $txn, $type, $attr) = @_; 213 my ($self, $msg) = @_;
139 214
140 $self->{progress}->($self, $txn, $type, $attr) 215 $self->{hdl}->shutdown;
141 if $self->{progress}; 216 delete $self->{kw};
217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
223}
224
225sub identifier {
226 $_[0]{prefix} . ++$_[0]{idseq}
142} 227}
143 228
144sub send_msg { 229sub send_msg {
145 my ($self, $type, %kv) = @_; 230 my ($self, $type, %kv) = @_;
146 231
147 my $data = delete $kv{data}; 232 my $data = delete $kv{data};
148 233
149 if (exists $kv{id_cb}) { 234 if (exists $kv{id_cb}) {
150 my $id = $kv{identifier} || ++$self->{id}; 235 my $id = $kv{identifier} ||= $self->identifier;
151 $self->{id}{$id} = delete $kv{id_cb}; 236 $self->{id}{$id} = delete $kv{id_cb};
152 $kv{identifier} = $id;
153 } 237 }
154 238
155 my $msg = (touc $type) . "\012" 239 my $msg = (touc $type) . "\012"
156 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 240 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
157 241
169 } 253 }
170 254
171 $self->{hdl}->push_write ($msg); 255 $self->{hdl}->push_write ($msg);
172} 256}
173 257
174sub on_read { 258sub on {
175 my ($self) = @_; 259 my ($self, $cb) = @_;
176 260
177 my $type; 261 # cb return undef - message eaten, remove cb
178 my %kv; 262 # cb return 0 - message eaten
179 my $rdata; 263 # cb return 1 - pass to next
180 264
181 my $done_cb = sub { 265 push @{ $self->{on} }, $cb;
182 $kv{pkt_type} = $type; 266}
183 267
184 if (my $cb = $self->{queue}[0]) { 268sub _push_queue {
185 $cb->($self, $type, \%kv, $rdata) 269 my ($self, $queue) = @_;
186 and shift @{ $self->{queue} }; 270
187 } else { 271 shift @$queue;
188 $self->default_recv ($type, \%kv, $rdata); 272 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
273 if @$queue;
274}
275
276# lock so only one $type (arbitrary string) is in flight,
277# to work around horribly misdesigned protocol.
278sub serialise {
279 my ($self, $type, $cb) = @_;
280
281 my $queue = $self->{serialise}{$type} ||= [];
282 push @$queue, $cb;
283 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
284 unless $#$queue;
285}
286
287# how to merge these types into $self->{persistent}
288our %PERSISTENT_TYPE = (
289 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
290 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
291 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
292 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
293 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
294
295 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
296
297 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
298 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
299 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
300 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
301 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
302 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
303 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
304
305 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
306 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
307 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
308 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
309 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
310 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
311 data_found => sub { $_[1]{data_found} = $_[2] }, # get
312 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
313);
314
315sub recv {
316 my ($self, $type, $kv, @extra) = @_;
317
318 if (my $cb = $PERSISTENT_TYPE{$type}) {
319 my $id = $kv->{identifier};
320 my $req = $_[0]{req}{$id} ||= {};
321 $cb->($self, $req, $kv);
322 $self->recv (request_changed => $kv, $type, @extra);
323 }
324
325 my $on = $self->{on};
326 for (0 .. $#$on) {
327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
328 splice @$on, $_, 1 unless defined $res;
329 return;
189 } 330 }
190 }; 331 }
191 332
192 my $hdr_cb; $hdr_cb = sub { 333 if (my $cb = $self->{queue}[0]) {
193 if ($_[1] =~ /^([^=]+)=(.*)$/) { 334 $cb->($self, $type, $kv, @extra)
194 my ($k, $v) = ($1, $2); 335 and shift @{ $self->{queue} };
195 my @k = split /\./, tolc $k;
196 my $ro = \\%kv;
197
198 while (@k) {
199 my $k = shift @k;
200 if ($k =~ /^\d+$/) {
201 $ro = \$$ro->[$k];
202 } else {
203 $ro = \$$ro->{$k};
204 }
205 }
206
207 $$ro = $v;
208
209 $_[0]->push_read (line => $hdr_cb);
210 } elsif ($_[1] eq "Data") {
211 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
212 $rdata = \$_[1];
213 $done_cb->();
214 });
215 } elsif ($_[1] eq "EndMessage") {
216 $done_cb->();
217 } else { 336 } else {
218 die "protocol error, expected message end, got $_[1]\n";#d# 337 $self->default_recv ($type, $kv, @extra);
219 }
220 }; 338 }
221
222 $self->{hdl}->push_read (line => sub {
223 $type = tolc $_[1];
224 $_[0]->push_read (line => $hdr_cb);
225 });
226} 339}
227 340
228sub default_recv { 341sub default_recv {
229 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
230 343
231 if ($type eq "node_hello") { 344 if ($type eq "node_hello") {
232 $self->{node_hello} = $kv; 345 $self->{node_hello} = $kv;
233 } elsif (exists $self->{id}{$kv->{identifier}}) { 346 } elsif (exists $self->{id}{$kv->{identifier}}) {
234 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 347 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
235 and delete $self->{id}{$kv->{identifier}}; 348 and delete $self->{id}{$kv->{identifier}};
236 } else {
237 # on_warn
238 #warn "protocol warning (unexpected $type message)\n";
239 } 349 }
240} 350}
351
352=back
353
354=head2 FCP REQUESTS
355
356The following methods implement various requests. Most of them map
357directory to the FCP message of the same name. The added benefit of
358these over sending requests yourself is that they handle the necessary
359serialisation, protocol quirks, and replies.
360
361All of them exist in two versions, the variant shown in this manpage, and
362a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
363version as shown is I<synchronous> - it will wait for any replies, and
364either return the reply, or croak with an error. The underscore variant
365returns immediately and invokes one or more callbacks or condvars later.
366
367For example, the call
368
369 $info = $fcp->get_plugin_info ($name, $detailed);
370
371Also comes in this underscore variant:
372
373 $fcp->get_plugin_info_ ($name, $detailed, $cb);
374
375You can thinbk of the underscore as a kind of continuation indicator - the
376normal function waits and returns with the data, the C<_> indicates that
377you pass the continuation yourself, and the continuation will be invoked
378with the results.
379
380This callback/continuation argument (C<$cb>) can come in three forms itself:
381
382=over 4
383
384=item A code reference (or rather anything not matching some other alternative)
385
386This code reference will be invoked with the result on success. On an
387error, it will die (in the event loop) with a backtrace of the call site.
388
389This is a popular choice, but it makes handling errors hard - make sure
390you never generate protocol errors!
391
392=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
393
394When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
395results when the request has finished. Should an error occur, the error
396will instead result in C<< $cv->croak ($error) >>.
397
398This is also a popular choice.
399
400=item An array with two callbacks C<[$success, $failure]>
401
402The C<$success> callback will be invoked with the results, while the
403C<$failure> callback will be invoked on any errors.
404
405=item C<undef>
406
407This is the same thing as specifying C<sub { }> as callback, i.e. on
408success, the results are ignored, while on failure, you the module dies
409with a backtrace.
410
411This is good for quick scripts, or when you really aren't interested in
412the results.
413
414=back
415
416=cut
417
418our $NOP_CB = sub { };
241 419
242sub _txn { 420sub _txn {
243 my ($name, $sub) = @_; 421 my ($name, $sub) = @_;
244 422
245 *{$name} = sub { 423 *{$name} = sub {
246 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 424 my $cv = AE::cv;
247 &$sub;
248 $cv
249 };
250 425
251 *{"$name\_sync"} = sub { 426 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
252 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
253 &$sub; 427 &$sub;
254 $cv->recv 428 $cv->recv
255 }; 429 };
430
431 *{"$name\_"} = sub {
432 my ($ok, $err) = pop;
433
434 if (ARRAY:: eq ref $ok) {
435 ($ok, $err) = @$ok;
436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
437 $err = sub { $ok->croak ($_[0]{extra_description}) };
438 } else {
439 my $bt = Carp::longmess "";
440 $err = sub {
441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
442 };
443 }
444
445 $ok ||= $NOP_CB;
446
447 splice @_, 1, 0, $ok, $err;
448 &$sub;
449 };
256} 450}
451
452=over 4
453
454=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
455
456=cut
257 457
258_txn list_peers => sub { 458_txn list_peers => sub {
259 my ($self, $cv, $with_metadata, $with_volatile) = @_; 459 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
260 460
261 my @res; 461 my @res;
262 462
263 $self->send_msg (list_peers => 463 $self->send_msg (list_peers =>
264 with_metadata => $with_metadata ? "true" : "false", 464 with_metadata => $with_metadata ? "true" : "false",
265 with_volatile => $with_volatile ? "true" : "false", 465 with_volatile => $with_volatile ? "true" : "false",
266 id_cb => sub { 466 id_cb => sub {
267 my ($self, $type, $kv, $rdata) = @_; 467 my ($self, $type, $kv, $rdata) = @_;
268 468
269 if ($type eq "end_list_peers") { 469 if ($type eq "end_list_peers") {
270 $cv->(\@res); 470 $ok->(\@res);
271 1 471 1
272 } else { 472 } else {
273 push @res, $kv; 473 push @res, $kv;
274 0 474 0
275 } 475 }
276 }, 476 },
277 ); 477 );
278}; 478};
279 479
480=item $notes = $fcp->list_peer_notes ($node_identifier)
481
482=cut
483
280_txn list_peer_notes => sub { 484_txn list_peer_notes => sub {
281 my ($self, $cv, $node_identifier) = @_; 485 my ($self, $ok, undef, $node_identifier) = @_;
282 486
283 $self->send_msg (list_peer_notes => 487 $self->send_msg (list_peer_notes =>
284 node_identifier => $node_identifier, 488 node_identifier => $node_identifier,
285 id_cb => sub { 489 id_cb => sub {
286 my ($self, $type, $kv, $rdata) = @_; 490 my ($self, $type, $kv, $rdata) = @_;
287 491
288 $cv->($kv); 492 $ok->($kv);
289 1 493 1
290 }, 494 },
291 ); 495 );
292}; 496};
293 497
498=item $fcp->watch_global ($enabled[, $verbosity_mask])
499
500=cut
501
294_txn watch_global => sub { 502_txn watch_global => sub {
295 my ($self, $cv, $enabled, $verbosity_mask) = @_; 503 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
296 504
297 $self->send_msg (watch_global => 505 $self->send_msg (watch_global =>
298 enabled => $enabled ? "true" : "false", 506 enabled => $enabled ? "true" : "false",
299 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 507 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
300 ); 508 );
301 509
302 $cv->(); 510 $ok->();
303}; 511};
304 512
513=item $reqs = $fcp->list_persistent_requests
514
515=cut
516
305_txn list_persistent_requests => sub { 517_txn list_persistent_requests => sub {
306 my ($self, $cv) = @_; 518 my ($self, $ok, $err) = @_;
307 519
520 $self->serialise (list_persistent_requests => sub {
521 my ($self, $guard) = @_;
522
308 my %res; 523 my @res;
309 524
310 $self->send_msg ("list_persistent_requests"); 525 $self->send_msg ("list_persistent_requests");
311 526
312 push @{ $self->{queue} }, sub { 527 $self->on (sub {
313 my ($self, $type, $kv, $rdata) = @_; 528 my ($self, $type, $kv, $rdata) = @_;
314 529
530 $guard if 0;
531
315 if ($type eq "end_list_persistent_requests") { 532 if ($type eq "end_list_persistent_requests") {
316 $cv->(\%res); 533 $ok->(\@res);
534 return;
535 } else {
536 my $id = $kv->{identifier};
537
538 if ($type =~ /^persistent_(get|put|put_dir)$/) {
539 push @res, [$type, $kv];
540 }
541 }
542
317 1 543 1
318 } else { 544 });
319 my $id = $kv->{identifier}; 545 });
546};
320 547
321 if ($type =~ /^persistent_(get|put|put_dir)$/) { 548=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
322 $res{$id} = { 549
323 type => $1, 550Update either the C<client_token> or C<priority_class> of a request
324 %{ $res{$id} }, 551identified by C<$global> and C<$identifier>, depending on which of
552C<$client_token> and C<$priority_class> are not C<undef>.
553
554=cut
555
556_txn modify_persistent_request => sub {
557 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
558
559 $self->serialise ($identifier => sub {
560 my ($self, $guard) = @_;
561
562 $self->send_msg (modify_persistent_request =>
563 global => $global ? "true" : "false",
564 identifier => $identifier,
565 defined $client_token ? (client_token => $client_token ) : (),
566 defined $priority_class ? (priority_class => $priority_class) : (),
567 );
568
569 $self->on (sub {
570 my ($self, $type, $kv, @extra) = @_;
571
572 $guard if 0;
573
574 if ($kv->{identifier} eq $identifier) {
575 if ($type eq "persistent_request_modified") {
325 %$kv, 576 $ok->($kv);
577 return;
578 } elsif ($type eq "protocol_error") {
579 $err->($kv);
580 return;
326 }; 581 }
327 } elsif ($type eq "simple_progress") {
328 delete $kv->{pkt_type}; # save memory
329 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
330 } else {
331 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
332 } 582 }
583
333 0 584 1
334 } 585 });
335 }; 586 });
336}; 587};
337 588
338_txn remove_request => sub { 589=item $info = $fcp->get_plugin_info ($name, $detailed)
339 my ($self, $cv, $global, $identifier) = @_;
340 590
341 $self->send_msg (remove_request => 591=cut
342 global => $global ? "true" : "false",
343 identifier => $identifier,
344 id_cb => sub {
345 my ($self, $type, $kv, $rdata) = @_;
346
347 $cv->($kv);
348 1
349 },
350 );
351
352 $cv->();
353};
354
355_txn modify_persistent_request => sub {
356 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
357
358 $self->send_msg (modify_persistent_request =>
359 global => $global ? "true" : "false",
360 identifier => $identifier,
361 defined $client_token ? (client_token => $client_token ) : (),
362 defined $priority_class ? (priority_class => $priority_class) : (),
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372};
373 592
374_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
375 my ($self, $cv, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
595
596 my $id = $self->identifier;
376 597
377 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
378 plugin_name => $name, 600 plugin_name => $name,
379 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
380 id_cb => sub { 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
617};
618
619=item $status = $fcp->client_get ($uri, $identifier, %kv)
620
621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
622
623ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
624priority_class, persistence, client_token, global, return_type,
625binary_blob, allowed_mime_types, filename, temp_filename
626
627=cut
628
629_txn client_get => sub {
630 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
635 $self->send_msg (client_get =>
636 %kv,
637 uri => $uri,
638 identifier => $identifier,
639 );
640
641 $self->on (sub {
381 my ($self, $type, $kv, $rdata) = @_; 642 my ($self, $type, $kv, @extra) = @_;
382 643
644 $guard if 0;
645
646 if ($kv->{identifier} eq $identifier) {
647 if ($type eq "persistent_get") {
383 $cv->($kv); 648 $ok->($kv);
649 return;
650 } elsif ($type eq "protocol_error") {
651 $err->($kv);
652 return;
653 }
654 }
655
384 1 656 1
657 });
658 });
659};
660
661=item $status = $fcp->remove_request ($identifier[, $global])
662
663Remove the request with the given isdentifier. Returns true if successful,
664false on error.
665
666=cut
667
668_txn remove_request => sub {
669 my ($self, $ok, $err, $identifier, $global) = @_;
670
671 $self->serialise ($identifier => sub {
672 my ($self, $guard) = @_;
673
674 $self->send_msg (remove_request =>
675 identifier => $identifier,
676 global => $global ? "true" : "false",
677 );
678 $self->on (sub {
679 my ($self, $type, $kv, @extra) = @_;
680
681 $guard if 0;
682
683 if ($kv->{identifier} eq $identifier) {
684 if ($type eq "persistent_request_removed") {
685 $ok->(1);
686 return;
687 } elsif ($type eq "protocol_error") {
688 $err->($kv);
689 return;
690 }
691 }
692
693 1
694 });
695 });
696};
697
698=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
699
700The DDA test in FCP is probably the single most broken protocol - only
701one directory test can be outstanding at any time, and some guessing and
702heuristics are involved in mangling the paths.
703
704This function combines C<TestDDARequest> and C<TestDDAResponse> in one
705request, handling file reading and writing as well, and tries very hard to
706do the right thing.
707
708Both C<$local_directory> and C<$remote_directory> must specify the same
709directory - C<$local_directory> is the directory path on the client (where
710L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
711the server (where the freenet node runs). When both are running on the
712same node, the paths are generally identical.
713
714C<$want_read> and C<$want_write> should be set to a true value when you
715want to read (get) files or write (put) files, respectively.
716
717On error, an exception is thrown. Otherwise, C<$can_read> and
718C<$can_write> indicate whether you can reaqd or write to freenet via the
719directory.
720
721=cut
722
723_txn test_dda => sub {
724 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
725
726 $self->serialise (test_dda => sub {
727 my ($self, $guard) = @_;
728
729 $self->send_msg (test_dda_request =>
730 directory => $remote,
731 want_read_directory => $want_read ? "true" : "false",
732 want_write_directory => $want_write ? "true" : "false",
733 );
734 $self->on (sub {
735 my ($self, $type, $kv) = @_;
736
737 if ($type eq "test_dda_reply") {
738 # the filenames are all relative to the server-side directory,
739 # which might or might not match $remote anymore, so we
740 # need to rewrite the paths to be relative to $local
741 for my $k (qw(read_filename write_filename)) {
742 my $f = $kv->{$k};
743 for my $dir ($kv->{directory}, $remote) {
744 if ($dir eq substr $f, 0, length $dir) {
745 substr $f, 0, 1 + length $dir, "";
746 $kv->{$k} = $f;
747 last;
748 }
749 }
750 }
751
752 my %response = (directory => $remote);
753
754 if (length $kv->{read_filename}) {
755 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
756 sysread $fh, my $buf, -s $fh;
757 $response{read_content} = $buf;
758 }
759 }
760
761 if (length $kv->{write_filename}) {
762 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
763 syswrite $fh, $kv->{content_to_write};
764 }
765 }
766
767 $self->send_msg (test_dda_response => %response);
768
769 $self->on (sub {
770 my ($self, $type, $kv) = @_;
771
772 $guard if 0; # reference
773
774 if ($type eq "test_dda_complete") {
775 $ok->(
776 $kv->{read_directory_allowed} eq "true",
777 $kv->{write_directory_allowed} eq "true",
778 );
779 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
780 $err->($kv->{extra_description});
781 return;
782 }
783
784 1
785 });
786
787 return;
788 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
789 $err->($kv);
790 return;
791 }
792
793 1
794 });
795 });
796};
797
798=back
799
800=head2 REQUEST CACHE
801
802The C<AnyEvent::FCP> class keeps a request cache, where it caches all
803information from requests.
804
805For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
806in C<< $fcp->{req}{$identifier} >>:
807
808 persistent_get
809 persistent_put
810 persistent_put_dir
811
812This message updates the stored data:
813
814 persistent_request_modified
815
816This message will remove this entry:
817
818 persistent_request_removed
819
820These messages get merged into the cache entry, under their
821type, i.e. a C<simple_progress> message will be stored in C<<
822$fcp->{req}{$identifier}{simple_progress} >>:
823
824 simple_progress # get/put
825
826 uri_generated # put
827 generated_metadata # put
828 started_compression # put
829 finished_compression # put
830 put_failed # put
831 put_fetchable # put
832 put_successful # put
833
834 sending_to_network # get
835 compatibility_mode # get
836 expected_hashes # get
837 expected_mime # get
838 expected_data_length # get
839 get_failed # get
840 data_found # get
841 enter_finite_cooldown # get
842
843In addition, an event (basically a fake message) of type C<request_changed> is generated
844on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
845is the type of the original message triggering the change,
846
847To fill this cache with the global queue and keep it updated,
848call C<watch_global> to subscribe to updates, followed by
849C<list_persistent_requests_sync>.
850
851 $fcp->watch_global_sync_; # do not wait
852 $fcp->list_persistent_requests; # wait
853
854To get a better idea of what is stored in the cache, here is an example of
855what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
856
857 {
858 identifier => "Frost-gpl.txt",
859 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
860 binary_blob => "false",
861 global => "true",
862 max_retries => -1,
863 max_size => 9223372036854775807,
864 persistence => "forever",
865 priority_class => 3,
866 real_time => "false",
867 return_type => "direct",
868 started => "true",
869 type => "persistent_get",
870 verbosity => 2147483647,
871 sending_to_network => {
872 identifier => "Frost-gpl.txt",
873 global => "true",
385 }, 874 },
386 ); 875 compatibility_mode => {
876 identifier => "Frost-gpl.txt",
877 definitive => "true",
878 dont_compress => "false",
879 global => "true",
880 max => "COMPAT_1255",
881 min => "COMPAT_1255",
882 },
883 expected_hashes => {
884 identifier => "Frost-gpl.txt",
885 global => "true",
886 hashes => {
887 ed2k => "d83596f5ee3b7...",
888 md5 => "e0894e4a2a6...",
889 sha1 => "...",
890 sha256 => "...",
891 sha512 => "...",
892 tth => "...",
893 },
894 },
895 expected_mime => {
896 identifier => "Frost-gpl.txt",
897 global => "true",
898 metadata => { content_type => "application/rar" },
899 },
900 expected_data_length => {
901 identifier => "Frost-gpl.txt",
902 data_length => 37576,
903 global => "true",
904 },
905 simple_progress => {
906 identifier => "Frost-gpl.txt",
907 failed => 0,
908 fatally_failed => 0,
909 finalized_total => "true",
910 global => "true",
911 last_progress => 1438639282628,
912 required => 372,
913 succeeded => 102,
914 total => 747,
915 },
916 data_found => {
917 identifier => "Frost-gpl.txt",
918 completion_time => 1438663354026,
919 data_length => 37576,
920 global => "true",
921 metadata => { content_type => "image/jpeg" },
922 startup_time => 1438657196167,
923 },
924 }
387 925
388 $cv->(); 926=head1 EXAMPLE PROGRAM
389};
390 927
391=back 928 use AnyEvent::FCP;
929
930 my $fcp = new AnyEvent::FCP;
931
932 # let us look at the global request list
933 $fcp->watch_global_ (1);
934
935 # list them, synchronously
936 my $req = $fcp->list_persistent_requests;
937
938 # go through all requests
939TODO
940 for my $req (values %$req) {
941 # skip jobs not directly-to-disk
942 next unless $req->{return_type} eq "disk";
943 # skip jobs not issued by FProxy
944 next unless $req->{identifier} =~ /^FProxy:/;
945
946 if ($req->{data_found}) {
947 # file has been successfully downloaded
948
949 ... move the file away
950 (left as exercise)
951
952 # remove the request
953
954 $fcp->remove_request (1, $req->{identifier});
955 } elsif ($req->{get_failed}) {
956 # request has failed
957 if ($req->{get_failed}{code} == 11) {
958 # too many path components, should restart
959 } else {
960 # other failure
961 }
962 } else {
963 # modify priorities randomly, to improve download rates
964 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
965 if 0.1 > rand;
966 }
967 }
968
969 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
970 $fcp->get_plugin_info_sync ("dummy");
392 971
393=head1 SEE ALSO 972=head1 SEE ALSO
394 973
395L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>. 974L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
396 975

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines