ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.3 by root, Tue Jul 28 02:20:51 2009 UTC vs.
Revision 1.13 by root, Sat Aug 8 04:07:28 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = '0.3';
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
53 73
54sub touc($) { 74sub touc($) {
55 local $_ = shift; 75 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/; 76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 77 s/(?:^|_)(.)/\U$1/g;
58 $_ 78 $_
59} 79}
60 80
61sub tolc($) { 81sub tolc($) {
62 local $_ = shift; 82 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 86 lc
67} 87}
68 88
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 90
71Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 93
74If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 95(hopefully) unique client name for you.
76 96
77=cut 97=cut
78 98
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use
82#it like this:
83#
84# sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_;
86#
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88# }
89
90sub new { 99sub new {
91 my $class = shift; 100 my $class = shift;
92 my $self = bless { @_ }, $class; 101 my $self = bless {
93 102 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
97 $self->{timeout} ||= 600; 106 @_,
98 107 queue => [],
99 $self->{id} = "a0"; 108 req => {},
109 id => "a0",
110 }, $class;
100 111
101 { 112 {
102 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
103 114
104 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 116 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 117 timeout => $self->{timeout},
107 on_error => sub { 118 on_error => sub {
108 warn "<@_>\n"; 119 warn "@_\n";#d#
109 exit 1; 120 exit 1;
110 }, 121 },
111 on_read => sub { $self->on_read (@_) }, 122 on_read => sub { $self->on_read (@_) },
112 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
113 124
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 126 }
116 127
117 $self->send_msg ( 128 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 129 name => $self->{name},
120 expected_version => "2.0", 130 expected_version => "2.0",
121 ); 131 );
122 132
123 $self 133 $self
124} 134}
125 135
126#sub progress {
127# my ($self, $txn, $type, $attr) = @_;
128#
129# $self->{progress}->($self, $txn, $type, $attr)
130# if $self->{progress};
131#}
132
133sub send_msg { 136sub send_msg {
134 my ($self, $type, %kv) = @_; 137 my ($self, $type, %kv) = @_;
135 138
136 my $data = delete $kv{data}; 139 my $data = delete $kv{data};
137 140
138 if (exists $kv{id_cb}) { 141 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 142 my $id = $kv{identifier} ||= ++$self->{id};
140 $self->{id}{$id} = delete $kv{id_cb}; 143 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 144 }
143 145
144 my $msg = (touc $type) . "\012" 146 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 148
158 } 160 }
159 161
160 $self->{hdl}->push_write ($msg); 162 $self->{hdl}->push_write ($msg);
161} 163}
162 164
165sub on {
166 my ($self, $cb) = @_;
167
168 # cb return undef - message eaten, remove cb
169 # cb return 0 - message eaten
170 # cb return 1 - pass to next
171
172 push @{ $self->{on} }, $cb;
173}
174
175sub _push_queue {
176 my ($self, $queue) = @_;
177
178 shift @$queue;
179 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
180 if @$queue;
181}
182
183# lock so only one $type (arbitrary string) is in flight,
184# to work around horribly misdesigned protocol.
185sub serialise {
186 my ($self, $type, $cb) = @_;
187
188 my $queue = $self->{serialise}{$type} ||= [];
189 push @$queue, $cb;
190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
191 unless $#$queue;
192}
193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
163sub on_read { 248sub on_read {
164 my ($self) = @_; 249 my ($self) = @_;
165 250
166 my $type; 251 my $type;
167 my %kv; 252 my %kv;
168 my $rdata; 253 my $rdata;
169
170 my $done_cb = sub {
171 $kv{pkt_type} = $type;
172
173 if (my $cb = $self->{queue}[0]) {
174 $cb->($self, $type, \%kv, $rdata)
175 and shift @{ $self->{queue} };
176 } else {
177 $self->default_recv ($type, \%kv, $rdata);
178 }
179 };
180 254
181 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
183 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
184 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
197 271
198 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1]; 275 $rdata = \$_[1];
202 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
203 }); 277 });
204 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->(); 279 $self->recv ($type, \%kv);
206 } else { 280 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
208 } 282 }
209 }; 283 };
210 284
220 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 299 }
229} 300}
301
302=back
303
304=head2 FCP REQUESTS
305
306The following methods implement various requests. Most of them map
307directory to the FCP message of the same name. The added benefit of
308these over sending requests yourself is that they handle the necessary
309serialisation, protocol quirks, and replies.
310
311All of them exist in two versions, the variant shown in this manpage, and
312a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
313version as shown is I<synchronous> - it will wait for any replies, and
314either return the reply, or croak with an error. The underscore variant
315returns immediately and invokes one or more callbacks or condvars later.
316
317For example, the call
318
319 $info = $fcp->get_plugin_info ($name, $detailed);
320
321Also comes in this underscore variant:
322
323 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324
325You can thinbk of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked
328with the results.
329
330This callback/continuation argument (C<$cb>) can come in three forms itself:
331
332=over 4
333
334=item A code reference (or rather anything not matching some other alternative)
335
336This code reference will be invoked with the result on success. On an
337error, it will die (in the event loop) with a backtrace of the call site.
338
339This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors!
341
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error
346will instead result in C<< $cv->croak ($error) >>.
347
348This is also a popular choice.
349
350=item An array with two callbacks C<[$success, $failure]>
351
352The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors.
354
355=item C<undef>
356
357This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies
359with a backtrace.
360
361This is good for quick scripts, or when you really aren't interested in
362the results.
363
364=back
365
366=cut
367
368our $NOP_CB = sub { };
230 369
231sub _txn { 370sub _txn {
232 my ($name, $sub) = @_; 371 my ($name, $sub) = @_;
233 372
234 *{$name} = sub { 373 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 374 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 375
240 *{"$name\_sync"} = sub { 376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 377 &$sub;
243 $cv->recv 378 $cv->recv
244 }; 379 };
245}
246 380
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 381 *{"$name\_"} = sub {
382 my ($ok, $err) = pop;
248 383
384 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) };
388 } else {
389 my $bt = Carp::longmess "";
390 $err = sub {
391 die "$_[0]{extra_description}$bt";
392 };
393 }
394
395 $ok ||= $NOP_CB;
396
397 splice @_, 1, 0, $ok, $err;
398 &$sub;
399 };
400}
401
402=over 4
403
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 404=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 405
251=cut 406=cut
252 407
253_txn list_peers => sub { 408_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 409 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 410
256 my @res; 411 my @res;
257 412
258 $self->send_msg (list_peers => 413 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 414 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 415 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 416 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 417 my ($self, $type, $kv, $rdata) = @_;
263 418
264 if ($type eq "end_list_peers") { 419 if ($type eq "end_list_peers") {
265 $cv->(\@res); 420 $ok->(\@res);
266 1 421 1
267 } else { 422 } else {
268 push @res, $kv; 423 push @res, $kv;
269 0 424 0
270 } 425 }
271 }, 426 },
272 ); 427 );
273}; 428};
274 429
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 430=item $notes = $fcp->list_peer_notes ($node_identifier)
278 431
279=cut 432=cut
280 433
281_txn list_peer_notes => sub { 434_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 435 my ($self, $ok, undef, $node_identifier) = @_;
283 436
284 $self->send_msg (list_peer_notes => 437 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 438 node_identifier => $node_identifier,
286 id_cb => sub { 439 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 440 my ($self, $type, $kv, $rdata) = @_;
288 441
289 $cv->($kv); 442 $ok->($kv);
290 1 443 1
291 }, 444 },
292 ); 445 );
293}; 446};
294 447
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 448=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 449
299=cut 450=cut
300 451
301_txn watch_global => sub { 452_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 453 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 454
304 $self->send_msg (watch_global => 455 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 456 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 457 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 458 );
308 459
309 $cv->(); 460 $ok->();
310}; 461};
311 462
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 463=item $reqs = $fcp->list_persistent_requests
315 464
316=cut 465=cut
317 466
318_txn list_persistent_requests => sub { 467_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 468 my ($self, $ok, $err) = @_;
320 469
470 $self->serialise (list_persistent_requests => sub {
471 my ($self, $guard) = @_;
472
321 my %res; 473 my @res;
322 474
323 $self->send_msg ("list_persistent_requests"); 475 $self->send_msg ("list_persistent_requests");
324 476
325 push @{ $self->{queue} }, sub { 477 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
327 479
480 $guard if 0;
481
328 if ($type eq "end_list_persistent_requests") { 482 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 483 $ok->(\@res);
484 return;
485 } else {
486 my $id = $kv->{identifier};
487
488 if ($type =~ /^persistent_(get|put|put_dir)$/) {
489 push @res, [$type, $kv];
490 }
491 }
492
330 1 493 1
331 } else { 494 });
332 my $id = $kv->{identifier}; 495 });
496};
333 497
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 498=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 499
336 type => $1, 500Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 501identified by C<$global> and C<$identifier>, depending on which of
502C<$client_token> and C<$priority_class> are not C<undef>.
503
504=cut
505
506_txn modify_persistent_request => sub {
507 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
508
509 $self->serialise ($identifier => sub {
510 my ($self, $guard) = @_;
511
512 $self->send_msg (modify_persistent_request =>
513 global => $global ? "true" : "false",
514 identifier => $identifier,
515 defined $client_token ? (client_token => $client_token ) : (),
516 defined $priority_class ? (priority_class => $priority_class) : (),
517 );
518
519 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_;
521
522 $guard if 0;
523
524 if ($kv->{identifier} eq $identifier) {
525 if ($type eq "persistent_request_modified") {
338 %$kv, 526 $ok->($kv);
527 return;
528 } elsif ($type eq "protocol_error") {
529 $err->($kv);
530 return;
339 }; 531 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 532 }
533
346 0 534 1
347 } 535 });
348 }; 536 });
349}; 537};
350 538
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372};
373
374=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
375
376=item $status = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
377
378=cut
379
380_txn modify_persistent_request => sub {
381 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
382
383 $self->send_msg (modify_persistent_request =>
384 global => $global ? "true" : "false",
385 identifier => $identifier,
386 defined $client_token ? (client_token => $client_token ) : (),
387 defined $priority_class ? (priority_class => $priority_class) : (),
388 id_cb => sub {
389 my ($self, $type, $kv, $rdata) = @_;
390
391 $cv->($kv);
392 1
393 },
394 );
395
396 $cv->();
397};
398
399=item $cv = $fcp->get_plugin_info ($name, $detailed)
400
401=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 539=item $info = $fcp->get_plugin_info ($name, $detailed)
402 540
403=cut 541=cut
404 542
405_txn get_plugin_info => sub { 543_txn get_plugin_info => sub {
406 my ($self, $cv, $name, $detailed) = @_; 544 my ($self, $ok, $err, $name, $detailed) = @_;
407 545
408 $self->send_msg (get_plugin_info => 546 $self->send_msg (get_plugin_info =>
409 plugin_name => $name, 547 plugin_name => $name,
410 detailed => $detailed ? "true" : "false", 548 detailed => $detailed ? "true" : "false",
411 id_cb => sub { 549 id_cb => sub {
412 my ($self, $type, $kv, $rdata) = @_; 550 my ($self, $type, $kv, $rdata) = @_;
413 551
414 $cv->($kv); 552 $ok->($kv);
415 1 553 1
416 }, 554 },
417 ); 555 );
418
419 $cv->();
420}; 556};
421 557
558=item $status = $fcp->client_get ($uri, $identifier, %kv)
559
560%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
561
562ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
563priority_class, persistence, client_token, global, return_type,
564binary_blob, allowed_mime_types, filename, temp_filename
565
566=cut
567
568_txn client_get => sub {
569 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
570
571 $self->serialise ($identifier => sub {
572 my ($self, $guard) = @_;
573
574 $self->send_msg (client_get =>
575 %kv,
576 uri => $uri,
577 identifier => $identifier,
578 );
579
580 $self->on (sub {
581 my ($self, $type, $kv, @extra) = @_;
582
583 $guard if 0;
584
585 if ($kv->{identifier} eq $identifier) {
586 if ($type eq "persistent_get") {
587 $ok->($kv);
588 return;
589 } elsif ($type eq "protocol_error") {
590 $err->($kv);
591 return;
592 }
593 }
594
595 1
596 });
597 });
598};
599
600=item $status = $fcp->remove_request ($identifier[, $global])
601
602Remove the request with the given isdentifier. Returns true if successful,
603false on error.
604
605=cut
606
607_txn remove_request => sub {
608 my ($self, $ok, $err, $identifier, $global) = @_;
609
610 $self->serialise ($identifier => sub {
611 my ($self, $guard) = @_;
612
613 $self->send_msg (remove_request =>
614 identifier => $identifier,
615 global => $global ? "true" : "false",
616 );
617 $self->on (sub {
618 my ($self, $type, $kv, @extra) = @_;
619
620 $guard if 0;
621
622 if ($kv->{identifier} eq $identifier) {
623 if ($type eq "persistent_request_removed") {
624 $ok->(1);
625 return;
626 } elsif ($type eq "protocol_error") {
627 $err->($kv);
628 return;
629 }
630 }
631
632 1
633 });
634 });
635};
636
637=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
638
639The DDA test in FCP is probably the single most broken protocol - only
640one directory test can be outstanding at any time, and some guessing and
641heuristics are involved in mangling the paths.
642
643This function combines C<TestDDARequest> and C<TestDDAResponse> in one
644request, handling file reading and writing as well, and tries very hard to
645do the right thing.
646
647Both C<$local_directory> and C<$remote_directory> must specify the same
648directory - C<$local_directory> is the directory path on the client (where
649L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
650the server (where the freenet node runs). When both are running on the
651same node, the paths are generally identical.
652
653C<$want_read> and C<$want_write> should be set to a true value when you
654want to read (get) files or write (put) files, respectively.
655
656On error, an exception is thrown. Otherwise, C<$can_read> and
657C<$can_write> indicate whether you can reaqd or write to freenet via the
658directory.
659
660=cut
661
662_txn test_dda => sub {
663 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
664
665 $self->serialise (test_dda => sub {
666 my ($self, $guard) = @_;
667
668 $self->send_msg (test_dda_request =>
669 directory => $remote,
670 want_read_directory => $want_read ? "true" : "false",
671 want_write_directory => $want_write ? "true" : "false",
672 );
673 $self->on (sub {
674 my ($self, $type, $kv) = @_;
675
676 if ($type eq "test_dda_reply") {
677 # the filenames are all relative to the server-side directory,
678 # which might or might not match $remote anymore, so we
679 # need to rewrite the paths to be relative to $local
680 for my $k (qw(read_filename write_filename)) {
681 my $f = $kv->{$k};
682 for my $dir ($kv->{directory}, $remote) {
683 if ($dir eq substr $f, 0, length $dir) {
684 substr $f, 0, 1 + length $dir, "";
685 $kv->{$k} = $f;
686 last;
687 }
688 }
689 }
690
691 my %response = (directory => $remote);
692
693 if (length $kv->{read_filename}) {
694 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
695 sysread $fh, my $buf, -s $fh;
696 $response{read_content} = $buf;
697 }
698 }
699
700 if (length $kv->{write_filename}) {
701 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
702 syswrite $fh, $kv->{content_to_write};
703 }
704 }
705
706 $self->send_msg (test_dda_response => %response);
707
708 $self->on (sub {
709 my ($self, $type, $kv) = @_;
710
711 $guard if 0; # reference
712
713 if ($type eq "test_dda_complete") {
714 $ok->(
715 $kv->{read_directory_allowed} eq "true",
716 $kv->{write_directory_allowed} eq "true",
717 );
718 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
719 $err->($kv->{extra_description});
720 return;
721 }
722
723 1
724 });
725
726 return;
727 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
728 $err->($kv);
729 return;
730 }
731
732 1
733 });
734 });
735};
736
422=back 737=back
423 738
739=head2 REQUEST CACHE
740
741The C<AnyEvent::FCP> class keeps a request cache, where it caches all
742information from requests.
743
744For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
745in C<< $fcp->{req}{$identifier} >>:
746
747 persistent_get
748 persistent_put
749 persistent_put_dir
750
751This message updates the stored data:
752
753 persistent_request_modified
754
755This message will remove this entry:
756
757 persistent_request_removed
758
759These messages get merged into the cache entry, under their
760type, i.e. a C<simple_progress> message will be stored in C<<
761$fcp->{req}{$identifier}{simple_progress} >>:
762
763 simple_progress # get/put
764
765 uri_generated # put
766 generated_metadata # put
767 started_compression # put
768 finished_compression # put
769 put_failed # put
770 put_fetchable # put
771 put_successful # put
772
773 sending_to_network # get
774 compatibility_mode # get
775 expected_hashes # get
776 expected_mime # get
777 expected_data_length # get
778 get_failed # get
779 data_found # get
780 enter_finite_cooldown # get
781
782In addition, an event (basically a fake message) of type C<request_changed> is generated
783on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
784is the type of the original message triggering the change,
785
786To fill this cache with the global queue and keep it updated,
787call C<watch_global> to subscribe to updates, followed by
788C<list_persistent_requests_sync>.
789
790 $fcp->watch_global_sync_; # do not wait
791 $fcp->list_persistent_requests; # wait
792
793To get a better idea of what is stored in the cache, here is an example of
794what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
795
796 {
797 identifier => "Frost-gpl.txt",
798 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
799 binary_blob => "false",
800 global => "true",
801 max_retries => -1,
802 max_size => 9223372036854775807,
803 persistence => "forever",
804 priority_class => 3,
805 real_time => "false",
806 return_type => "direct",
807 started => "true",
808 type => "persistent_get",
809 verbosity => 2147483647,
810 sending_to_network => {
811 identifier => "Frost-gpl.txt",
812 global => "true",
813 },
814 compatibility_mode => {
815 identifier => "Frost-gpl.txt",
816 definitive => "true",
817 dont_compress => "false",
818 global => "true",
819 max => "COMPAT_1255",
820 min => "COMPAT_1255",
821 },
822 expected_hashes => {
823 identifier => "Frost-gpl.txt",
824 global => "true",
825 hashes => {
826 ed2k => "d83596f5ee3b7...",
827 md5 => "e0894e4a2a6...",
828 sha1 => "...",
829 sha256 => "...",
830 sha512 => "...",
831 tth => "...",
832 },
833 },
834 expected_mime => {
835 identifier => "Frost-gpl.txt",
836 global => "true",
837 metadata => { content_type => "application/rar" },
838 },
839 expected_data_length => {
840 identifier => "Frost-gpl.txt",
841 data_length => 37576,
842 global => "true",
843 },
844 simple_progress => {
845 identifier => "Frost-gpl.txt",
846 failed => 0,
847 fatally_failed => 0,
848 finalized_total => "true",
849 global => "true",
850 last_progress => 1438639282628,
851 required => 372,
852 succeeded => 102,
853 total => 747,
854 },
855 data_found => {
856 identifier => "Frost-gpl.txt",
857 completion_time => 1438663354026,
858 data_length => 37576,
859 global => "true",
860 metadata => { content_type => "image/jpeg" },
861 startup_time => 1438657196167,
862 },
863 }
864
424=head1 EXAMPLE PROGRAM 865=head1 EXAMPLE PROGRAM
425 866
426 use AnyEvent::FCP; 867 use AnyEvent::FCP;
427 868
428 my $fcp = new AnyEvent::FCP; 869 my $fcp = new AnyEvent::FCP;
429 870
430 # let us look at the global request list 871 # let us look at the global request list
431 $fcp->watch_global (1, 0); 872 $fcp->watch_global_ (1);
432 873
433 # list them, synchronously 874 # list them, synchronously
434 my $req = $fcp->list_persistent_requests_sync; 875 my $req = $fcp->list_persistent_requests;
435 876
436 # go through all requests 877 # go through all requests
878TODO
437 for my $req (values %$req) { 879 for my $req (values %$req) {
438 # skip jobs not directly-to-disk 880 # skip jobs not directly-to-disk
439 next unless $req->{return_type} eq "disk"; 881 next unless $req->{return_type} eq "disk";
440 # skip jobs not issued by FProxy 882 # skip jobs not issued by FProxy
441 next unless $req->{identifier} =~ /^FProxy:/; 883 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines