ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.8 by root, Fri Jun 18 16:59:13 2010 UTC vs.
Revision 1.12 by root, Sat Aug 8 04:02:48 2015 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
71 73
72sub touc($) { 74sub touc($) {
73 local $_ = shift; 75 local $_ = shift;
74 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
75 s/(?:^|_)(.)/\U$1/g; 77 s/(?:^|_)(.)/\U$1/g;
76 $_ 78 $_
77} 79}
78 80
79sub tolc($) { 81sub tolc($) {
80 local $_ = shift; 82 local $_ = shift;
81 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
82 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
83 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
84 lc 86 lc
85} 87}
86 88
87=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
88 90
89Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
90127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
91 93
92If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
93(hopefully) unique client name for you. 95(hopefully) unique client name for you.
94 96
95You can install a progress callback that is being called with the AnyEvent::FCP
96object, the type, a hashref with key-value pairs and a reference to any received data,
97for all unsolicited messages.
98
99Example:
100
101 sub progress_cb {
102 my ($self, $type, $kv, $rdata) = @_;
103
104 if ($type eq "simple_progress") {
105 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
106 }
107 }
108
109=cut 97=cut
110 98
111sub new { 99sub new {
112 my $class = shift; 100 my $class = shift;
113 my $self = bless { @_ }, $class; 101 my $self = bless {
114 102 host => $ENV{FREDHOST} || "127.0.0.1",
115 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
116 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
117 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
118 $self->{timeout} ||= 3600*2; 106 @_,
119 $self->{progress} ||= sub { }; 107 queue => [],
120 108 req => {},
121 $self->{id} = "a0"; 109 id => "a0",
110 }, $class;
122 111
123 { 112 {
124 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
125 114
126 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
134 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
135 124
136 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137 } 126 }
138 127
139 $self->send_msg ( 128 $self->send_msg (client_hello =>
140 client_hello =>
141 name => $self->{name}, 129 name => $self->{name},
142 expected_version => "2.0", 130 expected_version => "2.0",
143 ); 131 );
144 132
145 $self 133 $self
146} 134}
149 my ($self, $type, %kv) = @_; 137 my ($self, $type, %kv) = @_;
150 138
151 my $data = delete $kv{data}; 139 my $data = delete $kv{data};
152 140
153 if (exists $kv{id_cb}) { 141 if (exists $kv{id_cb}) {
154 my $id = $kv{identifier} || ++$self->{id}; 142 my $id = $kv{identifier} ||= ++$self->{id};
155 $self->{id}{$id} = delete $kv{id_cb}; 143 $self->{id}{$id} = delete $kv{id_cb};
156 $kv{identifier} = $id;
157 } 144 }
158 145
159 my $msg = (touc $type) . "\012" 146 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161 148
173 } 160 }
174 161
175 $self->{hdl}->push_write ($msg); 162 $self->{hdl}->push_write ($msg);
176} 163}
177 164
165sub on {
166 my ($self, $cb) = @_;
167
168 # cb return undef - message eaten, remove cb
169 # cb return 0 - message eaten
170 # cb return 1 - pass to next
171
172 push @{ $self->{on} }, $cb;
173}
174
175sub _push_queue {
176 my ($self, $queue) = @_;
177
178 shift @$queue;
179 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
180 if @$queue;
181}
182
183# lock so only one $type (arbitrary string) is in flight,
184# to work around horribly misdesigned protocol.
185sub serialise {
186 my ($self, $type, $cb) = @_;
187
188 my $queue = $self->{serialise}{$type} ||= [];
189 push @$queue, $cb;
190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
191 unless $#$queue;
192}
193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
178sub on_read { 248sub on_read {
179 my ($self) = @_; 249 my ($self) = @_;
180 250
181 my $type; 251 my $type;
182 my %kv; 252 my %kv;
183 my $rdata; 253 my $rdata;
184
185 my $done_cb = sub {
186 $kv{pkt_type} = $type;
187
188 if (my $cb = $self->{queue}[0]) {
189 $cb->($self, $type, \%kv, $rdata)
190 and shift @{ $self->{queue} };
191 } else {
192 $self->default_recv ($type, \%kv, $rdata);
193 }
194 };
195 254
196 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
197 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
198 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
199 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
212 271
213 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
214 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
215 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216 $rdata = \$_[1]; 275 $rdata = \$_[1];
217 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
218 }); 277 });
219 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
220 $done_cb->(); 279 $self->recv ($type, \%kv);
221 } else { 280 } else {
222 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
223 } 282 }
224 }; 283 };
225 284
235 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
236 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
237 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
238 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
240 } else {
241 &{ $self->{progress} };
242 } 299 }
243} 300}
301
302=back
303
304=head2 FCP REQUESTS
305
306The following methods implement various requests. Most of them map
307directory to the FCP message of the same name. The added benefit of
308these over sending requests yourself is that they handle the necessary
309serialisation, protocol quirks, and replies.
310
311All of them exist in two versions, the variant shown in this manpage, and
312a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
313version as shown is I<synchronous> - it will wait for any replies, and
314either return the reply, or croak with an error. The underscore variant
315returns immediately and invokes one or more callbacks or condvars later.
316
317For example, the call
318
319 $info = $fcp->get_plugin_info ($name, $detailed);
320
321Also comes in this underscore variant:
322
323 $fcp->get_plugin_info_ ($name, $detailed, $cb);
324
325You can thinbk of the underscore as a kind of continuation indicator - the
326normal function waits and returns with the data, the C<_> indicates that
327you pass the continuation yourself, and the continuation will be invoked
328with the results.
329
330This callback/continuation argument (C<$cb>) can come in three forms itself:
331
332=over 4
333
334=item A code reference (or rather anything not matching some other alternative)
335
336This code reference will be invoked with the result on success. On an
337error, it will die (in the event loop) with a backtrace of the call site.
338
339This is a popular choice, but it makes handling errors hard - make sure
340you never generate protocol errors!
341
342=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
343
344When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
345results when the request has finished. Should an error occur, the error
346will instead result in C<< $cv->croak ($error) >>.
347
348This is also a popular choice.
349
350=item An array with two callbacks C<[$success, $failure]>
351
352The C<$success> callback will be invoked with the results, while the
353C<$failure> callback will be invoked on any errors.
354
355=item C<undef>
356
357This is the same thing as specifying C<sub { }> as callback, i.e. on
358success, the results are ignored, while on failure, you the module dies
359with a backtrace.
360
361This is good for quick scripts, or when you really aren't interested in
362the results.
363
364=back
365
366=cut
367
368our $NOP_CB = sub { };
244 369
245sub _txn { 370sub _txn {
246 my ($name, $sub) = @_; 371 my ($name, $sub) = @_;
247 372
248 *{$name} = sub { 373 *{$name} = sub {
249 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 374 my $cv = AE::cv;
250 &$sub;
251 $cv
252 };
253 375
254 *{"$name\_sync"} = sub { 376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) };
255 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
256 &$sub; 377 &$sub;
257 $cv->recv 378 $cv->recv
258 }; 379 };
259}
260 380
261=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 381 *{"$name\_"} = sub {
382 my ($ok, $err) = pop;
262 383
384 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) };
388 } else {
389 my $bt = Carp::longmess "";
390 $err = sub {
391 die "$_[0]{extra_description}$bt";
392 };
393 }
394
395 $ok ||= $NOP_CB;
396
397 splice @_, 1, 0, $ok, $err;
398 &$sub;
399 };
400}
401
402=over 4
403
263=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 404=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
264 405
265=cut 406=cut
266 407
267_txn list_peers => sub { 408_txn list_peers => sub {
268 my ($self, $cv, $with_metadata, $with_volatile) = @_; 409 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
269 410
270 my @res; 411 my @res;
271 412
272 $self->send_msg (list_peers => 413 $self->send_msg (list_peers =>
273 with_metadata => $with_metadata ? "true" : "false", 414 with_metadata => $with_metadata ? "true" : "false",
274 with_volatile => $with_volatile ? "true" : "false", 415 with_volatile => $with_volatile ? "true" : "false",
275 id_cb => sub { 416 id_cb => sub {
276 my ($self, $type, $kv, $rdata) = @_; 417 my ($self, $type, $kv, $rdata) = @_;
277 418
278 if ($type eq "end_list_peers") { 419 if ($type eq "end_list_peers") {
279 $cv->(\@res); 420 $ok->(\@res);
280 1 421 1
281 } else { 422 } else {
282 push @res, $kv; 423 push @res, $kv;
283 0 424 0
284 } 425 }
285 }, 426 },
286 ); 427 );
287}; 428};
288 429
289=item $cv = $fcp->list_peer_notes ($node_identifier)
290
291=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 430=item $notes = $fcp->list_peer_notes ($node_identifier)
292 431
293=cut 432=cut
294 433
295_txn list_peer_notes => sub { 434_txn list_peer_notes => sub {
296 my ($self, $cv, $node_identifier) = @_; 435 my ($self, $ok, undef, $node_identifier) = @_;
297 436
298 $self->send_msg (list_peer_notes => 437 $self->send_msg (list_peer_notes =>
299 node_identifier => $node_identifier, 438 node_identifier => $node_identifier,
300 id_cb => sub { 439 id_cb => sub {
301 my ($self, $type, $kv, $rdata) = @_; 440 my ($self, $type, $kv, $rdata) = @_;
302 441
303 $cv->($kv); 442 $ok->($kv);
304 1 443 1
305 }, 444 },
306 ); 445 );
307}; 446};
308 447
309=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310
311=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 448=item $fcp->watch_global ($enabled[, $verbosity_mask])
312 449
313=cut 450=cut
314 451
315_txn watch_global => sub { 452_txn watch_global => sub {
316 my ($self, $cv, $enabled, $verbosity_mask) = @_; 453 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
317 454
318 $self->send_msg (watch_global => 455 $self->send_msg (watch_global =>
319 enabled => $enabled ? "true" : "false", 456 enabled => $enabled ? "true" : "false",
320 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 457 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321 ); 458 );
322 459
323 $cv->(); 460 $ok->();
324}; 461};
325 462
326=item $cv = $fcp->list_persistent_requests
327
328=item $reqs = $fcp->list_persistent_requests_sync 463=item $reqs = $fcp->list_persistent_requests
329 464
330=cut 465=cut
331 466
332_txn list_persistent_requests => sub { 467_txn list_persistent_requests => sub {
333 my ($self, $cv) = @_; 468 my ($self, $ok, $err) = @_;
334 469
470 $self->serialise (list_persistent_requests => sub {
471 my ($self, $guard) = @_;
472
335 my %res; 473 my @res;
336 474
337 $self->send_msg ("list_persistent_requests"); 475 $self->send_msg ("list_persistent_requests");
338 476
339 push @{ $self->{queue} }, sub { 477 $self->on (sub {
340 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
341 479
480 $guard if 0;
481
342 if ($type eq "end_list_persistent_requests") { 482 if ($type eq "end_list_persistent_requests") {
343 $cv->(\%res); 483 $ok->(\@res);
484 return;
485 } else {
486 my $id = $kv->{identifier};
487
488 if ($type =~ /^persistent_(get|put|put_dir)$/) {
489 push @res, [$type, $kv];
490 }
491 }
492
344 1 493 1
345 } else { 494 });
346 my $id = $kv->{identifier}; 495 });
496};
347 497
348 if ($type =~ /^persistent_(get|put|put_dir)$/) { 498=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
349 $res{$id} = { 499
350 type => $1, 500Update either the C<client_token> or C<priority_class> of a request
351 %{ $res{$id} }, 501identified by C<$global> and C<$identifier>, depending on which of
502C<$client_token> and C<$priority_class> are not C<undef>.
503
504=cut
505
506_txn modify_persistent_request => sub {
507 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
508
509 $self->serialise ($identifier => sub {
510 my ($self, $guard) = @_;
511
512 $self->send_msg (modify_persistent_request =>
513 global => $global ? "true" : "false",
514 identifier => $identifier,
515 defined $client_token ? (client_token => $client_token ) : (),
516 defined $priority_class ? (priority_class => $priority_class) : (),
517 );
518
519 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_;
521
522 if ($kv->{identifier} eq $identifier) {
523 if ($type eq "persistent_request_modified") {
352 %$kv, 524 $ok->($kv);
525 return;
526 } elsif ($type eq "protocol_error") {
527 $err->($kv);
528 return;
353 }; 529 }
354 } elsif ($type eq "simple_progress") {
355 delete $kv->{pkt_type}; # save memory
356 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357 } else {
358 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359 } 530 }
531
360 0 532 1
361 } 533 });
362 }; 534 });
363}; 535};
364 536
365=item $cv = $fcp->remove_request ($global, $identifier)
366
367=item $status = $fcp->remove_request_sync ($global, $identifier)
368
369=cut
370
371_txn remove_request => sub {
372 my ($self, $cv, $global, $identifier) = @_;
373
374 $self->send_msg (remove_request =>
375 global => $global ? "true" : "false",
376 identifier => $identifier,
377 id_cb => sub {
378 my ($self, $type, $kv, $rdata) = @_;
379
380 $cv->($kv);
381 1
382 },
383 );
384};
385
386=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387
388=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389
390=cut
391
392_txn modify_persistent_request => sub {
393 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394
395 $self->send_msg (modify_persistent_request =>
396 global => $global ? "true" : "false",
397 defined $client_token ? (client_token => $client_token ) : (),
398 defined $priority_class ? (priority_class => $priority_class) : (),
399 identifier => $identifier,
400 id_cb => sub {
401 my ($self, $type, $kv, $rdata) = @_;
402
403 $cv->($kv);
404 1
405 },
406 );
407};
408
409=item $cv = $fcp->get_plugin_info ($name, $detailed)
410
411=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 537=item $info = $fcp->get_plugin_info ($name, $detailed)
412 538
413=cut 539=cut
414 540
415_txn get_plugin_info => sub { 541_txn get_plugin_info => sub {
416 my ($self, $cv, $name, $detailed) = @_; 542 my ($self, $ok, $err, $name, $detailed) = @_;
417 543
418 $self->send_msg (get_plugin_info => 544 $self->send_msg (get_plugin_info =>
419 plugin_name => $name, 545 plugin_name => $name,
420 detailed => $detailed ? "true" : "false", 546 detailed => $detailed ? "true" : "false",
421 id_cb => sub { 547 id_cb => sub {
422 my ($self, $type, $kv, $rdata) = @_; 548 my ($self, $type, $kv, $rdata) = @_;
423 549
424 $cv->($kv); 550 $ok->($kv);
425 1 551 1
426 }, 552 },
427 ); 553 );
428}; 554};
429 555
430=item $cv = $fcp->client_get ($uri, $identifier, %kv)
431
432=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 556=item $status = $fcp->client_get ($uri, $identifier, %kv)
433 557
434%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 558%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
435 559
436ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 560ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437priority_class, persistence, client_token, global, return_type, 561priority_class, persistence, client_token, global, return_type,
438binary_blob, allowed_mime_types, filename, temp_filename 562binary_blob, allowed_mime_types, filename, temp_filename
439 563
440=cut 564=cut
441 565
442_txn client_get => sub { 566_txn client_get => sub {
443 my ($self, $cv, $uri, $identifier, %kv) = @_; 567 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
444 568
445 $self->send_msg (client_get => 569 $self->send_msg (client_get =>
446 %kv, 570 %kv,
447 uri => $uri, 571 uri => $uri,
448 identifier => $identifier, 572 identifier => $identifier,
449 id_cb => sub { 573 );
574
575 $ok->();
576};
577
578=item $status = $fcp->remove_request ($identifier[, $global])
579
580Remove the request with the given isdentifier. Returns true if successful,
581false on error.
582
583=cut
584
585_txn remove_request => sub {
586 my ($self, $ok, $err, $identifier, $global) = @_;
587
588 $self->serialise ($identifier => sub {
589 my ($self, $guard) = @_;
590
591 $self->send_msg (remove_request =>
592 identifier => $identifier,
593 global => $global ? "true" : "false",
594 );
595 $self->on (sub {
450 my ($self, $type, $kv, $rdata) = @_; 596 my ($self, $type, $kv, @extra) = @_;
451 597
598 if ($kv->{identifier} eq $identifier) {
599 if ($type eq "persistent_request_removed") {
600 $ok->(1);
601 return;
602 } elsif ($type eq "protocol_error") {
452 $cv->($kv); 603 $err->($kv);
604 return;
605 }
606 }
607
453 1 608 1
609 });
610 });
611};
612
613=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
614
615The DDA test in FCP is probably the single most broken protocol - only
616one directory test can be outstanding at any time, and some guessing and
617heuristics are involved in mangling the paths.
618
619This function combines C<TestDDARequest> and C<TestDDAResponse> in one
620request, handling file reading and writing as well, and tries very hard to
621do the right thing.
622
623Both C<$local_directory> and C<$remote_directory> must specify the same
624directory - C<$local_directory> is the directory path on the client (where
625L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
626the server (where the freenet node runs). When both are running on the
627same node, the paths are generally identical.
628
629C<$want_read> and C<$want_write> should be set to a true value when you
630want to read (get) files or write (put) files, respectively.
631
632On error, an exception is thrown. Otherwise, C<$can_read> and
633C<$can_write> indicate whether you can reaqd or write to freenet via the
634directory.
635
636=cut
637
638_txn test_dda => sub {
639 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
640
641 $self->serialise (test_dda => sub {
642 my ($self, $guard) = @_;
643
644 $self->send_msg (test_dda_request =>
645 directory => $remote,
646 want_read_directory => $want_read ? "true" : "false",
647 want_write_directory => $want_write ? "true" : "false",
648 );
649 $self->on (sub {
650 my ($self, $type, $kv) = @_;
651
652 if ($type eq "test_dda_reply") {
653 # the filenames are all relative to the server-side directory,
654 # which might or might not match $remote anymore, so we
655 # need to rewrite the paths to be relative to $local
656 for my $k (qw(read_filename write_filename)) {
657 my $f = $kv->{$k};
658 for my $dir ($kv->{directory}, $remote) {
659 if ($dir eq substr $f, 0, length $dir) {
660 substr $f, 0, 1 + length $dir, "";
661 $kv->{$k} = $f;
662 last;
663 }
664 }
665 }
666
667 my %response = (directory => $remote);
668
669 if (length $kv->{read_filename}) {
670 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
671 sysread $fh, my $buf, -s $fh;
672 $response{read_content} = $buf;
673 }
674 }
675
676 if (length $kv->{write_filename}) {
677 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
678 syswrite $fh, $kv->{content_to_write};
679 }
680 }
681
682 $self->send_msg (test_dda_response => %response);
683
684 $self->on (sub {
685 my ($self, $type, $kv) = @_;
686
687 $guard if 0; # reference
688
689 if ($type eq "test_dda_complete") {
690 $ok->(
691 $kv->{read_directory_allowed} eq "true",
692 $kv->{write_directory_allowed} eq "true",
693 );
694 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
695 $err->($kv->{extra_description});
696 return;
697 }
698
699 1
700 });
701
702 return;
703 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
704 $err->($kv);
705 return;
706 }
707
708 1
709 });
710 });
711};
712
713=back
714
715=head2 REQUEST CACHE
716
717The C<AnyEvent::FCP> class keeps a request cache, where it caches all
718information from requests.
719
720For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
721in C<< $fcp->{req}{$identifier} >>:
722
723 persistent_get
724 persistent_put
725 persistent_put_dir
726
727This message updates the stored data:
728
729 persistent_request_modified
730
731This message will remove this entry:
732
733 persistent_request_removed
734
735These messages get merged into the cache entry, under their
736type, i.e. a C<simple_progress> message will be stored in C<<
737$fcp->{req}{$identifier}{simple_progress} >>:
738
739 simple_progress # get/put
740
741 uri_generated # put
742 generated_metadata # put
743 started_compression # put
744 finished_compression # put
745 put_failed # put
746 put_fetchable # put
747 put_successful # put
748
749 sending_to_network # get
750 compatibility_mode # get
751 expected_hashes # get
752 expected_mime # get
753 expected_data_length # get
754 get_failed # get
755 data_found # get
756 enter_finite_cooldown # get
757
758In addition, an event (basically a fake message) of type C<request_changed> is generated
759on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
760is the type of the original message triggering the change,
761
762To fill this cache with the global queue and keep it updated,
763call C<watch_global> to subscribe to updates, followed by
764C<list_persistent_requests_sync>.
765
766 $fcp->watch_global_sync_; # do not wait
767 $fcp->list_persistent_requests; # wait
768
769To get a better idea of what is stored in the cache, here is an example of
770what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
771
772 {
773 identifier => "Frost-gpl.txt",
774 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
775 binary_blob => "false",
776 global => "true",
777 max_retries => -1,
778 max_size => 9223372036854775807,
779 persistence => "forever",
780 priority_class => 3,
781 real_time => "false",
782 return_type => "direct",
783 started => "true",
784 type => "persistent_get",
785 verbosity => 2147483647,
786 sending_to_network => {
787 identifier => "Frost-gpl.txt",
788 global => "true",
454 }, 789 },
455 ); 790 compatibility_mode => {
456}; 791 identifier => "Frost-gpl.txt",
457 792 definitive => "true",
458=back 793 dont_compress => "false",
794 global => "true",
795 max => "COMPAT_1255",
796 min => "COMPAT_1255",
797 },
798 expected_hashes => {
799 identifier => "Frost-gpl.txt",
800 global => "true",
801 hashes => {
802 ed2k => "d83596f5ee3b7...",
803 md5 => "e0894e4a2a6...",
804 sha1 => "...",
805 sha256 => "...",
806 sha512 => "...",
807 tth => "...",
808 },
809 },
810 expected_mime => {
811 identifier => "Frost-gpl.txt",
812 global => "true",
813 metadata => { content_type => "application/rar" },
814 },
815 expected_data_length => {
816 identifier => "Frost-gpl.txt",
817 data_length => 37576,
818 global => "true",
819 },
820 simple_progress => {
821 identifier => "Frost-gpl.txt",
822 failed => 0,
823 fatally_failed => 0,
824 finalized_total => "true",
825 global => "true",
826 last_progress => 1438639282628,
827 required => 372,
828 succeeded => 102,
829 total => 747,
830 },
831 data_found => {
832 identifier => "Frost-gpl.txt",
833 completion_time => 1438663354026,
834 data_length => 37576,
835 global => "true",
836 metadata => { content_type => "image/jpeg" },
837 startup_time => 1438657196167,
838 },
839 }
459 840
460=head1 EXAMPLE PROGRAM 841=head1 EXAMPLE PROGRAM
461 842
462 use AnyEvent::FCP; 843 use AnyEvent::FCP;
463 844
464 my $fcp = new AnyEvent::FCP; 845 my $fcp = new AnyEvent::FCP;
465 846
466 # let us look at the global request list 847 # let us look at the global request list
467 $fcp->watch_global (1, 0); 848 $fcp->watch_global_ (1);
468 849
469 # list them, synchronously 850 # list them, synchronously
470 my $req = $fcp->list_persistent_requests_sync; 851 my $req = $fcp->list_persistent_requests;
471 852
472 # go through all requests 853 # go through all requests
854TODO
473 for my $req (values %$req) { 855 for my $req (values %$req) {
474 # skip jobs not directly-to-disk 856 # skip jobs not directly-to-disk
475 next unless $req->{return_type} eq "disk"; 857 next unless $req->{return_type} eq "disk";
476 # skip jobs not issued by FProxy 858 # skip jobs not issued by FProxy
477 next unless $req->{identifier} =~ /^FProxy:/; 859 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines