ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.4 by root, Wed Jul 29 09:25:46 2009 UTC vs.
Revision 1.18 by root, Thu Dec 3 19:07:57 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = 0.4;
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
53 75
54sub touc($) { 76sub touc($) {
55 local $_ = shift; 77 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
58 $_ 80 $_
59} 81}
60 82
61sub tolc($) { 83sub tolc($) {
62 local $_ = shift; 84 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 88 lc
67} 89}
68 90
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 92
71Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 95
74If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 97(hopefully) unique client name for you.
76 98
77=cut 99=cut
78 100
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use
82#it like this:
83#
84# sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_;
86#
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88# }
89
90sub new { 101sub new {
91 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
92 my $self = bless { @_ }, $class; 106 my $self = bless {
93 107 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 110 keepalive => 9 * 60,
97 $self->{timeout} ||= 600; 111 name => time.rand.rand.rand, # lame
98 112 @_,
99 $self->{id} = "a0"; 113 queue => [],
114 req => {},
115 prefix => "..:aefcpid:$rand:",
116 idseq => "a0",
117 }, $class;
100 118
101 { 119 {
102 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or die "protocol error, expected message end, got $1\n";
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
103 190
104 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
107 on_error => sub { 194 on_error => sub {
108 warn "<@_>\n"; 195 warn "$self->{host}: $_[2]\n";#d#
109 exit 1; 196 exit 1;
110 }, 197 },
111 on_read => sub { $self->on_read (@_) }, 198 on_read => $on_read,
112 on_eof => $self->{on_eof} || sub { }; 199 on_eof => $self->{on_eof} || sub { },
200 ;
113 201
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 202 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 203 }
116 204
117 $self->send_msg ( 205 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 206 name => $self->{name},
120 expected_version => "2.0", 207 expected_version => "2.0",
121 ); 208 );
122 209
123 $self 210 $self
124} 211}
125 212
126#sub progress { 213sub identifier {
127# my ($self, $txn, $type, $attr) = @_; 214 $_[0]{prefix} . ++$_[0]{idseq}
128# 215}
129# $self->{progress}->($self, $txn, $type, $attr)
130# if $self->{progress};
131#}
132 216
133sub send_msg { 217sub send_msg {
134 my ($self, $type, %kv) = @_; 218 my ($self, $type, %kv) = @_;
135 219
136 my $data = delete $kv{data}; 220 my $data = delete $kv{data};
137 221
138 if (exists $kv{id_cb}) { 222 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 223 my $id = $kv{identifier} ||= $self->identifier;
140 $self->{id}{$id} = delete $kv{id_cb}; 224 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 225 }
143 226
144 my $msg = (touc $type) . "\012" 227 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 228 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 229
158 } 241 }
159 242
160 $self->{hdl}->push_write ($msg); 243 $self->{hdl}->push_write ($msg);
161} 244}
162 245
163sub on_read { 246sub on {
164 my ($self) = @_; 247 my ($self, $cb) = @_;
165 248
166 my $type; 249 # cb return undef - message eaten, remove cb
167 my %kv; 250 # cb return 0 - message eaten
168 my $rdata; 251 # cb return 1 - pass to next
169 252
170 my $done_cb = sub { 253 push @{ $self->{on} }, $cb;
171 $kv{pkt_type} = $type; 254}
172 255
173 if (my $cb = $self->{queue}[0]) { 256sub _push_queue {
174 $cb->($self, $type, \%kv, $rdata) 257 my ($self, $queue) = @_;
175 and shift @{ $self->{queue} }; 258
176 } else { 259 shift @$queue;
177 $self->default_recv ($type, \%kv, $rdata); 260 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
261 if @$queue;
262}
263
264# lock so only one $type (arbitrary string) is in flight,
265# to work around horribly misdesigned protocol.
266sub serialise {
267 my ($self, $type, $cb) = @_;
268
269 my $queue = $self->{serialise}{$type} ||= [];
270 push @$queue, $cb;
271 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
272 unless $#$queue;
273}
274
275# how to merge these types into $self->{persistent}
276our %PERSISTENT_TYPE = (
277 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
278 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
279 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
280 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
281 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
282
283 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
284
285 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
286 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
287 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
288 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
289 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
290 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
291 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
292
293 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
294 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
295 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
296 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
297 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
298 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
299 data_found => sub { $_[1]{data_found} = $_[2] }, # get
300 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
301);
302
303sub recv {
304 my ($self, $type, $kv, @extra) = @_;
305
306 if (my $cb = $PERSISTENT_TYPE{$type}) {
307 my $id = $kv->{identifier};
308 my $req = $_[0]{req}{$id} ||= {};
309 $cb->($self, $req, $kv);
310 $self->recv (request_changed => $kv, $type, @extra);
311 }
312
313 my $on = $self->{on};
314 for (0 .. $#$on) {
315 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
316 splice @$on, $_, 1 unless defined $res;
317 return;
178 } 318 }
179 }; 319 }
180 320
181 my $hdr_cb; $hdr_cb = sub { 321 if (my $cb = $self->{queue}[0]) {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 322 $cb->($self, $type, $kv, @extra)
183 my ($k, $v) = ($1, $2); 323 and shift @{ $self->{queue} };
184 my @k = split /\./, tolc $k;
185 my $ro = \\%kv;
186
187 while (@k) {
188 my $k = shift @k;
189 if ($k =~ /^\d+$/) {
190 $ro = \$$ro->[$k];
191 } else {
192 $ro = \$$ro->{$k};
193 }
194 }
195
196 $$ro = $v;
197
198 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1];
202 $done_cb->();
203 });
204 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->();
206 } else { 324 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 325 $self->default_recv ($type, $kv, @extra);
208 }
209 }; 326 }
210
211 $self->{hdl}->push_read (line => sub {
212 $type = tolc $_[1];
213 $_[0]->push_read (line => $hdr_cb);
214 });
215} 327}
216 328
217sub default_recv { 329sub default_recv {
218 my ($self, $type, $kv, $rdata) = @_; 330 my ($self, $type, $kv, $rdata) = @_;
219 331
220 if ($type eq "node_hello") { 332 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 333 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 334 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 335 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 336 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 337 }
229} 338}
339
340=back
341
342=head2 FCP REQUESTS
343
344The following methods implement various requests. Most of them map
345directory to the FCP message of the same name. The added benefit of
346these over sending requests yourself is that they handle the necessary
347serialisation, protocol quirks, and replies.
348
349All of them exist in two versions, the variant shown in this manpage, and
350a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
351version as shown is I<synchronous> - it will wait for any replies, and
352either return the reply, or croak with an error. The underscore variant
353returns immediately and invokes one or more callbacks or condvars later.
354
355For example, the call
356
357 $info = $fcp->get_plugin_info ($name, $detailed);
358
359Also comes in this underscore variant:
360
361 $fcp->get_plugin_info_ ($name, $detailed, $cb);
362
363You can thinbk of the underscore as a kind of continuation indicator - the
364normal function waits and returns with the data, the C<_> indicates that
365you pass the continuation yourself, and the continuation will be invoked
366with the results.
367
368This callback/continuation argument (C<$cb>) can come in three forms itself:
369
370=over 4
371
372=item A code reference (or rather anything not matching some other alternative)
373
374This code reference will be invoked with the result on success. On an
375error, it will die (in the event loop) with a backtrace of the call site.
376
377This is a popular choice, but it makes handling errors hard - make sure
378you never generate protocol errors!
379
380=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
381
382When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
383results when the request has finished. Should an error occur, the error
384will instead result in C<< $cv->croak ($error) >>.
385
386This is also a popular choice.
387
388=item An array with two callbacks C<[$success, $failure]>
389
390The C<$success> callback will be invoked with the results, while the
391C<$failure> callback will be invoked on any errors.
392
393=item C<undef>
394
395This is the same thing as specifying C<sub { }> as callback, i.e. on
396success, the results are ignored, while on failure, you the module dies
397with a backtrace.
398
399This is good for quick scripts, or when you really aren't interested in
400the results.
401
402=back
403
404=cut
405
406our $NOP_CB = sub { };
230 407
231sub _txn { 408sub _txn {
232 my ($name, $sub) = @_; 409 my ($name, $sub) = @_;
233 410
234 *{$name} = sub { 411 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 412 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 413
240 *{"$name\_sync"} = sub { 414 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 415 &$sub;
243 $cv->recv 416 $cv->recv
244 }; 417 };
245}
246 418
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 419 *{"$name\_"} = sub {
420 my ($ok, $err) = pop;
248 421
422 if (ARRAY:: eq ref $ok) {
423 ($ok, $err) = @$ok;
424 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
425 $err = sub { $ok->croak ($_[0]{extra_description}) };
426 } else {
427 my $bt = Carp::longmess "";
428 $err = sub {
429 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
430 };
431 }
432
433 $ok ||= $NOP_CB;
434
435 splice @_, 1, 0, $ok, $err;
436 &$sub;
437 };
438}
439
440=over 4
441
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 442=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 443
251=cut 444=cut
252 445
253_txn list_peers => sub { 446_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 447 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 448
256 my @res; 449 my @res;
257 450
258 $self->send_msg (list_peers => 451 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 452 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 453 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 454 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 455 my ($self, $type, $kv, $rdata) = @_;
263 456
264 if ($type eq "end_list_peers") { 457 if ($type eq "end_list_peers") {
265 $cv->(\@res); 458 $ok->(\@res);
266 1 459 1
267 } else { 460 } else {
268 push @res, $kv; 461 push @res, $kv;
269 0 462 0
270 } 463 }
271 }, 464 },
272 ); 465 );
273}; 466};
274 467
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 468=item $notes = $fcp->list_peer_notes ($node_identifier)
278 469
279=cut 470=cut
280 471
281_txn list_peer_notes => sub { 472_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 473 my ($self, $ok, undef, $node_identifier) = @_;
283 474
284 $self->send_msg (list_peer_notes => 475 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 476 node_identifier => $node_identifier,
286 id_cb => sub { 477 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 478 my ($self, $type, $kv, $rdata) = @_;
288 479
289 $cv->($kv); 480 $ok->($kv);
290 1 481 1
291 }, 482 },
292 ); 483 );
293}; 484};
294 485
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 486=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 487
299=cut 488=cut
300 489
301_txn watch_global => sub { 490_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 491 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 492
304 $self->send_msg (watch_global => 493 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 494 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 495 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 496 );
308 497
309 $cv->(); 498 $ok->();
310}; 499};
311 500
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 501=item $reqs = $fcp->list_persistent_requests
315 502
316=cut 503=cut
317 504
318_txn list_persistent_requests => sub { 505_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 506 my ($self, $ok, $err) = @_;
320 507
508 $self->serialise (list_persistent_requests => sub {
509 my ($self, $guard) = @_;
510
321 my %res; 511 my @res;
322 512
323 $self->send_msg ("list_persistent_requests"); 513 $self->send_msg ("list_persistent_requests");
324 514
325 push @{ $self->{queue} }, sub { 515 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 516 my ($self, $type, $kv, $rdata) = @_;
327 517
518 $guard if 0;
519
328 if ($type eq "end_list_persistent_requests") { 520 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 521 $ok->(\@res);
522 return;
523 } else {
524 my $id = $kv->{identifier};
525
526 if ($type =~ /^persistent_(get|put|put_dir)$/) {
527 push @res, [$type, $kv];
528 }
529 }
530
330 1 531 1
331 } else { 532 });
332 my $id = $kv->{identifier}; 533 });
534};
333 535
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 536=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 537
336 type => $1, 538Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 539identified by C<$global> and C<$identifier>, depending on which of
540C<$client_token> and C<$priority_class> are not C<undef>.
541
542=cut
543
544_txn modify_persistent_request => sub {
545 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
546
547 $self->serialise ($identifier => sub {
548 my ($self, $guard) = @_;
549
550 $self->send_msg (modify_persistent_request =>
551 global => $global ? "true" : "false",
552 identifier => $identifier,
553 defined $client_token ? (client_token => $client_token ) : (),
554 defined $priority_class ? (priority_class => $priority_class) : (),
555 );
556
557 $self->on (sub {
558 my ($self, $type, $kv, @extra) = @_;
559
560 $guard if 0;
561
562 if ($kv->{identifier} eq $identifier) {
563 if ($type eq "persistent_request_modified") {
338 %$kv, 564 $ok->($kv);
565 return;
566 } elsif ($type eq "protocol_error") {
567 $err->($kv);
568 return;
339 }; 569 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 570 }
571
346 0 572 1
347 } 573 });
348 }; 574 });
349}; 575};
350 576
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370};
371
372=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
373
374=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
375
376=cut
377
378_txn modify_persistent_request => sub {
379 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
380
381 $self->send_msg (modify_persistent_request =>
382 global => $global ? "true" : "false",
383 defined $client_token ? (client_token => $client_token ) : (),
384 defined $priority_class ? (priority_class => $priority_class) : (),
385 identifier => $identifier,
386 id_cb => sub {
387 my ($self, $type, $kv, $rdata) = @_;
388
389 $cv->($kv);
390 1
391 },
392 );
393};
394
395=item $cv = $fcp->get_plugin_info ($name, $detailed)
396
397=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 577=item $info = $fcp->get_plugin_info ($name, $detailed)
398 578
399=cut 579=cut
400 580
401_txn get_plugin_info => sub { 581_txn get_plugin_info => sub {
402 my ($self, $cv, $name, $detailed) = @_; 582 my ($self, $ok, $err, $name, $detailed) = @_;
583
584 my $id = $self->identifier;
403 585
404 $self->send_msg (get_plugin_info => 586 $self->send_msg (get_plugin_info =>
587 identifier => $id,
405 plugin_name => $name, 588 plugin_name => $name,
406 detailed => $detailed ? "true" : "false", 589 detailed => $detailed ? "true" : "false",
407 id_cb => sub {
408 my ($self, $type, $kv, $rdata) = @_;
409
410 $cv->($kv);
411 1
412 },
413 ); 590 );
591 $self->on (sub {
592 my ($self, $type, $kv) = @_;
593
594 if ($kv->{identifier} eq $id) {
595 if ($type eq "get_plugin_info") {
596 $ok->($kv);
597 } else {
598 $err->($kv, $type);
599 }
600 return;
601 }
602
603 1
604 });
414}; 605};
415 606
416=item $cv = $fcp->client_get ($uri, $identifier, %kv)
417
418=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 607=item $status = $fcp->client_get ($uri, $identifier, %kv)
419 608
420%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 609%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
421 610
422ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 611ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
423priority_class, persistence, client_token, global, return_type, 612priority_class, persistence, client_token, global, return_type,
424binary_blob, allowed_mime_types, filename, temp_filename 613binary_blob, allowed_mime_types, filename, temp_filename
425 614
426=cut 615=cut
427 616
428_txn client_get => sub { 617_txn client_get => sub {
429 my ($self, $cv, $uri, $identifier, %kv) = @_; 618 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
430 619
620 $self->serialise ($identifier => sub {
621 my ($self, $guard) = @_;
622
431 $self->send_msg (client_get => 623 $self->send_msg (client_get =>
432 %kv, 624 %kv,
433 uri => $uri, 625 uri => $uri,
434 identifier => $identifier, 626 identifier => $identifier,
435 id_cb => sub { 627 );
628
629 $self->on (sub {
436 my ($self, $type, $kv, $rdata) = @_; 630 my ($self, $type, $kv, @extra) = @_;
437 631
632 $guard if 0;
633
634 if ($kv->{identifier} eq $identifier) {
635 if ($type eq "persistent_get") {
438 $cv->($kv); 636 $ok->($kv);
637 return;
638 } elsif ($type eq "protocol_error") {
639 $err->($kv);
640 return;
641 }
642 }
643
439 1 644 1
645 });
646 });
647};
648
649=item $status = $fcp->remove_request ($identifier[, $global])
650
651Remove the request with the given isdentifier. Returns true if successful,
652false on error.
653
654=cut
655
656_txn remove_request => sub {
657 my ($self, $ok, $err, $identifier, $global) = @_;
658
659 $self->serialise ($identifier => sub {
660 my ($self, $guard) = @_;
661
662 $self->send_msg (remove_request =>
663 identifier => $identifier,
664 global => $global ? "true" : "false",
665 );
666 $self->on (sub {
667 my ($self, $type, $kv, @extra) = @_;
668
669 $guard if 0;
670
671 if ($kv->{identifier} eq $identifier) {
672 if ($type eq "persistent_request_removed") {
673 $ok->(1);
674 return;
675 } elsif ($type eq "protocol_error") {
676 $err->($kv);
677 return;
678 }
679 }
680
681 1
682 });
683 });
684};
685
686=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
687
688The DDA test in FCP is probably the single most broken protocol - only
689one directory test can be outstanding at any time, and some guessing and
690heuristics are involved in mangling the paths.
691
692This function combines C<TestDDARequest> and C<TestDDAResponse> in one
693request, handling file reading and writing as well, and tries very hard to
694do the right thing.
695
696Both C<$local_directory> and C<$remote_directory> must specify the same
697directory - C<$local_directory> is the directory path on the client (where
698L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
699the server (where the freenet node runs). When both are running on the
700same node, the paths are generally identical.
701
702C<$want_read> and C<$want_write> should be set to a true value when you
703want to read (get) files or write (put) files, respectively.
704
705On error, an exception is thrown. Otherwise, C<$can_read> and
706C<$can_write> indicate whether you can reaqd or write to freenet via the
707directory.
708
709=cut
710
711_txn test_dda => sub {
712 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
713
714 $self->serialise (test_dda => sub {
715 my ($self, $guard) = @_;
716
717 $self->send_msg (test_dda_request =>
718 directory => $remote,
719 want_read_directory => $want_read ? "true" : "false",
720 want_write_directory => $want_write ? "true" : "false",
721 );
722 $self->on (sub {
723 my ($self, $type, $kv) = @_;
724
725 if ($type eq "test_dda_reply") {
726 # the filenames are all relative to the server-side directory,
727 # which might or might not match $remote anymore, so we
728 # need to rewrite the paths to be relative to $local
729 for my $k (qw(read_filename write_filename)) {
730 my $f = $kv->{$k};
731 for my $dir ($kv->{directory}, $remote) {
732 if ($dir eq substr $f, 0, length $dir) {
733 substr $f, 0, 1 + length $dir, "";
734 $kv->{$k} = $f;
735 last;
736 }
737 }
738 }
739
740 my %response = (directory => $remote);
741
742 if (length $kv->{read_filename}) {
743 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
744 sysread $fh, my $buf, -s $fh;
745 $response{read_content} = $buf;
746 }
747 }
748
749 if (length $kv->{write_filename}) {
750 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
751 syswrite $fh, $kv->{content_to_write};
752 }
753 }
754
755 $self->send_msg (test_dda_response => %response);
756
757 $self->on (sub {
758 my ($self, $type, $kv) = @_;
759
760 $guard if 0; # reference
761
762 if ($type eq "test_dda_complete") {
763 $ok->(
764 $kv->{read_directory_allowed} eq "true",
765 $kv->{write_directory_allowed} eq "true",
766 );
767 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
768 $err->($kv->{extra_description});
769 return;
770 }
771
772 1
773 });
774
775 return;
776 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
777 $err->($kv);
778 return;
779 }
780
781 1
782 });
783 });
784};
785
786=back
787
788=head2 REQUEST CACHE
789
790The C<AnyEvent::FCP> class keeps a request cache, where it caches all
791information from requests.
792
793For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
794in C<< $fcp->{req}{$identifier} >>:
795
796 persistent_get
797 persistent_put
798 persistent_put_dir
799
800This message updates the stored data:
801
802 persistent_request_modified
803
804This message will remove this entry:
805
806 persistent_request_removed
807
808These messages get merged into the cache entry, under their
809type, i.e. a C<simple_progress> message will be stored in C<<
810$fcp->{req}{$identifier}{simple_progress} >>:
811
812 simple_progress # get/put
813
814 uri_generated # put
815 generated_metadata # put
816 started_compression # put
817 finished_compression # put
818 put_failed # put
819 put_fetchable # put
820 put_successful # put
821
822 sending_to_network # get
823 compatibility_mode # get
824 expected_hashes # get
825 expected_mime # get
826 expected_data_length # get
827 get_failed # get
828 data_found # get
829 enter_finite_cooldown # get
830
831In addition, an event (basically a fake message) of type C<request_changed> is generated
832on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
833is the type of the original message triggering the change,
834
835To fill this cache with the global queue and keep it updated,
836call C<watch_global> to subscribe to updates, followed by
837C<list_persistent_requests_sync>.
838
839 $fcp->watch_global_sync_; # do not wait
840 $fcp->list_persistent_requests; # wait
841
842To get a better idea of what is stored in the cache, here is an example of
843what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
844
845 {
846 identifier => "Frost-gpl.txt",
847 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
848 binary_blob => "false",
849 global => "true",
850 max_retries => -1,
851 max_size => 9223372036854775807,
852 persistence => "forever",
853 priority_class => 3,
854 real_time => "false",
855 return_type => "direct",
856 started => "true",
857 type => "persistent_get",
858 verbosity => 2147483647,
859 sending_to_network => {
860 identifier => "Frost-gpl.txt",
861 global => "true",
440 }, 862 },
441 ); 863 compatibility_mode => {
442}; 864 identifier => "Frost-gpl.txt",
443 865 definitive => "true",
444=back 866 dont_compress => "false",
867 global => "true",
868 max => "COMPAT_1255",
869 min => "COMPAT_1255",
870 },
871 expected_hashes => {
872 identifier => "Frost-gpl.txt",
873 global => "true",
874 hashes => {
875 ed2k => "d83596f5ee3b7...",
876 md5 => "e0894e4a2a6...",
877 sha1 => "...",
878 sha256 => "...",
879 sha512 => "...",
880 tth => "...",
881 },
882 },
883 expected_mime => {
884 identifier => "Frost-gpl.txt",
885 global => "true",
886 metadata => { content_type => "application/rar" },
887 },
888 expected_data_length => {
889 identifier => "Frost-gpl.txt",
890 data_length => 37576,
891 global => "true",
892 },
893 simple_progress => {
894 identifier => "Frost-gpl.txt",
895 failed => 0,
896 fatally_failed => 0,
897 finalized_total => "true",
898 global => "true",
899 last_progress => 1438639282628,
900 required => 372,
901 succeeded => 102,
902 total => 747,
903 },
904 data_found => {
905 identifier => "Frost-gpl.txt",
906 completion_time => 1438663354026,
907 data_length => 37576,
908 global => "true",
909 metadata => { content_type => "image/jpeg" },
910 startup_time => 1438657196167,
911 },
912 }
445 913
446=head1 EXAMPLE PROGRAM 914=head1 EXAMPLE PROGRAM
447 915
448 use AnyEvent::FCP; 916 use AnyEvent::FCP;
449 917
450 my $fcp = new AnyEvent::FCP; 918 my $fcp = new AnyEvent::FCP;
451 919
452 # let us look at the global request list 920 # let us look at the global request list
453 $fcp->watch_global (1, 0); 921 $fcp->watch_global_ (1);
454 922
455 # list them, synchronously 923 # list them, synchronously
456 my $req = $fcp->list_persistent_requests_sync; 924 my $req = $fcp->list_persistent_requests;
457 925
458 # go through all requests 926 # go through all requests
927TODO
459 for my $req (values %$req) { 928 for my $req (values %$req) {
460 # skip jobs not directly-to-disk 929 # skip jobs not directly-to-disk
461 next unless $req->{return_type} eq "disk"; 930 next unless $req->{return_type} eq "disk";
462 # skip jobs not issued by FProxy 931 # skip jobs not issued by FProxy
463 next unless $req->{identifier} =~ /^FProxy:/; 932 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines