ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.10 by root, Tue Aug 4 00:50:25 2015 UTC vs.
Revision 1.16 by root, Sat Sep 5 13:26:47 2015 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
71use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
72 75
73sub touc($) { 76sub touc($) {
74 local $_ = shift; 77 local $_ = shift;
75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
77 $_ 80 $_
78} 81}
79 82
80sub tolc($) { 83sub tolc($) {
81 local $_ = shift; 84 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 88 lc
86} 89}
87 90
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
89 92
90Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 95
93If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 97(hopefully) unique client name for you.
95 98
96You can install a progress callback that is being called with the AnyEvent::FCP
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99
100Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110=cut 99=cut
111 100
112sub new { 101sub new {
113 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
114 my $self = bless { @_ }, $class; 106 my $self = bless {
115 107 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 110 name => time.rand.rand.rand, # lame
119 $self->{timeout} ||= 3600*2; 111 @_,
120 $self->{progress} ||= sub { }; 112 queue => [],
121 113 req => {},
122 $self->{id} = "a0"; 114 prefix => "..:aefcpid:$rand:",
115 idseq => "a0",
116 }, $class;
123 117
124 { 118 {
125 Scalar::Util::weaken (my $self = $self); 119 Scalar::Util::weaken (my $self = $self);
126 120
127 $self->{hdl} = new AnyEvent::Handle 121 $self->{hdl} = new AnyEvent::Handle
135 on_eof => $self->{on_eof} || sub { }; 129 on_eof => $self->{on_eof} || sub { };
136 130
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 131 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 132 }
139 133
140 $self->send_msg ( 134 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 135 name => $self->{name},
143 expected_version => "2.0", 136 expected_version => "2.0",
144 ); 137 );
145 138
146 $self 139 $self
147} 140}
148 141
142sub identifier {
143 $_[0]{prefix} . ++$_[0]{idseq}
144}
145
149sub send_msg { 146sub send_msg {
150 my ($self, $type, %kv) = @_; 147 my ($self, $type, %kv) = @_;
151 148
152 my $data = delete $kv{data}; 149 my $data = delete $kv{data};
153 150
154 if (exists $kv{id_cb}) { 151 if (exists $kv{id_cb}) {
155 my $id = $kv{identifier} ||= ++$self->{id}; 152 my $id = $kv{identifier} ||= $self->identifier;
156 $self->{id}{$id} = delete $kv{id_cb}; 153 $self->{id}{$id} = delete $kv{id_cb};
157 } 154 }
158 155
159 my $msg = (touc $type) . "\012" 156 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 157 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
202 push @$queue, $cb; 199 push @$queue, $cb;
203 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 200 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204 unless $#$queue; 201 unless $#$queue;
205} 202}
206 203
204# how to merge these types into $self->{persistent}
205our %PERSISTENT_TYPE = (
206 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
207 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
208 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
209 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
210 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
211
212 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
213
214 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
215 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
216 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
217 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
218 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
219 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
220 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
221
222 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
223 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
224 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
225 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
226 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
227 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
228 data_found => sub { $_[1]{data_found} = $_[2] }, # get
229 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
230);
231
232sub recv {
233 my ($self, $type, $kv, @extra) = @_;
234
235 if (my $cb = $PERSISTENT_TYPE{$type}) {
236 my $id = $kv->{identifier};
237 my $req = $_[0]{req}{$id} ||= {};
238 $cb->($self, $req, $kv);
239 $self->recv (request_changed => $kv, $type, @extra);
240 }
241
242 my $on = $self->{on};
243 for (0 .. $#$on) {
244 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
245 splice @$on, $_, 1 unless defined $res;
246 return;
247 }
248 }
249
250 if (my $cb = $self->{queue}[0]) {
251 $cb->($self, $type, $kv, @extra)
252 and shift @{ $self->{queue} };
253 } else {
254 $self->default_recv ($type, $kv, @extra);
255 }
256}
257
207sub on_read { 258sub on_read {
208 my ($self) = @_; 259 my ($self) = @_;
209 260
210 my $type; 261 my ($k, $v, $type);
211 my %kv; 262 my %kv;
212 my $rdata; 263 my $rdata;
213 264
214 my $done_cb = sub { 265 my $hdr_cb; $hdr_cb = sub {
215 $kv{pkt_type} = $type; 266 if (($v = index $_[1], "=") >= 0) {
267 $k = substr $_[1], 0, $v;
268 $v = substr $_[1], $v + 1;
269 $k = ($TOLC{$k} ||= tolc $k);
216 270
217 my $on = $self->{on}; 271 if ($k !~ /\./) {
218 for (0 .. $#$on) { 272 # special case common case, for performance only
219 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) { 273 $kv{$k} = $v;
220 splice @$on, $_, 1 unless defined $res; 274 } else {
275 my @k = split /\./, $k;
276 my $ro = \\%kv;
277
278 while (@k) {
279 $k = shift @k;
280 if ($k =~ /^\d+$/) {
281 $ro = \$$ro->[$k];
282 } else {
283 $ro = \$$ro->{$k};
284 }
221 return; 285 }
286
287 $$ro = $v;
222 } 288 }
223 }
224
225 if (my $cb = $self->{queue}[0]) {
226 $cb->($self, $type, \%kv, $rdata)
227 and shift @{ $self->{queue} };
228 } else {
229 $self->default_recv ($type, \%kv, $rdata);
230 }
231 };
232
233 my $hdr_cb; $hdr_cb = sub {
234 if ($_[1] =~ /^([^=]+)=(.*)$/) {
235 my ($k, $v) = ($1, $2);
236 my @k = split /\./, tolc $k;
237 my $ro = \\%kv;
238
239 while (@k) {
240 my $k = shift @k;
241 if ($k =~ /^\d+$/) {
242 $ro = \$$ro->[$k];
243 } else {
244 $ro = \$$ro->{$k};
245 }
246 }
247
248 $$ro = $v;
249 289
250 $_[0]->push_read (line => $hdr_cb); 290 $_[0]->push_read (line => $hdr_cb);
251 } elsif ($_[1] eq "Data") { 291 } elsif ($_[1] eq "Data") {
252 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 292 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
253 $rdata = \$_[1]; 293 $rdata = \$_[1];
254 $done_cb->(); 294 $self->recv ($type, \%kv, $rdata);
255 }); 295 });
256 } elsif ($_[1] eq "EndMessage") { 296 } elsif ($_[1] eq "EndMessage") {
257 $done_cb->(); 297 $self->recv ($type, \%kv);
258 } else { 298 } else {
259 die "protocol error, expected message end, got $_[1]\n";#d# 299 die "protocol error, expected message end, got $_[1]\n";#d#
260 } 300 }
261 }; 301 };
262 302
263 $self->{hdl}->push_read (line => sub { 303 $self->{hdl}->push_read (line => sub {
264 $type = tolc $_[1]; 304 $type = ($TOLC{$_[1]} ||= tolc $_[1]);
265 $_[0]->push_read (line => $hdr_cb); 305 $_[0]->push_read (line => $hdr_cb);
266 }); 306 });
267} 307}
268 308
269sub default_recv { 309sub default_recv {
272 if ($type eq "node_hello") { 312 if ($type eq "node_hello") {
273 $self->{node_hello} = $kv; 313 $self->{node_hello} = $kv;
274 } elsif (exists $self->{id}{$kv->{identifier}}) { 314 } elsif (exists $self->{id}{$kv->{identifier}}) {
275 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 315 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
276 and delete $self->{id}{$kv->{identifier}}; 316 and delete $self->{id}{$kv->{identifier}};
277 } else {
278 &{ $self->{progress} };
279 } 317 }
280} 318}
319
320=back
321
322=head2 FCP REQUESTS
323
324The following methods implement various requests. Most of them map
325directory to the FCP message of the same name. The added benefit of
326these over sending requests yourself is that they handle the necessary
327serialisation, protocol quirks, and replies.
328
329All of them exist in two versions, the variant shown in this manpage, and
330a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
331version as shown is I<synchronous> - it will wait for any replies, and
332either return the reply, or croak with an error. The underscore variant
333returns immediately and invokes one or more callbacks or condvars later.
334
335For example, the call
336
337 $info = $fcp->get_plugin_info ($name, $detailed);
338
339Also comes in this underscore variant:
340
341 $fcp->get_plugin_info_ ($name, $detailed, $cb);
342
343You can thinbk of the underscore as a kind of continuation indicator - the
344normal function waits and returns with the data, the C<_> indicates that
345you pass the continuation yourself, and the continuation will be invoked
346with the results.
347
348This callback/continuation argument (C<$cb>) can come in three forms itself:
349
350=over 4
351
352=item A code reference (or rather anything not matching some other alternative)
353
354This code reference will be invoked with the result on success. On an
355error, it will die (in the event loop) with a backtrace of the call site.
356
357This is a popular choice, but it makes handling errors hard - make sure
358you never generate protocol errors!
359
360=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
361
362When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
363results when the request has finished. Should an error occur, the error
364will instead result in C<< $cv->croak ($error) >>.
365
366This is also a popular choice.
367
368=item An array with two callbacks C<[$success, $failure]>
369
370The C<$success> callback will be invoked with the results, while the
371C<$failure> callback will be invoked on any errors.
372
373=item C<undef>
374
375This is the same thing as specifying C<sub { }> as callback, i.e. on
376success, the results are ignored, while on failure, you the module dies
377with a backtrace.
378
379This is good for quick scripts, or when you really aren't interested in
380the results.
381
382=back
383
384=cut
385
386our $NOP_CB = sub { };
281 387
282sub _txn { 388sub _txn {
283 my ($name, $sub) = @_; 389 my ($name, $sub) = @_;
284 390
285 *{$name} = sub { 391 *{$name} = sub {
286 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 392 my $cv = AE::cv;
287 &$sub;
288 $cv
289 };
290 393
291 *{"$name\_sync"} = sub { 394 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
292 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
293 &$sub; 395 &$sub;
294 $cv->recv 396 $cv->recv
295 }; 397 };
296}
297 398
298=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 399 *{"$name\_"} = sub {
400 my ($ok, $err) = pop;
299 401
402 if (ARRAY:: eq ref $ok) {
403 ($ok, $err) = @$ok;
404 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
405 $err = sub { $ok->croak ($_[0]{extra_description}) };
406 } else {
407 my $bt = Carp::longmess "";
408 $err = sub {
409 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
410 };
411 }
412
413 $ok ||= $NOP_CB;
414
415 splice @_, 1, 0, $ok, $err;
416 &$sub;
417 };
418}
419
420=over 4
421
300=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 422=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
301 423
302=cut 424=cut
303 425
304_txn list_peers => sub { 426_txn list_peers => sub {
305 my ($self, $cv, $with_metadata, $with_volatile) = @_; 427 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
306 428
307 my @res; 429 my @res;
308 430
309 $self->send_msg (list_peers => 431 $self->send_msg (list_peers =>
310 with_metadata => $with_metadata ? "true" : "false", 432 with_metadata => $with_metadata ? "true" : "false",
311 with_volatile => $with_volatile ? "true" : "false", 433 with_volatile => $with_volatile ? "true" : "false",
312 id_cb => sub { 434 id_cb => sub {
313 my ($self, $type, $kv, $rdata) = @_; 435 my ($self, $type, $kv, $rdata) = @_;
314 436
315 if ($type eq "end_list_peers") { 437 if ($type eq "end_list_peers") {
316 $cv->(\@res); 438 $ok->(\@res);
317 1 439 1
318 } else { 440 } else {
319 push @res, $kv; 441 push @res, $kv;
320 0 442 0
321 } 443 }
322 }, 444 },
323 ); 445 );
324}; 446};
325 447
326=item $cv = $fcp->list_peer_notes ($node_identifier)
327
328=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 448=item $notes = $fcp->list_peer_notes ($node_identifier)
329 449
330=cut 450=cut
331 451
332_txn list_peer_notes => sub { 452_txn list_peer_notes => sub {
333 my ($self, $cv, $node_identifier) = @_; 453 my ($self, $ok, undef, $node_identifier) = @_;
334 454
335 $self->send_msg (list_peer_notes => 455 $self->send_msg (list_peer_notes =>
336 node_identifier => $node_identifier, 456 node_identifier => $node_identifier,
337 id_cb => sub { 457 id_cb => sub {
338 my ($self, $type, $kv, $rdata) = @_; 458 my ($self, $type, $kv, $rdata) = @_;
339 459
340 $cv->($kv); 460 $ok->($kv);
341 1 461 1
342 }, 462 },
343 ); 463 );
344}; 464};
345 465
346=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347
348=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 466=item $fcp->watch_global ($enabled[, $verbosity_mask])
349 467
350=cut 468=cut
351 469
352_txn watch_global => sub { 470_txn watch_global => sub {
353 my ($self, $cv, $enabled, $verbosity_mask) = @_; 471 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
354 472
355 $self->send_msg (watch_global => 473 $self->send_msg (watch_global =>
356 enabled => $enabled ? "true" : "false", 474 enabled => $enabled ? "true" : "false",
357 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 475 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
358 ); 476 );
359 477
360 $cv->(); 478 $ok->();
361}; 479};
362 480
363=item $cv = $fcp->list_persistent_requests
364
365=item $reqs = $fcp->list_persistent_requests_sync 481=item $reqs = $fcp->list_persistent_requests
366 482
367=cut 483=cut
368 484
369_txn list_persistent_requests => sub { 485_txn list_persistent_requests => sub {
370 my ($self, $cv) = @_; 486 my ($self, $ok, $err) = @_;
371 487
372 $self->serialise (list_persistent_requests => sub { 488 $self->serialise (list_persistent_requests => sub {
373 my ($self, $guard) = @_; 489 my ($self, $guard) = @_;
374 490
375 my %res; 491 my @res;
376 492
377 $self->send_msg ("list_persistent_requests"); 493 $self->send_msg ("list_persistent_requests");
378 494
379 $self->on (sub { 495 $self->on (sub {
380 my ($self, $type, $kv, $rdata) = @_; 496 my ($self, $type, $kv, $rdata) = @_;
381 497
382 $guard if 0; 498 $guard if 0;
383 499
384 if ($type eq "end_list_persistent_requests") { 500 if ($type eq "end_list_persistent_requests") {
385 $cv->(\%res); 501 $ok->(\@res);
386 return; 502 return;
387 } else { 503 } else {
388 my $id = $kv->{identifier}; 504 my $id = $kv->{identifier};
389 505
390 if ($type =~ /^persistent_(get|put|put_dir)$/) { 506 if ($type =~ /^persistent_(get|put|put_dir)$/) {
391 $res{$id} = { 507 push @res, [$type, $kv];
392 type => $1,
393 %{ $res{$id} },
394 %$kv,
395 };
396 } elsif ($type eq "simple_progress") {
397 delete $kv->{pkt_type}; # save memory
398 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399 } else {
400 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401 } 508 }
402 } 509 }
403 510
404 1 511 1
405 }); 512 });
406 }); 513 });
407}; 514};
408 515
409=item $cv = $fcp->remove_request ($global, $identifier) 516=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
410 517
411=item $status = $fcp->remove_request_sync ($global, $identifier) 518Update either the C<client_token> or C<priority_class> of a request
519identified by C<$global> and C<$identifier>, depending on which of
520C<$client_token> and C<$priority_class> are not C<undef>.
412 521
413=cut 522=cut
414 523
415_txn remove_request => sub { 524_txn modify_persistent_request => sub {
416 my ($self, $cv, $global, $identifier) = @_; 525 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
417 526
527 $self->serialise ($identifier => sub {
528 my ($self, $guard) = @_;
529
418 $self->send_msg (remove_request => 530 $self->send_msg (modify_persistent_request =>
419 global => $global ? "true" : "false", 531 global => $global ? "true" : "false",
420 identifier => $identifier, 532 identifier => $identifier,
421 id_cb => sub { 533 defined $client_token ? (client_token => $client_token ) : (),
534 defined $priority_class ? (priority_class => $priority_class) : (),
535 );
536
537 $self->on (sub {
422 my ($self, $type, $kv, $rdata) = @_; 538 my ($self, $type, $kv, @extra) = @_;
423 539
540 $guard if 0;
541
542 if ($kv->{identifier} eq $identifier) {
543 if ($type eq "persistent_request_modified") {
424 $cv->($kv); 544 $ok->($kv);
545 return;
546 } elsif ($type eq "protocol_error") {
547 $err->($kv);
548 return;
549 }
550 }
551
425 1 552 1
426 }, 553 });
427 ); 554 });
428}; 555};
429 556
430=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431
432=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433
434=cut
435
436_txn modify_persistent_request => sub {
437 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
438
439 $self->send_msg (modify_persistent_request =>
440 global => $global ? "true" : "false",
441 defined $client_token ? (client_token => $client_token ) : (),
442 defined $priority_class ? (priority_class => $priority_class) : (),
443 identifier => $identifier,
444 id_cb => sub {
445 my ($self, $type, $kv, $rdata) = @_;
446
447 $cv->($kv);
448 1
449 },
450 );
451};
452
453=item $cv = $fcp->get_plugin_info ($name, $detailed)
454
455=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 557=item $info = $fcp->get_plugin_info ($name, $detailed)
456 558
457=cut 559=cut
458 560
459_txn get_plugin_info => sub { 561_txn get_plugin_info => sub {
460 my ($self, $cv, $name, $detailed) = @_; 562 my ($self, $ok, $err, $name, $detailed) = @_;
563
564 my $id = $self->identifier;
461 565
462 $self->send_msg (get_plugin_info => 566 $self->send_msg (get_plugin_info =>
567 identifier => $id,
463 plugin_name => $name, 568 plugin_name => $name,
464 detailed => $detailed ? "true" : "false", 569 detailed => $detailed ? "true" : "false",
465 id_cb => sub {
466 my ($self, $type, $kv, $rdata) = @_;
467
468 $cv->($kv);
469 1
470 },
471 ); 570 );
571 $self->on (sub {
572 my ($self, $type, $kv) = @_;
573
574 if ($kv->{identifier} eq $id) {
575 if ($type eq "get_plugin_info") {
576 $ok->($kv);
577 } else {
578 $err->($kv, $type);
579 }
580 return;
581 }
582
583 1
584 });
472}; 585};
473 586
474=item $cv = $fcp->client_get ($uri, $identifier, %kv)
475
476=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 587=item $status = $fcp->client_get ($uri, $identifier, %kv)
477 588
478%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 589%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479 590
480ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 591ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481priority_class, persistence, client_token, global, return_type, 592priority_class, persistence, client_token, global, return_type,
482binary_blob, allowed_mime_types, filename, temp_filename 593binary_blob, allowed_mime_types, filename, temp_filename
483 594
484=cut 595=cut
485 596
486_txn client_get => sub { 597_txn client_get => sub {
487 my ($self, $cv, $uri, $identifier, %kv) = @_; 598 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
488 599
600 $self->serialise ($identifier => sub {
601 my ($self, $guard) = @_;
602
489 $self->send_msg (client_get => 603 $self->send_msg (client_get =>
490 %kv, 604 %kv,
491 uri => $uri, 605 uri => $uri,
492 identifier => $identifier, 606 identifier => $identifier,
493 id_cb => sub { 607 );
608
609 $self->on (sub {
494 my ($self, $type, $kv, $rdata) = @_; 610 my ($self, $type, $kv, @extra) = @_;
495 611
612 $guard if 0;
613
614 if ($kv->{identifier} eq $identifier) {
615 if ($type eq "persistent_get") {
496 $cv->($kv); 616 $ok->($kv);
617 return;
618 } elsif ($type eq "protocol_error") {
619 $err->($kv);
620 return;
621 }
622 }
623
497 1 624 1
498 }, 625 });
499 ); 626 });
500}; 627};
501 628
502=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 629=item $status = $fcp->remove_request ($identifier[, $global])
503 630
631Remove the request with the given isdentifier. Returns true if successful,
632false on error.
633
634=cut
635
636_txn remove_request => sub {
637 my ($self, $ok, $err, $identifier, $global) = @_;
638
639 $self->serialise ($identifier => sub {
640 my ($self, $guard) = @_;
641
642 $self->send_msg (remove_request =>
643 identifier => $identifier,
644 global => $global ? "true" : "false",
645 );
646 $self->on (sub {
647 my ($self, $type, $kv, @extra) = @_;
648
649 $guard if 0;
650
651 if ($kv->{identifier} eq $identifier) {
652 if ($type eq "persistent_request_removed") {
653 $ok->(1);
654 return;
655 } elsif ($type eq "protocol_error") {
656 $err->($kv);
657 return;
658 }
659 }
660
661 1
662 });
663 });
664};
665
504=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 666=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
505 667
506The DDA test in FCP is probably the single most broken protocol - only 668The DDA test in FCP is probably the single most broken protocol - only
507one directory test can be outstanding at any time, and some guessing and 669one directory test can be outstanding at any time, and some guessing and
508heuristics are involved in mangling the paths. 670heuristics are involved in mangling the paths.
509 671
510This function combines C<TestDDARequest> and C<TestDDAResponse> in one 672This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511request, handling file reading and writing as well. 673request, handling file reading and writing as well, and tries very hard to
674do the right thing.
675
676Both C<$local_directory> and C<$remote_directory> must specify the same
677directory - C<$local_directory> is the directory path on the client (where
678L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
679the server (where the freenet node runs). When both are running on the
680same node, the paths are generally identical.
681
682C<$want_read> and C<$want_write> should be set to a true value when you
683want to read (get) files or write (put) files, respectively.
684
685On error, an exception is thrown. Otherwise, C<$can_read> and
686C<$can_write> indicate whether you can reaqd or write to freenet via the
687directory.
512 688
513=cut 689=cut
514 690
515_txn test_dda => sub { 691_txn test_dda => sub {
516 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 692 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
517 693
518 $self->serialise (test_dda => sub { 694 $self->serialise (test_dda => sub {
519 my ($self, $guard) = @_; 695 my ($self, $guard) = @_;
520 696
521 $self->send_msg (test_dda_request => 697 $self->send_msg (test_dda_request =>
542 } 718 }
543 719
544 my %response = (directory => $remote); 720 my %response = (directory => $remote);
545 721
546 if (length $kv->{read_filename}) { 722 if (length $kv->{read_filename}) {
547 warn "$local/$kv->{read_filename}";#d#
548 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 723 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549 sysread $fh, my $buf, -s $fh; 724 sysread $fh, my $buf, -s $fh;
550 $response{read_content} = $buf; 725 $response{read_content} = $buf;
551 } 726 }
552 } 727 }
563 my ($self, $type, $kv) = @_; 738 my ($self, $type, $kv) = @_;
564 739
565 $guard if 0; # reference 740 $guard if 0; # reference
566 741
567 if ($type eq "test_dda_complete") { 742 if ($type eq "test_dda_complete") {
568 $cv->( 743 $ok->(
569 $kv->{read_directory_allowed} eq "true", 744 $kv->{read_directory_allowed} eq "true",
570 $kv->{write_directory_allowed} eq "true", 745 $kv->{write_directory_allowed} eq "true",
571 ); 746 );
572 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 747 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573 $cv->croak ($kv->{extra_description}); 748 $err->($kv->{extra_description});
574 return; 749 return;
575 } 750 }
576 751
577 1 752 1
578 }); 753 });
579 754
580 return; 755 return;
581 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 756 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582 $cv->croak ($kv->{extra_description}); 757 $err->($kv);
583 return; 758 return;
584 } 759 }
585 760
586 1 761 1
587 }); 762 });
588 }); 763 });
589}; 764};
590 765
591=back 766=back
592 767
768=head2 REQUEST CACHE
769
770The C<AnyEvent::FCP> class keeps a request cache, where it caches all
771information from requests.
772
773For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
774in C<< $fcp->{req}{$identifier} >>:
775
776 persistent_get
777 persistent_put
778 persistent_put_dir
779
780This message updates the stored data:
781
782 persistent_request_modified
783
784This message will remove this entry:
785
786 persistent_request_removed
787
788These messages get merged into the cache entry, under their
789type, i.e. a C<simple_progress> message will be stored in C<<
790$fcp->{req}{$identifier}{simple_progress} >>:
791
792 simple_progress # get/put
793
794 uri_generated # put
795 generated_metadata # put
796 started_compression # put
797 finished_compression # put
798 put_failed # put
799 put_fetchable # put
800 put_successful # put
801
802 sending_to_network # get
803 compatibility_mode # get
804 expected_hashes # get
805 expected_mime # get
806 expected_data_length # get
807 get_failed # get
808 data_found # get
809 enter_finite_cooldown # get
810
811In addition, an event (basically a fake message) of type C<request_changed> is generated
812on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
813is the type of the original message triggering the change,
814
815To fill this cache with the global queue and keep it updated,
816call C<watch_global> to subscribe to updates, followed by
817C<list_persistent_requests_sync>.
818
819 $fcp->watch_global_sync_; # do not wait
820 $fcp->list_persistent_requests; # wait
821
822To get a better idea of what is stored in the cache, here is an example of
823what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
824
825 {
826 identifier => "Frost-gpl.txt",
827 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
828 binary_blob => "false",
829 global => "true",
830 max_retries => -1,
831 max_size => 9223372036854775807,
832 persistence => "forever",
833 priority_class => 3,
834 real_time => "false",
835 return_type => "direct",
836 started => "true",
837 type => "persistent_get",
838 verbosity => 2147483647,
839 sending_to_network => {
840 identifier => "Frost-gpl.txt",
841 global => "true",
842 },
843 compatibility_mode => {
844 identifier => "Frost-gpl.txt",
845 definitive => "true",
846 dont_compress => "false",
847 global => "true",
848 max => "COMPAT_1255",
849 min => "COMPAT_1255",
850 },
851 expected_hashes => {
852 identifier => "Frost-gpl.txt",
853 global => "true",
854 hashes => {
855 ed2k => "d83596f5ee3b7...",
856 md5 => "e0894e4a2a6...",
857 sha1 => "...",
858 sha256 => "...",
859 sha512 => "...",
860 tth => "...",
861 },
862 },
863 expected_mime => {
864 identifier => "Frost-gpl.txt",
865 global => "true",
866 metadata => { content_type => "application/rar" },
867 },
868 expected_data_length => {
869 identifier => "Frost-gpl.txt",
870 data_length => 37576,
871 global => "true",
872 },
873 simple_progress => {
874 identifier => "Frost-gpl.txt",
875 failed => 0,
876 fatally_failed => 0,
877 finalized_total => "true",
878 global => "true",
879 last_progress => 1438639282628,
880 required => 372,
881 succeeded => 102,
882 total => 747,
883 },
884 data_found => {
885 identifier => "Frost-gpl.txt",
886 completion_time => 1438663354026,
887 data_length => 37576,
888 global => "true",
889 metadata => { content_type => "image/jpeg" },
890 startup_time => 1438657196167,
891 },
892 }
893
593=head1 EXAMPLE PROGRAM 894=head1 EXAMPLE PROGRAM
594 895
595 use AnyEvent::FCP; 896 use AnyEvent::FCP;
596 897
597 my $fcp = new AnyEvent::FCP; 898 my $fcp = new AnyEvent::FCP;
598 899
599 # let us look at the global request list 900 # let us look at the global request list
600 $fcp->watch_global (1, 0); 901 $fcp->watch_global_ (1);
601 902
602 # list them, synchronously 903 # list them, synchronously
603 my $req = $fcp->list_persistent_requests_sync; 904 my $req = $fcp->list_persistent_requests;
604 905
605 # go through all requests 906 # go through all requests
907TODO
606 for my $req (values %$req) { 908 for my $req (values %$req) {
607 # skip jobs not directly-to-disk 909 # skip jobs not directly-to-disk
608 next unless $req->{return_type} eq "disk"; 910 next unless $req->{return_type} eq "disk";
609 # skip jobs not issued by FProxy 911 # skip jobs not issued by FProxy
610 next unless $req->{identifier} =~ /^FProxy:/; 912 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines