ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.9 by root, Tue Aug 4 00:35:16 2015 UTC vs.
Revision 1.14 by root, Sat Aug 8 14:09:47 2015 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
77 $_ 78 $_
78} 79}
79 80
80sub tolc($) { 81sub tolc($) {
81 local $_ = shift; 82 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 86 lc
86} 87}
87 88
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
89 90
90Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 93
93If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 95(hopefully) unique client name for you.
95 96
96You can install a progress callback that is being called with the AnyEvent::FCP
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99
100Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110=cut 97=cut
111 98
112sub new { 99sub new {
113 my $class = shift; 100 my $class = shift;
101
102 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
103
114 my $self = bless { @_ }, $class; 104 my $self = bless {
115 105 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 106 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 107 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 108 name => time.rand.rand.rand, # lame
119 $self->{timeout} ||= 3600*2; 109 @_,
120 $self->{progress} ||= sub { }; 110 queue => [],
121 111 req => {},
122 $self->{id} = "a0"; 112 prefix => "..:aefcpid-$rand:",
113 idseq => "a0",
114 }, $class;
123 115
124 { 116 {
125 Scalar::Util::weaken (my $self = $self); 117 Scalar::Util::weaken (my $self = $self);
126 118
127 $self->{hdl} = new AnyEvent::Handle 119 $self->{hdl} = new AnyEvent::Handle
135 on_eof => $self->{on_eof} || sub { }; 127 on_eof => $self->{on_eof} || sub { };
136 128
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 129 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 130 }
139 131
140 $self->send_msg ( 132 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 133 name => $self->{name},
143 expected_version => "2.0", 134 expected_version => "2.0",
144 ); 135 );
145 136
146 $self 137 $self
147} 138}
148 139
140sub identifier {
141 $_[0]{prefix} . ++$_[0]{idseq}
142}
143
149sub send_msg { 144sub send_msg {
150 my ($self, $type, %kv) = @_; 145 my ($self, $type, %kv) = @_;
151 146
152 my $data = delete $kv{data}; 147 my $data = delete $kv{data};
153 148
154 if (exists $kv{id_cb}) { 149 if (exists $kv{id_cb}) {
155 my $id = $kv{identifier} ||= ++$self->{id}; 150 my $id = $kv{identifier} ||= $self->identifier;
156 $self->{id}{$id} = delete $kv{id_cb}; 151 $self->{id}{$id} = delete $kv{id_cb};
157 } 152 }
158 153
159 my $msg = (touc $type) . "\012" 154 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 155 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
186} 181}
187 182
188sub _push_queue { 183sub _push_queue {
189 my ($self, $queue) = @_; 184 my ($self, $queue) = @_;
190 185
191 warn "oush @$queue\n";#d#
192 shift @$queue; 186 shift @$queue;
193 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 187 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
194 if @$queue; 188 if @$queue;
195} 189}
196 190
203 push @$queue, $cb; 197 push @$queue, $cb;
204 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 198 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
205 unless $#$queue; 199 unless $#$queue;
206} 200}
207 201
202# how to merge these types into $self->{persistent}
203our %PERSISTENT_TYPE = (
204 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
205 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
206 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
207 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
208 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
209
210 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
211
212 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
213 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
214 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
215 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
216 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
217 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
218 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
219
220 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
221 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
222 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
223 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
224 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
225 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
226 data_found => sub { $_[1]{data_found} = $_[2] }, # get
227 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
228);
229
230sub recv {
231 my ($self, $type, $kv, @extra) = @_;
232
233 if (my $cb = $PERSISTENT_TYPE{$type}) {
234 my $id = $kv->{identifier};
235 my $req = $_[0]{req}{$id} ||= {};
236 $cb->($self, $req, $kv);
237 $self->recv (request_change => $kv, $type, @extra);
238 }
239
240 my $on = $self->{on};
241 for (0 .. $#$on) {
242 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
243 splice @$on, $_, 1 unless defined $res;
244 return;
245 }
246 }
247
248 if (my $cb = $self->{queue}[0]) {
249 $cb->($self, $type, $kv, @extra)
250 and shift @{ $self->{queue} };
251 } else {
252 $self->default_recv ($type, $kv, @extra);
253 }
254}
255
208sub on_read { 256sub on_read {
209 my ($self) = @_; 257 my ($self) = @_;
210 258
211 my $type; 259 my $type;
212 my %kv; 260 my %kv;
213 my $rdata; 261 my $rdata;
214
215 my $done_cb = sub {
216 $kv{pkt_type} = $type;
217
218 my $on = $self->{on};
219 for (0 .. $#$on) {
220 unless (my $res = $on->[$_]($type, \%kv, $rdata)) {
221 splice @$on, $_, 1 unless defined $res;
222 return;
223 }
224 }
225
226 if (my $cb = $self->{queue}[0]) {
227 $cb->($self, $type, \%kv, $rdata)
228 and shift @{ $self->{queue} };
229 } else {
230 $self->default_recv ($type, \%kv, $rdata);
231 }
232 };
233 262
234 my $hdr_cb; $hdr_cb = sub { 263 my $hdr_cb; $hdr_cb = sub {
235 if ($_[1] =~ /^([^=]+)=(.*)$/) { 264 if ($_[1] =~ /^([^=]+)=(.*)$/) {
236 my ($k, $v) = ($1, $2); 265 my ($k, $v) = ($1, $2);
237 my @k = split /\./, tolc $k; 266 my @k = split /\./, tolc $k;
250 279
251 $_[0]->push_read (line => $hdr_cb); 280 $_[0]->push_read (line => $hdr_cb);
252 } elsif ($_[1] eq "Data") { 281 } elsif ($_[1] eq "Data") {
253 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 282 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
254 $rdata = \$_[1]; 283 $rdata = \$_[1];
255 $done_cb->(); 284 $self->recv ($type, \%kv, $rdata);
256 }); 285 });
257 } elsif ($_[1] eq "EndMessage") { 286 } elsif ($_[1] eq "EndMessage") {
258 $done_cb->(); 287 $self->recv ($type, \%kv);
259 } else { 288 } else {
260 die "protocol error, expected message end, got $_[1]\n";#d# 289 die "protocol error, expected message end, got $_[1]\n";#d#
261 } 290 }
262 }; 291 };
263 292
273 if ($type eq "node_hello") { 302 if ($type eq "node_hello") {
274 $self->{node_hello} = $kv; 303 $self->{node_hello} = $kv;
275 } elsif (exists $self->{id}{$kv->{identifier}}) { 304 } elsif (exists $self->{id}{$kv->{identifier}}) {
276 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 305 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
277 and delete $self->{id}{$kv->{identifier}}; 306 and delete $self->{id}{$kv->{identifier}};
278 } else {
279 &{ $self->{progress} };
280 } 307 }
281} 308}
309
310=back
311
312=head2 FCP REQUESTS
313
314The following methods implement various requests. Most of them map
315directory to the FCP message of the same name. The added benefit of
316these over sending requests yourself is that they handle the necessary
317serialisation, protocol quirks, and replies.
318
319All of them exist in two versions, the variant shown in this manpage, and
320a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
321version as shown is I<synchronous> - it will wait for any replies, and
322either return the reply, or croak with an error. The underscore variant
323returns immediately and invokes one or more callbacks or condvars later.
324
325For example, the call
326
327 $info = $fcp->get_plugin_info ($name, $detailed);
328
329Also comes in this underscore variant:
330
331 $fcp->get_plugin_info_ ($name, $detailed, $cb);
332
333You can thinbk of the underscore as a kind of continuation indicator - the
334normal function waits and returns with the data, the C<_> indicates that
335you pass the continuation yourself, and the continuation will be invoked
336with the results.
337
338This callback/continuation argument (C<$cb>) can come in three forms itself:
339
340=over 4
341
342=item A code reference (or rather anything not matching some other alternative)
343
344This code reference will be invoked with the result on success. On an
345error, it will die (in the event loop) with a backtrace of the call site.
346
347This is a popular choice, but it makes handling errors hard - make sure
348you never generate protocol errors!
349
350=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
351
352When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
353results when the request has finished. Should an error occur, the error
354will instead result in C<< $cv->croak ($error) >>.
355
356This is also a popular choice.
357
358=item An array with two callbacks C<[$success, $failure]>
359
360The C<$success> callback will be invoked with the results, while the
361C<$failure> callback will be invoked on any errors.
362
363=item C<undef>
364
365This is the same thing as specifying C<sub { }> as callback, i.e. on
366success, the results are ignored, while on failure, you the module dies
367with a backtrace.
368
369This is good for quick scripts, or when you really aren't interested in
370the results.
371
372=back
373
374=cut
375
376our $NOP_CB = sub { };
282 377
283sub _txn { 378sub _txn {
284 my ($name, $sub) = @_; 379 my ($name, $sub) = @_;
285 380
286 *{$name} = sub { 381 *{$name} = sub {
287 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 382 my $cv = AE::cv;
288 &$sub;
289 $cv
290 };
291 383
292 *{"$name\_sync"} = sub { 384 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
293 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
294 &$sub; 385 &$sub;
295 $cv->recv 386 $cv->recv
296 }; 387 };
297}
298 388
299=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 389 *{"$name\_"} = sub {
390 my ($ok, $err) = pop;
300 391
392 if (ARRAY:: eq ref $ok) {
393 ($ok, $err) = @$ok;
394 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
395 $err = sub { $ok->croak ($_[0]{extra_description}) };
396 } else {
397 my $bt = Carp::longmess "";
398 $err = sub {
399 die "$_[0]{extra_description}$bt";
400 };
401 }
402
403 $ok ||= $NOP_CB;
404
405 splice @_, 1, 0, $ok, $err;
406 &$sub;
407 };
408}
409
410=over 4
411
301=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 412=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
302 413
303=cut 414=cut
304 415
305_txn list_peers => sub { 416_txn list_peers => sub {
306 my ($self, $cv, $with_metadata, $with_volatile) = @_; 417 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
307 418
308 my @res; 419 my @res;
309 420
310 $self->send_msg (list_peers => 421 $self->send_msg (list_peers =>
311 with_metadata => $with_metadata ? "true" : "false", 422 with_metadata => $with_metadata ? "true" : "false",
312 with_volatile => $with_volatile ? "true" : "false", 423 with_volatile => $with_volatile ? "true" : "false",
313 id_cb => sub { 424 id_cb => sub {
314 my ($self, $type, $kv, $rdata) = @_; 425 my ($self, $type, $kv, $rdata) = @_;
315 426
316 if ($type eq "end_list_peers") { 427 if ($type eq "end_list_peers") {
317 $cv->(\@res); 428 $ok->(\@res);
318 1 429 1
319 } else { 430 } else {
320 push @res, $kv; 431 push @res, $kv;
321 0 432 0
322 } 433 }
323 }, 434 },
324 ); 435 );
325}; 436};
326 437
327=item $cv = $fcp->list_peer_notes ($node_identifier)
328
329=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 438=item $notes = $fcp->list_peer_notes ($node_identifier)
330 439
331=cut 440=cut
332 441
333_txn list_peer_notes => sub { 442_txn list_peer_notes => sub {
334 my ($self, $cv, $node_identifier) = @_; 443 my ($self, $ok, undef, $node_identifier) = @_;
335 444
336 $self->send_msg (list_peer_notes => 445 $self->send_msg (list_peer_notes =>
337 node_identifier => $node_identifier, 446 node_identifier => $node_identifier,
338 id_cb => sub { 447 id_cb => sub {
339 my ($self, $type, $kv, $rdata) = @_; 448 my ($self, $type, $kv, $rdata) = @_;
340 449
341 $cv->($kv); 450 $ok->($kv);
342 1 451 1
343 }, 452 },
344 ); 453 );
345}; 454};
346 455
347=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
348
349=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 456=item $fcp->watch_global ($enabled[, $verbosity_mask])
350 457
351=cut 458=cut
352 459
353_txn watch_global => sub { 460_txn watch_global => sub {
354 my ($self, $cv, $enabled, $verbosity_mask) = @_; 461 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
355 462
356 $self->send_msg (watch_global => 463 $self->send_msg (watch_global =>
357 enabled => $enabled ? "true" : "false", 464 enabled => $enabled ? "true" : "false",
358 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 465 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
359 ); 466 );
360 467
361 $cv->(); 468 $ok->();
362}; 469};
363 470
364=item $cv = $fcp->list_persistent_requests
365
366=item $reqs = $fcp->list_persistent_requests_sync 471=item $reqs = $fcp->list_persistent_requests
367 472
368=cut 473=cut
369 474
370_txn list_persistent_requests => sub { 475_txn list_persistent_requests => sub {
371 my ($self, $cv) = @_; 476 my ($self, $ok, $err) = @_;
372 477
478 $self->serialise (list_persistent_requests => sub {
479 my ($self, $guard) = @_;
480
373 my %res; 481 my @res;
374 482
375 $self->send_msg ("list_persistent_requests"); 483 $self->send_msg ("list_persistent_requests");
376 484
377 push @{ $self->{queue} }, sub { 485 $self->on (sub {
378 my ($self, $type, $kv, $rdata) = @_; 486 my ($self, $type, $kv, $rdata) = @_;
379 487
488 $guard if 0;
489
380 if ($type eq "end_list_persistent_requests") { 490 if ($type eq "end_list_persistent_requests") {
381 $cv->(\%res); 491 $ok->(\@res);
492 return;
493 } else {
494 my $id = $kv->{identifier};
495
496 if ($type =~ /^persistent_(get|put|put_dir)$/) {
497 push @res, [$type, $kv];
498 }
499 }
500
382 1 501 1
383 } else { 502 });
384 my $id = $kv->{identifier}; 503 });
504};
385 505
386 if ($type =~ /^persistent_(get|put|put_dir)$/) { 506=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387 $res{$id} = { 507
388 type => $1, 508Update either the C<client_token> or C<priority_class> of a request
389 %{ $res{$id} }, 509identified by C<$global> and C<$identifier>, depending on which of
510C<$client_token> and C<$priority_class> are not C<undef>.
511
512=cut
513
514_txn modify_persistent_request => sub {
515 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
516
517 $self->serialise ($identifier => sub {
518 my ($self, $guard) = @_;
519
520 $self->send_msg (modify_persistent_request =>
521 global => $global ? "true" : "false",
522 identifier => $identifier,
523 defined $client_token ? (client_token => $client_token ) : (),
524 defined $priority_class ? (priority_class => $priority_class) : (),
525 );
526
527 $self->on (sub {
528 my ($self, $type, $kv, @extra) = @_;
529
530 $guard if 0;
531
532 if ($kv->{identifier} eq $identifier) {
533 if ($type eq "persistent_request_modified") {
390 %$kv, 534 $ok->($kv);
535 return;
536 } elsif ($type eq "protocol_error") {
537 $err->($kv);
538 return;
391 }; 539 }
392 } elsif ($type eq "simple_progress") {
393 delete $kv->{pkt_type}; # save memory
394 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
395 } else {
396 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
397 } 540 }
541
398 0 542 1
399 } 543 });
400 }; 544 });
401}; 545};
402 546
403=item $cv = $fcp->remove_request ($global, $identifier)
404
405=item $status = $fcp->remove_request_sync ($global, $identifier)
406
407=cut
408
409_txn remove_request => sub {
410 my ($self, $cv, $global, $identifier) = @_;
411
412 $self->send_msg (remove_request =>
413 global => $global ? "true" : "false",
414 identifier => $identifier,
415 id_cb => sub {
416 my ($self, $type, $kv, $rdata) = @_;
417
418 $cv->($kv);
419 1
420 },
421 );
422};
423
424=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
425
426=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
427
428=cut
429
430_txn modify_persistent_request => sub {
431 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
432
433 $self->send_msg (modify_persistent_request =>
434 global => $global ? "true" : "false",
435 defined $client_token ? (client_token => $client_token ) : (),
436 defined $priority_class ? (priority_class => $priority_class) : (),
437 identifier => $identifier,
438 id_cb => sub {
439 my ($self, $type, $kv, $rdata) = @_;
440
441 $cv->($kv);
442 1
443 },
444 );
445};
446
447=item $cv = $fcp->get_plugin_info ($name, $detailed)
448
449=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 547=item $info = $fcp->get_plugin_info ($name, $detailed)
450 548
451=cut 549=cut
452 550
453_txn get_plugin_info => sub { 551_txn get_plugin_info => sub {
454 my ($self, $cv, $name, $detailed) = @_; 552 my ($self, $ok, $err, $name, $detailed) = @_;
455 553
456 $self->send_msg (get_plugin_info => 554 $self->send_msg (get_plugin_info =>
457 plugin_name => $name, 555 plugin_name => $name,
458 detailed => $detailed ? "true" : "false", 556 detailed => $detailed ? "true" : "false",
459 id_cb => sub { 557 id_cb => sub {
460 my ($self, $type, $kv, $rdata) = @_; 558 my ($self, $type, $kv, $rdata) = @_;
461 559
462 $cv->($kv); 560 $ok->($kv);
463 1 561 1
464 }, 562 },
465 ); 563 );
466}; 564};
467 565
468=item $cv = $fcp->client_get ($uri, $identifier, %kv)
469
470=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 566=item $status = $fcp->client_get ($uri, $identifier, %kv)
471 567
472%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 568%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
473 569
474ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 570ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
475priority_class, persistence, client_token, global, return_type, 571priority_class, persistence, client_token, global, return_type,
476binary_blob, allowed_mime_types, filename, temp_filename 572binary_blob, allowed_mime_types, filename, temp_filename
477 573
478=cut 574=cut
479 575
480_txn client_get => sub { 576_txn client_get => sub {
481 my ($self, $cv, $uri, $identifier, %kv) = @_; 577 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
482 578
579 $self->serialise ($identifier => sub {
580 my ($self, $guard) = @_;
581
483 $self->send_msg (client_get => 582 $self->send_msg (client_get =>
484 %kv, 583 %kv,
485 uri => $uri, 584 uri => $uri,
486 identifier => $identifier, 585 identifier => $identifier,
487 id_cb => sub { 586 );
587
588 $self->on (sub {
488 my ($self, $type, $kv, $rdata) = @_; 589 my ($self, $type, $kv, @extra) = @_;
489 590
591 $guard if 0;
592
593 if ($kv->{identifier} eq $identifier) {
594 if ($type eq "persistent_get") {
490 $cv->($kv); 595 $ok->($kv);
596 return;
597 } elsif ($type eq "protocol_error") {
598 $err->($kv);
599 return;
600 }
601 }
602
491 1 603 1
492 }, 604 });
493 ); 605 });
494}; 606};
495 607
496=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 608=item $status = $fcp->remove_request ($identifier[, $global])
497 609
610Remove the request with the given isdentifier. Returns true if successful,
611false on error.
612
613=cut
614
615_txn remove_request => sub {
616 my ($self, $ok, $err, $identifier, $global) = @_;
617
618 $self->serialise ($identifier => sub {
619 my ($self, $guard) = @_;
620
621 $self->send_msg (remove_request =>
622 identifier => $identifier,
623 global => $global ? "true" : "false",
624 );
625 $self->on (sub {
626 my ($self, $type, $kv, @extra) = @_;
627
628 $guard if 0;
629
630 if ($kv->{identifier} eq $identifier) {
631 if ($type eq "persistent_request_removed") {
632 $ok->(1);
633 return;
634 } elsif ($type eq "protocol_error") {
635 $err->($kv);
636 return;
637 }
638 }
639
640 1
641 });
642 });
643};
644
498=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 645=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
499 646
500The DDA test in FCP is probably the single most broken protocol - only 647The DDA test in FCP is probably the single most broken protocol - only
501one directory test can be outstanding at any time, and some guessing and 648one directory test can be outstanding at any time, and some guessing and
502heuristics are involved in mangling the paths. 649heuristics are involved in mangling the paths.
503 650
504This function combines C<TestDDARequest> and C<TestDDAResponse> in one 651This function combines C<TestDDARequest> and C<TestDDAResponse> in one
505request, handling file reading and writing as well. 652request, handling file reading and writing as well, and tries very hard to
653do the right thing.
654
655Both C<$local_directory> and C<$remote_directory> must specify the same
656directory - C<$local_directory> is the directory path on the client (where
657L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
658the server (where the freenet node runs). When both are running on the
659same node, the paths are generally identical.
660
661C<$want_read> and C<$want_write> should be set to a true value when you
662want to read (get) files or write (put) files, respectively.
663
664On error, an exception is thrown. Otherwise, C<$can_read> and
665C<$can_write> indicate whether you can reaqd or write to freenet via the
666directory.
506 667
507=cut 668=cut
508 669
509_txn test_dda => sub { 670_txn test_dda => sub {
510 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 671 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
511 672
512 $self->serialise (test_dda => sub { 673 $self->serialise (test_dda => sub {
513 my ($self, $guard) = @_; 674 my ($self, $guard) = @_;
514 675
515 $self->send_msg (test_dda_request => 676 $self->send_msg (test_dda_request =>
516 directory => $remote, 677 directory => $remote,
517 want_read_directory => $want_read ? "true" : "false", 678 want_read_directory => $want_read ? "true" : "false",
518 want_write_directory => $want_write ? "true" : "false", 679 want_write_directory => $want_write ? "true" : "false",
519 ); 680 );
520 $self->on (sub { 681 $self->on (sub {
521 my ($type, $kv) = @_; 682 my ($self, $type, $kv) = @_;
522 683
523 if ($type eq "test_dda_reply") { 684 if ($type eq "test_dda_reply") {
524 # the filenames are all relative to the server-side directory, 685 # the filenames are all relative to the server-side directory,
525 # which might or might not match $remote anymore, so we 686 # which might or might not match $remote anymore, so we
526 # need to rewrite the paths to be relative to $local 687 # need to rewrite the paths to be relative to $local
536 } 697 }
537 698
538 my %response = (directory => $remote); 699 my %response = (directory => $remote);
539 700
540 if (length $kv->{read_filename}) { 701 if (length $kv->{read_filename}) {
541 warn "$local/$kv->{read_filename}";#d#
542 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 702 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
543 sysread $fh, my $buf, -s $fh; 703 sysread $fh, my $buf, -s $fh;
544 $response{read_content} = $buf; 704 $response{read_content} = $buf;
545 } 705 }
546 } 706 }
552 } 712 }
553 713
554 $self->send_msg (test_dda_response => %response); 714 $self->send_msg (test_dda_response => %response);
555 715
556 $self->on (sub { 716 $self->on (sub {
557 my ($type, $kv) = @_; 717 my ($self, $type, $kv) = @_;
558 718
559 $guard if 0; # reference 719 $guard if 0; # reference
560 720
561 if ($type eq "test_dda_complete") { 721 if ($type eq "test_dda_complete") {
562 $cv->( 722 $ok->(
563 $kv->{read_directory_allowed} eq "true", 723 $kv->{read_directory_allowed} eq "true",
564 $kv->{write_directory_allowed} eq "true", 724 $kv->{write_directory_allowed} eq "true",
565 ); 725 );
566 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 726 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
567 $cv->croak ($kv->{extra_description}); 727 $err->($kv->{extra_description});
568 return; 728 return;
569 } 729 }
570 730
571 1 731 1
572 }); 732 });
573 733
574 return; 734 return;
575 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 735 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
576 $cv->croak ($kv->{extra_description}); 736 $err->($kv);
577 return; 737 return;
578 } 738 }
579 739
580 1 740 1
581 }); 741 });
582 }); 742 });
583}; 743};
584 744
585=back 745=back
586 746
747=head2 REQUEST CACHE
748
749The C<AnyEvent::FCP> class keeps a request cache, where it caches all
750information from requests.
751
752For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
753in C<< $fcp->{req}{$identifier} >>:
754
755 persistent_get
756 persistent_put
757 persistent_put_dir
758
759This message updates the stored data:
760
761 persistent_request_modified
762
763This message will remove this entry:
764
765 persistent_request_removed
766
767These messages get merged into the cache entry, under their
768type, i.e. a C<simple_progress> message will be stored in C<<
769$fcp->{req}{$identifier}{simple_progress} >>:
770
771 simple_progress # get/put
772
773 uri_generated # put
774 generated_metadata # put
775 started_compression # put
776 finished_compression # put
777 put_failed # put
778 put_fetchable # put
779 put_successful # put
780
781 sending_to_network # get
782 compatibility_mode # get
783 expected_hashes # get
784 expected_mime # get
785 expected_data_length # get
786 get_failed # get
787 data_found # get
788 enter_finite_cooldown # get
789
790In addition, an event (basically a fake message) of type C<request_changed> is generated
791on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
792is the type of the original message triggering the change,
793
794To fill this cache with the global queue and keep it updated,
795call C<watch_global> to subscribe to updates, followed by
796C<list_persistent_requests_sync>.
797
798 $fcp->watch_global_sync_; # do not wait
799 $fcp->list_persistent_requests; # wait
800
801To get a better idea of what is stored in the cache, here is an example of
802what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
803
804 {
805 identifier => "Frost-gpl.txt",
806 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
807 binary_blob => "false",
808 global => "true",
809 max_retries => -1,
810 max_size => 9223372036854775807,
811 persistence => "forever",
812 priority_class => 3,
813 real_time => "false",
814 return_type => "direct",
815 started => "true",
816 type => "persistent_get",
817 verbosity => 2147483647,
818 sending_to_network => {
819 identifier => "Frost-gpl.txt",
820 global => "true",
821 },
822 compatibility_mode => {
823 identifier => "Frost-gpl.txt",
824 definitive => "true",
825 dont_compress => "false",
826 global => "true",
827 max => "COMPAT_1255",
828 min => "COMPAT_1255",
829 },
830 expected_hashes => {
831 identifier => "Frost-gpl.txt",
832 global => "true",
833 hashes => {
834 ed2k => "d83596f5ee3b7...",
835 md5 => "e0894e4a2a6...",
836 sha1 => "...",
837 sha256 => "...",
838 sha512 => "...",
839 tth => "...",
840 },
841 },
842 expected_mime => {
843 identifier => "Frost-gpl.txt",
844 global => "true",
845 metadata => { content_type => "application/rar" },
846 },
847 expected_data_length => {
848 identifier => "Frost-gpl.txt",
849 data_length => 37576,
850 global => "true",
851 },
852 simple_progress => {
853 identifier => "Frost-gpl.txt",
854 failed => 0,
855 fatally_failed => 0,
856 finalized_total => "true",
857 global => "true",
858 last_progress => 1438639282628,
859 required => 372,
860 succeeded => 102,
861 total => 747,
862 },
863 data_found => {
864 identifier => "Frost-gpl.txt",
865 completion_time => 1438663354026,
866 data_length => 37576,
867 global => "true",
868 metadata => { content_type => "image/jpeg" },
869 startup_time => 1438657196167,
870 },
871 }
872
587=head1 EXAMPLE PROGRAM 873=head1 EXAMPLE PROGRAM
588 874
589 use AnyEvent::FCP; 875 use AnyEvent::FCP;
590 876
591 my $fcp = new AnyEvent::FCP; 877 my $fcp = new AnyEvent::FCP;
592 878
593 # let us look at the global request list 879 # let us look at the global request list
594 $fcp->watch_global (1, 0); 880 $fcp->watch_global_ (1);
595 881
596 # list them, synchronously 882 # list them, synchronously
597 my $req = $fcp->list_persistent_requests_sync; 883 my $req = $fcp->list_persistent_requests;
598 884
599 # go through all requests 885 # go through all requests
886TODO
600 for my $req (values %$req) { 887 for my $req (values %$req) {
601 # skip jobs not directly-to-disk 888 # skip jobs not directly-to-disk
602 next unless $req->{return_type} eq "disk"; 889 next unless $req->{return_type} eq "disk";
603 # skip jobs not issued by FProxy 890 # skip jobs not issued by FProxy
604 next unless $req->{identifier} =~ /^FProxy:/; 891 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines