ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.5 by root, Tue Dec 1 13:49:56 2009 UTC vs.
Revision 1.10 by root, Tue Aug 4 00:50:25 2015 UTC

6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11# transactions return condvars 11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests; 13 my $pr_cv = $fcp->list_persistent_requests;
14 14
15 my $peers = $lp_cv->recv; 15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv; 16 my $reqs = $pr_cv->recv;
26The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
27 27
28Only very little is implemented, ask if you need more, and look at the 28Only very little is implemented, ask if you need more, and look at the
29example program later in this section. 29example program later in this section.
30 30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global_sync (1, 0);
41 my $req = $fcp->list_persistent_requests_sync;
42
43 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46 }
47 }
48
31=head2 IMPORT TAGS 49=head2 IMPORT TAGS
32 50
33Nothing much can be "imported" from this module right now. 51Nothing much can be "imported" from this module right now.
34 52
35=head2 THE AnyEvent::FCP CLASS 53=head2 THE AnyEvent::FCP CLASS
42 60
43use common::sense; 61use common::sense;
44 62
45use Carp; 63use Carp;
46 64
47our $VERSION = '0.21'; 65our $VERSION = '0.3';
48 66
49use Scalar::Util (); 67use Scalar::Util ();
50 68
51use AnyEvent; 69use AnyEvent;
52use AnyEvent::Handle; 70use AnyEvent::Handle;
71use AnyEvent::Util ();
53 72
54sub touc($) { 73sub touc($) {
55 local $_ = shift; 74 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/; 75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g; 76 s/(?:^|_)(.)/\U$1/g;
58 $_ 77 $_
59} 78}
60 79
61sub tolc($) { 80sub tolc($) {
62 local $_ = shift; 81 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i; 82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i; 83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i;
65 s/(?<=[a-z])(?=[A-Z])/_/g; 84 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc 85 lc
67} 86}
68 87
69=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
72127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>). 91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73 92
74If no C<name> was specified, then AnyEvent::FCP will generate a 93If no C<name> was specified, then AnyEvent::FCP will generate a
75(hopefully) unique client name for you. 94(hopefully) unique client name for you.
76 95
77=cut
78
79#TODO
80#You can install a progress callback that is being called with the AnyEvent::FCP 96You can install a progress callback that is being called with the AnyEvent::FCP
81#object, a txn object, the type of the transaction and the attributes. Use 97object, the type, a hashref with key-value pairs and a reference to any received data,
82#it like this: 98for all unsolicited messages.
83# 99
100Example:
101
84# sub progress_cb { 102 sub progress_cb {
85# my ($self, $txn, $type, $attr) = @_; 103 my ($self, $type, $kv, $rdata) = @_;
86# 104
87# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; 105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
88# } 108 }
109
110=cut
89 111
90sub new { 112sub new {
91 my $class = shift; 113 my $class = shift;
92 my $self = bless { @_ }, $class; 114 my $self = bless { @_ }, $class;
93 115
94 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
95 $self->{port} ||= $ENV{FREDPORT} || 9481; 117 $self->{port} ||= $ENV{FREDPORT} || 9481;
96 $self->{name} ||= time.rand.rand.rand; # lame 118 $self->{name} ||= time.rand.rand.rand; # lame
97 $self->{timeout} ||= 600; 119 $self->{timeout} ||= 3600*2;
120 $self->{progress} ||= sub { };
98 121
99 $self->{id} = "a0"; 122 $self->{id} = "a0";
100 123
101 { 124 {
102 Scalar::Util::weaken (my $self = $self); 125 Scalar::Util::weaken (my $self = $self);
103 126
104 $self->{hdl} = new AnyEvent::Handle 127 $self->{hdl} = new AnyEvent::Handle
105 connect => [$self->{host} => $self->{port}], 128 connect => [$self->{host} => $self->{port}],
106 timeout => $self->{timeout}, 129 timeout => $self->{timeout},
107 on_error => sub { 130 on_error => sub {
108 warn "<@_>\n"; 131 warn "@_\n";#d#
109 exit 1; 132 exit 1;
110 }, 133 },
111 on_read => sub { $self->on_read (@_) }, 134 on_read => sub { $self->on_read (@_) },
112 on_eof => $self->{on_eof} || sub { }; 135 on_eof => $self->{on_eof} || sub { };
113 136
121 ); 144 );
122 145
123 $self 146 $self
124} 147}
125 148
126#sub progress {
127# my ($self, $txn, $type, $attr) = @_;
128#
129# $self->{progress}->($self, $txn, $type, $attr)
130# if $self->{progress};
131#}
132
133sub send_msg { 149sub send_msg {
134 my ($self, $type, %kv) = @_; 150 my ($self, $type, %kv) = @_;
135 151
136 my $data = delete $kv{data}; 152 my $data = delete $kv{data};
137 153
138 if (exists $kv{id_cb}) { 154 if (exists $kv{id_cb}) {
139 my $id = $kv{identifier} || ++$self->{id}; 155 my $id = $kv{identifier} ||= ++$self->{id};
140 $self->{id}{$id} = delete $kv{id_cb}; 156 $self->{id}{$id} = delete $kv{id_cb};
141 $kv{identifier} = $id;
142 } 157 }
143 158
144 my $msg = (touc $type) . "\012" 159 my $msg = (touc $type) . "\012"
145 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
146 161
158 } 173 }
159 174
160 $self->{hdl}->push_write ($msg); 175 $self->{hdl}->push_write ($msg);
161} 176}
162 177
178sub on {
179 my ($self, $cb) = @_;
180
181 # cb return undef - message eaten, remove cb
182 # cb return 0 - message eaten
183 # cb return 1 - pass to next
184
185 push @{ $self->{on} }, $cb;
186}
187
188sub _push_queue {
189 my ($self, $queue) = @_;
190
191 shift @$queue;
192 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
193 if @$queue;
194}
195
196# lock so only one $type (arbitrary string) is in flight,
197# to work around horribly misdesigned protocol.
198sub serialise {
199 my ($self, $type, $cb) = @_;
200
201 my $queue = $self->{serialise}{$type} ||= [];
202 push @$queue, $cb;
203 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204 unless $#$queue;
205}
206
163sub on_read { 207sub on_read {
164 my ($self) = @_; 208 my ($self) = @_;
165 209
166 my $type; 210 my $type;
167 my %kv; 211 my %kv;
168 my $rdata; 212 my $rdata;
169 213
170 my $done_cb = sub { 214 my $done_cb = sub {
171 $kv{pkt_type} = $type; 215 $kv{pkt_type} = $type;
216
217 my $on = $self->{on};
218 for (0 .. $#$on) {
219 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) {
220 splice @$on, $_, 1 unless defined $res;
221 return;
222 }
223 }
172 224
173 if (my $cb = $self->{queue}[0]) { 225 if (my $cb = $self->{queue}[0]) {
174 $cb->($self, $type, \%kv, $rdata) 226 $cb->($self, $type, \%kv, $rdata)
175 and shift @{ $self->{queue} }; 227 and shift @{ $self->{queue} };
176 } else { 228 } else {
221 $self->{node_hello} = $kv; 273 $self->{node_hello} = $kv;
222 } elsif (exists $self->{id}{$kv->{identifier}}) { 274 } elsif (exists $self->{id}{$kv->{identifier}}) {
223 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata) 275 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
224 and delete $self->{id}{$kv->{identifier}}; 276 and delete $self->{id}{$kv->{identifier}};
225 } else { 277 } else {
226 # on_warn 278 &{ $self->{progress} };
227 #warn "protocol warning (unexpected $type message)\n";
228 } 279 }
229} 280}
230 281
231sub _txn { 282sub _txn {
232 my ($name, $sub) = @_; 283 my ($name, $sub) = @_;
316=cut 367=cut
317 368
318_txn list_persistent_requests => sub { 369_txn list_persistent_requests => sub {
319 my ($self, $cv) = @_; 370 my ($self, $cv) = @_;
320 371
372 $self->serialise (list_persistent_requests => sub {
373 my ($self, $guard) = @_;
374
321 my %res; 375 my %res;
322 376
323 $self->send_msg ("list_persistent_requests"); 377 $self->send_msg ("list_persistent_requests");
324 378
325 push @{ $self->{queue} }, sub { 379 $self->on (sub {
326 my ($self, $type, $kv, $rdata) = @_; 380 my ($self, $type, $kv, $rdata) = @_;
327 381
382 $guard if 0;
383
328 if ($type eq "end_list_persistent_requests") { 384 if ($type eq "end_list_persistent_requests") {
329 $cv->(\%res); 385 $cv->(\%res);
386 return;
387 } else {
388 my $id = $kv->{identifier};
389
390 if ($type =~ /^persistent_(get|put|put_dir)$/) {
391 $res{$id} = {
392 type => $1,
393 %{ $res{$id} },
394 %$kv,
395 };
396 } elsif ($type eq "simple_progress") {
397 delete $kv->{pkt_type}; # save memory
398 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399 } else {
400 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401 }
402 }
403
330 1 404 1
331 } else {
332 my $id = $kv->{identifier};
333
334 if ($type =~ /^persistent_(get|put|put_dir)$/) {
335 $res{$id} = {
336 type => $1,
337 %{ $res{$id} },
338 %$kv,
339 };
340 } elsif ($type eq "simple_progress") {
341 delete $kv->{pkt_type}; # save memory
342 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
343 } else {
344 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
345 }
346 0
347 } 405 });
348 }; 406 });
349}; 407};
350 408
351=item $cv = $fcp->remove_request ($global, $identifier) 409=item $cv = $fcp->remove_request ($global, $identifier)
352 410
353=item $status = $fcp->remove_request_sync ($global, $identifier) 411=item $status = $fcp->remove_request_sync ($global, $identifier)
437 495
438 $cv->($kv); 496 $cv->($kv);
439 1 497 1
440 }, 498 },
441 ); 499 );
500};
501
502=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)
503
504=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write))
505
506The DDA test in FCP is probably the single most broken protocol - only
507one directory test can be outstanding at any time, and some guessing and
508heuristics are involved in mangling the paths.
509
510This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511request, handling file reading and writing as well.
512
513=cut
514
515_txn test_dda => sub {
516 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;
517
518 $self->serialise (test_dda => sub {
519 my ($self, $guard) = @_;
520
521 $self->send_msg (test_dda_request =>
522 directory => $remote,
523 want_read_directory => $want_read ? "true" : "false",
524 want_write_directory => $want_write ? "true" : "false",
525 );
526 $self->on (sub {
527 my ($self, $type, $kv) = @_;
528
529 if ($type eq "test_dda_reply") {
530 # the filenames are all relative to the server-side directory,
531 # which might or might not match $remote anymore, so we
532 # need to rewrite the paths to be relative to $local
533 for my $k (qw(read_filename write_filename)) {
534 my $f = $kv->{$k};
535 for my $dir ($kv->{directory}, $remote) {
536 if ($dir eq substr $f, 0, length $dir) {
537 substr $f, 0, 1 + length $dir, "";
538 $kv->{$k} = $f;
539 last;
540 }
541 }
542 }
543
544 my %response = (directory => $remote);
545
546 if (length $kv->{read_filename}) {
547 warn "$local/$kv->{read_filename}";#d#
548 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549 sysread $fh, my $buf, -s $fh;
550 $response{read_content} = $buf;
551 }
552 }
553
554 if (length $kv->{write_filename}) {
555 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
556 syswrite $fh, $kv->{content_to_write};
557 }
558 }
559
560 $self->send_msg (test_dda_response => %response);
561
562 $self->on (sub {
563 my ($self, $type, $kv) = @_;
564
565 $guard if 0; # reference
566
567 if ($type eq "test_dda_complete") {
568 $cv->(
569 $kv->{read_directory_allowed} eq "true",
570 $kv->{write_directory_allowed} eq "true",
571 );
572 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573 $cv->croak ($kv->{extra_description});
574 return;
575 }
576
577 1
578 });
579
580 return;
581 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582 $cv->croak ($kv->{extra_description});
583 return;
584 }
585
586 1
587 });
588 });
442}; 589};
443 590
444=back 591=back
445 592
446=head1 EXAMPLE PROGRAM 593=head1 EXAMPLE PROGRAM

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines