ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.6 by root, Mon May 31 06:27:37 2010 UTC vs.
Revision 1.12 by root, Sat Aug 8 04:02:48 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.21'; 66our $VERSION = '0.3';
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
53 73
54sub touc($) { 74sub touc($) {
55 local $_ = shift; 75 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 77 s/(?:^|_)(.)/\U$1/g;
58 $_ 78 $_
59} 79}
60 80
61sub tolc($) { 81sub tolc($) {
62 local $_ = shift; 82 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 86 lc
67} 87}
68 88
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 90
71Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 93
74If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 95(hopefully) unique client name for you.
76 96
77You can install a progress callback that is being called with the AnyEvent::FCP
78object, the type, a hashref with key-value pairs and a reference to any received data,
79for all unsolicited messages.
80
81Example:
82
83 sub progress_cb {
84 my ($self, $type, $kv, $rdata) = @_;
85
86 if ($type eq "simple_progress") {
87 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
88 }
89 }
90
91=cut 97=cut
92 98
93sub new { 99sub new {
94 my $class = shift; 100 my $class = shift;
95 my $self = bless { @_ }, $class; 101 my $self = bless {
96 102 host => $ENV{FREDHOST} || "127.0.0.1",
97 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
98 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
99 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
100 $self->{timeout} ||= 600; 106 @_,
101 $self->{progress} ||= sub { }; 107 queue => [],
102 108 req => {},
103 $self->{id} = "a0"; 109 id => "a0",
110 }, $class;
104 111
105 { 112 {
106 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
107 114
108 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
109 connect => [$self->{host} => $self->{port}], 116 connect => [$self->{host} => $self->{port}],
110 timeout => $self->{timeout}, 117 timeout => $self->{timeout},
111 on_error => sub { 118 on_error => sub {
112 warn "<@_>\n"; 119 warn "@_\n";#d#
113 exit 1; 120 exit 1;
114 }, 121 },
115 on_read => sub { $self->on_read (@_) }, 122 on_read => sub { $self->on_read (@_) },
116 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
117 124
118 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
119 } 126 }
120 127
121 $self->send_msg ( 128 $self->send_msg (client_hello =>
122 client_hello =>
123 name => $self->{name}, 129 name => $self->{name},
124 expected_version => "2.0", 130 expected_version => "2.0",
125 ); 131 );
126 132
127 $self 133 $self
128} 134}
131 my ($self, $type, %kv) = @_; 137 my ($self, $type, %kv) = @_;
132 138
133 my $data = delete $kv{data}; 139 my $data = delete $kv{data};
134 140
135 if (exists $kv{id_cb}) { 141 if (exists $kv{id_cb}) {
136 my $id = $kv{identifier} || ++$self->{id}; 142 my $id = $kv{identifier} ||= ++$self->{id};
137 $self->{id}{$id} = delete $kv{id_cb}; 143 $self->{id}{$id} = delete $kv{id_cb};
138 $kv{identifier} = $id;
139 } 144 }
140 145
141 my $msg = (touc $type) . "\012" 146 my $msg = (touc $type) . "\012"
142 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
143 148
155 } 160 }
156 161
157 $self->{hdl}->push_write ($msg); 162 $self->{hdl}->push_write ($msg);
158} 163}
159 164
165sub on {
166 my ($self, $cb) = @_;
167
168 # cb return undef - message eaten, remove cb
169 # cb return 0 - message eaten
170 # cb return 1 - pass to next
171
172 push @{ $self->{on} }, $cb;
173}
174
175sub _push_queue {
176 my ($self, $queue) = @_;
177
178 shift @$queue;
179 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
180 if @$queue;
181}
182
183# lock so only one $type (arbitrary string) is in flight,
184# to work around horribly misdesigned protocol.
185sub serialise {
186 my ($self, $type, $cb) = @_;
187
188 my $queue = $self->{serialise}{$type} ||= [];
189 push @$queue, $cb;
190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
191 unless $#$queue;
192}
193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
160sub on_read { 248sub on_read {
161 my ($self) = @_; 249 my ($self) = @_;
162 250
163 my $type; 251 my $type;
164 my %kv; 252 my %kv;
165 my $rdata; 253 my $rdata;
166
167 my $done_cb = sub {
168 $kv{pkt_type} = $type;
169
170 if (my $cb = $self->{queue}[0]) {
171 $cb->($self, $type, \%kv, $rdata)
172 and shift @{ $self->{queue} };
173 } else {
174 $self->default_recv ($type, \%kv, $rdata);
175 }
176 };
177 254
178 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
179 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
180 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
181 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
194 271
195 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
196 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
197 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
198 $rdata = \$_[1]; 275 $rdata = \$_[1];
199 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
200 }); 277 });
201 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
202 $done_cb->(); 279 $self->recv ($type, \%kv);
203 } else { 280 } else {
204 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
205 } 282 }
206 }; 283 };
207 284
217 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
218 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
219 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
220 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
221 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
222 } else {
223 &{ $self->{progress} };
224 } 299 }
225} 300}
301
302=back
303
304=head2 FCP REQUESTS
305
306The following methods implement various requests. Most of them map
307directory to the FCP message of the same name. The added benefit of
308these over sending requests yourself is that they handle the necessary
309serialisation, protocol quirks, and replies.
310
311All of them exist in two versions, the variant shown in this manpage, and
312a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
313version as shown is I<synchronous> - it will wait for any replies, and
314either return the reply, or croak with an error. The underscore variant
315returns immediately and invokes one or more callbacks or condvars later.
316
317For example, the call
318
319 $info = $fcp->get_plugin_info ($name, $detailed);
320
321Also comes in this underscore variant:
322
323 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324
325You can thinbk of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked
328with the results.
329
330This callback/continuation argument (C<$cb>) can come in three forms itself:
331
332=over 4
333
334=item A code reference (or rather anything not matching some other alternative)
335
336This code reference will be invoked with the result on success. On an
337error, it will die (in the event loop) with a backtrace of the call site.
338
339This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors!
341
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error
346will instead result in C<< $cv->croak ($error) >>.
347
348This is also a popular choice.
349
350=item An array with two callbacks C<[$success, $failure]>
351
352The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors.
354
355=item C<undef>
356
357This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies
359with a backtrace.
360
361This is good for quick scripts, or when you really aren't interested in
362the results.
363
364=back
365
366=cut
367
368our $NOP_CB = sub { };
226 369
227sub _txn { 370sub _txn {
228 my ($name, $sub) = @_; 371 my ($name, $sub) = @_;
229 372
230 *{$name} = sub { 373 *{$name} = sub {
231 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 374 my $cv = AE::cv;
232 &$sub;
233 $cv
234 };
235 375
236 *{"$name\_sync"} = sub { 376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) };
237 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
238 &$sub; 377 &$sub;
239 $cv->recv 378 $cv->recv
240 }; 379 };
241}
242 380
243=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 381 *{"$name\_"} = sub {
382 my ($ok, $err) = pop;
244 383
384 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) };
388 } else {
389 my $bt = Carp::longmess "";
390 $err = sub {
391 die "$_[0]{extra_description}$bt";
392 };
393 }
394
395 $ok ||= $NOP_CB;
396
397 splice @_, 1, 0, $ok, $err;
398 &$sub;
399 };
400}
401
402=over 4
403
245=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 404=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
246 405
247=cut 406=cut
248 407
249_txn list_peers => sub { 408_txn list_peers => sub {
250 my ($self, $cv, $with_metadata, $with_volatile) = @_; 409 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
251 410
252 my @res; 411 my @res;
253 412
254 $self->send_msg (list_peers => 413 $self->send_msg (list_peers =>
255 with_metadata => $with_metadata ? "true" : "false", 414 with_metadata => $with_metadata ? "true" : "false",
256 with_volatile => $with_volatile ? "true" : "false", 415 with_volatile => $with_volatile ? "true" : "false",
257 id_cb => sub { 416 id_cb => sub {
258 my ($self, $type, $kv, $rdata) = @_; 417 my ($self, $type, $kv, $rdata) = @_;
259 418
260 if ($type eq "end_list_peers") { 419 if ($type eq "end_list_peers") {
261 $cv->(\@res); 420 $ok->(\@res);
262 1 421 1
263 } else { 422 } else {
264 push @res, $kv; 423 push @res, $kv;
265 0 424 0
266 } 425 }
267 }, 426 },
268 ); 427 );
269}; 428};
270 429
271=item $cv = $fcp->list_peer_notes ($node_identifier)
272
273=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 430=item $notes = $fcp->list_peer_notes ($node_identifier)
274 431
275=cut 432=cut
276 433
277_txn list_peer_notes => sub { 434_txn list_peer_notes => sub {
278 my ($self, $cv, $node_identifier) = @_; 435 my ($self, $ok, undef, $node_identifier) = @_;
279 436
280 $self->send_msg (list_peer_notes => 437 $self->send_msg (list_peer_notes =>
281 node_identifier => $node_identifier, 438 node_identifier => $node_identifier,
282 id_cb => sub { 439 id_cb => sub {
283 my ($self, $type, $kv, $rdata) = @_; 440 my ($self, $type, $kv, $rdata) = @_;
284 441
285 $cv->($kv); 442 $ok->($kv);
286 1 443 1
287 }, 444 },
288 ); 445 );
289}; 446};
290 447
291=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
292
293=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 448=item $fcp->watch_global ($enabled[, $verbosity_mask])
294 449
295=cut 450=cut
296 451
297_txn watch_global => sub { 452_txn watch_global => sub {
298 my ($self, $cv, $enabled, $verbosity_mask) = @_; 453 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
299 454
300 $self->send_msg (watch_global => 455 $self->send_msg (watch_global =>
301 enabled => $enabled ? "true" : "false", 456 enabled => $enabled ? "true" : "false",
302 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 457 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
303 ); 458 );
304 459
305 $cv->(); 460 $ok->();
306}; 461};
307 462
308=item $cv = $fcp->list_persistent_requests
309
310=item $reqs = $fcp->list_persistent_requests_sync 463=item $reqs = $fcp->list_persistent_requests
311 464
312=cut 465=cut
313 466
314_txn list_persistent_requests => sub { 467_txn list_persistent_requests => sub {
315 my ($self, $cv) = @_; 468 my ($self, $ok, $err) = @_;
316 469
470 $self->serialise (list_persistent_requests => sub {
471 my ($self, $guard) = @_;
472
317 my %res; 473 my @res;
318 474
319 $self->send_msg ("list_persistent_requests"); 475 $self->send_msg ("list_persistent_requests");
320 476
321 push @{ $self->{queue} }, sub { 477 $self->on (sub {
322 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
323 479
480 $guard if 0;
481
324 if ($type eq "end_list_persistent_requests") { 482 if ($type eq "end_list_persistent_requests") {
325 $cv->(\%res); 483 $ok->(\@res);
484 return;
485 } else {
486 my $id = $kv->{identifier};
487
488 if ($type =~ /^persistent_(get|put|put_dir)$/) {
489 push @res, [$type, $kv];
490 }
491 }
492
326 1 493 1
327 } else { 494 });
328 my $id = $kv->{identifier}; 495 });
496};
329 497
330 if ($type =~ /^persistent_(get|put|put_dir)$/) { 498=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
331 $res{$id} = { 499
332 type => $1, 500Update either the C<client_token> or C<priority_class> of a request
333 %{ $res{$id} }, 501identified by C<$global> and C<$identifier>, depending on which of
502C<$client_token> and C<$priority_class> are not C<undef>.
503
504=cut
505
506_txn modify_persistent_request => sub {
507 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
508
509 $self->serialise ($identifier => sub {
510 my ($self, $guard) = @_;
511
512 $self->send_msg (modify_persistent_request =>
513 global => $global ? "true" : "false",
514 identifier => $identifier,
515 defined $client_token ? (client_token => $client_token ) : (),
516 defined $priority_class ? (priority_class => $priority_class) : (),
517 );
518
519 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_;
521
522 if ($kv->{identifier} eq $identifier) {
523 if ($type eq "persistent_request_modified") {
334 %$kv, 524 $ok->($kv);
525 return;
526 } elsif ($type eq "protocol_error") {
527 $err->($kv);
528 return;
335 }; 529 }
336 } elsif ($type eq "simple_progress") {
337 delete $kv->{pkt_type}; # save memory
338 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
339 } else {
340 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
341 } 530 }
531
342 0 532 1
343 } 533 });
344 }; 534 });
345}; 535};
346 536
347=item $cv = $fcp->remove_request ($global, $identifier)
348
349=item $status = $fcp->remove_request_sync ($global, $identifier)
350
351=cut
352
353_txn remove_request => sub {
354 my ($self, $cv, $global, $identifier) = @_;
355
356 $self->send_msg (remove_request =>
357 global => $global ? "true" : "false",
358 identifier => $identifier,
359 id_cb => sub {
360 my ($self, $type, $kv, $rdata) = @_;
361
362 $cv->($kv);
363 1
364 },
365 );
366};
367
368=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
369
370=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
371
372=cut
373
374_txn modify_persistent_request => sub {
375 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
376
377 $self->send_msg (modify_persistent_request =>
378 global => $global ? "true" : "false",
379 defined $client_token ? (client_token => $client_token ) : (),
380 defined $priority_class ? (priority_class => $priority_class) : (),
381 identifier => $identifier,
382 id_cb => sub {
383 my ($self, $type, $kv, $rdata) = @_;
384
385 $cv->($kv);
386 1
387 },
388 );
389};
390
391=item $cv = $fcp->get_plugin_info ($name, $detailed)
392
393=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 537=item $info = $fcp->get_plugin_info ($name, $detailed)
394 538
395=cut 539=cut
396 540
397_txn get_plugin_info => sub { 541_txn get_plugin_info => sub {
398 my ($self, $cv, $name, $detailed) = @_; 542 my ($self, $ok, $err, $name, $detailed) = @_;
399 543
400 $self->send_msg (get_plugin_info => 544 $self->send_msg (get_plugin_info =>
401 plugin_name => $name, 545 plugin_name => $name,
402 detailed => $detailed ? "true" : "false", 546 detailed => $detailed ? "true" : "false",
403 id_cb => sub { 547 id_cb => sub {
404 my ($self, $type, $kv, $rdata) = @_; 548 my ($self, $type, $kv, $rdata) = @_;
405 549
406 $cv->($kv); 550 $ok->($kv);
407 1 551 1
408 }, 552 },
409 ); 553 );
410}; 554};
411 555
412=item $cv = $fcp->client_get ($uri, $identifier, %kv)
413
414=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 556=item $status = $fcp->client_get ($uri, $identifier, %kv)
415 557
416%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 558%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
417 559
418ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 560ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
419priority_class, persistence, client_token, global, return_type, 561priority_class, persistence, client_token, global, return_type,
420binary_blob, allowed_mime_types, filename, temp_filename 562binary_blob, allowed_mime_types, filename, temp_filename
421 563
422=cut 564=cut
423 565
424_txn client_get => sub { 566_txn client_get => sub {
425 my ($self, $cv, $uri, $identifier, %kv) = @_; 567 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
426 568
427 $self->send_msg (client_get => 569 $self->send_msg (client_get =>
428 %kv, 570 %kv,
429 uri => $uri, 571 uri => $uri,
430 identifier => $identifier, 572 identifier => $identifier,
431 id_cb => sub { 573 );
574
575 $ok->();
576};
577
578=item $status = $fcp->remove_request ($identifier[, $global])
579
580Remove the request with the given isdentifier. Returns true if successful,
581false on error.
582
583=cut
584
585_txn remove_request => sub {
586 my ($self, $ok, $err, $identifier, $global) = @_;
587
588 $self->serialise ($identifier => sub {
589 my ($self, $guard) = @_;
590
591 $self->send_msg (remove_request =>
592 identifier => $identifier,
593 global => $global ? "true" : "false",
594 );
595 $self->on (sub {
432 my ($self, $type, $kv, $rdata) = @_; 596 my ($self, $type, $kv, @extra) = @_;
433 597
598 if ($kv->{identifier} eq $identifier) {
599 if ($type eq "persistent_request_removed") {
600 $ok->(1);
601 return;
602 } elsif ($type eq "protocol_error") {
434 $cv->($kv); 603 $err->($kv);
604 return;
605 }
606 }
607
435 1 608 1
609 });
610 });
611};
612
613=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
614
615The DDA test in FCP is probably the single most broken protocol - only
616one directory test can be outstanding at any time, and some guessing and
617heuristics are involved in mangling the paths.
618
619This function combines C<TestDDARequest> and C<TestDDAResponse> in one
620request, handling file reading and writing as well, and tries very hard to
621do the right thing.
622
623Both C<$local_directory> and C<$remote_directory> must specify the same
624directory - C<$local_directory> is the directory path on the client (where
625L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
626the server (where the freenet node runs). When both are running on the
627same node, the paths are generally identical.
628
629C<$want_read> and C<$want_write> should be set to a true value when you
630want to read (get) files or write (put) files, respectively.
631
632On error, an exception is thrown. Otherwise, C<$can_read> and
633C<$can_write> indicate whether you can reaqd or write to freenet via the
634directory.
635
636=cut
637
638_txn test_dda => sub {
639 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
640
641 $self->serialise (test_dda => sub {
642 my ($self, $guard) = @_;
643
644 $self->send_msg (test_dda_request =>
645 directory => $remote,
646 want_read_directory => $want_read ? "true" : "false",
647 want_write_directory => $want_write ? "true" : "false",
648 );
649 $self->on (sub {
650 my ($self, $type, $kv) = @_;
651
652 if ($type eq "test_dda_reply") {
653 # the filenames are all relative to the server-side directory,
654 # which might or might not match $remote anymore, so we
655 # need to rewrite the paths to be relative to $local
656 for my $k (qw(read_filename write_filename)) {
657 my $f = $kv->{$k};
658 for my $dir ($kv->{directory}, $remote) {
659 if ($dir eq substr $f, 0, length $dir) {
660 substr $f, 0, 1 + length $dir, "";
661 $kv->{$k} = $f;
662 last;
663 }
664 }
665 }
666
667 my %response = (directory => $remote);
668
669 if (length $kv->{read_filename}) {
670 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
671 sysread $fh, my $buf, -s $fh;
672 $response{read_content} = $buf;
673 }
674 }
675
676 if (length $kv->{write_filename}) {
677 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
678 syswrite $fh, $kv->{content_to_write};
679 }
680 }
681
682 $self->send_msg (test_dda_response => %response);
683
684 $self->on (sub {
685 my ($self, $type, $kv) = @_;
686
687 $guard if 0; # reference
688
689 if ($type eq "test_dda_complete") {
690 $ok->(
691 $kv->{read_directory_allowed} eq "true",
692 $kv->{write_directory_allowed} eq "true",
693 );
694 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
695 $err->($kv->{extra_description});
696 return;
697 }
698
699 1
700 });
701
702 return;
703 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
704 $err->($kv);
705 return;
706 }
707
708 1
709 });
710 });
711};
712
713=back
714
715=head2 REQUEST CACHE
716
717The C<AnyEvent::FCP> class keeps a request cache, where it caches all
718information from requests.
719
720For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
721in C<< $fcp->{req}{$identifier} >>:
722
723 persistent_get
724 persistent_put
725 persistent_put_dir
726
727This message updates the stored data:
728
729 persistent_request_modified
730
731This message will remove this entry:
732
733 persistent_request_removed
734
735These messages get merged into the cache entry, under their
736type, i.e. a C<simple_progress> message will be stored in C<<
737$fcp->{req}{$identifier}{simple_progress} >>:
738
739 simple_progress # get/put
740
741 uri_generated # put
742 generated_metadata # put
743 started_compression # put
744 finished_compression # put
745 put_failed # put
746 put_fetchable # put
747 put_successful # put
748
749 sending_to_network # get
750 compatibility_mode # get
751 expected_hashes # get
752 expected_mime # get
753 expected_data_length # get
754 get_failed # get
755 data_found # get
756 enter_finite_cooldown # get
757
758In addition, an event (basically a fake message) of type C<request_changed> is generated
759on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
760is the type of the original message triggering the change,
761
762To fill this cache with the global queue and keep it updated,
763call C<watch_global> to subscribe to updates, followed by
764C<list_persistent_requests_sync>.
765
766 $fcp->watch_global_sync_; # do not wait
767 $fcp->list_persistent_requests; # wait
768
769To get a better idea of what is stored in the cache, here is an example of
770what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
771
772 {
773 identifier => "Frost-gpl.txt",
774 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
775 binary_blob => "false",
776 global => "true",
777 max_retries => -1,
778 max_size => 9223372036854775807,
779 persistence => "forever",
780 priority_class => 3,
781 real_time => "false",
782 return_type => "direct",
783 started => "true",
784 type => "persistent_get",
785 verbosity => 2147483647,
786 sending_to_network => {
787 identifier => "Frost-gpl.txt",
788 global => "true",
436 }, 789 },
437 ); 790 compatibility_mode => {
438}; 791 identifier => "Frost-gpl.txt",
439 792 definitive => "true",
440=back 793 dont_compress => "false",
794 global => "true",
795 max => "COMPAT_1255",
796 min => "COMPAT_1255",
797 },
798 expected_hashes => {
799 identifier => "Frost-gpl.txt",
800 global => "true",
801 hashes => {
802 ed2k => "d83596f5ee3b7...",
803 md5 => "e0894e4a2a6...",
804 sha1 => "...",
805 sha256 => "...",
806 sha512 => "...",
807 tth => "...",
808 },
809 },
810 expected_mime => {
811 identifier => "Frost-gpl.txt",
812 global => "true",
813 metadata => { content_type => "application/rar" },
814 },
815 expected_data_length => {
816 identifier => "Frost-gpl.txt",
817 data_length => 37576,
818 global => "true",
819 },
820 simple_progress => {
821 identifier => "Frost-gpl.txt",
822 failed => 0,
823 fatally_failed => 0,
824 finalized_total => "true",
825 global => "true",
826 last_progress => 1438639282628,
827 required => 372,
828 succeeded => 102,
829 total => 747,
830 },
831 data_found => {
832 identifier => "Frost-gpl.txt",
833 completion_time => 1438663354026,
834 data_length => 37576,
835 global => "true",
836 metadata => { content_type => "image/jpeg" },
837 startup_time => 1438657196167,
838 },
839 }
441 840
442=head1 EXAMPLE PROGRAM 841=head1 EXAMPLE PROGRAM
443 842
444 use AnyEvent::FCP; 843 use AnyEvent::FCP;
445 844
446 my $fcp = new AnyEvent::FCP; 845 my $fcp = new AnyEvent::FCP;
447 846
448 # let us look at the global request list 847 # let us look at the global request list
449 $fcp->watch_global (1, 0); 848 $fcp->watch_global_ (1);
450 849
451 # list them, synchronously 850 # list them, synchronously
452 my $req = $fcp->list_persistent_requests_sync; 851 my $req = $fcp->list_persistent_requests;
453 852
454 # go through all requests 853 # go through all requests
854TODO
455 for my $req (values %$req) { 855 for my $req (values %$req) {
456 # skip jobs not directly-to-disk 856 # skip jobs not directly-to-disk
457 next unless $req->{return_type} eq "disk"; 857 next unless $req->{return_type} eq "disk";
458 # skip jobs not issued by FProxy 858 # skip jobs not issued by FProxy
459 next unless $req->{identifier} =~ /^FProxy:/; 859 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines