ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.10 by root, Tue Aug 4 00:50:25 2015 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
60 61
61use common::sense; 62use common::sense;
62 63
63use Carp; 64use Carp;
64 65
65our $VERSION = '0.3'; 66our $VERSION = 0.4;
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
71use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
72 75
73sub touc($) { 76sub touc($) {
74 local $_ = shift; 77 local $_ = shift;
75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
77 $_ 80 $_
78} 81}
79 82
80sub tolc($) { 83sub tolc($) {
81 local $_ = shift; 84 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 88 lc
86} 89}
87 90
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
89 92
90Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 95
93If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 97(hopefully) unique client name for you.
95 98
96You can install a progress callback that is being called with the AnyEvent::FCP
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99
100Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110=cut 99=cut
111 100
112sub new { 101sub new {
113 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
114 my $self = bless { @_ }, $class; 106 my $self = bless {
115 107 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 110 keepalive => 9 * 60,
119 $self->{timeout} ||= 3600*2; 111 name => time.rand.rand.rand, # lame
120 $self->{progress} ||= sub { }; 112 @_,
121 113 queue => [],
122 $self->{id} = "a0"; 114 req => {},
115 prefix => "..:aefcpid:$rand:",
116 idseq => "a0",
117 }, $class;
123 118
124 { 119 {
125 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
126 190
127 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
128 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
129 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
130 on_error => sub { 196 on_error => sub {
131 warn "@_\n";#d# 197 $self->fatal ($_[2]);
132 exit 1;
133 }, 198 },
134 on_read => sub { $self->on_read (@_) }, 199 ;
135 on_eof => $self->{on_eof} || sub { };
136 200
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 202 }
139 203
140 $self->send_msg ( 204 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 205 name => $self->{name},
143 expected_version => "2.0", 206 expected_version => "2.0",
144 ); 207 );
145 208
146 $self 209 $self
147} 210}
148 211
212sub fatal {
213 my ($self, $msg) = @_;
214
215 $self->{hdl}->shutdown;
216 delete $self->{kw};
217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
223}
224
225sub identifier {
226 $_[0]{prefix} . ++$_[0]{idseq}
227}
228
149sub send_msg { 229sub send_msg {
150 my ($self, $type, %kv) = @_; 230 my ($self, $type, %kv) = @_;
151 231
152 my $data = delete $kv{data}; 232 my $data = delete $kv{data};
153 233
154 if (exists $kv{id_cb}) { 234 if (exists $kv{id_cb}) {
155 my $id = $kv{identifier} ||= ++$self->{id}; 235 my $id = $kv{identifier} ||= $self->identifier;
156 $self->{id}{$id} = delete $kv{id_cb}; 236 $self->{id}{$id} = delete $kv{id_cb};
157 } 237 }
158 238
159 my $msg = (touc $type) . "\012" 239 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 240 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
202 push @$queue, $cb; 282 push @$queue, $cb;
203 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 283 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204 unless $#$queue; 284 unless $#$queue;
205} 285}
206 286
207sub on_read { 287# how to merge these types into $self->{persistent}
208 my ($self) = @_; 288our %PERSISTENT_TYPE = (
289 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
290 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
291 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
292 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
293 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
209 294
210 my $type; 295 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
211 my %kv;
212 my $rdata;
213 296
214 my $done_cb = sub { 297 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
215 $kv{pkt_type} = $type; 298 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
299 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
300 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
301 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
302 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
303 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
216 304
305 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
306 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
307 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
308 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
309 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
310 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
311 data_found => sub { $_[1]{data_found} = $_[2] }, # get
312 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
313);
314
315sub recv {
316 my ($self, $type, $kv, @extra) = @_;
317
318 if (my $cb = $PERSISTENT_TYPE{$type}) {
319 my $id = $kv->{identifier};
320 my $req = $_[0]{req}{$id} ||= {};
321 $cb->($self, $req, $kv);
322 $self->recv (request_changed => $kv, $type, @extra);
323 }
324
217 my $on = $self->{on}; 325 my $on = $self->{on};
218 for (0 .. $#$on) { 326 for (0 .. $#$on) {
219 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) { 327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
220 splice @$on, $_, 1 unless defined $res; 328 splice @$on, $_, 1 unless defined $res;
221 return; 329 return;
222 }
223 } 330 }
331 }
224 332
225 if (my $cb = $self->{queue}[0]) { 333 if (my $cb = $self->{queue}[0]) {
226 $cb->($self, $type, \%kv, $rdata) 334 $cb->($self, $type, $kv, @extra)
227 and shift @{ $self->{queue} }; 335 and shift @{ $self->{queue} };
228 } else { 336 } else {
229 $self->default_recv ($type, \%kv, $rdata); 337 $self->default_recv ($type, $kv, @extra);
230 }
231 }; 338 }
232
233 my $hdr_cb; $hdr_cb = sub {
234 if ($_[1] =~ /^([^=]+)=(.*)$/) {
235 my ($k, $v) = ($1, $2);
236 my @k = split /\./, tolc $k;
237 my $ro = \\%kv;
238
239 while (@k) {
240 my $k = shift @k;
241 if ($k =~ /^\d+$/) {
242 $ro = \$$ro->[$k];
243 } else {
244 $ro = \$$ro->{$k};
245 }
246 }
247
248 $$ro = $v;
249
250 $_[0]->push_read (line => $hdr_cb);
251 } elsif ($_[1] eq "Data") {
252 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
253 $rdata = \$_[1];
254 $done_cb->();
255 });
256 } elsif ($_[1] eq "EndMessage") {
257 $done_cb->();
258 } else {
259 die "protocol error, expected message end, got $_[1]\n";#d#
260 }
261 };
262
263 $self->{hdl}->push_read (line => sub {
264 $type = tolc $_[1];
265 $_[0]->push_read (line => $hdr_cb);
266 });
267} 339}
268 340
269sub default_recv { 341sub default_recv {
270 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
271 343
272 if ($type eq "node_hello") { 344 if ($type eq "node_hello") {
273 $self->{node_hello} = $kv; 345 $self->{node_hello} = $kv;
274 } elsif (exists $self->{id}{$kv->{identifier}}) { 346 } elsif (exists $self->{id}{$kv->{identifier}}) {
275 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 347 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
276 and delete $self->{id}{$kv->{identifier}}; 348 and delete $self->{id}{$kv->{identifier}};
277 } else {
278 &{ $self->{progress} };
279 } 349 }
280} 350}
351
352=back
353
354=head2 FCP REQUESTS
355
356The following methods implement various requests. Most of them map
357directory to the FCP message of the same name. The added benefit of
358these over sending requests yourself is that they handle the necessary
359serialisation, protocol quirks, and replies.
360
361All of them exist in two versions, the variant shown in this manpage, and
362a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
363version as shown is I<synchronous> - it will wait for any replies, and
364either return the reply, or croak with an error. The underscore variant
365returns immediately and invokes one or more callbacks or condvars later.
366
367For example, the call
368
369 $info = $fcp->get_plugin_info ($name, $detailed);
370
371Also comes in this underscore variant:
372
373 $fcp->get_plugin_info_ ($name, $detailed, $cb);
374
375You can thinbk of the underscore as a kind of continuation indicator - the
376normal function waits and returns with the data, the C<_> indicates that
377you pass the continuation yourself, and the continuation will be invoked
378with the results.
379
380This callback/continuation argument (C<$cb>) can come in three forms itself:
381
382=over 4
383
384=item A code reference (or rather anything not matching some other alternative)
385
386This code reference will be invoked with the result on success. On an
387error, it will die (in the event loop) with a backtrace of the call site.
388
389This is a popular choice, but it makes handling errors hard - make sure
390you never generate protocol errors!
391
392=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
393
394When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
395results when the request has finished. Should an error occur, the error
396will instead result in C<< $cv->croak ($error) >>.
397
398This is also a popular choice.
399
400=item An array with two callbacks C<[$success, $failure]>
401
402The C<$success> callback will be invoked with the results, while the
403C<$failure> callback will be invoked on any errors.
404
405=item C<undef>
406
407This is the same thing as specifying C<sub { }> as callback, i.e. on
408success, the results are ignored, while on failure, you the module dies
409with a backtrace.
410
411This is good for quick scripts, or when you really aren't interested in
412the results.
413
414=back
415
416=cut
417
418our $NOP_CB = sub { };
281 419
282sub _txn { 420sub _txn {
283 my ($name, $sub) = @_; 421 my ($name, $sub) = @_;
284 422
285 *{$name} = sub { 423 *{$name} = sub {
286 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 424 my $cv = AE::cv;
287 &$sub;
288 $cv
289 };
290 425
291 *{"$name\_sync"} = sub { 426 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
292 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
293 &$sub; 427 &$sub;
294 $cv->recv 428 $cv->recv
295 }; 429 };
296}
297 430
298=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 431 *{"$name\_"} = sub {
432 my ($ok, $err) = pop;
299 433
434 if (ARRAY:: eq ref $ok) {
435 ($ok, $err) = @$ok;
436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
437 $err = sub { $ok->croak ($_[0]{extra_description}) };
438 } else {
439 my $bt = Carp::longmess "";
440 $err = sub {
441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
442 };
443 }
444
445 $ok ||= $NOP_CB;
446
447 splice @_, 1, 0, $ok, $err;
448 &$sub;
449 };
450}
451
452=over 4
453
300=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 454=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
301 455
302=cut 456=cut
303 457
304_txn list_peers => sub { 458_txn list_peers => sub {
305 my ($self, $cv, $with_metadata, $with_volatile) = @_; 459 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
306 460
307 my @res; 461 my @res;
308 462
309 $self->send_msg (list_peers => 463 $self->send_msg (list_peers =>
310 with_metadata => $with_metadata ? "true" : "false", 464 with_metadata => $with_metadata ? "true" : "false",
311 with_volatile => $with_volatile ? "true" : "false", 465 with_volatile => $with_volatile ? "true" : "false",
312 id_cb => sub { 466 id_cb => sub {
313 my ($self, $type, $kv, $rdata) = @_; 467 my ($self, $type, $kv, $rdata) = @_;
314 468
315 if ($type eq "end_list_peers") { 469 if ($type eq "end_list_peers") {
316 $cv->(\@res); 470 $ok->(\@res);
317 1 471 1
318 } else { 472 } else {
319 push @res, $kv; 473 push @res, $kv;
320 0 474 0
321 } 475 }
322 }, 476 },
323 ); 477 );
324}; 478};
325 479
326=item $cv = $fcp->list_peer_notes ($node_identifier)
327
328=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 480=item $notes = $fcp->list_peer_notes ($node_identifier)
329 481
330=cut 482=cut
331 483
332_txn list_peer_notes => sub { 484_txn list_peer_notes => sub {
333 my ($self, $cv, $node_identifier) = @_; 485 my ($self, $ok, undef, $node_identifier) = @_;
334 486
335 $self->send_msg (list_peer_notes => 487 $self->send_msg (list_peer_notes =>
336 node_identifier => $node_identifier, 488 node_identifier => $node_identifier,
337 id_cb => sub { 489 id_cb => sub {
338 my ($self, $type, $kv, $rdata) = @_; 490 my ($self, $type, $kv, $rdata) = @_;
339 491
340 $cv->($kv); 492 $ok->($kv);
341 1 493 1
342 }, 494 },
343 ); 495 );
344}; 496};
345 497
346=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347
348=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 498=item $fcp->watch_global ($enabled[, $verbosity_mask])
349 499
350=cut 500=cut
351 501
352_txn watch_global => sub { 502_txn watch_global => sub {
353 my ($self, $cv, $enabled, $verbosity_mask) = @_; 503 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
354 504
355 $self->send_msg (watch_global => 505 $self->send_msg (watch_global =>
356 enabled => $enabled ? "true" : "false", 506 enabled => $enabled ? "true" : "false",
357 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 507 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
358 ); 508 );
359 509
360 $cv->(); 510 $ok->();
361}; 511};
362 512
363=item $cv = $fcp->list_persistent_requests
364
365=item $reqs = $fcp->list_persistent_requests_sync 513=item $reqs = $fcp->list_persistent_requests
366 514
367=cut 515=cut
368 516
369_txn list_persistent_requests => sub { 517_txn list_persistent_requests => sub {
370 my ($self, $cv) = @_; 518 my ($self, $ok, $err) = @_;
371 519
372 $self->serialise (list_persistent_requests => sub { 520 $self->serialise (list_persistent_requests => sub {
373 my ($self, $guard) = @_; 521 my ($self, $guard) = @_;
374 522
375 my %res; 523 my @res;
376 524
377 $self->send_msg ("list_persistent_requests"); 525 $self->send_msg ("list_persistent_requests");
378 526
379 $self->on (sub { 527 $self->on (sub {
380 my ($self, $type, $kv, $rdata) = @_; 528 my ($self, $type, $kv, $rdata) = @_;
381 529
382 $guard if 0; 530 $guard if 0;
383 531
384 if ($type eq "end_list_persistent_requests") { 532 if ($type eq "end_list_persistent_requests") {
385 $cv->(\%res); 533 $ok->(\@res);
386 return; 534 return;
387 } else { 535 } else {
388 my $id = $kv->{identifier}; 536 my $id = $kv->{identifier};
389 537
390 if ($type =~ /^persistent_(get|put|put_dir)$/) { 538 if ($type =~ /^persistent_(get|put|put_dir)$/) {
391 $res{$id} = { 539 push @res, [$type, $kv];
392 type => $1,
393 %{ $res{$id} },
394 %$kv,
395 };
396 } elsif ($type eq "simple_progress") {
397 delete $kv->{pkt_type}; # save memory
398 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399 } else {
400 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401 } 540 }
402 } 541 }
403 542
404 1 543 1
405 }); 544 });
406 }); 545 });
407}; 546};
408 547
409=item $cv = $fcp->remove_request ($global, $identifier) 548=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
410 549
411=item $status = $fcp->remove_request_sync ($global, $identifier) 550Update either the C<client_token> or C<priority_class> of a request
551identified by C<$global> and C<$identifier>, depending on which of
552C<$client_token> and C<$priority_class> are not C<undef>.
412 553
413=cut 554=cut
414 555
415_txn remove_request => sub { 556_txn modify_persistent_request => sub {
416 my ($self, $cv, $global, $identifier) = @_; 557 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
417 558
559 $self->serialise ($identifier => sub {
560 my ($self, $guard) = @_;
561
418 $self->send_msg (remove_request => 562 $self->send_msg (modify_persistent_request =>
419 global => $global ? "true" : "false", 563 global => $global ? "true" : "false",
420 identifier => $identifier, 564 identifier => $identifier,
421 id_cb => sub { 565 defined $client_token ? (client_token => $client_token ) : (),
566 defined $priority_class ? (priority_class => $priority_class) : (),
567 );
568
569 $self->on (sub {
422 my ($self, $type, $kv, $rdata) = @_; 570 my ($self, $type, $kv, @extra) = @_;
423 571
572 $guard if 0;
573
574 if ($kv->{identifier} eq $identifier) {
575 if ($type eq "persistent_request_modified") {
424 $cv->($kv); 576 $ok->($kv);
577 return;
578 } elsif ($type eq "protocol_error") {
579 $err->($kv);
580 return;
581 }
582 }
583
425 1 584 1
426 }, 585 });
427 ); 586 });
428}; 587};
429 588
430=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431
432=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433
434=cut
435
436_txn modify_persistent_request => sub {
437 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
438
439 $self->send_msg (modify_persistent_request =>
440 global => $global ? "true" : "false",
441 defined $client_token ? (client_token => $client_token ) : (),
442 defined $priority_class ? (priority_class => $priority_class) : (),
443 identifier => $identifier,
444 id_cb => sub {
445 my ($self, $type, $kv, $rdata) = @_;
446
447 $cv->($kv);
448 1
449 },
450 );
451};
452
453=item $cv = $fcp->get_plugin_info ($name, $detailed)
454
455=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 589=item $info = $fcp->get_plugin_info ($name, $detailed)
456 590
457=cut 591=cut
458 592
459_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
460 my ($self, $cv, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
595
596 my $id = $self->identifier;
461 597
462 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
463 plugin_name => $name, 600 plugin_name => $name,
464 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
465 id_cb => sub {
466 my ($self, $type, $kv, $rdata) = @_;
467
468 $cv->($kv);
469 1
470 },
471 ); 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
472}; 617};
473 618
474=item $cv = $fcp->client_get ($uri, $identifier, %kv)
475
476=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 619=item $status = $fcp->client_get ($uri, $identifier, %kv)
477 620
478%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479 622
480ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 623ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481priority_class, persistence, client_token, global, return_type, 624priority_class, persistence, client_token, global, return_type,
482binary_blob, allowed_mime_types, filename, temp_filename 625binary_blob, allowed_mime_types, filename, temp_filename
483 626
484=cut 627=cut
485 628
486_txn client_get => sub { 629_txn client_get => sub {
487 my ($self, $cv, $uri, $identifier, %kv) = @_; 630 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
488 631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
489 $self->send_msg (client_get => 635 $self->send_msg (client_get =>
490 %kv, 636 %kv,
491 uri => $uri, 637 uri => $uri,
492 identifier => $identifier, 638 identifier => $identifier,
493 id_cb => sub { 639 );
640
641 $self->on (sub {
494 my ($self, $type, $kv, $rdata) = @_; 642 my ($self, $type, $kv, @extra) = @_;
495 643
644 $guard if 0;
645
646 if ($kv->{identifier} eq $identifier) {
647 if ($type eq "persistent_get") {
496 $cv->($kv); 648 $ok->($kv);
649 return;
650 } elsif ($type eq "protocol_error") {
651 $err->($kv);
652 return;
653 }
654 }
655
497 1 656 1
498 }, 657 });
499 ); 658 });
500}; 659};
501 660
502=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 661=item $status = $fcp->remove_request ($identifier[, $global])
503 662
663Remove the request with the given isdentifier. Returns true if successful,
664false on error.
665
666=cut
667
668_txn remove_request => sub {
669 my ($self, $ok, $err, $identifier, $global) = @_;
670
671 $self->serialise ($identifier => sub {
672 my ($self, $guard) = @_;
673
674 $self->send_msg (remove_request =>
675 identifier => $identifier,
676 global => $global ? "true" : "false",
677 );
678 $self->on (sub {
679 my ($self, $type, $kv, @extra) = @_;
680
681 $guard if 0;
682
683 if ($kv->{identifier} eq $identifier) {
684 if ($type eq "persistent_request_removed") {
685 $ok->(1);
686 return;
687 } elsif ($type eq "protocol_error") {
688 $err->($kv);
689 return;
690 }
691 }
692
693 1
694 });
695 });
696};
697
504=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 698=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
505 699
506The DDA test in FCP is probably the single most broken protocol - only 700The DDA test in FCP is probably the single most broken protocol - only
507one directory test can be outstanding at any time, and some guessing and 701one directory test can be outstanding at any time, and some guessing and
508heuristics are involved in mangling the paths. 702heuristics are involved in mangling the paths.
509 703
510This function combines C<TestDDARequest> and C<TestDDAResponse> in one 704This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511request, handling file reading and writing as well. 705request, handling file reading and writing as well, and tries very hard to
706do the right thing.
707
708Both C<$local_directory> and C<$remote_directory> must specify the same
709directory - C<$local_directory> is the directory path on the client (where
710L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
711the server (where the freenet node runs). When both are running on the
712same node, the paths are generally identical.
713
714C<$want_read> and C<$want_write> should be set to a true value when you
715want to read (get) files or write (put) files, respectively.
716
717On error, an exception is thrown. Otherwise, C<$can_read> and
718C<$can_write> indicate whether you can reaqd or write to freenet via the
719directory.
512 720
513=cut 721=cut
514 722
515_txn test_dda => sub { 723_txn test_dda => sub {
516 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 724 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
517 725
518 $self->serialise (test_dda => sub { 726 $self->serialise (test_dda => sub {
519 my ($self, $guard) = @_; 727 my ($self, $guard) = @_;
520 728
521 $self->send_msg (test_dda_request => 729 $self->send_msg (test_dda_request =>
542 } 750 }
543 751
544 my %response = (directory => $remote); 752 my %response = (directory => $remote);
545 753
546 if (length $kv->{read_filename}) { 754 if (length $kv->{read_filename}) {
547 warn "$local/$kv->{read_filename}";#d#
548 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 755 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549 sysread $fh, my $buf, -s $fh; 756 sysread $fh, my $buf, -s $fh;
550 $response{read_content} = $buf; 757 $response{read_content} = $buf;
551 } 758 }
552 } 759 }
563 my ($self, $type, $kv) = @_; 770 my ($self, $type, $kv) = @_;
564 771
565 $guard if 0; # reference 772 $guard if 0; # reference
566 773
567 if ($type eq "test_dda_complete") { 774 if ($type eq "test_dda_complete") {
568 $cv->( 775 $ok->(
569 $kv->{read_directory_allowed} eq "true", 776 $kv->{read_directory_allowed} eq "true",
570 $kv->{write_directory_allowed} eq "true", 777 $kv->{write_directory_allowed} eq "true",
571 ); 778 );
572 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 779 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573 $cv->croak ($kv->{extra_description}); 780 $err->($kv->{extra_description});
574 return; 781 return;
575 } 782 }
576 783
577 1 784 1
578 }); 785 });
579 786
580 return; 787 return;
581 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 788 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582 $cv->croak ($kv->{extra_description}); 789 $err->($kv);
583 return; 790 return;
584 } 791 }
585 792
586 1 793 1
587 }); 794 });
588 }); 795 });
589}; 796};
590 797
591=back 798=back
592 799
800=head2 REQUEST CACHE
801
802The C<AnyEvent::FCP> class keeps a request cache, where it caches all
803information from requests.
804
805For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
806in C<< $fcp->{req}{$identifier} >>:
807
808 persistent_get
809 persistent_put
810 persistent_put_dir
811
812This message updates the stored data:
813
814 persistent_request_modified
815
816This message will remove this entry:
817
818 persistent_request_removed
819
820These messages get merged into the cache entry, under their
821type, i.e. a C<simple_progress> message will be stored in C<<
822$fcp->{req}{$identifier}{simple_progress} >>:
823
824 simple_progress # get/put
825
826 uri_generated # put
827 generated_metadata # put
828 started_compression # put
829 finished_compression # put
830 put_failed # put
831 put_fetchable # put
832 put_successful # put
833
834 sending_to_network # get
835 compatibility_mode # get
836 expected_hashes # get
837 expected_mime # get
838 expected_data_length # get
839 get_failed # get
840 data_found # get
841 enter_finite_cooldown # get
842
843In addition, an event (basically a fake message) of type C<request_changed> is generated
844on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
845is the type of the original message triggering the change,
846
847To fill this cache with the global queue and keep it updated,
848call C<watch_global> to subscribe to updates, followed by
849C<list_persistent_requests_sync>.
850
851 $fcp->watch_global_sync_; # do not wait
852 $fcp->list_persistent_requests; # wait
853
854To get a better idea of what is stored in the cache, here is an example of
855what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
856
857 {
858 identifier => "Frost-gpl.txt",
859 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
860 binary_blob => "false",
861 global => "true",
862 max_retries => -1,
863 max_size => 9223372036854775807,
864 persistence => "forever",
865 priority_class => 3,
866 real_time => "false",
867 return_type => "direct",
868 started => "true",
869 type => "persistent_get",
870 verbosity => 2147483647,
871 sending_to_network => {
872 identifier => "Frost-gpl.txt",
873 global => "true",
874 },
875 compatibility_mode => {
876 identifier => "Frost-gpl.txt",
877 definitive => "true",
878 dont_compress => "false",
879 global => "true",
880 max => "COMPAT_1255",
881 min => "COMPAT_1255",
882 },
883 expected_hashes => {
884 identifier => "Frost-gpl.txt",
885 global => "true",
886 hashes => {
887 ed2k => "d83596f5ee3b7...",
888 md5 => "e0894e4a2a6...",
889 sha1 => "...",
890 sha256 => "...",
891 sha512 => "...",
892 tth => "...",
893 },
894 },
895 expected_mime => {
896 identifier => "Frost-gpl.txt",
897 global => "true",
898 metadata => { content_type => "application/rar" },
899 },
900 expected_data_length => {
901 identifier => "Frost-gpl.txt",
902 data_length => 37576,
903 global => "true",
904 },
905 simple_progress => {
906 identifier => "Frost-gpl.txt",
907 failed => 0,
908 fatally_failed => 0,
909 finalized_total => "true",
910 global => "true",
911 last_progress => 1438639282628,
912 required => 372,
913 succeeded => 102,
914 total => 747,
915 },
916 data_found => {
917 identifier => "Frost-gpl.txt",
918 completion_time => 1438663354026,
919 data_length => 37576,
920 global => "true",
921 metadata => { content_type => "image/jpeg" },
922 startup_time => 1438657196167,
923 },
924 }
925
593=head1 EXAMPLE PROGRAM 926=head1 EXAMPLE PROGRAM
594 927
595 use AnyEvent::FCP; 928 use AnyEvent::FCP;
596 929
597 my $fcp = new AnyEvent::FCP; 930 my $fcp = new AnyEvent::FCP;
598 931
599 # let us look at the global request list 932 # let us look at the global request list
600 $fcp->watch_global (1, 0); 933 $fcp->watch_global_ (1);
601 934
602 # list them, synchronously 935 # list them, synchronously
603 my $req = $fcp->list_persistent_requests_sync; 936 my $req = $fcp->list_persistent_requests;
604 937
605 # go through all requests 938 # go through all requests
939TODO
606 for my $req (values %$req) { 940 for my $req (values %$req) {
607 # skip jobs not directly-to-disk 941 # skip jobs not directly-to-disk
608 next unless $req->{return_type} eq "disk"; 942 next unless $req->{return_type} eq "disk";
609 # skip jobs not issued by FProxy 943 # skip jobs not issued by FProxy
610 next unless $req->{identifier} =~ /^FProxy:/; 944 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines