ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.8 by root, Fri Jun 18 16:59:13 2010 UTC vs.
Revision 1.28 by root, Thu Sep 9 00:49:06 2021 UTC

18=head1 DESCRIPTION 18=head1 DESCRIPTION
19 19
20This module implements the freenet client protocol version 2.0, as used by 20This module implements the freenet client protocol version 2.0, as used by
21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version. 21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22 22
23See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a 23See L<https://wiki.freenetproject.org/FCP> for a description of what the
24description of what the messages do. 24messages do.
25 25
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
60 61
61use common::sense; 62use common::sense;
62 63
63use Carp; 64use Carp;
64 65
65our $VERSION = '0.3'; 66our $VERSION = 0.5;
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
71 75
72sub touc($) { 76sub touc($) {
73 local $_ = shift; 77 local $_ = shift;
74 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
75 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
76 $_ 80 $_
77} 81}
78 82
79sub tolc($) { 83sub tolc($) {
80 local $_ = shift; 84 local $_ = shift;
81 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
82 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
83 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
84 lc 88 lc
85} 89}
86 90
87=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP key => value...;
88 92
89Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
90127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
91 95
92If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
93(hopefully) unique client name for you. 97(hopefully) unique client name for you.
94 98
95You can install a progress callback that is being called with the AnyEvent::FCP 99The following keys can be specified (they are all optional):
96object, the type, a hashref with key-value pairs and a reference to any received data,
97for all unsolicited messages.
98 100
99Example: 101=over 4
100 102
101 sub progress_cb { 103=item name => $string
102 my ($self, $type, $kv, $rdata) = @_;
103 104
104 if ($type eq "simple_progress") { 105A unique name to identify this client. If none is specified, a randomly
105 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n"; 106generated name will be used.
106 } 107
107 } 108=item host => $hostname
109
110The hostname or IP address of the freenet node. Default is C<$ENV{FREDHOST}>
111or C<127.0.0.1>.
112
113=item port => $portnumber
114
115The port number of the FCP port. Default is C<$ENV{FREDPORT}> or C<9481>.
116
117=item timeout => $seconds
118
119The timeout, in seconds, after which a connection error is assumed when
120there is no activity. Default is C<7200>, i.e. two hours.
121
122=item keepalive => $seconds
123
124The interval, in seconds, at which keepalive messages will be
125sent. Default is C<540>, i.e. nine minutes.
126
127These keepalive messages are useful both to detect that a connection is
128no longer working and to keep any (home) routers from expiring their
129masquerading entry.
130
131=item on_eof => $callback->($fcp)
132
133Invoked when the underlying L<AnyEvent::Handle> signals EOF, currently
134regardless of whether the EOF was expected or not.
135
136=item on_error => $callback->($fcp, $message)
137
138Invoked on any (fatal) errors, such as unexpected connection close. The
139callback receives the FCP object and a textual error message.
140
141=item on_failure => $callback->($fcp, $type, $args, $backtrace, $error)
142
143Invoked when an FCP request fails that didn't have a failure callback. See
144L<FCP REQUESTS> for details.
145
146=back
108 147
109=cut 148=cut
110 149
111sub new { 150sub new {
112 my $class = shift; 151 my $class = shift;
152
153 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
154
113 my $self = bless { @_ }, $class; 155 my $self = bless {
114 156 host => $ENV{FREDHOST} || "127.0.0.1",
115 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 157 port => $ENV{FREDPORT} || 9481,
116 $self->{port} ||= $ENV{FREDPORT} || 9481; 158 timeout => 3600 * 2,
117 $self->{name} ||= time.rand.rand.rand; # lame 159 keepalive => 9 * 60,
118 $self->{timeout} ||= 3600*2; 160 name => time.rand.rand.rand, # lame
119 $self->{progress} ||= sub { }; 161 @_,
120 162 queue => [],
121 $self->{id} = "a0"; 163 req => {},
164 prefix => "..:aefcpid:$rand:",
165 idseq => "a0",
166 }, $class;
122 167
123 { 168 {
124 Scalar::Util::weaken (my $self = $self); 169 Scalar::Util::weaken (my $self = $self);
170
171 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
172 $self->{hdl}->push_write ("\n");
173 };
174
175 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
176
177 # these are declared here for performance reasons
178 my ($k, $v, $type);
179 my $rdata;
180
181 my $on_read = sub {
182 my ($hdl) = @_;
183
184 # we only carve out whole messages here
185 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
186 # remember end marker
187 $rdata = $1 eq "Data"
188 or $1 eq "EndMessage"
189 or return $self->fatal ("protocol error, expected message end, got $1\n");
190
191 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
192
193 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
194
195 $type = shift @lines;
196 $type = ($TOLC{$type} ||= tolc $type);
197
198 my %kv;
199
200 for (@lines) {
201 ($k, $v) = split /=/, $_, 2;
202 $k = ($TOLC{$k} ||= tolc $k);
203
204 if ($k =~ /\./) {
205 # generic, slow case
206 my @k = split /\./, $k;
207 my $ro = \\%kv;
208
209 while (@k) {
210 $k = shift @k;
211 if ($k =~ /^\d+$/) {
212 $ro = \$$ro->[$k];
213 } else {
214 $ro = \$$ro->{$k};
215 }
216 }
217
218 $$ro = $v;
219
220 next;
221 }
222
223 # special comon case, for performance only
224 $kv{$k} = $v;
225 }
226
227 if ($rdata) {
228 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
229 $rdata = \$_[1];
230 $self->recv ($type, \%kv, $rdata);
231 });
232
233 last; # do not tgry to parse more messages
234 } else {
235 $self->recv ($type, \%kv);
236 }
237 }
238 };
125 239
126 $self->{hdl} = new AnyEvent::Handle 240 $self->{hdl} = new AnyEvent::Handle
127 connect => [$self->{host} => $self->{port}], 241 connect => [$self->{host} => $self->{port}],
128 timeout => $self->{timeout}, 242 timeout => $self->{timeout},
243 on_read => $on_read,
244 on_eof => sub {
245 if ($self->{on_eof}) {
246 $self->{on_eof}($self);
247 } else {
248 $self->fatal ("EOF");
249 }
250 },
129 on_error => sub { 251 on_error => sub {
130 warn "@_\n";#d# 252 $self->fatal ($_[2]);
131 exit 1;
132 }, 253 },
133 on_read => sub { $self->on_read (@_) }, 254 ;
134 on_eof => $self->{on_eof} || sub { };
135 255
136 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 256 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137 } 257 }
138 258
139 $self->send_msg ( 259 $self->send_msg (client_hello =>
140 client_hello =>
141 name => $self->{name}, 260 name => $self->{name},
142 expected_version => "2.0", 261 expected_version => "2.0",
143 ); 262 );
144 263
145 $self 264 $self
146} 265}
147 266
267sub fatal {
268 my ($self, $msg) = @_;
269
270 $self->{hdl}->push_shutdown if $self->{hdl};
271 delete $self->{kw};
272
273 if ($self->{on_error}) {
274 $self->{on_error}->($self, $msg);
275 } else {
276 die "AnyEvent::FCP($self->{host}:$self->{port}): $msg";
277 }
278}
279
280sub identifier {
281 $_[0]{prefix} . ++$_[0]{idseq}
282}
283
148sub send_msg { 284sub send_msg {
149 my ($self, $type, %kv) = @_; 285 my ($self, $type, %kv) = @_;
150 286
151 my $data = delete $kv{data}; 287 my $data = delete $kv{data};
152 288
153 if (exists $kv{id_cb}) { 289 if (exists $kv{id_cb}) {
154 my $id = $kv{identifier} || ++$self->{id}; 290 my $id = $kv{identifier} ||= $self->identifier;
155 $self->{id}{$id} = delete $kv{id_cb}; 291 $self->{id}{$id} = delete $kv{id_cb};
156 $kv{identifier} = $id;
157 } 292 }
158 293
159 my $msg = (touc $type) . "\012" 294 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 295 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161
162 sub id {
163 my ($self) = @_;
164
165
166 }
167 296
168 if (defined $data) { 297 if (defined $data) {
169 $msg .= "DataLength=" . (length $data) . "\012" 298 $msg .= "DataLength=" . (length $data) . "\012"
170 . "Data\012$data"; 299 . "Data\012$data";
171 } else { 300 } else {
173 } 302 }
174 303
175 $self->{hdl}->push_write ($msg); 304 $self->{hdl}->push_write ($msg);
176} 305}
177 306
178sub on_read { 307sub on {
179 my ($self) = @_; 308 my ($self, $cb) = @_;
180 309
181 my $type; 310 # cb return undef - message eaten, remove cb
182 my %kv; 311 # cb return 0 - message eaten
183 my $rdata; 312 # cb return 1 - pass to next
184 313
185 my $done_cb = sub { 314 push @{ $self->{on} }, $cb;
186 $kv{pkt_type} = $type; 315}
187 316
188 if (my $cb = $self->{queue}[0]) { 317sub _push_queue {
189 $cb->($self, $type, \%kv, $rdata) 318 my ($self, $queue) = @_;
190 and shift @{ $self->{queue} }; 319
191 } else { 320 shift @$queue;
192 $self->default_recv ($type, \%kv, $rdata); 321 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
322 if @$queue;
323}
324
325# lock so only one $type (arbitrary string) is in flight,
326# to work around horribly misdesigned protocol.
327sub serialise {
328 my ($self, $type, $cb) = @_;
329
330 my $queue = $self->{serialise}{$type} ||= [];
331 push @$queue, $cb;
332 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
333 unless $#$queue;
334}
335
336# how to merge these types into $self->{persistent}
337our %PERSISTENT_TYPE = (
338 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
339 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
340 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
341 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
342 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
343
344 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
345
346 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
347 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
348 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
349 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
350 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
351 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
352 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
353
354 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
355 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
356 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
357 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
358 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
359 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
360 data_found => sub { $_[1]{data_found} = $_[2] }, # get
361 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
362);
363
364sub recv {
365 my ($self, $type, $kv, @extra) = @_;
366
367 if (my $cb = $PERSISTENT_TYPE{$type}) {
368 my $id = $kv->{identifier};
369 my $req = $_[0]{req}{$id} ||= {};
370 $cb->($self, $req, $kv);
371 $self->recv (request_changed => $kv, $type, @extra);
372 }
373
374 my $on = $self->{on};
375 for (0 .. $#$on) {
376 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
377 splice @$on, $_, 1 unless defined $res;
378 return;
193 } 379 }
194 }; 380 }
195 381
196 my $hdr_cb; $hdr_cb = sub { 382 if (my $cb = $self->{queue}[0]) {
197 if ($_[1] =~ /^([^=]+)=(.*)$/) { 383 $cb->($self, $type, $kv, @extra)
198 my ($k, $v) = ($1, $2); 384 and shift @{ $self->{queue} };
199 my @k = split /\./, tolc $k;
200 my $ro = \\%kv;
201
202 while (@k) {
203 my $k = shift @k;
204 if ($k =~ /^\d+$/) {
205 $ro = \$$ro->[$k];
206 } else {
207 $ro = \$$ro->{$k};
208 }
209 }
210
211 $$ro = $v;
212
213 $_[0]->push_read (line => $hdr_cb);
214 } elsif ($_[1] eq "Data") {
215 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216 $rdata = \$_[1];
217 $done_cb->();
218 });
219 } elsif ($_[1] eq "EndMessage") {
220 $done_cb->();
221 } else { 385 } else {
222 die "protocol error, expected message end, got $_[1]\n";#d# 386 $self->default_recv ($type, $kv, @extra);
223 }
224 }; 387 }
225
226 $self->{hdl}->push_read (line => sub {
227 $type = tolc $_[1];
228 $_[0]->push_read (line => $hdr_cb);
229 });
230} 388}
231 389
232sub default_recv { 390sub default_recv {
233 my ($self, $type, $kv, $rdata) = @_; 391 my ($self, $type, $kv, $rdata) = @_;
234 392
235 if ($type eq "node_hello") { 393 if ($type eq "node_hello") {
236 $self->{node_hello} = $kv; 394 $self->{node_hello} = $kv;
237 } elsif (exists $self->{id}{$kv->{identifier}}) { 395 } elsif (exists $self->{id}{$kv->{identifier}}) {
238 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 396 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239 and delete $self->{id}{$kv->{identifier}}; 397 and delete $self->{id}{$kv->{identifier}};
240 } else {
241 &{ $self->{progress} };
242 } 398 }
243} 399}
400
401=back
402
403=head2 FCP REQUESTS
404
405The following methods implement various requests. Most of them map
406directory to the FCP message of the same name. The added benefit of
407these over sending requests yourself is that they handle the necessary
408serialisation, protocol quirks, and replies.
409
410All of them exist in two versions, the variant shown in this manpage, and
411a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
412version as shown is I<synchronous> - it will wait for any replies, and
413either return the reply, or croak with an error. The underscore variant
414returns immediately and invokes one or more callbacks or condvars later.
415
416For example, the call
417
418 $info = $fcp->get_plugin_info ($name, $detailed);
419
420Also comes in this underscore variant:
421
422 $fcp->get_plugin_info_ ($name, $detailed, $cb);
423
424You can think of the underscore as a kind of continuation indicator - the
425normal function waits and returns with the data, the C<_> indicates that
426you pass the continuation yourself, and the continuation will be invoked
427with the results.
428
429This callback/continuation argument (C<$cb>) can come in three forms itself:
430
431=over 4
432
433=item A code reference (or rather anything not matching some other alternative)
434
435This code reference will be invoked with the result on success. On an
436error, it will invoke the C<on_failure> callback of the FCP object, or,
437if none was defined, will die (in the event loop) with a backtrace of the
438call site.
439
440This is a popular choice, but it makes handling errors hard - make sure
441you never generate protocol errors!
442
443In the failure case, if an C<on_failure> hook exists, it will be invoked
444with the FCP object, the request type (the name of the method, an arrayref
445containing the arguments from the original request invocation, a (textual)
446backtrace as generated by C<Carp::longmess>, and the error object from the
447server, in this order, e.g.:
448
449 on_failure => sub {
450 my ($fcp, $request_type, $orig_args, $backtrace, $error_object) = @_;
451
452 warn "FCP failure ($type @$args), $error_object->{code_description} ($error_object->{extra_description})$backtrace";
453 exit 1;
454 },
455
456=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
457
458When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
459results when the request has finished. Should an error occur, the error
460will instead result in C<< $cv->croak ($error) >>.
461
462This is also a popular choice.
463
464=item An array with two callbacks C<[$success, $failure]>
465
466The C<$success> callback will be invoked with the results, while the
467C<$failure> callback will be invoked on any errors.
468
469The C<$failure> callback will be invoked with the error object from the
470server.
471
472=item C<undef>
473
474This is the same thing as specifying C<sub { }> as callback, i.e. on
475success, the results are ignored, while on failure, the C<on_failure> hook
476is invoked or the module dies with a backtrace.
477
478This is good for quick scripts, or when you really aren't interested in
479the results.
480
481=back
482
483=cut
484
485our $NOP_CB = sub { };
244 486
245sub _txn { 487sub _txn {
246 my ($name, $sub) = @_; 488 my ($name, $sub) = @_;
247 489
248 *{$name} = sub { 490 *{$name} = sub {
249 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 491 my $cv = AE::cv;
250 &$sub;
251 $cv
252 };
253 492
254 *{"$name\_sync"} = sub { 493 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
255 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
256 &$sub; 494 &$sub;
257 $cv->recv 495 $cv->recv
258 }; 496 };
259}
260 497
261=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 498 *{"$name\_"} = sub {
499 my ($ok, $err) = pop;
262 500
501 if (ARRAY:: eq ref $ok) {
502 ($ok, $err) = @$ok;
503 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
504 $err = sub { $ok->croak ($_[0]{extra_description}) };
505 } else {
506 my $bt = Carp::longmess "AnyEvent::FCP request $name";
507 Scalar::Util::weaken (my $self = $_[0]);
508 my $args = [@_]; shift @$args;
509 $err = sub {
510 if ($self->{on_failure}) {
511 $self->{on_failure}($self, $name, $args, $bt, $_[0]);
512 } else {
513 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
514 }
515 };
516 }
517
518 $ok ||= $NOP_CB;
519
520 splice @_, 1, 0, $ok, $err;
521 &$sub;
522 };
523}
524
525=over 4
526
263=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 527=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
264 528
265=cut 529=cut
266 530
267_txn list_peers => sub { 531_txn list_peers => sub {
268 my ($self, $cv, $with_metadata, $with_volatile) = @_; 532 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
269 533
270 my @res; 534 my @res;
271 535
272 $self->send_msg (list_peers => 536 $self->send_msg (list_peers =>
273 with_metadata => $with_metadata ? "true" : "false", 537 with_metadata => $with_metadata ? "true" : "false",
274 with_volatile => $with_volatile ? "true" : "false", 538 with_volatile => $with_volatile ? "true" : "false",
275 id_cb => sub { 539 id_cb => sub {
276 my ($self, $type, $kv, $rdata) = @_; 540 my ($self, $type, $kv, $rdata) = @_;
277 541
278 if ($type eq "end_list_peers") { 542 if ($type eq "end_list_peers") {
279 $cv->(\@res); 543 $ok->(\@res);
280 1 544 1
281 } else { 545 } else {
282 push @res, $kv; 546 push @res, $kv;
283 0 547 0
284 } 548 }
285 }, 549 },
286 ); 550 );
287}; 551};
288 552
289=item $cv = $fcp->list_peer_notes ($node_identifier)
290
291=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 553=item $notes = $fcp->list_peer_notes ($node_identifier)
292 554
293=cut 555=cut
294 556
295_txn list_peer_notes => sub { 557_txn list_peer_notes => sub {
296 my ($self, $cv, $node_identifier) = @_; 558 my ($self, $ok, undef, $node_identifier) = @_;
297 559
298 $self->send_msg (list_peer_notes => 560 $self->send_msg (list_peer_notes =>
299 node_identifier => $node_identifier, 561 node_identifier => $node_identifier,
300 id_cb => sub { 562 id_cb => sub {
301 my ($self, $type, $kv, $rdata) = @_; 563 my ($self, $type, $kv, $rdata) = @_;
302 564
303 $cv->($kv); 565 $ok->($kv);
304 1 566 1
305 }, 567 },
306 ); 568 );
307}; 569};
308 570
309=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310
311=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 571=item $fcp->watch_global ($enabled[, $verbosity_mask])
312 572
313=cut 573=cut
314 574
315_txn watch_global => sub { 575_txn watch_global => sub {
316 my ($self, $cv, $enabled, $verbosity_mask) = @_; 576 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
317 577
318 $self->send_msg (watch_global => 578 $self->send_msg (watch_global =>
319 enabled => $enabled ? "true" : "false", 579 enabled => $enabled ? "true" : "false",
320 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 580 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321 ); 581 );
322 582
323 $cv->(); 583 $ok->();
324}; 584};
325 585
326=item $cv = $fcp->list_persistent_requests
327
328=item $reqs = $fcp->list_persistent_requests_sync 586=item $reqs = $fcp->list_persistent_requests
329 587
330=cut 588=cut
331 589
332_txn list_persistent_requests => sub { 590_txn list_persistent_requests => sub {
333 my ($self, $cv) = @_; 591 my ($self, $ok, $err) = @_;
334 592
593 $self->serialise (list_persistent_requests => sub {
594 my ($self, $guard) = @_;
595
335 my %res; 596 my @res;
336 597
337 $self->send_msg ("list_persistent_requests"); 598 $self->send_msg ("list_persistent_requests");
338 599
339 push @{ $self->{queue} }, sub { 600 $self->on (sub {
340 my ($self, $type, $kv, $rdata) = @_; 601 my ($self, $type, $kv, $rdata) = @_;
341 602
603 $guard if 0;
604
342 if ($type eq "end_list_persistent_requests") { 605 if ($type eq "end_list_persistent_requests") {
343 $cv->(\%res); 606 $ok->(\@res);
607 return;
608 } else {
609 my $id = $kv->{identifier};
610
611 if ($type =~ /^persistent_(get|put|put_dir)$/) {
612 push @res, [$type, $kv];
613 }
614 }
615
344 1 616 1
345 } else { 617 });
346 my $id = $kv->{identifier}; 618 });
619};
347 620
348 if ($type =~ /^persistent_(get|put|put_dir)$/) { 621=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
349 $res{$id} = { 622
350 type => $1, 623Update either the C<client_token> or C<priority_class> of a request
351 %{ $res{$id} }, 624identified by C<$global> and C<$identifier>, depending on which of
625C<$client_token> and C<$priority_class> are not C<undef>.
626
627=cut
628
629_txn modify_persistent_request => sub {
630 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
635 $self->send_msg (modify_persistent_request =>
636 global => $global ? "true" : "false",
637 identifier => $identifier,
638 defined $client_token ? (client_token => $client_token ) : (),
639 defined $priority_class ? (priority_class => $priority_class) : (),
640 );
641
642 $self->on (sub {
643 my ($self, $type, $kv, @extra) = @_;
644
645 $guard if 0;
646
647 if ($kv->{identifier} eq $identifier) {
648 if ($type eq "persistent_request_modified") {
352 %$kv, 649 $ok->($kv);
650 return;
651 } elsif ($type eq "protocol_error") {
652 $err->($kv);
653 return;
353 }; 654 }
354 } elsif ($type eq "simple_progress") {
355 delete $kv->{pkt_type}; # save memory
356 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357 } else {
358 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359 } 655 }
656
360 0 657 1
361 } 658 });
362 }; 659 });
363}; 660};
364 661
365=item $cv = $fcp->remove_request ($global, $identifier)
366
367=item $status = $fcp->remove_request_sync ($global, $identifier)
368
369=cut
370
371_txn remove_request => sub {
372 my ($self, $cv, $global, $identifier) = @_;
373
374 $self->send_msg (remove_request =>
375 global => $global ? "true" : "false",
376 identifier => $identifier,
377 id_cb => sub {
378 my ($self, $type, $kv, $rdata) = @_;
379
380 $cv->($kv);
381 1
382 },
383 );
384};
385
386=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387
388=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389
390=cut
391
392_txn modify_persistent_request => sub {
393 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394
395 $self->send_msg (modify_persistent_request =>
396 global => $global ? "true" : "false",
397 defined $client_token ? (client_token => $client_token ) : (),
398 defined $priority_class ? (priority_class => $priority_class) : (),
399 identifier => $identifier,
400 id_cb => sub {
401 my ($self, $type, $kv, $rdata) = @_;
402
403 $cv->($kv);
404 1
405 },
406 );
407};
408
409=item $cv = $fcp->get_plugin_info ($name, $detailed)
410
411=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 662=item $info = $fcp->get_plugin_info ($name, $detailed)
412 663
413=cut 664=cut
414 665
415_txn get_plugin_info => sub { 666_txn get_plugin_info => sub {
416 my ($self, $cv, $name, $detailed) = @_; 667 my ($self, $ok, $err, $name, $detailed) = @_;
668
669 my $id = $self->identifier;
417 670
418 $self->send_msg (get_plugin_info => 671 $self->send_msg (get_plugin_info =>
672 identifier => $id,
419 plugin_name => $name, 673 plugin_name => $name,
420 detailed => $detailed ? "true" : "false", 674 detailed => $detailed ? "true" : "false",
421 id_cb => sub {
422 my ($self, $type, $kv, $rdata) = @_;
423
424 $cv->($kv);
425 1
426 },
427 ); 675 );
676 $self->on (sub {
677 my ($self, $type, $kv) = @_;
678
679 if ($kv->{identifier} eq $id) {
680 if ($type eq "get_plugin_info") {
681 $ok->($kv);
682 } else {
683 $err->($kv, $type);
684 }
685 return;
686 }
687
688 1
689 });
428}; 690};
429 691
430=item $cv = $fcp->client_get ($uri, $identifier, %kv)
431
432=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 692=item $status = $fcp->client_get ($uri, $identifier, %kv)
433 693
434%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 694%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
435 695
436ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 696ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437priority_class, persistence, client_token, global, return_type, 697priority_class, persistence, client_token, global, return_type,
438binary_blob, allowed_mime_types, filename, temp_filename 698binary_blob, allowed_mime_types, filename, temp_filename
439 699
440=cut 700=cut
441 701
442_txn client_get => sub { 702_txn client_get => sub {
443 my ($self, $cv, $uri, $identifier, %kv) = @_; 703 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
444 704
705 $self->serialise ($identifier => sub {
706 my ($self, $guard) = @_;
707
445 $self->send_msg (client_get => 708 $self->send_msg (client_get =>
446 %kv, 709 %kv,
447 uri => $uri, 710 uri => $uri,
448 identifier => $identifier, 711 identifier => $identifier,
449 id_cb => sub { 712 );
713
714 $self->on (sub {
450 my ($self, $type, $kv, $rdata) = @_; 715 my ($self, $type, $kv, @extra) = @_;
451 716
717 $guard if 0;
718
719 if ($kv->{identifier} eq $identifier) {
720 if ($type eq "persistent_get") {
452 $cv->($kv); 721 $ok->($kv);
722 return;
723 } elsif ($type eq "protocol_error") {
724 $err->($kv);
725 return;
726 }
727 }
728
453 1 729 1
730 });
731 });
732};
733
734=item $status = $fcp->remove_request ($identifier[, $global])
735
736Remove the request with the given identifier. Returns true if successful,
737false on error.
738
739=cut
740
741_txn remove_request => sub {
742 my ($self, $ok, $err, $identifier, $global) = @_;
743
744 $self->serialise ($identifier => sub {
745 my ($self, $guard) = @_;
746
747 $self->send_msg (remove_request =>
748 identifier => $identifier,
749 global => $global ? "true" : "false",
750 );
751 $self->on (sub {
752 my ($self, $type, $kv, @extra) = @_;
753
754 $guard if 0;
755
756 if ($kv->{identifier} eq $identifier) {
757 if ($type eq "persistent_request_removed") {
758 $ok->(1);
759 return;
760 } elsif ($type eq "protocol_error") {
761 $err->($kv);
762 return;
763 }
764 }
765
766 1
767 });
768 });
769};
770
771=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
772
773The DDA test in FCP is probably the single most broken protocol - only
774one directory test can be outstanding at any time, and some guessing and
775heuristics are involved in mangling the paths.
776
777This function combines C<TestDDARequest> and C<TestDDAResponse> in one
778request, handling file reading and writing as well, and tries very hard to
779do the right thing.
780
781Both C<$local_directory> and C<$remote_directory> must specify the same
782directory - C<$local_directory> is the directory path on the client (where
783L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
784the server (where the freenet node runs). When both are running on the
785same node, the paths are generally identical.
786
787C<$want_read> and C<$want_write> should be set to a true value when you
788want to read (get) files or write (put) files, respectively.
789
790On error, an exception is thrown. Otherwise, C<$can_read> and
791C<$can_write> indicate whether you can read or write to freenet via the
792directory.
793
794=cut
795
796_txn test_dda => sub {
797 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
798
799 $self->serialise (test_dda => sub {
800 my ($self, $guard) = @_;
801
802 $self->send_msg (test_dda_request =>
803 directory => $remote,
804 want_read_directory => $want_read ? "true" : "false",
805 want_write_directory => $want_write ? "true" : "false",
806 );
807 $self->on (sub {
808 my ($self, $type, $kv) = @_;
809
810 if ($type eq "test_dda_reply") {
811 # the filenames are all relative to the server-side directory,
812 # which might or might not match $remote anymore, so we
813 # need to rewrite the paths to be relative to $local
814 for my $k (qw(read_filename write_filename)) {
815 my $f = $kv->{$k};
816 for my $dir ($kv->{directory}, $remote) {
817 if ($dir eq substr $f, 0, length $dir) {
818 substr $f, 0, 1 + length $dir, "";
819 $kv->{$k} = $f;
820 last;
821 }
822 }
823 }
824
825 my %response = (directory => $remote);
826
827 if (length $kv->{read_filename}) {
828 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
829 sysread $fh, my $buf, -s $fh;
830 $response{read_content} = $buf;
831 }
832 }
833
834 if (length $kv->{write_filename}) {
835 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
836 syswrite $fh, $kv->{content_to_write};
837 }
838 }
839
840 $self->send_msg (test_dda_response => %response);
841
842 $self->on (sub {
843 my ($self, $type, $kv) = @_;
844
845 $guard if 0; # reference
846
847 if ($type eq "test_dda_complete") {
848 $ok->(
849 $kv->{read_directory_allowed} eq "true",
850 $kv->{write_directory_allowed} eq "true",
851 );
852 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
853 $err->($kv->{extra_description});
854 return;
855 }
856
857 1
858 });
859
860 return;
861 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
862 $err->($kv);
863 return;
864 }
865
866 1
867 });
868 });
869};
870
871=back
872
873=head2 REQUEST CACHE
874
875The C<AnyEvent::FCP> class keeps a request cache, where it caches all
876information from requests.
877
878For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
879in C<< $fcp->{req}{$identifier} >>:
880
881 persistent_get
882 persistent_put
883 persistent_put_dir
884
885This message updates the stored data:
886
887 persistent_request_modified
888
889This message will remove this entry:
890
891 persistent_request_removed
892
893These messages get merged into the cache entry, under their
894type, i.e. a C<simple_progress> message will be stored in C<<
895$fcp->{req}{$identifier}{simple_progress} >>:
896
897 simple_progress # get/put
898
899 uri_generated # put
900 generated_metadata # put
901 started_compression # put
902 finished_compression # put
903 put_failed # put
904 put_fetchable # put
905 put_successful # put
906
907 sending_to_network # get
908 compatibility_mode # get
909 expected_hashes # get
910 expected_mime # get
911 expected_data_length # get
912 get_failed # get
913 data_found # get
914 enter_finite_cooldown # get
915
916In addition, an event (basically a fake message) of type C<request_changed> is generated
917on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
918is the type of the original message triggering the change,
919
920To fill this cache with the global queue and keep it updated,
921call C<watch_global> to subscribe to updates, followed by
922C<list_persistent_requests>.
923
924 $fcp->watch_global_; # do not wait
925 $fcp->list_persistent_requests; # wait
926
927To get a better idea of what is stored in the cache, here is an example of
928what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
929
930 {
931 identifier => "Frost-gpl.txt",
932 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
933 binary_blob => "false",
934 global => "true",
935 max_retries => -1,
936 max_size => 9223372036854775807,
937 persistence => "forever",
938 priority_class => 3,
939 real_time => "false",
940 return_type => "direct",
941 started => "true",
942 type => "persistent_get",
943 verbosity => 2147483647,
944 sending_to_network => {
945 identifier => "Frost-gpl.txt",
946 global => "true",
454 }, 947 },
455 ); 948 compatibility_mode => {
456}; 949 identifier => "Frost-gpl.txt",
457 950 definitive => "true",
458=back 951 dont_compress => "false",
952 global => "true",
953 max => "COMPAT_1255",
954 min => "COMPAT_1255",
955 },
956 expected_hashes => {
957 identifier => "Frost-gpl.txt",
958 global => "true",
959 hashes => {
960 ed2k => "d83596f5ee3b7...",
961 md5 => "e0894e4a2a6...",
962 sha1 => "...",
963 sha256 => "...",
964 sha512 => "...",
965 tth => "...",
966 },
967 },
968 expected_mime => {
969 identifier => "Frost-gpl.txt",
970 global => "true",
971 metadata => { content_type => "application/rar" },
972 },
973 expected_data_length => {
974 identifier => "Frost-gpl.txt",
975 data_length => 37576,
976 global => "true",
977 },
978 simple_progress => {
979 identifier => "Frost-gpl.txt",
980 failed => 0,
981 fatally_failed => 0,
982 finalized_total => "true",
983 global => "true",
984 last_progress => 1438639282628,
985 required => 372,
986 succeeded => 102,
987 total => 747,
988 },
989 data_found => {
990 identifier => "Frost-gpl.txt",
991 completion_time => 1438663354026,
992 data_length => 37576,
993 global => "true",
994 metadata => { content_type => "image/jpeg" },
995 startup_time => 1438657196167,
996 },
997 }
459 998
460=head1 EXAMPLE PROGRAM 999=head1 EXAMPLE PROGRAM
461 1000
462 use AnyEvent::FCP; 1001 use AnyEvent::FCP;
463 1002
464 my $fcp = new AnyEvent::FCP; 1003 my $fcp = new AnyEvent::FCP;
465 1004
466 # let us look at the global request list 1005 # let us look at the global request list
467 $fcp->watch_global (1, 0); 1006 $fcp->watch_global_ (1);
468 1007
469 # list them, synchronously 1008 # list them, synchronously
470 my $req = $fcp->list_persistent_requests_sync; 1009 my $req = $fcp->list_persistent_requests;
471 1010
472 # go through all requests 1011 # go through all requests
1012TODO
473 for my $req (values %$req) { 1013 for my $req (values %$req) {
474 # skip jobs not directly-to-disk 1014 # skip jobs not directly-to-disk
475 next unless $req->{return_type} eq "disk"; 1015 next unless $req->{return_type} eq "disk";
476 # skip jobs not issued by FProxy 1016 # skip jobs not issued by FProxy
477 next unless $req->{identifier} =~ /^FProxy:/; 1017 next unless $req->{identifier} =~ /^FProxy:/;
498 if 0.1 > rand; 1038 if 0.1 > rand;
499 } 1039 }
500 } 1040 }
501 1041
502 # see if the dummy plugin is loaded, to ensure all previous requests have finished. 1042 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
503 $fcp->get_plugin_info_sync ("dummy"); 1043 $fcp->get_plugin_info ("dummy");
504 1044
505=head1 SEE ALSO 1045=head1 SEE ALSO
506 1046
507L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>. 1047L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
508 1048

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines