ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.8 by root, Fri Jun 18 16:59:13 2010 UTC vs.
Revision 1.20 by root, Sun Jun 12 01:32:37 2016 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
60 61
61use common::sense; 62use common::sense;
62 63
63use Carp; 64use Carp;
64 65
65our $VERSION = '0.3'; 66our $VERSION = 0.4;
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
71 75
72sub touc($) { 76sub touc($) {
73 local $_ = shift; 77 local $_ = shift;
74 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
75 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
76 $_ 80 $_
77} 81}
78 82
79sub tolc($) { 83sub tolc($) {
80 local $_ = shift; 84 local $_ = shift;
81 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
82 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
83 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
84 lc 88 lc
85} 89}
86 90
87=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
88 92
89Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
90127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
91 95
92If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
93(hopefully) unique client name for you. 97(hopefully) unique client name for you.
94 98
95You can install a progress callback that is being called with the AnyEvent::FCP 99The following keys can be specified (they are all optional):
96object, the type, a hashref with key-value pairs and a reference to any received data,
97for all unsolicited messages.
98 100
99Example: 101=over 4
100 102
101 sub progress_cb { 103=item name => $string
102 my ($self, $type, $kv, $rdata) = @_;
103 104
104 if ($type eq "simple_progress") { 105A unique name to identify this client. If none is specified, a randomly
105 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n"; 106generated name will be used.
106 } 107
107 } 108=item host => $hostname
109
110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_error => $callback->($fcp, $message)
132
133Invoked on any (fatal) errors, such as unexpected connection close. The
134callback receives the FCP object and a textual error message.
135
136=item on_failure => $callback->($fcp, $backtrace, $args, $error)
137
138Invoked when an FCP request fails that didn't have a failure callback. See
139L<FCP REQUESTS> for details.
140
141=back
108 142
109=cut 143=cut
110 144
111sub new { 145sub new {
112 my $class = shift; 146 my $class = shift;
147
148 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
149
113 my $self = bless { @_ }, $class; 150 my $self = bless {
114 151 host => $ENV{FREDHOST} || "127.0.0.1",
115 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 152 port => $ENV{FREDPORT} || 9481,
116 $self->{port} ||= $ENV{FREDPORT} || 9481; 153 timeout => 3600 * 2,
117 $self->{name} ||= time.rand.rand.rand; # lame 154 keepalive => 9 * 60,
118 $self->{timeout} ||= 3600*2; 155 name => time.rand.rand.rand, # lame
119 $self->{progress} ||= sub { }; 156 @_,
120 157 queue => [],
121 $self->{id} = "a0"; 158 req => {},
159 prefix => "..:aefcpid:$rand:",
160 idseq => "a0",
161 }, $class;
122 162
123 { 163 {
124 Scalar::Util::weaken (my $self = $self); 164 Scalar::Util::weaken (my $self = $self);
165
166 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
167 $self->{hdl}->push_write ("\n");
168 };
169
170 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
171
172 # these are declared here for performance reasons
173 my ($k, $v, $type);
174 my $rdata;
175
176 my $on_read = sub {
177 my ($hdl) = @_;
178
179 # we only carve out whole messages here
180 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
181 # remember end marker
182 $rdata = $1 eq "Data"
183 or $1 eq "EndMessage"
184 or return $self->fatal ("protocol error, expected message end, got $1\n");
185
186 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
187
188 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
189
190 $type = shift @lines;
191 $type = ($TOLC{$type} ||= tolc $type);
192
193 my %kv;
194
195 for (@lines) {
196 ($k, $v) = split /=/, $_, 2;
197 $k = ($TOLC{$k} ||= tolc $k);
198
199 if ($k =~ /\./) {
200 # generic, slow case
201 my @k = split /\./, $k;
202 my $ro = \\%kv;
203
204 while (@k) {
205 $k = shift @k;
206 if ($k =~ /^\d+$/) {
207 $ro = \$$ro->[$k];
208 } else {
209 $ro = \$$ro->{$k};
210 }
211 }
212
213 $$ro = $v;
214
215 next;
216 }
217
218 # special comon case, for performance only
219 $kv{$k} = $v;
220 }
221
222 if ($rdata) {
223 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
224 $rdata = \$_[1];
225 $self->recv ($type, \%kv, $rdata);
226 });
227
228 last; # do not tgry to parse more messages
229 } else {
230 $self->recv ($type, \%kv);
231 }
232 }
233 };
125 234
126 $self->{hdl} = new AnyEvent::Handle 235 $self->{hdl} = new AnyEvent::Handle
127 connect => [$self->{host} => $self->{port}], 236 connect => [$self->{host} => $self->{port}],
128 timeout => $self->{timeout}, 237 timeout => $self->{timeout},
238 on_read => $on_read,
239 on_eof => $self->{on_eof},
129 on_error => sub { 240 on_error => sub {
130 warn "@_\n";#d# 241 $self->fatal ($_[2]);
131 exit 1;
132 }, 242 },
133 on_read => sub { $self->on_read (@_) }, 243 ;
134 on_eof => $self->{on_eof} || sub { };
135 244
136 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 245 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137 } 246 }
138 247
139 $self->send_msg ( 248 $self->send_msg (client_hello =>
140 client_hello =>
141 name => $self->{name}, 249 name => $self->{name},
142 expected_version => "2.0", 250 expected_version => "2.0",
143 ); 251 );
144 252
145 $self 253 $self
146} 254}
147 255
256sub fatal {
257 my ($self, $msg) = @_;
258
259 $self->{hdl}->shutdown;
260 delete $self->{kw};
261
262 if ($self->{on_error}) {
263 $self->{on_error}->($self, $msg);
264 } else {
265 die $msg;
266 }
267}
268
269sub identifier {
270 $_[0]{prefix} . ++$_[0]{idseq}
271}
272
148sub send_msg { 273sub send_msg {
149 my ($self, $type, %kv) = @_; 274 my ($self, $type, %kv) = @_;
150 275
151 my $data = delete $kv{data}; 276 my $data = delete $kv{data};
152 277
153 if (exists $kv{id_cb}) { 278 if (exists $kv{id_cb}) {
154 my $id = $kv{identifier} || ++$self->{id}; 279 my $id = $kv{identifier} ||= $self->identifier;
155 $self->{id}{$id} = delete $kv{id_cb}; 280 $self->{id}{$id} = delete $kv{id_cb};
156 $kv{identifier} = $id;
157 } 281 }
158 282
159 my $msg = (touc $type) . "\012" 283 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 284 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161 285
173 } 297 }
174 298
175 $self->{hdl}->push_write ($msg); 299 $self->{hdl}->push_write ($msg);
176} 300}
177 301
178sub on_read { 302sub on {
179 my ($self) = @_; 303 my ($self, $cb) = @_;
180 304
181 my $type; 305 # cb return undef - message eaten, remove cb
182 my %kv; 306 # cb return 0 - message eaten
183 my $rdata; 307 # cb return 1 - pass to next
184 308
185 my $done_cb = sub { 309 push @{ $self->{on} }, $cb;
186 $kv{pkt_type} = $type; 310}
187 311
188 if (my $cb = $self->{queue}[0]) { 312sub _push_queue {
189 $cb->($self, $type, \%kv, $rdata) 313 my ($self, $queue) = @_;
190 and shift @{ $self->{queue} }; 314
191 } else { 315 shift @$queue;
192 $self->default_recv ($type, \%kv, $rdata); 316 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
317 if @$queue;
318}
319
320# lock so only one $type (arbitrary string) is in flight,
321# to work around horribly misdesigned protocol.
322sub serialise {
323 my ($self, $type, $cb) = @_;
324
325 my $queue = $self->{serialise}{$type} ||= [];
326 push @$queue, $cb;
327 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
328 unless $#$queue;
329}
330
331# how to merge these types into $self->{persistent}
332our %PERSISTENT_TYPE = (
333 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
334 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
335 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
336 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
337 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
338
339 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
340
341 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
342 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
343 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
344 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
345 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
346 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
347 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
348
349 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
350 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
351 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
352 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
353 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
354 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
355 data_found => sub { $_[1]{data_found} = $_[2] }, # get
356 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
357);
358
359sub recv {
360 my ($self, $type, $kv, @extra) = @_;
361
362 if (my $cb = $PERSISTENT_TYPE{$type}) {
363 my $id = $kv->{identifier};
364 my $req = $_[0]{req}{$id} ||= {};
365 $cb->($self, $req, $kv);
366 $self->recv (request_changed => $kv, $type, @extra);
367 }
368
369 my $on = $self->{on};
370 for (0 .. $#$on) {
371 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
372 splice @$on, $_, 1 unless defined $res;
373 return;
193 } 374 }
194 }; 375 }
195 376
196 my $hdr_cb; $hdr_cb = sub { 377 if (my $cb = $self->{queue}[0]) {
197 if ($_[1] =~ /^([^=]+)=(.*)$/) { 378 $cb->($self, $type, $kv, @extra)
198 my ($k, $v) = ($1, $2); 379 and shift @{ $self->{queue} };
199 my @k = split /\./, tolc $k;
200 my $ro = \\%kv;
201
202 while (@k) {
203 my $k = shift @k;
204 if ($k =~ /^\d+$/) {
205 $ro = \$$ro->[$k];
206 } else {
207 $ro = \$$ro->{$k};
208 }
209 }
210
211 $$ro = $v;
212
213 $_[0]->push_read (line => $hdr_cb);
214 } elsif ($_[1] eq "Data") {
215 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216 $rdata = \$_[1];
217 $done_cb->();
218 });
219 } elsif ($_[1] eq "EndMessage") {
220 $done_cb->();
221 } else { 380 } else {
222 die "protocol error, expected message end, got $_[1]\n";#d# 381 $self->default_recv ($type, $kv, @extra);
223 }
224 }; 382 }
225
226 $self->{hdl}->push_read (line => sub {
227 $type = tolc $_[1];
228 $_[0]->push_read (line => $hdr_cb);
229 });
230} 383}
231 384
232sub default_recv { 385sub default_recv {
233 my ($self, $type, $kv, $rdata) = @_; 386 my ($self, $type, $kv, $rdata) = @_;
234 387
235 if ($type eq "node_hello") { 388 if ($type eq "node_hello") {
236 $self->{node_hello} = $kv; 389 $self->{node_hello} = $kv;
237 } elsif (exists $self->{id}{$kv->{identifier}}) { 390 } elsif (exists $self->{id}{$kv->{identifier}}) {
238 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 391 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239 and delete $self->{id}{$kv->{identifier}}; 392 and delete $self->{id}{$kv->{identifier}};
240 } else {
241 &{ $self->{progress} };
242 } 393 }
243} 394}
395
396=back
397
398=head2 FCP REQUESTS
399
400The following methods implement various requests. Most of them map
401directory to the FCP message of the same name. The added benefit of
402these over sending requests yourself is that they handle the necessary
403serialisation, protocol quirks, and replies.
404
405All of them exist in two versions, the variant shown in this manpage, and
406a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
407version as shown is I<synchronous> - it will wait for any replies, and
408either return the reply, or croak with an error. The underscore variant
409returns immediately and invokes one or more callbacks or condvars later.
410
411For example, the call
412
413 $info = $fcp->get_plugin_info ($name, $detailed);
414
415Also comes in this underscore variant:
416
417 $fcp->get_plugin_info_ ($name, $detailed, $cb);
418
419You can thinbk of the underscore as a kind of continuation indicator - the
420normal function waits and returns with the data, the C<_> indicates that
421you pass the continuation yourself, and the continuation will be invoked
422with the results.
423
424This callback/continuation argument (C<$cb>) can come in three forms itself:
425
426=over 4
427
428=item A code reference (or rather anything not matching some other alternative)
429
430This code reference will be invoked with the result on success. On an
431error, it will invoke the C<on_failure> callback of the FCP object, or,
432if none was defined, will die (in the event loop) with a backtrace of the
433call site.
434
435This is a popular choice, but it makes handling errors hard - make sure
436you never generate protocol errors!
437
438If an C<on_failure> hook exists, it will be invoked with the FCP object,
439a (textual) backtrace as generated by C<Carp::longmess>, and arrayref
440containing the arguments from the original request invocation and the
441error object from the server, in this order, e.g.:
442
443 on_failure => sub {
444 my ($fcp, $backtrace, $orig_args, $error_object) = @_;
445 ...
446 },
447
448=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
449
450When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
451results when the request has finished. Should an error occur, the error
452will instead result in C<< $cv->croak ($error) >>.
453
454This is also a popular choice.
455
456=item An array with two callbacks C<[$success, $failure]>
457
458The C<$success> callback will be invoked with the results, while the
459C<$failure> callback will be invoked on any errors.
460
461The C<$failure> callback will be invoked with the error object from the
462server.
463
464=item C<undef>
465
466This is the same thing as specifying C<sub { }> as callback, i.e. on
467success, the results are ignored, while on failure, the C<on_failure> hook
468is invoked or the module dies with a backtrace.
469
470This is good for quick scripts, or when you really aren't interested in
471the results.
472
473=back
474
475=cut
476
477our $NOP_CB = sub { };
244 478
245sub _txn { 479sub _txn {
246 my ($name, $sub) = @_; 480 my ($name, $sub) = @_;
247 481
248 *{$name} = sub { 482 *{$name} = sub {
249 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 483 my $cv = AE::cv;
250 &$sub;
251 $cv
252 };
253 484
254 *{"$name\_sync"} = sub { 485 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
255 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
256 &$sub; 486 &$sub;
257 $cv->recv 487 $cv->recv
258 }; 488 };
259}
260 489
261=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 490 *{"$name\_"} = sub {
491 my ($ok, $err) = pop;
262 492
493 if (ARRAY:: eq ref $ok) {
494 ($ok, $err) = @$ok;
495 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
496 $err = sub { $ok->croak ($_[0]{extra_description}) };
497 } else {
498 my $bt = Carp::longmess "AnyEvent::FCP request $name";
499 Scalar::Util::weaken (my $self = $_[0]);
500 my $args = [@_]; shift @$args;
501 $err = sub {
502 if ($self->{on_failure}) {
503 $self->{on_failure}($self, $args, $bt, $_[0]);
504 } else {
505 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
506 }
507 };
508 }
509
510 $ok ||= $NOP_CB;
511
512 splice @_, 1, 0, $ok, $err;
513 &$sub;
514 };
515}
516
517=over 4
518
263=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 519=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
264 520
265=cut 521=cut
266 522
267_txn list_peers => sub { 523_txn list_peers => sub {
268 my ($self, $cv, $with_metadata, $with_volatile) = @_; 524 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
269 525
270 my @res; 526 my @res;
271 527
272 $self->send_msg (list_peers => 528 $self->send_msg (list_peers =>
273 with_metadata => $with_metadata ? "true" : "false", 529 with_metadata => $with_metadata ? "true" : "false",
274 with_volatile => $with_volatile ? "true" : "false", 530 with_volatile => $with_volatile ? "true" : "false",
275 id_cb => sub { 531 id_cb => sub {
276 my ($self, $type, $kv, $rdata) = @_; 532 my ($self, $type, $kv, $rdata) = @_;
277 533
278 if ($type eq "end_list_peers") { 534 if ($type eq "end_list_peers") {
279 $cv->(\@res); 535 $ok->(\@res);
280 1 536 1
281 } else { 537 } else {
282 push @res, $kv; 538 push @res, $kv;
283 0 539 0
284 } 540 }
285 }, 541 },
286 ); 542 );
287}; 543};
288 544
289=item $cv = $fcp->list_peer_notes ($node_identifier)
290
291=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 545=item $notes = $fcp->list_peer_notes ($node_identifier)
292 546
293=cut 547=cut
294 548
295_txn list_peer_notes => sub { 549_txn list_peer_notes => sub {
296 my ($self, $cv, $node_identifier) = @_; 550 my ($self, $ok, undef, $node_identifier) = @_;
297 551
298 $self->send_msg (list_peer_notes => 552 $self->send_msg (list_peer_notes =>
299 node_identifier => $node_identifier, 553 node_identifier => $node_identifier,
300 id_cb => sub { 554 id_cb => sub {
301 my ($self, $type, $kv, $rdata) = @_; 555 my ($self, $type, $kv, $rdata) = @_;
302 556
303 $cv->($kv); 557 $ok->($kv);
304 1 558 1
305 }, 559 },
306 ); 560 );
307}; 561};
308 562
309=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310
311=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 563=item $fcp->watch_global ($enabled[, $verbosity_mask])
312 564
313=cut 565=cut
314 566
315_txn watch_global => sub { 567_txn watch_global => sub {
316 my ($self, $cv, $enabled, $verbosity_mask) = @_; 568 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
317 569
318 $self->send_msg (watch_global => 570 $self->send_msg (watch_global =>
319 enabled => $enabled ? "true" : "false", 571 enabled => $enabled ? "true" : "false",
320 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 572 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321 ); 573 );
322 574
323 $cv->(); 575 $ok->();
324}; 576};
325 577
326=item $cv = $fcp->list_persistent_requests
327
328=item $reqs = $fcp->list_persistent_requests_sync 578=item $reqs = $fcp->list_persistent_requests
329 579
330=cut 580=cut
331 581
332_txn list_persistent_requests => sub { 582_txn list_persistent_requests => sub {
333 my ($self, $cv) = @_; 583 my ($self, $ok, $err) = @_;
334 584
585 $self->serialise (list_persistent_requests => sub {
586 my ($self, $guard) = @_;
587
335 my %res; 588 my @res;
336 589
337 $self->send_msg ("list_persistent_requests"); 590 $self->send_msg ("list_persistent_requests");
338 591
339 push @{ $self->{queue} }, sub { 592 $self->on (sub {
340 my ($self, $type, $kv, $rdata) = @_; 593 my ($self, $type, $kv, $rdata) = @_;
341 594
595 $guard if 0;
596
342 if ($type eq "end_list_persistent_requests") { 597 if ($type eq "end_list_persistent_requests") {
343 $cv->(\%res); 598 $ok->(\@res);
599 return;
600 } else {
601 my $id = $kv->{identifier};
602
603 if ($type =~ /^persistent_(get|put|put_dir)$/) {
604 push @res, [$type, $kv];
605 }
606 }
607
344 1 608 1
345 } else { 609 });
346 my $id = $kv->{identifier}; 610 });
611};
347 612
348 if ($type =~ /^persistent_(get|put|put_dir)$/) { 613=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
349 $res{$id} = { 614
350 type => $1, 615Update either the C<client_token> or C<priority_class> of a request
351 %{ $res{$id} }, 616identified by C<$global> and C<$identifier>, depending on which of
617C<$client_token> and C<$priority_class> are not C<undef>.
618
619=cut
620
621_txn modify_persistent_request => sub {
622 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
623
624 $self->serialise ($identifier => sub {
625 my ($self, $guard) = @_;
626
627 $self->send_msg (modify_persistent_request =>
628 global => $global ? "true" : "false",
629 identifier => $identifier,
630 defined $client_token ? (client_token => $client_token ) : (),
631 defined $priority_class ? (priority_class => $priority_class) : (),
632 );
633
634 $self->on (sub {
635 my ($self, $type, $kv, @extra) = @_;
636
637 $guard if 0;
638
639 if ($kv->{identifier} eq $identifier) {
640 if ($type eq "persistent_request_modified") {
352 %$kv, 641 $ok->($kv);
642 return;
643 } elsif ($type eq "protocol_error") {
644 $err->($kv);
645 return;
353 }; 646 }
354 } elsif ($type eq "simple_progress") {
355 delete $kv->{pkt_type}; # save memory
356 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357 } else {
358 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359 } 647 }
648
360 0 649 1
361 } 650 });
362 }; 651 });
363}; 652};
364 653
365=item $cv = $fcp->remove_request ($global, $identifier)
366
367=item $status = $fcp->remove_request_sync ($global, $identifier)
368
369=cut
370
371_txn remove_request => sub {
372 my ($self, $cv, $global, $identifier) = @_;
373
374 $self->send_msg (remove_request =>
375 global => $global ? "true" : "false",
376 identifier => $identifier,
377 id_cb => sub {
378 my ($self, $type, $kv, $rdata) = @_;
379
380 $cv->($kv);
381 1
382 },
383 );
384};
385
386=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387
388=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389
390=cut
391
392_txn modify_persistent_request => sub {
393 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394
395 $self->send_msg (modify_persistent_request =>
396 global => $global ? "true" : "false",
397 defined $client_token ? (client_token => $client_token ) : (),
398 defined $priority_class ? (priority_class => $priority_class) : (),
399 identifier => $identifier,
400 id_cb => sub {
401 my ($self, $type, $kv, $rdata) = @_;
402
403 $cv->($kv);
404 1
405 },
406 );
407};
408
409=item $cv = $fcp->get_plugin_info ($name, $detailed)
410
411=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 654=item $info = $fcp->get_plugin_info ($name, $detailed)
412 655
413=cut 656=cut
414 657
415_txn get_plugin_info => sub { 658_txn get_plugin_info => sub {
416 my ($self, $cv, $name, $detailed) = @_; 659 my ($self, $ok, $err, $name, $detailed) = @_;
660
661 my $id = $self->identifier;
417 662
418 $self->send_msg (get_plugin_info => 663 $self->send_msg (get_plugin_info =>
664 identifier => $id,
419 plugin_name => $name, 665 plugin_name => $name,
420 detailed => $detailed ? "true" : "false", 666 detailed => $detailed ? "true" : "false",
421 id_cb => sub {
422 my ($self, $type, $kv, $rdata) = @_;
423
424 $cv->($kv);
425 1
426 },
427 ); 667 );
668 $self->on (sub {
669 my ($self, $type, $kv) = @_;
670
671 if ($kv->{identifier} eq $id) {
672 if ($type eq "get_plugin_info") {
673 $ok->($kv);
674 } else {
675 $err->($kv, $type);
676 }
677 return;
678 }
679
680 1
681 });
428}; 682};
429 683
430=item $cv = $fcp->client_get ($uri, $identifier, %kv)
431
432=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 684=item $status = $fcp->client_get ($uri, $identifier, %kv)
433 685
434%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 686%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
435 687
436ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 688ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437priority_class, persistence, client_token, global, return_type, 689priority_class, persistence, client_token, global, return_type,
438binary_blob, allowed_mime_types, filename, temp_filename 690binary_blob, allowed_mime_types, filename, temp_filename
439 691
440=cut 692=cut
441 693
442_txn client_get => sub { 694_txn client_get => sub {
443 my ($self, $cv, $uri, $identifier, %kv) = @_; 695 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
444 696
697 $self->serialise ($identifier => sub {
698 my ($self, $guard) = @_;
699
445 $self->send_msg (client_get => 700 $self->send_msg (client_get =>
446 %kv, 701 %kv,
447 uri => $uri, 702 uri => $uri,
448 identifier => $identifier, 703 identifier => $identifier,
449 id_cb => sub { 704 );
705
706 $self->on (sub {
450 my ($self, $type, $kv, $rdata) = @_; 707 my ($self, $type, $kv, @extra) = @_;
451 708
709 $guard if 0;
710
711 if ($kv->{identifier} eq $identifier) {
712 if ($type eq "persistent_get") {
452 $cv->($kv); 713 $ok->($kv);
714 return;
715 } elsif ($type eq "protocol_error") {
716 $err->($kv);
717 return;
718 }
719 }
720
453 1 721 1
722 });
723 });
724};
725
726=item $status = $fcp->remove_request ($identifier[, $global])
727
728Remove the request with the given isdentifier. Returns true if successful,
729false on error.
730
731=cut
732
733_txn remove_request => sub {
734 my ($self, $ok, $err, $identifier, $global) = @_;
735
736 $self->serialise ($identifier => sub {
737 my ($self, $guard) = @_;
738
739 $self->send_msg (remove_request =>
740 identifier => $identifier,
741 global => $global ? "true" : "false",
742 );
743 $self->on (sub {
744 my ($self, $type, $kv, @extra) = @_;
745
746 $guard if 0;
747
748 if ($kv->{identifier} eq $identifier) {
749 if ($type eq "persistent_request_removed") {
750 $ok->(1);
751 return;
752 } elsif ($type eq "protocol_error") {
753 $err->($kv);
754 return;
755 }
756 }
757
758 1
759 });
760 });
761};
762
763=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
764
765The DDA test in FCP is probably the single most broken protocol - only
766one directory test can be outstanding at any time, and some guessing and
767heuristics are involved in mangling the paths.
768
769This function combines C<TestDDARequest> and C<TestDDAResponse> in one
770request, handling file reading and writing as well, and tries very hard to
771do the right thing.
772
773Both C<$local_directory> and C<$remote_directory> must specify the same
774directory - C<$local_directory> is the directory path on the client (where
775L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
776the server (where the freenet node runs). When both are running on the
777same node, the paths are generally identical.
778
779C<$want_read> and C<$want_write> should be set to a true value when you
780want to read (get) files or write (put) files, respectively.
781
782On error, an exception is thrown. Otherwise, C<$can_read> and
783C<$can_write> indicate whether you can reaqd or write to freenet via the
784directory.
785
786=cut
787
788_txn test_dda => sub {
789 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
790
791 $self->serialise (test_dda => sub {
792 my ($self, $guard) = @_;
793
794 $self->send_msg (test_dda_request =>
795 directory => $remote,
796 want_read_directory => $want_read ? "true" : "false",
797 want_write_directory => $want_write ? "true" : "false",
798 );
799 $self->on (sub {
800 my ($self, $type, $kv) = @_;
801
802 if ($type eq "test_dda_reply") {
803 # the filenames are all relative to the server-side directory,
804 # which might or might not match $remote anymore, so we
805 # need to rewrite the paths to be relative to $local
806 for my $k (qw(read_filename write_filename)) {
807 my $f = $kv->{$k};
808 for my $dir ($kv->{directory}, $remote) {
809 if ($dir eq substr $f, 0, length $dir) {
810 substr $f, 0, 1 + length $dir, "";
811 $kv->{$k} = $f;
812 last;
813 }
814 }
815 }
816
817 my %response = (directory => $remote);
818
819 if (length $kv->{read_filename}) {
820 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
821 sysread $fh, my $buf, -s $fh;
822 $response{read_content} = $buf;
823 }
824 }
825
826 if (length $kv->{write_filename}) {
827 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
828 syswrite $fh, $kv->{content_to_write};
829 }
830 }
831
832 $self->send_msg (test_dda_response => %response);
833
834 $self->on (sub {
835 my ($self, $type, $kv) = @_;
836
837 $guard if 0; # reference
838
839 if ($type eq "test_dda_complete") {
840 $ok->(
841 $kv->{read_directory_allowed} eq "true",
842 $kv->{write_directory_allowed} eq "true",
843 );
844 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
845 $err->($kv->{extra_description});
846 return;
847 }
848
849 1
850 });
851
852 return;
853 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
854 $err->($kv);
855 return;
856 }
857
858 1
859 });
860 });
861};
862
863=back
864
865=head2 REQUEST CACHE
866
867The C<AnyEvent::FCP> class keeps a request cache, where it caches all
868information from requests.
869
870For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
871in C<< $fcp->{req}{$identifier} >>:
872
873 persistent_get
874 persistent_put
875 persistent_put_dir
876
877This message updates the stored data:
878
879 persistent_request_modified
880
881This message will remove this entry:
882
883 persistent_request_removed
884
885These messages get merged into the cache entry, under their
886type, i.e. a C<simple_progress> message will be stored in C<<
887$fcp->{req}{$identifier}{simple_progress} >>:
888
889 simple_progress # get/put
890
891 uri_generated # put
892 generated_metadata # put
893 started_compression # put
894 finished_compression # put
895 put_failed # put
896 put_fetchable # put
897 put_successful # put
898
899 sending_to_network # get
900 compatibility_mode # get
901 expected_hashes # get
902 expected_mime # get
903 expected_data_length # get
904 get_failed # get
905 data_found # get
906 enter_finite_cooldown # get
907
908In addition, an event (basically a fake message) of type C<request_changed> is generated
909on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
910is the type of the original message triggering the change,
911
912To fill this cache with the global queue and keep it updated,
913call C<watch_global> to subscribe to updates, followed by
914C<list_persistent_requests_sync>.
915
916 $fcp->watch_global_sync_; # do not wait
917 $fcp->list_persistent_requests; # wait
918
919To get a better idea of what is stored in the cache, here is an example of
920what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
921
922 {
923 identifier => "Frost-gpl.txt",
924 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
925 binary_blob => "false",
926 global => "true",
927 max_retries => -1,
928 max_size => 9223372036854775807,
929 persistence => "forever",
930 priority_class => 3,
931 real_time => "false",
932 return_type => "direct",
933 started => "true",
934 type => "persistent_get",
935 verbosity => 2147483647,
936 sending_to_network => {
937 identifier => "Frost-gpl.txt",
938 global => "true",
454 }, 939 },
455 ); 940 compatibility_mode => {
456}; 941 identifier => "Frost-gpl.txt",
457 942 definitive => "true",
458=back 943 dont_compress => "false",
944 global => "true",
945 max => "COMPAT_1255",
946 min => "COMPAT_1255",
947 },
948 expected_hashes => {
949 identifier => "Frost-gpl.txt",
950 global => "true",
951 hashes => {
952 ed2k => "d83596f5ee3b7...",
953 md5 => "e0894e4a2a6...",
954 sha1 => "...",
955 sha256 => "...",
956 sha512 => "...",
957 tth => "...",
958 },
959 },
960 expected_mime => {
961 identifier => "Frost-gpl.txt",
962 global => "true",
963 metadata => { content_type => "application/rar" },
964 },
965 expected_data_length => {
966 identifier => "Frost-gpl.txt",
967 data_length => 37576,
968 global => "true",
969 },
970 simple_progress => {
971 identifier => "Frost-gpl.txt",
972 failed => 0,
973 fatally_failed => 0,
974 finalized_total => "true",
975 global => "true",
976 last_progress => 1438639282628,
977 required => 372,
978 succeeded => 102,
979 total => 747,
980 },
981 data_found => {
982 identifier => "Frost-gpl.txt",
983 completion_time => 1438663354026,
984 data_length => 37576,
985 global => "true",
986 metadata => { content_type => "image/jpeg" },
987 startup_time => 1438657196167,
988 },
989 }
459 990
460=head1 EXAMPLE PROGRAM 991=head1 EXAMPLE PROGRAM
461 992
462 use AnyEvent::FCP; 993 use AnyEvent::FCP;
463 994
464 my $fcp = new AnyEvent::FCP; 995 my $fcp = new AnyEvent::FCP;
465 996
466 # let us look at the global request list 997 # let us look at the global request list
467 $fcp->watch_global (1, 0); 998 $fcp->watch_global_ (1);
468 999
469 # list them, synchronously 1000 # list them, synchronously
470 my $req = $fcp->list_persistent_requests_sync; 1001 my $req = $fcp->list_persistent_requests;
471 1002
472 # go through all requests 1003 # go through all requests
1004TODO
473 for my $req (values %$req) { 1005 for my $req (values %$req) {
474 # skip jobs not directly-to-disk 1006 # skip jobs not directly-to-disk
475 next unless $req->{return_type} eq "disk"; 1007 next unless $req->{return_type} eq "disk";
476 # skip jobs not issued by FProxy 1008 # skip jobs not issued by FProxy
477 next unless $req->{identifier} =~ /^FProxy:/; 1009 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines