ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.4 by root, Wed Jul 29 09:25:46 2009 UTC vs.
Revision 1.17 by root, Sat Sep 5 19:36:12 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = '0.3';
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
53 75
54sub touc($) { 76sub touc($) {
55 local $_ = shift; 77 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
58 $_ 80 $_
59} 81}
60 82
61sub tolc($) { 83sub tolc($) {
62 local $_ = shift; 84 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 88 lc
67} 89}
68 90
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 92
71Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 95
74If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 97(hopefully) unique client name for you.
76 98
77=cut 99=cut
78 100
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use
82#it like this:
83#
84# sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_;
86#
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88# }
89
90sub new { 101sub new {
91 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
92 my $self = bless { @_ }, $class; 106 my $self = bless {
93 107 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 110 name => time.rand.rand.rand, # lame
97 $self->{timeout} ||= 600; 111 @_,
98 112 queue => [],
99 $self->{id} = "a0"; 113 req => {},
114 prefix => "..:aefcpid:$rand:",
115 idseq => "a0",
116 }, $class;
100 117
101 { 118 {
102 Scalar::Util::weaken (my $self = $self); 119 Scalar::Util::weaken (my $self = $self);
120
121 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
122
123 # these are declared here for performance reasons
124 my ($k, $v, $type);
125 my $rdata;
126
127 my $on_read = sub {
128 my ($hdl) = @_;
129
130 # we only carve out whole messages here
131 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
132 # remember end marker
133 $rdata = $1 eq "Data"
134 or $1 eq "EndMessage"
135 or die "protocol error, expected message end, got $1\n";
136
137 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
138
139 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
140
141 $type = shift @lines;
142 $type = ($TOLC{$type} ||= tolc $type);
143
144 my %kv;
145
146 for (@lines) {
147 ($k, $v) = split /=/, $_, 2;
148 $k = ($TOLC{$k} ||= tolc $k);
149
150 if ($k =~ /\./) {
151 # generic, slow case
152 my @k = split /\./, $k;
153 my $ro = \\%kv;
154
155 while (@k) {
156 $k = shift @k;
157 if ($k =~ /^\d+$/) {
158 $ro = \$$ro->[$k];
159 } else {
160 $ro = \$$ro->{$k};
161 }
162 }
163
164 $$ro = $v;
165
166 next;
167 }
168
169 # special comon case, for performance only
170 $kv{$k} = $v;
171 }
172
173 if ($rdata) {
174 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
175 $rdata = \$_[1];
176 $self->recv ($type, \%kv, $rdata);
177 });
178
179 last; # do not tgry to parse more messages
180 } else {
181 $self->recv ($type, \%kv);
182 }
183 }
184 };
103 185
104 $self->{hdl} = new AnyEvent::Handle 186 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 187 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 188 timeout => $self->{timeout},
107 on_error => sub { 189 on_error => sub {
108 warn "<@_>\n"; 190 warn "@_\n";#d#
109 exit 1; 191 exit 1;
110 }, 192 },
111 on_read => sub { $self->on_read (@_) }, 193 on_read => $on_read,
112 on_eof => $self->{on_eof} || sub { }; 194 on_eof => $self->{on_eof} || sub { },
195 ;
113 196
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 197 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 198 }
116 199
117 $self->send_msg ( 200 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 201 name => $self->{name},
120 expected_version => "2.0", 202 expected_version => "2.0",
121 ); 203 );
122 204
123 $self 205 $self
124} 206}
125 207
126#sub progress { 208sub identifier {
127# my ($self, $txn, $type, $attr) = @_; 209 $_[0]{prefix} . ++$_[0]{idseq}
128# 210}
129# $self->{progress}->($self, $txn, $type, $attr)
130# if $self->{progress};
131#}
132 211
133sub send_msg { 212sub send_msg {
134 my ($self, $type, %kv) = @_; 213 my ($self, $type, %kv) = @_;
135 214
136 my $data = delete $kv{data}; 215 my $data = delete $kv{data};
137 216
138 if (exists $kv{id_cb}) { 217 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 218 my $id = $kv{identifier} ||= $self->identifier;
140 $self->{id}{$id} = delete $kv{id_cb}; 219 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 220 }
143 221
144 my $msg = (touc $type) . "\012" 222 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 223 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 224
158 } 236 }
159 237
160 $self->{hdl}->push_write ($msg); 238 $self->{hdl}->push_write ($msg);
161} 239}
162 240
163sub on_read { 241sub on {
164 my ($self) = @_; 242 my ($self, $cb) = @_;
165 243
166 my $type; 244 # cb return undef - message eaten, remove cb
167 my %kv; 245 # cb return 0 - message eaten
168 my $rdata; 246 # cb return 1 - pass to next
169 247
170 my $done_cb = sub { 248 push @{ $self->{on} }, $cb;
171 $kv{pkt_type} = $type; 249}
172 250
173 if (my $cb = $self->{queue}[0]) { 251sub _push_queue {
174 $cb->($self, $type, \%kv, $rdata) 252 my ($self, $queue) = @_;
175 and shift @{ $self->{queue} }; 253
176 } else { 254 shift @$queue;
177 $self->default_recv ($type, \%kv, $rdata); 255 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
256 if @$queue;
257}
258
259# lock so only one $type (arbitrary string) is in flight,
260# to work around horribly misdesigned protocol.
261sub serialise {
262 my ($self, $type, $cb) = @_;
263
264 my $queue = $self->{serialise}{$type} ||= [];
265 push @$queue, $cb;
266 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
267 unless $#$queue;
268}
269
270# how to merge these types into $self->{persistent}
271our %PERSISTENT_TYPE = (
272 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
273 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
274 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
275 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
276 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
277
278 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
279
280 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
281 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
282 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
283 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
284 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
285 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
286 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
287
288 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
289 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
290 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
291 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
292 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
293 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
294 data_found => sub { $_[1]{data_found} = $_[2] }, # get
295 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
296);
297
298sub recv {
299 my ($self, $type, $kv, @extra) = @_;
300
301 if (my $cb = $PERSISTENT_TYPE{$type}) {
302 my $id = $kv->{identifier};
303 my $req = $_[0]{req}{$id} ||= {};
304 $cb->($self, $req, $kv);
305 $self->recv (request_changed => $kv, $type, @extra);
306 }
307
308 my $on = $self->{on};
309 for (0 .. $#$on) {
310 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
311 splice @$on, $_, 1 unless defined $res;
312 return;
178 } 313 }
179 }; 314 }
180 315
181 my $hdr_cb; $hdr_cb = sub { 316 if (my $cb = $self->{queue}[0]) {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 317 $cb->($self, $type, $kv, @extra)
183 my ($k, $v) = ($1, $2); 318 and shift @{ $self->{queue} };
184 my @k = split /\./, tolc $k;
185 my $ro = \\%kv;
186
187 while (@k) {
188 my $k = shift @k;
189 if ($k =~ /^\d+$/) {
190 $ro = \$$ro->[$k];
191 } else {
192 $ro = \$$ro->{$k};
193 }
194 }
195
196 $$ro = $v;
197
198 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1];
202 $done_cb->();
203 });
204 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->();
206 } else { 319 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 320 $self->default_recv ($type, $kv, @extra);
208 }
209 }; 321 }
210
211 $self->{hdl}->push_read (line => sub {
212 $type = tolc $_[1];
213 $_[0]->push_read (line => $hdr_cb);
214 });
215} 322}
216 323
217sub default_recv { 324sub default_recv {
218 my ($self, $type, $kv, $rdata) = @_; 325 my ($self, $type, $kv, $rdata) = @_;
219 326
220 if ($type eq "node_hello") { 327 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 328 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 329 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 330 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 331 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 332 }
229} 333}
334
335=back
336
337=head2 FCP REQUESTS
338
339The following methods implement various requests. Most of them map
340directory to the FCP message of the same name. The added benefit of
341these over sending requests yourself is that they handle the necessary
342serialisation, protocol quirks, and replies.
343
344All of them exist in two versions, the variant shown in this manpage, and
345a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
346version as shown is I<synchronous> - it will wait for any replies, and
347either return the reply, or croak with an error. The underscore variant
348returns immediately and invokes one or more callbacks or condvars later.
349
350For example, the call
351
352 $info = $fcp->get_plugin_info ($name, $detailed);
353
354Also comes in this underscore variant:
355
356 $fcp->get_plugin_info_ ($name, $detailed, $cb);
357
358You can thinbk of the underscore as a kind of continuation indicator - the
359normal function waits and returns with the data, the C<_> indicates that
360you pass the continuation yourself, and the continuation will be invoked
361with the results.
362
363This callback/continuation argument (C<$cb>) can come in three forms itself:
364
365=over 4
366
367=item A code reference (or rather anything not matching some other alternative)
368
369This code reference will be invoked with the result on success. On an
370error, it will die (in the event loop) with a backtrace of the call site.
371
372This is a popular choice, but it makes handling errors hard - make sure
373you never generate protocol errors!
374
375=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
376
377When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
378results when the request has finished. Should an error occur, the error
379will instead result in C<< $cv->croak ($error) >>.
380
381This is also a popular choice.
382
383=item An array with two callbacks C<[$success, $failure]>
384
385The C<$success> callback will be invoked with the results, while the
386C<$failure> callback will be invoked on any errors.
387
388=item C<undef>
389
390This is the same thing as specifying C<sub { }> as callback, i.e. on
391success, the results are ignored, while on failure, you the module dies
392with a backtrace.
393
394This is good for quick scripts, or when you really aren't interested in
395the results.
396
397=back
398
399=cut
400
401our $NOP_CB = sub { };
230 402
231sub _txn { 403sub _txn {
232 my ($name, $sub) = @_; 404 my ($name, $sub) = @_;
233 405
234 *{$name} = sub { 406 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 407 my $cv = AE::cv;
236 &$sub;
237 $cv
238 };
239 408
240 *{"$name\_sync"} = sub { 409 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 410 &$sub;
243 $cv->recv 411 $cv->recv
244 }; 412 };
245}
246 413
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 414 *{"$name\_"} = sub {
415 my ($ok, $err) = pop;
248 416
417 if (ARRAY:: eq ref $ok) {
418 ($ok, $err) = @$ok;
419 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
420 $err = sub { $ok->croak ($_[0]{extra_description}) };
421 } else {
422 my $bt = Carp::longmess "";
423 $err = sub {
424 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
425 };
426 }
427
428 $ok ||= $NOP_CB;
429
430 splice @_, 1, 0, $ok, $err;
431 &$sub;
432 };
433}
434
435=over 4
436
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 437=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 438
251=cut 439=cut
252 440
253_txn list_peers => sub { 441_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 442 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
255 443
256 my @res; 444 my @res;
257 445
258 $self->send_msg (list_peers => 446 $self->send_msg (list_peers =>
259 with_metadata => $with_metadata ? "true" : "false", 447 with_metadata => $with_metadata ? "true" : "false",
260 with_volatile => $with_volatile ? "true" : "false", 448 with_volatile => $with_volatile ? "true" : "false",
261 id_cb => sub { 449 id_cb => sub {
262 my ($self, $type, $kv, $rdata) = @_; 450 my ($self, $type, $kv, $rdata) = @_;
263 451
264 if ($type eq "end_list_peers") { 452 if ($type eq "end_list_peers") {
265 $cv->(\@res); 453 $ok->(\@res);
266 1 454 1
267 } else { 455 } else {
268 push @res, $kv; 456 push @res, $kv;
269 0 457 0
270 } 458 }
271 }, 459 },
272 ); 460 );
273}; 461};
274 462
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 463=item $notes = $fcp->list_peer_notes ($node_identifier)
278 464
279=cut 465=cut
280 466
281_txn list_peer_notes => sub { 467_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 468 my ($self, $ok, undef, $node_identifier) = @_;
283 469
284 $self->send_msg (list_peer_notes => 470 $self->send_msg (list_peer_notes =>
285 node_identifier => $node_identifier, 471 node_identifier => $node_identifier,
286 id_cb => sub { 472 id_cb => sub {
287 my ($self, $type, $kv, $rdata) = @_; 473 my ($self, $type, $kv, $rdata) = @_;
288 474
289 $cv->($kv); 475 $ok->($kv);
290 1 476 1
291 }, 477 },
292 ); 478 );
293}; 479};
294 480
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 481=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 482
299=cut 483=cut
300 484
301_txn watch_global => sub { 485_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 486 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
303 487
304 $self->send_msg (watch_global => 488 $self->send_msg (watch_global =>
305 enabled => $enabled ? "true" : "false", 489 enabled => $enabled ? "true" : "false",
306 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 490 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
307 ); 491 );
308 492
309 $cv->(); 493 $ok->();
310}; 494};
311 495
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 496=item $reqs = $fcp->list_persistent_requests
315 497
316=cut 498=cut
317 499
318_txn list_persistent_requests => sub { 500_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 501 my ($self, $ok, $err) = @_;
320 502
503 $self->serialise (list_persistent_requests => sub {
504 my ($self, $guard) = @_;
505
321 my %res; 506 my @res;
322 507
323 $self->send_msg ("list_persistent_requests"); 508 $self->send_msg ("list_persistent_requests");
324 509
325 push @{ $self->{queue} }, sub { 510 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 511 my ($self, $type, $kv, $rdata) = @_;
327 512
513 $guard if 0;
514
328 if ($type eq "end_list_persistent_requests") { 515 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 516 $ok->(\@res);
517 return;
518 } else {
519 my $id = $kv->{identifier};
520
521 if ($type =~ /^persistent_(get|put|put_dir)$/) {
522 push @res, [$type, $kv];
523 }
524 }
525
330 1 526 1
331 } else { 527 });
332 my $id = $kv->{identifier}; 528 });
529};
333 530
334 if ($type =~ /^persistent_(get|put|put_dir)$/) { 531=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
335 $res{$id} = { 532
336 type => $1, 533Update either the C<client_token> or C<priority_class> of a request
337 %{ $res{$id} }, 534identified by C<$global> and C<$identifier>, depending on which of
535C<$client_token> and C<$priority_class> are not C<undef>.
536
537=cut
538
539_txn modify_persistent_request => sub {
540 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
541
542 $self->serialise ($identifier => sub {
543 my ($self, $guard) = @_;
544
545 $self->send_msg (modify_persistent_request =>
546 global => $global ? "true" : "false",
547 identifier => $identifier,
548 defined $client_token ? (client_token => $client_token ) : (),
549 defined $priority_class ? (priority_class => $priority_class) : (),
550 );
551
552 $self->on (sub {
553 my ($self, $type, $kv, @extra) = @_;
554
555 $guard if 0;
556
557 if ($kv->{identifier} eq $identifier) {
558 if ($type eq "persistent_request_modified") {
338 %$kv, 559 $ok->($kv);
560 return;
561 } elsif ($type eq "protocol_error") {
562 $err->($kv);
563 return;
339 }; 564 }
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 } 565 }
566
346 0 567 1
347 } 568 });
348 }; 569 });
349}; 570};
350 571
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier)
354
355=cut
356
357_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_;
359
360 $self->send_msg (remove_request =>
361 global => $global ? "true" : "false",
362 identifier => $identifier,
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370};
371
372=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
373
374=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
375
376=cut
377
378_txn modify_persistent_request => sub {
379 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
380
381 $self->send_msg (modify_persistent_request =>
382 global => $global ? "true" : "false",
383 defined $client_token ? (client_token => $client_token ) : (),
384 defined $priority_class ? (priority_class => $priority_class) : (),
385 identifier => $identifier,
386 id_cb => sub {
387 my ($self, $type, $kv, $rdata) = @_;
388
389 $cv->($kv);
390 1
391 },
392 );
393};
394
395=item $cv = $fcp->get_plugin_info ($name, $detailed)
396
397=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 572=item $info = $fcp->get_plugin_info ($name, $detailed)
398 573
399=cut 574=cut
400 575
401_txn get_plugin_info => sub { 576_txn get_plugin_info => sub {
402 my ($self, $cv, $name, $detailed) = @_; 577 my ($self, $ok, $err, $name, $detailed) = @_;
578
579 my $id = $self->identifier;
403 580
404 $self->send_msg (get_plugin_info => 581 $self->send_msg (get_plugin_info =>
582 identifier => $id,
405 plugin_name => $name, 583 plugin_name => $name,
406 detailed => $detailed ? "true" : "false", 584 detailed => $detailed ? "true" : "false",
407 id_cb => sub {
408 my ($self, $type, $kv, $rdata) = @_;
409
410 $cv->($kv);
411 1
412 },
413 ); 585 );
586 $self->on (sub {
587 my ($self, $type, $kv) = @_;
588
589 if ($kv->{identifier} eq $id) {
590 if ($type eq "get_plugin_info") {
591 $ok->($kv);
592 } else {
593 $err->($kv, $type);
594 }
595 return;
596 }
597
598 1
599 });
414}; 600};
415 601
416=item $cv = $fcp->client_get ($uri, $identifier, %kv)
417
418=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 602=item $status = $fcp->client_get ($uri, $identifier, %kv)
419 603
420%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 604%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
421 605
422ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 606ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
423priority_class, persistence, client_token, global, return_type, 607priority_class, persistence, client_token, global, return_type,
424binary_blob, allowed_mime_types, filename, temp_filename 608binary_blob, allowed_mime_types, filename, temp_filename
425 609
426=cut 610=cut
427 611
428_txn client_get => sub { 612_txn client_get => sub {
429 my ($self, $cv, $uri, $identifier, %kv) = @_; 613 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
430 614
615 $self->serialise ($identifier => sub {
616 my ($self, $guard) = @_;
617
431 $self->send_msg (client_get => 618 $self->send_msg (client_get =>
432 %kv, 619 %kv,
433 uri => $uri, 620 uri => $uri,
434 identifier => $identifier, 621 identifier => $identifier,
435 id_cb => sub { 622 );
623
624 $self->on (sub {
436 my ($self, $type, $kv, $rdata) = @_; 625 my ($self, $type, $kv, @extra) = @_;
437 626
627 $guard if 0;
628
629 if ($kv->{identifier} eq $identifier) {
630 if ($type eq "persistent_get") {
438 $cv->($kv); 631 $ok->($kv);
632 return;
633 } elsif ($type eq "protocol_error") {
634 $err->($kv);
635 return;
636 }
637 }
638
439 1 639 1
640 });
641 });
642};
643
644=item $status = $fcp->remove_request ($identifier[, $global])
645
646Remove the request with the given isdentifier. Returns true if successful,
647false on error.
648
649=cut
650
651_txn remove_request => sub {
652 my ($self, $ok, $err, $identifier, $global) = @_;
653
654 $self->serialise ($identifier => sub {
655 my ($self, $guard) = @_;
656
657 $self->send_msg (remove_request =>
658 identifier => $identifier,
659 global => $global ? "true" : "false",
660 );
661 $self->on (sub {
662 my ($self, $type, $kv, @extra) = @_;
663
664 $guard if 0;
665
666 if ($kv->{identifier} eq $identifier) {
667 if ($type eq "persistent_request_removed") {
668 $ok->(1);
669 return;
670 } elsif ($type eq "protocol_error") {
671 $err->($kv);
672 return;
673 }
674 }
675
676 1
677 });
678 });
679};
680
681=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
682
683The DDA test in FCP is probably the single most broken protocol - only
684one directory test can be outstanding at any time, and some guessing and
685heuristics are involved in mangling the paths.
686
687This function combines C<TestDDARequest> and C<TestDDAResponse> in one
688request, handling file reading and writing as well, and tries very hard to
689do the right thing.
690
691Both C<$local_directory> and C<$remote_directory> must specify the same
692directory - C<$local_directory> is the directory path on the client (where
693L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
694the server (where the freenet node runs). When both are running on the
695same node, the paths are generally identical.
696
697C<$want_read> and C<$want_write> should be set to a true value when you
698want to read (get) files or write (put) files, respectively.
699
700On error, an exception is thrown. Otherwise, C<$can_read> and
701C<$can_write> indicate whether you can reaqd or write to freenet via the
702directory.
703
704=cut
705
706_txn test_dda => sub {
707 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
708
709 $self->serialise (test_dda => sub {
710 my ($self, $guard) = @_;
711
712 $self->send_msg (test_dda_request =>
713 directory => $remote,
714 want_read_directory => $want_read ? "true" : "false",
715 want_write_directory => $want_write ? "true" : "false",
716 );
717 $self->on (sub {
718 my ($self, $type, $kv) = @_;
719
720 if ($type eq "test_dda_reply") {
721 # the filenames are all relative to the server-side directory,
722 # which might or might not match $remote anymore, so we
723 # need to rewrite the paths to be relative to $local
724 for my $k (qw(read_filename write_filename)) {
725 my $f = $kv->{$k};
726 for my $dir ($kv->{directory}, $remote) {
727 if ($dir eq substr $f, 0, length $dir) {
728 substr $f, 0, 1 + length $dir, "";
729 $kv->{$k} = $f;
730 last;
731 }
732 }
733 }
734
735 my %response = (directory => $remote);
736
737 if (length $kv->{read_filename}) {
738 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
739 sysread $fh, my $buf, -s $fh;
740 $response{read_content} = $buf;
741 }
742 }
743
744 if (length $kv->{write_filename}) {
745 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
746 syswrite $fh, $kv->{content_to_write};
747 }
748 }
749
750 $self->send_msg (test_dda_response => %response);
751
752 $self->on (sub {
753 my ($self, $type, $kv) = @_;
754
755 $guard if 0; # reference
756
757 if ($type eq "test_dda_complete") {
758 $ok->(
759 $kv->{read_directory_allowed} eq "true",
760 $kv->{write_directory_allowed} eq "true",
761 );
762 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
763 $err->($kv->{extra_description});
764 return;
765 }
766
767 1
768 });
769
770 return;
771 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
772 $err->($kv);
773 return;
774 }
775
776 1
777 });
778 });
779};
780
781=back
782
783=head2 REQUEST CACHE
784
785The C<AnyEvent::FCP> class keeps a request cache, where it caches all
786information from requests.
787
788For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
789in C<< $fcp->{req}{$identifier} >>:
790
791 persistent_get
792 persistent_put
793 persistent_put_dir
794
795This message updates the stored data:
796
797 persistent_request_modified
798
799This message will remove this entry:
800
801 persistent_request_removed
802
803These messages get merged into the cache entry, under their
804type, i.e. a C<simple_progress> message will be stored in C<<
805$fcp->{req}{$identifier}{simple_progress} >>:
806
807 simple_progress # get/put
808
809 uri_generated # put
810 generated_metadata # put
811 started_compression # put
812 finished_compression # put
813 put_failed # put
814 put_fetchable # put
815 put_successful # put
816
817 sending_to_network # get
818 compatibility_mode # get
819 expected_hashes # get
820 expected_mime # get
821 expected_data_length # get
822 get_failed # get
823 data_found # get
824 enter_finite_cooldown # get
825
826In addition, an event (basically a fake message) of type C<request_changed> is generated
827on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
828is the type of the original message triggering the change,
829
830To fill this cache with the global queue and keep it updated,
831call C<watch_global> to subscribe to updates, followed by
832C<list_persistent_requests_sync>.
833
834 $fcp->watch_global_sync_; # do not wait
835 $fcp->list_persistent_requests; # wait
836
837To get a better idea of what is stored in the cache, here is an example of
838what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
839
840 {
841 identifier => "Frost-gpl.txt",
842 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
843 binary_blob => "false",
844 global => "true",
845 max_retries => -1,
846 max_size => 9223372036854775807,
847 persistence => "forever",
848 priority_class => 3,
849 real_time => "false",
850 return_type => "direct",
851 started => "true",
852 type => "persistent_get",
853 verbosity => 2147483647,
854 sending_to_network => {
855 identifier => "Frost-gpl.txt",
856 global => "true",
440 }, 857 },
441 ); 858 compatibility_mode => {
442}; 859 identifier => "Frost-gpl.txt",
443 860 definitive => "true",
444=back 861 dont_compress => "false",
862 global => "true",
863 max => "COMPAT_1255",
864 min => "COMPAT_1255",
865 },
866 expected_hashes => {
867 identifier => "Frost-gpl.txt",
868 global => "true",
869 hashes => {
870 ed2k => "d83596f5ee3b7...",
871 md5 => "e0894e4a2a6...",
872 sha1 => "...",
873 sha256 => "...",
874 sha512 => "...",
875 tth => "...",
876 },
877 },
878 expected_mime => {
879 identifier => "Frost-gpl.txt",
880 global => "true",
881 metadata => { content_type => "application/rar" },
882 },
883 expected_data_length => {
884 identifier => "Frost-gpl.txt",
885 data_length => 37576,
886 global => "true",
887 },
888 simple_progress => {
889 identifier => "Frost-gpl.txt",
890 failed => 0,
891 fatally_failed => 0,
892 finalized_total => "true",
893 global => "true",
894 last_progress => 1438639282628,
895 required => 372,
896 succeeded => 102,
897 total => 747,
898 },
899 data_found => {
900 identifier => "Frost-gpl.txt",
901 completion_time => 1438663354026,
902 data_length => 37576,
903 global => "true",
904 metadata => { content_type => "image/jpeg" },
905 startup_time => 1438657196167,
906 },
907 }
445 908
446=head1 EXAMPLE PROGRAM 909=head1 EXAMPLE PROGRAM
447 910
448 use AnyEvent::FCP; 911 use AnyEvent::FCP;
449 912
450 my $fcp = new AnyEvent::FCP; 913 my $fcp = new AnyEvent::FCP;
451 914
452 # let us look at the global request list 915 # let us look at the global request list
453 $fcp->watch_global (1, 0); 916 $fcp->watch_global_ (1);
454 917
455 # list them, synchronously 918 # list them, synchronously
456 my $req = $fcp->list_persistent_requests_sync; 919 my $req = $fcp->list_persistent_requests;
457 920
458 # go through all requests 921 # go through all requests
922TODO
459 for my $req (values %$req) { 923 for my $req (values %$req) {
460 # skip jobs not directly-to-disk 924 # skip jobs not directly-to-disk
461 next unless $req->{return_type} eq "disk"; 925 next unless $req->{return_type} eq "disk";
462 # skip jobs not issued by FProxy 926 # skip jobs not issued by FProxy
463 next unless $req->{identifier} =~ /^FProxy:/; 927 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines