ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.6 by root, Mon May 31 06:27:37 2010 UTC vs.
Revision 1.15 by root, Fri Aug 14 03:33:13 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.21'; 66our $VERSION = '0.3';
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
53 73
54sub touc($) { 74sub touc($) {
55 local $_ = shift; 75 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 77 s/(?:^|_)(.)/\U$1/g;
58 $_ 78 $_
59} 79}
60 80
61sub tolc($) { 81sub tolc($) {
62 local $_ = shift; 82 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 86 lc
67} 87}
68 88
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 90
71Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 93
74If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 95(hopefully) unique client name for you.
76 96
77You can install a progress callback that is being called with the AnyEvent::FCP
78object, the type, a hashref with key-value pairs and a reference to any received data,
79for all unsolicited messages.
80
81Example:
82
83 sub progress_cb {
84 my ($self, $type, $kv, $rdata) = @_;
85
86 if ($type eq "simple_progress") {
87 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
88 }
89 }
90
91=cut 97=cut
92 98
93sub new { 99sub new {
94 my $class = shift; 100 my $class = shift;
101
102 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
103
95 my $self = bless { @_ }, $class; 104 my $self = bless {
96 105 host => $ENV{FREDHOST} || "127.0.0.1",
97 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 106 port => $ENV{FREDPORT} || 9481,
98 $self->{port} ||= $ENV{FREDPORT} || 9481; 107 timeout => 3600 * 2,
99 $self->{name} ||= time.rand.rand.rand; # lame 108 name => time.rand.rand.rand, # lame
100 $self->{timeout} ||= 600; 109 @_,
101 $self->{progress} ||= sub { }; 110 queue => [],
102 111 req => {},
103 $self->{id} = "a0"; 112 prefix => "..:aefcpid:$rand:",
113 idseq => "a0",
114 }, $class;
104 115
105 { 116 {
106 Scalar::Util::weaken (my $self = $self); 117 Scalar::Util::weaken (my $self = $self);
107 118
108 $self->{hdl} = new AnyEvent::Handle 119 $self->{hdl} = new AnyEvent::Handle
109 connect => [$self->{host} => $self->{port}], 120 connect => [$self->{host} => $self->{port}],
110 timeout => $self->{timeout}, 121 timeout => $self->{timeout},
111 on_error => sub { 122 on_error => sub {
112 warn "<@_>\n"; 123 warn "@_\n";#d#
113 exit 1; 124 exit 1;
114 }, 125 },
115 on_read => sub { $self->on_read (@_) }, 126 on_read => sub { $self->on_read (@_) },
116 on_eof => $self->{on_eof} || sub { }; 127 on_eof => $self->{on_eof} || sub { };
117 128
118 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 129 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
119 } 130 }
120 131
121 $self->send_msg ( 132 $self->send_msg (client_hello =>
122 client_hello =>
123 name => $self->{name}, 133 name => $self->{name},
124 expected_version => "2.0", 134 expected_version => "2.0",
125 ); 135 );
126 136
127 $self 137 $self
128} 138}
129 139
140sub identifier {
141 $_[0]{prefix} . ++$_[0]{idseq}
142}
143
130sub send_msg { 144sub send_msg {
131 my ($self, $type, %kv) = @_; 145 my ($self, $type, %kv) = @_;
132 146
133 my $data = delete $kv{data}; 147 my $data = delete $kv{data};
134 148
135 if (exists $kv{id_cb}) { 149 if (exists $kv{id_cb}) {
136 my $id = $kv{identifier} || ++$self->{id}; 150 my $id = $kv{identifier} ||= $self->identifier;
137 $self->{id}{$id} = delete $kv{id_cb}; 151 $self->{id}{$id} = delete $kv{id_cb};
138 $kv{identifier} = $id;
139 } 152 }
140 153
141 my $msg = (touc $type) . "\012" 154 my $msg = (touc $type) . "\012"
142 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 155 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
143 156
155 } 168 }
156 169
157 $self->{hdl}->push_write ($msg); 170 $self->{hdl}->push_write ($msg);
158} 171}
159 172
173sub on {
174 my ($self, $cb) = @_;
175
176 # cb return undef - message eaten, remove cb
177 # cb return 0 - message eaten
178 # cb return 1 - pass to next
179
180 push @{ $self->{on} }, $cb;
181}
182
183sub _push_queue {
184 my ($self, $queue) = @_;
185
186 shift @$queue;
187 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
188 if @$queue;
189}
190
191# lock so only one $type (arbitrary string) is in flight,
192# to work around horribly misdesigned protocol.
193sub serialise {
194 my ($self, $type, $cb) = @_;
195
196 my $queue = $self->{serialise}{$type} ||= [];
197 push @$queue, $cb;
198 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
199 unless $#$queue;
200}
201
202# how to merge these types into $self->{persistent}
203our %PERSISTENT_TYPE = (
204 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
205 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
206 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
207 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
208 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
209
210 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
211
212 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
213 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
214 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
215 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
216 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
217 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
218 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
219
220 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
221 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
222 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
223 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
224 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
225 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
226 data_found => sub { $_[1]{data_found} = $_[2] }, # get
227 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
228);
229
230sub recv {
231 my ($self, $type, $kv, @extra) = @_;
232
233 if (my $cb = $PERSISTENT_TYPE{$type}) {
234 my $id = $kv->{identifier};
235 my $req = $_[0]{req}{$id} ||= {};
236 $cb->($self, $req, $kv);
237 $self->recv (request_change => $kv, $type, @extra);
238 }
239
240 my $on = $self->{on};
241 for (0 .. $#$on) {
242 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
243 splice @$on, $_, 1 unless defined $res;
244 return;
245 }
246 }
247
248 if (my $cb = $self->{queue}[0]) {
249 $cb->($self, $type, $kv, @extra)
250 and shift @{ $self->{queue} };
251 } else {
252 $self->default_recv ($type, $kv, @extra);
253 }
254}
255
160sub on_read { 256sub on_read {
161 my ($self) = @_; 257 my ($self) = @_;
162 258
163 my $type; 259 my $type;
164 my %kv; 260 my %kv;
165 my $rdata; 261 my $rdata;
166
167 my $done_cb = sub {
168 $kv{pkt_type} = $type;
169
170 if (my $cb = $self->{queue}[0]) {
171 $cb->($self, $type, \%kv, $rdata)
172 and shift @{ $self->{queue} };
173 } else {
174 $self->default_recv ($type, \%kv, $rdata);
175 }
176 };
177 262
178 my $hdr_cb; $hdr_cb = sub { 263 my $hdr_cb; $hdr_cb = sub {
179 if ($_[1] =~ /^([^=]+)=(.*)$/) { 264 if ($_[1] =~ /^([^=]+)=(.*)$/) {
180 my ($k, $v) = ($1, $2); 265 my ($k, $v) = ($1, $2);
181 my @k = split /\./, tolc $k; 266 my @k = split /\./, tolc $k;
194 279
195 $_[0]->push_read (line => $hdr_cb); 280 $_[0]->push_read (line => $hdr_cb);
196 } elsif ($_[1] eq "Data") { 281 } elsif ($_[1] eq "Data") {
197 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 282 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
198 $rdata = \$_[1]; 283 $rdata = \$_[1];
199 $done_cb->(); 284 $self->recv ($type, \%kv, $rdata);
200 }); 285 });
201 } elsif ($_[1] eq "EndMessage") { 286 } elsif ($_[1] eq "EndMessage") {
202 $done_cb->(); 287 $self->recv ($type, \%kv);
203 } else { 288 } else {
204 die "protocol error, expected message end, got $_[1]\n";#d# 289 die "protocol error, expected message end, got $_[1]\n";#d#
205 } 290 }
206 }; 291 };
207 292
217 if ($type eq "node_hello") { 302 if ($type eq "node_hello") {
218 $self->{node_hello} = $kv; 303 $self->{node_hello} = $kv;
219 } elsif (exists $self->{id}{$kv->{identifier}}) { 304 } elsif (exists $self->{id}{$kv->{identifier}}) {
220 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 305 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
221 and delete $self->{id}{$kv->{identifier}}; 306 and delete $self->{id}{$kv->{identifier}};
222 } else {
223 &{ $self->{progress} };
224 } 307 }
225} 308}
309
310=back
311
312=head2 FCP REQUESTS
313
314The following methods implement various requests. Most of them map
315directory to the FCP message of the same name. The added benefit of
316these over sending requests yourself is that they handle the necessary
317serialisation, protocol quirks, and replies.
318
319All of them exist in two versions, the variant shown in this manpage, and
320a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
321version as shown is I<synchronous> - it will wait for any replies, and
322either return the reply, or croak with an error. The underscore variant
323returns immediately and invokes one or more callbacks or condvars later.
324
325For example, the call
326
327 $info = $fcp->get_plugin_info ($name, $detailed);
328
329Also comes in this underscore variant:
330
331 $fcp->get_plugin_info_ ($name, $detailed, $cb);
332
333You can thinbk of the underscore as a kind of continuation indicator - the
334normal function waits and returns with the data, the C<_> indicates that
335you pass the continuation yourself, and the continuation will be invoked
336with the results.
337
338This callback/continuation argument (C<$cb>) can come in three forms itself:
339
340=over 4
341
342=item A code reference (or rather anything not matching some other alternative)
343
344This code reference will be invoked with the result on success. On an
345error, it will die (in the event loop) with a backtrace of the call site.
346
347This is a popular choice, but it makes handling errors hard - make sure
348you never generate protocol errors!
349
350=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
351
352When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
353results when the request has finished. Should an error occur, the error
354will instead result in C<< $cv->croak ($error) >>.
355
356This is also a popular choice.
357
358=item An array with two callbacks C<[$success, $failure]>
359
360The C<$success> callback will be invoked with the results, while the
361C<$failure> callback will be invoked on any errors.
362
363=item C<undef>
364
365This is the same thing as specifying C<sub { }> as callback, i.e. on
366success, the results are ignored, while on failure, you the module dies
367with a backtrace.
368
369This is good for quick scripts, or when you really aren't interested in
370the results.
371
372=back
373
374=cut
375
376our $NOP_CB = sub { };
226 377
227sub _txn { 378sub _txn {
228 my ($name, $sub) = @_; 379 my ($name, $sub) = @_;
229 380
230 *{$name} = sub { 381 *{$name} = sub {
231 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 382 my $cv = AE::cv;
232 &$sub;
233 $cv
234 };
235 383
236 *{"$name\_sync"} = sub { 384 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
237 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
238 &$sub; 385 &$sub;
239 $cv->recv 386 $cv->recv
240 }; 387 };
241}
242 388
243=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 389 *{"$name\_"} = sub {
390 my ($ok, $err) = pop;
244 391
392 if (ARRAY:: eq ref $ok) {
393 ($ok, $err) = @$ok;
394 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
395 $err = sub { $ok->croak ($_[0]{extra_description}) };
396 } else {
397 my $bt = Carp::longmess "";
398 $err = sub {
399 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
400 };
401 }
402
403 $ok ||= $NOP_CB;
404
405 splice @_, 1, 0, $ok, $err;
406 &$sub;
407 };
408}
409
410=over 4
411
245=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 412=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
246 413
247=cut 414=cut
248 415
249_txn list_peers => sub { 416_txn list_peers => sub {
250 my ($self, $cv, $with_metadata, $with_volatile) = @_; 417 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
251 418
252 my @res; 419 my @res;
253 420
254 $self->send_msg (list_peers => 421 $self->send_msg (list_peers =>
255 with_metadata => $with_metadata ? "true" : "false", 422 with_metadata => $with_metadata ? "true" : "false",
256 with_volatile => $with_volatile ? "true" : "false", 423 with_volatile => $with_volatile ? "true" : "false",
257 id_cb => sub { 424 id_cb => sub {
258 my ($self, $type, $kv, $rdata) = @_; 425 my ($self, $type, $kv, $rdata) = @_;
259 426
260 if ($type eq "end_list_peers") { 427 if ($type eq "end_list_peers") {
261 $cv->(\@res); 428 $ok->(\@res);
262 1 429 1
263 } else { 430 } else {
264 push @res, $kv; 431 push @res, $kv;
265 0 432 0
266 } 433 }
267 }, 434 },
268 ); 435 );
269}; 436};
270 437
271=item $cv = $fcp->list_peer_notes ($node_identifier)
272
273=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 438=item $notes = $fcp->list_peer_notes ($node_identifier)
274 439
275=cut 440=cut
276 441
277_txn list_peer_notes => sub { 442_txn list_peer_notes => sub {
278 my ($self, $cv, $node_identifier) = @_; 443 my ($self, $ok, undef, $node_identifier) = @_;
279 444
280 $self->send_msg (list_peer_notes => 445 $self->send_msg (list_peer_notes =>
281 node_identifier => $node_identifier, 446 node_identifier => $node_identifier,
282 id_cb => sub { 447 id_cb => sub {
283 my ($self, $type, $kv, $rdata) = @_; 448 my ($self, $type, $kv, $rdata) = @_;
284 449
285 $cv->($kv); 450 $ok->($kv);
286 1 451 1
287 }, 452 },
288 ); 453 );
289}; 454};
290 455
291=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
292
293=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 456=item $fcp->watch_global ($enabled[, $verbosity_mask])
294 457
295=cut 458=cut
296 459
297_txn watch_global => sub { 460_txn watch_global => sub {
298 my ($self, $cv, $enabled, $verbosity_mask) = @_; 461 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
299 462
300 $self->send_msg (watch_global => 463 $self->send_msg (watch_global =>
301 enabled => $enabled ? "true" : "false", 464 enabled => $enabled ? "true" : "false",
302 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 465 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
303 ); 466 );
304 467
305 $cv->(); 468 $ok->();
306}; 469};
307 470
308=item $cv = $fcp->list_persistent_requests
309
310=item $reqs = $fcp->list_persistent_requests_sync 471=item $reqs = $fcp->list_persistent_requests
311 472
312=cut 473=cut
313 474
314_txn list_persistent_requests => sub { 475_txn list_persistent_requests => sub {
315 my ($self, $cv) = @_; 476 my ($self, $ok, $err) = @_;
316 477
478 $self->serialise (list_persistent_requests => sub {
479 my ($self, $guard) = @_;
480
317 my %res; 481 my @res;
318 482
319 $self->send_msg ("list_persistent_requests"); 483 $self->send_msg ("list_persistent_requests");
320 484
321 push @{ $self->{queue} }, sub { 485 $self->on (sub {
322 my ($self, $type, $kv, $rdata) = @_; 486 my ($self, $type, $kv, $rdata) = @_;
323 487
488 $guard if 0;
489
324 if ($type eq "end_list_persistent_requests") { 490 if ($type eq "end_list_persistent_requests") {
325 $cv->(\%res); 491 $ok->(\@res);
492 return;
493 } else {
494 my $id = $kv->{identifier};
495
496 if ($type =~ /^persistent_(get|put|put_dir)$/) {
497 push @res, [$type, $kv];
498 }
499 }
500
326 1 501 1
327 } else { 502 });
328 my $id = $kv->{identifier}; 503 });
504};
329 505
330 if ($type =~ /^persistent_(get|put|put_dir)$/) { 506=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
331 $res{$id} = { 507
332 type => $1, 508Update either the C<client_token> or C<priority_class> of a request
333 %{ $res{$id} }, 509identified by C<$global> and C<$identifier>, depending on which of
510C<$client_token> and C<$priority_class> are not C<undef>.
511
512=cut
513
514_txn modify_persistent_request => sub {
515 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
516
517 $self->serialise ($identifier => sub {
518 my ($self, $guard) = @_;
519
520 $self->send_msg (modify_persistent_request =>
521 global => $global ? "true" : "false",
522 identifier => $identifier,
523 defined $client_token ? (client_token => $client_token ) : (),
524 defined $priority_class ? (priority_class => $priority_class) : (),
525 );
526
527 $self->on (sub {
528 my ($self, $type, $kv, @extra) = @_;
529
530 $guard if 0;
531
532 if ($kv->{identifier} eq $identifier) {
533 if ($type eq "persistent_request_modified") {
334 %$kv, 534 $ok->($kv);
535 return;
536 } elsif ($type eq "protocol_error") {
537 $err->($kv);
538 return;
335 }; 539 }
336 } elsif ($type eq "simple_progress") {
337 delete $kv->{pkt_type}; # save memory
338 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
339 } else {
340 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
341 } 540 }
541
342 0 542 1
343 } 543 });
344 }; 544 });
345}; 545};
346 546
347=item $cv = $fcp->remove_request ($global, $identifier)
348
349=item $status = $fcp->remove_request_sync ($global, $identifier)
350
351=cut
352
353_txn remove_request => sub {
354 my ($self, $cv, $global, $identifier) = @_;
355
356 $self->send_msg (remove_request =>
357 global => $global ? "true" : "false",
358 identifier => $identifier,
359 id_cb => sub {
360 my ($self, $type, $kv, $rdata) = @_;
361
362 $cv->($kv);
363 1
364 },
365 );
366};
367
368=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
369
370=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
371
372=cut
373
374_txn modify_persistent_request => sub {
375 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
376
377 $self->send_msg (modify_persistent_request =>
378 global => $global ? "true" : "false",
379 defined $client_token ? (client_token => $client_token ) : (),
380 defined $priority_class ? (priority_class => $priority_class) : (),
381 identifier => $identifier,
382 id_cb => sub {
383 my ($self, $type, $kv, $rdata) = @_;
384
385 $cv->($kv);
386 1
387 },
388 );
389};
390
391=item $cv = $fcp->get_plugin_info ($name, $detailed)
392
393=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 547=item $info = $fcp->get_plugin_info ($name, $detailed)
394 548
395=cut 549=cut
396 550
397_txn get_plugin_info => sub { 551_txn get_plugin_info => sub {
398 my ($self, $cv, $name, $detailed) = @_; 552 my ($self, $ok, $err, $name, $detailed) = @_;
553
554 my $id = $self->identifier;
399 555
400 $self->send_msg (get_plugin_info => 556 $self->send_msg (get_plugin_info =>
557 identifier => $id,
401 plugin_name => $name, 558 plugin_name => $name,
402 detailed => $detailed ? "true" : "false", 559 detailed => $detailed ? "true" : "false",
403 id_cb => sub {
404 my ($self, $type, $kv, $rdata) = @_;
405
406 $cv->($kv);
407 1
408 },
409 ); 560 );
561 $self->on (sub {
562 my ($self, $type, $kv) = @_;
563
564 if ($kv->{identifier} eq $id) {
565 if ($type eq "get_plugin_info") {
566 $ok->($kv);
567 } else {
568 $err->($kv, $type);
569 }
570 return;
571 }
572
573 1
574 });
410}; 575};
411 576
412=item $cv = $fcp->client_get ($uri, $identifier, %kv)
413
414=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 577=item $status = $fcp->client_get ($uri, $identifier, %kv)
415 578
416%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 579%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
417 580
418ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 581ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
419priority_class, persistence, client_token, global, return_type, 582priority_class, persistence, client_token, global, return_type,
420binary_blob, allowed_mime_types, filename, temp_filename 583binary_blob, allowed_mime_types, filename, temp_filename
421 584
422=cut 585=cut
423 586
424_txn client_get => sub { 587_txn client_get => sub {
425 my ($self, $cv, $uri, $identifier, %kv) = @_; 588 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
426 589
590 $self->serialise ($identifier => sub {
591 my ($self, $guard) = @_;
592
427 $self->send_msg (client_get => 593 $self->send_msg (client_get =>
428 %kv, 594 %kv,
429 uri => $uri, 595 uri => $uri,
430 identifier => $identifier, 596 identifier => $identifier,
431 id_cb => sub { 597 );
598
599 $self->on (sub {
432 my ($self, $type, $kv, $rdata) = @_; 600 my ($self, $type, $kv, @extra) = @_;
433 601
602 $guard if 0;
603
604 if ($kv->{identifier} eq $identifier) {
605 if ($type eq "persistent_get") {
434 $cv->($kv); 606 $ok->($kv);
607 return;
608 } elsif ($type eq "protocol_error") {
609 $err->($kv);
610 return;
611 }
612 }
613
435 1 614 1
615 });
616 });
617};
618
619=item $status = $fcp->remove_request ($identifier[, $global])
620
621Remove the request with the given isdentifier. Returns true if successful,
622false on error.
623
624=cut
625
626_txn remove_request => sub {
627 my ($self, $ok, $err, $identifier, $global) = @_;
628
629 $self->serialise ($identifier => sub {
630 my ($self, $guard) = @_;
631
632 $self->send_msg (remove_request =>
633 identifier => $identifier,
634 global => $global ? "true" : "false",
635 );
636 $self->on (sub {
637 my ($self, $type, $kv, @extra) = @_;
638
639 $guard if 0;
640
641 if ($kv->{identifier} eq $identifier) {
642 if ($type eq "persistent_request_removed") {
643 $ok->(1);
644 return;
645 } elsif ($type eq "protocol_error") {
646 $err->($kv);
647 return;
648 }
649 }
650
651 1
652 });
653 });
654};
655
656=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
657
658The DDA test in FCP is probably the single most broken protocol - only
659one directory test can be outstanding at any time, and some guessing and
660heuristics are involved in mangling the paths.
661
662This function combines C<TestDDARequest> and C<TestDDAResponse> in one
663request, handling file reading and writing as well, and tries very hard to
664do the right thing.
665
666Both C<$local_directory> and C<$remote_directory> must specify the same
667directory - C<$local_directory> is the directory path on the client (where
668L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
669the server (where the freenet node runs). When both are running on the
670same node, the paths are generally identical.
671
672C<$want_read> and C<$want_write> should be set to a true value when you
673want to read (get) files or write (put) files, respectively.
674
675On error, an exception is thrown. Otherwise, C<$can_read> and
676C<$can_write> indicate whether you can reaqd or write to freenet via the
677directory.
678
679=cut
680
681_txn test_dda => sub {
682 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
683
684 $self->serialise (test_dda => sub {
685 my ($self, $guard) = @_;
686
687 $self->send_msg (test_dda_request =>
688 directory => $remote,
689 want_read_directory => $want_read ? "true" : "false",
690 want_write_directory => $want_write ? "true" : "false",
691 );
692 $self->on (sub {
693 my ($self, $type, $kv) = @_;
694
695 if ($type eq "test_dda_reply") {
696 # the filenames are all relative to the server-side directory,
697 # which might or might not match $remote anymore, so we
698 # need to rewrite the paths to be relative to $local
699 for my $k (qw(read_filename write_filename)) {
700 my $f = $kv->{$k};
701 for my $dir ($kv->{directory}, $remote) {
702 if ($dir eq substr $f, 0, length $dir) {
703 substr $f, 0, 1 + length $dir, "";
704 $kv->{$k} = $f;
705 last;
706 }
707 }
708 }
709
710 my %response = (directory => $remote);
711
712 if (length $kv->{read_filename}) {
713 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
714 sysread $fh, my $buf, -s $fh;
715 $response{read_content} = $buf;
716 }
717 }
718
719 if (length $kv->{write_filename}) {
720 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
721 syswrite $fh, $kv->{content_to_write};
722 }
723 }
724
725 $self->send_msg (test_dda_response => %response);
726
727 $self->on (sub {
728 my ($self, $type, $kv) = @_;
729
730 $guard if 0; # reference
731
732 if ($type eq "test_dda_complete") {
733 $ok->(
734 $kv->{read_directory_allowed} eq "true",
735 $kv->{write_directory_allowed} eq "true",
736 );
737 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
738 $err->($kv->{extra_description});
739 return;
740 }
741
742 1
743 });
744
745 return;
746 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
747 $err->($kv);
748 return;
749 }
750
751 1
752 });
753 });
754};
755
756=back
757
758=head2 REQUEST CACHE
759
760The C<AnyEvent::FCP> class keeps a request cache, where it caches all
761information from requests.
762
763For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
764in C<< $fcp->{req}{$identifier} >>:
765
766 persistent_get
767 persistent_put
768 persistent_put_dir
769
770This message updates the stored data:
771
772 persistent_request_modified
773
774This message will remove this entry:
775
776 persistent_request_removed
777
778These messages get merged into the cache entry, under their
779type, i.e. a C<simple_progress> message will be stored in C<<
780$fcp->{req}{$identifier}{simple_progress} >>:
781
782 simple_progress # get/put
783
784 uri_generated # put
785 generated_metadata # put
786 started_compression # put
787 finished_compression # put
788 put_failed # put
789 put_fetchable # put
790 put_successful # put
791
792 sending_to_network # get
793 compatibility_mode # get
794 expected_hashes # get
795 expected_mime # get
796 expected_data_length # get
797 get_failed # get
798 data_found # get
799 enter_finite_cooldown # get
800
801In addition, an event (basically a fake message) of type C<request_changed> is generated
802on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
803is the type of the original message triggering the change,
804
805To fill this cache with the global queue and keep it updated,
806call C<watch_global> to subscribe to updates, followed by
807C<list_persistent_requests_sync>.
808
809 $fcp->watch_global_sync_; # do not wait
810 $fcp->list_persistent_requests; # wait
811
812To get a better idea of what is stored in the cache, here is an example of
813what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
814
815 {
816 identifier => "Frost-gpl.txt",
817 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
818 binary_blob => "false",
819 global => "true",
820 max_retries => -1,
821 max_size => 9223372036854775807,
822 persistence => "forever",
823 priority_class => 3,
824 real_time => "false",
825 return_type => "direct",
826 started => "true",
827 type => "persistent_get",
828 verbosity => 2147483647,
829 sending_to_network => {
830 identifier => "Frost-gpl.txt",
831 global => "true",
436 }, 832 },
437 ); 833 compatibility_mode => {
438}; 834 identifier => "Frost-gpl.txt",
439 835 definitive => "true",
440=back 836 dont_compress => "false",
837 global => "true",
838 max => "COMPAT_1255",
839 min => "COMPAT_1255",
840 },
841 expected_hashes => {
842 identifier => "Frost-gpl.txt",
843 global => "true",
844 hashes => {
845 ed2k => "d83596f5ee3b7...",
846 md5 => "e0894e4a2a6...",
847 sha1 => "...",
848 sha256 => "...",
849 sha512 => "...",
850 tth => "...",
851 },
852 },
853 expected_mime => {
854 identifier => "Frost-gpl.txt",
855 global => "true",
856 metadata => { content_type => "application/rar" },
857 },
858 expected_data_length => {
859 identifier => "Frost-gpl.txt",
860 data_length => 37576,
861 global => "true",
862 },
863 simple_progress => {
864 identifier => "Frost-gpl.txt",
865 failed => 0,
866 fatally_failed => 0,
867 finalized_total => "true",
868 global => "true",
869 last_progress => 1438639282628,
870 required => 372,
871 succeeded => 102,
872 total => 747,
873 },
874 data_found => {
875 identifier => "Frost-gpl.txt",
876 completion_time => 1438663354026,
877 data_length => 37576,
878 global => "true",
879 metadata => { content_type => "image/jpeg" },
880 startup_time => 1438657196167,
881 },
882 }
441 883
442=head1 EXAMPLE PROGRAM 884=head1 EXAMPLE PROGRAM
443 885
444 use AnyEvent::FCP; 886 use AnyEvent::FCP;
445 887
446 my $fcp = new AnyEvent::FCP; 888 my $fcp = new AnyEvent::FCP;
447 889
448 # let us look at the global request list 890 # let us look at the global request list
449 $fcp->watch_global (1, 0); 891 $fcp->watch_global_ (1);
450 892
451 # list them, synchronously 893 # list them, synchronously
452 my $req = $fcp->list_persistent_requests_sync; 894 my $req = $fcp->list_persistent_requests;
453 895
454 # go through all requests 896 # go through all requests
897TODO
455 for my $req (values %$req) { 898 for my $req (values %$req) {
456 # skip jobs not directly-to-disk 899 # skip jobs not directly-to-disk
457 next unless $req->{return_type} eq "disk"; 900 next unless $req->{return_type} eq "disk";
458 # skip jobs not issued by FProxy 901 # skip jobs not issued by FProxy
459 next unless $req->{identifier} =~ /^FProxy:/; 902 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines