ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.10 by root, Tue Aug 4 00:50:25 2015 UTC vs.
Revision 1.20 by root, Sun Jun 12 01:32:37 2016 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
60 61
61use common::sense; 62use common::sense;
62 63
63use Carp; 64use Carp;
64 65
65our $VERSION = '0.3'; 66our $VERSION = 0.4;
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
71use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
72 75
73sub touc($) { 76sub touc($) {
74 local $_ = shift; 77 local $_ = shift;
75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
77 $_ 80 $_
78} 81}
79 82
80sub tolc($) { 83sub tolc($) {
81 local $_ = shift; 84 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 88 lc
86} 89}
87 90
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
89 92
90Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 95
93If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 97(hopefully) unique client name for you.
95 98
96You can install a progress callback that is being called with the AnyEvent::FCP 99The following keys can be specified (they are all optional):
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99 100
100Example: 101=over 4
101 102
102 sub progress_cb { 103=item name => $string
103 my ($self, $type, $kv, $rdata) = @_;
104 104
105 if ($type eq "simple_progress") { 105A unique name to identify this client. If none is specified, a randomly
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n"; 106generated name will be used.
107 } 107
108 } 108=item host => $hostname
109
110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_error => $callback->($fcp, $message)
132
133Invoked on any (fatal) errors, such as unexpected connection close. The
134callback receives the FCP object and a textual error message.
135
136=item on_failure => $callback->($fcp, $backtrace, $args, $error)
137
138Invoked when an FCP request fails that didn't have a failure callback. See
139L<FCP REQUESTS> for details.
140
141=back
109 142
110=cut 143=cut
111 144
112sub new { 145sub new {
113 my $class = shift; 146 my $class = shift;
147
148 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
149
114 my $self = bless { @_ }, $class; 150 my $self = bless {
115 151 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 152 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 153 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 154 keepalive => 9 * 60,
119 $self->{timeout} ||= 3600*2; 155 name => time.rand.rand.rand, # lame
120 $self->{progress} ||= sub { }; 156 @_,
121 157 queue => [],
122 $self->{id} = "a0"; 158 req => {},
159 prefix => "..:aefcpid:$rand:",
160 idseq => "a0",
161 }, $class;
123 162
124 { 163 {
125 Scalar::Util::weaken (my $self = $self); 164 Scalar::Util::weaken (my $self = $self);
165
166 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
167 $self->{hdl}->push_write ("\n");
168 };
169
170 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
171
172 # these are declared here for performance reasons
173 my ($k, $v, $type);
174 my $rdata;
175
176 my $on_read = sub {
177 my ($hdl) = @_;
178
179 # we only carve out whole messages here
180 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
181 # remember end marker
182 $rdata = $1 eq "Data"
183 or $1 eq "EndMessage"
184 or return $self->fatal ("protocol error, expected message end, got $1\n");
185
186 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
187
188 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
189
190 $type = shift @lines;
191 $type = ($TOLC{$type} ||= tolc $type);
192
193 my %kv;
194
195 for (@lines) {
196 ($k, $v) = split /=/, $_, 2;
197 $k = ($TOLC{$k} ||= tolc $k);
198
199 if ($k =~ /\./) {
200 # generic, slow case
201 my @k = split /\./, $k;
202 my $ro = \\%kv;
203
204 while (@k) {
205 $k = shift @k;
206 if ($k =~ /^\d+$/) {
207 $ro = \$$ro->[$k];
208 } else {
209 $ro = \$$ro->{$k};
210 }
211 }
212
213 $$ro = $v;
214
215 next;
216 }
217
218 # special comon case, for performance only
219 $kv{$k} = $v;
220 }
221
222 if ($rdata) {
223 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
224 $rdata = \$_[1];
225 $self->recv ($type, \%kv, $rdata);
226 });
227
228 last; # do not tgry to parse more messages
229 } else {
230 $self->recv ($type, \%kv);
231 }
232 }
233 };
126 234
127 $self->{hdl} = new AnyEvent::Handle 235 $self->{hdl} = new AnyEvent::Handle
128 connect => [$self->{host} => $self->{port}], 236 connect => [$self->{host} => $self->{port}],
129 timeout => $self->{timeout}, 237 timeout => $self->{timeout},
238 on_read => $on_read,
239 on_eof => $self->{on_eof},
130 on_error => sub { 240 on_error => sub {
131 warn "@_\n";#d# 241 $self->fatal ($_[2]);
132 exit 1;
133 }, 242 },
134 on_read => sub { $self->on_read (@_) }, 243 ;
135 on_eof => $self->{on_eof} || sub { };
136 244
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 245 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 246 }
139 247
140 $self->send_msg ( 248 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 249 name => $self->{name},
143 expected_version => "2.0", 250 expected_version => "2.0",
144 ); 251 );
145 252
146 $self 253 $self
147} 254}
148 255
256sub fatal {
257 my ($self, $msg) = @_;
258
259 $self->{hdl}->shutdown;
260 delete $self->{kw};
261
262 if ($self->{on_error}) {
263 $self->{on_error}->($self, $msg);
264 } else {
265 die $msg;
266 }
267}
268
269sub identifier {
270 $_[0]{prefix} . ++$_[0]{idseq}
271}
272
149sub send_msg { 273sub send_msg {
150 my ($self, $type, %kv) = @_; 274 my ($self, $type, %kv) = @_;
151 275
152 my $data = delete $kv{data}; 276 my $data = delete $kv{data};
153 277
154 if (exists $kv{id_cb}) { 278 if (exists $kv{id_cb}) {
155 my $id = $kv{identifier} ||= ++$self->{id}; 279 my $id = $kv{identifier} ||= $self->identifier;
156 $self->{id}{$id} = delete $kv{id_cb}; 280 $self->{id}{$id} = delete $kv{id_cb};
157 } 281 }
158 282
159 my $msg = (touc $type) . "\012" 283 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 284 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
202 push @$queue, $cb; 326 push @$queue, $cb;
203 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 327 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204 unless $#$queue; 328 unless $#$queue;
205} 329}
206 330
207sub on_read { 331# how to merge these types into $self->{persistent}
208 my ($self) = @_; 332our %PERSISTENT_TYPE = (
333 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
334 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
335 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
336 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
337 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
209 338
210 my $type; 339 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
211 my %kv;
212 my $rdata;
213 340
214 my $done_cb = sub { 341 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
215 $kv{pkt_type} = $type; 342 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
343 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
344 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
345 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
346 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
347 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
216 348
349 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
350 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
351 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
352 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
353 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
354 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
355 data_found => sub { $_[1]{data_found} = $_[2] }, # get
356 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
357);
358
359sub recv {
360 my ($self, $type, $kv, @extra) = @_;
361
362 if (my $cb = $PERSISTENT_TYPE{$type}) {
363 my $id = $kv->{identifier};
364 my $req = $_[0]{req}{$id} ||= {};
365 $cb->($self, $req, $kv);
366 $self->recv (request_changed => $kv, $type, @extra);
367 }
368
217 my $on = $self->{on}; 369 my $on = $self->{on};
218 for (0 .. $#$on) { 370 for (0 .. $#$on) {
219 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) { 371 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
220 splice @$on, $_, 1 unless defined $res; 372 splice @$on, $_, 1 unless defined $res;
221 return; 373 return;
222 }
223 } 374 }
375 }
224 376
225 if (my $cb = $self->{queue}[0]) { 377 if (my $cb = $self->{queue}[0]) {
226 $cb->($self, $type, \%kv, $rdata) 378 $cb->($self, $type, $kv, @extra)
227 and shift @{ $self->{queue} }; 379 and shift @{ $self->{queue} };
228 } else { 380 } else {
229 $self->default_recv ($type, \%kv, $rdata); 381 $self->default_recv ($type, $kv, @extra);
230 }
231 }; 382 }
232
233 my $hdr_cb; $hdr_cb = sub {
234 if ($_[1] =~ /^([^=]+)=(.*)$/) {
235 my ($k, $v) = ($1, $2);
236 my @k = split /\./, tolc $k;
237 my $ro = \\%kv;
238
239 while (@k) {
240 my $k = shift @k;
241 if ($k =~ /^\d+$/) {
242 $ro = \$$ro->[$k];
243 } else {
244 $ro = \$$ro->{$k};
245 }
246 }
247
248 $$ro = $v;
249
250 $_[0]->push_read (line => $hdr_cb);
251 } elsif ($_[1] eq "Data") {
252 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
253 $rdata = \$_[1];
254 $done_cb->();
255 });
256 } elsif ($_[1] eq "EndMessage") {
257 $done_cb->();
258 } else {
259 die "protocol error, expected message end, got $_[1]\n";#d#
260 }
261 };
262
263 $self->{hdl}->push_read (line => sub {
264 $type = tolc $_[1];
265 $_[0]->push_read (line => $hdr_cb);
266 });
267} 383}
268 384
269sub default_recv { 385sub default_recv {
270 my ($self, $type, $kv, $rdata) = @_; 386 my ($self, $type, $kv, $rdata) = @_;
271 387
272 if ($type eq "node_hello") { 388 if ($type eq "node_hello") {
273 $self->{node_hello} = $kv; 389 $self->{node_hello} = $kv;
274 } elsif (exists $self->{id}{$kv->{identifier}}) { 390 } elsif (exists $self->{id}{$kv->{identifier}}) {
275 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 391 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
276 and delete $self->{id}{$kv->{identifier}}; 392 and delete $self->{id}{$kv->{identifier}};
277 } else {
278 &{ $self->{progress} };
279 } 393 }
280} 394}
395
396=back
397
398=head2 FCP REQUESTS
399
400The following methods implement various requests. Most of them map
401directory to the FCP message of the same name. The added benefit of
402these over sending requests yourself is that they handle the necessary
403serialisation, protocol quirks, and replies.
404
405All of them exist in two versions, the variant shown in this manpage, and
406a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
407version as shown is I<synchronous> - it will wait for any replies, and
408either return the reply, or croak with an error. The underscore variant
409returns immediately and invokes one or more callbacks or condvars later.
410
411For example, the call
412
413 $info = $fcp->get_plugin_info ($name, $detailed);
414
415Also comes in this underscore variant:
416
417 $fcp->get_plugin_info_ ($name, $detailed, $cb);
418
419You can thinbk of the underscore as a kind of continuation indicator - the
420normal function waits and returns with the data, the C<_> indicates that
421you pass the continuation yourself, and the continuation will be invoked
422with the results.
423
424This callback/continuation argument (C<$cb>) can come in three forms itself:
425
426=over 4
427
428=item A code reference (or rather anything not matching some other alternative)
429
430This code reference will be invoked with the result on success. On an
431error, it will invoke the C<on_failure> callback of the FCP object, or,
432if none was defined, will die (in the event loop) with a backtrace of the
433call site.
434
435This is a popular choice, but it makes handling errors hard - make sure
436you never generate protocol errors!
437
438If an C<on_failure> hook exists, it will be invoked with the FCP object,
439a (textual) backtrace as generated by C<Carp::longmess>, and arrayref
440containing the arguments from the original request invocation and the
441error object from the server, in this order, e.g.:
442
443 on_failure => sub {
444 my ($fcp, $backtrace, $orig_args, $error_object) = @_;
445 ...
446 },
447
448=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
449
450When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
451results when the request has finished. Should an error occur, the error
452will instead result in C<< $cv->croak ($error) >>.
453
454This is also a popular choice.
455
456=item An array with two callbacks C<[$success, $failure]>
457
458The C<$success> callback will be invoked with the results, while the
459C<$failure> callback will be invoked on any errors.
460
461The C<$failure> callback will be invoked with the error object from the
462server.
463
464=item C<undef>
465
466This is the same thing as specifying C<sub { }> as callback, i.e. on
467success, the results are ignored, while on failure, the C<on_failure> hook
468is invoked or the module dies with a backtrace.
469
470This is good for quick scripts, or when you really aren't interested in
471the results.
472
473=back
474
475=cut
476
477our $NOP_CB = sub { };
281 478
282sub _txn { 479sub _txn {
283 my ($name, $sub) = @_; 480 my ($name, $sub) = @_;
284 481
285 *{$name} = sub { 482 *{$name} = sub {
286 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 483 my $cv = AE::cv;
287 &$sub;
288 $cv
289 };
290 484
291 *{"$name\_sync"} = sub { 485 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
292 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
293 &$sub; 486 &$sub;
294 $cv->recv 487 $cv->recv
295 }; 488 };
296}
297 489
298=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 490 *{"$name\_"} = sub {
491 my ($ok, $err) = pop;
299 492
493 if (ARRAY:: eq ref $ok) {
494 ($ok, $err) = @$ok;
495 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
496 $err = sub { $ok->croak ($_[0]{extra_description}) };
497 } else {
498 my $bt = Carp::longmess "AnyEvent::FCP request $name";
499 Scalar::Util::weaken (my $self = $_[0]);
500 my $args = [@_]; shift @$args;
501 $err = sub {
502 if ($self->{on_failure}) {
503 $self->{on_failure}($self, $args, $bt, $_[0]);
504 } else {
505 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
506 }
507 };
508 }
509
510 $ok ||= $NOP_CB;
511
512 splice @_, 1, 0, $ok, $err;
513 &$sub;
514 };
515}
516
517=over 4
518
300=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 519=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
301 520
302=cut 521=cut
303 522
304_txn list_peers => sub { 523_txn list_peers => sub {
305 my ($self, $cv, $with_metadata, $with_volatile) = @_; 524 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
306 525
307 my @res; 526 my @res;
308 527
309 $self->send_msg (list_peers => 528 $self->send_msg (list_peers =>
310 with_metadata => $with_metadata ? "true" : "false", 529 with_metadata => $with_metadata ? "true" : "false",
311 with_volatile => $with_volatile ? "true" : "false", 530 with_volatile => $with_volatile ? "true" : "false",
312 id_cb => sub { 531 id_cb => sub {
313 my ($self, $type, $kv, $rdata) = @_; 532 my ($self, $type, $kv, $rdata) = @_;
314 533
315 if ($type eq "end_list_peers") { 534 if ($type eq "end_list_peers") {
316 $cv->(\@res); 535 $ok->(\@res);
317 1 536 1
318 } else { 537 } else {
319 push @res, $kv; 538 push @res, $kv;
320 0 539 0
321 } 540 }
322 }, 541 },
323 ); 542 );
324}; 543};
325 544
326=item $cv = $fcp->list_peer_notes ($node_identifier)
327
328=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 545=item $notes = $fcp->list_peer_notes ($node_identifier)
329 546
330=cut 547=cut
331 548
332_txn list_peer_notes => sub { 549_txn list_peer_notes => sub {
333 my ($self, $cv, $node_identifier) = @_; 550 my ($self, $ok, undef, $node_identifier) = @_;
334 551
335 $self->send_msg (list_peer_notes => 552 $self->send_msg (list_peer_notes =>
336 node_identifier => $node_identifier, 553 node_identifier => $node_identifier,
337 id_cb => sub { 554 id_cb => sub {
338 my ($self, $type, $kv, $rdata) = @_; 555 my ($self, $type, $kv, $rdata) = @_;
339 556
340 $cv->($kv); 557 $ok->($kv);
341 1 558 1
342 }, 559 },
343 ); 560 );
344}; 561};
345 562
346=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347
348=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 563=item $fcp->watch_global ($enabled[, $verbosity_mask])
349 564
350=cut 565=cut
351 566
352_txn watch_global => sub { 567_txn watch_global => sub {
353 my ($self, $cv, $enabled, $verbosity_mask) = @_; 568 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
354 569
355 $self->send_msg (watch_global => 570 $self->send_msg (watch_global =>
356 enabled => $enabled ? "true" : "false", 571 enabled => $enabled ? "true" : "false",
357 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 572 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
358 ); 573 );
359 574
360 $cv->(); 575 $ok->();
361}; 576};
362 577
363=item $cv = $fcp->list_persistent_requests
364
365=item $reqs = $fcp->list_persistent_requests_sync 578=item $reqs = $fcp->list_persistent_requests
366 579
367=cut 580=cut
368 581
369_txn list_persistent_requests => sub { 582_txn list_persistent_requests => sub {
370 my ($self, $cv) = @_; 583 my ($self, $ok, $err) = @_;
371 584
372 $self->serialise (list_persistent_requests => sub { 585 $self->serialise (list_persistent_requests => sub {
373 my ($self, $guard) = @_; 586 my ($self, $guard) = @_;
374 587
375 my %res; 588 my @res;
376 589
377 $self->send_msg ("list_persistent_requests"); 590 $self->send_msg ("list_persistent_requests");
378 591
379 $self->on (sub { 592 $self->on (sub {
380 my ($self, $type, $kv, $rdata) = @_; 593 my ($self, $type, $kv, $rdata) = @_;
381 594
382 $guard if 0; 595 $guard if 0;
383 596
384 if ($type eq "end_list_persistent_requests") { 597 if ($type eq "end_list_persistent_requests") {
385 $cv->(\%res); 598 $ok->(\@res);
386 return; 599 return;
387 } else { 600 } else {
388 my $id = $kv->{identifier}; 601 my $id = $kv->{identifier};
389 602
390 if ($type =~ /^persistent_(get|put|put_dir)$/) { 603 if ($type =~ /^persistent_(get|put|put_dir)$/) {
391 $res{$id} = { 604 push @res, [$type, $kv];
392 type => $1,
393 %{ $res{$id} },
394 %$kv,
395 };
396 } elsif ($type eq "simple_progress") {
397 delete $kv->{pkt_type}; # save memory
398 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399 } else {
400 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401 } 605 }
402 } 606 }
403 607
404 1 608 1
405 }); 609 });
406 }); 610 });
407}; 611};
408 612
409=item $cv = $fcp->remove_request ($global, $identifier) 613=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
410 614
411=item $status = $fcp->remove_request_sync ($global, $identifier) 615Update either the C<client_token> or C<priority_class> of a request
616identified by C<$global> and C<$identifier>, depending on which of
617C<$client_token> and C<$priority_class> are not C<undef>.
412 618
413=cut 619=cut
414 620
415_txn remove_request => sub { 621_txn modify_persistent_request => sub {
416 my ($self, $cv, $global, $identifier) = @_; 622 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
417 623
624 $self->serialise ($identifier => sub {
625 my ($self, $guard) = @_;
626
418 $self->send_msg (remove_request => 627 $self->send_msg (modify_persistent_request =>
419 global => $global ? "true" : "false", 628 global => $global ? "true" : "false",
420 identifier => $identifier, 629 identifier => $identifier,
421 id_cb => sub { 630 defined $client_token ? (client_token => $client_token ) : (),
631 defined $priority_class ? (priority_class => $priority_class) : (),
632 );
633
634 $self->on (sub {
422 my ($self, $type, $kv, $rdata) = @_; 635 my ($self, $type, $kv, @extra) = @_;
423 636
637 $guard if 0;
638
639 if ($kv->{identifier} eq $identifier) {
640 if ($type eq "persistent_request_modified") {
424 $cv->($kv); 641 $ok->($kv);
642 return;
643 } elsif ($type eq "protocol_error") {
644 $err->($kv);
645 return;
646 }
647 }
648
425 1 649 1
426 }, 650 });
427 ); 651 });
428}; 652};
429 653
430=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431
432=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433
434=cut
435
436_txn modify_persistent_request => sub {
437 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
438
439 $self->send_msg (modify_persistent_request =>
440 global => $global ? "true" : "false",
441 defined $client_token ? (client_token => $client_token ) : (),
442 defined $priority_class ? (priority_class => $priority_class) : (),
443 identifier => $identifier,
444 id_cb => sub {
445 my ($self, $type, $kv, $rdata) = @_;
446
447 $cv->($kv);
448 1
449 },
450 );
451};
452
453=item $cv = $fcp->get_plugin_info ($name, $detailed)
454
455=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 654=item $info = $fcp->get_plugin_info ($name, $detailed)
456 655
457=cut 656=cut
458 657
459_txn get_plugin_info => sub { 658_txn get_plugin_info => sub {
460 my ($self, $cv, $name, $detailed) = @_; 659 my ($self, $ok, $err, $name, $detailed) = @_;
660
661 my $id = $self->identifier;
461 662
462 $self->send_msg (get_plugin_info => 663 $self->send_msg (get_plugin_info =>
664 identifier => $id,
463 plugin_name => $name, 665 plugin_name => $name,
464 detailed => $detailed ? "true" : "false", 666 detailed => $detailed ? "true" : "false",
465 id_cb => sub {
466 my ($self, $type, $kv, $rdata) = @_;
467
468 $cv->($kv);
469 1
470 },
471 ); 667 );
668 $self->on (sub {
669 my ($self, $type, $kv) = @_;
670
671 if ($kv->{identifier} eq $id) {
672 if ($type eq "get_plugin_info") {
673 $ok->($kv);
674 } else {
675 $err->($kv, $type);
676 }
677 return;
678 }
679
680 1
681 });
472}; 682};
473 683
474=item $cv = $fcp->client_get ($uri, $identifier, %kv)
475
476=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 684=item $status = $fcp->client_get ($uri, $identifier, %kv)
477 685
478%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 686%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479 687
480ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 688ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481priority_class, persistence, client_token, global, return_type, 689priority_class, persistence, client_token, global, return_type,
482binary_blob, allowed_mime_types, filename, temp_filename 690binary_blob, allowed_mime_types, filename, temp_filename
483 691
484=cut 692=cut
485 693
486_txn client_get => sub { 694_txn client_get => sub {
487 my ($self, $cv, $uri, $identifier, %kv) = @_; 695 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
488 696
697 $self->serialise ($identifier => sub {
698 my ($self, $guard) = @_;
699
489 $self->send_msg (client_get => 700 $self->send_msg (client_get =>
490 %kv, 701 %kv,
491 uri => $uri, 702 uri => $uri,
492 identifier => $identifier, 703 identifier => $identifier,
493 id_cb => sub { 704 );
705
706 $self->on (sub {
494 my ($self, $type, $kv, $rdata) = @_; 707 my ($self, $type, $kv, @extra) = @_;
495 708
709 $guard if 0;
710
711 if ($kv->{identifier} eq $identifier) {
712 if ($type eq "persistent_get") {
496 $cv->($kv); 713 $ok->($kv);
714 return;
715 } elsif ($type eq "protocol_error") {
716 $err->($kv);
717 return;
718 }
719 }
720
497 1 721 1
498 }, 722 });
499 ); 723 });
500}; 724};
501 725
502=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 726=item $status = $fcp->remove_request ($identifier[, $global])
503 727
728Remove the request with the given isdentifier. Returns true if successful,
729false on error.
730
731=cut
732
733_txn remove_request => sub {
734 my ($self, $ok, $err, $identifier, $global) = @_;
735
736 $self->serialise ($identifier => sub {
737 my ($self, $guard) = @_;
738
739 $self->send_msg (remove_request =>
740 identifier => $identifier,
741 global => $global ? "true" : "false",
742 );
743 $self->on (sub {
744 my ($self, $type, $kv, @extra) = @_;
745
746 $guard if 0;
747
748 if ($kv->{identifier} eq $identifier) {
749 if ($type eq "persistent_request_removed") {
750 $ok->(1);
751 return;
752 } elsif ($type eq "protocol_error") {
753 $err->($kv);
754 return;
755 }
756 }
757
758 1
759 });
760 });
761};
762
504=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 763=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
505 764
506The DDA test in FCP is probably the single most broken protocol - only 765The DDA test in FCP is probably the single most broken protocol - only
507one directory test can be outstanding at any time, and some guessing and 766one directory test can be outstanding at any time, and some guessing and
508heuristics are involved in mangling the paths. 767heuristics are involved in mangling the paths.
509 768
510This function combines C<TestDDARequest> and C<TestDDAResponse> in one 769This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511request, handling file reading and writing as well. 770request, handling file reading and writing as well, and tries very hard to
771do the right thing.
772
773Both C<$local_directory> and C<$remote_directory> must specify the same
774directory - C<$local_directory> is the directory path on the client (where
775L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
776the server (where the freenet node runs). When both are running on the
777same node, the paths are generally identical.
778
779C<$want_read> and C<$want_write> should be set to a true value when you
780want to read (get) files or write (put) files, respectively.
781
782On error, an exception is thrown. Otherwise, C<$can_read> and
783C<$can_write> indicate whether you can reaqd or write to freenet via the
784directory.
512 785
513=cut 786=cut
514 787
515_txn test_dda => sub { 788_txn test_dda => sub {
516 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 789 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
517 790
518 $self->serialise (test_dda => sub { 791 $self->serialise (test_dda => sub {
519 my ($self, $guard) = @_; 792 my ($self, $guard) = @_;
520 793
521 $self->send_msg (test_dda_request => 794 $self->send_msg (test_dda_request =>
542 } 815 }
543 816
544 my %response = (directory => $remote); 817 my %response = (directory => $remote);
545 818
546 if (length $kv->{read_filename}) { 819 if (length $kv->{read_filename}) {
547 warn "$local/$kv->{read_filename}";#d#
548 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 820 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549 sysread $fh, my $buf, -s $fh; 821 sysread $fh, my $buf, -s $fh;
550 $response{read_content} = $buf; 822 $response{read_content} = $buf;
551 } 823 }
552 } 824 }
563 my ($self, $type, $kv) = @_; 835 my ($self, $type, $kv) = @_;
564 836
565 $guard if 0; # reference 837 $guard if 0; # reference
566 838
567 if ($type eq "test_dda_complete") { 839 if ($type eq "test_dda_complete") {
568 $cv->( 840 $ok->(
569 $kv->{read_directory_allowed} eq "true", 841 $kv->{read_directory_allowed} eq "true",
570 $kv->{write_directory_allowed} eq "true", 842 $kv->{write_directory_allowed} eq "true",
571 ); 843 );
572 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 844 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573 $cv->croak ($kv->{extra_description}); 845 $err->($kv->{extra_description});
574 return; 846 return;
575 } 847 }
576 848
577 1 849 1
578 }); 850 });
579 851
580 return; 852 return;
581 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 853 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582 $cv->croak ($kv->{extra_description}); 854 $err->($kv);
583 return; 855 return;
584 } 856 }
585 857
586 1 858 1
587 }); 859 });
588 }); 860 });
589}; 861};
590 862
591=back 863=back
592 864
865=head2 REQUEST CACHE
866
867The C<AnyEvent::FCP> class keeps a request cache, where it caches all
868information from requests.
869
870For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
871in C<< $fcp->{req}{$identifier} >>:
872
873 persistent_get
874 persistent_put
875 persistent_put_dir
876
877This message updates the stored data:
878
879 persistent_request_modified
880
881This message will remove this entry:
882
883 persistent_request_removed
884
885These messages get merged into the cache entry, under their
886type, i.e. a C<simple_progress> message will be stored in C<<
887$fcp->{req}{$identifier}{simple_progress} >>:
888
889 simple_progress # get/put
890
891 uri_generated # put
892 generated_metadata # put
893 started_compression # put
894 finished_compression # put
895 put_failed # put
896 put_fetchable # put
897 put_successful # put
898
899 sending_to_network # get
900 compatibility_mode # get
901 expected_hashes # get
902 expected_mime # get
903 expected_data_length # get
904 get_failed # get
905 data_found # get
906 enter_finite_cooldown # get
907
908In addition, an event (basically a fake message) of type C<request_changed> is generated
909on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
910is the type of the original message triggering the change,
911
912To fill this cache with the global queue and keep it updated,
913call C<watch_global> to subscribe to updates, followed by
914C<list_persistent_requests_sync>.
915
916 $fcp->watch_global_sync_; # do not wait
917 $fcp->list_persistent_requests; # wait
918
919To get a better idea of what is stored in the cache, here is an example of
920what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
921
922 {
923 identifier => "Frost-gpl.txt",
924 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
925 binary_blob => "false",
926 global => "true",
927 max_retries => -1,
928 max_size => 9223372036854775807,
929 persistence => "forever",
930 priority_class => 3,
931 real_time => "false",
932 return_type => "direct",
933 started => "true",
934 type => "persistent_get",
935 verbosity => 2147483647,
936 sending_to_network => {
937 identifier => "Frost-gpl.txt",
938 global => "true",
939 },
940 compatibility_mode => {
941 identifier => "Frost-gpl.txt",
942 definitive => "true",
943 dont_compress => "false",
944 global => "true",
945 max => "COMPAT_1255",
946 min => "COMPAT_1255",
947 },
948 expected_hashes => {
949 identifier => "Frost-gpl.txt",
950 global => "true",
951 hashes => {
952 ed2k => "d83596f5ee3b7...",
953 md5 => "e0894e4a2a6...",
954 sha1 => "...",
955 sha256 => "...",
956 sha512 => "...",
957 tth => "...",
958 },
959 },
960 expected_mime => {
961 identifier => "Frost-gpl.txt",
962 global => "true",
963 metadata => { content_type => "application/rar" },
964 },
965 expected_data_length => {
966 identifier => "Frost-gpl.txt",
967 data_length => 37576,
968 global => "true",
969 },
970 simple_progress => {
971 identifier => "Frost-gpl.txt",
972 failed => 0,
973 fatally_failed => 0,
974 finalized_total => "true",
975 global => "true",
976 last_progress => 1438639282628,
977 required => 372,
978 succeeded => 102,
979 total => 747,
980 },
981 data_found => {
982 identifier => "Frost-gpl.txt",
983 completion_time => 1438663354026,
984 data_length => 37576,
985 global => "true",
986 metadata => { content_type => "image/jpeg" },
987 startup_time => 1438657196167,
988 },
989 }
990
593=head1 EXAMPLE PROGRAM 991=head1 EXAMPLE PROGRAM
594 992
595 use AnyEvent::FCP; 993 use AnyEvent::FCP;
596 994
597 my $fcp = new AnyEvent::FCP; 995 my $fcp = new AnyEvent::FCP;
598 996
599 # let us look at the global request list 997 # let us look at the global request list
600 $fcp->watch_global (1, 0); 998 $fcp->watch_global_ (1);
601 999
602 # list them, synchronously 1000 # list them, synchronously
603 my $req = $fcp->list_persistent_requests_sync; 1001 my $req = $fcp->list_persistent_requests;
604 1002
605 # go through all requests 1003 # go through all requests
1004TODO
606 for my $req (values %$req) { 1005 for my $req (values %$req) {
607 # skip jobs not directly-to-disk 1006 # skip jobs not directly-to-disk
608 next unless $req->{return_type} eq "disk"; 1007 next unless $req->{return_type} eq "disk";
609 # skip jobs not issued by FProxy 1008 # skip jobs not issued by FProxy
610 next unless $req->{identifier} =~ /^FProxy:/; 1009 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines