ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
Revision: 1.6
Committed: Mon May 31 06:27:37 2010 UTC (13 years, 11 months ago) by root
Branch: MAIN
Changes since 1.5: +20 -24 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::FCP - freenet client protocol 2.0
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::FCP;
8
9 my $fcp = new AnyEvent::FCP;
10
11 # transactions return condvars
12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
17
18 =head1 DESCRIPTION
19
20 This module implements the freenet client protocol version 2.0, as used by
21 freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
22
23 See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
24 description of what the messages do.
25
26 The module uses L<AnyEvent> to find a suitable event module.
27
28 Only very little is implemented, ask if you need more, and look at the
29 example program later in this section.
30
31 =head2 IMPORT TAGS
32
33 Nothing much can be "imported" from this module right now.
34
35 =head2 THE AnyEvent::FCP CLASS
36
37 =over 4
38
39 =cut
40
41 package AnyEvent::FCP;
42
43 use common::sense;
44
45 use Carp;
46
47 our $VERSION = '0.21';
48
49 use Scalar::Util ();
50
51 use AnyEvent;
52 use AnyEvent::Handle;
53
54 sub touc($) {
55 local $_ = shift;
56 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/;
57 s/(?:^|_)(.)/\U$1/g;
58 $_
59 }
60
61 sub tolc($) {
62 local $_ = shift;
63 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i;
64 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i;
65 s/(?<=[a-z])(?=[A-Z])/_/g;
66 lc
67 }
68
69 =item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
70
71 Create a new FCP connection to the given host and port (default
72 127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
73
74 If no C<name> was specified, then AnyEvent::FCP will generate a
75 (hopefully) unique client name for you.
76
77 You can install a progress callback that is being called with the AnyEvent::FCP
78 object, the type, a hashref with key-value pairs and a reference to any received data,
79 for all unsolicited messages.
80
81 Example:
82
83 sub progress_cb {
84 my ($self, $type, $kv, $rdata) = @_;
85
86 if ($type eq "simple_progress") {
87 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
88 }
89 }
90
91 =cut
92
93 sub new {
94 my $class = shift;
95 my $self = bless { @_ }, $class;
96
97 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
98 $self->{port} ||= $ENV{FREDPORT} || 9481;
99 $self->{name} ||= time.rand.rand.rand; # lame
100 $self->{timeout} ||= 600;
101 $self->{progress} ||= sub { };
102
103 $self->{id} = "a0";
104
105 {
106 Scalar::Util::weaken (my $self = $self);
107
108 $self->{hdl} = new AnyEvent::Handle
109 connect => [$self->{host} => $self->{port}],
110 timeout => $self->{timeout},
111 on_error => sub {
112 warn "<@_>\n";
113 exit 1;
114 },
115 on_read => sub { $self->on_read (@_) },
116 on_eof => $self->{on_eof} || sub { };
117
118 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
119 }
120
121 $self->send_msg (
122 client_hello =>
123 name => $self->{name},
124 expected_version => "2.0",
125 );
126
127 $self
128 }
129
130 sub send_msg {
131 my ($self, $type, %kv) = @_;
132
133 my $data = delete $kv{data};
134
135 if (exists $kv{id_cb}) {
136 my $id = $kv{identifier} || ++$self->{id};
137 $self->{id}{$id} = delete $kv{id_cb};
138 $kv{identifier} = $id;
139 }
140
141 my $msg = (touc $type) . "\012"
142 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
143
144 sub id {
145 my ($self) = @_;
146
147
148 }
149
150 if (defined $data) {
151 $msg .= "DataLength=" . (length $data) . "\012"
152 . "Data\012$data";
153 } else {
154 $msg .= "EndMessage\012";
155 }
156
157 $self->{hdl}->push_write ($msg);
158 }
159
160 sub on_read {
161 my ($self) = @_;
162
163 my $type;
164 my %kv;
165 my $rdata;
166
167 my $done_cb = sub {
168 $kv{pkt_type} = $type;
169
170 if (my $cb = $self->{queue}[0]) {
171 $cb->($self, $type, \%kv, $rdata)
172 and shift @{ $self->{queue} };
173 } else {
174 $self->default_recv ($type, \%kv, $rdata);
175 }
176 };
177
178 my $hdr_cb; $hdr_cb = sub {
179 if ($_[1] =~ /^([^=]+)=(.*)$/) {
180 my ($k, $v) = ($1, $2);
181 my @k = split /\./, tolc $k;
182 my $ro = \\%kv;
183
184 while (@k) {
185 my $k = shift @k;
186 if ($k =~ /^\d+$/) {
187 $ro = \$$ro->[$k];
188 } else {
189 $ro = \$$ro->{$k};
190 }
191 }
192
193 $$ro = $v;
194
195 $_[0]->push_read (line => $hdr_cb);
196 } elsif ($_[1] eq "Data") {
197 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
198 $rdata = \$_[1];
199 $done_cb->();
200 });
201 } elsif ($_[1] eq "EndMessage") {
202 $done_cb->();
203 } else {
204 die "protocol error, expected message end, got $_[1]\n";#d#
205 }
206 };
207
208 $self->{hdl}->push_read (line => sub {
209 $type = tolc $_[1];
210 $_[0]->push_read (line => $hdr_cb);
211 });
212 }
213
214 sub default_recv {
215 my ($self, $type, $kv, $rdata) = @_;
216
217 if ($type eq "node_hello") {
218 $self->{node_hello} = $kv;
219 } elsif (exists $self->{id}{$kv->{identifier}}) {
220 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
221 and delete $self->{id}{$kv->{identifier}};
222 } else {
223 &{ $self->{progress} };
224 }
225 }
226
227 sub _txn {
228 my ($name, $sub) = @_;
229
230 *{$name} = sub {
231 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
232 &$sub;
233 $cv
234 };
235
236 *{"$name\_sync"} = sub {
237 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
238 &$sub;
239 $cv->recv
240 };
241 }
242
243 =item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
244
245 =item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
246
247 =cut
248
249 _txn list_peers => sub {
250 my ($self, $cv, $with_metadata, $with_volatile) = @_;
251
252 my @res;
253
254 $self->send_msg (list_peers =>
255 with_metadata => $with_metadata ? "true" : "false",
256 with_volatile => $with_volatile ? "true" : "false",
257 id_cb => sub {
258 my ($self, $type, $kv, $rdata) = @_;
259
260 if ($type eq "end_list_peers") {
261 $cv->(\@res);
262 1
263 } else {
264 push @res, $kv;
265 0
266 }
267 },
268 );
269 };
270
271 =item $cv = $fcp->list_peer_notes ($node_identifier)
272
273 =item $notes = $fcp->list_peer_notes_sync ($node_identifier)
274
275 =cut
276
277 _txn list_peer_notes => sub {
278 my ($self, $cv, $node_identifier) = @_;
279
280 $self->send_msg (list_peer_notes =>
281 node_identifier => $node_identifier,
282 id_cb => sub {
283 my ($self, $type, $kv, $rdata) = @_;
284
285 $cv->($kv);
286 1
287 },
288 );
289 };
290
291 =item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
292
293 =item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
294
295 =cut
296
297 _txn watch_global => sub {
298 my ($self, $cv, $enabled, $verbosity_mask) = @_;
299
300 $self->send_msg (watch_global =>
301 enabled => $enabled ? "true" : "false",
302 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
303 );
304
305 $cv->();
306 };
307
308 =item $cv = $fcp->list_persistent_requests
309
310 =item $reqs = $fcp->list_persistent_requests_sync
311
312 =cut
313
314 _txn list_persistent_requests => sub {
315 my ($self, $cv) = @_;
316
317 my %res;
318
319 $self->send_msg ("list_persistent_requests");
320
321 push @{ $self->{queue} }, sub {
322 my ($self, $type, $kv, $rdata) = @_;
323
324 if ($type eq "end_list_persistent_requests") {
325 $cv->(\%res);
326 1
327 } else {
328 my $id = $kv->{identifier};
329
330 if ($type =~ /^persistent_(get|put|put_dir)$/) {
331 $res{$id} = {
332 type => $1,
333 %{ $res{$id} },
334 %$kv,
335 };
336 } elsif ($type eq "simple_progress") {
337 delete $kv->{pkt_type}; # save memory
338 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
339 } else {
340 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
341 }
342 0
343 }
344 };
345 };
346
347 =item $cv = $fcp->remove_request ($global, $identifier)
348
349 =item $status = $fcp->remove_request_sync ($global, $identifier)
350
351 =cut
352
353 _txn remove_request => sub {
354 my ($self, $cv, $global, $identifier) = @_;
355
356 $self->send_msg (remove_request =>
357 global => $global ? "true" : "false",
358 identifier => $identifier,
359 id_cb => sub {
360 my ($self, $type, $kv, $rdata) = @_;
361
362 $cv->($kv);
363 1
364 },
365 );
366 };
367
368 =item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
369
370 =item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
371
372 =cut
373
374 _txn modify_persistent_request => sub {
375 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
376
377 $self->send_msg (modify_persistent_request =>
378 global => $global ? "true" : "false",
379 defined $client_token ? (client_token => $client_token ) : (),
380 defined $priority_class ? (priority_class => $priority_class) : (),
381 identifier => $identifier,
382 id_cb => sub {
383 my ($self, $type, $kv, $rdata) = @_;
384
385 $cv->($kv);
386 1
387 },
388 );
389 };
390
391 =item $cv = $fcp->get_plugin_info ($name, $detailed)
392
393 =item $info = $fcp->get_plugin_info_sync ($name, $detailed)
394
395 =cut
396
397 _txn get_plugin_info => sub {
398 my ($self, $cv, $name, $detailed) = @_;
399
400 $self->send_msg (get_plugin_info =>
401 plugin_name => $name,
402 detailed => $detailed ? "true" : "false",
403 id_cb => sub {
404 my ($self, $type, $kv, $rdata) = @_;
405
406 $cv->($kv);
407 1
408 },
409 );
410 };
411
412 =item $cv = $fcp->client_get ($uri, $identifier, %kv)
413
414 =item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
415
416 %kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
417
418 ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
419 priority_class, persistence, client_token, global, return_type,
420 binary_blob, allowed_mime_types, filename, temp_filename
421
422 =cut
423
424 _txn client_get => sub {
425 my ($self, $cv, $uri, $identifier, %kv) = @_;
426
427 $self->send_msg (client_get =>
428 %kv,
429 uri => $uri,
430 identifier => $identifier,
431 id_cb => sub {
432 my ($self, $type, $kv, $rdata) = @_;
433
434 $cv->($kv);
435 1
436 },
437 );
438 };
439
440 =back
441
442 =head1 EXAMPLE PROGRAM
443
444 use AnyEvent::FCP;
445
446 my $fcp = new AnyEvent::FCP;
447
448 # let us look at the global request list
449 $fcp->watch_global (1, 0);
450
451 # list them, synchronously
452 my $req = $fcp->list_persistent_requests_sync;
453
454 # go through all requests
455 for my $req (values %$req) {
456 # skip jobs not directly-to-disk
457 next unless $req->{return_type} eq "disk";
458 # skip jobs not issued by FProxy
459 next unless $req->{identifier} =~ /^FProxy:/;
460
461 if ($req->{data_found}) {
462 # file has been successfully downloaded
463
464 ... move the file away
465 (left as exercise)
466
467 # remove the request
468
469 $fcp->remove_request (1, $req->{identifier});
470 } elsif ($req->{get_failed}) {
471 # request has failed
472 if ($req->{get_failed}{code} == 11) {
473 # too many path components, should restart
474 } else {
475 # other failure
476 }
477 } else {
478 # modify priorities randomly, to improve download rates
479 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
480 if 0.1 > rand;
481 }
482 }
483
484 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
485 $fcp->get_plugin_info_sync ("dummy");
486
487 =head1 SEE ALSO
488
489 L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
490
491 =head1 BUGS
492
493 =head1 AUTHOR
494
495 Marc Lehmann <schmorp@schmorp.de>
496 http://home.schmorp.de/
497
498 =cut
499
500 1
501