ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.9 by root, Tue Aug 4 00:35:16 2015 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
60 61
61use common::sense; 62use common::sense;
62 63
63use Carp; 64use Carp;
64 65
65our $VERSION = '0.3'; 66our $VERSION = 0.4;
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
71use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
72 75
73sub touc($) { 76sub touc($) {
74 local $_ = shift; 77 local $_ = shift;
75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
76 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
77 $_ 80 $_
78} 81}
79 82
80sub tolc($) { 83sub tolc($) {
81 local $_ = shift; 84 local $_ = shift;
82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
84 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
85 lc 88 lc
86} 89}
87 90
88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
89 92
90Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
92 95
93If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
94(hopefully) unique client name for you. 97(hopefully) unique client name for you.
95 98
96You can install a progress callback that is being called with the AnyEvent::FCP
97object, the type, a hashref with key-value pairs and a reference to any received data,
98for all unsolicited messages.
99
100Example:
101
102 sub progress_cb {
103 my ($self, $type, $kv, $rdata) = @_;
104
105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
108 }
109
110=cut 99=cut
111 100
112sub new { 101sub new {
113 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
114 my $self = bless { @_ }, $class; 106 my $self = bless {
115 107 host => $ENV{FREDHOST} || "127.0.0.1",
116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
117 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
118 $self->{name} ||= time.rand.rand.rand; # lame 110 keepalive => 9 * 60,
119 $self->{timeout} ||= 3600*2; 111 name => time.rand.rand.rand, # lame
120 $self->{progress} ||= sub { }; 112 @_,
121 113 queue => [],
122 $self->{id} = "a0"; 114 req => {},
115 prefix => "..:aefcpid:$rand:",
116 idseq => "a0",
117 }, $class;
123 118
124 { 119 {
125 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
126 190
127 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
128 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
129 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
130 on_error => sub { 196 on_error => sub {
131 warn "@_\n";#d# 197 $self->fatal ($_[2]);
132 exit 1;
133 }, 198 },
134 on_read => sub { $self->on_read (@_) }, 199 ;
135 on_eof => $self->{on_eof} || sub { };
136 200
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 } 202 }
139 203
140 $self->send_msg ( 204 $self->send_msg (client_hello =>
141 client_hello =>
142 name => $self->{name}, 205 name => $self->{name},
143 expected_version => "2.0", 206 expected_version => "2.0",
144 ); 207 );
145 208
146 $self 209 $self
147} 210}
148 211
212sub fatal {
213 my ($self, $msg) = @_;
214
215 $self->{hdl}->shutdown;
216 delete $self->{kw};
217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
223}
224
225sub identifier {
226 $_[0]{prefix} . ++$_[0]{idseq}
227}
228
149sub send_msg { 229sub send_msg {
150 my ($self, $type, %kv) = @_; 230 my ($self, $type, %kv) = @_;
151 231
152 my $data = delete $kv{data}; 232 my $data = delete $kv{data};
153 233
154 if (exists $kv{id_cb}) { 234 if (exists $kv{id_cb}) {
155 my $id = $kv{identifier} ||= ++$self->{id}; 235 my $id = $kv{identifier} ||= $self->identifier;
156 $self->{id}{$id} = delete $kv{id_cb}; 236 $self->{id}{$id} = delete $kv{id_cb};
157 } 237 }
158 238
159 my $msg = (touc $type) . "\012" 239 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 240 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
186} 266}
187 267
188sub _push_queue { 268sub _push_queue {
189 my ($self, $queue) = @_; 269 my ($self, $queue) = @_;
190 270
191 warn "oush @$queue\n";#d#
192 shift @$queue; 271 shift @$queue;
193 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 272 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
194 if @$queue; 273 if @$queue;
195} 274}
196 275
203 push @$queue, $cb; 282 push @$queue, $cb;
204 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) 283 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
205 unless $#$queue; 284 unless $#$queue;
206} 285}
207 286
208sub on_read { 287# how to merge these types into $self->{persistent}
209 my ($self) = @_; 288our %PERSISTENT_TYPE = (
289 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
290 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
291 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
292 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
293 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
210 294
211 my $type; 295 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
212 my %kv;
213 my $rdata;
214 296
215 my $done_cb = sub { 297 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
216 $kv{pkt_type} = $type; 298 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
299 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
300 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
301 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
302 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
303 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
217 304
305 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
306 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
307 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
308 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
309 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
310 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
311 data_found => sub { $_[1]{data_found} = $_[2] }, # get
312 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
313);
314
315sub recv {
316 my ($self, $type, $kv, @extra) = @_;
317
318 if (my $cb = $PERSISTENT_TYPE{$type}) {
319 my $id = $kv->{identifier};
320 my $req = $_[0]{req}{$id} ||= {};
321 $cb->($self, $req, $kv);
322 $self->recv (request_changed => $kv, $type, @extra);
323 }
324
218 my $on = $self->{on}; 325 my $on = $self->{on};
219 for (0 .. $#$on) { 326 for (0 .. $#$on) {
220 unless (my $res = $on->[$_]($type, \%kv, $rdata)) { 327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
221 splice @$on, $_, 1 unless defined $res; 328 splice @$on, $_, 1 unless defined $res;
222 return; 329 return;
223 }
224 } 330 }
331 }
225 332
226 if (my $cb = $self->{queue}[0]) { 333 if (my $cb = $self->{queue}[0]) {
227 $cb->($self, $type, \%kv, $rdata) 334 $cb->($self, $type, $kv, @extra)
228 and shift @{ $self->{queue} }; 335 and shift @{ $self->{queue} };
229 } else { 336 } else {
230 $self->default_recv ($type, \%kv, $rdata); 337 $self->default_recv ($type, $kv, @extra);
231 }
232 }; 338 }
233
234 my $hdr_cb; $hdr_cb = sub {
235 if ($_[1] =~ /^([^=]+)=(.*)$/) {
236 my ($k, $v) = ($1, $2);
237 my @k = split /\./, tolc $k;
238 my $ro = \\%kv;
239
240 while (@k) {
241 my $k = shift @k;
242 if ($k =~ /^\d+$/) {
243 $ro = \$$ro->[$k];
244 } else {
245 $ro = \$$ro->{$k};
246 }
247 }
248
249 $$ro = $v;
250
251 $_[0]->push_read (line => $hdr_cb);
252 } elsif ($_[1] eq "Data") {
253 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
254 $rdata = \$_[1];
255 $done_cb->();
256 });
257 } elsif ($_[1] eq "EndMessage") {
258 $done_cb->();
259 } else {
260 die "protocol error, expected message end, got $_[1]\n";#d#
261 }
262 };
263
264 $self->{hdl}->push_read (line => sub {
265 $type = tolc $_[1];
266 $_[0]->push_read (line => $hdr_cb);
267 });
268} 339}
269 340
270sub default_recv { 341sub default_recv {
271 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
272 343
273 if ($type eq "node_hello") { 344 if ($type eq "node_hello") {
274 $self->{node_hello} = $kv; 345 $self->{node_hello} = $kv;
275 } elsif (exists $self->{id}{$kv->{identifier}}) { 346 } elsif (exists $self->{id}{$kv->{identifier}}) {
276 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 347 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
277 and delete $self->{id}{$kv->{identifier}}; 348 and delete $self->{id}{$kv->{identifier}};
278 } else {
279 &{ $self->{progress} };
280 } 349 }
281} 350}
351
352=back
353
354=head2 FCP REQUESTS
355
356The following methods implement various requests. Most of them map
357directory to the FCP message of the same name. The added benefit of
358these over sending requests yourself is that they handle the necessary
359serialisation, protocol quirks, and replies.
360
361All of them exist in two versions, the variant shown in this manpage, and
362a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
363version as shown is I<synchronous> - it will wait for any replies, and
364either return the reply, or croak with an error. The underscore variant
365returns immediately and invokes one or more callbacks or condvars later.
366
367For example, the call
368
369 $info = $fcp->get_plugin_info ($name, $detailed);
370
371Also comes in this underscore variant:
372
373 $fcp->get_plugin_info_ ($name, $detailed, $cb);
374
375You can thinbk of the underscore as a kind of continuation indicator - the
376normal function waits and returns with the data, the C<_> indicates that
377you pass the continuation yourself, and the continuation will be invoked
378with the results.
379
380This callback/continuation argument (C<$cb>) can come in three forms itself:
381
382=over 4
383
384=item A code reference (or rather anything not matching some other alternative)
385
386This code reference will be invoked with the result on success. On an
387error, it will die (in the event loop) with a backtrace of the call site.
388
389This is a popular choice, but it makes handling errors hard - make sure
390you never generate protocol errors!
391
392=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
393
394When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
395results when the request has finished. Should an error occur, the error
396will instead result in C<< $cv->croak ($error) >>.
397
398This is also a popular choice.
399
400=item An array with two callbacks C<[$success, $failure]>
401
402The C<$success> callback will be invoked with the results, while the
403C<$failure> callback will be invoked on any errors.
404
405=item C<undef>
406
407This is the same thing as specifying C<sub { }> as callback, i.e. on
408success, the results are ignored, while on failure, you the module dies
409with a backtrace.
410
411This is good for quick scripts, or when you really aren't interested in
412the results.
413
414=back
415
416=cut
417
418our $NOP_CB = sub { };
282 419
283sub _txn { 420sub _txn {
284 my ($name, $sub) = @_; 421 my ($name, $sub) = @_;
285 422
286 *{$name} = sub { 423 *{$name} = sub {
287 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 424 my $cv = AE::cv;
288 &$sub;
289 $cv
290 };
291 425
292 *{"$name\_sync"} = sub { 426 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
293 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
294 &$sub; 427 &$sub;
295 $cv->recv 428 $cv->recv
296 }; 429 };
297}
298 430
299=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 431 *{"$name\_"} = sub {
432 my ($ok, $err) = pop;
300 433
434 if (ARRAY:: eq ref $ok) {
435 ($ok, $err) = @$ok;
436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
437 $err = sub { $ok->croak ($_[0]{extra_description}) };
438 } else {
439 my $bt = Carp::longmess "";
440 $err = sub {
441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
442 };
443 }
444
445 $ok ||= $NOP_CB;
446
447 splice @_, 1, 0, $ok, $err;
448 &$sub;
449 };
450}
451
452=over 4
453
301=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 454=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
302 455
303=cut 456=cut
304 457
305_txn list_peers => sub { 458_txn list_peers => sub {
306 my ($self, $cv, $with_metadata, $with_volatile) = @_; 459 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
307 460
308 my @res; 461 my @res;
309 462
310 $self->send_msg (list_peers => 463 $self->send_msg (list_peers =>
311 with_metadata => $with_metadata ? "true" : "false", 464 with_metadata => $with_metadata ? "true" : "false",
312 with_volatile => $with_volatile ? "true" : "false", 465 with_volatile => $with_volatile ? "true" : "false",
313 id_cb => sub { 466 id_cb => sub {
314 my ($self, $type, $kv, $rdata) = @_; 467 my ($self, $type, $kv, $rdata) = @_;
315 468
316 if ($type eq "end_list_peers") { 469 if ($type eq "end_list_peers") {
317 $cv->(\@res); 470 $ok->(\@res);
318 1 471 1
319 } else { 472 } else {
320 push @res, $kv; 473 push @res, $kv;
321 0 474 0
322 } 475 }
323 }, 476 },
324 ); 477 );
325}; 478};
326 479
327=item $cv = $fcp->list_peer_notes ($node_identifier)
328
329=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 480=item $notes = $fcp->list_peer_notes ($node_identifier)
330 481
331=cut 482=cut
332 483
333_txn list_peer_notes => sub { 484_txn list_peer_notes => sub {
334 my ($self, $cv, $node_identifier) = @_; 485 my ($self, $ok, undef, $node_identifier) = @_;
335 486
336 $self->send_msg (list_peer_notes => 487 $self->send_msg (list_peer_notes =>
337 node_identifier => $node_identifier, 488 node_identifier => $node_identifier,
338 id_cb => sub { 489 id_cb => sub {
339 my ($self, $type, $kv, $rdata) = @_; 490 my ($self, $type, $kv, $rdata) = @_;
340 491
341 $cv->($kv); 492 $ok->($kv);
342 1 493 1
343 }, 494 },
344 ); 495 );
345}; 496};
346 497
347=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
348
349=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 498=item $fcp->watch_global ($enabled[, $verbosity_mask])
350 499
351=cut 500=cut
352 501
353_txn watch_global => sub { 502_txn watch_global => sub {
354 my ($self, $cv, $enabled, $verbosity_mask) = @_; 503 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
355 504
356 $self->send_msg (watch_global => 505 $self->send_msg (watch_global =>
357 enabled => $enabled ? "true" : "false", 506 enabled => $enabled ? "true" : "false",
358 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 507 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
359 ); 508 );
360 509
361 $cv->(); 510 $ok->();
362}; 511};
363 512
364=item $cv = $fcp->list_persistent_requests
365
366=item $reqs = $fcp->list_persistent_requests_sync 513=item $reqs = $fcp->list_persistent_requests
367 514
368=cut 515=cut
369 516
370_txn list_persistent_requests => sub { 517_txn list_persistent_requests => sub {
371 my ($self, $cv) = @_; 518 my ($self, $ok, $err) = @_;
372 519
520 $self->serialise (list_persistent_requests => sub {
521 my ($self, $guard) = @_;
522
373 my %res; 523 my @res;
374 524
375 $self->send_msg ("list_persistent_requests"); 525 $self->send_msg ("list_persistent_requests");
376 526
377 push @{ $self->{queue} }, sub { 527 $self->on (sub {
378 my ($self, $type, $kv, $rdata) = @_; 528 my ($self, $type, $kv, $rdata) = @_;
379 529
530 $guard if 0;
531
380 if ($type eq "end_list_persistent_requests") { 532 if ($type eq "end_list_persistent_requests") {
381 $cv->(\%res); 533 $ok->(\@res);
534 return;
535 } else {
536 my $id = $kv->{identifier};
537
538 if ($type =~ /^persistent_(get|put|put_dir)$/) {
539 push @res, [$type, $kv];
540 }
541 }
542
382 1 543 1
383 } else { 544 });
384 my $id = $kv->{identifier}; 545 });
546};
385 547
386 if ($type =~ /^persistent_(get|put|put_dir)$/) { 548=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387 $res{$id} = { 549
388 type => $1, 550Update either the C<client_token> or C<priority_class> of a request
389 %{ $res{$id} }, 551identified by C<$global> and C<$identifier>, depending on which of
552C<$client_token> and C<$priority_class> are not C<undef>.
553
554=cut
555
556_txn modify_persistent_request => sub {
557 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
558
559 $self->serialise ($identifier => sub {
560 my ($self, $guard) = @_;
561
562 $self->send_msg (modify_persistent_request =>
563 global => $global ? "true" : "false",
564 identifier => $identifier,
565 defined $client_token ? (client_token => $client_token ) : (),
566 defined $priority_class ? (priority_class => $priority_class) : (),
567 );
568
569 $self->on (sub {
570 my ($self, $type, $kv, @extra) = @_;
571
572 $guard if 0;
573
574 if ($kv->{identifier} eq $identifier) {
575 if ($type eq "persistent_request_modified") {
390 %$kv, 576 $ok->($kv);
577 return;
578 } elsif ($type eq "protocol_error") {
579 $err->($kv);
580 return;
391 }; 581 }
392 } elsif ($type eq "simple_progress") {
393 delete $kv->{pkt_type}; # save memory
394 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
395 } else {
396 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
397 } 582 }
583
398 0 584 1
399 } 585 });
400 }; 586 });
401}; 587};
402 588
403=item $cv = $fcp->remove_request ($global, $identifier)
404
405=item $status = $fcp->remove_request_sync ($global, $identifier)
406
407=cut
408
409_txn remove_request => sub {
410 my ($self, $cv, $global, $identifier) = @_;
411
412 $self->send_msg (remove_request =>
413 global => $global ? "true" : "false",
414 identifier => $identifier,
415 id_cb => sub {
416 my ($self, $type, $kv, $rdata) = @_;
417
418 $cv->($kv);
419 1
420 },
421 );
422};
423
424=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
425
426=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
427
428=cut
429
430_txn modify_persistent_request => sub {
431 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
432
433 $self->send_msg (modify_persistent_request =>
434 global => $global ? "true" : "false",
435 defined $client_token ? (client_token => $client_token ) : (),
436 defined $priority_class ? (priority_class => $priority_class) : (),
437 identifier => $identifier,
438 id_cb => sub {
439 my ($self, $type, $kv, $rdata) = @_;
440
441 $cv->($kv);
442 1
443 },
444 );
445};
446
447=item $cv = $fcp->get_plugin_info ($name, $detailed)
448
449=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 589=item $info = $fcp->get_plugin_info ($name, $detailed)
450 590
451=cut 591=cut
452 592
453_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
454 my ($self, $cv, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
595
596 my $id = $self->identifier;
455 597
456 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
457 plugin_name => $name, 600 plugin_name => $name,
458 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
459 id_cb => sub {
460 my ($self, $type, $kv, $rdata) = @_;
461
462 $cv->($kv);
463 1
464 },
465 ); 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
466}; 617};
467 618
468=item $cv = $fcp->client_get ($uri, $identifier, %kv)
469
470=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 619=item $status = $fcp->client_get ($uri, $identifier, %kv)
471 620
472%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
473 622
474ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 623ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
475priority_class, persistence, client_token, global, return_type, 624priority_class, persistence, client_token, global, return_type,
476binary_blob, allowed_mime_types, filename, temp_filename 625binary_blob, allowed_mime_types, filename, temp_filename
477 626
478=cut 627=cut
479 628
480_txn client_get => sub { 629_txn client_get => sub {
481 my ($self, $cv, $uri, $identifier, %kv) = @_; 630 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
482 631
632 $self->serialise ($identifier => sub {
633 my ($self, $guard) = @_;
634
483 $self->send_msg (client_get => 635 $self->send_msg (client_get =>
484 %kv, 636 %kv,
485 uri => $uri, 637 uri => $uri,
486 identifier => $identifier, 638 identifier => $identifier,
487 id_cb => sub { 639 );
640
641 $self->on (sub {
488 my ($self, $type, $kv, $rdata) = @_; 642 my ($self, $type, $kv, @extra) = @_;
489 643
644 $guard if 0;
645
646 if ($kv->{identifier} eq $identifier) {
647 if ($type eq "persistent_get") {
490 $cv->($kv); 648 $ok->($kv);
649 return;
650 } elsif ($type eq "protocol_error") {
651 $err->($kv);
652 return;
653 }
654 }
655
491 1 656 1
492 }, 657 });
493 ); 658 });
494}; 659};
495 660
496=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) 661=item $status = $fcp->remove_request ($identifier[, $global])
497 662
663Remove the request with the given isdentifier. Returns true if successful,
664false on error.
665
666=cut
667
668_txn remove_request => sub {
669 my ($self, $ok, $err, $identifier, $global) = @_;
670
671 $self->serialise ($identifier => sub {
672 my ($self, $guard) = @_;
673
674 $self->send_msg (remove_request =>
675 identifier => $identifier,
676 global => $global ? "true" : "false",
677 );
678 $self->on (sub {
679 my ($self, $type, $kv, @extra) = @_;
680
681 $guard if 0;
682
683 if ($kv->{identifier} eq $identifier) {
684 if ($type eq "persistent_request_removed") {
685 $ok->(1);
686 return;
687 } elsif ($type eq "protocol_error") {
688 $err->($kv);
689 return;
690 }
691 }
692
693 1
694 });
695 });
696};
697
498=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) 698=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
499 699
500The DDA test in FCP is probably the single most broken protocol - only 700The DDA test in FCP is probably the single most broken protocol - only
501one directory test can be outstanding at any time, and some guessing and 701one directory test can be outstanding at any time, and some guessing and
502heuristics are involved in mangling the paths. 702heuristics are involved in mangling the paths.
503 703
504This function combines C<TestDDARequest> and C<TestDDAResponse> in one 704This function combines C<TestDDARequest> and C<TestDDAResponse> in one
505request, handling file reading and writing as well. 705request, handling file reading and writing as well, and tries very hard to
706do the right thing.
707
708Both C<$local_directory> and C<$remote_directory> must specify the same
709directory - C<$local_directory> is the directory path on the client (where
710L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
711the server (where the freenet node runs). When both are running on the
712same node, the paths are generally identical.
713
714C<$want_read> and C<$want_write> should be set to a true value when you
715want to read (get) files or write (put) files, respectively.
716
717On error, an exception is thrown. Otherwise, C<$can_read> and
718C<$can_write> indicate whether you can reaqd or write to freenet via the
719directory.
506 720
507=cut 721=cut
508 722
509_txn test_dda => sub { 723_txn test_dda => sub {
510 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; 724 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
511 725
512 $self->serialise (test_dda => sub { 726 $self->serialise (test_dda => sub {
513 my ($self, $guard) = @_; 727 my ($self, $guard) = @_;
514 728
515 $self->send_msg (test_dda_request => 729 $self->send_msg (test_dda_request =>
516 directory => $remote, 730 directory => $remote,
517 want_read_directory => $want_read ? "true" : "false", 731 want_read_directory => $want_read ? "true" : "false",
518 want_write_directory => $want_write ? "true" : "false", 732 want_write_directory => $want_write ? "true" : "false",
519 ); 733 );
520 $self->on (sub { 734 $self->on (sub {
521 my ($type, $kv) = @_; 735 my ($self, $type, $kv) = @_;
522 736
523 if ($type eq "test_dda_reply") { 737 if ($type eq "test_dda_reply") {
524 # the filenames are all relative to the server-side directory, 738 # the filenames are all relative to the server-side directory,
525 # which might or might not match $remote anymore, so we 739 # which might or might not match $remote anymore, so we
526 # need to rewrite the paths to be relative to $local 740 # need to rewrite the paths to be relative to $local
536 } 750 }
537 751
538 my %response = (directory => $remote); 752 my %response = (directory => $remote);
539 753
540 if (length $kv->{read_filename}) { 754 if (length $kv->{read_filename}) {
541 warn "$local/$kv->{read_filename}";#d#
542 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { 755 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
543 sysread $fh, my $buf, -s $fh; 756 sysread $fh, my $buf, -s $fh;
544 $response{read_content} = $buf; 757 $response{read_content} = $buf;
545 } 758 }
546 } 759 }
552 } 765 }
553 766
554 $self->send_msg (test_dda_response => %response); 767 $self->send_msg (test_dda_response => %response);
555 768
556 $self->on (sub { 769 $self->on (sub {
557 my ($type, $kv) = @_; 770 my ($self, $type, $kv) = @_;
558 771
559 $guard if 0; # reference 772 $guard if 0; # reference
560 773
561 if ($type eq "test_dda_complete") { 774 if ($type eq "test_dda_complete") {
562 $cv->( 775 $ok->(
563 $kv->{read_directory_allowed} eq "true", 776 $kv->{read_directory_allowed} eq "true",
564 $kv->{write_directory_allowed} eq "true", 777 $kv->{write_directory_allowed} eq "true",
565 ); 778 );
566 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 779 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
567 $cv->croak ($kv->{extra_description}); 780 $err->($kv->{extra_description});
568 return; 781 return;
569 } 782 }
570 783
571 1 784 1
572 }); 785 });
573 786
574 return; 787 return;
575 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { 788 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
576 $cv->croak ($kv->{extra_description}); 789 $err->($kv);
577 return; 790 return;
578 } 791 }
579 792
580 1 793 1
581 }); 794 });
582 }); 795 });
583}; 796};
584 797
585=back 798=back
586 799
800=head2 REQUEST CACHE
801
802The C<AnyEvent::FCP> class keeps a request cache, where it caches all
803information from requests.
804
805For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
806in C<< $fcp->{req}{$identifier} >>:
807
808 persistent_get
809 persistent_put
810 persistent_put_dir
811
812This message updates the stored data:
813
814 persistent_request_modified
815
816This message will remove this entry:
817
818 persistent_request_removed
819
820These messages get merged into the cache entry, under their
821type, i.e. a C<simple_progress> message will be stored in C<<
822$fcp->{req}{$identifier}{simple_progress} >>:
823
824 simple_progress # get/put
825
826 uri_generated # put
827 generated_metadata # put
828 started_compression # put
829 finished_compression # put
830 put_failed # put
831 put_fetchable # put
832 put_successful # put
833
834 sending_to_network # get
835 compatibility_mode # get
836 expected_hashes # get
837 expected_mime # get
838 expected_data_length # get
839 get_failed # get
840 data_found # get
841 enter_finite_cooldown # get
842
843In addition, an event (basically a fake message) of type C<request_changed> is generated
844on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
845is the type of the original message triggering the change,
846
847To fill this cache with the global queue and keep it updated,
848call C<watch_global> to subscribe to updates, followed by
849C<list_persistent_requests_sync>.
850
851 $fcp->watch_global_sync_; # do not wait
852 $fcp->list_persistent_requests; # wait
853
854To get a better idea of what is stored in the cache, here is an example of
855what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
856
857 {
858 identifier => "Frost-gpl.txt",
859 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
860 binary_blob => "false",
861 global => "true",
862 max_retries => -1,
863 max_size => 9223372036854775807,
864 persistence => "forever",
865 priority_class => 3,
866 real_time => "false",
867 return_type => "direct",
868 started => "true",
869 type => "persistent_get",
870 verbosity => 2147483647,
871 sending_to_network => {
872 identifier => "Frost-gpl.txt",
873 global => "true",
874 },
875 compatibility_mode => {
876 identifier => "Frost-gpl.txt",
877 definitive => "true",
878 dont_compress => "false",
879 global => "true",
880 max => "COMPAT_1255",
881 min => "COMPAT_1255",
882 },
883 expected_hashes => {
884 identifier => "Frost-gpl.txt",
885 global => "true",
886 hashes => {
887 ed2k => "d83596f5ee3b7...",
888 md5 => "e0894e4a2a6...",
889 sha1 => "...",
890 sha256 => "...",
891 sha512 => "...",
892 tth => "...",
893 },
894 },
895 expected_mime => {
896 identifier => "Frost-gpl.txt",
897 global => "true",
898 metadata => { content_type => "application/rar" },
899 },
900 expected_data_length => {
901 identifier => "Frost-gpl.txt",
902 data_length => 37576,
903 global => "true",
904 },
905 simple_progress => {
906 identifier => "Frost-gpl.txt",
907 failed => 0,
908 fatally_failed => 0,
909 finalized_total => "true",
910 global => "true",
911 last_progress => 1438639282628,
912 required => 372,
913 succeeded => 102,
914 total => 747,
915 },
916 data_found => {
917 identifier => "Frost-gpl.txt",
918 completion_time => 1438663354026,
919 data_length => 37576,
920 global => "true",
921 metadata => { content_type => "image/jpeg" },
922 startup_time => 1438657196167,
923 },
924 }
925
587=head1 EXAMPLE PROGRAM 926=head1 EXAMPLE PROGRAM
588 927
589 use AnyEvent::FCP; 928 use AnyEvent::FCP;
590 929
591 my $fcp = new AnyEvent::FCP; 930 my $fcp = new AnyEvent::FCP;
592 931
593 # let us look at the global request list 932 # let us look at the global request list
594 $fcp->watch_global (1, 0); 933 $fcp->watch_global_ (1);
595 934
596 # list them, synchronously 935 # list them, synchronously
597 my $req = $fcp->list_persistent_requests_sync; 936 my $req = $fcp->list_persistent_requests;
598 937
599 # go through all requests 938 # go through all requests
939TODO
600 for my $req (values %$req) { 940 for my $req (values %$req) {
601 # skip jobs not directly-to-disk 941 # skip jobs not directly-to-disk
602 next unless $req->{return_type} eq "disk"; 942 next unless $req->{return_type} eq "disk";
603 # skip jobs not issued by FProxy 943 # skip jobs not issued by FProxy
604 next unless $req->{identifier} =~ /^FProxy:/; 944 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines