ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.9 by root, Tue Aug 4 00:35:16 2015 UTC vs.
Revision 1.13 by root, Sat Aug 8 04:07:28 2015 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
77 $_ 78 $_
78} 79}
79 80
80sub tolc($) { 81sub tolc($) {
81 local $_ = shift; 82 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 86 lc
86} 87}
87 88
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
89 90
90Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 93
93If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 95(hopefully) unique client name for you.
95 96
96You can install a progress callback that is being called with the AnyEvent::FCP
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99
100Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110=cut 97=cut
111 98
112sub new { 99sub new {
113 my $class = shift; 100 my $class = shift;
114 my $self = bless { @_ }, $class; 101 my $self = bless {
115 102 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
119 $self->{timeout} ||= 3600*2; 106 @_,
120 $self->{progress} ||= sub { }; 107 queue => [],
121 108 req => {},
122 $self->{id} = "a0"; 109 id => "a0",
110 }, $class;
123 111
124 { 112 {
125 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
126 114
127 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
135 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
136 124
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 126 }
139 127
140 $self->send_msg ( 128 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 129 name => $self->{name},
143 expected_version => "2.0", 130 expected_version => "2.0",
144 ); 131 );
145 132
146 $self 133 $self
147} 134}
186} 173}
187 174
188sub _push_queue { 175sub _push_queue {
189 my ($self, $queue) = @_; 176 my ($self, $queue) = @_;
190 177
191 warn "oush @$queue\n";#d#
192 shift @$queue; 178 shift @$queue;
193 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 179 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
194 if @$queue; 180 if @$queue;
195} 181}
196 182
203 push @$queue, $cb; 189 push @$queue, $cb;
204 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
205 unless $#$queue; 191 unless $#$queue;
206} 192}
207 193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
208sub on_read { 248sub on_read {
209 my ($self) = @_; 249 my ($self) = @_;
210 250
211 my $type; 251 my $type;
212 my %kv; 252 my %kv;
213 my $rdata; 253 my $rdata;
214
215 my $done_cb = sub {
216 $kv{pkt_type} = $type;
217
218 my $on = $self->{on};
219 for (0 .. $#$on) {
220 unless (my $res = $on->[$_]($type, \%kv, $rdata)) {
221 splice @$on, $_, 1 unless defined $res;
222 return;
223 }
224 }
225
226 if (my $cb = $self->{queue}[0]) {
227 $cb->($self, $type, \%kv, $rdata)
228 and shift @{ $self->{queue} };
229 } else {
230 $self->default_recv ($type, \%kv, $rdata);
231 }
232 };
233 254
234 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
235 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
236 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
237 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
250 271
251 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
252 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
253 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
254 $rdata = \$_[1]; 275 $rdata = \$_[1];
255 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
256 }); 277 });
257 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
258 $done_cb->(); 279 $self->recv ($type, \%kv);
259 } else { 280 } else {
260 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
261 } 282 }
262 }; 283 };
263 284
273 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
274 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
275 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
276 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
277 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
278 } else {
279 &{ $self->{progress} };
280 } 299 }
281} 300}
301
302=back
303
304=head2 FCP REQUESTS
305
306The following methods implement various requests. Most of them map
307directory to the FCP message of the same name. The added benefit of
308these over sending requests yourself is that they handle the necessary
309serialisation, protocol quirks, and replies.
310
311All of them exist in two versions, the variant shown in this manpage, and
312a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
313version as shown is I<synchronous> - it will wait for any replies, and
314either return the reply, or croak with an error. The underscore variant
315returns immediately and invokes one or more callbacks or condvars later.
316
317For example, the call
318
319 $info = $fcp->get_plugin_info ($name, $detailed);
320
321Also comes in this underscore variant:
322
323 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324
325You can thinbk of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked
328with the results.
329
330This callback/continuation argument (C<$cb>) can come in three forms itself:
331
332=over 4
333
334=item A code reference (or rather anything not matching some other alternative)
335
336This code reference will be invoked with the result on success. On an
337error, it will die (in the event loop) with a backtrace of the call site.
338
339This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors!
341
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error
346will instead result in C<< $cv->croak ($error) >>.
347
348This is also a popular choice.
349
350=item An array with two callbacks C<[$success, $failure]>
351
352The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors.
354
355=item C<undef>
356
357This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies
359with a backtrace.
360
361This is good for quick scripts, or when you really aren't interested in
362the results.
363
364=back
365
366=cut
367
368our $NOP_CB = sub { };
282 369
283sub _txn { 370sub _txn {
284 my ($name, $sub) = @_; 371 my ($name, $sub) = @_;
285 372
286 *{$name} = sub { 373 *{$name} = sub {
287 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 374 my $cv = AE::cv;
288 &$sub;
289 $cv
290 };
291 375
292 *{"$name\_sync"} = sub { 376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) };
293 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
294 &$sub; 377 &$sub;
295 $cv->recv 378 $cv->recv
296 }; 379 };
297}
298 380
299=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 381 *{"$name\_"} = sub {
382 my ($ok, $err) = pop;
300 383
384 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) };
388 } else {
389 my $bt = Carp::longmess "";
390 $err = sub {
391 die "$_[0]{extra_description}$bt";
392 };
393 }
394
395 $ok ||= $NOP_CB;
396
397 splice @_, 1, 0, $ok, $err;
398 &$sub;
399 };
400}
401
402=over 4
403
301=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 404=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
302 405
303=cut 406=cut
304 407
305_txn list_peers => sub { 408_txn list_peers => sub {
306 my ($self, $cv, $with_metadata, $with_volatile) = @_; 409 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
307 410
308 my @res; 411 my @res;
309 412
310 $self->send_msg (list_peers => 413 $self->send_msg (list_peers =>
311 with_metadata => $with_metadata ? "true" : "false", 414 with_metadata => $with_metadata ? "true" : "false",
312 with_volatile => $with_volatile ? "true" : "false", 415 with_volatile => $with_volatile ? "true" : "false",
313 id_cb => sub { 416 id_cb => sub {
314 my ($self, $type, $kv, $rdata) = @_; 417 my ($self, $type, $kv, $rdata) = @_;
315 418
316 if ($type eq "end_list_peers") { 419 if ($type eq "end_list_peers") {
317 $cv->(\@res); 420 $ok->(\@res);
318 1 421 1
319 } else { 422 } else {
320 push @res, $kv; 423 push @res, $kv;
321 0 424 0
322 } 425 }
323 }, 426 },
324 ); 427 );
325}; 428};
326 429
327=item $cv = $fcp->list_peer_notes ($node_identifier)
328
329=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 430=item $notes = $fcp->list_peer_notes ($node_identifier)
330 431
331=cut 432=cut
332 433
333_txn list_peer_notes => sub { 434_txn list_peer_notes => sub {
334 my ($self, $cv, $node_identifier) = @_; 435 my ($self, $ok, undef, $node_identifier) = @_;
335 436
336 $self->send_msg (list_peer_notes => 437 $self->send_msg (list_peer_notes =>
337 node_identifier => $node_identifier, 438 node_identifier => $node_identifier,
338 id_cb => sub { 439 id_cb => sub {
339 my ($self, $type, $kv, $rdata) = @_; 440 my ($self, $type, $kv, $rdata) = @_;
340 441
341 $cv->($kv); 442 $ok->($kv);
342 1 443 1
343 }, 444 },
344 ); 445 );
345}; 446};
346 447
347=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
348
349=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 448=item $fcp->watch_global ($enabled[, $verbosity_mask])
350 449
351=cut 450=cut
352 451
353_txn watch_global => sub { 452_txn watch_global => sub {
354 my ($self, $cv, $enabled, $verbosity_mask) = @_; 453 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
355 454
356 $self->send_msg (watch_global => 455 $self->send_msg (watch_global =>
357 enabled => $enabled ? "true" : "false", 456 enabled => $enabled ? "true" : "false",
358 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 457 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
359 ); 458 );
360 459
361 $cv->(); 460 $ok->();
362}; 461};
363 462
364=item $cv = $fcp->list_persistent_requests
365
366=item $reqs = $fcp->list_persistent_requests_sync 463=item $reqs = $fcp->list_persistent_requests
367 464
368=cut 465=cut
369 466
370_txn list_persistent_requests => sub { 467_txn list_persistent_requests => sub {
371 my ($self, $cv) = @_; 468 my ($self, $ok, $err) = @_;
372 469
470 $self->serialise (list_persistent_requests => sub {
471 my ($self, $guard) = @_;
472
373 my %res; 473 my @res;
374 474
375 $self->send_msg ("list_persistent_requests"); 475 $self->send_msg ("list_persistent_requests");
376 476
377 push @{ $self->{queue} }, sub { 477 $self->on (sub {
378 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
379 479
480 $guard if 0;
481
380 if ($type eq "end_list_persistent_requests") { 482 if ($type eq "end_list_persistent_requests") {
381 $cv->(\%res); 483 $ok->(\@res);
484 return;
485 } else {
486 my $id = $kv->{identifier};
487
488 if ($type =~ /^persistent_(get|put|put_dir)$/) {
489 push @res, [$type, $kv];
490 }
491 }
492
382 1 493 1
383 } else { 494 });
384 my $id = $kv->{identifier}; 495 });
496};
385 497
386 if ($type =~ /^persistent_(get|put|put_dir)$/) { 498=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387 $res{$id} = { 499
388 type => $1, 500Update either the C<client_token> or C<priority_class> of a request
389 %{ $res{$id} }, 501identified by C<$global> and C<$identifier>, depending on which of
502C<$client_token> and C<$priority_class> are not C<undef>.
503
504=cut
505
506_txn modify_persistent_request => sub {
507 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
508
509 $self->serialise ($identifier => sub {
510 my ($self, $guard) = @_;
511
512 $self->send_msg (modify_persistent_request =>
513 global => $global ? "true" : "false",
514 identifier => $identifier,
515 defined $client_token ? (client_token => $client_token ) : (),
516 defined $priority_class ? (priority_class => $priority_class) : (),
517 );
518
519 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_;
521
522 $guard if 0;
523
524 if ($kv->{identifier} eq $identifier) {
525 if ($type eq "persistent_request_modified") {
390 %$kv, 526 $ok->($kv);
527 return;
528 } elsif ($type eq "protocol_error") {
529 $err->($kv);
530 return;
391 }; 531 }
392 } elsif ($type eq "simple_progress") {
393 delete $kv->{pkt_type}; # save memory
394 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
395 } else {
396 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
397 } 532 }
533
398 0 534 1
399 } 535 });
400 }; 536 });
401}; 537};
402 538
403=item $cv = $fcp->remove_request ($global, $identifier)
404
405=item $status = $fcp->remove_request_sync ($global, $identifier)
406
407=cut
408
409_txn remove_request => sub {
410 my ($self, $cv, $global, $identifier) = @_;
411
412 $self->send_msg (remove_request =>
413 global => $global ? "true" : "false",
414 identifier => $identifier,
415 id_cb => sub {
416 my ($self, $type, $kv, $rdata) = @_;
417
418 $cv->($kv);
419 1
420 },
421 );
422};
423
424=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
425
426=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
427
428=cut
429
430_txn modify_persistent_request => sub {
431 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
432
433 $self->send_msg (modify_persistent_request =>
434 global => $global ? "true" : "false",
435 defined $client_token ? (client_token => $client_token ) : (),
436 defined $priority_class ? (priority_class => $priority_class) : (),
437 identifier => $identifier,
438 id_cb => sub {
439 my ($self, $type, $kv, $rdata) = @_;
440
441 $cv->($kv);
442 1
443 },
444 );
445};
446
447=item $cv = $fcp->get_plugin_info ($name, $detailed)
448
449=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 539=item $info = $fcp->get_plugin_info ($name, $detailed)
450 540
451=cut 541=cut
452 542
453_txn get_plugin_info => sub { 543_txn get_plugin_info => sub {
454 my ($self, $cv, $name, $detailed) = @_; 544 my ($self, $ok, $err, $name, $detailed) = @_;
455 545
456 $self->send_msg (get_plugin_info => 546 $self->send_msg (get_plugin_info =>
457 plugin_name => $name, 547 plugin_name => $name,
458 detailed => $detailed ? "true" : "false", 548 detailed => $detailed ? "true" : "false",
459 id_cb => sub { 549 id_cb => sub {
460 my ($self, $type, $kv, $rdata) = @_; 550 my ($self, $type, $kv, $rdata) = @_;
461 551
462 $cv->($kv); 552 $ok->($kv);
463 1 553 1
464 }, 554 },
465 ); 555 );
466}; 556};
467 557
468=item $cv = $fcp->client_get ($uri, $identifier, %kv)
469
470=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 558=item $status = $fcp->client_get ($uri, $identifier, %kv)
471 559
472%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 560%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
473 561
474ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 562ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
475priority_class, persistence, client_token, global, return_type, 563priority_class, persistence, client_token, global, return_type,
476binary_blob, allowed_mime_types, filename, temp_filename 564binary_blob, allowed_mime_types, filename, temp_filename
477 565
478=cut 566=cut
479 567
480_txn client_get => sub { 568_txn client_get => sub {
481 my ($self, $cv, $uri, $identifier, %kv) = @_; 569 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
482 570
571 $self->serialise ($identifier => sub {
572 my ($self, $guard) = @_;
573
483 $self->send_msg (client_get => 574 $self->send_msg (client_get =>
484 %kv, 575 %kv,
485 uri => $uri, 576 uri => $uri,
486 identifier => $identifier, 577 identifier => $identifier,
487 id_cb => sub { 578 );
579
580 $self->on (sub {
488 my ($self, $type, $kv, $rdata) = @_; 581 my ($self, $type, $kv, @extra) = @_;
489 582
583 $guard if 0;
584
585 if ($kv->{identifier} eq $identifier) {
586 if ($type eq "persistent_get") {
490 $cv->($kv); 587 $ok->($kv);
588 return;
589 } elsif ($type eq "protocol_error") {
590 $err->($kv);
591 return;
592 }
593 }
594
491 1 595 1
492 }, 596 });
493 ); 597 });
494}; 598};
495 599
496=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 600=item $status = $fcp->remove_request ($identifier[, $global])
497 601
602Remove the request with the given isdentifier. Returns true if successful,
603false on error.
604
605=cut
606
607_txn remove_request => sub {
608 my ($self, $ok, $err, $identifier, $global) = @_;
609
610 $self->serialise ($identifier => sub {
611 my ($self, $guard) = @_;
612
613 $self->send_msg (remove_request =>
614 identifier => $identifier,
615 global => $global ? "true" : "false",
616 );
617 $self->on (sub {
618 my ($self, $type, $kv, @extra) = @_;
619
620 $guard if 0;
621
622 if ($kv->{identifier} eq $identifier) {
623 if ($type eq "persistent_request_removed") {
624 $ok->(1);
625 return;
626 } elsif ($type eq "protocol_error") {
627 $err->($kv);
628 return;
629 }
630 }
631
632 1
633 });
634 });
635};
636
498=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 637=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
499 638
500The DDA test in FCP is probably the single most broken protocol - only 639The DDA test in FCP is probably the single most broken protocol - only
501one directory test can be outstanding at any time, and some guessing and 640one directory test can be outstanding at any time, and some guessing and
502heuristics are involved in mangling the paths. 641heuristics are involved in mangling the paths.
503 642
504This function combines C<TestDDARequest> and C<TestDDAResponse> in one 643This function combines C<TestDDARequest> and C<TestDDAResponse> in one
505request, handling file reading and writing as well. 644request, handling file reading and writing as well, and tries very hard to
645do the right thing.
646
647Both C<$local_directory> and C<$remote_directory> must specify the same
648directory - C<$local_directory> is the directory path on the client (where
649L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
650the server (where the freenet node runs). When both are running on the
651same node, the paths are generally identical.
652
653C<$want_read> and C<$want_write> should be set to a true value when you
654want to read (get) files or write (put) files, respectively.
655
656On error, an exception is thrown. Otherwise, C<$can_read> and
657C<$can_write> indicate whether you can reaqd or write to freenet via the
658directory.
506 659
507=cut 660=cut
508 661
509_txn test_dda => sub { 662_txn test_dda => sub {
510 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 663 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
511 664
512 $self->serialise (test_dda => sub { 665 $self->serialise (test_dda => sub {
513 my ($self, $guard) = @_; 666 my ($self, $guard) = @_;
514 667
515 $self->send_msg (test_dda_request => 668 $self->send_msg (test_dda_request =>
516 directory => $remote, 669 directory => $remote,
517 want_read_directory => $want_read ? "true" : "false", 670 want_read_directory => $want_read ? "true" : "false",
518 want_write_directory => $want_write ? "true" : "false", 671 want_write_directory => $want_write ? "true" : "false",
519 ); 672 );
520 $self->on (sub { 673 $self->on (sub {
521 my ($type, $kv) = @_; 674 my ($self, $type, $kv) = @_;
522 675
523 if ($type eq "test_dda_reply") { 676 if ($type eq "test_dda_reply") {
524 # the filenames are all relative to the server-side directory, 677 # the filenames are all relative to the server-side directory,
525 # which might or might not match $remote anymore, so we 678 # which might or might not match $remote anymore, so we
526 # need to rewrite the paths to be relative to $local 679 # need to rewrite the paths to be relative to $local
536 } 689 }
537 690
538 my %response = (directory => $remote); 691 my %response = (directory => $remote);
539 692
540 if (length $kv->{read_filename}) { 693 if (length $kv->{read_filename}) {
541 warn "$local/$kv->{read_filename}";#d#
542 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 694 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
543 sysread $fh, my $buf, -s $fh; 695 sysread $fh, my $buf, -s $fh;
544 $response{read_content} = $buf; 696 $response{read_content} = $buf;
545 } 697 }
546 } 698 }
552 } 704 }
553 705
554 $self->send_msg (test_dda_response => %response); 706 $self->send_msg (test_dda_response => %response);
555 707
556 $self->on (sub { 708 $self->on (sub {
557 my ($type, $kv) = @_; 709 my ($self, $type, $kv) = @_;
558 710
559 $guard if 0; # reference 711 $guard if 0; # reference
560 712
561 if ($type eq "test_dda_complete") { 713 if ($type eq "test_dda_complete") {
562 $cv->( 714 $ok->(
563 $kv->{read_directory_allowed} eq "true", 715 $kv->{read_directory_allowed} eq "true",
564 $kv->{write_directory_allowed} eq "true", 716 $kv->{write_directory_allowed} eq "true",
565 ); 717 );
566 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 718 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
567 $cv->croak ($kv->{extra_description}); 719 $err->($kv->{extra_description});
568 return; 720 return;
569 } 721 }
570 722
571 1 723 1
572 }); 724 });
573 725
574 return; 726 return;
575 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 727 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
576 $cv->croak ($kv->{extra_description}); 728 $err->($kv);
577 return; 729 return;
578 } 730 }
579 731
580 1 732 1
581 }); 733 });
582 }); 734 });
583}; 735};
584 736
585=back 737=back
586 738
739=head2 REQUEST CACHE
740
741The C<AnyEvent::FCP> class keeps a request cache, where it caches all
742information from requests.
743
744For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
745in C<< $fcp->{req}{$identifier} >>:
746
747 persistent_get
748 persistent_put
749 persistent_put_dir
750
751This message updates the stored data:
752
753 persistent_request_modified
754
755This message will remove this entry:
756
757 persistent_request_removed
758
759These messages get merged into the cache entry, under their
760type, i.e. a C<simple_progress> message will be stored in C<<
761$fcp->{req}{$identifier}{simple_progress} >>:
762
763 simple_progress # get/put
764
765 uri_generated # put
766 generated_metadata # put
767 started_compression # put
768 finished_compression # put
769 put_failed # put
770 put_fetchable # put
771 put_successful # put
772
773 sending_to_network # get
774 compatibility_mode # get
775 expected_hashes # get
776 expected_mime # get
777 expected_data_length # get
778 get_failed # get
779 data_found # get
780 enter_finite_cooldown # get
781
782In addition, an event (basically a fake message) of type C<request_changed> is generated
783on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
784is the type of the original message triggering the change,
785
786To fill this cache with the global queue and keep it updated,
787call C<watch_global> to subscribe to updates, followed by
788C<list_persistent_requests_sync>.
789
790 $fcp->watch_global_sync_; # do not wait
791 $fcp->list_persistent_requests; # wait
792
793To get a better idea of what is stored in the cache, here is an example of
794what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
795
796 {
797 identifier => "Frost-gpl.txt",
798 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
799 binary_blob => "false",
800 global => "true",
801 max_retries => -1,
802 max_size => 9223372036854775807,
803 persistence => "forever",
804 priority_class => 3,
805 real_time => "false",
806 return_type => "direct",
807 started => "true",
808 type => "persistent_get",
809 verbosity => 2147483647,
810 sending_to_network => {
811 identifier => "Frost-gpl.txt",
812 global => "true",
813 },
814 compatibility_mode => {
815 identifier => "Frost-gpl.txt",
816 definitive => "true",
817 dont_compress => "false",
818 global => "true",
819 max => "COMPAT_1255",
820 min => "COMPAT_1255",
821 },
822 expected_hashes => {
823 identifier => "Frost-gpl.txt",
824 global => "true",
825 hashes => {
826 ed2k => "d83596f5ee3b7...",
827 md5 => "e0894e4a2a6...",
828 sha1 => "...",
829 sha256 => "...",
830 sha512 => "...",
831 tth => "...",
832 },
833 },
834 expected_mime => {
835 identifier => "Frost-gpl.txt",
836 global => "true",
837 metadata => { content_type => "application/rar" },
838 },
839 expected_data_length => {
840 identifier => "Frost-gpl.txt",
841 data_length => 37576,
842 global => "true",
843 },
844 simple_progress => {
845 identifier => "Frost-gpl.txt",
846 failed => 0,
847 fatally_failed => 0,
848 finalized_total => "true",
849 global => "true",
850 last_progress => 1438639282628,
851 required => 372,
852 succeeded => 102,
853 total => 747,
854 },
855 data_found => {
856 identifier => "Frost-gpl.txt",
857 completion_time => 1438663354026,
858 data_length => 37576,
859 global => "true",
860 metadata => { content_type => "image/jpeg" },
861 startup_time => 1438657196167,
862 },
863 }
864
587=head1 EXAMPLE PROGRAM 865=head1 EXAMPLE PROGRAM
588 866
589 use AnyEvent::FCP; 867 use AnyEvent::FCP;
590 868
591 my $fcp = new AnyEvent::FCP; 869 my $fcp = new AnyEvent::FCP;
592 870
593 # let us look at the global request list 871 # let us look at the global request list
594 $fcp->watch_global (1, 0); 872 $fcp->watch_global_ (1);
595 873
596 # list them, synchronously 874 # list them, synchronously
597 my $req = $fcp->list_persistent_requests_sync; 875 my $req = $fcp->list_persistent_requests;
598 876
599 # go through all requests 877 # go through all requests
878TODO
600 for my $req (values %$req) { 879 for my $req (values %$req) {
601 # skip jobs not directly-to-disk 880 # skip jobs not directly-to-disk
602 next unless $req->{return_type} eq "disk"; 881 next unless $req->{return_type} eq "disk";
603 # skip jobs not issued by FProxy 882 # skip jobs not issued by FProxy
604 next unless $req->{identifier} =~ /^FProxy:/; 883 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines