ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.1 by root, Sat Jul 18 05:57:59 2009 UTC vs.
Revision 1.7 by root, Sat Jun 5 14:49:25 2010 UTC

2 2
3AnyEvent::FCP - freenet client protocol 2.0 3AnyEvent::FCP - freenet client protocol 2.0
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::FCP; 7 use AnyEvent::FCP;
8 8
9 my $fcp = new AnyEvent::FCP; 9 my $fcp = new AnyEvent::FCP;
10 10
11 my $ni = $fcp->txn_node_info->result; 11 # transactions return condvars
12 my $ni = $fcp->node_info; 12 my $lp_cv = $fcp->list_peers;
13 my $pr_cv = $fcp->list_persistent_requests;
14
15 my $peers = $lp_cv->recv;
16 my $reqs = $pr_cv->recv;
13 17
14=head1 DESCRIPTION 18=head1 DESCRIPTION
15 19
16This module implements the freenet client protocol version 2.0, as used by 20This module implements the freenet client protocol version 2.0, as used by
17freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version. 21freenet 0.7. See L<Net::FCP> for the earlier freenet 0.5 version.
18 22
19See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a description 23See L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0> for a
20of what the messages do. 24description of what the messages do.
21 25
22The module uses L<AnyEvent> to find a suitable event module. 26The module uses L<AnyEvent> to find a suitable event module.
23 27
28Only very little is implemented, ask if you need more, and look at the
29example program later in this section.
30
31=head2 EXAMPLE
32
33This example fetches the download list and sets the priority of all files
34with "a" in their name to "emergency":
35
36 use AnyEvent::FCP;
37
38 my $fcp = new AnyEvent::FCP;
39
40 $fcp->watch_global_sync (1, 0);
41 my $req = $fcp->list_persistent_requests_sync;
42
43 for my $req (values %$req) {
44 if ($req->{filename} =~ /a/) {
45 $fcp->modify_persistent_request_sync (1, $req->{identifier}, undef, 0);
46 }
47 }
48
24=head2 IMPORT TAGS 49=head2 IMPORT TAGS
25 50
26Nothing much can be "imported" from this module right now. 51Nothing much can be "imported" from this module right now.
27 52
28=head2 FREENET BASICS 53=head2 THE AnyEvent::FCP CLASS
29
30Ok, this section will not explain any freenet basics to you, just some
31problems I found that you might want to avoid:
32 54
33=over 4 55=over 4
34 56
35=item freenet URIs are _NOT_ URIs
36
37Whenever a "uri" is required by the protocol, freenet expects a kind of
38URI prefixed with the "freenet:" scheme, e.g. "freenet:CHK...". However,
39these are not URIs, as freeent fails to parse them correctly, that is, you
40must unescape an escaped characters ("%2c" => ",") yourself. Maybe in the
41future this library will do it for you, so watch out for this incompatible
42change.
43
44=back
45
46=head2 THE AnyEvent::FCP CLASS
47
48=over 4
49
50=cut 57=cut
51 58
52package AnyEvent::FCP; 59package AnyEvent::FCP;
53 60
61use common::sense;
62
54use Carp; 63use Carp;
55 64
56$VERSION = '0.1'; 65our $VERSION = '0.3';
57 66
58no warnings; 67use Scalar::Util ();
59 68
60use AnyEvent; 69use AnyEvent;
61use AnyEvent::Socket; 70use AnyEvent::Handle;
62 71
63sub touc($) { 72sub touc($) {
64 local $_ = shift; 73 local $_ = shift;
65 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 74 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime)(?:_|$))/\U$1/;
66 s/(?:^|_)(.)/\U$1/g; 75 s/(?:^|_)(.)/\U$1/g;
67 $_ 76 $_
68} 77}
69 78
70sub tolc($) { 79sub tolc($) {
71 local $_ = shift; 80 local $_ = shift;
72 1 while s/(SVK|CHK|URI)([^_])/$1\_$2/i; 81 1 while s/(SVK|CHK|URI|FCP|DS|MIME)([^_])/$1\_$2/i;
73 1 while s/([^_])(SVK|CHK|URI)/$1\_$2/i; 82 1 while s/([^_])(SVK|CHK|URI|FCP|DS|MIME)/$1\_$2/i;
74 s/(?<=[a-z])(?=[A-Z])/_/g; 83 s/(?<=[a-z])(?=[A-Z])/_/g;
75 lc 84 lc
76} 85}
77 86
78=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 87=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
79 88
80Create a new FCP connection to the given host and port (default 89Create a new FCP connection to the given host and port (default
81127.0.0.1:8481, or the environment variables C<FREDHOST> and C<FREDPORT>). 90127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
82 91
83If no C<name> was specified, then AnyEvent::FCP will generate a (hopefully) 92If no C<name> was specified, then AnyEvent::FCP will generate a
84unique client name for you. 93(hopefully) unique client name for you.
85 94
86#TODO
87#You can install a progress callback that is being called with the Net::FCP 95You can install a progress callback that is being called with the AnyEvent::FCP
88#object, a txn object, the type of the transaction and the attributes. Use 96object, the type, a hashref with key-value pairs and a reference to any received data,
89#it like this: 97for all unsolicited messages.
90# 98
99Example:
100
91# sub progress_cb { 101 sub progress_cb {
92# my ($self, $txn, $type, $attr) = @_; 102 my ($self, $type, $kv, $rdata) = @_;
93# 103
94# warn "progress<$txn,$type," . (join ":", %$attr) . ">\n"; 104 if ($type eq "simple_progress") {
105 warn "$kv->{identifier} $kv->{succeeded}/$kv->{required}\n";
106 }
95# } 107 }
96 108
97=cut 109=cut
98 110
99sub new { 111sub new {
100 my $class = shift; 112 my $class = shift;
101 my $self = bless { @_ }, $class; 113 my $self = bless { @_ }, $class;
102 114
103 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 115 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
104 $self->{port} ||= $ENV{FREDPORT} || 9481; 116 $self->{port} ||= $ENV{FREDPORT} || 9481;
105 $self->{name} ||= time.rand.rand.rand; # lame 117 $self->{name} ||= time.rand.rand.rand; # lame
118 $self->{timeout} ||= 600;
119 $self->{progress} ||= sub { };
106 120
121 $self->{id} = "a0";
122
123 {
124 Scalar::Util::weaken (my $self = $self);
125
107 $self->{conn} = new AnyEvent::Socket 126 $self->{hdl} = new AnyEvent::Handle
108 PeerAddr => "$self->{host}:$self->{port}", 127 connect => [$self->{host} => $self->{port}],
128 timeout => $self->{timeout},
129 on_error => sub {
130 warn "<@_>\n";
131 exit 1;
132 },
133 on_read => sub { $self->on_read (@_) },
109 on_eof => $self->{on_eof} || sub { }, 134 on_eof => $self->{on_eof} || sub { };
135
136 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
137 }
138
139 $self->send_msg (
140 client_hello =>
141 name => $self->{name},
142 expected_version => "2.0",
143 );
110 144
111 $self 145 $self
112} 146}
113 147
114sub progress { 148sub send_msg {
115 my ($self, $txn, $type, $attr) = @_; 149 my ($self, $type, %kv) = @_;
116 150
117 $self->{progress}->($self, $txn, $type, $attr) 151 my $data = delete $kv{data};
118 if $self->{progress};
119}
120 152
121=item $txn = $fcp->txn (type => attr => val,...) 153 if (exists $kv{id_cb}) {
122 154 my $id = $kv{identifier} || ++$self->{id};
123The low-level interface to transactions. Don't use it unless you have 155 $self->{id}{$id} = delete $kv{id_cb};
124"special needs". Instead, use predefiend transactions like this: 156 $kv{identifier} = $id;
125
126The blocking case, no (visible) transactions involved:
127
128 my $nodehello = $fcp->client_hello;
129
130A transaction used in a blocking fashion:
131 157 }
132 my $txn = $fcp->txn_client_hello;
133 ...
134 my $nodehello = $txn->result;
135 158
136Or shorter: 159 my $msg = (touc $type) . "\012"
160 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
137 161
138 my $nodehello = $fcp->txn_client_hello->result; 162 sub id {
163 my ($self) = @_;
139 164
140Setting callbacks:
141 165
142 $fcp->txn_client_hello->cb( 166 }
143 sub { my $nodehello => $_[0]->result } 167
168 if (defined $data) {
169 $msg .= "DataLength=" . (length $data) . "\012"
170 . "Data\012$data";
171 } else {
172 $msg .= "EndMessage\012";
173 }
174
175 $self->{hdl}->push_write ($msg);
176}
177
178sub on_read {
179 my ($self) = @_;
180
181 my $type;
182 my %kv;
183 my $rdata;
184
185 my $done_cb = sub {
186 $kv{pkt_type} = $type;
187
188 if (my $cb = $self->{queue}[0]) {
189 $cb->($self, $type, \%kv, $rdata)
190 and shift @{ $self->{queue} };
191 } else {
192 $self->default_recv ($type, \%kv, $rdata);
193 }
194 };
195
196 my $hdr_cb; $hdr_cb = sub {
197 if ($_[1] =~ /^([^=]+)=(.*)$/) {
198 my ($k, $v) = ($1, $2);
199 my @k = split /\./, tolc $k;
200 my $ro = \\%kv;
201
202 while (@k) {
203 my $k = shift @k;
204 if ($k =~ /^\d+$/) {
205 $ro = \$$ro->[$k];
206 } else {
207 $ro = \$$ro->{$k};
208 }
209 }
210
211 $$ro = $v;
212
213 $_[0]->push_read (line => $hdr_cb);
214 } elsif ($_[1] eq "Data") {
215 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
216 $rdata = \$_[1];
217 $done_cb->();
218 });
219 } elsif ($_[1] eq "EndMessage") {
220 $done_cb->();
221 } else {
222 die "protocol error, expected message end, got $_[1]\n";#d#
223 }
224 };
225
226 $self->{hdl}->push_read (line => sub {
227 $type = tolc $_[1];
228 $_[0]->push_read (line => $hdr_cb);
144 ); 229 });
230}
145 231
146=cut 232sub default_recv {
233 my ($self, $type, $kv, $rdata) = @_;
147 234
235 if ($type eq "node_hello") {
236 $self->{node_hello} = $kv;
237 } elsif (exists $self->{id}{$kv->{identifier}}) {
238 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
239 and delete $self->{id}{$kv->{identifier}};
240 } else {
241 &{ $self->{progress} };
242 }
243}
244
148sub txn { 245sub _txn {
149 my ($self, $type, %attr) = @_;
150
151 $type = touc $type;
152
153 my $txn = "Net::FCP::Txn::$type"->new (fcp => $self, type => tolc $type, attr => \%attr);
154
155 $txn;
156}
157
158{ # transactions
159
160my $txn = sub {
161 my ($name, $sub) = @_; 246 my ($name, $sub) = @_;
247
162 *{"txn_$name"} = $sub; 248 *{$name} = sub {
163 *{$name} = sub { $sub->(@_)->result }; 249 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
164}; 250 &$sub;
165 251 $cv
166=item $txn = $fcp->txn_client_hello
167
168=item $nodehello = $fcp->client_hello
169
170Executes a ClientHello request and returns it's results.
171
172 {
173 max_file_size => "5f5e100",
174 node => "Fred,0.6,1.46,7050"
175 protocol => "1.2",
176 } 252 };
177 253
178=cut 254 *{"$name\_sync"} = sub {
179 255 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
180$txn->(client_hello => sub { 256 &$sub;
181 my ($self) = @_; 257 $cv->recv
182
183 $self->txn ("client_hello");
184});
185
186=item $txn = $fcp->txn_client_info
187
188=item $nodeinfo = $fcp->client_info
189
190Executes a ClientInfo request and returns it's results.
191
192 {
193 active_jobs => "1f",
194 allocated_memory => "bde0000",
195 architecture => "i386",
196 available_threads => 17,
197 datastore_free => "5ce03400",
198 datastore_max => "2540be400",
199 datastore_used => "1f72bb000",
200 estimated_load => 52,
201 free_memory => "5cc0148",
202 is_transient => "false",
203 java_name => "Java HotSpot(_T_M) Server VM",
204 java_vendor => "http://www.blackdown.org/",
205 java_version => "Blackdown-1.4.1-01",
206 least_recent_timestamp => "f41538b878",
207 max_file_size => "5f5e100",
208 most_recent_timestamp => "f77e2cc520"
209 node_address => "1.2.3.4",
210 node_port => 369,
211 operating_system => "Linux",
212 operating_system_version => "2.4.20",
213 routing_time => "a5",
214 } 258 };
259}
215 260
216=cut 261=item $cv = $fcp->list_peers ([$with_metdata[, $with_volatile]])
217 262
218$txn->(client_info => sub { 263=item $peers = $fcp->list_peers_sync ([$with_metdata[, $with_volatile]])
219 my ($self) = @_;
220 264
221 $self->txn ("client_info"); 265=cut
266
267_txn list_peers => sub {
268 my ($self, $cv, $with_metadata, $with_volatile) = @_;
269
270 my @res;
271
272 $self->send_msg (list_peers =>
273 with_metadata => $with_metadata ? "true" : "false",
274 with_volatile => $with_volatile ? "true" : "false",
275 id_cb => sub {
276 my ($self, $type, $kv, $rdata) = @_;
277
278 if ($type eq "end_list_peers") {
279 $cv->(\@res);
280 1
281 } else {
282 push @res, $kv;
283 0
284 }
285 },
286 );
222}); 287};
223 288
224=item $txn = $fcp->txn_generate_chk ($metadata, $data[, $cipher]) 289=item $cv = $fcp->list_peer_notes ($node_identifier)
225 290
226=item $uri = $fcp->generate_chk ($metadata, $data[, $cipher]) 291=item $notes = $fcp->list_peer_notes_sync ($node_identifier)
227 292
228Calculates a CHK, given the metadata and data. C<$cipher> is either
229C<Rijndael> or C<Twofish>, with the latter being the default.
230
231=cut 293=cut
232 294
233$txn->(generate_chk => sub { 295_txn list_peer_notes => sub {
234 my ($self, $metadata, $data, $cipher) = @_; 296 my ($self, $cv, $node_identifier) = @_;
235 297
236 $metadata = Net::FCP::Metadata::build_metadata $metadata; 298 $self->send_msg (list_peer_notes =>
299 node_identifier => $node_identifier,
300 id_cb => sub {
301 my ($self, $type, $kv, $rdata) = @_;
237 302
238 $self->txn (generate_chk => 303 $cv->($kv);
239 data => "$metadata$data", 304 1
240 metadata_length => xeh length $metadata, 305 },
241 cipher => $cipher || "Twofish"); 306 );
242}); 307};
243 308
244=item $txn = $fcp->txn_generate_svk_pair 309=item $cv = $fcp->watch_global ($enabled[, $verbosity_mask])
245 310
246=item ($public, $private, $crypto) = @{ $fcp->generate_svk_pair } 311=item $fcp->watch_global_sync ($enabled[, $verbosity_mask])
247 312
248Creates a new SVK pair. Returns an arrayref with the public key, the
249private key and a crypto key, which is just additional entropy.
250
251 [
252 "acLx4dux9fvvABH15Gk6~d3I-yw",
253 "cPoDkDMXDGSMM32plaPZDhJDxSs",
254 "BH7LXCov0w51-y9i~BoB3g",
255 ]
256
257A private key (for inserting) can be constructed like this:
258
259 SSK@<private_key>,<crypto_key>/<name>
260
261It can be used to insert data. The corresponding public key looks like this:
262
263 SSK@<public_key>PAgM,<crypto_key>/<name>
264
265Watch out for the C<PAgM>-part!
266
267=cut 313=cut
268 314
269$txn->(generate_svk_pair => sub { 315_txn watch_global => sub {
270 my ($self) = @_; 316 my ($self, $cv, $enabled, $verbosity_mask) = @_;
271 317
272 $self->txn ("generate_svk_pair"); 318 $self->send_msg (watch_global =>
319 enabled => $enabled ? "true" : "false",
320 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
321 );
322
323 $cv->();
273}); 324};
274 325
275=item $txn = $fcp->txn_invert_private_key ($private) 326=item $cv = $fcp->list_persistent_requests
276 327
277=item $public = $fcp->invert_private_key ($private) 328=item $reqs = $fcp->list_persistent_requests_sync
278 329
279Inverts a private key (returns the public key). C<$private> can be either
280an insert URI (must start with C<freenet:SSK@>) or a raw private key (i.e.
281the private value you get back from C<generate_svk_pair>).
282
283Returns the public key.
284
285=cut 330=cut
286 331
287$txn->(invert_private_key => sub { 332_txn list_persistent_requests => sub {
288 my ($self, $privkey) = @_;
289
290 $self->txn (invert_private_key => private => $privkey);
291});
292
293=item $txn = $fcp->txn_get_size ($uri)
294
295=item $length = $fcp->get_size ($uri)
296
297Finds and returns the size (rounded up to the nearest power of two) of the
298given document.
299
300=cut
301
302$txn->(get_size => sub {
303 my ($self, $uri) = @_; 333 my ($self, $cv) = @_;
304 334
305 $self->txn (get_size => URI => $uri); 335 my %res;
306});
307 336
308=item $txn = $fcp->txn_client_get ($uri [, $htl = 15 [, $removelocal = 0]]) 337 $self->send_msg ("list_persistent_requests");
309 338
310=item ($metadata, $data) = @{ $fcp->client_get ($uri, $htl, $removelocal) 339 push @{ $self->{queue} }, sub {
340 my ($self, $type, $kv, $rdata) = @_;
311 341
312Fetches a (small, as it should fit into memory) key content block from 342 if ($type eq "end_list_persistent_requests") {
313freenet. C<$meta> is a C<Net::FCP::Metadata> object or C<undef>). 343 $cv->(\%res);
344 1
345 } else {
346 my $id = $kv->{identifier};
314 347
315The C<$uri> should begin with C<freenet:>, but the scheme is currently 348 if ($type =~ /^persistent_(get|put|put_dir)$/) {
316added, if missing. 349 $res{$id} = {
317 350 type => $1,
318 my ($meta, $data) = @{ 351 %{ $res{$id} },
319 $fcp->client_get ( 352 %$kv,
320 "freenet:CHK@hdXaxkwZ9rA8-SidT0AN-bniQlgPAwI,XdCDmBuGsd-ulqbLnZ8v~w" 353 };
354 } elsif ($type eq "simple_progress") {
355 delete $kv->{pkt_type}; # save memory
356 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
357 } else {
358 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
359 }
360 0
321 ) 361 }
322 }; 362 };
363};
323 364
324=cut 365=item $cv = $fcp->remove_request ($global, $identifier)
325 366
367=item $status = $fcp->remove_request_sync ($global, $identifier)
368
369=cut
370
371_txn remove_request => sub {
372 my ($self, $cv, $global, $identifier) = @_;
373
374 $self->send_msg (remove_request =>
375 global => $global ? "true" : "false",
376 identifier => $identifier,
377 id_cb => sub {
378 my ($self, $type, $kv, $rdata) = @_;
379
380 $cv->($kv);
381 1
382 },
383 );
384};
385
386=item $cv = $fcp->modify_persistent_request ($global, $identifier[, $client_token[, $priority_class]])
387
388=item $sync = $fcp->modify_persistent_request_sync ($global, $identifier[, $client_token[, $priority_class]])
389
390=cut
391
392_txn modify_persistent_request => sub {
393 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
394
395 $self->send_msg (modify_persistent_request =>
396 global => $global ? "true" : "false",
397 defined $client_token ? (client_token => $client_token ) : (),
398 defined $priority_class ? (priority_class => $priority_class) : (),
399 identifier => $identifier,
400 id_cb => sub {
401 my ($self, $type, $kv, $rdata) = @_;
402
403 $cv->($kv);
404 1
405 },
406 );
407};
408
409=item $cv = $fcp->get_plugin_info ($name, $detailed)
410
411=item $info = $fcp->get_plugin_info_sync ($name, $detailed)
412
413=cut
414
415_txn get_plugin_info => sub {
416 my ($self, $cv, $name, $detailed) = @_;
417
418 $self->send_msg (get_plugin_info =>
419 plugin_name => $name,
420 detailed => $detailed ? "true" : "false",
421 id_cb => sub {
422 my ($self, $type, $kv, $rdata) = @_;
423
424 $cv->($kv);
425 1
426 },
427 );
428};
429
430=item $cv = $fcp->client_get ($uri, $identifier, %kv)
431
432=item $status = $fcp->client_get_sync ($uri, $identifier, %kv)
433
434%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
435
436ignore_ds, ds_only, verbosity, max_size, max_temp_size, max_retries,
437priority_class, persistence, client_token, global, return_type,
438binary_blob, allowed_mime_types, filename, temp_filename
439
440=cut
441
326$txn->(client_get => sub { 442_txn client_get => sub {
327 my ($self, $uri, $htl, $removelocal) = @_; 443 my ($self, $cv, $uri, $identifier, %kv) = @_;
328 444
329 $uri =~ s/^freenet://; $uri = "freenet:$uri"; 445 $self->send_msg (client_get =>
446 %kv,
447 uri => $uri,
448 identifier => $identifier,
449 id_cb => sub {
450 my ($self, $type, $kv, $rdata) = @_;
330 451
331 $self->txn (client_get => URI => $uri, hops_to_live => xeh (defined $htl ? $htl : 15), 452 $cv->($kv);
332 remove_local_key => $removelocal ? "true" : "false"); 453 1
454 },
455 );
333}); 456};
334
335=item $txn = $fcp->txn_client_put ($uri, $metadata, $data, $htl, $removelocal)
336
337=item my $uri = $fcp->client_put ($uri, $metadata, $data, $htl, $removelocal);
338
339Insert a new key. If the client is inserting a CHK, the URI may be
340abbreviated as just CHK@. In this case, the node will calculate the
341CHK. If the key is a private SSK key, the node will calculcate the public
342key and the resulting public URI.
343
344C<$meta> can be a hash reference (same format as returned by
345C<Net::FCP::parse_metadata>) or a string.
346
347The result is an arrayref with the keys C<uri>, C<public_key> and C<private_key>.
348
349=cut
350
351$txn->(client_put => sub {
352 my ($self, $uri, $metadata, $data, $htl, $removelocal) = @_;
353
354 $metadata = Net::FCP::Metadata::build_metadata $metadata;
355 $uri =~ s/^freenet://; $uri = "freenet:$uri";
356
357 $self->txn (client_put => URI => $uri,
358 hops_to_live => xeh (defined $htl ? $htl : 15),
359 remove_local_key => $removelocal ? "true" : "false",
360 data => "$metadata$data", metadata_length => xeh length $metadata);
361});
362
363} # transactions
364 457
365=back 458=back
366 459
367=head2 THE Net::FCP::Txn CLASS 460=head1 EXAMPLE PROGRAM
368 461
369All requests (or transactions) are executed in a asynchronous way. For 462 use AnyEvent::FCP;
370each request, a C<Net::FCP::Txn> object is created (worse: a tcp
371connection is created, too).
372 463
373For each request there is actually a different subclass (and it's possible 464 my $fcp = new AnyEvent::FCP;
374to subclass these, although of course not documented).
375 465
376The most interesting method is C<result>. 466 # let us look at the global request list
467 $fcp->watch_global (1, 0);
377 468
378=over 4 469 # list them, synchronously
470 my $req = $fcp->list_persistent_requests_sync;
379 471
380=cut 472 # go through all requests
473 for my $req (values %$req) {
474 # skip jobs not directly-to-disk
475 next unless $req->{return_type} eq "disk";
476 # skip jobs not issued by FProxy
477 next unless $req->{identifier} =~ /^FProxy:/;
381 478
382package Net::FCP::Txn;
383
384use Fcntl;
385use Socket;
386
387=item new arg => val,...
388
389Creates a new C<Net::FCP::Txn> object. Not normally used.
390
391=cut
392
393sub new {
394 my $class = shift;
395 my $self = bless { @_ }, $class;
396
397 $self->{signal} = AnyEvent->condvar;
398
399 $self->{fcp}{txn}{$self} = $self;
400
401 my $attr = "";
402 my $data = delete $self->{attr}{data};
403
404 while (my ($k, $v) = each %{$self->{attr}}) {
405 $attr .= (Net::FCP::touc $k) . "=$v\012"
406 }
407
408 if (defined $data) {
409 $attr .= sprintf "DataLength=%x\012", length $data;
410 $data = "Data\012$data";
411 } else {
412 $data = "EndMessage\012";
413 }
414
415 socket my $fh, PF_INET, SOCK_STREAM, 0
416 or Carp::croak "unable to create new tcp socket: $!";
417 binmode $fh, ":raw";
418 fcntl $fh, F_SETFL, O_NONBLOCK;
419 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host});
420# and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
421
422 $self->{sbuf} =
423 "\x00\x00\x00\x02"
424 . (Net::FCP::touc $self->{type})
425 . "\012$attr$data";
426
427 #shutdown $fh, 1; # freenet buggy?, well, it's java...
428
429 $self->{fh} = $fh;
430
431 $self->{w} = AnyEvent->io (fh => $fh, poll => 'w', cb => sub { $self->fh_ready_w });
432
433 $self;
434}
435
436=item $txn = $txn->cb ($coderef)
437
438Sets a callback to be called when the request is finished. The coderef
439will be called with the txn as it's sole argument, so it has to call
440C<result> itself.
441
442Returns the txn object, useful for chaining.
443
444Example:
445
446 $fcp->txn_client_get ("freenet:CHK....")
447 ->userdata ("ehrm")
448 ->cb(sub {
449 my $data = shift->result;
450 });
451
452=cut
453
454sub cb($$) {
455 my ($self, $cb) = @_;
456 $self->{cb} = $cb;
457 $self;
458}
459
460=item $txn = $txn->userdata ([$userdata])
461
462Set user-specific data. This is useful in progress callbacks. The data can be accessed
463using C<< $txn->{userdata} >>.
464
465Returns the txn object, useful for chaining.
466
467=cut
468
469sub userdata($$) {
470 my ($self, $data) = @_;
471 $self->{userdata} = $data;
472 $self;
473}
474
475=item $txn->cancel (%attr)
476
477Cancels the operation with a C<cancel> exception and the given attributes
478(consider at least giving the attribute C<reason>).
479
480UNTESTED.
481
482=cut
483
484sub cancel {
485 my ($self, %attr) = @_;
486 $self->throw (Net::FCP::Exception->new (cancel => { %attr }));
487 $self->set_result;
488 $self->eof;
489}
490
491sub fh_ready_w {
492 my ($self) = @_;
493
494 my $len = syswrite $self->{fh}, $self->{sbuf};
495
496 if ($len > 0) {
497 substr $self->{sbuf}, 0, $len, "";
498 unless (length $self->{sbuf}) {
499 fcntl $self->{fh}, F_SETFL, 0;
500 $self->{w} = AnyEvent->io (fh => $self->{fh}, poll => 'r', cb => sub { $self->fh_ready_r });
501 }
502 } elsif (defined $len) {
503 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
504 } else {
505 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
506 }
507}
508
509sub fh_ready_r {
510 my ($self) = @_;
511
512 if (sysread $self->{fh}, $self->{buf}, 16384 + 1024, length $self->{buf}) {
513 for (;;) {
514 if ($self->{datalen}) { 479 if ($req->{data_found}) {
515 #warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d# 480 # file has been successfully downloaded
516 if (length $self->{buf} >= $self->{datalen}) {
517 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
518 } else {
519 last;
520 } 481
521 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) { 482 ... move the file away
522 $self->{datalen} = hex $1; 483 (left as exercise)
523 #warn "expecting new datachunk $self->{datalen}\n";#d# 484
524 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) { 485 # remove the request
525 $self->rcv ($1, { 486
526 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) } 487 $fcp->remove_request (1, $req->{identifier});
527 split /\015?\012/, $2 488 } elsif ($req->{get_failed}) {
528 }); 489 # request has failed
490 if ($req->{get_failed}{code} == 11) {
491 # too many path components, should restart
529 } else { 492 } else {
530 last; 493 # other failure
531 } 494 }
532 }
533 } else { 495 } else {
534 $self->eof; 496 # modify priorities randomly, to improve download rates
535 } 497 $fcp->modify_persistent_request (1, $req->{identifier}, undef, int 6 - 5 * (rand) ** 1.7)
536} 498 if 0.1 > rand;
537
538sub rcv {
539 my ($self, $type, $attr) = @_;
540
541 $type = Net::FCP::tolc $type;
542
543 #use PApp::Util; warn PApp::Util::dumpval [$type, $attr];
544
545 if (my $method = $self->can("rcv_$type")) {
546 $method->($self, $attr, $type);
547 } else {
548 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n";
549 }
550}
551
552# used as a default exception thrower
553sub rcv_throw_exception {
554 my ($self, $attr, $type) = @_;
555 $self->throw (Net::FCP::Exception->new ($type, $attr));
556}
557
558*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
559*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
560
561sub throw {
562 my ($self, $exc) = @_;
563
564 $self->{exception} = $exc;
565 $self->set_result;
566 $self->eof; # must be last to avoid loops
567}
568
569sub set_result {
570 my ($self, $result) = @_;
571
572 unless (exists $self->{result}) {
573 $self->{result} = $result;
574 $self->{cb}->($self) if exists $self->{cb};
575 $self->{signal}->broadcast;
576 }
577}
578
579sub eof {
580 my ($self) = @_;
581
582 delete $self->{w};
583 delete $self->{fh};
584
585 delete $self->{fcp}{txn}{$self};
586
587 unless (exists $self->{result}) {
588 $self->throw (Net::FCP::Exception->new (short_data => {
589 reason => "unexpected eof or internal node error",
590 })); 499 }
591 }
592}
593
594sub progress {
595 my ($self, $type, $attr) = @_;
596
597 $self->{fcp}->progress ($self, $type, $attr);
598}
599
600=item $result = $txn->result
601
602Waits until a result is available and then returns it.
603
604This waiting is (depending on your event model) not very efficient, as it
605is done outside the "mainloop". The biggest problem, however, is that it's
606blocking one thread of execution. Try to use the callback mechanism, if
607possible, and call result from within the callback (or after is has been
608run), as then no waiting is necessary.
609
610=cut
611
612sub result {
613 my ($self) = @_;
614
615 $self->{signal}->wait while !exists $self->{result};
616
617 die $self->{exception} if $self->{exception};
618
619 return $self->{result};
620}
621
622package Net::FCP::Txn::ClientHello;
623
624use base Net::FCP::Txn;
625
626sub rcv_node_hello {
627 my ($self, $attr) = @_;
628
629 $self->set_result ($attr);
630}
631
632package Net::FCP::Txn::ClientInfo;
633
634use base Net::FCP::Txn;
635
636sub rcv_node_info {
637 my ($self, $attr) = @_;
638
639 $self->set_result ($attr);
640}
641
642package Net::FCP::Txn::GenerateCHK;
643
644use base Net::FCP::Txn;
645
646sub rcv_success {
647 my ($self, $attr) = @_;
648
649 $self->set_result ($attr->{uri});
650}
651
652package Net::FCP::Txn::GenerateSVKPair;
653
654use base Net::FCP::Txn;
655
656sub rcv_success {
657 my ($self, $attr) = @_;
658 $self->set_result ([$attr->{public_key}, $attr->{private_key}, $attr->{crypto_key}]);
659}
660
661package Net::FCP::Txn::InvertPrivateKey;
662
663use base Net::FCP::Txn;
664
665sub rcv_success {
666 my ($self, $attr) = @_;
667 $self->set_result ($attr->{public_key});
668}
669
670package Net::FCP::Txn::GetSize;
671
672use base Net::FCP::Txn;
673
674sub rcv_success {
675 my ($self, $attr) = @_;
676 $self->set_result (hex $attr->{length});
677}
678
679package Net::FCP::Txn::GetPut;
680
681# base class for get and put
682
683use base Net::FCP::Txn;
684
685*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
686*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
687
688sub rcv_restarted {
689 my ($self, $attr, $type) = @_;
690
691 delete $self->{datalength};
692 delete $self->{metalength};
693 delete $self->{data};
694
695 $self->progress ($type, $attr);
696}
697
698package Net::FCP::Txn::ClientGet;
699
700use base Net::FCP::Txn::GetPut;
701
702*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
703
704sub rcv_data {
705 my ($self, $chunk) = @_;
706
707 $self->{data} .= $chunk;
708
709 $self->progress ("data", { chunk => length $chunk, received => length $self->{data}, total => $self->{datalength} });
710
711 if ($self->{datalength} == length $self->{data}) {
712 my $data = delete $self->{data};
713 my $meta = new Net::FCP::Metadata (substr $data, 0, $self->{metalength}, "");
714
715 $self->set_result ([$meta, $data]);
716 $self->eof;
717 }
718}
719
720sub rcv_data_found {
721 my ($self, $attr, $type) = @_;
722
723 $self->progress ($type, $attr);
724
725 $self->{datalength} = hex $attr->{data_length};
726 $self->{metalength} = hex $attr->{metadata_length};
727}
728
729package Net::FCP::Txn::ClientPut;
730
731use base Net::FCP::Txn::GetPut;
732
733*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
734
735sub rcv_pending {
736 my ($self, $attr, $type) = @_;
737 $self->progress ($type, $attr);
738}
739
740sub rcv_success {
741 my ($self, $attr, $type) = @_;
742 $self->set_result ($attr);
743}
744
745sub rcv_key_collision {
746 my ($self, $attr, $type) = @_;
747 $self->set_result ({ key_collision => 1, %$attr });
748}
749
750=back
751
752=head2 The Net::FCP::Exception CLASS
753
754Any unexpected (non-standard) responses that make it impossible to return
755the advertised result will result in an exception being thrown when the
756C<result> method is called.
757
758These exceptions are represented by objects of this class.
759
760=over 4
761
762=cut
763
764package Net::FCP::Exception;
765
766use overload
767 '""' => sub {
768 "Net::FCP::Exception<<$_[0][0]," . (join ":", %{$_[0][1]}) . ">>";
769 }; 500 }
770 501
771=item $exc = new Net::FCP::Exception $type, \%attr 502 # see if the dummy plugin is loaded, to ensure all previous requests have finished.
772 503 $fcp->get_plugin_info_sync ("dummy");
773Create a new exception object of the given type (a string like
774C<route_not_found>), and a hashref containing additional attributes
775(usually the attributes of the message causing the exception).
776
777=cut
778
779sub new {
780 my ($class, $type, $attr) = @_;
781
782 bless [Net::FCP::tolc $type, { %$attr }], $class;
783}
784
785=item $exc->type([$type])
786
787With no arguments, returns the exception type. Otherwise a boolean
788indicating wether the exception is of the given type is returned.
789
790=cut
791
792sub type {
793 my ($self, $type) = @_;
794
795 @_ >= 2
796 ? $self->[0] eq $type
797 : $self->[0];
798}
799
800=item $exc->attr([$attr])
801
802With no arguments, returns the attributes. Otherwise the named attribute
803value is returned.
804
805=cut
806
807sub attr {
808 my ($self, $attr) = @_;
809
810 @_ >= 2
811 ? $self->[1]{$attr}
812 : $self->[1];
813}
814
815=back
816 504
817=head1 SEE ALSO 505=head1 SEE ALSO
818 506
819L<http://freenet.sf.net>. 507L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
820 508
821=head1 BUGS 509=head1 BUGS
822 510
823=head1 AUTHOR 511=head1 AUTHOR
824 512

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines