ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.10 by root, Tue Aug 4 00:50:25 2015 UTC vs.
Revision 1.13 by root, Sat Aug 8 04:07:28 2015 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
77 $_ 78 $_
78} 79}
79 80
80sub tolc($) { 81sub tolc($) {
81 local $_ = shift; 82 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 86 lc
86} 87}
87 88
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
89 90
90Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 93
93If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 95(hopefully) unique client name for you.
95 96
96You can install a progress callback that is being called with the AnyEvent::FCP
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99
100Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110=cut 97=cut
111 98
112sub new { 99sub new {
113 my $class = shift; 100 my $class = shift;
114 my $self = bless { @_ }, $class; 101 my $self = bless {
115 102 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
119 $self->{timeout} ||= 3600*2; 106 @_,
120 $self->{progress} ||= sub { }; 107 queue => [],
121 108 req => {},
122 $self->{id} = "a0"; 109 id => "a0",
110 }, $class;
123 111
124 { 112 {
125 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
126 114
127 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
135 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
136 124
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 126 }
139 127
140 $self->send_msg ( 128 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 129 name => $self->{name},
143 expected_version => "2.0", 130 expected_version => "2.0",
144 ); 131 );
145 132
146 $self 133 $self
147} 134}
202 push @$queue, $cb; 189 push @$queue, $cb;
203 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204 unless $#$queue; 191 unless $#$queue;
205} 192}
206 193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
207sub on_read { 248sub on_read {
208 my ($self) = @_; 249 my ($self) = @_;
209 250
210 my $type; 251 my $type;
211 my %kv; 252 my %kv;
212 my $rdata; 253 my $rdata;
213
214 my $done_cb = sub {
215 $kv{pkt_type} = $type;
216
217 my $on = $self->{on};
218 for (0 .. $#$on) {
219 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) {
220 splice @$on, $_, 1 unless defined $res;
221 return;
222 }
223 }
224
225 if (my $cb = $self->{queue}[0]) {
226 $cb->($self, $type, \%kv, $rdata)
227 and shift @{ $self->{queue} };
228 } else {
229 $self->default_recv ($type, \%kv, $rdata);
230 }
231 };
232 254
233 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
234 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
235 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
236 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
249 271
250 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
251 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
252 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
253 $rdata = \$_[1]; 275 $rdata = \$_[1];
254 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
255 }); 277 });
256 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
257 $done_cb->(); 279 $self->recv ($type, \%kv);
258 } else { 280 } else {
259 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
260 } 282 }
261 }; 283 };
262 284
272 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
273 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
274 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
275 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
276 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
277 } else {
278 &{ $self->{progress} };
279 } 299 }
280} 300}
301
302=back
303
304=head2 FCP REQUESTS
305
306The following methods implement various requests. Most of them map
307directory to the FCP message of the same name. The added benefit of
308these over sending requests yourself is that they handle the necessary
309serialisation, protocol quirks, and replies.
310
311All of them exist in two versions, the variant shown in this manpage, and
312a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
313version as shown is I<synchronous> - it will wait for any replies, and
314either return the reply, or croak with an error. The underscore variant
315returns immediately and invokes one or more callbacks or condvars later.
316
317For example, the call
318
319 $info = $fcp->get_plugin_info ($name, $detailed);
320
321Also comes in this underscore variant:
322
323 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324
325You can thinbk of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked
328with the results.
329
330This callback/continuation argument (C<$cb>) can come in three forms itself:
331
332=over 4
333
334=item A code reference (or rather anything not matching some other alternative)
335
336This code reference will be invoked with the result on success. On an
337error, it will die (in the event loop) with a backtrace of the call site.
338
339This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors!
341
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error
346will instead result in C<< $cv->croak ($error) >>.
347
348This is also a popular choice.
349
350=item An array with two callbacks C<[$success, $failure]>
351
352The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors.
354
355=item C<undef>
356
357This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies
359with a backtrace.
360
361This is good for quick scripts, or when you really aren't interested in
362the results.
363
364=back
365
366=cut
367
368our $NOP_CB = sub { };
281 369
282sub _txn { 370sub _txn {
283 my ($name, $sub) = @_; 371 my ($name, $sub) = @_;
284 372
285 *{$name} = sub { 373 *{$name} = sub {
286 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 374 my $cv = AE::cv;
287 &$sub;
288 $cv
289 };
290 375
291 *{"$name\_sync"} = sub { 376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) };
292 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
293 &$sub; 377 &$sub;
294 $cv->recv 378 $cv->recv
295 }; 379 };
296}
297 380
298=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 381 *{"$name\_"} = sub {
382 my ($ok, $err) = pop;
299 383
384 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) };
388 } else {
389 my $bt = Carp::longmess "";
390 $err = sub {
391 die "$_[0]{extra_description}$bt";
392 };
393 }
394
395 $ok ||= $NOP_CB;
396
397 splice @_, 1, 0, $ok, $err;
398 &$sub;
399 };
400}
401
402=over 4
403
300=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 404=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
301 405
302=cut 406=cut
303 407
304_txn list_peers => sub { 408_txn list_peers => sub {
305 my ($self, $cv, $with_metadata, $with_volatile) = @_; 409 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
306 410
307 my @res; 411 my @res;
308 412
309 $self->send_msg (list_peers => 413 $self->send_msg (list_peers =>
310 with_metadata => $with_metadata ? "true" : "false", 414 with_metadata => $with_metadata ? "true" : "false",
311 with_volatile => $with_volatile ? "true" : "false", 415 with_volatile => $with_volatile ? "true" : "false",
312 id_cb => sub { 416 id_cb => sub {
313 my ($self, $type, $kv, $rdata) = @_; 417 my ($self, $type, $kv, $rdata) = @_;
314 418
315 if ($type eq "end_list_peers") { 419 if ($type eq "end_list_peers") {
316 $cv->(\@res); 420 $ok->(\@res);
317 1 421 1
318 } else { 422 } else {
319 push @res, $kv; 423 push @res, $kv;
320 0 424 0
321 } 425 }
322 }, 426 },
323 ); 427 );
324}; 428};
325 429
326=item $cv = $fcp->list_peer_notes ($node_identifier)
327
328=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 430=item $notes = $fcp->list_peer_notes ($node_identifier)
329 431
330=cut 432=cut
331 433
332_txn list_peer_notes => sub { 434_txn list_peer_notes => sub {
333 my ($self, $cv, $node_identifier) = @_; 435 my ($self, $ok, undef, $node_identifier) = @_;
334 436
335 $self->send_msg (list_peer_notes => 437 $self->send_msg (list_peer_notes =>
336 node_identifier => $node_identifier, 438 node_identifier => $node_identifier,
337 id_cb => sub { 439 id_cb => sub {
338 my ($self, $type, $kv, $rdata) = @_; 440 my ($self, $type, $kv, $rdata) = @_;
339 441
340 $cv->($kv); 442 $ok->($kv);
341 1 443 1
342 }, 444 },
343 ); 445 );
344}; 446};
345 447
346=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347
348=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 448=item $fcp->watch_global ($enabled[, $verbosity_mask])
349 449
350=cut 450=cut
351 451
352_txn watch_global => sub { 452_txn watch_global => sub {
353 my ($self, $cv, $enabled, $verbosity_mask) = @_; 453 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
354 454
355 $self->send_msg (watch_global => 455 $self->send_msg (watch_global =>
356 enabled => $enabled ? "true" : "false", 456 enabled => $enabled ? "true" : "false",
357 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 457 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
358 ); 458 );
359 459
360 $cv->(); 460 $ok->();
361}; 461};
362 462
363=item $cv = $fcp->list_persistent_requests
364
365=item $reqs = $fcp->list_persistent_requests_sync 463=item $reqs = $fcp->list_persistent_requests
366 464
367=cut 465=cut
368 466
369_txn list_persistent_requests => sub { 467_txn list_persistent_requests => sub {
370 my ($self, $cv) = @_; 468 my ($self, $ok, $err) = @_;
371 469
372 $self->serialise (list_persistent_requests => sub { 470 $self->serialise (list_persistent_requests => sub {
373 my ($self, $guard) = @_; 471 my ($self, $guard) = @_;
374 472
375 my %res; 473 my @res;
376 474
377 $self->send_msg ("list_persistent_requests"); 475 $self->send_msg ("list_persistent_requests");
378 476
379 $self->on (sub { 477 $self->on (sub {
380 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
381 479
382 $guard if 0; 480 $guard if 0;
383 481
384 if ($type eq "end_list_persistent_requests") { 482 if ($type eq "end_list_persistent_requests") {
385 $cv->(\%res); 483 $ok->(\@res);
386 return; 484 return;
387 } else { 485 } else {
388 my $id = $kv->{identifier}; 486 my $id = $kv->{identifier};
389 487
390 if ($type =~ /^persistent_(get|put|put_dir)$/) { 488 if ($type =~ /^persistent_(get|put|put_dir)$/) {
391 $res{$id} = { 489 push @res, [$type, $kv];
392 type => $1,
393 %{ $res{$id} },
394 %$kv,
395 };
396 } elsif ($type eq "simple_progress") {
397 delete $kv->{pkt_type}; # save memory
398 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399 } else {
400 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401 } 490 }
402 } 491 }
403 492
404 1 493 1
405 }); 494 });
406 }); 495 });
407}; 496};
408 497
409=item $cv = $fcp->remove_request ($global, $identifier) 498=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
410 499
411=item $status = $fcp->remove_request_sync ($global, $identifier) 500Update either the C<client_token> or C<priority_class> of a request
501identified by C<$global> and C<$identifier>, depending on which of
502C<$client_token> and C<$priority_class> are not C<undef>.
412 503
413=cut 504=cut
414 505
415_txn remove_request => sub { 506_txn modify_persistent_request => sub {
416 my ($self, $cv, $global, $identifier) = @_; 507 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
417 508
509 $self->serialise ($identifier => sub {
510 my ($self, $guard) = @_;
511
418 $self->send_msg (remove_request => 512 $self->send_msg (modify_persistent_request =>
419 global => $global ? "true" : "false", 513 global => $global ? "true" : "false",
420 identifier => $identifier, 514 identifier => $identifier,
421 id_cb => sub { 515 defined $client_token ? (client_token => $client_token ) : (),
516 defined $priority_class ? (priority_class => $priority_class) : (),
517 );
518
519 $self->on (sub {
422 my ($self, $type, $kv, $rdata) = @_; 520 my ($self, $type, $kv, @extra) = @_;
423 521
522 $guard if 0;
523
524 if ($kv->{identifier} eq $identifier) {
525 if ($type eq "persistent_request_modified") {
424 $cv->($kv); 526 $ok->($kv);
527 return;
528 } elsif ($type eq "protocol_error") {
529 $err->($kv);
530 return;
531 }
532 }
533
425 1 534 1
426 }, 535 });
427 ); 536 });
428}; 537};
429 538
430=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431
432=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433
434=cut
435
436_txn modify_persistent_request => sub {
437 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
438
439 $self->send_msg (modify_persistent_request =>
440 global => $global ? "true" : "false",
441 defined $client_token ? (client_token => $client_token ) : (),
442 defined $priority_class ? (priority_class => $priority_class) : (),
443 identifier => $identifier,
444 id_cb => sub {
445 my ($self, $type, $kv, $rdata) = @_;
446
447 $cv->($kv);
448 1
449 },
450 );
451};
452
453=item $cv = $fcp->get_plugin_info ($name, $detailed)
454
455=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 539=item $info = $fcp->get_plugin_info ($name, $detailed)
456 540
457=cut 541=cut
458 542
459_txn get_plugin_info => sub { 543_txn get_plugin_info => sub {
460 my ($self, $cv, $name, $detailed) = @_; 544 my ($self, $ok, $err, $name, $detailed) = @_;
461 545
462 $self->send_msg (get_plugin_info => 546 $self->send_msg (get_plugin_info =>
463 plugin_name => $name, 547 plugin_name => $name,
464 detailed => $detailed ? "true" : "false", 548 detailed => $detailed ? "true" : "false",
465 id_cb => sub { 549 id_cb => sub {
466 my ($self, $type, $kv, $rdata) = @_; 550 my ($self, $type, $kv, $rdata) = @_;
467 551
468 $cv->($kv); 552 $ok->($kv);
469 1 553 1
470 }, 554 },
471 ); 555 );
472}; 556};
473 557
474=item $cv = $fcp->client_get ($uri, $identifier, %kv)
475
476=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 558=item $status = $fcp->client_get ($uri, $identifier, %kv)
477 559
478%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 560%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479 561
480ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 562ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481priority_class, persistence, client_token, global, return_type, 563priority_class, persistence, client_token, global, return_type,
482binary_blob, allowed_mime_types, filename, temp_filename 564binary_blob, allowed_mime_types, filename, temp_filename
483 565
484=cut 566=cut
485 567
486_txn client_get => sub { 568_txn client_get => sub {
487 my ($self, $cv, $uri, $identifier, %kv) = @_; 569 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
488 570
571 $self->serialise ($identifier => sub {
572 my ($self, $guard) = @_;
573
489 $self->send_msg (client_get => 574 $self->send_msg (client_get =>
490 %kv, 575 %kv,
491 uri => $uri, 576 uri => $uri,
492 identifier => $identifier, 577 identifier => $identifier,
493 id_cb => sub { 578 );
579
580 $self->on (sub {
494 my ($self, $type, $kv, $rdata) = @_; 581 my ($self, $type, $kv, @extra) = @_;
495 582
583 $guard if 0;
584
585 if ($kv->{identifier} eq $identifier) {
586 if ($type eq "persistent_get") {
496 $cv->($kv); 587 $ok->($kv);
588 return;
589 } elsif ($type eq "protocol_error") {
590 $err->($kv);
591 return;
592 }
593 }
594
497 1 595 1
498 }, 596 });
499 ); 597 });
500}; 598};
501 599
502=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 600=item $status = $fcp->remove_request ($identifier[, $global])
503 601
602Remove the request with the given isdentifier. Returns true if successful,
603false on error.
604
605=cut
606
607_txn remove_request => sub {
608 my ($self, $ok, $err, $identifier, $global) = @_;
609
610 $self->serialise ($identifier => sub {
611 my ($self, $guard) = @_;
612
613 $self->send_msg (remove_request =>
614 identifier => $identifier,
615 global => $global ? "true" : "false",
616 );
617 $self->on (sub {
618 my ($self, $type, $kv, @extra) = @_;
619
620 $guard if 0;
621
622 if ($kv->{identifier} eq $identifier) {
623 if ($type eq "persistent_request_removed") {
624 $ok->(1);
625 return;
626 } elsif ($type eq "protocol_error") {
627 $err->($kv);
628 return;
629 }
630 }
631
632 1
633 });
634 });
635};
636
504=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 637=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
505 638
506The DDA test in FCP is probably the single most broken protocol - only 639The DDA test in FCP is probably the single most broken protocol - only
507one directory test can be outstanding at any time, and some guessing and 640one directory test can be outstanding at any time, and some guessing and
508heuristics are involved in mangling the paths. 641heuristics are involved in mangling the paths.
509 642
510This function combines C<TestDDARequest> and C<TestDDAResponse> in one 643This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511request, handling file reading and writing as well. 644request, handling file reading and writing as well, and tries very hard to
645do the right thing.
646
647Both C<$local_directory> and C<$remote_directory> must specify the same
648directory - C<$local_directory> is the directory path on the client (where
649L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
650the server (where the freenet node runs). When both are running on the
651same node, the paths are generally identical.
652
653C<$want_read> and C<$want_write> should be set to a true value when you
654want to read (get) files or write (put) files, respectively.
655
656On error, an exception is thrown. Otherwise, C<$can_read> and
657C<$can_write> indicate whether you can reaqd or write to freenet via the
658directory.
512 659
513=cut 660=cut
514 661
515_txn test_dda => sub { 662_txn test_dda => sub {
516 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 663 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
517 664
518 $self->serialise (test_dda => sub { 665 $self->serialise (test_dda => sub {
519 my ($self, $guard) = @_; 666 my ($self, $guard) = @_;
520 667
521 $self->send_msg (test_dda_request => 668 $self->send_msg (test_dda_request =>
542 } 689 }
543 690
544 my %response = (directory => $remote); 691 my %response = (directory => $remote);
545 692
546 if (length $kv->{read_filename}) { 693 if (length $kv->{read_filename}) {
547 warn "$local/$kv->{read_filename}";#d#
548 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 694 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549 sysread $fh, my $buf, -s $fh; 695 sysread $fh, my $buf, -s $fh;
550 $response{read_content} = $buf; 696 $response{read_content} = $buf;
551 } 697 }
552 } 698 }
563 my ($self, $type, $kv) = @_; 709 my ($self, $type, $kv) = @_;
564 710
565 $guard if 0; # reference 711 $guard if 0; # reference
566 712
567 if ($type eq "test_dda_complete") { 713 if ($type eq "test_dda_complete") {
568 $cv->( 714 $ok->(
569 $kv->{read_directory_allowed} eq "true", 715 $kv->{read_directory_allowed} eq "true",
570 $kv->{write_directory_allowed} eq "true", 716 $kv->{write_directory_allowed} eq "true",
571 ); 717 );
572 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 718 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573 $cv->croak ($kv->{extra_description}); 719 $err->($kv->{extra_description});
574 return; 720 return;
575 } 721 }
576 722
577 1 723 1
578 }); 724 });
579 725
580 return; 726 return;
581 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 727 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582 $cv->croak ($kv->{extra_description}); 728 $err->($kv);
583 return; 729 return;
584 } 730 }
585 731
586 1 732 1
587 }); 733 });
588 }); 734 });
589}; 735};
590 736
591=back 737=back
592 738
739=head2 REQUEST CACHE
740
741The C<AnyEvent::FCP> class keeps a request cache, where it caches all
742information from requests.
743
744For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
745in C<< $fcp->{req}{$identifier} >>:
746
747 persistent_get
748 persistent_put
749 persistent_put_dir
750
751This message updates the stored data:
752
753 persistent_request_modified
754
755This message will remove this entry:
756
757 persistent_request_removed
758
759These messages get merged into the cache entry, under their
760type, i.e. a C<simple_progress> message will be stored in C<<
761$fcp->{req}{$identifier}{simple_progress} >>:
762
763 simple_progress # get/put
764
765 uri_generated # put
766 generated_metadata # put
767 started_compression # put
768 finished_compression # put
769 put_failed # put
770 put_fetchable # put
771 put_successful # put
772
773 sending_to_network # get
774 compatibility_mode # get
775 expected_hashes # get
776 expected_mime # get
777 expected_data_length # get
778 get_failed # get
779 data_found # get
780 enter_finite_cooldown # get
781
782In addition, an event (basically a fake message) of type C<request_changed> is generated
783on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
784is the type of the original message triggering the change,
785
786To fill this cache with the global queue and keep it updated,
787call C<watch_global> to subscribe to updates, followed by
788C<list_persistent_requests_sync>.
789
790 $fcp->watch_global_sync_; # do not wait
791 $fcp->list_persistent_requests; # wait
792
793To get a better idea of what is stored in the cache, here is an example of
794what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
795
796 {
797 identifier => "Frost-gpl.txt",
798 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
799 binary_blob => "false",
800 global => "true",
801 max_retries => -1,
802 max_size => 9223372036854775807,
803 persistence => "forever",
804 priority_class => 3,
805 real_time => "false",
806 return_type => "direct",
807 started => "true",
808 type => "persistent_get",
809 verbosity => 2147483647,
810 sending_to_network => {
811 identifier => "Frost-gpl.txt",
812 global => "true",
813 },
814 compatibility_mode => {
815 identifier => "Frost-gpl.txt",
816 definitive => "true",
817 dont_compress => "false",
818 global => "true",
819 max => "COMPAT_1255",
820 min => "COMPAT_1255",
821 },
822 expected_hashes => {
823 identifier => "Frost-gpl.txt",
824 global => "true",
825 hashes => {
826 ed2k => "d83596f5ee3b7...",
827 md5 => "e0894e4a2a6...",
828 sha1 => "...",
829 sha256 => "...",
830 sha512 => "...",
831 tth => "...",
832 },
833 },
834 expected_mime => {
835 identifier => "Frost-gpl.txt",
836 global => "true",
837 metadata => { content_type => "application/rar" },
838 },
839 expected_data_length => {
840 identifier => "Frost-gpl.txt",
841 data_length => 37576,
842 global => "true",
843 },
844 simple_progress => {
845 identifier => "Frost-gpl.txt",
846 failed => 0,
847 fatally_failed => 0,
848 finalized_total => "true",
849 global => "true",
850 last_progress => 1438639282628,
851 required => 372,
852 succeeded => 102,
853 total => 747,
854 },
855 data_found => {
856 identifier => "Frost-gpl.txt",
857 completion_time => 1438663354026,
858 data_length => 37576,
859 global => "true",
860 metadata => { content_type => "image/jpeg" },
861 startup_time => 1438657196167,
862 },
863 }
864
593=head1 EXAMPLE PROGRAM 865=head1 EXAMPLE PROGRAM
594 866
595 use AnyEvent::FCP; 867 use AnyEvent::FCP;
596 868
597 my $fcp = new AnyEvent::FCP; 869 my $fcp = new AnyEvent::FCP;
598 870
599 # let us look at the global request list 871 # let us look at the global request list
600 $fcp->watch_global (1, 0); 872 $fcp->watch_global_ (1);
601 873
602 # list them, synchronously 874 # list them, synchronously
603 my $req = $fcp->list_persistent_requests_sync; 875 my $req = $fcp->list_persistent_requests;
604 876
605 # go through all requests 877 # go through all requests
878TODO
606 for my $req (values %$req) { 879 for my $req (values %$req) {
607 # skip jobs not directly-to-disk 880 # skip jobs not directly-to-disk
608 next unless $req->{return_type} eq "disk"; 881 next unless $req->{return_type} eq "disk";
609 # skip jobs not issued by FProxy 882 # skip jobs not issued by FProxy
610 next unless $req->{identifier} =~ /^FProxy:/; 883 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines