ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.4 by root, Wed Jul 29 09:25:46 2009 UTC vs.
Revision 1.12 by root, Sat Aug 8 04:02:48 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = '0.3';
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
53 73
54sub touc($) { 74sub touc($) {
55 local $_ = shift; 75 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 77 s/(?:^|_)(.)/\U$1/g;
58 $_ 78 $_
59} 79}
60 80
61sub tolc($) { 81sub tolc($) {
62 local $_ = shift; 82 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 86 lc
67} 87}
68 88
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 90
71Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 93
74If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 95(hopefully) unique client name for you.
76 96
77=cut 97=cut
78 98
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use
82#it like this:
83#
84# sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_;
86#
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88# }
89
90sub new { 99sub new {
91 my $class = shift; 100 my $class = shift;
92 my $self = bless { @_ }, $class; 101 my $self = bless {
93 102 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
97 $self->{timeout} ||= 600; 106 @_,
98 107 queue => [],
99 $self->{id} = "a0"; 108 req => {},
109 id => "a0",
110 }, $class;
100 111
101 { 112 {
102 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
103 114
104 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 116 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 117 timeout => $self->{timeout},
107 on_error => sub { 118 on_error => sub {
108 warn "<@_>\n"; 119 warn "@_\n";#d#
109 exit 1; 120 exit 1;
110 }, 121 },
111 on_read => sub { $self->on_read (@_) }, 122 on_read => sub { $self->on_read (@_) },
112 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
113 124
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 126 }
116 127
117 $self->send_msg ( 128 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 129 name => $self->{name},
120 expected_version => "2.0", 130 expected_version => "2.0",
121 ); 131 );
122 132
123 $self 133 $self
124} 134}
125 135
126#sub progress {
127# my ($self, $txn, $type, $attr) = @_;
128#
129# $self->{progress}->($self, $txn, $type, $attr)
130# if $self->{progress};
131#}
132
133sub send_msg { 136sub send_msg {
134 my ($self, $type, %kv) = @_; 137 my ($self, $type, %kv) = @_;
135 138
136 my $data = delete $kv{data}; 139 my $data = delete $kv{data};
137 140
138 if (exists $kv{id_cb}) { 141 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 142 my $id = $kv{identifier} ||= ++$self->{id};
140 $self->{id}{$id} = delete $kv{id_cb}; 143 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 144 }
143 145
144 my $msg = (touc $type) . "\012" 146 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 148
158 } 160 }
159 161
160 $self->{hdl}->push_write ($msg); 162 $self->{hdl}->push_write ($msg);
161} 163}
162 164
165sub on {
166 my ($self, $cb) = @_;
167
168 # cb return undef - message eaten, remove cb
169 # cb return 0 - message eaten
170 # cb return 1 - pass to next
171
172 push @{ $self->{on} }, $cb;
173}
174
175sub _push_queue {
176 my ($self, $queue) = @_;
177
178 shift @$queue;
179 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
180 if @$queue;
181}
182
183# lock so only one $type (arbitrary string) is in flight,
184# to work around horribly misdesigned protocol.
185sub serialise {
186 my ($self, $type, $cb) = @_;
187
188 my $queue = $self->{serialise}{$type} ||= [];
189 push @$queue, $cb;
190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
191 unless $#$queue;
192}
193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
163sub on_read { 248sub on_read {
164 my ($self) = @_; 249 my ($self) = @_;
165 250
166 my $type; 251 my $type;
167 my %kv; 252 my %kv;
168 my $rdata; 253 my $rdata;
169
170 my $done_cb = sub {
171 $kv{pkt_type} = $type;
172
173 if (my $cb = $self->{queue}[0]) {
174 $cb->($self, $type, \%kv, $rdata)
175 and shift @{ $self->{queue} };
176 } else {
177 $self->default_recv ($type, \%kv, $rdata);
178 }
179 };
180 254
181 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
183 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
184 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
197 271
198 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1]; 275 $rdata = \$_[1];
202 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
203 }); 277 });
204 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->(); 279 $self->recv ($type, \%kv);
206 } else { 280 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
208 } 282 }
209 }; 283 };
210 284
220 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 299 }
229} 300}
301
302=back
303
304=head2 FCP REQUESTS
305
306The following methods implement various requests. Most of them map
307directory to the FCP message of the same name. The added benefit of
308these over sending requests yourself is that they handle the necessary
309serialisation, protocol quirks, and replies.
310
311All of them exist in two versions, the variant shown in this manpage, and
312a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
313version as shown is I<synchronous> - it will wait for any replies, and
314either return the reply, or croak with an error. The underscore variant
315returns immediately and invokes one or more callbacks or condvars later.
316
317For example, the call
318
319 $info = $fcp->get_plugin_info ($name, $detailed);
320
321Also comes in this underscore variant:
322
323 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324
325You can thinbk of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked
328with the results.
329
330This callback/continuation argument (C<$cb>) can come in three forms itself:
331
332=over 4
333
334=item A code reference (or rather anything not matching some other alternative)
335
336This code reference will be invoked with the result on success. On an
337error, it will die (in the event loop) with a backtrace of the call site.
338
339This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors!
341
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error
346will instead result in C<< $cv->croak ($error) >>.
347
348This is also a popular choice.
349
350=item An array with two callbacks C<[$success, $failure]>
351
352The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors.
354
355=item C<undef>
356
357This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies
359with a backtrace.
360
361This is good for quick scripts, or when you really aren't interested in
362the results.
363
364=back
365
366=cut
367
368our $NOP_CB = sub { };
230 369
231sub _txn { 370sub _txn {
232 my ($name, $sub) = @_; 371 my ($name, $sub) = @_;
233 372
234 *{$name} = sub { 373 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 374 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 375
240 *{"$name\_sync"} = sub { 376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 377 &$sub;
243 $cv->recv 378 $cv->recv
244 }; 379 };
245}
246 380
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 381 *{"$name\_"} = sub {
382 my ($ok, $err) = pop;
248 383
384 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) };
388 } else {
389 my $bt = Carp::longmess "";
390 $err = sub {
391 die "$_[0]{extra_description}$bt";
392 };
393 }
394
395 $ok ||= $NOP_CB;
396
397 splice @_, 1, 0, $ok, $err;
398 &$sub;
399 };
400}
401
402=over 4
403
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 404=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 405
251=cut 406=cut
252 407
253_txn list_peers => sub { 408_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 409 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 410
256 my @res; 411 my @res;
257 412
258 $self->send_msg (list_peers => 413 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 414 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 415 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 416 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 417 my ($self, $type, $kv, $rdata) = @_;
263 418
264 if ($type eq "end_list_peers") { 419 if ($type eq "end_list_peers") {
265 $cv->(\@res); 420 $ok->(\@res);
266 1 421 1
267 } else { 422 } else {
268 push @res, $kv; 423 push @res, $kv;
269 0 424 0
270 } 425 }
271 }, 426 },
272 ); 427 );
273}; 428};
274 429
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 430=item $notes = $fcp->list_peer_notes ($node_identifier)
278 431
279=cut 432=cut
280 433
281_txn list_peer_notes => sub { 434_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 435 my ($self, $ok, undef, $node_identifier) = @_;
283 436
284 $self->send_msg (list_peer_notes => 437 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 438 node_identifier => $node_identifier,
286 id_cb => sub { 439 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 440 my ($self, $type, $kv, $rdata) = @_;
288 441
289 $cv->($kv); 442 $ok->($kv);
290 1 443 1
291 }, 444 },
292 ); 445 );
293}; 446};
294 447
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 448=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 449
299=cut 450=cut
300 451
301_txn watch_global => sub { 452_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 453 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 454
304 $self->send_msg (watch_global => 455 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 456 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 457 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 458 );
308 459
309 $cv->(); 460 $ok->();
310}; 461};
311 462
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 463=item $reqs = $fcp->list_persistent_requests
315 464
316=cut 465=cut
317 466
318_txn list_persistent_requests => sub { 467_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 468 my ($self, $ok, $err) = @_;
320 469
470 $self->serialise (list_persistent_requests => sub {
471 my ($self, $guard) = @_;
472
321 my %res; 473 my @res;
322 474
323 $self->send_msg ("list_persistent_requests"); 475 $self->send_msg ("list_persistent_requests");
324 476
325 push @{ $self->{queue} }, sub { 477 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
327 479
480 $guard if 0;
481
328 if ($type eq "end_list_persistent_requests") { 482 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 483 $ok->(\@res);
484 return;
485 } else {
486 my $id = $kv->{identifier};
487
488 if ($type =~ /^persistent_(get|put|put_dir)$/) {
489 push @res, [$type, $kv];
490 }
491 }
492
330 1 493 1
331 } else { 494 });
332 my $id = $kv->{identifier}; 495 });
496};
333 497
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 498=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 499
336 type => $1, 500Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 501identified by C<$global> and C<$identifier>, depending on which of
502C<$client_token> and C<$priority_class> are not C<undef>.
503
504=cut
505
506_txn modify_persistent_request => sub {
507 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
508
509 $self->serialise ($identifier => sub {
510 my ($self, $guard) = @_;
511
512 $self->send_msg (modify_persistent_request =>
513 global => $global ? "true" : "false",
514 identifier => $identifier,
515 defined $client_token ? (client_token => $client_token ) : (),
516 defined $priority_class ? (priority_class => $priority_class) : (),
517 );
518
519 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_;
521
522 if ($kv->{identifier} eq $identifier) {
523 if ($type eq "persistent_request_modified") {
338 %$kv, 524 $ok->($kv);
525 return;
526 } elsif ($type eq "protocol_error") {
527 $err->($kv);
528 return;
339 }; 529 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 530 }
531
346 0 532 1
347 } 533 });
348 }; 534 });
349}; 535};
350 536
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370};
371
372=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
373
374=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
375
376=cut
377
378_txn modify_persistent_request => sub {
379 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
380
381 $self->send_msg (modify_persistent_request =>
382 global => $global ? "true" : "false",
383 defined $client_token ? (client_token => $client_token ) : (),
384 defined $priority_class ? (priority_class => $priority_class) : (),
385 identifier => $identifier,
386 id_cb => sub {
387 my ($self, $type, $kv, $rdata) = @_;
388
389 $cv->($kv);
390 1
391 },
392 );
393};
394
395=item $cv = $fcp->get_plugin_info ($name, $detailed)
396
397=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 537=item $info = $fcp->get_plugin_info ($name, $detailed)
398 538
399=cut 539=cut
400 540
401_txn get_plugin_info => sub { 541_txn get_plugin_info => sub {
402 my ($self, $cv, $name, $detailed) = @_; 542 my ($self, $ok, $err, $name, $detailed) = @_;
403 543
404 $self->send_msg (get_plugin_info => 544 $self->send_msg (get_plugin_info =>
405 plugin_name => $name, 545 plugin_name => $name,
406 detailed => $detailed ? "true" : "false", 546 detailed => $detailed ? "true" : "false",
407 id_cb => sub { 547 id_cb => sub {
408 my ($self, $type, $kv, $rdata) = @_; 548 my ($self, $type, $kv, $rdata) = @_;
409 549
410 $cv->($kv); 550 $ok->($kv);
411 1 551 1
412 }, 552 },
413 ); 553 );
414}; 554};
415 555
416=item $cv = $fcp->client_get ($uri, $identifier, %kv)
417
418=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 556=item $status = $fcp->client_get ($uri, $identifier, %kv)
419 557
420%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 558%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
421 559
422ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 560ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
423priority_class, persistence, client_token, global, return_type, 561priority_class, persistence, client_token, global, return_type,
424binary_blob, allowed_mime_types, filename, temp_filename 562binary_blob, allowed_mime_types, filename, temp_filename
425 563
426=cut 564=cut
427 565
428_txn client_get => sub { 566_txn client_get => sub {
429 my ($self, $cv, $uri, $identifier, %kv) = @_; 567 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
430 568
431 $self->send_msg (client_get => 569 $self->send_msg (client_get =>
432 %kv, 570 %kv,
433 uri => $uri, 571 uri => $uri,
434 identifier => $identifier, 572 identifier => $identifier,
435 id_cb => sub { 573 );
574
575 $ok->();
576};
577
578=item $status = $fcp->remove_request ($identifier[, $global])
579
580Remove the request with the given isdentifier. Returns true if successful,
581false on error.
582
583=cut
584
585_txn remove_request => sub {
586 my ($self, $ok, $err, $identifier, $global) = @_;
587
588 $self->serialise ($identifier => sub {
589 my ($self, $guard) = @_;
590
591 $self->send_msg (remove_request =>
592 identifier => $identifier,
593 global => $global ? "true" : "false",
594 );
595 $self->on (sub {
436 my ($self, $type, $kv, $rdata) = @_; 596 my ($self, $type, $kv, @extra) = @_;
437 597
598 if ($kv->{identifier} eq $identifier) {
599 if ($type eq "persistent_request_removed") {
600 $ok->(1);
601 return;
602 } elsif ($type eq "protocol_error") {
438 $cv->($kv); 603 $err->($kv);
604 return;
605 }
606 }
607
439 1 608 1
609 });
610 });
611};
612
613=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
614
615The DDA test in FCP is probably the single most broken protocol - only
616one directory test can be outstanding at any time, and some guessing and
617heuristics are involved in mangling the paths.
618
619This function combines C<TestDDARequest> and C<TestDDAResponse> in one
620request, handling file reading and writing as well, and tries very hard to
621do the right thing.
622
623Both C<$local_directory> and C<$remote_directory> must specify the same
624directory - C<$local_directory> is the directory path on the client (where
625L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
626the server (where the freenet node runs). When both are running on the
627same node, the paths are generally identical.
628
629C<$want_read> and C<$want_write> should be set to a true value when you
630want to read (get) files or write (put) files, respectively.
631
632On error, an exception is thrown. Otherwise, C<$can_read> and
633C<$can_write> indicate whether you can reaqd or write to freenet via the
634directory.
635
636=cut
637
638_txn test_dda => sub {
639 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
640
641 $self->serialise (test_dda => sub {
642 my ($self, $guard) = @_;
643
644 $self->send_msg (test_dda_request =>
645 directory => $remote,
646 want_read_directory => $want_read ? "true" : "false",
647 want_write_directory => $want_write ? "true" : "false",
648 );
649 $self->on (sub {
650 my ($self, $type, $kv) = @_;
651
652 if ($type eq "test_dda_reply") {
653 # the filenames are all relative to the server-side directory,
654 # which might or might not match $remote anymore, so we
655 # need to rewrite the paths to be relative to $local
656 for my $k (qw(read_filename write_filename)) {
657 my $f = $kv->{$k};
658 for my $dir ($kv->{directory}, $remote) {
659 if ($dir eq substr $f, 0, length $dir) {
660 substr $f, 0, 1 + length $dir, "";
661 $kv->{$k} = $f;
662 last;
663 }
664 }
665 }
666
667 my %response = (directory => $remote);
668
669 if (length $kv->{read_filename}) {
670 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
671 sysread $fh, my $buf, -s $fh;
672 $response{read_content} = $buf;
673 }
674 }
675
676 if (length $kv->{write_filename}) {
677 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
678 syswrite $fh, $kv->{content_to_write};
679 }
680 }
681
682 $self->send_msg (test_dda_response => %response);
683
684 $self->on (sub {
685 my ($self, $type, $kv) = @_;
686
687 $guard if 0; # reference
688
689 if ($type eq "test_dda_complete") {
690 $ok->(
691 $kv->{read_directory_allowed} eq "true",
692 $kv->{write_directory_allowed} eq "true",
693 );
694 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
695 $err->($kv->{extra_description});
696 return;
697 }
698
699 1
700 });
701
702 return;
703 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
704 $err->($kv);
705 return;
706 }
707
708 1
709 });
710 });
711};
712
713=back
714
715=head2 REQUEST CACHE
716
717The C<AnyEvent::FCP> class keeps a request cache, where it caches all
718information from requests.
719
720For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
721in C<< $fcp->{req}{$identifier} >>:
722
723 persistent_get
724 persistent_put
725 persistent_put_dir
726
727This message updates the stored data:
728
729 persistent_request_modified
730
731This message will remove this entry:
732
733 persistent_request_removed
734
735These messages get merged into the cache entry, under their
736type, i.e. a C<simple_progress> message will be stored in C<<
737$fcp->{req}{$identifier}{simple_progress} >>:
738
739 simple_progress # get/put
740
741 uri_generated # put
742 generated_metadata # put
743 started_compression # put
744 finished_compression # put
745 put_failed # put
746 put_fetchable # put
747 put_successful # put
748
749 sending_to_network # get
750 compatibility_mode # get
751 expected_hashes # get
752 expected_mime # get
753 expected_data_length # get
754 get_failed # get
755 data_found # get
756 enter_finite_cooldown # get
757
758In addition, an event (basically a fake message) of type C<request_changed> is generated
759on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
760is the type of the original message triggering the change,
761
762To fill this cache with the global queue and keep it updated,
763call C<watch_global> to subscribe to updates, followed by
764C<list_persistent_requests_sync>.
765
766 $fcp->watch_global_sync_; # do not wait
767 $fcp->list_persistent_requests; # wait
768
769To get a better idea of what is stored in the cache, here is an example of
770what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
771
772 {
773 identifier => "Frost-gpl.txt",
774 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
775 binary_blob => "false",
776 global => "true",
777 max_retries => -1,
778 max_size => 9223372036854775807,
779 persistence => "forever",
780 priority_class => 3,
781 real_time => "false",
782 return_type => "direct",
783 started => "true",
784 type => "persistent_get",
785 verbosity => 2147483647,
786 sending_to_network => {
787 identifier => "Frost-gpl.txt",
788 global => "true",
440 }, 789 },
441 ); 790 compatibility_mode => {
442}; 791 identifier => "Frost-gpl.txt",
443 792 definitive => "true",
444=back 793 dont_compress => "false",
794 global => "true",
795 max => "COMPAT_1255",
796 min => "COMPAT_1255",
797 },
798 expected_hashes => {
799 identifier => "Frost-gpl.txt",
800 global => "true",
801 hashes => {
802 ed2k => "d83596f5ee3b7...",
803 md5 => "e0894e4a2a6...",
804 sha1 => "...",
805 sha256 => "...",
806 sha512 => "...",
807 tth => "...",
808 },
809 },
810 expected_mime => {
811 identifier => "Frost-gpl.txt",
812 global => "true",
813 metadata => { content_type => "application/rar" },
814 },
815 expected_data_length => {
816 identifier => "Frost-gpl.txt",
817 data_length => 37576,
818 global => "true",
819 },
820 simple_progress => {
821 identifier => "Frost-gpl.txt",
822 failed => 0,
823 fatally_failed => 0,
824 finalized_total => "true",
825 global => "true",
826 last_progress => 1438639282628,
827 required => 372,
828 succeeded => 102,
829 total => 747,
830 },
831 data_found => {
832 identifier => "Frost-gpl.txt",
833 completion_time => 1438663354026,
834 data_length => 37576,
835 global => "true",
836 metadata => { content_type => "image/jpeg" },
837 startup_time => 1438657196167,
838 },
839 }
445 840
446=head1 EXAMPLE PROGRAM 841=head1 EXAMPLE PROGRAM
447 842
448 use AnyEvent::FCP; 843 use AnyEvent::FCP;
449 844
450 my $fcp = new AnyEvent::FCP; 845 my $fcp = new AnyEvent::FCP;
451 846
452 # let us look at the global request list 847 # let us look at the global request list
453 $fcp->watch_global (1, 0); 848 $fcp->watch_global_ (1);
454 849
455 # list them, synchronously 850 # list them, synchronously
456 my $req = $fcp->list_persistent_requests_sync; 851 my $req = $fcp->list_persistent_requests;
457 852
458 # go through all requests 853 # go through all requests
854TODO
459 for my $req (values %$req) { 855 for my $req (values %$req) {
460 # skip jobs not directly-to-disk 856 # skip jobs not directly-to-disk
461 next unless $req->{return_type} eq "disk"; 857 next unless $req->{return_type} eq "disk";
462 # skip jobs not issued by FProxy 858 # skip jobs not issued by FProxy
463 next unless $req->{identifier} =~ /^FProxy:/; 859 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines