ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.8 by root, Fri Jun 18 16:59:13 2010 UTC vs.
Revision 1.17 by root, Sat Sep 5 19:36:12 2015 UTC

35 35
36 use AnyEvent::FCP; 36 use AnyEvent::FCP;
37 37
38 my $fcp = new AnyEvent::FCP; 38 my $fcp = new AnyEvent::FCP;
39 39
40 $fcp->watch_global_sync (1, 0); 40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests_sync; 41 my $req = $fcp->list_persistent_requests;
42 42
43TODO
43 for my $req (values %$req) { 44 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) { 45 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); 46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
46 } 47 }
47 } 48 }
48 49
49=head2 IMPORT TAGS 50=head2 IMPORT TAGS
50 51
51Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
52 53
53=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
54 55
55=over 4 56=over 4
56 57
57=cut 58=cut
58 59
66 67
67use Scalar::Util (); 68use Scalar::Util ();
68 69
69use AnyEvent; 70use AnyEvent;
70use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
71 75
72sub touc($) { 76sub touc($) {
73 local $_ = shift; 77 local $_ = shift;
74 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
75 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
76 $_ 80 $_
77} 81}
78 82
79sub tolc($) { 83sub tolc($) {
80 local $_ = shift; 84 local $_ = shift;
81 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 85 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
82 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 86 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
83 s/(?<=[a-z])(?=[A-Z])/_/g; 87 s/(?<=[a-z])(?=[A-Z])/_/g;
84 lc 88 lc
85} 89}
86 90
87=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 91=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
88 92
89Create a new FCP connection to the given host and port (default 93Create a new FCP connection to the given host and port (default
90127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 94127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
91 95
92If no C<name> was specified, then AnyEvent::FCP will generate a 96If no C<name> was specified, then AnyEvent::FCP will generate a
93(hopefully) unique client name for you. 97(hopefully) unique client name for you.
94 98
95You can install a progress callback that is being called with the AnyEvent::FCP
96object, the type, a hashref with key-value pairs and a reference to any received data,
97for all unsolicited messages.
98
99Example:
100
101 sub progress_cb {
102 my ($self, $type, $kv, $rdata) = @_;
103
104 if ($type eq "simple_progress") {
105 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
106 }
107 }
108
109=cut 99=cut
110 100
111sub new { 101sub new {
112 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
113 my $self = bless { @_ }, $class; 106 my $self = bless {
114 107 host => $ENV{FREDHOST} || "127.0.0.1",
115 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 108 port => $ENV{FREDPORT} || 9481,
116 $self->{port} ||= $ENV{FREDPORT} || 9481; 109 timeout => 3600 * 2,
117 $self->{name} ||= time.rand.rand.rand; # lame 110 name => time.rand.rand.rand, # lame
118 $self->{timeout} ||= 3600*2; 111 @_,
119 $self->{progress} ||= sub { }; 112 queue => [],
120 113 req => {},
121 $self->{id} = "a0"; 114 prefix => "..:aefcpid:$rand:",
115 idseq => "a0",
116 }, $class;
122 117
123 { 118 {
124 Scalar::Util::weaken (my $self = $self); 119 Scalar::Util::weaken (my $self = $self);
120
121 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
122
123 # these are declared here for performance reasons
124 my ($k, $v, $type);
125 my $rdata;
126
127 my $on_read = sub {
128 my ($hdl) = @_;
129
130 # we only carve out whole messages here
131 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
132 # remember end marker
133 $rdata = $1 eq "Data"
134 or $1 eq "EndMessage"
135 or die "protocol error, expected message end, got $1\n";
136
137 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
138
139 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
140
141 $type = shift @lines;
142 $type = ($TOLC{$type} ||= tolc $type);
143
144 my %kv;
145
146 for (@lines) {
147 ($k, $v) = split /=/, $_, 2;
148 $k = ($TOLC{$k} ||= tolc $k);
149
150 if ($k =~ /\./) {
151 # generic, slow case
152 my @k = split /\./, $k;
153 my $ro = \\%kv;
154
155 while (@k) {
156 $k = shift @k;
157 if ($k =~ /^\d+$/) {
158 $ro = \$$ro->[$k];
159 } else {
160 $ro = \$$ro->{$k};
161 }
162 }
163
164 $$ro = $v;
165
166 next;
167 }
168
169 # special comon case, for performance only
170 $kv{$k} = $v;
171 }
172
173 if ($rdata) {
174 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
175 $rdata = \$_[1];
176 $self->recv ($type, \%kv, $rdata);
177 });
178
179 last; # do not tgry to parse more messages
180 } else {
181 $self->recv ($type, \%kv);
182 }
183 }
184 };
125 185
126 $self->{hdl} = new AnyEvent::Handle 186 $self->{hdl} = new AnyEvent::Handle
127 connect => [$self->{host} => $self->{port}], 187 connect => [$self->{host} => $self->{port}],
128 timeout => $self->{timeout}, 188 timeout => $self->{timeout},
129 on_error => sub { 189 on_error => sub {
130 warn "@_\n";#d# 190 warn "@_\n";#d#
131 exit 1; 191 exit 1;
132 }, 192 },
133 on_read => sub { $self->on_read (@_) }, 193 on_read => $on_read,
134 on_eof => $self->{on_eof} || sub { }; 194 on_eof => $self->{on_eof} || sub { },
195 ;
135 196
136 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 197 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137 } 198 }
138 199
139 $self->send_msg ( 200 $self->send_msg (client_hello =>
140 client_hello =>
141 name => $self->{name}, 201 name => $self->{name},
142 expected_version => "2.0", 202 expected_version => "2.0",
143 ); 203 );
144 204
145 $self 205 $self
146} 206}
147 207
208sub identifier {
209 $_[0]{prefix} . ++$_[0]{idseq}
210}
211
148sub send_msg { 212sub send_msg {
149 my ($self, $type, %kv) = @_; 213 my ($self, $type, %kv) = @_;
150 214
151 my $data = delete $kv{data}; 215 my $data = delete $kv{data};
152 216
153 if (exists $kv{id_cb}) { 217 if (exists $kv{id_cb}) {
154 my $id = $kv{identifier} || ++$self->{id}; 218 my $id = $kv{identifier} ||= $self->identifier;
155 $self->{id}{$id} = delete $kv{id_cb}; 219 $self->{id}{$id} = delete $kv{id_cb};
156 $kv{identifier} = $id;
157 } 220 }
158 221
159 my $msg = (touc $type) . "\012" 222 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 223 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
161 224
173 } 236 }
174 237
175 $self->{hdl}->push_write ($msg); 238 $self->{hdl}->push_write ($msg);
176} 239}
177 240
178sub on_read { 241sub on {
179 my ($self) = @_; 242 my ($self, $cb) = @_;
180 243
181 my $type; 244 # cb return undef - message eaten, remove cb
182 my %kv; 245 # cb return 0 - message eaten
183 my $rdata; 246 # cb return 1 - pass to next
184 247
185 my $done_cb = sub { 248 push @{ $self->{on} }, $cb;
186 $kv{pkt_type} = $type; 249}
187 250
188 if (my $cb = $self->{queue}[0]) { 251sub _push_queue {
189 $cb->($self, $type, \%kv, $rdata) 252 my ($self, $queue) = @_;
190 and shift @{ $self->{queue} }; 253
191 } else { 254 shift @$queue;
192 $self->default_recv ($type, \%kv, $rdata); 255 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
256 if @$queue;
257}
258
259# lock so only one $type (arbitrary string) is in flight,
260# to work around horribly misdesigned protocol.
261sub serialise {
262 my ($self, $type, $cb) = @_;
263
264 my $queue = $self->{serialise}{$type} ||= [];
265 push @$queue, $cb;
266 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
267 unless $#$queue;
268}
269
270# how to merge these types into $self->{persistent}
271our %PERSISTENT_TYPE = (
272 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
273 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
274 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
275 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
276 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
277
278 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
279
280 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
281 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
282 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
283 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
284 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
285 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
286 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
287
288 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
289 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
290 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
291 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
292 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
293 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
294 data_found => sub { $_[1]{data_found} = $_[2] }, # get
295 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
296);
297
298sub recv {
299 my ($self, $type, $kv, @extra) = @_;
300
301 if (my $cb = $PERSISTENT_TYPE{$type}) {
302 my $id = $kv->{identifier};
303 my $req = $_[0]{req}{$id} ||= {};
304 $cb->($self, $req, $kv);
305 $self->recv (request_changed => $kv, $type, @extra);
306 }
307
308 my $on = $self->{on};
309 for (0 .. $#$on) {
310 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
311 splice @$on, $_, 1 unless defined $res;
312 return;
193 } 313 }
194 }; 314 }
195 315
196 my $hdr_cb; $hdr_cb = sub { 316 if (my $cb = $self->{queue}[0]) {
197 if ($_[1] =~ /^([^=]+)=(.*)$/) { 317 $cb->($self, $type, $kv, @extra)
198 my ($k, $v) = ($1, $2); 318 and shift @{ $self->{queue} };
199 my @k = split /\./, tolc $k;
200 my $ro = \\%kv;
201
202 while (@k) {
203 my $k = shift @k;
204 if ($k =~ /^\d+$/) {
205 $ro = \$$ro->[$k];
206 } else {
207 $ro = \$$ro->{$k};
208 }
209 }
210
211 $$ro = $v;
212
213 $_[0]->push_read (line => $hdr_cb);
214 } elsif ($_[1] eq "Data") {
215 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216 $rdata = \$_[1];
217 $done_cb->();
218 });
219 } elsif ($_[1] eq "EndMessage") {
220 $done_cb->();
221 } else { 319 } else {
222 die "protocol error, expected message end, got $_[1]\n";#d# 320 $self->default_recv ($type, $kv, @extra);
223 }
224 }; 321 }
225
226 $self->{hdl}->push_read (line => sub {
227 $type = tolc $_[1];
228 $_[0]->push_read (line => $hdr_cb);
229 });
230} 322}
231 323
232sub default_recv { 324sub default_recv {
233 my ($self, $type, $kv, $rdata) = @_; 325 my ($self, $type, $kv, $rdata) = @_;
234 326
235 if ($type eq "node_hello") { 327 if ($type eq "node_hello") {
236 $self->{node_hello} = $kv; 328 $self->{node_hello} = $kv;
237 } elsif (exists $self->{id}{$kv->{identifier}}) { 329 } elsif (exists $self->{id}{$kv->{identifier}}) {
238 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 330 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239 and delete $self->{id}{$kv->{identifier}}; 331 and delete $self->{id}{$kv->{identifier}};
240 } else {
241 &{ $self->{progress} };
242 } 332 }
243} 333}
334
335=back
336
337=head2 FCP REQUESTS
338
339The following methods implement various requests. Most of them map
340directory to the FCP message of the same name. The added benefit of
341these over sending requests yourself is that they handle the necessary
342serialisation, protocol quirks, and replies.
343
344All of them exist in two versions, the variant shown in this manpage, and
345a variant with an extra C<_> at the end, and an extra C<$cb> argument. The
346version as shown is I<synchronous> - it will wait for any replies, and
347either return the reply, or croak with an error. The underscore variant
348returns immediately and invokes one or more callbacks or condvars later.
349
350For example, the call
351
352 $info = $fcp->get_plugin_info ($name, $detailed);
353
354Also comes in this underscore variant:
355
356 $fcp->get_plugin_info_ ($name, $detailed, $cb);
357
358You can thinbk of the underscore as a kind of continuation indicator - the
359normal function waits and returns with the data, the C<_> indicates that
360you pass the continuation yourself, and the continuation will be invoked
361with the results.
362
363This callback/continuation argument (C<$cb>) can come in three forms itself:
364
365=over 4
366
367=item A code reference (or rather anything not matching some other alternative)
368
369This code reference will be invoked with the result on success. On an
370error, it will die (in the event loop) with a backtrace of the call site.
371
372This is a popular choice, but it makes handling errors hard - make sure
373you never generate protocol errors!
374
375=item A condvar (as returned by e.g. C<< AnyEvent->condvar >>)
376
377When a condvar is passed, it is sent (C<< $cv->send ($results) >>) the
378results when the request has finished. Should an error occur, the error
379will instead result in C<< $cv->croak ($error) >>.
380
381This is also a popular choice.
382
383=item An array with two callbacks C<[$success, $failure]>
384
385The C<$success> callback will be invoked with the results, while the
386C<$failure> callback will be invoked on any errors.
387
388=item C<undef>
389
390This is the same thing as specifying C<sub { }> as callback, i.e. on
391success, the results are ignored, while on failure, you the module dies
392with a backtrace.
393
394This is good for quick scripts, or when you really aren't interested in
395the results.
396
397=back
398
399=cut
400
401our $NOP_CB = sub { };
244 402
245sub _txn { 403sub _txn {
246 my ($name, $sub) = @_; 404 my ($name, $sub) = @_;
247 405
248 *{$name} = sub { 406 *{$name} = sub {
249 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 407 my $cv = AE::cv;
250 &$sub;
251 $cv
252 };
253 408
254 *{"$name\_sync"} = sub { 409 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
255 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
256 &$sub; 410 &$sub;
257 $cv->recv 411 $cv->recv
258 }; 412 };
259}
260 413
261=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 414 *{"$name\_"} = sub {
415 my ($ok, $err) = pop;
262 416
417 if (ARRAY:: eq ref $ok) {
418 ($ok, $err) = @$ok;
419 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
420 $err = sub { $ok->croak ($_[0]{extra_description}) };
421 } else {
422 my $bt = Carp::longmess "";
423 $err = sub {
424 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
425 };
426 }
427
428 $ok ||= $NOP_CB;
429
430 splice @_, 1, 0, $ok, $err;
431 &$sub;
432 };
433}
434
435=over 4
436
263=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 437=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
264 438
265=cut 439=cut
266 440
267_txn list_peers => sub { 441_txn list_peers => sub {
268 my ($self, $cv, $with_metadata, $with_volatile) = @_; 442 my ($self, $ok, undef, $with_metadata, $with_volatile) = @_;
269 443
270 my @res; 444 my @res;
271 445
272 $self->send_msg (list_peers => 446 $self->send_msg (list_peers =>
273 with_metadata => $with_metadata ? "true" : "false", 447 with_metadata => $with_metadata ? "true" : "false",
274 with_volatile => $with_volatile ? "true" : "false", 448 with_volatile => $with_volatile ? "true" : "false",
275 id_cb => sub { 449 id_cb => sub {
276 my ($self, $type, $kv, $rdata) = @_; 450 my ($self, $type, $kv, $rdata) = @_;
277 451
278 if ($type eq "end_list_peers") { 452 if ($type eq "end_list_peers") {
279 $cv->(\@res); 453 $ok->(\@res);
280 1 454 1
281 } else { 455 } else {
282 push @res, $kv; 456 push @res, $kv;
283 0 457 0
284 } 458 }
285 }, 459 },
286 ); 460 );
287}; 461};
288 462
289=item $cv = $fcp->list_peer_notes ($node_identifier)
290
291=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 463=item $notes = $fcp->list_peer_notes ($node_identifier)
292 464
293=cut 465=cut
294 466
295_txn list_peer_notes => sub { 467_txn list_peer_notes => sub {
296 my ($self, $cv, $node_identifier) = @_; 468 my ($self, $ok, undef, $node_identifier) = @_;
297 469
298 $self->send_msg (list_peer_notes => 470 $self->send_msg (list_peer_notes =>
299 node_identifier => $node_identifier, 471 node_identifier => $node_identifier,
300 id_cb => sub { 472 id_cb => sub {
301 my ($self, $type, $kv, $rdata) = @_; 473 my ($self, $type, $kv, $rdata) = @_;
302 474
303 $cv->($kv); 475 $ok->($kv);
304 1 476 1
305 }, 477 },
306 ); 478 );
307}; 479};
308 480
309=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
310
311=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 481=item $fcp->watch_global ($enabled[, $verbosity_mask])
312 482
313=cut 483=cut
314 484
315_txn watch_global => sub { 485_txn watch_global => sub {
316 my ($self, $cv, $enabled, $verbosity_mask) = @_; 486 my ($self, $ok, $err, $enabled, $verbosity_mask) = @_;
317 487
318 $self->send_msg (watch_global => 488 $self->send_msg (watch_global =>
319 enabled => $enabled ? "true" : "false", 489 enabled => $enabled ? "true" : "false",
320 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (), 490 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321 ); 491 );
322 492
323 $cv->(); 493 $ok->();
324}; 494};
325 495
326=item $cv = $fcp->list_persistent_requests
327
328=item $reqs = $fcp->list_persistent_requests_sync 496=item $reqs = $fcp->list_persistent_requests
329 497
330=cut 498=cut
331 499
332_txn list_persistent_requests => sub { 500_txn list_persistent_requests => sub {
333 my ($self, $cv) = @_; 501 my ($self, $ok, $err) = @_;
334 502
503 $self->serialise (list_persistent_requests => sub {
504 my ($self, $guard) = @_;
505
335 my %res; 506 my @res;
336 507
337 $self->send_msg ("list_persistent_requests"); 508 $self->send_msg ("list_persistent_requests");
338 509
339 push @{ $self->{queue} }, sub { 510 $self->on (sub {
340 my ($self, $type, $kv, $rdata) = @_; 511 my ($self, $type, $kv, $rdata) = @_;
341 512
513 $guard if 0;
514
342 if ($type eq "end_list_persistent_requests") { 515 if ($type eq "end_list_persistent_requests") {
343 $cv->(\%res); 516 $ok->(\@res);
517 return;
518 } else {
519 my $id = $kv->{identifier};
520
521 if ($type =~ /^persistent_(get|put|put_dir)$/) {
522 push @res, [$type, $kv];
523 }
524 }
525
344 1 526 1
345 } else { 527 });
346 my $id = $kv->{identifier}; 528 });
529};
347 530
348 if ($type =~ /^persistent_(get|put|put_dir)$/) { 531=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
349 $res{$id} = { 532
350 type => $1, 533Update either the C<client_token> or C<priority_class> of a request
351 %{ $res{$id} }, 534identified by C<$global> and C<$identifier>, depending on which of
535C<$client_token> and C<$priority_class> are not C<undef>.
536
537=cut
538
539_txn modify_persistent_request => sub {
540 my ($self, $ok, $err, $global, $identifier, $client_token, $priority_class) = @_;
541
542 $self->serialise ($identifier => sub {
543 my ($self, $guard) = @_;
544
545 $self->send_msg (modify_persistent_request =>
546 global => $global ? "true" : "false",
547 identifier => $identifier,
548 defined $client_token ? (client_token => $client_token ) : (),
549 defined $priority_class ? (priority_class => $priority_class) : (),
550 );
551
552 $self->on (sub {
553 my ($self, $type, $kv, @extra) = @_;
554
555 $guard if 0;
556
557 if ($kv->{identifier} eq $identifier) {
558 if ($type eq "persistent_request_modified") {
352 %$kv, 559 $ok->($kv);
560 return;
561 } elsif ($type eq "protocol_error") {
562 $err->($kv);
563 return;
353 }; 564 }
354 } elsif ($type eq "simple_progress") {
355 delete $kv->{pkt_type}; # save memory
356 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357 } else {
358 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359 } 565 }
566
360 0 567 1
361 } 568 });
362 }; 569 });
363}; 570};
364 571
365=item $cv = $fcp->remove_request ($global, $identifier)
366
367=item $status = $fcp->remove_request_sync ($global, $identifier)
368
369=cut
370
371_txn remove_request => sub {
372 my ($self, $cv, $global, $identifier) = @_;
373
374 $self->send_msg (remove_request =>
375 global => $global ? "true" : "false",
376 identifier => $identifier,
377 id_cb => sub {
378 my ($self, $type, $kv, $rdata) = @_;
379
380 $cv->($kv);
381 1
382 },
383 );
384};
385
386=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387
388=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389
390=cut
391
392_txn modify_persistent_request => sub {
393 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394
395 $self->send_msg (modify_persistent_request =>
396 global => $global ? "true" : "false",
397 defined $client_token ? (client_token => $client_token ) : (),
398 defined $priority_class ? (priority_class => $priority_class) : (),
399 identifier => $identifier,
400 id_cb => sub {
401 my ($self, $type, $kv, $rdata) = @_;
402
403 $cv->($kv);
404 1
405 },
406 );
407};
408
409=item $cv = $fcp->get_plugin_info ($name, $detailed)
410
411=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 572=item $info = $fcp->get_plugin_info ($name, $detailed)
412 573
413=cut 574=cut
414 575
415_txn get_plugin_info => sub { 576_txn get_plugin_info => sub {
416 my ($self, $cv, $name, $detailed) = @_; 577 my ($self, $ok, $err, $name, $detailed) = @_;
578
579 my $id = $self->identifier;
417 580
418 $self->send_msg (get_plugin_info => 581 $self->send_msg (get_plugin_info =>
582 identifier => $id,
419 plugin_name => $name, 583 plugin_name => $name,
420 detailed => $detailed ? "true" : "false", 584 detailed => $detailed ? "true" : "false",
421 id_cb => sub {
422 my ($self, $type, $kv, $rdata) = @_;
423
424 $cv->($kv);
425 1
426 },
427 ); 585 );
586 $self->on (sub {
587 my ($self, $type, $kv) = @_;
588
589 if ($kv->{identifier} eq $id) {
590 if ($type eq "get_plugin_info") {
591 $ok->($kv);
592 } else {
593 $err->($kv, $type);
594 }
595 return;
596 }
597
598 1
599 });
428}; 600};
429 601
430=item $cv = $fcp->client_get ($uri, $identifier, %kv)
431
432=item $status = $fcp->client_get_sync ($uri, $identifier, %kv) 602=item $status = $fcp->client_get ($uri, $identifier, %kv)
433 603
434%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 604%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
435 605
436ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries, 606ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437priority_class, persistence, client_token, global, return_type, 607priority_class, persistence, client_token, global, return_type,
438binary_blob, allowed_mime_types, filename, temp_filename 608binary_blob, allowed_mime_types, filename, temp_filename
439 609
440=cut 610=cut
441 611
442_txn client_get => sub { 612_txn client_get => sub {
443 my ($self, $cv, $uri, $identifier, %kv) = @_; 613 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
444 614
615 $self->serialise ($identifier => sub {
616 my ($self, $guard) = @_;
617
445 $self->send_msg (client_get => 618 $self->send_msg (client_get =>
446 %kv, 619 %kv,
447 uri => $uri, 620 uri => $uri,
448 identifier => $identifier, 621 identifier => $identifier,
449 id_cb => sub { 622 );
623
624 $self->on (sub {
450 my ($self, $type, $kv, $rdata) = @_; 625 my ($self, $type, $kv, @extra) = @_;
451 626
627 $guard if 0;
628
629 if ($kv->{identifier} eq $identifier) {
630 if ($type eq "persistent_get") {
452 $cv->($kv); 631 $ok->($kv);
632 return;
633 } elsif ($type eq "protocol_error") {
634 $err->($kv);
635 return;
636 }
637 }
638
453 1 639 1
640 });
641 });
642};
643
644=item $status = $fcp->remove_request ($identifier[, $global])
645
646Remove the request with the given isdentifier. Returns true if successful,
647false on error.
648
649=cut
650
651_txn remove_request => sub {
652 my ($self, $ok, $err, $identifier, $global) = @_;
653
654 $self->serialise ($identifier => sub {
655 my ($self, $guard) = @_;
656
657 $self->send_msg (remove_request =>
658 identifier => $identifier,
659 global => $global ? "true" : "false",
660 );
661 $self->on (sub {
662 my ($self, $type, $kv, @extra) = @_;
663
664 $guard if 0;
665
666 if ($kv->{identifier} eq $identifier) {
667 if ($type eq "persistent_request_removed") {
668 $ok->(1);
669 return;
670 } elsif ($type eq "protocol_error") {
671 $err->($kv);
672 return;
673 }
674 }
675
676 1
677 });
678 });
679};
680
681=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
682
683The DDA test in FCP is probably the single most broken protocol - only
684one directory test can be outstanding at any time, and some guessing and
685heuristics are involved in mangling the paths.
686
687This function combines C<TestDDARequest> and C<TestDDAResponse> in one
688request, handling file reading and writing as well, and tries very hard to
689do the right thing.
690
691Both C<$local_directory> and C<$remote_directory> must specify the same
692directory - C<$local_directory> is the directory path on the client (where
693L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
694the server (where the freenet node runs). When both are running on the
695same node, the paths are generally identical.
696
697C<$want_read> and C<$want_write> should be set to a true value when you
698want to read (get) files or write (put) files, respectively.
699
700On error, an exception is thrown. Otherwise, C<$can_read> and
701C<$can_write> indicate whether you can reaqd or write to freenet via the
702directory.
703
704=cut
705
706_txn test_dda => sub {
707 my ($self, $ok, $err, $local, $remote, $want_read, $want_write) = @_;
708
709 $self->serialise (test_dda => sub {
710 my ($self, $guard) = @_;
711
712 $self->send_msg (test_dda_request =>
713 directory => $remote,
714 want_read_directory => $want_read ? "true" : "false",
715 want_write_directory => $want_write ? "true" : "false",
716 );
717 $self->on (sub {
718 my ($self, $type, $kv) = @_;
719
720 if ($type eq "test_dda_reply") {
721 # the filenames are all relative to the server-side directory,
722 # which might or might not match $remote anymore, so we
723 # need to rewrite the paths to be relative to $local
724 for my $k (qw(read_filename write_filename)) {
725 my $f = $kv->{$k};
726 for my $dir ($kv->{directory}, $remote) {
727 if ($dir eq substr $f, 0, length $dir) {
728 substr $f, 0, 1 + length $dir, "";
729 $kv->{$k} = $f;
730 last;
731 }
732 }
733 }
734
735 my %response = (directory => $remote);
736
737 if (length $kv->{read_filename}) {
738 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
739 sysread $fh, my $buf, -s $fh;
740 $response{read_content} = $buf;
741 }
742 }
743
744 if (length $kv->{write_filename}) {
745 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
746 syswrite $fh, $kv->{content_to_write};
747 }
748 }
749
750 $self->send_msg (test_dda_response => %response);
751
752 $self->on (sub {
753 my ($self, $type, $kv) = @_;
754
755 $guard if 0; # reference
756
757 if ($type eq "test_dda_complete") {
758 $ok->(
759 $kv->{read_directory_allowed} eq "true",
760 $kv->{write_directory_allowed} eq "true",
761 );
762 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
763 $err->($kv->{extra_description});
764 return;
765 }
766
767 1
768 });
769
770 return;
771 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
772 $err->($kv);
773 return;
774 }
775
776 1
777 });
778 });
779};
780
781=back
782
783=head2 REQUEST CACHE
784
785The C<AnyEvent::FCP> class keeps a request cache, where it caches all
786information from requests.
787
788For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
789in C<< $fcp->{req}{$identifier} >>:
790
791 persistent_get
792 persistent_put
793 persistent_put_dir
794
795This message updates the stored data:
796
797 persistent_request_modified
798
799This message will remove this entry:
800
801 persistent_request_removed
802
803These messages get merged into the cache entry, under their
804type, i.e. a C<simple_progress> message will be stored in C<<
805$fcp->{req}{$identifier}{simple_progress} >>:
806
807 simple_progress # get/put
808
809 uri_generated # put
810 generated_metadata # put
811 started_compression # put
812 finished_compression # put
813 put_failed # put
814 put_fetchable # put
815 put_successful # put
816
817 sending_to_network # get
818 compatibility_mode # get
819 expected_hashes # get
820 expected_mime # get
821 expected_data_length # get
822 get_failed # get
823 data_found # get
824 enter_finite_cooldown # get
825
826In addition, an event (basically a fake message) of type C<request_changed> is generated
827on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
828is the type of the original message triggering the change,
829
830To fill this cache with the global queue and keep it updated,
831call C<watch_global> to subscribe to updates, followed by
832C<list_persistent_requests_sync>.
833
834 $fcp->watch_global_sync_; # do not wait
835 $fcp->list_persistent_requests; # wait
836
837To get a better idea of what is stored in the cache, here is an example of
838what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
839
840 {
841 identifier => "Frost-gpl.txt",
842 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
843 binary_blob => "false",
844 global => "true",
845 max_retries => -1,
846 max_size => 9223372036854775807,
847 persistence => "forever",
848 priority_class => 3,
849 real_time => "false",
850 return_type => "direct",
851 started => "true",
852 type => "persistent_get",
853 verbosity => 2147483647,
854 sending_to_network => {
855 identifier => "Frost-gpl.txt",
856 global => "true",
454 }, 857 },
455 ); 858 compatibility_mode => {
456}; 859 identifier => "Frost-gpl.txt",
457 860 definitive => "true",
458=back 861 dont_compress => "false",
862 global => "true",
863 max => "COMPAT_1255",
864 min => "COMPAT_1255",
865 },
866 expected_hashes => {
867 identifier => "Frost-gpl.txt",
868 global => "true",
869 hashes => {
870 ed2k => "d83596f5ee3b7...",
871 md5 => "e0894e4a2a6...",
872 sha1 => "...",
873 sha256 => "...",
874 sha512 => "...",
875 tth => "...",
876 },
877 },
878 expected_mime => {
879 identifier => "Frost-gpl.txt",
880 global => "true",
881 metadata => { content_type => "application/rar" },
882 },
883 expected_data_length => {
884 identifier => "Frost-gpl.txt",
885 data_length => 37576,
886 global => "true",
887 },
888 simple_progress => {
889 identifier => "Frost-gpl.txt",
890 failed => 0,
891 fatally_failed => 0,
892 finalized_total => "true",
893 global => "true",
894 last_progress => 1438639282628,
895 required => 372,
896 succeeded => 102,
897 total => 747,
898 },
899 data_found => {
900 identifier => "Frost-gpl.txt",
901 completion_time => 1438663354026,
902 data_length => 37576,
903 global => "true",
904 metadata => { content_type => "image/jpeg" },
905 startup_time => 1438657196167,
906 },
907 }
459 908
460=head1 EXAMPLE PROGRAM 909=head1 EXAMPLE PROGRAM
461 910
462 use AnyEvent::FCP; 911 use AnyEvent::FCP;
463 912
464 my $fcp = new AnyEvent::FCP; 913 my $fcp = new AnyEvent::FCP;
465 914
466 # let us look at the global request list 915 # let us look at the global request list
467 $fcp->watch_global (1, 0); 916 $fcp->watch_global_ (1);
468 917
469 # list them, synchronously 918 # list them, synchronously
470 my $req = $fcp->list_persistent_requests_sync; 919 my $req = $fcp->list_persistent_requests;
471 920
472 # go through all requests 921 # go through all requests
922TODO
473 for my $req (values %$req) { 923 for my $req (values %$req) {
474 # skip jobs not directly-to-disk 924 # skip jobs not directly-to-disk
475 next unless $req->{return_type} eq "disk"; 925 next unless $req->{return_type} eq "disk";
476 # skip jobs not issued by FProxy 926 # skip jobs not issued by FProxy
477 next unless $req->{identifier} =~ /^FProxy:/; 927 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines