ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.3 by root, Tue Jul 28 02:20:51 2009 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = 0.4;
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
53 75
54sub touc($) { 76sub touc($) {
55 local $_ = shift; 77 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
58 $_ 80 $_
59} 81}
60 82
61sub tolc($) { 83sub tolc($) {
62 local $_ = shift; 84 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 88 lc
67} 89}
68 90
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 92
71Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 95
74If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 97(hopefully) unique client name for you.
76 98
77=cut 99=cut
78 100
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use
82#it like this:
83#
84# sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_;
86#
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88# }
89
90sub new { 101sub new {
91 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
92 my $self = bless { @_ }, $class; 106 my $self = bless {
93 107 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 110 keepalive => 9 * 60,
97 $self->{timeout} ||= 600; 111 name => time.rand.rand.rand, # lame
98 112 @_,
99 $self->{id} = "a0"; 113 queue => [],
114 req => {},
115 prefix => "..:aefcpid:$rand:",
116 idseq => "a0",
117 }, $class;
100 118
101 { 119 {
102 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
103 190
104 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
107 on_error => sub { 196 on_error => sub {
108 warn "<@_>\n"; 197 $self->fatal ($_[2]);
109 exit 1;
110 }, 198 },
111 on_read => sub { $self->on_read (@_) }, 199 ;
112 on_eof => $self->{on_eof} || sub { };
113 200
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 202 }
116 203
117 $self->send_msg ( 204 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 205 name => $self->{name},
120 expected_version => "2.0", 206 expected_version => "2.0",
121 ); 207 );
122 208
123 $self 209 $self
124} 210}
125 211
126#sub progress { 212sub fatal {
127# my ($self, $txn, $type, $attr) = @_; 213 my ($self, $msg) = @_;
128# 214
129# $self->{progress}->($self, $txn, $type, $attr) 215 $self->{hdl}->shutdown;
130# if $self->{progress}; 216 delete $self->{kw};
131#} 217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
223}
224
225sub identifier {
226 $_[0]{prefix} . ++$_[0]{idseq}
227}
132 228
133sub send_msg { 229sub send_msg {
134 my ($self, $type, %kv) = @_; 230 my ($self, $type, %kv) = @_;
135 231
136 my $data = delete $kv{data}; 232 my $data = delete $kv{data};
137 233
138 if (exists $kv{id_cb}) { 234 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 235 my $id = $kv{identifier} ||= $self->identifier;
140 $self->{id}{$id} = delete $kv{id_cb}; 236 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 237 }
143 238
144 my $msg = (touc $type) . "\012" 239 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 240 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 241
158 } 253 }
159 254
160 $self->{hdl}->push_write ($msg); 255 $self->{hdl}->push_write ($msg);
161} 256}
162 257
163sub on_read { 258sub on {
164 my ($self) = @_; 259 my ($self, $cb) = @_;
165 260
166 my $type; 261 # cb return undef - message eaten, remove cb
167 my %kv; 262 # cb return 0 - message eaten
168 my $rdata; 263 # cb return 1 - pass to next
169 264
170 my $done_cb = sub { 265 push @{ $self->{on} }, $cb;
171 $kv{pkt_type} = $type; 266}
172 267
173 if (my $cb = $self->{queue}[0]) { 268sub _push_queue {
174 $cb->($self, $type, \%kv, $rdata) 269 my ($self, $queue) = @_;
175 and shift @{ $self->{queue} }; 270
176 } else { 271 shift @$queue;
177 $self->default_recv ($type, \%kv, $rdata); 272 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
273 if @$queue;
274}
275
276# lock so only one $type (arbitrary string) is in flight,
277# to work around horribly misdesigned protocol.
278sub serialise {
279 my ($self, $type, $cb) = @_;
280
281 my $queue = $self->{serialise}{$type} ||= [];
282 push @$queue, $cb;
283 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
284 unless $#$queue;
285}
286
287# how to merge these types into $self->{persistent}
288our %PERSISTENT_TYPE = (
289 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
290 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
291 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
292 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
293 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
294
295 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
296
297 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
298 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
299 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
300 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
301 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
302 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
303 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
304
305 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
306 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
307 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
308 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
309 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
310 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
311 data_found => sub { $_[1]{data_found} = $_[2] }, # get
312 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
313);
314
315sub recv {
316 my ($self, $type, $kv, @extra) = @_;
317
318 if (my $cb = $PERSISTENT_TYPE{$type}) {
319 my $id = $kv->{identifier};
320 my $req = $_[0]{req}{$id} ||= {};
321 $cb->($self, $req, $kv);
322 $self->recv (request_changed => $kv, $type, @extra);
323 }
324
325 my $on = $self->{on};
326 for (0 .. $#$on) {
327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
328 splice @$on, $_, 1 unless defined $res;
329 return;
178 } 330 }
179 }; 331 }
180 332
181 my $hdr_cb; $hdr_cb = sub { 333 if (my $cb = $self->{queue}[0]) {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 334 $cb->($self, $type, $kv, @extra)
183 my ($k, $v) = ($1, $2); 335 and shift @{ $self->{queue} };
184 my @k = split /\./, tolc $k;
185 my $ro = \\%kv;
186
187 while (@k) {
188 my $k = shift @k;
189 if ($k =~ /^\d+$/) {
190 $ro = \$$ro->[$k];
191 } else {
192 $ro = \$$ro->{$k};
193 }
194 }
195
196 $$ro = $v;
197
198 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1];
202 $done_cb->();
203 });
204 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->();
206 } else { 336 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 337 $self->default_recv ($type, $kv, @extra);
208 }
209 }; 338 }
210
211 $self->{hdl}->push_read (line => sub {
212 $type = tolc $_[1];
213 $_[0]->push_read (line => $hdr_cb);
214 });
215} 339}
216 340
217sub default_recv { 341sub default_recv {
218 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
219 343
220 if ($type eq "node_hello") { 344 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 345 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 346 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 347 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 348 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 349 }
229} 350}
351
352=back
353
354=head2 FCP REQUESTS
355
356The following methods implement various requests. Most of them map
357directory to the FCP message of the same name. The added benefit of
358these over sending requests yourself is that they handle the necessary
359serialisation, protocol quirks, and replies.
360
361All of them exist in two versions, the variant shown in this manpage, and
362a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
363version as shown is I<synchronous> - it will wait for any replies, and
364either return the reply, or croak with an error. The underscore variant
365returns immediately and invokes one or more callbacks or condvars later.
366
367For example, the call
368
369 $info = $fcp->get_plugin_info ($name, $detailed);
370
371Also comes in this underscore variant:
372
373 $fcp->get_plugin_info_ ($name, $detailed, $cb);
374
375You can thinbk of the underscore as a kind of continuation indicator - the
376normal function waits and returns with the data, the C<_> indicates that
377you pass the continuation yourself, and the continuation will be invoked
378with the results.
379
380This callback/continuation argument (C<$cb>) can come in three forms itself:
381
382=over 4
383
384=item A code reference (or rather anything not matching some other alternative)
385
386This code reference will be invoked with the result on success. On an
387error, it will die (in the event loop) with a backtrace of the call site.
388
389This is a popular choice, but it makes handling errors hard - make sure
390you never generate protocol errors!
391
392=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
393
394When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
395results when the request has finished. Should an error occur, the error
396will instead result in C<< $cv->croak ($error) >>.
397
398This is also a popular choice.
399
400=item An array with two callbacks C<[$success, $failure]>
401
402The C<$success> callback will be invoked with the results, while the
403C<$failure> callback will be invoked on any errors.
404
405=item C<undef>
406
407This is the same thing as specifying C<sub { }> as callback, i.e. on
408success, the results are ignored, while on failure, you the module dies
409with a backtrace.
410
411This is good for quick scripts, or when you really aren't interested in
412the results.
413
414=back
415
416=cut
417
418our $NOP_CB = sub { };
230 419
231sub _txn { 420sub _txn {
232 my ($name, $sub) = @_; 421 my ($name, $sub) = @_;
233 422
234 *{$name} = sub { 423 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 424 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 425
240 *{"$name\_sync"} = sub { 426 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 427 &$sub;
243 $cv->recv 428 $cv->recv
244 }; 429 };
245}
246 430
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 431 *{"$name\_"} = sub {
432 my ($ok, $err) = pop;
248 433
434 if (ARRAY:: eq ref $ok) {
435 ($ok, $err) = @$ok;
436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
437 $err = sub { $ok->croak ($_[0]{extra_description}) };
438 } else {
439 my $bt = Carp::longmess "";
440 $err = sub {
441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
442 };
443 }
444
445 $ok ||= $NOP_CB;
446
447 splice @_, 1, 0, $ok, $err;
448 &$sub;
449 };
450}
451
452=over 4
453
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 454=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 455
251=cut 456=cut
252 457
253_txn list_peers => sub { 458_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 459 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 460
256 my @res; 461 my @res;
257 462
258 $self->send_msg (list_peers => 463 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 464 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 465 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 466 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 467 my ($self, $type, $kv, $rdata) = @_;
263 468
264 if ($type eq "end_list_peers") { 469 if ($type eq "end_list_peers") {
265 $cv->(\@res); 470 $ok->(\@res);
266 1 471 1
267 } else { 472 } else {
268 push @res, $kv; 473 push @res, $kv;
269 0 474 0
270 } 475 }
271 }, 476 },
272 ); 477 );
273}; 478};
274 479
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 480=item $notes = $fcp->list_peer_notes ($node_identifier)
278 481
279=cut 482=cut
280 483
281_txn list_peer_notes => sub { 484_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 485 my ($self, $ok, undef, $node_identifier) = @_;
283 486
284 $self->send_msg (list_peer_notes => 487 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 488 node_identifier => $node_identifier,
286 id_cb => sub { 489 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 490 my ($self, $type, $kv, $rdata) = @_;
288 491
289 $cv->($kv); 492 $ok->($kv);
290 1 493 1
291 }, 494 },
292 ); 495 );
293}; 496};
294 497
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 498=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 499
299=cut 500=cut
300 501
301_txn watch_global => sub { 502_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 503 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 504
304 $self->send_msg (watch_global => 505 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 506 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 507 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 508 );
308 509
309 $cv->(); 510 $ok->();
310}; 511};
311 512
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 513=item $reqs = $fcp->list_persistent_requests
315 514
316=cut 515=cut
317 516
318_txn list_persistent_requests => sub { 517_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 518 my ($self, $ok, $err) = @_;
320 519
520 $self->serialise (list_persistent_requests => sub {
521 my ($self, $guard) = @_;
522
321 my %res; 523 my @res;
322 524
323 $self->send_msg ("list_persistent_requests"); 525 $self->send_msg ("list_persistent_requests");
324 526
325 push @{ $self->{queue} }, sub { 527 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 528 my ($self, $type, $kv, $rdata) = @_;
327 529
530 $guard if 0;
531
328 if ($type eq "end_list_persistent_requests") { 532 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 533 $ok->(\@res);
534 return;
535 } else {
536 my $id = $kv->{identifier};
537
538 if ($type =~ /^persistent_(get|put|put_dir)$/) {
539 push @res, [$type, $kv];
540 }
541 }
542
330 1 543 1
331 } else { 544 });
332 my $id = $kv->{identifier}; 545 });
546};
333 547
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 548=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 549
336 type => $1, 550Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 551identified by C<$global> and C<$identifier>, depending on which of
552C<$client_token> and C<$priority_class> are not C<undef>.
553
554=cut
555
556_txn modify_persistent_request => sub {
557 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
558
559 $self->serialise ($identifier => sub {
560 my ($self, $guard) = @_;
561
562 $self->send_msg (modify_persistent_request =>
563 global => $global ? "true" : "false",
564 identifier => $identifier,
565 defined $client_token ? (client_token => $client_token ) : (),
566 defined $priority_class ? (priority_class => $priority_class) : (),
567 );
568
569 $self->on (sub {
570 my ($self, $type, $kv, @extra) = @_;
571
572 $guard if 0;
573
574 if ($kv->{identifier} eq $identifier) {
575 if ($type eq "persistent_request_modified") {
338 %$kv, 576 $ok->($kv);
577 return;
578 } elsif ($type eq "protocol_error") {
579 $err->($kv);
580 return;
339 }; 581 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 582 }
583
346 0 584 1
347 } 585 });
348 }; 586 });
349}; 587};
350 588
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372};
373
374=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
375
376=item $status = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
377
378=cut
379
380_txn modify_persistent_request => sub {
381 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
382
383 $self->send_msg (modify_persistent_request =>
384 global => $global ? "true" : "false",
385 identifier => $identifier,
386 defined $client_token ? (client_token => $client_token ) : (),
387 defined $priority_class ? (priority_class => $priority_class) : (),
388 id_cb => sub {
389 my ($self, $type, $kv, $rdata) = @_;
390
391 $cv->($kv);
392 1
393 },
394 );
395
396 $cv->();
397};
398
399=item $cv = $fcp->get_plugin_info ($name, $detailed)
400
401=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 589=item $info = $fcp->get_plugin_info ($name, $detailed)
402 590
403=cut 591=cut
404 592
405_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
406 my ($self, $cv, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
595
596 my $id = $self->identifier;
407 597
408 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
409 plugin_name => $name, 600 plugin_name => $name,
410 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
411 id_cb => sub { 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
617};
618
619=item $status = $fcp->client_get ($uri, $identifier, %kv)
620
621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
622
623ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
624priority_class, persistence, client_token, global, return_type,
625binary_blob, allowed_mime_types, filename, temp_filename
626
627=cut
628
629_txn client_get => sub {
630 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
635 $self->send_msg (client_get =>
636 %kv,
637 uri => $uri,
638 identifier => $identifier,
639 );
640
641 $self->on (sub {
412 my ($self, $type, $kv, $rdata) = @_; 642 my ($self, $type, $kv, @extra) = @_;
413 643
644 $guard if 0;
645
646 if ($kv->{identifier} eq $identifier) {
647 if ($type eq "persistent_get") {
414 $cv->($kv); 648 $ok->($kv);
649 return;
650 } elsif ($type eq "protocol_error") {
651 $err->($kv);
652 return;
653 }
654 }
655
415 1 656 1
657 });
658 });
659};
660
661=item $status = $fcp->remove_request ($identifier[, $global])
662
663Remove the request with the given isdentifier. Returns true if successful,
664false on error.
665
666=cut
667
668_txn remove_request => sub {
669 my ($self, $ok, $err, $identifier, $global) = @_;
670
671 $self->serialise ($identifier => sub {
672 my ($self, $guard) = @_;
673
674 $self->send_msg (remove_request =>
675 identifier => $identifier,
676 global => $global ? "true" : "false",
677 );
678 $self->on (sub {
679 my ($self, $type, $kv, @extra) = @_;
680
681 $guard if 0;
682
683 if ($kv->{identifier} eq $identifier) {
684 if ($type eq "persistent_request_removed") {
685 $ok->(1);
686 return;
687 } elsif ($type eq "protocol_error") {
688 $err->($kv);
689 return;
690 }
691 }
692
693 1
694 });
695 });
696};
697
698=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
699
700The DDA test in FCP is probably the single most broken protocol - only
701one directory test can be outstanding at any time, and some guessing and
702heuristics are involved in mangling the paths.
703
704This function combines C<TestDDARequest> and C<TestDDAResponse> in one
705request, handling file reading and writing as well, and tries very hard to
706do the right thing.
707
708Both C<$local_directory> and C<$remote_directory> must specify the same
709directory - C<$local_directory> is the directory path on the client (where
710L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
711the server (where the freenet node runs). When both are running on the
712same node, the paths are generally identical.
713
714C<$want_read> and C<$want_write> should be set to a true value when you
715want to read (get) files or write (put) files, respectively.
716
717On error, an exception is thrown. Otherwise, C<$can_read> and
718C<$can_write> indicate whether you can reaqd or write to freenet via the
719directory.
720
721=cut
722
723_txn test_dda => sub {
724 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
725
726 $self->serialise (test_dda => sub {
727 my ($self, $guard) = @_;
728
729 $self->send_msg (test_dda_request =>
730 directory => $remote,
731 want_read_directory => $want_read ? "true" : "false",
732 want_write_directory => $want_write ? "true" : "false",
733 );
734 $self->on (sub {
735 my ($self, $type, $kv) = @_;
736
737 if ($type eq "test_dda_reply") {
738 # the filenames are all relative to the server-side directory,
739 # which might or might not match $remote anymore, so we
740 # need to rewrite the paths to be relative to $local
741 for my $k (qw(read_filename write_filename)) {
742 my $f = $kv->{$k};
743 for my $dir ($kv->{directory}, $remote) {
744 if ($dir eq substr $f, 0, length $dir) {
745 substr $f, 0, 1 + length $dir, "";
746 $kv->{$k} = $f;
747 last;
748 }
749 }
750 }
751
752 my %response = (directory => $remote);
753
754 if (length $kv->{read_filename}) {
755 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
756 sysread $fh, my $buf, -s $fh;
757 $response{read_content} = $buf;
758 }
759 }
760
761 if (length $kv->{write_filename}) {
762 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
763 syswrite $fh, $kv->{content_to_write};
764 }
765 }
766
767 $self->send_msg (test_dda_response => %response);
768
769 $self->on (sub {
770 my ($self, $type, $kv) = @_;
771
772 $guard if 0; # reference
773
774 if ($type eq "test_dda_complete") {
775 $ok->(
776 $kv->{read_directory_allowed} eq "true",
777 $kv->{write_directory_allowed} eq "true",
778 );
779 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
780 $err->($kv->{extra_description});
781 return;
782 }
783
784 1
785 });
786
787 return;
788 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
789 $err->($kv);
790 return;
791 }
792
793 1
794 });
795 });
796};
797
798=back
799
800=head2 REQUEST CACHE
801
802The C<AnyEvent::FCP> class keeps a request cache, where it caches all
803information from requests.
804
805For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
806in C<< $fcp->{req}{$identifier} >>:
807
808 persistent_get
809 persistent_put
810 persistent_put_dir
811
812This message updates the stored data:
813
814 persistent_request_modified
815
816This message will remove this entry:
817
818 persistent_request_removed
819
820These messages get merged into the cache entry, under their
821type, i.e. a C<simple_progress> message will be stored in C<<
822$fcp->{req}{$identifier}{simple_progress} >>:
823
824 simple_progress # get/put
825
826 uri_generated # put
827 generated_metadata # put
828 started_compression # put
829 finished_compression # put
830 put_failed # put
831 put_fetchable # put
832 put_successful # put
833
834 sending_to_network # get
835 compatibility_mode # get
836 expected_hashes # get
837 expected_mime # get
838 expected_data_length # get
839 get_failed # get
840 data_found # get
841 enter_finite_cooldown # get
842
843In addition, an event (basically a fake message) of type C<request_changed> is generated
844on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
845is the type of the original message triggering the change,
846
847To fill this cache with the global queue and keep it updated,
848call C<watch_global> to subscribe to updates, followed by
849C<list_persistent_requests_sync>.
850
851 $fcp->watch_global_sync_; # do not wait
852 $fcp->list_persistent_requests; # wait
853
854To get a better idea of what is stored in the cache, here is an example of
855what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
856
857 {
858 identifier => "Frost-gpl.txt",
859 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
860 binary_blob => "false",
861 global => "true",
862 max_retries => -1,
863 max_size => 9223372036854775807,
864 persistence => "forever",
865 priority_class => 3,
866 real_time => "false",
867 return_type => "direct",
868 started => "true",
869 type => "persistent_get",
870 verbosity => 2147483647,
871 sending_to_network => {
872 identifier => "Frost-gpl.txt",
873 global => "true",
416 }, 874 },
417 ); 875 compatibility_mode => {
418 876 identifier => "Frost-gpl.txt",
419 $cv->(); 877 definitive => "true",
420}; 878 dont_compress => "false",
421 879 global => "true",
422=back 880 max => "COMPAT_1255",
881 min => "COMPAT_1255",
882 },
883 expected_hashes => {
884 identifier => "Frost-gpl.txt",
885 global => "true",
886 hashes => {
887 ed2k => "d83596f5ee3b7...",
888 md5 => "e0894e4a2a6...",
889 sha1 => "...",
890 sha256 => "...",
891 sha512 => "...",
892 tth => "...",
893 },
894 },
895 expected_mime => {
896 identifier => "Frost-gpl.txt",
897 global => "true",
898 metadata => { content_type => "application/rar" },
899 },
900 expected_data_length => {
901 identifier => "Frost-gpl.txt",
902 data_length => 37576,
903 global => "true",
904 },
905 simple_progress => {
906 identifier => "Frost-gpl.txt",
907 failed => 0,
908 fatally_failed => 0,
909 finalized_total => "true",
910 global => "true",
911 last_progress => 1438639282628,
912 required => 372,
913 succeeded => 102,
914 total => 747,
915 },
916 data_found => {
917 identifier => "Frost-gpl.txt",
918 completion_time => 1438663354026,
919 data_length => 37576,
920 global => "true",
921 metadata => { content_type => "image/jpeg" },
922 startup_time => 1438657196167,
923 },
924 }
423 925
424=head1 EXAMPLE PROGRAM 926=head1 EXAMPLE PROGRAM
425 927
426 use AnyEvent::FCP; 928 use AnyEvent::FCP;
427 929
428 my $fcp = new AnyEvent::FCP; 930 my $fcp = new AnyEvent::FCP;
429 931
430 # let us look at the global request list 932 # let us look at the global request list
431 $fcp->watch_global (1, 0); 933 $fcp->watch_global_ (1);
432 934
433 # list them, synchronously 935 # list them, synchronously
434 my $req = $fcp->list_persistent_requests_sync; 936 my $req = $fcp->list_persistent_requests;
435 937
436 # go through all requests 938 # go through all requests
939TODO
437 for my $req (values %$req) { 940 for my $req (values %$req) {
438 # skip jobs not directly-to-disk 941 # skip jobs not directly-to-disk
439 next unless $req->{return_type} eq "disk"; 942 next unless $req->{return_type} eq "disk";
440 # skip jobs not issued by FProxy 943 # skip jobs not issued by FProxy
441 next unless $req->{identifier} =~ /^FProxy:/; 944 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines