ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.6 by root, Mon May 31 06:27:37 2010 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.21'; 66our $VERSION = 0.4;
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
53 75
54sub touc($) { 76sub touc($) {
55 local $_ = shift; 77 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
58 $_ 80 $_
59} 81}
60 82
61sub tolc($) { 83sub tolc($) {
62 local $_ = shift; 84 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 88 lc
67} 89}
68 90
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 92
71Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 95
74If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 97(hopefully) unique client name for you.
76 98
77You can install a progress callback that is being called with the AnyEvent::FCP
78object, the type, a hashref with key-value pairs and a reference to any received data,
79for all unsolicited messages.
80
81Example:
82
83 sub progress_cb {
84 my ($self, $type, $kv, $rdata) = @_;
85
86 if ($type eq "simple_progress") {
87 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
88 }
89 }
90
91=cut 99=cut
92 100
93sub new { 101sub new {
94 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
95 my $self = bless { @_ }, $class; 106 my $self = bless {
96 107 host => $ENV{FREDHOST} || "127.0.0.1",
97 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
98 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
99 $self->{name} ||= time.rand.rand.rand; # lame 110 keepalive => 9 * 60,
100 $self->{timeout} ||= 600; 111 name => time.rand.rand.rand, # lame
101 $self->{progress} ||= sub { }; 112 @_,
102 113 queue => [],
103 $self->{id} = "a0"; 114 req => {},
115 prefix => "..:aefcpid:$rand:",
116 idseq => "a0",
117 }, $class;
104 118
105 { 119 {
106 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
107 190
108 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
109 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
110 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
111 on_error => sub { 196 on_error => sub {
112 warn "<@_>\n"; 197 $self->fatal ($_[2]);
113 exit 1;
114 }, 198 },
115 on_read => sub { $self->on_read (@_) }, 199 ;
116 on_eof => $self->{on_eof} || sub { };
117 200
118 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
119 } 202 }
120 203
121 $self->send_msg ( 204 $self->send_msg (client_hello =>
122 client_hello =>
123 name => $self->{name}, 205 name => $self->{name},
124 expected_version => "2.0", 206 expected_version => "2.0",
125 ); 207 );
126 208
127 $self 209 $self
128} 210}
129 211
212sub fatal {
213 my ($self, $msg) = @_;
214
215 $self->{hdl}->shutdown;
216 delete $self->{kw};
217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
223}
224
225sub identifier {
226 $_[0]{prefix} . ++$_[0]{idseq}
227}
228
130sub send_msg { 229sub send_msg {
131 my ($self, $type, %kv) = @_; 230 my ($self, $type, %kv) = @_;
132 231
133 my $data = delete $kv{data}; 232 my $data = delete $kv{data};
134 233
135 if (exists $kv{id_cb}) { 234 if (exists $kv{id_cb}) {
136 my $id = $kv{identifier} || ++$self->{id}; 235 my $id = $kv{identifier} ||= $self->identifier;
137 $self->{id}{$id} = delete $kv{id_cb}; 236 $self->{id}{$id} = delete $kv{id_cb};
138 $kv{identifier} = $id;
139 } 237 }
140 238
141 my $msg = (touc $type) . "\012" 239 my $msg = (touc $type) . "\012"
142 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 240 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
143 241
155 } 253 }
156 254
157 $self->{hdl}->push_write ($msg); 255 $self->{hdl}->push_write ($msg);
158} 256}
159 257
160sub on_read { 258sub on {
161 my ($self) = @_; 259 my ($self, $cb) = @_;
162 260
163 my $type; 261 # cb return undef - message eaten, remove cb
164 my %kv; 262 # cb return 0 - message eaten
165 my $rdata; 263 # cb return 1 - pass to next
166 264
167 my $done_cb = sub { 265 push @{ $self->{on} }, $cb;
168 $kv{pkt_type} = $type; 266}
169 267
170 if (my $cb = $self->{queue}[0]) { 268sub _push_queue {
171 $cb->($self, $type, \%kv, $rdata) 269 my ($self, $queue) = @_;
172 and shift @{ $self->{queue} }; 270
173 } else { 271 shift @$queue;
174 $self->default_recv ($type, \%kv, $rdata); 272 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
273 if @$queue;
274}
275
276# lock so only one $type (arbitrary string) is in flight,
277# to work around horribly misdesigned protocol.
278sub serialise {
279 my ($self, $type, $cb) = @_;
280
281 my $queue = $self->{serialise}{$type} ||= [];
282 push @$queue, $cb;
283 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
284 unless $#$queue;
285}
286
287# how to merge these types into $self->{persistent}
288our %PERSISTENT_TYPE = (
289 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
290 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
291 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
292 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
293 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
294
295 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
296
297 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
298 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
299 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
300 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
301 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
302 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
303 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
304
305 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
306 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
307 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
308 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
309 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
310 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
311 data_found => sub { $_[1]{data_found} = $_[2] }, # get
312 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
313);
314
315sub recv {
316 my ($self, $type, $kv, @extra) = @_;
317
318 if (my $cb = $PERSISTENT_TYPE{$type}) {
319 my $id = $kv->{identifier};
320 my $req = $_[0]{req}{$id} ||= {};
321 $cb->($self, $req, $kv);
322 $self->recv (request_changed => $kv, $type, @extra);
323 }
324
325 my $on = $self->{on};
326 for (0 .. $#$on) {
327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
328 splice @$on, $_, 1 unless defined $res;
329 return;
175 } 330 }
176 }; 331 }
177 332
178 my $hdr_cb; $hdr_cb = sub { 333 if (my $cb = $self->{queue}[0]) {
179 if ($_[1] =~ /^([^=]+)=(.*)$/) { 334 $cb->($self, $type, $kv, @extra)
180 my ($k, $v) = ($1, $2); 335 and shift @{ $self->{queue} };
181 my @k = split /\./, tolc $k;
182 my $ro = \\%kv;
183
184 while (@k) {
185 my $k = shift @k;
186 if ($k =~ /^\d+$/) {
187 $ro = \$$ro->[$k];
188 } else {
189 $ro = \$$ro->{$k};
190 }
191 }
192
193 $$ro = $v;
194
195 $_[0]->push_read (line => $hdr_cb);
196 } elsif ($_[1] eq "Data") {
197 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
198 $rdata = \$_[1];
199 $done_cb->();
200 });
201 } elsif ($_[1] eq "EndMessage") {
202 $done_cb->();
203 } else { 336 } else {
204 die "protocol error, expected message end, got $_[1]\n";#d# 337 $self->default_recv ($type, $kv, @extra);
205 }
206 }; 338 }
207
208 $self->{hdl}->push_read (line => sub {
209 $type = tolc $_[1];
210 $_[0]->push_read (line => $hdr_cb);
211 });
212} 339}
213 340
214sub default_recv { 341sub default_recv {
215 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
216 343
217 if ($type eq "node_hello") { 344 if ($type eq "node_hello") {
218 $self->{node_hello} = $kv; 345 $self->{node_hello} = $kv;
219 } elsif (exists $self->{id}{$kv->{identifier}}) { 346 } elsif (exists $self->{id}{$kv->{identifier}}) {
220 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 347 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
221 and delete $self->{id}{$kv->{identifier}}; 348 and delete $self->{id}{$kv->{identifier}};
222 } else {
223 &{ $self->{progress} };
224 } 349 }
225} 350}
351
352=back
353
354=head2 FCP REQUESTS
355
356The following methods implement various requests. Most of them map
357directory to the FCP message of the same name. The added benefit of
358these over sending requests yourself is that they handle the necessary
359serialisation, protocol quirks, and replies.
360
361All of them exist in two versions, the variant shown in this manpage, and
362a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
363version as shown is I<synchronous> - it will wait for any replies, and
364either return the reply, or croak with an error. The underscore variant
365returns immediately and invokes one or more callbacks or condvars later.
366
367For example, the call
368
369 $info = $fcp->get_plugin_info ($name, $detailed);
370
371Also comes in this underscore variant:
372
373 $fcp->get_plugin_info_ ($name, $detailed, $cb);
374
375You can thinbk of the underscore as a kind of continuation indicator - the
376normal function waits and returns with the data, the C<_> indicates that
377you pass the continuation yourself, and the continuation will be invoked
378with the results.
379
380This callback/continuation argument (C<$cb>) can come in three forms itself:
381
382=over 4
383
384=item A code reference (or rather anything not matching some other alternative)
385
386This code reference will be invoked with the result on success. On an
387error, it will die (in the event loop) with a backtrace of the call site.
388
389This is a popular choice, but it makes handling errors hard - make sure
390you never generate protocol errors!
391
392=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
393
394When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
395results when the request has finished. Should an error occur, the error
396will instead result in C<< $cv->croak ($error) >>.
397
398This is also a popular choice.
399
400=item An array with two callbacks C<[$success, $failure]>
401
402The C<$success> callback will be invoked with the results, while the
403C<$failure> callback will be invoked on any errors.
404
405=item C<undef>
406
407This is the same thing as specifying C<sub { }> as callback, i.e. on
408success, the results are ignored, while on failure, you the module dies
409with a backtrace.
410
411This is good for quick scripts, or when you really aren't interested in
412the results.
413
414=back
415
416=cut
417
418our $NOP_CB = sub { };
226 419
227sub _txn { 420sub _txn {
228 my ($name, $sub) = @_; 421 my ($name, $sub) = @_;
229 422
230 *{$name} = sub { 423 *{$name} = sub {
231 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 424 my $cv = AE::cv;
232 &$sub;
233 $cv
234 };
235 425
236 *{"$name\_sync"} = sub { 426 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
237 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
238 &$sub; 427 &$sub;
239 $cv->recv 428 $cv->recv
240 }; 429 };
241}
242 430
243=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 431 *{"$name\_"} = sub {
432 my ($ok, $err) = pop;
244 433
434 if (ARRAY:: eq ref $ok) {
435 ($ok, $err) = @$ok;
436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
437 $err = sub { $ok->croak ($_[0]{extra_description}) };
438 } else {
439 my $bt = Carp::longmess "";
440 $err = sub {
441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
442 };
443 }
444
445 $ok ||= $NOP_CB;
446
447 splice @_, 1, 0, $ok, $err;
448 &$sub;
449 };
450}
451
452=over 4
453
245=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 454=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
246 455
247=cut 456=cut
248 457
249_txn list_peers => sub { 458_txn list_peers => sub {
250 my ($self, $cv, $with_metadata, $with_volatile) = @_; 459 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
251 460
252 my @res; 461 my @res;
253 462
254 $self->send_msg (list_peers => 463 $self->send_msg (list_peers =>
255 with_metadata => $with_metadata ? "true" : "false", 464 with_metadata => $with_metadata ? "true" : "false",
256 with_volatile => $with_volatile ? "true" : "false", 465 with_volatile => $with_volatile ? "true" : "false",
257 id_cb => sub { 466 id_cb => sub {
258 my ($self, $type, $kv, $rdata) = @_; 467 my ($self, $type, $kv, $rdata) = @_;
259 468
260 if ($type eq "end_list_peers") { 469 if ($type eq "end_list_peers") {
261 $cv->(\@res); 470 $ok->(\@res);
262 1 471 1
263 } else { 472 } else {
264 push @res, $kv; 473 push @res, $kv;
265 0 474 0
266 } 475 }
267 }, 476 },
268 ); 477 );
269}; 478};
270 479
271=item $cv = $fcp->list_peer_notes ($node_identifier)
272
273=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 480=item $notes = $fcp->list_peer_notes ($node_identifier)
274 481
275=cut 482=cut
276 483
277_txn list_peer_notes => sub { 484_txn list_peer_notes => sub {
278 my ($self, $cv, $node_identifier) = @_; 485 my ($self, $ok, undef, $node_identifier) = @_;
279 486
280 $self->send_msg (list_peer_notes => 487 $self->send_msg (list_peer_notes =>
281 node_identifier => $node_identifier, 488 node_identifier => $node_identifier,
282 id_cb => sub { 489 id_cb => sub {
283 my ($self, $type, $kv, $rdata) = @_; 490 my ($self, $type, $kv, $rdata) = @_;
284 491
285 $cv->($kv); 492 $ok->($kv);
286 1 493 1
287 }, 494 },
288 ); 495 );
289}; 496};
290 497
291=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
292
293=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 498=item $fcp->watch_global ($enabled[, $verbosity_mask])
294 499
295=cut 500=cut
296 501
297_txn watch_global => sub { 502_txn watch_global => sub {
298 my ($self, $cv, $enabled, $verbosity_mask) = @_; 503 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
299 504
300 $self->send_msg (watch_global => 505 $self->send_msg (watch_global =>
301 enabled => $enabled ? "true" : "false", 506 enabled => $enabled ? "true" : "false",
302 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 507 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
303 ); 508 );
304 509
305 $cv->(); 510 $ok->();
306}; 511};
307 512
308=item $cv = $fcp->list_persistent_requests
309
310=item $reqs = $fcp->list_persistent_requests_sync 513=item $reqs = $fcp->list_persistent_requests
311 514
312=cut 515=cut
313 516
314_txn list_persistent_requests => sub { 517_txn list_persistent_requests => sub {
315 my ($self, $cv) = @_; 518 my ($self, $ok, $err) = @_;
316 519
520 $self->serialise (list_persistent_requests => sub {
521 my ($self, $guard) = @_;
522
317 my %res; 523 my @res;
318 524
319 $self->send_msg ("list_persistent_requests"); 525 $self->send_msg ("list_persistent_requests");
320 526
321 push @{ $self->{queue} }, sub { 527 $self->on (sub {
322 my ($self, $type, $kv, $rdata) = @_; 528 my ($self, $type, $kv, $rdata) = @_;
323 529
530 $guard if 0;
531
324 if ($type eq "end_list_persistent_requests") { 532 if ($type eq "end_list_persistent_requests") {
325 $cv->(\%res); 533 $ok->(\@res);
534 return;
535 } else {
536 my $id = $kv->{identifier};
537
538 if ($type =~ /^persistent_(get|put|put_dir)$/) {
539 push @res, [$type, $kv];
540 }
541 }
542
326 1 543 1
327 } else { 544 });
328 my $id = $kv->{identifier}; 545 });
546};
329 547
330 if ($type =~ /^persistent_(get|put|put_dir)$/) { 548=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
331 $res{$id} = { 549
332 type => $1, 550Update either the C<client_token> or C<priority_class> of a request
333 %{ $res{$id} }, 551identified by C<$global> and C<$identifier>, depending on which of
552C<$client_token> and C<$priority_class> are not C<undef>.
553
554=cut
555
556_txn modify_persistent_request => sub {
557 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
558
559 $self->serialise ($identifier => sub {
560 my ($self, $guard) = @_;
561
562 $self->send_msg (modify_persistent_request =>
563 global => $global ? "true" : "false",
564 identifier => $identifier,
565 defined $client_token ? (client_token => $client_token ) : (),
566 defined $priority_class ? (priority_class => $priority_class) : (),
567 );
568
569 $self->on (sub {
570 my ($self, $type, $kv, @extra) = @_;
571
572 $guard if 0;
573
574 if ($kv->{identifier} eq $identifier) {
575 if ($type eq "persistent_request_modified") {
334 %$kv, 576 $ok->($kv);
577 return;
578 } elsif ($type eq "protocol_error") {
579 $err->($kv);
580 return;
335 }; 581 }
336 } elsif ($type eq "simple_progress") {
337 delete $kv->{pkt_type}; # save memory
338 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
339 } else {
340 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
341 } 582 }
583
342 0 584 1
343 } 585 });
344 }; 586 });
345}; 587};
346 588
347=item $cv = $fcp->remove_request ($global, $identifier)
348
349=item $status = $fcp->remove_request_sync ($global, $identifier)
350
351=cut
352
353_txn remove_request => sub {
354 my ($self, $cv, $global, $identifier) = @_;
355
356 $self->send_msg (remove_request =>
357 global => $global ? "true" : "false",
358 identifier => $identifier,
359 id_cb => sub {
360 my ($self, $type, $kv, $rdata) = @_;
361
362 $cv->($kv);
363 1
364 },
365 );
366};
367
368=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
369
370=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
371
372=cut
373
374_txn modify_persistent_request => sub {
375 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
376
377 $self->send_msg (modify_persistent_request =>
378 global => $global ? "true" : "false",
379 defined $client_token ? (client_token => $client_token ) : (),
380 defined $priority_class ? (priority_class => $priority_class) : (),
381 identifier => $identifier,
382 id_cb => sub {
383 my ($self, $type, $kv, $rdata) = @_;
384
385 $cv->($kv);
386 1
387 },
388 );
389};
390
391=item $cv = $fcp->get_plugin_info ($name, $detailed)
392
393=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 589=item $info = $fcp->get_plugin_info ($name, $detailed)
394 590
395=cut 591=cut
396 592
397_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
398 my ($self, $cv, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
595
596 my $id = $self->identifier;
399 597
400 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
401 plugin_name => $name, 600 plugin_name => $name,
402 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
403 id_cb => sub {
404 my ($self, $type, $kv, $rdata) = @_;
405
406 $cv->($kv);
407 1
408 },
409 ); 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
410}; 617};
411 618
412=item $cv = $fcp->client_get ($uri, $identifier, %kv)
413
414=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 619=item $status = $fcp->client_get ($uri, $identifier, %kv)
415 620
416%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
417 622
418ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 623ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
419priority_class, persistence, client_token, global, return_type, 624priority_class, persistence, client_token, global, return_type,
420binary_blob, allowed_mime_types, filename, temp_filename 625binary_blob, allowed_mime_types, filename, temp_filename
421 626
422=cut 627=cut
423 628
424_txn client_get => sub { 629_txn client_get => sub {
425 my ($self, $cv, $uri, $identifier, %kv) = @_; 630 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
426 631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
427 $self->send_msg (client_get => 635 $self->send_msg (client_get =>
428 %kv, 636 %kv,
429 uri => $uri, 637 uri => $uri,
430 identifier => $identifier, 638 identifier => $identifier,
431 id_cb => sub { 639 );
640
641 $self->on (sub {
432 my ($self, $type, $kv, $rdata) = @_; 642 my ($self, $type, $kv, @extra) = @_;
433 643
644 $guard if 0;
645
646 if ($kv->{identifier} eq $identifier) {
647 if ($type eq "persistent_get") {
434 $cv->($kv); 648 $ok->($kv);
649 return;
650 } elsif ($type eq "protocol_error") {
651 $err->($kv);
652 return;
653 }
654 }
655
435 1 656 1
657 });
658 });
659};
660
661=item $status = $fcp->remove_request ($identifier[, $global])
662
663Remove the request with the given isdentifier. Returns true if successful,
664false on error.
665
666=cut
667
668_txn remove_request => sub {
669 my ($self, $ok, $err, $identifier, $global) = @_;
670
671 $self->serialise ($identifier => sub {
672 my ($self, $guard) = @_;
673
674 $self->send_msg (remove_request =>
675 identifier => $identifier,
676 global => $global ? "true" : "false",
677 );
678 $self->on (sub {
679 my ($self, $type, $kv, @extra) = @_;
680
681 $guard if 0;
682
683 if ($kv->{identifier} eq $identifier) {
684 if ($type eq "persistent_request_removed") {
685 $ok->(1);
686 return;
687 } elsif ($type eq "protocol_error") {
688 $err->($kv);
689 return;
690 }
691 }
692
693 1
694 });
695 });
696};
697
698=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
699
700The DDA test in FCP is probably the single most broken protocol - only
701one directory test can be outstanding at any time, and some guessing and
702heuristics are involved in mangling the paths.
703
704This function combines C<TestDDARequest> and C<TestDDAResponse> in one
705request, handling file reading and writing as well, and tries very hard to
706do the right thing.
707
708Both C<$local_directory> and C<$remote_directory> must specify the same
709directory - C<$local_directory> is the directory path on the client (where
710L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
711the server (where the freenet node runs). When both are running on the
712same node, the paths are generally identical.
713
714C<$want_read> and C<$want_write> should be set to a true value when you
715want to read (get) files or write (put) files, respectively.
716
717On error, an exception is thrown. Otherwise, C<$can_read> and
718C<$can_write> indicate whether you can reaqd or write to freenet via the
719directory.
720
721=cut
722
723_txn test_dda => sub {
724 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
725
726 $self->serialise (test_dda => sub {
727 my ($self, $guard) = @_;
728
729 $self->send_msg (test_dda_request =>
730 directory => $remote,
731 want_read_directory => $want_read ? "true" : "false",
732 want_write_directory => $want_write ? "true" : "false",
733 );
734 $self->on (sub {
735 my ($self, $type, $kv) = @_;
736
737 if ($type eq "test_dda_reply") {
738 # the filenames are all relative to the server-side directory,
739 # which might or might not match $remote anymore, so we
740 # need to rewrite the paths to be relative to $local
741 for my $k (qw(read_filename write_filename)) {
742 my $f = $kv->{$k};
743 for my $dir ($kv->{directory}, $remote) {
744 if ($dir eq substr $f, 0, length $dir) {
745 substr $f, 0, 1 + length $dir, "";
746 $kv->{$k} = $f;
747 last;
748 }
749 }
750 }
751
752 my %response = (directory => $remote);
753
754 if (length $kv->{read_filename}) {
755 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
756 sysread $fh, my $buf, -s $fh;
757 $response{read_content} = $buf;
758 }
759 }
760
761 if (length $kv->{write_filename}) {
762 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
763 syswrite $fh, $kv->{content_to_write};
764 }
765 }
766
767 $self->send_msg (test_dda_response => %response);
768
769 $self->on (sub {
770 my ($self, $type, $kv) = @_;
771
772 $guard if 0; # reference
773
774 if ($type eq "test_dda_complete") {
775 $ok->(
776 $kv->{read_directory_allowed} eq "true",
777 $kv->{write_directory_allowed} eq "true",
778 );
779 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
780 $err->($kv->{extra_description});
781 return;
782 }
783
784 1
785 });
786
787 return;
788 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
789 $err->($kv);
790 return;
791 }
792
793 1
794 });
795 });
796};
797
798=back
799
800=head2 REQUEST CACHE
801
802The C<AnyEvent::FCP> class keeps a request cache, where it caches all
803information from requests.
804
805For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
806in C<< $fcp->{req}{$identifier} >>:
807
808 persistent_get
809 persistent_put
810 persistent_put_dir
811
812This message updates the stored data:
813
814 persistent_request_modified
815
816This message will remove this entry:
817
818 persistent_request_removed
819
820These messages get merged into the cache entry, under their
821type, i.e. a C<simple_progress> message will be stored in C<<
822$fcp->{req}{$identifier}{simple_progress} >>:
823
824 simple_progress # get/put
825
826 uri_generated # put
827 generated_metadata # put
828 started_compression # put
829 finished_compression # put
830 put_failed # put
831 put_fetchable # put
832 put_successful # put
833
834 sending_to_network # get
835 compatibility_mode # get
836 expected_hashes # get
837 expected_mime # get
838 expected_data_length # get
839 get_failed # get
840 data_found # get
841 enter_finite_cooldown # get
842
843In addition, an event (basically a fake message) of type C<request_changed> is generated
844on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
845is the type of the original message triggering the change,
846
847To fill this cache with the global queue and keep it updated,
848call C<watch_global> to subscribe to updates, followed by
849C<list_persistent_requests_sync>.
850
851 $fcp->watch_global_sync_; # do not wait
852 $fcp->list_persistent_requests; # wait
853
854To get a better idea of what is stored in the cache, here is an example of
855what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
856
857 {
858 identifier => "Frost-gpl.txt",
859 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
860 binary_blob => "false",
861 global => "true",
862 max_retries => -1,
863 max_size => 9223372036854775807,
864 persistence => "forever",
865 priority_class => 3,
866 real_time => "false",
867 return_type => "direct",
868 started => "true",
869 type => "persistent_get",
870 verbosity => 2147483647,
871 sending_to_network => {
872 identifier => "Frost-gpl.txt",
873 global => "true",
436 }, 874 },
437 ); 875 compatibility_mode => {
438}; 876 identifier => "Frost-gpl.txt",
439 877 definitive => "true",
440=back 878 dont_compress => "false",
879 global => "true",
880 max => "COMPAT_1255",
881 min => "COMPAT_1255",
882 },
883 expected_hashes => {
884 identifier => "Frost-gpl.txt",
885 global => "true",
886 hashes => {
887 ed2k => "d83596f5ee3b7...",
888 md5 => "e0894e4a2a6...",
889 sha1 => "...",
890 sha256 => "...",
891 sha512 => "...",
892 tth => "...",
893 },
894 },
895 expected_mime => {
896 identifier => "Frost-gpl.txt",
897 global => "true",
898 metadata => { content_type => "application/rar" },
899 },
900 expected_data_length => {
901 identifier => "Frost-gpl.txt",
902 data_length => 37576,
903 global => "true",
904 },
905 simple_progress => {
906 identifier => "Frost-gpl.txt",
907 failed => 0,
908 fatally_failed => 0,
909 finalized_total => "true",
910 global => "true",
911 last_progress => 1438639282628,
912 required => 372,
913 succeeded => 102,
914 total => 747,
915 },
916 data_found => {
917 identifier => "Frost-gpl.txt",
918 completion_time => 1438663354026,
919 data_length => 37576,
920 global => "true",
921 metadata => { content_type => "image/jpeg" },
922 startup_time => 1438657196167,
923 },
924 }
441 925
442=head1 EXAMPLE PROGRAM 926=head1 EXAMPLE PROGRAM
443 927
444 use AnyEvent::FCP; 928 use AnyEvent::FCP;
445 929
446 my $fcp = new AnyEvent::FCP; 930 my $fcp = new AnyEvent::FCP;
447 931
448 # let us look at the global request list 932 # let us look at the global request list
449 $fcp->watch_global (1, 0); 933 $fcp->watch_global_ (1);
450 934
451 # list them, synchronously 935 # list them, synchronously
452 my $req = $fcp->list_persistent_requests_sync; 936 my $req = $fcp->list_persistent_requests;
453 937
454 # go through all requests 938 # go through all requests
939TODO
455 for my $req (values %$req) { 940 for my $req (values %$req) {
456 # skip jobs not directly-to-disk 941 # skip jobs not directly-to-disk
457 next unless $req->{return_type} eq "disk"; 942 next unless $req->{return_type} eq "disk";
458 # skip jobs not issued by FProxy 943 # skip jobs not issued by FProxy
459 next unless $req->{identifier} =~ /^FProxy:/; 944 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines