ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.1 by root, Sat Jul 18 05:57:59 2009 UTC vs.
Revision 1.10 by root, Tue Aug 4 00:50:25 2015 UTC

2 2
3AnyEvent::FCP - freenet client protocol 2.0 3AnyEvent::FCP - freenet client protocol 2.0
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11 my $ni = $fcp->txn_node_info->result; 11 # transactions return condvars
12 my $ni = $fcp->node_info; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
13 17
14=head1 DESCRIPTION 18=head1 DESCRIPTION
15 19
16This module implements the freenet client protocol version 2.0, as used by 20This module implements the freenet client protocol version 2.0, as used by
17freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version. 21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
18 22
19See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a description 23See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
20of what the messages do. 24description of what the messages do.
21 25
22The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
23 27
28Only very little is implemented, ask if you need more, and look at the
29example program later in this section.
30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global_sync (1, 0);
41 my $req = $fcp->list_persistent_requests_sync;
42
43 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46 }
47 }
48
24=head2 IMPORT TAGS 49=head2 IMPORT TAGS
25 50
26Nothing much can be "imported" from this module right now. 51Nothing much can be "imported" from this module right now.
27 52
28=head2 FREENET BASICS 53=head2 THE AnyEvent::FCP CLASS
29
30Ok, this section will not explain any freenet basics to you, just some
31problems I found that you might want to avoid:
32 54
33=over 4 55=over 4
34 56
35=item freenet URIs are _NOT_ URIs
36
37Whenever a "uri" is required by the protocol, freenet expects a kind of
38URI prefixed with the "freenet:" scheme, e.g. "freenet:CHK...". However,
39these are not URIs, as freeent fails to parse them correctly, that is, you
40must unescape an escaped characters ("%2c" => ",") yourself. Maybe in the
41future this library will do it for you, so watch out for this incompatible
42change.
43
44=back
45
46=head2 THE AnyEvent::FCP CLASS
47
48=over 4
49
50=cut 57=cut
51 58
52package AnyEvent::FCP; 59package AnyEvent::FCP;
53 60
61use common::sense;
62
54use Carp; 63use Carp;
55 64
56$VERSION = '0.1'; 65our $VERSION = '0.3';
57 66
58no warnings; 67use Scalar::Util ();
59 68
60use AnyEvent; 69use AnyEvent;
61use AnyEvent::Socket; 70use AnyEvent::Handle;
71use AnyEvent::Util ();
62 72
63sub touc($) { 73sub touc($) {
64 local $_ = shift; 74 local $_ = shift;
65 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 75 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
66 s/(?:^|_)(.)/\U$1/g; 76 s/(?:^|_)(.)/\U$1/g;
67 $_ 77 $_
68} 78}
69 79
70sub tolc($) { 80sub tolc($) {
71 local $_ = shift; 81 local $_ = shift;
72 1 while s/(SVK|CHK|URI)([^_])/$1\_$2/i; 82 1 while s/(SVK|CHK|URI|FCP|DS|MIME|DDA)([^_])/$1\_$2/i;
73 1 while s/([^_])(SVK|CHK|URI)/$1\_$2/i; 83 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME|DDA)/$1\_$2/i;
74 s/(?<=[a-z])(?=[A-Z])/_/g; 84 s/(?<=[a-z])(?=[A-Z])/_/g;
75 lc 85 lc
76} 86}
77 87
78=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 88=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
79 89
80Create a new FCP connection to the given host and port (default 90Create a new FCP connection to the given host and port (default
81127.0.0.1:8481, or the environment variables C<FREDHOST> and C<FREDPORT>). 91127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
82 92
83If no C<name> was specified, then AnyEvent::FCP will generate a (hopefully) 93If no C<name> was specified, then AnyEvent::FCP will generate a
84unique client name for you. 94(hopefully) unique client name for you.
85 95
86#TODO
87#You can install a progress callback that is being called with the Net::FCP 96You can install a progress callback that is being called with the AnyEvent::FCP
88#object, a txn object, the type of the transaction and the attributes. Use 97object, the type, a hashref with key-value pairs and a reference to any received data,
89#it like this: 98for all unsolicited messages.
90# 99
100Example:
101
91# sub progress_cb { 102 sub progress_cb {
92# my ($self, $txn, $type, $attr) = @_; 103 my ($self, $type, $kv, $rdata) = @_;
93# 104
94# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; 105 if ($type eq "simple_progress") {
106 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
107 }
95# } 108 }
96 109
97=cut 110=cut
98 111
99sub new { 112sub new {
100 my $class = shift; 113 my $class = shift;
101 my $self = bless { @_ }, $class; 114 my $self = bless { @_ }, $class;
102 115
103 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 116 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
104 $self->{port} ||= $ENV{FREDPORT} || 9481; 117 $self->{port} ||= $ENV{FREDPORT} || 9481;
105 $self->{name} ||= time.rand.rand.rand; # lame 118 $self->{name} ||= time.rand.rand.rand; # lame
119 $self->{timeout} ||= 3600*2;
120 $self->{progress} ||= sub { };
106 121
122 $self->{id} = "a0";
123
124 {
125 Scalar::Util::weaken (my $self = $self);
126
107 $self->{conn} = new AnyEvent::Socket 127 $self->{hdl} = new AnyEvent::Handle
108 PeerAddr => "$self->{host}:$self->{port}", 128 connect => [$self->{host} => $self->{port}],
129 timeout => $self->{timeout},
130 on_error => sub {
131 warn "@_\n";#d#
132 exit 1;
133 },
134 on_read => sub { $self->on_read (@_) },
109 on_eof => $self->{on_eof} || sub { }, 135 on_eof => $self->{on_eof} || sub { };
136
137 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
138 }
139
140 $self->send_msg (
141 client_hello =>
142 name => $self->{name},
143 expected_version => "2.0",
144 );
110 145
111 $self 146 $self
112} 147}
113 148
114sub progress { 149sub send_msg {
115 my ($self, $txn, $type, $attr) = @_;
116
117 $self->{progress}->($self, $txn, $type, $attr)
118 if $self->{progress};
119}
120
121=item $txn = $fcp->txn (type => attr => val,...)
122
123The low-level interface to transactions. Don't use it unless you have
124"special needs". Instead, use predefiend transactions like this:
125
126The blocking case, no (visible) transactions involved:
127
128 my $nodehello = $fcp->client_hello;
129
130A transaction used in a blocking fashion:
131
132 my $txn = $fcp->txn_client_hello;
133 ...
134 my $nodehello = $txn->result;
135
136Or shorter:
137
138 my $nodehello = $fcp->txn_client_hello->result;
139
140Setting callbacks:
141
142 $fcp->txn_client_hello->cb(
143 sub { my $nodehello => $_[0]->result }
144 );
145
146=cut
147
148sub txn {
149 my ($self, $type, %attr) = @_; 150 my ($self, $type, %kv) = @_;
150 151
151 $type = touc $type; 152 my $data = delete $kv{data};
152 153
153 my $txn = "Net::FCP::Txn::$type"->new (fcp => $self, type => tolc $type, attr => \%attr); 154 if (exists $kv{id_cb}) {
154 155 my $id = $kv{identifier} ||= ++$self->{id};
155 $txn; 156 $self->{id}{$id} = delete $kv{id_cb};
156}
157
158{ # transactions
159
160my $txn = sub {
161 my ($name, $sub) = @_;
162 *{"txn_$name"} = $sub;
163 *{$name} = sub { $sub->(@_)->result };
164};
165
166=item $txn = $fcp->txn_client_hello
167
168=item $nodehello = $fcp->client_hello
169
170Executes a ClientHello request and returns it's results.
171
172 {
173 max_file_size => "5f5e100",
174 node => "Fred,0.6,1.46,7050"
175 protocol => "1.2",
176 } 157 }
177 158
178=cut 159 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
179 161
180$txn->(client_hello => sub { 162 sub id {
163 my ($self) = @_;
164
165
166 }
167
168 if (defined $data) {
169 $msg .= "DataLength=" . (length $data) . "\012"
170 . "Data\012$data";
171 } else {
172 $msg .= "EndMessage\012";
173 }
174
175 $self->{hdl}->push_write ($msg);
176}
177
178sub on {
179 my ($self, $cb) = @_;
180
181 # cb return undef - message eaten, remove cb
182 # cb return 0 - message eaten
183 # cb return 1 - pass to next
184
185 push @{ $self->{on} }, $cb;
186}
187
188sub _push_queue {
189 my ($self, $queue) = @_;
190
191 shift @$queue;
192 $queue->[0]($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
193 if @$queue;
194}
195
196# lock so only one $type (arbitrary string) is in flight,
197# to work around horribly misdesigned protocol.
198sub serialise {
199 my ($self, $type, $cb) = @_;
200
201 my $queue = $self->{serialise}{$type} ||= [];
202 push @$queue, $cb;
203 $cb->($self, AnyEvent::Util::guard { $self->_push_queue ($queue) })
204 unless $#$queue;
205}
206
207sub on_read {
181 my ($self) = @_; 208 my ($self) = @_;
182 209
183 $self->txn ("client_hello"); 210 my $type;
184}); 211 my %kv;
212 my $rdata;
185 213
186=item $txn = $fcp->txn_client_info 214 my $done_cb = sub {
215 $kv{pkt_type} = $type;
187 216
188=item $nodeinfo = $fcp->client_info 217 my $on = $self->{on};
189 218 for (0 .. $#$on) {
190Executes a ClientInfo request and returns it's results. 219 unless (my $res = $on->[$_]($self, $type, \%kv, $rdata)) {
191 220 splice @$on, $_, 1 unless defined $res;
192 {
193 active_jobs => "1f",
194 allocated_memory => "bde0000",
195 architecture => "i386",
196 available_threads => 17,
197 datastore_free => "5ce03400",
198 datastore_max => "2540be400",
199 datastore_used => "1f72bb000",
200 estimated_load => 52,
201 free_memory => "5cc0148",
202 is_transient => "false",
203 java_name => "Java HotSpot(_T_M) Server VM",
204 java_vendor => "http://www.blackdown.org/",
205 java_version => "Blackdown-1.4.1-01",
206 least_recent_timestamp => "f41538b878",
207 max_file_size => "5f5e100",
208 most_recent_timestamp => "f77e2cc520"
209 node_address => "1.2.3.4",
210 node_port => 369,
211 operating_system => "Linux",
212 operating_system_version => "2.4.20",
213 routing_time => "a5",
214 }
215
216=cut
217
218$txn->(client_info => sub {
219 my ($self) = @_;
220
221 $self->txn ("client_info");
222});
223
224=item $txn = $fcp->txn_generate_chk ($metadata, $data[, $cipher])
225
226=item $uri = $fcp->generate_chk ($metadata, $data[, $cipher])
227
228Calculates a CHK, given the metadata and data. C<$cipher> is either
229C<Rijndael> or C<Twofish>, with the latter being the default.
230
231=cut
232
233$txn->(generate_chk => sub {
234 my ($self, $metadata, $data, $cipher) = @_;
235
236 $metadata = Net::FCP::Metadata::build_metadata $metadata;
237
238 $self->txn (generate_chk =>
239 data => "$metadata$data",
240 metadata_length => xeh length $metadata,
241 cipher => $cipher || "Twofish");
242});
243
244=item $txn = $fcp->txn_generate_svk_pair
245
246=item ($public, $private, $crypto) = @{ $fcp->generate_svk_pair }
247
248Creates a new SVK pair. Returns an arrayref with the public key, the
249private key and a crypto key, which is just additional entropy.
250
251 [
252 "acLx4dux9fvvABH15Gk6~d3I-yw",
253 "cPoDkDMXDGSMM32plaPZDhJDxSs",
254 "BH7LXCov0w51-y9i~BoB3g",
255 ]
256
257A private key (for inserting) can be constructed like this:
258
259 SSK@<private_key>,<crypto_key>/<name>
260
261It can be used to insert data. The corresponding public key looks like this:
262
263 SSK@<public_key>PAgM,<crypto_key>/<name>
264
265Watch out for the C<PAgM>-part!
266
267=cut
268
269$txn->(generate_svk_pair => sub {
270 my ($self) = @_;
271
272 $self->txn ("generate_svk_pair");
273});
274
275=item $txn = $fcp->txn_invert_private_key ($private)
276
277=item $public = $fcp->invert_private_key ($private)
278
279Inverts a private key (returns the public key). C<$private> can be either
280an insert URI (must start with C<freenet:SSK@>) or a raw private key (i.e.
281the private value you get back from C<generate_svk_pair>).
282
283Returns the public key.
284
285=cut
286
287$txn->(invert_private_key => sub {
288 my ($self, $privkey) = @_;
289
290 $self->txn (invert_private_key => private => $privkey);
291});
292
293=item $txn = $fcp->txn_get_size ($uri)
294
295=item $length = $fcp->get_size ($uri)
296
297Finds and returns the size (rounded up to the nearest power of two) of the
298given document.
299
300=cut
301
302$txn->(get_size => sub {
303 my ($self, $uri) = @_;
304
305 $self->txn (get_size => URI => $uri);
306});
307
308=item $txn = $fcp->txn_client_get ($uri [, $htl = 15 [, $removelocal = 0]])
309
310=item ($metadata, $data) = @{ $fcp->client_get ($uri, $htl, $removelocal)
311
312Fetches a (small, as it should fit into memory) key content block from
313freenet. C<$meta> is a C<Net::FCP::Metadata> object or C<undef>).
314
315The C<$uri> should begin with C<freenet:>, but the scheme is currently
316added, if missing.
317
318 my ($meta, $data) = @{
319 $fcp->client_get (
320 "freenet:CHK@hdXaxkwZ9rA8-SidT0AN-bniQlgPAwI,XdCDmBuGsd-ulqbLnZ8v~w"
321 )
322 };
323
324=cut
325
326$txn->(client_get => sub {
327 my ($self, $uri, $htl, $removelocal) = @_;
328
329 $uri =~ s/^freenet://; $uri = "freenet:$uri";
330
331 $self->txn (client_get => URI => $uri, hops_to_live => xeh (defined $htl ? $htl : 15),
332 remove_local_key => $removelocal ? "true" : "false");
333});
334
335=item $txn = $fcp->txn_client_put ($uri, $metadata, $data, $htl, $removelocal)
336
337=item my $uri = $fcp->client_put ($uri, $metadata, $data, $htl, $removelocal);
338
339Insert a new key. If the client is inserting a CHK, the URI may be
340abbreviated as just CHK@. In this case, the node will calculate the
341CHK. If the key is a private SSK key, the node will calculcate the public
342key and the resulting public URI.
343
344C<$meta> can be a hash reference (same format as returned by
345C<Net::FCP::parse_metadata>) or a string.
346
347The result is an arrayref with the keys C<uri>, C<public_key> and C<private_key>.
348
349=cut
350
351$txn->(client_put => sub {
352 my ($self, $uri, $metadata, $data, $htl, $removelocal) = @_;
353
354 $metadata = Net::FCP::Metadata::build_metadata $metadata;
355 $uri =~ s/^freenet://; $uri = "freenet:$uri";
356
357 $self->txn (client_put => URI => $uri,
358 hops_to_live => xeh (defined $htl ? $htl : 15),
359 remove_local_key => $removelocal ? "true" : "false",
360 data => "$metadata$data", metadata_length => xeh length $metadata);
361});
362
363} # transactions
364
365=back
366
367=head2 THE Net::FCP::Txn CLASS
368
369All requests (or transactions) are executed in a asynchronous way. For
370each request, a C<Net::FCP::Txn> object is created (worse: a tcp
371connection is created, too).
372
373For each request there is actually a different subclass (and it's possible
374to subclass these, although of course not documented).
375
376The most interesting method is C<result>.
377
378=over 4
379
380=cut
381
382package Net::FCP::Txn;
383
384use Fcntl;
385use Socket;
386
387=item new arg => val,...
388
389Creates a new C<Net::FCP::Txn> object. Not normally used.
390
391=cut
392
393sub new {
394 my $class = shift;
395 my $self = bless { @_ }, $class;
396
397 $self->{signal} = AnyEvent->condvar;
398
399 $self->{fcp}{txn}{$self} = $self;
400
401 my $attr = "";
402 my $data = delete $self->{attr}{data};
403
404 while (my ($k, $v) = each %{$self->{attr}}) {
405 $attr .= (Net::FCP::touc $k) . "=$v\012"
406 }
407
408 if (defined $data) {
409 $attr .= sprintf "DataLength=%x\012", length $data;
410 $data = "Data\012$data";
411 } else {
412 $data = "EndMessage\012";
413 }
414
415 socket my $fh, PF_INET, SOCK_STREAM, 0
416 or Carp::croak "unable to create new tcp socket: $!";
417 binmode $fh, ":raw";
418 fcntl $fh, F_SETFL, O_NONBLOCK;
419 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host});
420# and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
421
422 $self->{sbuf} =
423 "\x00\x00\x00\x02"
424 . (Net::FCP::touc $self->{type})
425 . "\012$attr$data";
426
427 #shutdown $fh, 1; # freenet buggy?, well, it's java...
428
429 $self->{fh} = $fh;
430
431 $self->{w} = AnyEvent->io (fh => $fh, poll => 'w', cb => sub { $self->fh_ready_w });
432
433 $self;
434}
435
436=item $txn = $txn->cb ($coderef)
437
438Sets a callback to be called when the request is finished. The coderef
439will be called with the txn as it's sole argument, so it has to call
440C<result> itself.
441
442Returns the txn object, useful for chaining.
443
444Example:
445
446 $fcp->txn_client_get ("freenet:CHK....")
447 ->userdata ("ehrm")
448 ->cb(sub {
449 my $data = shift->result;
450 });
451
452=cut
453
454sub cb($$) {
455 my ($self, $cb) = @_;
456 $self->{cb} = $cb;
457 $self;
458}
459
460=item $txn = $txn->userdata ([$userdata])
461
462Set user-specific data. This is useful in progress callbacks. The data can be accessed
463using C<< $txn->{userdata} >>.
464
465Returns the txn object, useful for chaining.
466
467=cut
468
469sub userdata($$) {
470 my ($self, $data) = @_;
471 $self->{userdata} = $data;
472 $self;
473}
474
475=item $txn->cancel (%attr)
476
477Cancels the operation with a C<cancel> exception and the given attributes
478(consider at least giving the attribute C<reason>).
479
480UNTESTED.
481
482=cut
483
484sub cancel {
485 my ($self, %attr) = @_;
486 $self->throw (Net::FCP::Exception->new (cancel => { %attr }));
487 $self->set_result;
488 $self->eof;
489}
490
491sub fh_ready_w {
492 my ($self) = @_;
493
494 my $len = syswrite $self->{fh}, $self->{sbuf};
495
496 if ($len > 0) {
497 substr $self->{sbuf}, 0, $len, "";
498 unless (length $self->{sbuf}) {
499 fcntl $self->{fh}, F_SETFL, 0;
500 $self->{w} = AnyEvent->io (fh => $self->{fh}, poll => 'r', cb => sub { $self->fh_ready_r });
501 }
502 } elsif (defined $len) {
503 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
504 } else {
505 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
506 }
507}
508
509sub fh_ready_r {
510 my ($self) = @_;
511
512 if (sysread $self->{fh}, $self->{buf}, 16384 + 1024, length $self->{buf}) {
513 for (;;) {
514 if ($self->{datalen}) {
515 #warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d#
516 if (length $self->{buf} >= $self->{datalen}) {
517 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
518 } else {
519 last;
520 }
521 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) {
522 $self->{datalen} = hex $1;
523 #warn "expecting new datachunk $self->{datalen}\n";#d#
524 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) {
525 $self->rcv ($1, {
526 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) }
527 split /\015?\012/, $2
528 });
529 } else {
530 last; 221 return;
531 } 222 }
532 } 223 }
224
225 if (my $cb = $self->{queue}[0]) {
226 $cb->($self, $type, \%kv, $rdata)
227 and shift @{ $self->{queue} };
228 } else {
229 $self->default_recv ($type, \%kv, $rdata);
230 }
231 };
232
233 my $hdr_cb; $hdr_cb = sub {
234 if ($_[1] =~ /^([^=]+)=(.*)$/) {
235 my ($k, $v) = ($1, $2);
236 my @k = split /\./, tolc $k;
237 my $ro = \\%kv;
238
239 while (@k) {
240 my $k = shift @k;
241 if ($k =~ /^\d+$/) {
242 $ro = \$$ro->[$k];
243 } else {
244 $ro = \$$ro->{$k};
245 }
246 }
247
248 $$ro = $v;
249
250 $_[0]->push_read (line => $hdr_cb);
251 } elsif ($_[1] eq "Data") {
252 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
253 $rdata = \$_[1];
254 $done_cb->();
255 });
256 } elsif ($_[1] eq "EndMessage") {
257 $done_cb->();
258 } else {
259 die "protocol error, expected message end, got $_[1]\n";#d#
260 }
261 };
262
263 $self->{hdl}->push_read (line => sub {
264 $type = tolc $_[1];
265 $_[0]->push_read (line => $hdr_cb);
266 });
267}
268
269sub default_recv {
270 my ($self, $type, $kv, $rdata) = @_;
271
272 if ($type eq "node_hello") {
273 $self->{node_hello} = $kv;
274 } elsif (exists $self->{id}{$kv->{identifier}}) {
275 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
276 and delete $self->{id}{$kv->{identifier}};
533 } else { 277 } else {
534 $self->eof; 278 &{ $self->{progress} };
535 } 279 }
536} 280}
537 281
538sub rcv { 282sub _txn {
283 my ($name, $sub) = @_;
284
285 *{$name} = sub {
286 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
287 &$sub;
288 $cv
289 };
290
291 *{"$name\_sync"} = sub {
292 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
293 &$sub;
294 $cv->recv
295 };
296}
297
298=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
299
300=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
301
302=cut
303
304_txn list_peers => sub {
305 my ($self, $cv, $with_metadata, $with_volatile) = @_;
306
307 my @res;
308
309 $self->send_msg (list_peers =>
310 with_metadata => $with_metadata ? "true" : "false",
311 with_volatile => $with_volatile ? "true" : "false",
312 id_cb => sub {
313 my ($self, $type, $kv, $rdata) = @_;
314
315 if ($type eq "end_list_peers") {
316 $cv->(\@res);
317 1
318 } else {
319 push @res, $kv;
320 0
321 }
322 },
323 );
324};
325
326=item $cv = $fcp->list_peer_notes ($node_identifier)
327
328=item $notes = $fcp->list_peer_notes_sync ($node_identifier)
329
330=cut
331
332_txn list_peer_notes => sub {
333 my ($self, $cv, $node_identifier) = @_;
334
335 $self->send_msg (list_peer_notes =>
336 node_identifier => $node_identifier,
337 id_cb => sub {
338 my ($self, $type, $kv, $rdata) = @_;
339
340 $cv->($kv);
341 1
342 },
343 );
344};
345
346=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
347
348=item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
349
350=cut
351
352_txn watch_global => sub {
353 my ($self, $cv, $enabled, $verbosity_mask) = @_;
354
355 $self->send_msg (watch_global =>
356 enabled => $enabled ? "true" : "false",
357 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
358 );
359
360 $cv->();
361};
362
363=item $cv = $fcp->list_persistent_requests
364
365=item $reqs = $fcp->list_persistent_requests_sync
366
367=cut
368
369_txn list_persistent_requests => sub {
370 my ($self, $cv) = @_;
371
372 $self->serialise (list_persistent_requests => sub {
373 my ($self, $guard) = @_;
374
375 my %res;
376
377 $self->send_msg ("list_persistent_requests");
378
379 $self->on (sub {
380 my ($self, $type, $kv, $rdata) = @_;
381
382 $guard if 0;
383
384 if ($type eq "end_list_persistent_requests") {
385 $cv->(\%res);
386 return;
387 } else {
388 my $id = $kv->{identifier};
389
390 if ($type =~ /^persistent_(get|put|put_dir)$/) {
391 $res{$id} = {
392 type => $1,
393 %{ $res{$id} },
394 %$kv,
395 };
396 } elsif ($type eq "simple_progress") {
397 delete $kv->{pkt_type}; # save memory
398 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
399 } else {
400 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
401 }
402 }
403
404 1
405 });
406 });
407};
408
409=item $cv = $fcp->remove_request ($global, $identifier)
410
411=item $status = $fcp->remove_request_sync ($global, $identifier)
412
413=cut
414
415_txn remove_request => sub {
416 my ($self, $cv, $global, $identifier) = @_;
417
418 $self->send_msg (remove_request =>
419 global => $global ? "true" : "false",
420 identifier => $identifier,
421 id_cb => sub {
422 my ($self, $type, $kv, $rdata) = @_;
423
424 $cv->($kv);
425 1
426 },
427 );
428};
429
430=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
431
432=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
433
434=cut
435
436_txn modify_persistent_request => sub {
437 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
438
439 $self->send_msg (modify_persistent_request =>
440 global => $global ? "true" : "false",
441 defined $client_token ? (client_token => $client_token ) : (),
442 defined $priority_class ? (priority_class => $priority_class) : (),
443 identifier => $identifier,
444 id_cb => sub {
445 my ($self, $type, $kv, $rdata) = @_;
446
447 $cv->($kv);
448 1
449 },
450 );
451};
452
453=item $cv = $fcp->get_plugin_info ($name, $detailed)
454
455=item $info = $fcp->get_plugin_info_sync ($name, $detailed)
456
457=cut
458
459_txn get_plugin_info => sub {
460 my ($self, $cv, $name, $detailed) = @_;
461
462 $self->send_msg (get_plugin_info =>
463 plugin_name => $name,
464 detailed => $detailed ? "true" : "false",
465 id_cb => sub {
466 my ($self, $type, $kv, $rdata) = @_;
467
468 $cv->($kv);
469 1
470 },
471 );
472};
473
474=item $cv = $fcp->client_get ($uri, $identifier, %kv)
475
476=item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
477
478%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
479
480ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
481priority_class, persistence, client_token, global, return_type,
482binary_blob, allowed_mime_types, filename, temp_filename
483
484=cut
485
486_txn client_get => sub {
487 my ($self, $cv, $uri, $identifier, %kv) = @_;
488
489 $self->send_msg (client_get =>
490 %kv,
491 uri => $uri,
492 identifier => $identifier,
493 id_cb => sub {
494 my ($self, $type, $kv, $rdata) = @_;
495
496 $cv->($kv);
497 1
498 },
499 );
500};
501
502=item $cv = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write)
503
504=item ($can_read, $can_write) = $fcp->test_dda_sync ($local_directory, $remote_directory, $want_read, $want_write))
505
506The DDA test in FCP is probably the single most broken protocol - only
507one directory test can be outstanding at any time, and some guessing and
508heuristics are involved in mangling the paths.
509
510This function combines C<TestDDARequest> and C<TestDDAResponse> in one
511request, handling file reading and writing as well.
512
513=cut
514
515_txn test_dda => sub {
516 my ($self, $cv, $local, $remote, $want_read, $want_write) = @_;
517
518 $self->serialise (test_dda => sub {
519 my ($self, $guard) = @_;
520
521 $self->send_msg (test_dda_request =>
522 directory => $remote,
523 want_read_directory => $want_read ? "true" : "false",
524 want_write_directory => $want_write ? "true" : "false",
525 );
526 $self->on (sub {
539 my ($self, $type, $attr) = @_; 527 my ($self, $type, $kv) = @_;
540 528
541 $type = Net::FCP::tolc $type; 529 if ($type eq "test_dda_reply") {
530 # the filenames are all relative to the server-side directory,
531 # which might or might not match $remote anymore, so we
532 # need to rewrite the paths to be relative to $local
533 for my $k (qw(read_filename write_filename)) {
534 my $f = $kv->{$k};
535 for my $dir ($kv->{directory}, $remote) {
536 if ($dir eq substr $f, 0, length $dir) {
537 substr $f, 0, 1 + length $dir, "";
538 $kv->{$k} = $f;
539 last;
540 }
541 }
542 }
542 543
543 #use PApp::Util; warn PApp::Util::dumpval [$type, $attr]; 544 my %response = (directory => $remote);
544 545
545 if (my $method = $self->can("rcv_$type")) { 546 if (length $kv->{read_filename}) {
546 $method->($self, $attr, $type); 547 warn "$local/$kv->{read_filename}";#d#
548 if (open my $fh, "<:raw", "$local/$kv->{read_filename}") {
549 sysread $fh, my $buf, -s $fh;
550 $response{read_content} = $buf;
551 }
552 }
553
554 if (length $kv->{write_filename}) {
555 if (open my $fh, ">:raw", "$local/$kv->{write_filename}") {
556 syswrite $fh, $kv->{content_to_write};
557 }
558 }
559
560 $self->send_msg (test_dda_response => %response);
561
562 $self->on (sub {
563 my ($self, $type, $kv) = @_;
564
565 $guard if 0; # reference
566
567 if ($type eq "test_dda_complete") {
568 $cv->(
569 $kv->{read_directory_allowed} eq "true",
570 $kv->{write_directory_allowed} eq "true",
571 );
572 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
573 $cv->croak ($kv->{extra_description});
574 return;
575 }
576
577 1
578 });
579
580 return;
581 } elsif ($type eq "protocol_error" && $kv->{identifier} eq $remote) {
582 $cv->croak ($kv->{extra_description});
583 return;
584 }
585
586 1
587 });
588 });
589};
590
591=back
592
593=head1 EXAMPLE PROGRAM
594
595 use AnyEvent::FCP;
596
597 my $fcp = new AnyEvent::FCP;
598
599 # let us look at the global request list
600 $fcp->watch_global (1, 0);
601
602 # list them, synchronously
603 my $req = $fcp->list_persistent_requests_sync;
604
605 # go through all requests
606 for my $req (values %$req) {
607 # skip jobs not directly-to-disk
608 next unless $req->{return_type} eq "disk";
609 # skip jobs not issued by FProxy
610 next unless $req->{identifier} =~ /^FProxy:/;
611
612 if ($req->{data_found}) {
613 # file has been successfully downloaded
614
615 ... move the file away
616 (left as exercise)
617
618 # remove the request
619
620 $fcp->remove_request (1, $req->{identifier});
621 } elsif ($req->{get_failed}) {
622 # request has failed
623 if ($req->{get_failed}{code} == 11) {
624 # too many path components, should restart
625 } else {
626 # other failure
627 }
547 } else { 628 } else {
548 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n"; 629 # modify priorities randomly, to improve download rates
630 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
631 if 0.1 > rand;
632 }
549 } 633 }
550}
551 634
552# used as a default exception thrower 635 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
553sub rcv_throw_exception { 636 $fcp->get_plugin_info_sync ("dummy");
554 my ($self, $attr, $type) = @_;
555 $self->throw (Net::FCP::Exception->new ($type, $attr));
556}
557
558*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
559*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
560
561sub throw {
562 my ($self, $exc) = @_;
563
564 $self->{exception} = $exc;
565 $self->set_result;
566 $self->eof; # must be last to avoid loops
567}
568
569sub set_result {
570 my ($self, $result) = @_;
571
572 unless (exists $self->{result}) {
573 $self->{result} = $result;
574 $self->{cb}->($self) if exists $self->{cb};
575 $self->{signal}->broadcast;
576 }
577}
578
579sub eof {
580 my ($self) = @_;
581
582 delete $self->{w};
583 delete $self->{fh};
584
585 delete $self->{fcp}{txn}{$self};
586
587 unless (exists $self->{result}) {
588 $self->throw (Net::FCP::Exception->new (short_data => {
589 reason => "unexpected eof or internal node error",
590 }));
591 }
592}
593
594sub progress {
595 my ($self, $type, $attr) = @_;
596
597 $self->{fcp}->progress ($self, $type, $attr);
598}
599
600=item $result = $txn->result
601
602Waits until a result is available and then returns it.
603
604This waiting is (depending on your event model) not very efficient, as it
605is done outside the "mainloop". The biggest problem, however, is that it's
606blocking one thread of execution. Try to use the callback mechanism, if
607possible, and call result from within the callback (or after is has been
608run), as then no waiting is necessary.
609
610=cut
611
612sub result {
613 my ($self) = @_;
614
615 $self->{signal}->wait while !exists $self->{result};
616
617 die $self->{exception} if $self->{exception};
618
619 return $self->{result};
620}
621
622package Net::FCP::Txn::ClientHello;
623
624use base Net::FCP::Txn;
625
626sub rcv_node_hello {
627 my ($self, $attr) = @_;
628
629 $self->set_result ($attr);
630}
631
632package Net::FCP::Txn::ClientInfo;
633
634use base Net::FCP::Txn;
635
636sub rcv_node_info {
637 my ($self, $attr) = @_;
638
639 $self->set_result ($attr);
640}
641
642package Net::FCP::Txn::GenerateCHK;
643
644use base Net::FCP::Txn;
645
646sub rcv_success {
647 my ($self, $attr) = @_;
648
649 $self->set_result ($attr->{uri});
650}
651
652package Net::FCP::Txn::GenerateSVKPair;
653
654use base Net::FCP::Txn;
655
656sub rcv_success {
657 my ($self, $attr) = @_;
658 $self->set_result ([$attr->{public_key}, $attr->{private_key}, $attr->{crypto_key}]);
659}
660
661package Net::FCP::Txn::InvertPrivateKey;
662
663use base Net::FCP::Txn;
664
665sub rcv_success {
666 my ($self, $attr) = @_;
667 $self->set_result ($attr->{public_key});
668}
669
670package Net::FCP::Txn::GetSize;
671
672use base Net::FCP::Txn;
673
674sub rcv_success {
675 my ($self, $attr) = @_;
676 $self->set_result (hex $attr->{length});
677}
678
679package Net::FCP::Txn::GetPut;
680
681# base class for get and put
682
683use base Net::FCP::Txn;
684
685*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
686*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
687
688sub rcv_restarted {
689 my ($self, $attr, $type) = @_;
690
691 delete $self->{datalength};
692 delete $self->{metalength};
693 delete $self->{data};
694
695 $self->progress ($type, $attr);
696}
697
698package Net::FCP::Txn::ClientGet;
699
700use base Net::FCP::Txn::GetPut;
701
702*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
703
704sub rcv_data {
705 my ($self, $chunk) = @_;
706
707 $self->{data} .= $chunk;
708
709 $self->progress ("data", { chunk => length $chunk, received => length $self->{data}, total => $self->{datalength} });
710
711 if ($self->{datalength} == length $self->{data}) {
712 my $data = delete $self->{data};
713 my $meta = new Net::FCP::Metadata (substr $data, 0, $self->{metalength}, "");
714
715 $self->set_result ([$meta, $data]);
716 $self->eof;
717 }
718}
719
720sub rcv_data_found {
721 my ($self, $attr, $type) = @_;
722
723 $self->progress ($type, $attr);
724
725 $self->{datalength} = hex $attr->{data_length};
726 $self->{metalength} = hex $attr->{metadata_length};
727}
728
729package Net::FCP::Txn::ClientPut;
730
731use base Net::FCP::Txn::GetPut;
732
733*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
734
735sub rcv_pending {
736 my ($self, $attr, $type) = @_;
737 $self->progress ($type, $attr);
738}
739
740sub rcv_success {
741 my ($self, $attr, $type) = @_;
742 $self->set_result ($attr);
743}
744
745sub rcv_key_collision {
746 my ($self, $attr, $type) = @_;
747 $self->set_result ({ key_collision => 1, %$attr });
748}
749
750=back
751
752=head2 The Net::FCP::Exception CLASS
753
754Any unexpected (non-standard) responses that make it impossible to return
755the advertised result will result in an exception being thrown when the
756C<result> method is called.
757
758These exceptions are represented by objects of this class.
759
760=over 4
761
762=cut
763
764package Net::FCP::Exception;
765
766use overload
767 '""' => sub {
768 "Net::FCP::Exception<<$_[0][0]," . (join ":", %{$_[0][1]}) . ">>";
769 };
770
771=item $exc = new Net::FCP::Exception $type, \%attr
772
773Create a new exception object of the given type (a string like
774C<route_not_found>), and a hashref containing additional attributes
775(usually the attributes of the message causing the exception).
776
777=cut
778
779sub new {
780 my ($class, $type, $attr) = @_;
781
782 bless [Net::FCP::tolc $type, { %$attr }], $class;
783}
784
785=item $exc->type([$type])
786
787With no arguments, returns the exception type. Otherwise a boolean
788indicating wether the exception is of the given type is returned.
789
790=cut
791
792sub type {
793 my ($self, $type) = @_;
794
795 @_ >= 2
796 ? $self->[0] eq $type
797 : $self->[0];
798}
799
800=item $exc->attr([$attr])
801
802With no arguments, returns the attributes. Otherwise the named attribute
803value is returned.
804
805=cut
806
807sub attr {
808 my ($self, $attr) = @_;
809
810 @_ >= 2
811 ? $self->[1]{$attr}
812 : $self->[1];
813}
814
815=back
816 637
817=head1 SEE ALSO 638=head1 SEE ALSO
818 639
819L<http://freenet.sf.net>. 640L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
820 641
821=head1 BUGS 642=head1 BUGS
822 643
823=head1 AUTHOR 644=head1 AUTHOR
824 645

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines