… | |
… | |
6 | |
6 | |
7 | use AnyEvent::FCP; |
7 | use AnyEvent::FCP; |
8 | |
8 | |
9 | my $fcp = new AnyEvent::FCP; |
9 | my $fcp = new AnyEvent::FCP; |
10 | |
10 | |
11 | # transactions return condvars |
11 | # transactions return condvars |
12 | my $lp_cv = $fcp->list_peers; |
12 | my $lp_cv = $fcp->list_peers; |
13 | my $pr_cv = $fcp->list_persistent_requests; |
13 | my $pr_cv = $fcp->list_persistent_requests; |
14 | |
14 | |
15 | my $peers = $lp_cv->recv; |
15 | my $peers = $lp_cv->recv; |
16 | my $reqs = $pr_cv->recv; |
16 | my $reqs = $pr_cv->recv; |
… | |
… | |
26 | The module uses L<AnyEvent> to find a suitable event module. |
26 | The module uses L<AnyEvent> to find a suitable event module. |
27 | |
27 | |
28 | Only very little is implemented, ask if you need more, and look at the |
28 | Only very little is implemented, ask if you need more, and look at the |
29 | example program later in this section. |
29 | example program later in this section. |
30 | |
30 | |
|
|
31 | =head2 EXAMPLE |
|
|
32 | |
|
|
33 | This example fetches the download list and sets the priority of all files |
|
|
34 | with "a" in their name to "emergency": |
|
|
35 | |
|
|
36 | use AnyEvent::FCP; |
|
|
37 | |
|
|
38 | my $fcp = new AnyEvent::FCP; |
|
|
39 | |
|
|
40 | $fcp->watch_global_sync (1, 0); |
|
|
41 | my $req = $fcp->list_persistent_requests_sync; |
|
|
42 | |
|
|
43 | for my $req (values %$req) { |
|
|
44 | if ($req->{filename} =~ /a/) { |
|
|
45 | $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0); |
|
|
46 | } |
|
|
47 | } |
|
|
48 | |
31 | =head2 IMPORT TAGS |
49 | =head2 IMPORT TAGS |
32 | |
50 | |
33 | Nothing much can be "imported" from this module right now. |
51 | Nothing much can be "imported" from this module right now. |
34 | |
52 | |
35 | =head2 THE AnyEvent::FCP CLASS |
53 | =head2 THE AnyEvent::FCP CLASS |
… | |
… | |
42 | |
60 | |
43 | use common::sense; |
61 | use common::sense; |
44 | |
62 | |
45 | use Carp; |
63 | use Carp; |
46 | |
64 | |
47 | our $VERSION = '0.21'; |
65 | our $VERSION = '0.3'; |
48 | |
66 | |
49 | use Scalar::Util (); |
67 | use Scalar::Util (); |
50 | |
68 | |
51 | use AnyEvent; |
69 | use AnyEvent; |
52 | use AnyEvent::Handle; |
70 | use AnyEvent::Handle; |
|
|
71 | use AnyEvent::Util (); |
53 | |
72 | |
54 | sub touc($) { |
73 | sub touc($) { |
55 | local $_ = shift; |
74 | local $_ = shift; |
56 | 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; |
75 | 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; |
57 | s/(?:^|_)(.)/\U$1/g; |
76 | s/(?:^|_)(.)/\U$1/g; |
58 | $_ |
77 | $_ |
59 | } |
78 | } |
60 | |
79 | |
61 | sub tolc($) { |
80 | sub tolc($) { |
62 | local $_ = shift; |
81 | local $_ = shift; |
63 | 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; |
82 | 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i; |
64 | 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; |
83 | 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i; |
65 | s/(?<=[a-z])(?=[A-Z])/_/g; |
84 | s/(?<=[a-z])(?=[A-Z])/_/g; |
66 | lc |
85 | lc |
67 | } |
86 | } |
68 | |
87 | |
69 | =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] |
88 | =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] |
… | |
… | |
72 | 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). |
91 | 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). |
73 | |
92 | |
74 | If no C<name> was specified, then AnyEvent::FCP will generate a |
93 | If no C<name> was specified, then AnyEvent::FCP will generate a |
75 | (hopefully) unique client name for you. |
94 | (hopefully) unique client name for you. |
76 | |
95 | |
77 | =cut |
|
|
78 | |
|
|
79 | #TODO |
|
|
80 | #You can install a progress callback that is being called with the AnyEvent::FCP |
96 | You can install a progress callback that is being called with the AnyEvent::FCP |
81 | #object, a txn object, the type of the transaction and the attributes. Use |
97 | object, the type, a hashref with key-value pairs and a reference to any received data, |
82 | #it like this: |
98 | for all unsolicited messages. |
83 | # |
99 | |
|
|
100 | Example: |
|
|
101 | |
84 | # sub progress_cb { |
102 | sub progress_cb { |
85 | # my ($self, $txn, $type, $attr) = @_; |
103 | my ($self, $type, $kv, $rdata) = @_; |
86 | # |
104 | |
87 | # warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; |
105 | if ($type eq "simple_progress") { |
|
|
106 | warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n"; |
|
|
107 | } |
88 | # } |
108 | } |
|
|
109 | |
|
|
110 | =cut |
89 | |
111 | |
90 | sub new { |
112 | sub new { |
91 | my $class = shift; |
113 | my $class = shift; |
92 | my $self = bless { @_ }, $class; |
114 | my $self = bless { @_ }, $class; |
93 | |
115 | |
94 | $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; |
116 | $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; |
95 | $self->{port} ||= $ENV{FREDPORT} || 9481; |
117 | $self->{port} ||= $ENV{FREDPORT} || 9481; |
96 | $self->{name} ||= time.rand.rand.rand; # lame |
118 | $self->{name} ||= time.rand.rand.rand; # lame |
97 | $self->{timeout} ||= 600; |
119 | $self->{timeout} ||= 3600*2; |
|
|
120 | $self->{progress} ||= sub { }; |
98 | |
121 | |
99 | $self->{id} = "a0"; |
122 | $self->{id} = "a0"; |
100 | |
123 | |
101 | { |
124 | { |
102 | Scalar::Util::weaken (my $self = $self); |
125 | Scalar::Util::weaken (my $self = $self); |
103 | |
126 | |
104 | $self->{hdl} = new AnyEvent::Handle |
127 | $self->{hdl} = new AnyEvent::Handle |
105 | connect => [$self->{host} => $self->{port}], |
128 | connect => [$self->{host} => $self->{port}], |
106 | timeout => $self->{timeout}, |
129 | timeout => $self->{timeout}, |
107 | on_error => sub { |
130 | on_error => sub { |
108 | warn "<@_>\n"; |
131 | warn "@_\n";#d# |
109 | exit 1; |
132 | exit 1; |
110 | }, |
133 | }, |
111 | on_read => sub { $self->on_read (@_) }, |
134 | on_read => sub { $self->on_read (@_) }, |
112 | on_eof => $self->{on_eof} || sub { }; |
135 | on_eof => $self->{on_eof} || sub { }; |
113 | |
136 | |
… | |
… | |
121 | ); |
144 | ); |
122 | |
145 | |
123 | $self |
146 | $self |
124 | } |
147 | } |
125 | |
148 | |
126 | #sub progress { |
|
|
127 | # my ($self, $txn, $type, $attr) = @_; |
|
|
128 | # |
|
|
129 | # $self->{progress}->($self, $txn, $type, $attr) |
|
|
130 | # if $self->{progress}; |
|
|
131 | #} |
|
|
132 | |
|
|
133 | sub send_msg { |
149 | sub send_msg { |
134 | my ($self, $type, %kv) = @_; |
150 | my ($self, $type, %kv) = @_; |
135 | |
151 | |
136 | my $data = delete $kv{data}; |
152 | my $data = delete $kv{data}; |
137 | |
153 | |
138 | if (exists $kv{id_cb}) { |
154 | if (exists $kv{id_cb}) { |
139 | my $id = $kv{identifier} || ++$self->{id}; |
155 | my $id = $kv{identifier} ||= ++$self->{id}; |
140 | $self->{id}{$id} = delete $kv{id_cb}; |
156 | $self->{id}{$id} = delete $kv{id_cb}; |
141 | $kv{identifier} = $id; |
|
|
142 | } |
157 | } |
143 | |
158 | |
144 | my $msg = (touc $type) . "\012" |
159 | my $msg = (touc $type) . "\012" |
145 | . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; |
160 | . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; |
146 | |
161 | |
… | |
… | |
158 | } |
173 | } |
159 | |
174 | |
160 | $self->{hdl}->push_write ($msg); |
175 | $self->{hdl}->push_write ($msg); |
161 | } |
176 | } |
162 | |
177 | |
|
|
178 | sub on { |
|
|
179 | my ($self, $cb) = @_; |
|
|
180 | |
|
|
181 | # cb return undef - message eaten, remove cb |
|
|
182 | # cb return 0 - message eaten |
|
|
183 | # cb return 1 - pass to next |
|
|
184 | |
|
|
185 | push @{ $self->{on} }, $cb; |
|
|
186 | } |
|
|
187 | |
|
|
188 | sub _push_queue { |
|
|
189 | my ($self, $queue) = @_; |
|
|
190 | |
|
|
191 | shift @$queue; |
|
|
192 | $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) |
|
|
193 | if @$queue; |
|
|
194 | } |
|
|
195 | |
|
|
196 | # lock so only one $type (arbitrary string) is in flight, |
|
|
197 | # to work around horribly misdesigned protocol. |
|
|
198 | sub serialise { |
|
|
199 | my ($self, $type, $cb) = @_; |
|
|
200 | |
|
|
201 | my $queue = $self->{serialise}{$type} ||= []; |
|
|
202 | push @$queue, $cb; |
|
|
203 | $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) }) |
|
|
204 | unless $#$queue; |
|
|
205 | } |
|
|
206 | |
163 | sub on_read { |
207 | sub on_read { |
164 | my ($self) = @_; |
208 | my ($self) = @_; |
165 | |
209 | |
166 | my $type; |
210 | my $type; |
167 | my %kv; |
211 | my %kv; |
168 | my $rdata; |
212 | my $rdata; |
169 | |
213 | |
170 | my $done_cb = sub { |
214 | my $done_cb = sub { |
171 | $kv{pkt_type} = $type; |
215 | $kv{pkt_type} = $type; |
|
|
216 | |
|
|
217 | my $on = $self->{on}; |
|
|
218 | for (0 .. $#$on) { |
|
|
219 | unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) { |
|
|
220 | splice @$on, $_, 1 unless defined $res; |
|
|
221 | return; |
|
|
222 | } |
|
|
223 | } |
172 | |
224 | |
173 | if (my $cb = $self->{queue}[0]) { |
225 | if (my $cb = $self->{queue}[0]) { |
174 | $cb->($self, $type, \%kv, $rdata) |
226 | $cb->($self, $type, \%kv, $rdata) |
175 | and shift @{ $self->{queue} }; |
227 | and shift @{ $self->{queue} }; |
176 | } else { |
228 | } else { |
… | |
… | |
221 | $self->{node_hello} = $kv; |
273 | $self->{node_hello} = $kv; |
222 | } elsif (exists $self->{id}{$kv->{identifier}}) { |
274 | } elsif (exists $self->{id}{$kv->{identifier}}) { |
223 | $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) |
275 | $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) |
224 | and delete $self->{id}{$kv->{identifier}}; |
276 | and delete $self->{id}{$kv->{identifier}}; |
225 | } else { |
277 | } else { |
226 | # on_warn |
278 | &{ $self->{progress} }; |
227 | #warn "protocol warning (unexpected $type message)\n"; |
|
|
228 | } |
279 | } |
229 | } |
280 | } |
230 | |
281 | |
231 | sub _txn { |
282 | sub _txn { |
232 | my ($name, $sub) = @_; |
283 | my ($name, $sub) = @_; |
… | |
… | |
316 | =cut |
367 | =cut |
317 | |
368 | |
318 | _txn list_persistent_requests => sub { |
369 | _txn list_persistent_requests => sub { |
319 | my ($self, $cv) = @_; |
370 | my ($self, $cv) = @_; |
320 | |
371 | |
|
|
372 | $self->serialise (list_persistent_requests => sub { |
|
|
373 | my ($self, $guard) = @_; |
|
|
374 | |
321 | my %res; |
375 | my %res; |
322 | |
376 | |
323 | $self->send_msg ("list_persistent_requests"); |
377 | $self->send_msg ("list_persistent_requests"); |
324 | |
378 | |
325 | push @{ $self->{queue} }, sub { |
379 | $self->on (sub { |
326 | my ($self, $type, $kv, $rdata) = @_; |
380 | my ($self, $type, $kv, $rdata) = @_; |
327 | |
381 | |
|
|
382 | $guard if 0; |
|
|
383 | |
328 | if ($type eq "end_list_persistent_requests") { |
384 | if ($type eq "end_list_persistent_requests") { |
329 | $cv->(\%res); |
385 | $cv->(\%res); |
|
|
386 | return; |
|
|
387 | } else { |
|
|
388 | my $id = $kv->{identifier}; |
|
|
389 | |
|
|
390 | if ($type =~ /^persistent_(get|put|put_dir)$/) { |
|
|
391 | $res{$id} = { |
|
|
392 | type => $1, |
|
|
393 | %{ $res{$id} }, |
|
|
394 | %$kv, |
|
|
395 | }; |
|
|
396 | } elsif ($type eq "simple_progress") { |
|
|
397 | delete $kv->{pkt_type}; # save memory |
|
|
398 | push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv; |
|
|
399 | } else { |
|
|
400 | $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv; |
|
|
401 | } |
|
|
402 | } |
|
|
403 | |
330 | 1 |
404 | 1 |
331 | } else { |
|
|
332 | my $id = $kv->{identifier}; |
|
|
333 | |
|
|
334 | if ($type =~ /^persistent_(get|put|put_dir)$/) { |
|
|
335 | $res{$id} = { |
|
|
336 | type => $1, |
|
|
337 | %{ $res{$id} }, |
|
|
338 | %$kv, |
|
|
339 | }; |
|
|
340 | } elsif ($type eq "simple_progress") { |
|
|
341 | delete $kv->{pkt_type}; # save memory |
|
|
342 | push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv; |
|
|
343 | } else { |
|
|
344 | $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv; |
|
|
345 | } |
|
|
346 | 0 |
|
|
347 | } |
405 | }); |
348 | }; |
406 | }); |
349 | }; |
407 | }; |
350 | |
408 | |
351 | =item $cv = $fcp->remove_request ($global, $identifier) |
409 | =item $cv = $fcp->remove_request ($global, $identifier) |
352 | |
410 | |
353 | =item $status = $fcp->remove_request_sync ($global, $identifier) |
411 | =item $status = $fcp->remove_request_sync ($global, $identifier) |
… | |
… | |
437 | |
495 | |
438 | $cv->($kv); |
496 | $cv->($kv); |
439 | 1 |
497 | 1 |
440 | }, |
498 | }, |
441 | ); |
499 | ); |
|
|
500 | }; |
|
|
501 | |
|
|
502 | =item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write) |
|
|
503 | |
|
|
504 | =item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)) |
|
|
505 | |
|
|
506 | The DDA test in FCP is probably the single most broken protocol - only |
|
|
507 | one directory test can be outstanding at any time, and some guessing and |
|
|
508 | heuristics are involved in mangling the paths. |
|
|
509 | |
|
|
510 | This function combines C<TestDDARequest> and C<TestDDAResponse> in one |
|
|
511 | request, handling file reading and writing as well. |
|
|
512 | |
|
|
513 | =cut |
|
|
514 | |
|
|
515 | _txn test_dda => sub { |
|
|
516 | my ($self, $cv, $local, $remote, $want_read, $want_write) = @_; |
|
|
517 | |
|
|
518 | $self->serialise (test_dda => sub { |
|
|
519 | my ($self, $guard) = @_; |
|
|
520 | |
|
|
521 | $self->send_msg (test_dda_request => |
|
|
522 | directory => $remote, |
|
|
523 | want_read_directory => $want_read ? "true" : "false", |
|
|
524 | want_write_directory => $want_write ? "true" : "false", |
|
|
525 | ); |
|
|
526 | $self->on (sub { |
|
|
527 | my ($self, $type, $kv) = @_; |
|
|
528 | |
|
|
529 | if ($type eq "test_dda_reply") { |
|
|
530 | # the filenames are all relative to the server-side directory, |
|
|
531 | # which might or might not match $remote anymore, so we |
|
|
532 | # need to rewrite the paths to be relative to $local |
|
|
533 | for my $k (qw(read_filename write_filename)) { |
|
|
534 | my $f = $kv->{$k}; |
|
|
535 | for my $dir ($kv->{directory}, $remote) { |
|
|
536 | if ($dir eq substr $f, 0, length $dir) { |
|
|
537 | substr $f, 0, 1 + length $dir, ""; |
|
|
538 | $kv->{$k} = $f; |
|
|
539 | last; |
|
|
540 | } |
|
|
541 | } |
|
|
542 | } |
|
|
543 | |
|
|
544 | my %response = (directory => $remote); |
|
|
545 | |
|
|
546 | if (length $kv->{read_filename}) { |
|
|
547 | warn "$local/$kv->{read_filename}";#d# |
|
|
548 | if (open my $fh, "<:raw", "$local/$kv->{read_filename}") { |
|
|
549 | sysread $fh, my $buf, -s $fh; |
|
|
550 | $response{read_content} = $buf; |
|
|
551 | } |
|
|
552 | } |
|
|
553 | |
|
|
554 | if (length $kv->{write_filename}) { |
|
|
555 | if (open my $fh, ">:raw", "$local/$kv->{write_filename}") { |
|
|
556 | syswrite $fh, $kv->{content_to_write}; |
|
|
557 | } |
|
|
558 | } |
|
|
559 | |
|
|
560 | $self->send_msg (test_dda_response => %response); |
|
|
561 | |
|
|
562 | $self->on (sub { |
|
|
563 | my ($self, $type, $kv) = @_; |
|
|
564 | |
|
|
565 | $guard if 0; # reference |
|
|
566 | |
|
|
567 | if ($type eq "test_dda_complete") { |
|
|
568 | $cv->( |
|
|
569 | $kv->{read_directory_allowed} eq "true", |
|
|
570 | $kv->{write_directory_allowed} eq "true", |
|
|
571 | ); |
|
|
572 | } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { |
|
|
573 | $cv->croak ($kv->{extra_description}); |
|
|
574 | return; |
|
|
575 | } |
|
|
576 | |
|
|
577 | 1 |
|
|
578 | }); |
|
|
579 | |
|
|
580 | return; |
|
|
581 | } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) { |
|
|
582 | $cv->croak ($kv->{extra_description}); |
|
|
583 | return; |
|
|
584 | } |
|
|
585 | |
|
|
586 | 1 |
|
|
587 | }); |
|
|
588 | }); |
442 | }; |
589 | }; |
443 | |
590 | |
444 | =back |
591 | =back |
445 | |
592 | |
446 | =head1 EXAMPLE PROGRAM |
593 | =head1 EXAMPLE PROGRAM |