ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.3 by root, Tue Jul 28 02:20:51 2009 UTC vs.
Revision 1.20 by root, Sun Jun 12 01:32:37 2016 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = 0.4;
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
53 75
54sub touc($) { 76sub touc($) {
55 local $_ = shift; 77 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
58 $_ 80 $_
59} 81}
60 82
61sub tolc($) { 83sub tolc($) {
62 local $_ = shift; 84 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 88 lc
67} 89}
68 90
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
70 92
71Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 95
74If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 97(hopefully) unique client name for you.
76 98
77=cut 99The following keys can be specified (they are all optional):
78 100
79#TODO 101=over 4
80#You can install a progress callback that is being called with the AnyEvent::FCP 102
81#object, a txn object, the type of the transaction and the attributes. Use 103=item name => $string
82#it like this: 104
83# 105A unique name to identify this client. If none is specified, a randomly
84# sub progress_cb { 106generated name will be used.
85# my ($self, $txn, $type, $attr) = @_; 107
86# 108=item host => $hostname
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; 109
88# } 110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_error => $callback->($fcp, $message)
132
133Invoked on any (fatal) errors, such as unexpected connection close. The
134callback receives the FCP object and a textual error message.
135
136=item on_failure => $callback->($fcp, $backtrace, $args, $error)
137
138Invoked when an FCP request fails that didn't have a failure callback. See
139L<FCP REQUESTS> for details.
140
141=back
142
143=cut
89 144
90sub new { 145sub new {
91 my $class = shift; 146 my $class = shift;
147
148 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
149
92 my $self = bless { @_ }, $class; 150 my $self = bless {
93 151 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 152 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 153 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 154 keepalive => 9 * 60,
97 $self->{timeout} ||= 600; 155 name => time.rand.rand.rand, # lame
98 156 @_,
99 $self->{id} = "a0"; 157 queue => [],
158 req => {},
159 prefix => "..:aefcpid:$rand:",
160 idseq => "a0",
161 }, $class;
100 162
101 { 163 {
102 Scalar::Util::weaken (my $self = $self); 164 Scalar::Util::weaken (my $self = $self);
165
166 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
167 $self->{hdl}->push_write ("\n");
168 };
169
170 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
171
172 # these are declared here for performance reasons
173 my ($k, $v, $type);
174 my $rdata;
175
176 my $on_read = sub {
177 my ($hdl) = @_;
178
179 # we only carve out whole messages here
180 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
181 # remember end marker
182 $rdata = $1 eq "Data"
183 or $1 eq "EndMessage"
184 or return $self->fatal ("protocol error, expected message end, got $1\n");
185
186 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
187
188 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
189
190 $type = shift @lines;
191 $type = ($TOLC{$type} ||= tolc $type);
192
193 my %kv;
194
195 for (@lines) {
196 ($k, $v) = split /=/, $_, 2;
197 $k = ($TOLC{$k} ||= tolc $k);
198
199 if ($k =~ /\./) {
200 # generic, slow case
201 my @k = split /\./, $k;
202 my $ro = \\%kv;
203
204 while (@k) {
205 $k = shift @k;
206 if ($k =~ /^\d+$/) {
207 $ro = \$$ro->[$k];
208 } else {
209 $ro = \$$ro->{$k};
210 }
211 }
212
213 $$ro = $v;
214
215 next;
216 }
217
218 # special comon case, for performance only
219 $kv{$k} = $v;
220 }
221
222 if ($rdata) {
223 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
224 $rdata = \$_[1];
225 $self->recv ($type, \%kv, $rdata);
226 });
227
228 last; # do not tgry to parse more messages
229 } else {
230 $self->recv ($type, \%kv);
231 }
232 }
233 };
103 234
104 $self->{hdl} = new AnyEvent::Handle 235 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 236 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 237 timeout => $self->{timeout},
238 on_read => $on_read,
239 on_eof => $self->{on_eof},
107 on_error => sub { 240 on_error => sub {
108 warn "<@_>\n"; 241 $self->fatal ($_[2]);
109 exit 1;
110 }, 242 },
111 on_read => sub { $self->on_read (@_) }, 243 ;
112 on_eof => $self->{on_eof} || sub { };
113 244
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 245 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 246 }
116 247
117 $self->send_msg ( 248 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 249 name => $self->{name},
120 expected_version => "2.0", 250 expected_version => "2.0",
121 ); 251 );
122 252
123 $self 253 $self
124} 254}
125 255
126#sub progress { 256sub fatal {
127# my ($self, $txn, $type, $attr) = @_; 257 my ($self, $msg) = @_;
128# 258
129# $self->{progress}->($self, $txn, $type, $attr) 259 $self->{hdl}->shutdown;
130# if $self->{progress}; 260 delete $self->{kw};
131#} 261
262 if ($self->{on_error}) {
263 $self->{on_error}->($self, $msg);
264 } else {
265 die $msg;
266 }
267}
268
269sub identifier {
270 $_[0]{prefix} . ++$_[0]{idseq}
271}
132 272
133sub send_msg { 273sub send_msg {
134 my ($self, $type, %kv) = @_; 274 my ($self, $type, %kv) = @_;
135 275
136 my $data = delete $kv{data}; 276 my $data = delete $kv{data};
137 277
138 if (exists $kv{id_cb}) { 278 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 279 my $id = $kv{identifier} ||= $self->identifier;
140 $self->{id}{$id} = delete $kv{id_cb}; 280 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 281 }
143 282
144 my $msg = (touc $type) . "\012" 283 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 284 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 285
158 } 297 }
159 298
160 $self->{hdl}->push_write ($msg); 299 $self->{hdl}->push_write ($msg);
161} 300}
162 301
163sub on_read { 302sub on {
164 my ($self) = @_; 303 my ($self, $cb) = @_;
165 304
166 my $type; 305 # cb return undef - message eaten, remove cb
167 my %kv; 306 # cb return 0 - message eaten
168 my $rdata; 307 # cb return 1 - pass to next
169 308
170 my $done_cb = sub { 309 push @{ $self->{on} }, $cb;
171 $kv{pkt_type} = $type; 310}
172 311
173 if (my $cb = $self->{queue}[0]) { 312sub _push_queue {
174 $cb->($self, $type, \%kv, $rdata) 313 my ($self, $queue) = @_;
175 and shift @{ $self->{queue} }; 314
176 } else { 315 shift @$queue;
177 $self->default_recv ($type, \%kv, $rdata); 316 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
317 if @$queue;
318}
319
320# lock so only one $type (arbitrary string) is in flight,
321# to work around horribly misdesigned protocol.
322sub serialise {
323 my ($self, $type, $cb) = @_;
324
325 my $queue = $self->{serialise}{$type} ||= [];
326 push @$queue, $cb;
327 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
328 unless $#$queue;
329}
330
331# how to merge these types into $self->{persistent}
332our %PERSISTENT_TYPE = (
333 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
334 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
335 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
336 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
337 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
338
339 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
340
341 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
342 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
343 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
344 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
345 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
346 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
347 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
348
349 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
350 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
351 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
352 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
353 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
354 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
355 data_found => sub { $_[1]{data_found} = $_[2] }, # get
356 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
357);
358
359sub recv {
360 my ($self, $type, $kv, @extra) = @_;
361
362 if (my $cb = $PERSISTENT_TYPE{$type}) {
363 my $id = $kv->{identifier};
364 my $req = $_[0]{req}{$id} ||= {};
365 $cb->($self, $req, $kv);
366 $self->recv (request_changed => $kv, $type, @extra);
367 }
368
369 my $on = $self->{on};
370 for (0 .. $#$on) {
371 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
372 splice @$on, $_, 1 unless defined $res;
373 return;
178 } 374 }
179 }; 375 }
180 376
181 my $hdr_cb; $hdr_cb = sub { 377 if (my $cb = $self->{queue}[0]) {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 378 $cb->($self, $type, $kv, @extra)
183 my ($k, $v) = ($1, $2); 379 and shift @{ $self->{queue} };
184 my @k = split /\./, tolc $k;
185 my $ro = \\%kv;
186
187 while (@k) {
188 my $k = shift @k;
189 if ($k =~ /^\d+$/) {
190 $ro = \$$ro->[$k];
191 } else {
192 $ro = \$$ro->{$k};
193 }
194 }
195
196 $$ro = $v;
197
198 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1];
202 $done_cb->();
203 });
204 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->();
206 } else { 380 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 381 $self->default_recv ($type, $kv, @extra);
208 }
209 }; 382 }
210
211 $self->{hdl}->push_read (line => sub {
212 $type = tolc $_[1];
213 $_[0]->push_read (line => $hdr_cb);
214 });
215} 383}
216 384
217sub default_recv { 385sub default_recv {
218 my ($self, $type, $kv, $rdata) = @_; 386 my ($self, $type, $kv, $rdata) = @_;
219 387
220 if ($type eq "node_hello") { 388 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 389 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 390 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 391 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 392 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 393 }
229} 394}
395
396=back
397
398=head2 FCP REQUESTS
399
400The following methods implement various requests. Most of them map
401directory to the FCP message of the same name. The added benefit of
402these over sending requests yourself is that they handle the necessary
403serialisation, protocol quirks, and replies.
404
405All of them exist in two versions, the variant shown in this manpage, and
406a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
407version as shown is I<synchronous> - it will wait for any replies, and
408either return the reply, or croak with an error. The underscore variant
409returns immediately and invokes one or more callbacks or condvars later.
410
411For example, the call
412
413 $info = $fcp->get_plugin_info ($name, $detailed);
414
415Also comes in this underscore variant:
416
417 $fcp->get_plugin_info_ ($name, $detailed, $cb);
418
419You can thinbk of the underscore as a kind of continuation indicator - the
420normal function waits and returns with the data, the C<_> indicates that
421you pass the continuation yourself, and the continuation will be invoked
422with the results.
423
424This callback/continuation argument (C<$cb>) can come in three forms itself:
425
426=over 4
427
428=item A code reference (or rather anything not matching some other alternative)
429
430This code reference will be invoked with the result on success. On an
431error, it will invoke the C<on_failure> callback of the FCP object, or,
432if none was defined, will die (in the event loop) with a backtrace of the
433call site.
434
435This is a popular choice, but it makes handling errors hard - make sure
436you never generate protocol errors!
437
438If an C<on_failure> hook exists, it will be invoked with the FCP object,
439a (textual) backtrace as generated by C<Carp::longmess>, and arrayref
440containing the arguments from the original request invocation and the
441error object from the server, in this order, e.g.:
442
443 on_failure => sub {
444 my ($fcp, $backtrace, $orig_args, $error_object) = @_;
445 ...
446 },
447
448=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
449
450When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
451results when the request has finished. Should an error occur, the error
452will instead result in C<< $cv->croak ($error) >>.
453
454This is also a popular choice.
455
456=item An array with two callbacks C<[$success, $failure]>
457
458The C<$success> callback will be invoked with the results, while the
459C<$failure> callback will be invoked on any errors.
460
461The C<$failure> callback will be invoked with the error object from the
462server.
463
464=item C<undef>
465
466This is the same thing as specifying C<sub { }> as callback, i.e. on
467success, the results are ignored, while on failure, the C<on_failure> hook
468is invoked or the module dies with a backtrace.
469
470This is good for quick scripts, or when you really aren't interested in
471the results.
472
473=back
474
475=cut
476
477our $NOP_CB = sub { };
230 478
231sub _txn { 479sub _txn {
232 my ($name, $sub) = @_; 480 my ($name, $sub) = @_;
233 481
234 *{$name} = sub { 482 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 483 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 484
240 *{"$name\_sync"} = sub { 485 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 486 &$sub;
243 $cv->recv 487 $cv->recv
244 }; 488 };
245}
246 489
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 490 *{"$name\_"} = sub {
491 my ($ok, $err) = pop;
248 492
493 if (ARRAY:: eq ref $ok) {
494 ($ok, $err) = @$ok;
495 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
496 $err = sub { $ok->croak ($_[0]{extra_description}) };
497 } else {
498 my $bt = Carp::longmess "AnyEvent::FCP request $name";
499 Scalar::Util::weaken (my $self = $_[0]);
500 my $args = [@_]; shift @$args;
501 $err = sub {
502 if ($self->{on_failure}) {
503 $self->{on_failure}($self, $args, $bt, $_[0]);
504 } else {
505 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
506 }
507 };
508 }
509
510 $ok ||= $NOP_CB;
511
512 splice @_, 1, 0, $ok, $err;
513 &$sub;
514 };
515}
516
517=over 4
518
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 519=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 520
251=cut 521=cut
252 522
253_txn list_peers => sub { 523_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 524 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 525
256 my @res; 526 my @res;
257 527
258 $self->send_msg (list_peers => 528 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 529 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 530 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 531 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 532 my ($self, $type, $kv, $rdata) = @_;
263 533
264 if ($type eq "end_list_peers") { 534 if ($type eq "end_list_peers") {
265 $cv->(\@res); 535 $ok->(\@res);
266 1 536 1
267 } else { 537 } else {
268 push @res, $kv; 538 push @res, $kv;
269 0 539 0
270 } 540 }
271 }, 541 },
272 ); 542 );
273}; 543};
274 544
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 545=item $notes = $fcp->list_peer_notes ($node_identifier)
278 546
279=cut 547=cut
280 548
281_txn list_peer_notes => sub { 549_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 550 my ($self, $ok, undef, $node_identifier) = @_;
283 551
284 $self->send_msg (list_peer_notes => 552 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 553 node_identifier => $node_identifier,
286 id_cb => sub { 554 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 555 my ($self, $type, $kv, $rdata) = @_;
288 556
289 $cv->($kv); 557 $ok->($kv);
290 1 558 1
291 }, 559 },
292 ); 560 );
293}; 561};
294 562
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 563=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 564
299=cut 565=cut
300 566
301_txn watch_global => sub { 567_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 568 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 569
304 $self->send_msg (watch_global => 570 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 571 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 572 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 573 );
308 574
309 $cv->(); 575 $ok->();
310}; 576};
311 577
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 578=item $reqs = $fcp->list_persistent_requests
315 579
316=cut 580=cut
317 581
318_txn list_persistent_requests => sub { 582_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 583 my ($self, $ok, $err) = @_;
320 584
585 $self->serialise (list_persistent_requests => sub {
586 my ($self, $guard) = @_;
587
321 my %res; 588 my @res;
322 589
323 $self->send_msg ("list_persistent_requests"); 590 $self->send_msg ("list_persistent_requests");
324 591
325 push @{ $self->{queue} }, sub { 592 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 593 my ($self, $type, $kv, $rdata) = @_;
327 594
595 $guard if 0;
596
328 if ($type eq "end_list_persistent_requests") { 597 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 598 $ok->(\@res);
599 return;
600 } else {
601 my $id = $kv->{identifier};
602
603 if ($type =~ /^persistent_(get|put|put_dir)$/) {
604 push @res, [$type, $kv];
605 }
606 }
607
330 1 608 1
331 } else { 609 });
332 my $id = $kv->{identifier}; 610 });
611};
333 612
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 613=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 614
336 type => $1, 615Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 616identified by C<$global> and C<$identifier>, depending on which of
617C<$client_token> and C<$priority_class> are not C<undef>.
618
619=cut
620
621_txn modify_persistent_request => sub {
622 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
623
624 $self->serialise ($identifier => sub {
625 my ($self, $guard) = @_;
626
627 $self->send_msg (modify_persistent_request =>
628 global => $global ? "true" : "false",
629 identifier => $identifier,
630 defined $client_token ? (client_token => $client_token ) : (),
631 defined $priority_class ? (priority_class => $priority_class) : (),
632 );
633
634 $self->on (sub {
635 my ($self, $type, $kv, @extra) = @_;
636
637 $guard if 0;
638
639 if ($kv->{identifier} eq $identifier) {
640 if ($type eq "persistent_request_modified") {
338 %$kv, 641 $ok->($kv);
642 return;
643 } elsif ($type eq "protocol_error") {
644 $err->($kv);
645 return;
339 }; 646 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 647 }
648
346 0 649 1
347 } 650 });
348 }; 651 });
349}; 652};
350 653
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372};
373
374=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
375
376=item $status = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
377
378=cut
379
380_txn modify_persistent_request => sub {
381 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
382
383 $self->send_msg (modify_persistent_request =>
384 global => $global ? "true" : "false",
385 identifier => $identifier,
386 defined $client_token ? (client_token => $client_token ) : (),
387 defined $priority_class ? (priority_class => $priority_class) : (),
388 id_cb => sub {
389 my ($self, $type, $kv, $rdata) = @_;
390
391 $cv->($kv);
392 1
393 },
394 );
395
396 $cv->();
397};
398
399=item $cv = $fcp->get_plugin_info ($name, $detailed)
400
401=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 654=item $info = $fcp->get_plugin_info ($name, $detailed)
402 655
403=cut 656=cut
404 657
405_txn get_plugin_info => sub { 658_txn get_plugin_info => sub {
406 my ($self, $cv, $name, $detailed) = @_; 659 my ($self, $ok, $err, $name, $detailed) = @_;
660
661 my $id = $self->identifier;
407 662
408 $self->send_msg (get_plugin_info => 663 $self->send_msg (get_plugin_info =>
664 identifier => $id,
409 plugin_name => $name, 665 plugin_name => $name,
410 detailed => $detailed ? "true" : "false", 666 detailed => $detailed ? "true" : "false",
411 id_cb => sub { 667 );
668 $self->on (sub {
669 my ($self, $type, $kv) = @_;
670
671 if ($kv->{identifier} eq $id) {
672 if ($type eq "get_plugin_info") {
673 $ok->($kv);
674 } else {
675 $err->($kv, $type);
676 }
677 return;
678 }
679
680 1
681 });
682};
683
684=item $status = $fcp->client_get ($uri, $identifier, %kv)
685
686%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
687
688ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
689priority_class, persistence, client_token, global, return_type,
690binary_blob, allowed_mime_types, filename, temp_filename
691
692=cut
693
694_txn client_get => sub {
695 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
696
697 $self->serialise ($identifier => sub {
698 my ($self, $guard) = @_;
699
700 $self->send_msg (client_get =>
701 %kv,
702 uri => $uri,
703 identifier => $identifier,
704 );
705
706 $self->on (sub {
412 my ($self, $type, $kv, $rdata) = @_; 707 my ($self, $type, $kv, @extra) = @_;
413 708
709 $guard if 0;
710
711 if ($kv->{identifier} eq $identifier) {
712 if ($type eq "persistent_get") {
414 $cv->($kv); 713 $ok->($kv);
714 return;
715 } elsif ($type eq "protocol_error") {
716 $err->($kv);
717 return;
718 }
719 }
720
415 1 721 1
722 });
723 });
724};
725
726=item $status = $fcp->remove_request ($identifier[, $global])
727
728Remove the request with the given isdentifier. Returns true if successful,
729false on error.
730
731=cut
732
733_txn remove_request => sub {
734 my ($self, $ok, $err, $identifier, $global) = @_;
735
736 $self->serialise ($identifier => sub {
737 my ($self, $guard) = @_;
738
739 $self->send_msg (remove_request =>
740 identifier => $identifier,
741 global => $global ? "true" : "false",
742 );
743 $self->on (sub {
744 my ($self, $type, $kv, @extra) = @_;
745
746 $guard if 0;
747
748 if ($kv->{identifier} eq $identifier) {
749 if ($type eq "persistent_request_removed") {
750 $ok->(1);
751 return;
752 } elsif ($type eq "protocol_error") {
753 $err->($kv);
754 return;
755 }
756 }
757
758 1
759 });
760 });
761};
762
763=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
764
765The DDA test in FCP is probably the single most broken protocol - only
766one directory test can be outstanding at any time, and some guessing and
767heuristics are involved in mangling the paths.
768
769This function combines C<TestDDARequest> and C<TestDDAResponse> in one
770request, handling file reading and writing as well, and tries very hard to
771do the right thing.
772
773Both C<$local_directory> and C<$remote_directory> must specify the same
774directory - C<$local_directory> is the directory path on the client (where
775L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
776the server (where the freenet node runs). When both are running on the
777same node, the paths are generally identical.
778
779C<$want_read> and C<$want_write> should be set to a true value when you
780want to read (get) files or write (put) files, respectively.
781
782On error, an exception is thrown. Otherwise, C<$can_read> and
783C<$can_write> indicate whether you can reaqd or write to freenet via the
784directory.
785
786=cut
787
788_txn test_dda => sub {
789 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
790
791 $self->serialise (test_dda => sub {
792 my ($self, $guard) = @_;
793
794 $self->send_msg (test_dda_request =>
795 directory => $remote,
796 want_read_directory => $want_read ? "true" : "false",
797 want_write_directory => $want_write ? "true" : "false",
798 );
799 $self->on (sub {
800 my ($self, $type, $kv) = @_;
801
802 if ($type eq "test_dda_reply") {
803 # the filenames are all relative to the server-side directory,
804 # which might or might not match $remote anymore, so we
805 # need to rewrite the paths to be relative to $local
806 for my $k (qw(read_filename write_filename)) {
807 my $f = $kv->{$k};
808 for my $dir ($kv->{directory}, $remote) {
809 if ($dir eq substr $f, 0, length $dir) {
810 substr $f, 0, 1 + length $dir, "";
811 $kv->{$k} = $f;
812 last;
813 }
814 }
815 }
816
817 my %response = (directory => $remote);
818
819 if (length $kv->{read_filename}) {
820 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
821 sysread $fh, my $buf, -s $fh;
822 $response{read_content} = $buf;
823 }
824 }
825
826 if (length $kv->{write_filename}) {
827 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
828 syswrite $fh, $kv->{content_to_write};
829 }
830 }
831
832 $self->send_msg (test_dda_response => %response);
833
834 $self->on (sub {
835 my ($self, $type, $kv) = @_;
836
837 $guard if 0; # reference
838
839 if ($type eq "test_dda_complete") {
840 $ok->(
841 $kv->{read_directory_allowed} eq "true",
842 $kv->{write_directory_allowed} eq "true",
843 );
844 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
845 $err->($kv->{extra_description});
846 return;
847 }
848
849 1
850 });
851
852 return;
853 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
854 $err->($kv);
855 return;
856 }
857
858 1
859 });
860 });
861};
862
863=back
864
865=head2 REQUEST CACHE
866
867The C<AnyEvent::FCP> class keeps a request cache, where it caches all
868information from requests.
869
870For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
871in C<< $fcp->{req}{$identifier} >>:
872
873 persistent_get
874 persistent_put
875 persistent_put_dir
876
877This message updates the stored data:
878
879 persistent_request_modified
880
881This message will remove this entry:
882
883 persistent_request_removed
884
885These messages get merged into the cache entry, under their
886type, i.e. a C<simple_progress> message will be stored in C<<
887$fcp->{req}{$identifier}{simple_progress} >>:
888
889 simple_progress # get/put
890
891 uri_generated # put
892 generated_metadata # put
893 started_compression # put
894 finished_compression # put
895 put_failed # put
896 put_fetchable # put
897 put_successful # put
898
899 sending_to_network # get
900 compatibility_mode # get
901 expected_hashes # get
902 expected_mime # get
903 expected_data_length # get
904 get_failed # get
905 data_found # get
906 enter_finite_cooldown # get
907
908In addition, an event (basically a fake message) of type C<request_changed> is generated
909on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
910is the type of the original message triggering the change,
911
912To fill this cache with the global queue and keep it updated,
913call C<watch_global> to subscribe to updates, followed by
914C<list_persistent_requests_sync>.
915
916 $fcp->watch_global_sync_; # do not wait
917 $fcp->list_persistent_requests; # wait
918
919To get a better idea of what is stored in the cache, here is an example of
920what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
921
922 {
923 identifier => "Frost-gpl.txt",
924 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
925 binary_blob => "false",
926 global => "true",
927 max_retries => -1,
928 max_size => 9223372036854775807,
929 persistence => "forever",
930 priority_class => 3,
931 real_time => "false",
932 return_type => "direct",
933 started => "true",
934 type => "persistent_get",
935 verbosity => 2147483647,
936 sending_to_network => {
937 identifier => "Frost-gpl.txt",
938 global => "true",
416 }, 939 },
417 ); 940 compatibility_mode => {
418 941 identifier => "Frost-gpl.txt",
419 $cv->(); 942 definitive => "true",
420}; 943 dont_compress => "false",
421 944 global => "true",
422=back 945 max => "COMPAT_1255",
946 min => "COMPAT_1255",
947 },
948 expected_hashes => {
949 identifier => "Frost-gpl.txt",
950 global => "true",
951 hashes => {
952 ed2k => "d83596f5ee3b7...",
953 md5 => "e0894e4a2a6...",
954 sha1 => "...",
955 sha256 => "...",
956 sha512 => "...",
957 tth => "...",
958 },
959 },
960 expected_mime => {
961 identifier => "Frost-gpl.txt",
962 global => "true",
963 metadata => { content_type => "application/rar" },
964 },
965 expected_data_length => {
966 identifier => "Frost-gpl.txt",
967 data_length => 37576,
968 global => "true",
969 },
970 simple_progress => {
971 identifier => "Frost-gpl.txt",
972 failed => 0,
973 fatally_failed => 0,
974 finalized_total => "true",
975 global => "true",
976 last_progress => 1438639282628,
977 required => 372,
978 succeeded => 102,
979 total => 747,
980 },
981 data_found => {
982 identifier => "Frost-gpl.txt",
983 completion_time => 1438663354026,
984 data_length => 37576,
985 global => "true",
986 metadata => { content_type => "image/jpeg" },
987 startup_time => 1438657196167,
988 },
989 }
423 990
424=head1 EXAMPLE PROGRAM 991=head1 EXAMPLE PROGRAM
425 992
426 use AnyEvent::FCP; 993 use AnyEvent::FCP;
427 994
428 my $fcp = new AnyEvent::FCP; 995 my $fcp = new AnyEvent::FCP;
429 996
430 # let us look at the global request list 997 # let us look at the global request list
431 $fcp->watch_global (1, 0); 998 $fcp->watch_global_ (1);
432 999
433 # list them, synchronously 1000 # list them, synchronously
434 my $req = $fcp->list_persistent_requests_sync; 1001 my $req = $fcp->list_persistent_requests;
435 1002
436 # go through all requests 1003 # go through all requests
1004TODO
437 for my $req (values %$req) { 1005 for my $req (values %$req) {
438 # skip jobs not directly-to-disk 1006 # skip jobs not directly-to-disk
439 next unless $req->{return_type} eq "disk"; 1007 next unless $req->{return_type} eq "disk";
440 # skip jobs not issued by FProxy 1008 # skip jobs not issued by FProxy
441 next unless $req->{identifier} =~ /^FProxy:/; 1009 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines