ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.4 by root, Wed Jul 29 09:25:46 2009 UTC vs.
Revision 1.28 by root, Thu Sep 9 00:49:06 2021 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
18=head1 DESCRIPTION 18=head1 DESCRIPTION
19 19
20This module implements the freenet client protocol version 2.0, as used by 20This module implements the freenet client protocol version 2.0, as used by
21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version. 21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22 22
23See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a 23See L<https://wiki.freenetproject.org/FCP> for a description of what the
24description of what the messages do. 24messages do.
25 25
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = 0.5;
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
53 75
54sub touc($) { 76sub touc($) {
55 local $_ = shift; 77 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
58 $_ 80 $_
59} 81}
60 82
61sub tolc($) { 83sub tolc($) {
62 local $_ = shift; 84 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 88 lc
67} 89}
68 90
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
70 92
71Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 95
74If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 97(hopefully) unique client name for you.
76 98
77=cut 99The following keys can be specified (they are all optional):
78 100
79#TODO 101=over 4
80#You can install a progress callback that is being called with the AnyEvent::FCP 102
81#object, a txn object, the type of the transaction and the attributes. Use 103=item name => $string
82#it like this: 104
83# 105A unique name to identify this client. If none is specified, a randomly
84# sub progress_cb { 106generated name will be used.
85# my ($self, $txn, $type, $attr) = @_; 107
86# 108=item host => $hostname
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; 109
88# } 110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_eof => $callback->($fcp)
132
133Invoked when the underlying L<AnyEvent::Handle> signals EOF, currently
134regardless of whether the EOF was expected or not.
135
136=item on_error => $callback->($fcp, $message)
137
138Invoked on any (fatal) errors, such as unexpected connection close. The
139callback receives the FCP object and a textual error message.
140
141=item on_failure => $callback->($fcp, $type, $args, $backtrace, $error)
142
143Invoked when an FCP request fails that didn't have a failure callback. See
144L<FCP REQUESTS> for details.
145
146=back
147
148=cut
89 149
90sub new { 150sub new {
91 my $class = shift; 151 my $class = shift;
152
153 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
154
92 my $self = bless { @_ }, $class; 155 my $self = bless {
93 156 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 157 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 158 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 159 keepalive => 9 * 60,
97 $self->{timeout} ||= 600; 160 name => time.rand.rand.rand, # lame
98 161 @_,
99 $self->{id} = "a0"; 162 queue => [],
163 req => {},
164 prefix => "..:aefcpid:$rand:",
165 idseq => "a0",
166 }, $class;
100 167
101 { 168 {
102 Scalar::Util::weaken (my $self = $self); 169 Scalar::Util::weaken (my $self = $self);
170
171 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
172 $self->{hdl}->push_write ("\n");
173 };
174
175 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
176
177 # these are declared here for performance reasons
178 my ($k, $v, $type);
179 my $rdata;
180
181 my $on_read = sub {
182 my ($hdl) = @_;
183
184 # we only carve out whole messages here
185 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
186 # remember end marker
187 $rdata = $1 eq "Data"
188 or $1 eq "EndMessage"
189 or return $self->fatal ("protocol error, expected message end, got $1\n");
190
191 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
192
193 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
194
195 $type = shift @lines;
196 $type = ($TOLC{$type} ||= tolc $type);
197
198 my %kv;
199
200 for (@lines) {
201 ($k, $v) = split /=/, $_, 2;
202 $k = ($TOLC{$k} ||= tolc $k);
203
204 if ($k =~ /\./) {
205 # generic, slow case
206 my @k = split /\./, $k;
207 my $ro = \\%kv;
208
209 while (@k) {
210 $k = shift @k;
211 if ($k =~ /^\d+$/) {
212 $ro = \$$ro->[$k];
213 } else {
214 $ro = \$$ro->{$k};
215 }
216 }
217
218 $$ro = $v;
219
220 next;
221 }
222
223 # special comon case, for performance only
224 $kv{$k} = $v;
225 }
226
227 if ($rdata) {
228 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
229 $rdata = \$_[1];
230 $self->recv ($type, \%kv, $rdata);
231 });
232
233 last; # do not tgry to parse more messages
234 } else {
235 $self->recv ($type, \%kv);
236 }
237 }
238 };
103 239
104 $self->{hdl} = new AnyEvent::Handle 240 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 241 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 242 timeout => $self->{timeout},
243 on_read => $on_read,
244 on_eof => sub {
245 if ($self->{on_eof}) {
246 $self->{on_eof}($self);
247 } else {
248 $self->fatal ("EOF");
249 }
250 },
107 on_error => sub { 251 on_error => sub {
108 warn "<@_>\n"; 252 $self->fatal ($_[2]);
109 exit 1;
110 }, 253 },
111 on_read => sub { $self->on_read (@_) }, 254 ;
112 on_eof => $self->{on_eof} || sub { };
113 255
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 256 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 257 }
116 258
117 $self->send_msg ( 259 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 260 name => $self->{name},
120 expected_version => "2.0", 261 expected_version => "2.0",
121 ); 262 );
122 263
123 $self 264 $self
124} 265}
125 266
126#sub progress { 267sub fatal {
127# my ($self, $txn, $type, $attr) = @_; 268 my ($self, $msg) = @_;
128# 269
129# $self->{progress}->($self, $txn, $type, $attr) 270 $self->{hdl}->push_shutdown if $self->{hdl};
130# if $self->{progress}; 271 delete $self->{kw};
131#} 272
273 if ($self->{on_error}) {
274 $self->{on_error}->($self, $msg);
275 } else {
276 die "AnyEvent::FCP($self->{host}:$self->{port}): $msg";
277 }
278}
279
280sub identifier {
281 $_[0]{prefix} . ++$_[0]{idseq}
282}
132 283
133sub send_msg { 284sub send_msg {
134 my ($self, $type, %kv) = @_; 285 my ($self, $type, %kv) = @_;
135 286
136 my $data = delete $kv{data}; 287 my $data = delete $kv{data};
137 288
138 if (exists $kv{id_cb}) { 289 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 290 my $id = $kv{identifier} ||= $self->identifier;
140 $self->{id}{$id} = delete $kv{id_cb}; 291 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 292 }
143 293
144 my $msg = (touc $type) . "\012" 294 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 295 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146
147 sub id {
148 my ($self) = @_;
149
150
151 }
152 296
153 if (defined $data) { 297 if (defined $data) {
154 $msg .= "DataLength=" . (length $data) . "\012" 298 $msg .= "DataLength=" . (length $data) . "\012"
155 . "Data\012$data"; 299 . "Data\012$data";
156 } else { 300 } else {
158 } 302 }
159 303
160 $self->{hdl}->push_write ($msg); 304 $self->{hdl}->push_write ($msg);
161} 305}
162 306
163sub on_read { 307sub on {
164 my ($self) = @_; 308 my ($self, $cb) = @_;
165 309
166 my $type; 310 # cb return undef - message eaten, remove cb
167 my %kv; 311 # cb return 0 - message eaten
168 my $rdata; 312 # cb return 1 - pass to next
169 313
170 my $done_cb = sub { 314 push @{ $self->{on} }, $cb;
171 $kv{pkt_type} = $type; 315}
172 316
173 if (my $cb = $self->{queue}[0]) { 317sub _push_queue {
174 $cb->($self, $type, \%kv, $rdata) 318 my ($self, $queue) = @_;
175 and shift @{ $self->{queue} }; 319
176 } else { 320 shift @$queue;
177 $self->default_recv ($type, \%kv, $rdata); 321 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
322 if @$queue;
323}
324
325# lock so only one $type (arbitrary string) is in flight,
326# to work around horribly misdesigned protocol.
327sub serialise {
328 my ($self, $type, $cb) = @_;
329
330 my $queue = $self->{serialise}{$type} ||= [];
331 push @$queue, $cb;
332 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
333 unless $#$queue;
334}
335
336# how to merge these types into $self->{persistent}
337our %PERSISTENT_TYPE = (
338 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
339 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
340 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
341 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
342 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
343
344 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
345
346 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
347 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
348 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
349 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
350 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
351 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
352 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
353
354 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
355 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
356 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
357 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
358 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
359 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
360 data_found => sub { $_[1]{data_found} = $_[2] }, # get
361 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
362);
363
364sub recv {
365 my ($self, $type, $kv, @extra) = @_;
366
367 if (my $cb = $PERSISTENT_TYPE{$type}) {
368 my $id = $kv->{identifier};
369 my $req = $_[0]{req}{$id} ||= {};
370 $cb->($self, $req, $kv);
371 $self->recv (request_changed => $kv, $type, @extra);
372 }
373
374 my $on = $self->{on};
375 for (0 .. $#$on) {
376 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
377 splice @$on, $_, 1 unless defined $res;
378 return;
178 } 379 }
179 }; 380 }
180 381
181 my $hdr_cb; $hdr_cb = sub { 382 if (my $cb = $self->{queue}[0]) {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 383 $cb->($self, $type, $kv, @extra)
183 my ($k, $v) = ($1, $2); 384 and shift @{ $self->{queue} };
184 my @k = split /\./, tolc $k;
185 my $ro = \\%kv;
186
187 while (@k) {
188 my $k = shift @k;
189 if ($k =~ /^\d+$/) {
190 $ro = \$$ro->[$k];
191 } else {
192 $ro = \$$ro->{$k};
193 }
194 }
195
196 $$ro = $v;
197
198 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1];
202 $done_cb->();
203 });
204 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->();
206 } else { 385 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 386 $self->default_recv ($type, $kv, @extra);
208 }
209 }; 387 }
210
211 $self->{hdl}->push_read (line => sub {
212 $type = tolc $_[1];
213 $_[0]->push_read (line => $hdr_cb);
214 });
215} 388}
216 389
217sub default_recv { 390sub default_recv {
218 my ($self, $type, $kv, $rdata) = @_; 391 my ($self, $type, $kv, $rdata) = @_;
219 392
220 if ($type eq "node_hello") { 393 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 394 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 395 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 396 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 397 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 398 }
229} 399}
400
401=back
402
403=head2 FCP REQUESTS
404
405The following methods implement various requests. Most of them map
406directory to the FCP message of the same name. The added benefit of
407these over sending requests yourself is that they handle the necessary
408serialisation, protocol quirks, and replies.
409
410All of them exist in two versions, the variant shown in this manpage, and
411a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
412version as shown is I<synchronous> - it will wait for any replies, and
413either return the reply, or croak with an error. The underscore variant
414returns immediately and invokes one or more callbacks or condvars later.
415
416For example, the call
417
418 $info = $fcp->get_plugin_info ($name, $detailed);
419
420Also comes in this underscore variant:
421
422 $fcp->get_plugin_info_ ($name, $detailed, $cb);
423
424You can think of the underscore as a kind of continuation indicator - the
425normal function waits and returns with the data, the C<_> indicates that
426you pass the continuation yourself, and the continuation will be invoked
427with the results.
428
429This callback/continuation argument (C<$cb>) can come in three forms itself:
430
431=over 4
432
433=item A code reference (or rather anything not matching some other alternative)
434
435This code reference will be invoked with the result on success. On an
436error, it will invoke the C<on_failure> callback of the FCP object, or,
437if none was defined, will die (in the event loop) with a backtrace of the
438call site.
439
440This is a popular choice, but it makes handling errors hard - make sure
441you never generate protocol errors!
442
443In the failure case, if an C<on_failure> hook exists, it will be invoked
444with the FCP object, the request type (the name of the method, an arrayref
445containing the arguments from the original request invocation, a (textual)
446backtrace as generated by C<Carp::longmess>, and the error object from the
447server, in this order, e.g.:
448
449 on_failure => sub {
450 my ($fcp, $request_type, $orig_args, $backtrace, $error_object) = @_;
451
452 warn "FCP failure ($type @$args), $error_object->{code_description} ($error_object->{extra_description})$backtrace";
453 exit 1;
454 },
455
456=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
457
458When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
459results when the request has finished. Should an error occur, the error
460will instead result in C<< $cv->croak ($error) >>.
461
462This is also a popular choice.
463
464=item An array with two callbacks C<[$success, $failure]>
465
466The C<$success> callback will be invoked with the results, while the
467C<$failure> callback will be invoked on any errors.
468
469The C<$failure> callback will be invoked with the error object from the
470server.
471
472=item C<undef>
473
474This is the same thing as specifying C<sub { }> as callback, i.e. on
475success, the results are ignored, while on failure, the C<on_failure> hook
476is invoked or the module dies with a backtrace.
477
478This is good for quick scripts, or when you really aren't interested in
479the results.
480
481=back
482
483=cut
484
485our $NOP_CB = sub { };
230 486
231sub _txn { 487sub _txn {
232 my ($name, $sub) = @_; 488 my ($name, $sub) = @_;
233 489
234 *{$name} = sub { 490 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 491 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 492
240 *{"$name\_sync"} = sub { 493 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 494 &$sub;
243 $cv->recv 495 $cv->recv
244 }; 496 };
245}
246 497
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 498 *{"$name\_"} = sub {
499 my ($ok, $err) = pop;
248 500
501 if (ARRAY:: eq ref $ok) {
502 ($ok, $err) = @$ok;
503 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
504 $err = sub { $ok->croak ($_[0]{extra_description}) };
505 } else {
506 my $bt = Carp::longmess "AnyEvent::FCP request $name";
507 Scalar::Util::weaken (my $self = $_[0]);
508 my $args = [@_]; shift @$args;
509 $err = sub {
510 if ($self->{on_failure}) {
511 $self->{on_failure}($self, $name, $args, $bt, $_[0]);
512 } else {
513 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
514 }
515 };
516 }
517
518 $ok ||= $NOP_CB;
519
520 splice @_, 1, 0, $ok, $err;
521 &$sub;
522 };
523}
524
525=over 4
526
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 527=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 528
251=cut 529=cut
252 530
253_txn list_peers => sub { 531_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 532 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 533
256 my @res; 534 my @res;
257 535
258 $self->send_msg (list_peers => 536 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 537 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 538 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 539 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 540 my ($self, $type, $kv, $rdata) = @_;
263 541
264 if ($type eq "end_list_peers") { 542 if ($type eq "end_list_peers") {
265 $cv->(\@res); 543 $ok->(\@res);
266 1 544 1
267 } else { 545 } else {
268 push @res, $kv; 546 push @res, $kv;
269 0 547 0
270 } 548 }
271 }, 549 },
272 ); 550 );
273}; 551};
274 552
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 553=item $notes = $fcp->list_peer_notes ($node_identifier)
278 554
279=cut 555=cut
280 556
281_txn list_peer_notes => sub { 557_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 558 my ($self, $ok, undef, $node_identifier) = @_;
283 559
284 $self->send_msg (list_peer_notes => 560 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 561 node_identifier => $node_identifier,
286 id_cb => sub { 562 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 563 my ($self, $type, $kv, $rdata) = @_;
288 564
289 $cv->($kv); 565 $ok->($kv);
290 1 566 1
291 }, 567 },
292 ); 568 );
293}; 569};
294 570
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 571=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 572
299=cut 573=cut
300 574
301_txn watch_global => sub { 575_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 576 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 577
304 $self->send_msg (watch_global => 578 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 579 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 580 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 581 );
308 582
309 $cv->(); 583 $ok->();
310}; 584};
311 585
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 586=item $reqs = $fcp->list_persistent_requests
315 587
316=cut 588=cut
317 589
318_txn list_persistent_requests => sub { 590_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 591 my ($self, $ok, $err) = @_;
320 592
593 $self->serialise (list_persistent_requests => sub {
594 my ($self, $guard) = @_;
595
321 my %res; 596 my @res;
322 597
323 $self->send_msg ("list_persistent_requests"); 598 $self->send_msg ("list_persistent_requests");
324 599
325 push @{ $self->{queue} }, sub { 600 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 601 my ($self, $type, $kv, $rdata) = @_;
327 602
603 $guard if 0;
604
328 if ($type eq "end_list_persistent_requests") { 605 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 606 $ok->(\@res);
607 return;
608 } else {
609 my $id = $kv->{identifier};
610
611 if ($type =~ /^persistent_(get|put|put_dir)$/) {
612 push @res, [$type, $kv];
613 }
614 }
615
330 1 616 1
331 } else { 617 });
332 my $id = $kv->{identifier}; 618 });
619};
333 620
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 621=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 622
336 type => $1, 623Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 624identified by C<$global> and C<$identifier>, depending on which of
625C<$client_token> and C<$priority_class> are not C<undef>.
626
627=cut
628
629_txn modify_persistent_request => sub {
630 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
635 $self->send_msg (modify_persistent_request =>
636 global => $global ? "true" : "false",
637 identifier => $identifier,
638 defined $client_token ? (client_token => $client_token ) : (),
639 defined $priority_class ? (priority_class => $priority_class) : (),
640 );
641
642 $self->on (sub {
643 my ($self, $type, $kv, @extra) = @_;
644
645 $guard if 0;
646
647 if ($kv->{identifier} eq $identifier) {
648 if ($type eq "persistent_request_modified") {
338 %$kv, 649 $ok->($kv);
650 return;
651 } elsif ($type eq "protocol_error") {
652 $err->($kv);
653 return;
339 }; 654 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 655 }
656
346 0 657 1
347 } 658 });
348 }; 659 });
349}; 660};
350 661
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370};
371
372=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
373
374=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
375
376=cut
377
378_txn modify_persistent_request => sub {
379 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
380
381 $self->send_msg (modify_persistent_request =>
382 global => $global ? "true" : "false",
383 defined $client_token ? (client_token => $client_token ) : (),
384 defined $priority_class ? (priority_class => $priority_class) : (),
385 identifier => $identifier,
386 id_cb => sub {
387 my ($self, $type, $kv, $rdata) = @_;
388
389 $cv->($kv);
390 1
391 },
392 );
393};
394
395=item $cv = $fcp->get_plugin_info ($name, $detailed)
396
397=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 662=item $info = $fcp->get_plugin_info ($name, $detailed)
398 663
399=cut 664=cut
400 665
401_txn get_plugin_info => sub { 666_txn get_plugin_info => sub {
402 my ($self, $cv, $name, $detailed) = @_; 667 my ($self, $ok, $err, $name, $detailed) = @_;
668
669 my $id = $self->identifier;
403 670
404 $self->send_msg (get_plugin_info => 671 $self->send_msg (get_plugin_info =>
672 identifier => $id,
405 plugin_name => $name, 673 plugin_name => $name,
406 detailed => $detailed ? "true" : "false", 674 detailed => $detailed ? "true" : "false",
407 id_cb => sub {
408 my ($self, $type, $kv, $rdata) = @_;
409
410 $cv->($kv);
411 1
412 },
413 ); 675 );
676 $self->on (sub {
677 my ($self, $type, $kv) = @_;
678
679 if ($kv->{identifier} eq $id) {
680 if ($type eq "get_plugin_info") {
681 $ok->($kv);
682 } else {
683 $err->($kv, $type);
684 }
685 return;
686 }
687
688 1
689 });
414}; 690};
415 691
416=item $cv = $fcp->client_get ($uri, $identifier, %kv)
417
418=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 692=item $status = $fcp->client_get ($uri, $identifier, %kv)
419 693
420%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 694%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
421 695
422ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 696ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
423priority_class, persistence, client_token, global, return_type, 697priority_class, persistence, client_token, global, return_type,
424binary_blob, allowed_mime_types, filename, temp_filename 698binary_blob, allowed_mime_types, filename, temp_filename
425 699
426=cut 700=cut
427 701
428_txn client_get => sub { 702_txn client_get => sub {
429 my ($self, $cv, $uri, $identifier, %kv) = @_; 703 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
430 704
705 $self->serialise ($identifier => sub {
706 my ($self, $guard) = @_;
707
431 $self->send_msg (client_get => 708 $self->send_msg (client_get =>
432 %kv, 709 %kv,
433 uri => $uri, 710 uri => $uri,
434 identifier => $identifier, 711 identifier => $identifier,
435 id_cb => sub { 712 );
713
714 $self->on (sub {
436 my ($self, $type, $kv, $rdata) = @_; 715 my ($self, $type, $kv, @extra) = @_;
437 716
717 $guard if 0;
718
719 if ($kv->{identifier} eq $identifier) {
720 if ($type eq "persistent_get") {
438 $cv->($kv); 721 $ok->($kv);
722 return;
723 } elsif ($type eq "protocol_error") {
724 $err->($kv);
725 return;
726 }
727 }
728
439 1 729 1
730 });
731 });
732};
733
734=item $status = $fcp->remove_request ($identifier[, $global])
735
736Remove the request with the given identifier. Returns true if successful,
737false on error.
738
739=cut
740
741_txn remove_request => sub {
742 my ($self, $ok, $err, $identifier, $global) = @_;
743
744 $self->serialise ($identifier => sub {
745 my ($self, $guard) = @_;
746
747 $self->send_msg (remove_request =>
748 identifier => $identifier,
749 global => $global ? "true" : "false",
750 );
751 $self->on (sub {
752 my ($self, $type, $kv, @extra) = @_;
753
754 $guard if 0;
755
756 if ($kv->{identifier} eq $identifier) {
757 if ($type eq "persistent_request_removed") {
758 $ok->(1);
759 return;
760 } elsif ($type eq "protocol_error") {
761 $err->($kv);
762 return;
763 }
764 }
765
766 1
767 });
768 });
769};
770
771=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
772
773The DDA test in FCP is probably the single most broken protocol - only
774one directory test can be outstanding at any time, and some guessing and
775heuristics are involved in mangling the paths.
776
777This function combines C<TestDDARequest> and C<TestDDAResponse> in one
778request, handling file reading and writing as well, and tries very hard to
779do the right thing.
780
781Both C<$local_directory> and C<$remote_directory> must specify the same
782directory - C<$local_directory> is the directory path on the client (where
783L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
784the server (where the freenet node runs). When both are running on the
785same node, the paths are generally identical.
786
787C<$want_read> and C<$want_write> should be set to a true value when you
788want to read (get) files or write (put) files, respectively.
789
790On error, an exception is thrown. Otherwise, C<$can_read> and
791C<$can_write> indicate whether you can read or write to freenet via the
792directory.
793
794=cut
795
796_txn test_dda => sub {
797 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
798
799 $self->serialise (test_dda => sub {
800 my ($self, $guard) = @_;
801
802 $self->send_msg (test_dda_request =>
803 directory => $remote,
804 want_read_directory => $want_read ? "true" : "false",
805 want_write_directory => $want_write ? "true" : "false",
806 );
807 $self->on (sub {
808 my ($self, $type, $kv) = @_;
809
810 if ($type eq "test_dda_reply") {
811 # the filenames are all relative to the server-side directory,
812 # which might or might not match $remote anymore, so we
813 # need to rewrite the paths to be relative to $local
814 for my $k (qw(read_filename write_filename)) {
815 my $f = $kv->{$k};
816 for my $dir ($kv->{directory}, $remote) {
817 if ($dir eq substr $f, 0, length $dir) {
818 substr $f, 0, 1 + length $dir, "";
819 $kv->{$k} = $f;
820 last;
821 }
822 }
823 }
824
825 my %response = (directory => $remote);
826
827 if (length $kv->{read_filename}) {
828 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
829 sysread $fh, my $buf, -s $fh;
830 $response{read_content} = $buf;
831 }
832 }
833
834 if (length $kv->{write_filename}) {
835 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
836 syswrite $fh, $kv->{content_to_write};
837 }
838 }
839
840 $self->send_msg (test_dda_response => %response);
841
842 $self->on (sub {
843 my ($self, $type, $kv) = @_;
844
845 $guard if 0; # reference
846
847 if ($type eq "test_dda_complete") {
848 $ok->(
849 $kv->{read_directory_allowed} eq "true",
850 $kv->{write_directory_allowed} eq "true",
851 );
852 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
853 $err->($kv->{extra_description});
854 return;
855 }
856
857 1
858 });
859
860 return;
861 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
862 $err->($kv);
863 return;
864 }
865
866 1
867 });
868 });
869};
870
871=back
872
873=head2 REQUEST CACHE
874
875The C<AnyEvent::FCP> class keeps a request cache, where it caches all
876information from requests.
877
878For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
879in C<< $fcp->{req}{$identifier} >>:
880
881 persistent_get
882 persistent_put
883 persistent_put_dir
884
885This message updates the stored data:
886
887 persistent_request_modified
888
889This message will remove this entry:
890
891 persistent_request_removed
892
893These messages get merged into the cache entry, under their
894type, i.e. a C<simple_progress> message will be stored in C<<
895$fcp->{req}{$identifier}{simple_progress} >>:
896
897 simple_progress # get/put
898
899 uri_generated # put
900 generated_metadata # put
901 started_compression # put
902 finished_compression # put
903 put_failed # put
904 put_fetchable # put
905 put_successful # put
906
907 sending_to_network # get
908 compatibility_mode # get
909 expected_hashes # get
910 expected_mime # get
911 expected_data_length # get
912 get_failed # get
913 data_found # get
914 enter_finite_cooldown # get
915
916In addition, an event (basically a fake message) of type C<request_changed> is generated
917on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
918is the type of the original message triggering the change,
919
920To fill this cache with the global queue and keep it updated,
921call C<watch_global> to subscribe to updates, followed by
922C<list_persistent_requests>.
923
924 $fcp->watch_global_; # do not wait
925 $fcp->list_persistent_requests; # wait
926
927To get a better idea of what is stored in the cache, here is an example of
928what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
929
930 {
931 identifier => "Frost-gpl.txt",
932 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
933 binary_blob => "false",
934 global => "true",
935 max_retries => -1,
936 max_size => 9223372036854775807,
937 persistence => "forever",
938 priority_class => 3,
939 real_time => "false",
940 return_type => "direct",
941 started => "true",
942 type => "persistent_get",
943 verbosity => 2147483647,
944 sending_to_network => {
945 identifier => "Frost-gpl.txt",
946 global => "true",
440 }, 947 },
441 ); 948 compatibility_mode => {
442}; 949 identifier => "Frost-gpl.txt",
443 950 definitive => "true",
444=back 951 dont_compress => "false",
952 global => "true",
953 max => "COMPAT_1255",
954 min => "COMPAT_1255",
955 },
956 expected_hashes => {
957 identifier => "Frost-gpl.txt",
958 global => "true",
959 hashes => {
960 ed2k => "d83596f5ee3b7...",
961 md5 => "e0894e4a2a6...",
962 sha1 => "...",
963 sha256 => "...",
964 sha512 => "...",
965 tth => "...",
966 },
967 },
968 expected_mime => {
969 identifier => "Frost-gpl.txt",
970 global => "true",
971 metadata => { content_type => "application/rar" },
972 },
973 expected_data_length => {
974 identifier => "Frost-gpl.txt",
975 data_length => 37576,
976 global => "true",
977 },
978 simple_progress => {
979 identifier => "Frost-gpl.txt",
980 failed => 0,
981 fatally_failed => 0,
982 finalized_total => "true",
983 global => "true",
984 last_progress => 1438639282628,
985 required => 372,
986 succeeded => 102,
987 total => 747,
988 },
989 data_found => {
990 identifier => "Frost-gpl.txt",
991 completion_time => 1438663354026,
992 data_length => 37576,
993 global => "true",
994 metadata => { content_type => "image/jpeg" },
995 startup_time => 1438657196167,
996 },
997 }
445 998
446=head1 EXAMPLE PROGRAM 999=head1 EXAMPLE PROGRAM
447 1000
448 use AnyEvent::FCP; 1001 use AnyEvent::FCP;
449 1002
450 my $fcp = new AnyEvent::FCP; 1003 my $fcp = new AnyEvent::FCP;
451 1004
452 # let us look at the global request list 1005 # let us look at the global request list
453 $fcp->watch_global (1, 0); 1006 $fcp->watch_global_ (1);
454 1007
455 # list them, synchronously 1008 # list them, synchronously
456 my $req = $fcp->list_persistent_requests_sync; 1009 my $req = $fcp->list_persistent_requests;
457 1010
458 # go through all requests 1011 # go through all requests
1012TODO
459 for my $req (values %$req) { 1013 for my $req (values %$req) {
460 # skip jobs not directly-to-disk 1014 # skip jobs not directly-to-disk
461 next unless $req->{return_type} eq "disk"; 1015 next unless $req->{return_type} eq "disk";
462 # skip jobs not issued by FProxy 1016 # skip jobs not issued by FProxy
463 next unless $req->{identifier} =~ /^FProxy:/; 1017 next unless $req->{identifier} =~ /^FProxy:/;
484 if 0.1 > rand; 1038 if 0.1 > rand;
485 } 1039 }
486 } 1040 }
487 1041
488 # see if the dummy plugin is loaded, to ensure all previous requests have finished. 1042 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
489 $fcp->get_plugin_info_sync ("dummy"); 1043 $fcp->get_plugin_info ("dummy");
490 1044
491=head1 SEE ALSO 1045=head1 SEE ALSO
492 1046
493L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>. 1047L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
494 1048

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines