ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.3 by root, Tue Jul 28 02:20:51 2009 UTC vs.
Revision 1.11 by root, Fri Aug 7 01:54:00 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global (1, 0);
41 my $req = $fcp->list_persistent_requests;
42
43TODO
44 for my $req (values %$req) {
45 if ($req->{filename} =~ /a/) {
46 $fcp->modify_persistent_request (1, $req->{identifier}, undef, 0);
47 }
48 }
49
31=head2 IMPORT TAGS 50=head2 IMPORT TAGS
32 51
33Nothing much can be "imported" from this module right now. 52Nothing much can be "imported" from this module right now.
34 53
35=head2 THE AnyEvent::FCP CLASS 54=head1 THE AnyEvent::FCP CLASS
36 55
37=over 4 56=over 4
38 57
39=cut 58=cut
40 59
42 61
43use common::sense; 62use common::sense;
44 63
45use Carp; 64use Carp;
46 65
47our $VERSION = '0.2'; 66our $VERSION = '0.3';
48 67
49use Scalar::Util (); 68use Scalar::Util ();
50 69
51use AnyEvent; 70use AnyEvent;
52use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util ();
53 73
54sub touc($) { 74sub touc($) {
55 local $_ = shift; 75 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/; 76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 77 s/(?:^|_)(.)/\U$1/g;
58 $_ 78 $_
59} 79}
60 80
61sub tolc($) { 81sub tolc($) {
62 local $_ = shift; 82 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i; 83 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/;
64 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i; 84 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 85 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 86 lc
67} 87}
68 88
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 89=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, name => $name]
70 90
71Create a new FCP connection to the given host and port (default 91Create a new FCP connection to the given host and port (default
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 92127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 93
74If no C<name> was specified, then AnyEvent::FCP will generate a 94If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 95(hopefully) unique client name for you.
76 96
77=cut 97=cut
78 98
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use
82#it like this:
83#
84# sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_;
86#
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
88# }
89
90sub new { 99sub new {
91 my $class = shift; 100 my $class = shift;
92 my $self = bless { @_ }, $class; 101 my $self = bless {
93 102 host => $ENV{FREDHOST} || "127.0.0.1",
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 103 port => $ENV{FREDPORT} || 9481,
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 104 timeout => 3600 * 2,
96 $self->{name} ||= time.rand.rand.rand; # lame 105 name => time.rand.rand.rand, # lame
97 $self->{timeout} ||= 600; 106 @_,
98 107 queue => [],
99 $self->{id} = "a0"; 108 req => {},
109 id => "a0",
110 }, $class;
100 111
101 { 112 {
102 Scalar::Util::weaken (my $self = $self); 113 Scalar::Util::weaken (my $self = $self);
103 114
104 $self->{hdl} = new AnyEvent::Handle 115 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 116 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 117 timeout => $self->{timeout},
107 on_error => sub { 118 on_error => sub {
108 warn "<@_>\n"; 119 warn "@_\n";#d#
109 exit 1; 120 exit 1;
110 }, 121 },
111 on_read => sub { $self->on_read (@_) }, 122 on_read => sub { $self->on_read (@_) },
112 on_eof => $self->{on_eof} || sub { }; 123 on_eof => $self->{on_eof} || sub { };
113 124
114 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
115 } 126 }
116 127
117 $self->send_msg ( 128 $self->send_msg (client_hello =>
118 client_hello =>
119 name => $self->{name}, 129 name => $self->{name},
120 expected_version => "2.0", 130 expected_version => "2.0",
121 ); 131 );
122 132
123 $self 133 $self
124} 134}
125 135
126#sub progress {
127# my ($self, $txn, $type, $attr) = @_;
128#
129# $self->{progress}->($self, $txn, $type, $attr)
130# if $self->{progress};
131#}
132
133sub send_msg { 136sub send_msg {
134 my ($self, $type, %kv) = @_; 137 my ($self, $type, %kv) = @_;
135 138
136 my $data = delete $kv{data}; 139 my $data = delete $kv{data};
137 140
138 if (exists $kv{id_cb}) { 141 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 142 my $id = $kv{identifier} ||= ++$self->{id};
140 $self->{id}{$id} = delete $kv{id_cb}; 143 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 144 }
143 145
144 my $msg = (touc $type) . "\012" 146 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 148
158 } 160 }
159 161
160 $self->{hdl}->push_write ($msg); 162 $self->{hdl}->push_write ($msg);
161} 163}
162 164
165sub on {
166 my ($self, $cb) = @_;
167
168 # cb return undef - message eaten, remove cb
169 # cb return 0 - message eaten
170 # cb return 1 - pass to next
171
172 push @{ $self->{on} }, $cb;
173}
174
175sub _push_queue {
176 my ($self, $queue) = @_;
177
178 shift @$queue;
179 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
180 if @$queue;
181}
182
183# lock so only one $type (arbitrary string) is in flight,
184# to work around horribly misdesigned protocol.
185sub serialise {
186 my ($self, $type, $cb) = @_;
187
188 my $queue = $self->{serialise}{$type} ||= [];
189 push @$queue, $cb;
190 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
191 unless $#$queue;
192}
193
194# how to merge these types into $self->{persistent}
195our %PERSISTENT_TYPE = (
196 persistent_get => sub { %{ $_[1] } = (type => "persistent_get" , %{ $_[2] }) },
197 persistent_put => sub { %{ $_[1] } = (type => "persistent_put" , %{ $_[2] }) },
198 persistent_put_dir => sub { %{ $_[1] } = (type => "persistent_put_dir", %{ $_[2] }) },
199 persistent_request_modified => sub { %{ $_[1] } = (%{ $_[1] }, %{ $_[2] }) },
200 persistent_request_removed => sub { delete $_[0]{req}{$_[2]{identifier}} },
201
202 simple_progress => sub { $_[1]{simple_progress} = $_[2] }, # get/put
203
204 uri_generated => sub { $_[1]{uri_generated} = $_[2] }, # put
205 generated_metadata => sub { $_[1]{generated_metadata} = $_[2] }, # put
206 started_compression => sub { $_[1]{started_compression} = $_[2] }, # put
207 finished_compression => sub { $_[1]{finished_compression} = $_[2] }, # put
208 put_fetchable => sub { $_[1]{put_fetchable} = $_[2] }, # put
209 put_failed => sub { $_[1]{put_failed} = $_[2] }, # put
210 put_successful => sub { $_[1]{put_successful} = $_[2] }, # put
211
212 sending_to_network => sub { $_[1]{sending_to_network} = $_[2] }, # get
213 compatibility_mode => sub { $_[1]{compatibility_mode} = $_[2] }, # get
214 expected_hashes => sub { $_[1]{expected_hashes} = $_[2] }, # get
215 expected_mime => sub { $_[1]{expected_mime} = $_[2] }, # get
216 expected_data_length => sub { $_[1]{expected_data_length} = $_[2] }, # get
217 get_failed => sub { $_[1]{get_failed} = $_[2] }, # get
218 data_found => sub { $_[1]{data_found} = $_[2] }, # get
219 enter_finite_cooldown => sub { $_[1]{enter_finite_cooldown} = $_[2] }, # get
220);
221
222sub recv {
223 my ($self, $type, $kv, @extra) = @_;
224
225 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra);
230 }
231
232 my $on = $self->{on};
233 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
235 splice @$on, $_, 1 unless defined $res;
236 return;
237 }
238 }
239
240 if (my $cb = $self->{queue}[0]) {
241 $cb->($self, $type, $kv, @extra)
242 and shift @{ $self->{queue} };
243 } else {
244 $self->default_recv ($type, $kv, @extra);
245 }
246}
247
163sub on_read { 248sub on_read {
164 my ($self) = @_; 249 my ($self) = @_;
165 250
166 my $type; 251 my $type;
167 my %kv; 252 my %kv;
168 my $rdata; 253 my $rdata;
169
170 my $done_cb = sub {
171 $kv{pkt_type} = $type;
172
173 if (my $cb = $self->{queue}[0]) {
174 $cb->($self, $type, \%kv, $rdata)
175 and shift @{ $self->{queue} };
176 } else {
177 $self->default_recv ($type, \%kv, $rdata);
178 }
179 };
180 254
181 my $hdr_cb; $hdr_cb = sub { 255 my $hdr_cb; $hdr_cb = sub {
182 if ($_[1] =~ /^([^=]+)=(.*)$/) { 256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
183 my ($k, $v) = ($1, $2); 257 my ($k, $v) = ($1, $2);
184 my @k = split /\./, tolc $k; 258 my @k = split /\./, tolc $k;
197 271
198 $_[0]->push_read (line => $hdr_cb); 272 $_[0]->push_read (line => $hdr_cb);
199 } elsif ($_[1] eq "Data") { 273 } elsif ($_[1] eq "Data") {
200 $_[0]->push_read (chunk => delete $kv{data_length}, sub { 274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
201 $rdata = \$_[1]; 275 $rdata = \$_[1];
202 $done_cb->(); 276 $self->recv ($type, \%kv, $rdata);
203 }); 277 });
204 } elsif ($_[1] eq "EndMessage") { 278 } elsif ($_[1] eq "EndMessage") {
205 $done_cb->(); 279 $self->recv ($type, \%kv);
206 } else { 280 } else {
207 die "protocol error, expected message end, got $_[1]\n";#d# 281 die "protocol error, expected message end, got $_[1]\n";#d#
208 } 282 }
209 }; 283 };
210 284
220 if ($type eq "node_hello") { 294 if ($type eq "node_hello") {
221 $self->{node_hello} = $kv; 295 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 296 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 297 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 298 and delete $self->{id}{$kv->{identifier}};
225 } else {
226 # on_warn
227 #warn "protocol warning (unexpected $type message)\n";
228 } 299 }
229} 300}
301
302our $NOP_CB = sub { };
230 303
231sub _txn { 304sub _txn {
232 my ($name, $sub) = @_; 305 my ($name, $sub) = @_;
233 306
234 *{$name} = sub { 307 *{$name} = sub {
235 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
236 &$sub;
237 $cv
238 };
239
240 *{"$name\_sync"} = sub {
241 splice @_, 1, 0, (my $cv = AnyEvent->condvar); 308 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
242 &$sub; 309 &$sub;
243 $cv->recv 310 $cv->recv
244 }; 311 };
245}
246 312
247=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]]) 313 *{"$name\_"} = sub {
314 splice @_, 1, 0, pop || $NOP_CB;
315 &$sub;
316 };
317}
248 318
249=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]]) 319=item $peers = $fcp->list_peers ([$with_metdata[, $with_volatile]])
250 320
251=cut 321=cut
252 322
253_txn list_peers => sub { 323_txn list_peers => sub {
254 my ($self, $cv, $with_metadata, $with_volatile) = @_; 324 my ($self, $cv, $with_metadata, $with_volatile) = @_;
270 } 340 }
271 }, 341 },
272 ); 342 );
273}; 343};
274 344
275=item $cv = $fcp->list_peer_notes ($node_identifier)
276
277=item $notes = $fcp->list_peer_notes_sync ($node_identifier) 345=item $notes = $fcp->list_peer_notes ($node_identifier)
278 346
279=cut 347=cut
280 348
281_txn list_peer_notes => sub { 349_txn list_peer_notes => sub {
282 my ($self, $cv, $node_identifier) = @_; 350 my ($self, $cv, $node_identifier) = @_;
290 1 358 1
291 }, 359 },
292 ); 360 );
293}; 361};
294 362
295=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
296
297=item $fcp->watch_global_sync ($enabled[, $verbosity_mask]) 363=item $fcp->watch_global ($enabled[, $verbosity_mask])
298 364
299=cut 365=cut
300 366
301_txn watch_global => sub { 367_txn watch_global => sub {
302 my ($self, $cv, $enabled, $verbosity_mask) = @_; 368 my ($self, $cv, $enabled, $verbosity_mask) = @_;
307 ); 373 );
308 374
309 $cv->(); 375 $cv->();
310}; 376};
311 377
312=item $cv = $fcp->list_persistent_requests
313
314=item $reqs = $fcp->list_persistent_requests_sync 378=item $reqs = $fcp->list_persistent_requests
315 379
316=cut 380=cut
317 381
318_txn list_persistent_requests => sub { 382_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 383 my ($self, $cv) = @_;
320 384
385 $self->serialise (list_persistent_requests => sub {
386 my ($self, $guard) = @_;
387
321 my %res; 388 my @res;
322 389
323 $self->send_msg ("list_persistent_requests"); 390 $self->send_msg ("list_persistent_requests");
324 391
325 push @{ $self->{queue} }, sub { 392 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 393 my ($self, $type, $kv, $rdata) = @_;
327 394
395 $guard if 0;
396
328 if ($type eq "end_list_persistent_requests") { 397 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 398 $cv->(\@res);
399 return;
400 } else {
401 my $id = $kv->{identifier};
402
403 if ($type =~ /^persistent_(get|put|put_dir)$/) {
404 push @res, [$type, $kv];
405 }
406 }
407
330 1 408 1
331 } else {
332 my $id = $kv->{identifier};
333
334 if ($type =~ /^persistent_(get|put|put_dir)$/) {
335 $res{$id} = {
336 type => $1,
337 %{ $res{$id} },
338 %$kv,
339 };
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 }
346 0
347 } 409 });
348 }; 410 });
349}; 411};
350 412
351=item $cv = $fcp->remove_request ($global, $identifier)
352
353=item $status = $fcp->remove_request_sync ($global, $identifier) 413=item $status = $fcp->remove_request ($global, $identifier)
354 414
355=cut 415=cut
356 416
357_txn remove_request => sub { 417_txn remove_request => sub {
358 my ($self, $cv, $global, $identifier) = @_; 418 my ($self, $cv, $global, $identifier) = @_;
365 425
366 $cv->($kv); 426 $cv->($kv);
367 1 427 1
368 }, 428 },
369 ); 429 );
370
371 $cv->();
372}; 430};
373 431
374=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]]) 432=item $sync = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
375
376=item $status = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
377 433
378=cut 434=cut
379 435
380_txn modify_persistent_request => sub { 436_txn modify_persistent_request => sub {
381 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_; 437 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
382 438
383 $self->send_msg (modify_persistent_request => 439 $self->send_msg (modify_persistent_request =>
384 global => $global ? "true" : "false", 440 global => $global ? "true" : "false",
385 identifier => $identifier,
386 defined $client_token ? (client_token => $client_token ) : (), 441 defined $client_token ? (client_token => $client_token ) : (),
387 defined $priority_class ? (priority_class => $priority_class) : (), 442 defined $priority_class ? (priority_class => $priority_class) : (),
443 identifier => $identifier,
388 id_cb => sub { 444 id_cb => sub {
389 my ($self, $type, $kv, $rdata) = @_; 445 my ($self, $type, $kv, $rdata) = @_;
390 446
391 $cv->($kv); 447 $cv->($kv);
392 1 448 1
393 }, 449 },
394 ); 450 );
395
396 $cv->();
397}; 451};
398 452
399=item $cv = $fcp->get_plugin_info ($name, $detailed)
400
401=item $info = $fcp->get_plugin_info_sync ($name, $detailed) 453=item $info = $fcp->get_plugin_info ($name, $detailed)
402 454
403=cut 455=cut
404 456
405_txn get_plugin_info => sub { 457_txn get_plugin_info => sub {
406 my ($self, $cv, $name, $detailed) = @_; 458 my ($self, $cv, $name, $detailed) = @_;
413 465
414 $cv->($kv); 466 $cv->($kv);
415 1 467 1
416 }, 468 },
417 ); 469 );
470};
418 471
419 $cv->(); 472=item $status = $fcp->client_get ($uri, $identifier, %kv)
473
474%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
475
476ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
477priority_class, persistence, client_token, global, return_type,
478binary_blob, allowed_mime_types, filename, temp_filename
479
480=cut
481
482_txn client_get => sub {
483 my ($self, $cv, $uri, $identifier, %kv) = @_;
484
485 $self->send_msg (client_get =>
486 %kv,
487 uri => $uri,
488 identifier => $identifier,
489 );
490};
491
492=item $status = $fcp->remove_request ($identifier[, $global])
493
494Remove the request with the given isdentifier. Returns true if successful,
495false on error.
496
497=cut
498
499_txn remove_request => sub {
500 my ($self, $cv, $identifier, $global) = @_;
501
502 $self->serialise ($identifier => sub {
503 my ($self, $guard) = @_;
504
505 $self->send_msg (remove_request =>
506 identifier => $identifier,
507 global => $global ? "true" : "false",
508 );
509 $self->on (sub {
510 my ($self, $type, $kv, @extra) = @_;
511
512 if ($kv->{identifier} eq $identifier) {
513 if ($type eq "persistent_request_removed") {
514 $cv->(1);
515 return;
516 } elsif ($type eq "protocol_error") {
517 $cv->(undef);
518 return;
519 }
520 }
521
522 1
523 });
524 });
525};
526
527=item ($can_read, $can_write) = $fcp->test_dda ($local_directory, $remote_directory, $want_read, $want_write))
528
529The DDA test in FCP is probably the single most broken protocol - only
530one directory test can be outstanding at any time, and some guessing and
531heuristics are involved in mangling the paths.
532
533This function combines C<TestDDARequest> and C<TestDDAResponse> in one
534request, handling file reading and writing as well, and tries very hard to
535do the right thing.
536
537Both C<$local_directory> and C<$remote_directory> must specify the same
538directory - C<$local_directory> is the directory path on the client (where
539L<AnyEvent::FCP> runs) and C<$remote_directory> is the directory path on
540the server (where the freenet node runs). When both are running on the
541same node, the paths are generally identical.
542
543C<$want_read> and C<$want_write> should be set to a true value when you
544want to read (get) files or write (put) files, respectively.
545
546On error, an exception is thrown. Otherwise, C<$can_read> and
547C<$can_write> indicate whether you can reaqd or write to freenet via the
548directory.
549
550=cut
551
552_txn test_dda => sub {
553 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;
554
555 $self->serialise (test_dda => sub {
556 my ($self, $guard) = @_;
557
558 $self->send_msg (test_dda_request =>
559 directory => $remote,
560 want_read_directory => $want_read ? "true" : "false",
561 want_write_directory => $want_write ? "true" : "false",
562 );
563 $self->on (sub {
564 my ($self, $type, $kv) = @_;
565
566 if ($type eq "test_dda_reply") {
567 # the filenames are all relative to the server-side directory,
568 # which might or might not match $remote anymore, so we
569 # need to rewrite the paths to be relative to $local
570 for my $k (qw(read_filename write_filename)) {
571 my $f = $kv->{$k};
572 for my $dir ($kv->{directory}, $remote) {
573 if ($dir eq substr $f, 0, length $dir) {
574 substr $f, 0, 1 + length $dir, "";
575 $kv->{$k} = $f;
576 last;
577 }
578 }
579 }
580
581 my %response = (directory => $remote);
582
583 if (length $kv->{read_filename}) {
584 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
585 sysread $fh, my $buf, -s $fh;
586 $response{read_content} = $buf;
587 }
588 }
589
590 if (length $kv->{write_filename}) {
591 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
592 syswrite $fh, $kv->{content_to_write};
593 }
594 }
595
596 $self->send_msg (test_dda_response => %response);
597
598 $self->on (sub {
599 my ($self, $type, $kv) = @_;
600
601 $guard if 0; # reference
602
603 if ($type eq "test_dda_complete") {
604 $cv->(
605 $kv->{read_directory_allowed} eq "true",
606 $kv->{write_directory_allowed} eq "true",
607 );
608 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
609 $cv->croak ($kv->{extra_description});
610 return;
611 }
612
613 1
614 });
615
616 return;
617 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
618 $cv->croak ($kv->{extra_description});
619 return;
620 }
621
622 1
623 });
624 });
420}; 625};
421 626
422=back 627=back
423 628
629=head2 REQUEST CACHE
630
631The C<AnyEvent::FCP> class keeps a request cache, where it caches all
632information from requests.
633
634For these messages, it will store a copy of the key-value pairs, together with a C<type> slot,
635in C<< $fcp->{req}{$identifier} >>:
636
637 persistent_get
638 persistent_put
639 persistent_put_dir
640
641This message updates the stored data:
642
643 persistent_request_modified
644
645This message will remove this entry:
646
647 persistent_request_removed
648
649These messages get merged into the cache entry, under their
650type, i.e. a C<simple_progress> message will be stored in C<<
651$fcp->{req}{$identifier}{simple_progress} >>:
652
653 simple_progress # get/put
654
655 uri_generated # put
656 generated_metadata # put
657 started_compression # put
658 finished_compression # put
659 put_failed # put
660 put_fetchable # put
661 put_successful # put
662
663 sending_to_network # get
664 compatibility_mode # get
665 expected_hashes # get
666 expected_mime # get
667 expected_data_length # get
668 get_failed # get
669 data_found # get
670 enter_finite_cooldown # get
671
672In addition, an event (basically a fake message) of type C<request_changed> is generated
673on every change, which will be called as C<< $cb->($fcp, $kv, $type) >>, where C<$type>
674is the type of the original message triggering the change,
675
676To fill this cache with the global queue and keep it updated,
677call C<watch_global> to subscribe to updates, followed by
678C<list_persistent_requests_sync>.
679
680 $fcp->watch_global_sync_; # do not wait
681 $fcp->list_persistent_requests; # wait
682
683To get a better idea of what is stored in the cache, here is an example of
684what might be stored in C<< $fcp->{req}{"Frost-gpl.txt"} >>:
685
686 {
687 identifier => "Frost-gpl.txt",
688 uri => 'CHK@Fnx5kzdrfE,EImdzaVyEWl,AAIC--8/gpl.txt',
689 binary_blob => "false",
690 global => "true",
691 max_retries => -1,
692 max_size => 9223372036854775807,
693 persistence => "forever",
694 priority_class => 3,
695 real_time => "false",
696 return_type => "direct",
697 started => "true",
698 type => "persistent_get",
699 verbosity => 2147483647,
700 sending_to_network => {
701 identifier => "Frost-gpl.txt",
702 global => "true",
703 },
704 compatibility_mode => {
705 identifier => "Frost-gpl.txt",
706 definitive => "true",
707 dont_compress => "false",
708 global => "true",
709 max => "COMPAT_1255",
710 min => "COMPAT_1255",
711 },
712 expected_hashes => {
713 identifier => "Frost-gpl.txt",
714 global => "true",
715 hashes => {
716 ed2k => "d83596f5ee3b7...",
717 md5 => "e0894e4a2a6...",
718 sha1 => "...",
719 sha256 => "...",
720 sha512 => "...",
721 tth => "...",
722 },
723 },
724 expected_mime => {
725 identifier => "Frost-gpl.txt",
726 global => "true",
727 metadata => { content_type => "application/rar" },
728 },
729 expected_data_length => {
730 identifier => "Frost-gpl.txt",
731 data_length => 37576,
732 global => "true",
733 },
734 simple_progress => {
735 identifier => "Frost-gpl.txt",
736 failed => 0,
737 fatally_failed => 0,
738 finalized_total => "true",
739 global => "true",
740 last_progress => 1438639282628,
741 required => 372,
742 succeeded => 102,
743 total => 747,
744 },
745 data_found => {
746 identifier => "Frost-gpl.txt",
747 completion_time => 1438663354026,
748 data_length => 37576,
749 global => "true",
750 metadata => { content_type => "image/jpeg" },
751 startup_time => 1438657196167,
752 },
753 }
754
424=head1 EXAMPLE PROGRAM 755=head1 EXAMPLE PROGRAM
425 756
426 use AnyEvent::FCP; 757 use AnyEvent::FCP;
427 758
428 my $fcp = new AnyEvent::FCP; 759 my $fcp = new AnyEvent::FCP;
429 760
430 # let us look at the global request list 761 # let us look at the global request list
431 $fcp->watch_global (1, 0); 762 $fcp->watch_global_ (1);
432 763
433 # list them, synchronously 764 # list them, synchronously
434 my $req = $fcp->list_persistent_requests_sync; 765 my $req = $fcp->list_persistent_requests;
435 766
436 # go through all requests 767 # go through all requests
768TODO
437 for my $req (values %$req) { 769 for my $req (values %$req) {
438 # skip jobs not directly-to-disk 770 # skip jobs not directly-to-disk
439 next unless $req->{return_type} eq "disk"; 771 next unless $req->{return_type} eq "disk";
440 # skip jobs not issued by FProxy 772 # skip jobs not issued by FProxy
441 next unless $req->{identifier} =~ /^FProxy:/; 773 next unless $req->{identifier} =~ /^FProxy:/;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines