ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.1 by root, Sat Jul 18 05:57:59 2009 UTC vs.
Revision 1.2 by root, Sat Jul 25 06:28:49 2009 UTC

49 49
50=cut 50=cut
51 51
52package AnyEvent::FCP; 52package AnyEvent::FCP;
53 53
54use common::sense;
55
54use Carp; 56use Carp;
55 57
56$VERSION = '0.1'; 58our $VERSION = '0.1';
57 59
58no warnings; 60use Scalar::Util ();
59 61
60use AnyEvent; 62use AnyEvent;
61use AnyEvent::Socket; 63use AnyEvent::Handle;
62 64
63sub touc($) { 65sub touc($) {
64 local $_ = shift; 66 local $_ = shift;
65 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 67 1 while s/((?:^|_)(?:svk|chk|uri|fcp)(?:_|$))/\U$1/;
66 s/(?:^|_)(.)/\U$1/g; 68 s/(?:^|_)(.)/\U$1/g;
67 $_ 69 $_
68} 70}
69 71
70sub tolc($) { 72sub tolc($) {
71 local $_ = shift; 73 local $_ = shift;
72 1 while s/(SVK|CHK|URI)([^_])/$1\_$2/i; 74 1 while s/(SVK|CHK|URI|FCP)([^_])/$1\_$2/i;
73 1 while s/([^_])(SVK|CHK|URI)/$1\_$2/i; 75 1 while s/([^_])(SVK|CHK|URI|FCP)/$1\_$2/i;
74 s/(?<=[a-z])(?=[A-Z])/_/g; 76 s/(?<=[a-z])(?=[A-Z])/_/g;
75 lc 77 lc
76} 78}
77 79
78=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name] 80=item $fcp = new AnyEvent::FCP [host => $host][, port => $port][, progress => \&cb][, name => $name]
79 81
80Create a new FCP connection to the given host and port (default 82Create a new FCP connection to the given host and port (default
81127.0.0.1:8481, or the environment variables C<FREDHOST> and C<FREDPORT>). 83127.0.0.1:9481, or the environment variables C<FREDHOST> and C<FREDPORT>).
82 84
83If no C<name> was specified, then AnyEvent::FCP will generate a (hopefully) 85If no C<name> was specified, then AnyEvent::FCP will generate a (hopefully)
84unique client name for you. 86unique client name for you.
85 87
86#TODO 88#TODO
87#You can install a progress callback that is being called with the Net::FCP 89#You can install a progress callback that is being called with the AnyEvent::FCP
88#object, a txn object, the type of the transaction and the attributes. Use 90#object, a txn object, the type of the transaction and the attributes. Use
89#it like this: 91#it like this:
90# 92#
91# sub progress_cb { 93# sub progress_cb {
92# my ($self, $txn, $type, $attr) = @_; 94# my ($self, $txn, $type, $attr) = @_;
98 100
99sub new { 101sub new {
100 my $class = shift; 102 my $class = shift;
101 my $self = bless { @_ }, $class; 103 my $self = bless { @_ }, $class;
102 104
103 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 105 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
104 $self->{port} ||= $ENV{FREDPORT} || 9481; 106 $self->{port} ||= $ENV{FREDPORT} || 9481;
105 $self->{name} ||= time.rand.rand.rand; # lame 107 $self->{name} ||= time.rand.rand.rand; # lame
108 $self->{timeout} ||= 600;
106 109
110 $self->{id} = "a0";
111
112 {
113 Scalar::Util::weaken (my $self = $self);
114
107 $self->{conn} = new AnyEvent::Socket 115 $self->{hdl} = new AnyEvent::Handle
108 PeerAddr => "$self->{host}:$self->{port}", 116 connect => [$self->{host} => $self->{port}],
117 timeout => $self->{timeout},
118 on_error => sub {
119 warn "<@_>\n";
120 exit 1;
121 },
122 on_read => sub { $self->on_read (@_) },
109 on_eof => $self->{on_eof} || sub { }, 123 on_eof => $self->{on_eof} || sub { };
124
125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
126 }
127
128 $self->send_msg (
129 client_hello =>
130 name => $self->{name},
131 expected_version => "2.0",
132 );
110 133
111 $self 134 $self
112} 135}
113 136
114sub progress { 137sub progress {
116 139
117 $self->{progress}->($self, $txn, $type, $attr) 140 $self->{progress}->($self, $txn, $type, $attr)
118 if $self->{progress}; 141 if $self->{progress};
119} 142}
120 143
121=item $txn = $fcp->txn (type => attr => val,...) 144sub send_msg {
145 my ($self, $type, %kv) = @_;
122 146
123The low-level interface to transactions. Don't use it unless you have 147 my $data = delete $kv{data};
124"special needs". Instead, use predefiend transactions like this:
125 148
126The blocking case, no (visible) transactions involved: 149 if (exists $kv{id_cb}) {
127 150 my $id = $kv{identifier} || ++$self->{id};
128 my $nodehello = $fcp->client_hello; 151 $self->{id}{$id} = delete $kv{id_cb};
129 152 $kv{identifier} = $id;
130A transaction used in a blocking fashion:
131 153 }
132 my $txn = $fcp->txn_client_hello;
133 ...
134 my $nodehello = $txn->result;
135 154
136Or shorter: 155 my $msg = (touc $type) . "\012"
156 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
137 157
138 my $nodehello = $fcp->txn_client_hello->result; 158 sub id {
159 my ($self) = @_;
139 160
140Setting callbacks:
141 161
142 $fcp->txn_client_hello->cb( 162 }
143 sub { my $nodehello => $_[0]->result } 163
164 if (defined $data) {
165 $msg .= "DataLength=" . (length $data) . "\012"
166 . "Data\012$data";
167 } else {
168 $msg .= "EndMessage\012";
169 }
170
171 $self->{hdl}->push_write ($msg);
172}
173
174sub on_read {
175 my ($self) = @_;
176
177 my $type;
178 my %kv;
179 my $rdata;
180
181 my $done_cb = sub {
182 $kv{pkt_type} = $type;
183
184 if (my $cb = $self->{queue}[0]) {
185 $cb->($self, $type, \%kv, $rdata)
186 and shift @{ $self->{queue} };
187 } else {
188 $self->default_recv ($type, \%kv, $rdata);
189 }
190 };
191
192 my $hdr_cb; $hdr_cb = sub {
193 if ($_[1] =~ /^([^=]+)=(.*)$/) {
194 my ($k, $v) = ($1, $2);
195 my @k = split /\./, tolc $k;
196 my $ro = \\%kv;
197
198 while (@k) {
199 my $k = shift @k;
200 if ($k =~ /^\d+$/) {
201 $ro = \$$ro->[$k];
202 } else {
203 $ro = \$$ro->{$k};
204 }
205 }
206
207 $$ro = $v;
208
209 $_[0]->push_read (line => $hdr_cb);
210 } elsif ($_[1] eq "Data") {
211 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
212 $rdata = \$_[1];
213 $done_cb->();
214 });
215 } elsif ($_[1] eq "EndMessage") {
216 $done_cb->();
217 } else {
218 die "protocol error, expected message end, got $_[1]\n";#d#
219 }
220 };
221
222 $self->{hdl}->push_read (line => sub {
223 $type = tolc $_[1];
224 $_[0]->push_read (line => $hdr_cb);
144 ); 225 });
226}
145 227
146=cut 228sub default_recv {
229 my ($self, $type, $kv, $rdata) = @_;
147 230
231 if ($type eq "node_hello") {
232 $self->{node_hello} = $kv;
233 } elsif (exists $self->{id}{$kv->{identifier}}) {
234 $self->{id}{$kv->{identifier}}($self, $type, $kv, $rdata)
235 and delete $self->{id}{$kv->{identifier}};
236 } else {
237 # on_warn
238 #warn "protocol warning (unexpected $type message)\n";
239 }
240}
241
148sub txn { 242sub _txn {
149 my ($self, $type, %attr) = @_;
150
151 $type = touc $type;
152
153 my $txn = "Net::FCP::Txn::$type"->new (fcp => $self, type => tolc $type, attr => \%attr);
154
155 $txn;
156}
157
158{ # transactions
159
160my $txn = sub {
161 my ($name, $sub) = @_; 243 my ($name, $sub) = @_;
244
162 *{"txn_$name"} = $sub; 245 *{$name} = sub {
163 *{$name} = sub { $sub->(@_)->result }; 246 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
164}; 247 &$sub;
165 248 $cv
166=item $txn = $fcp->txn_client_hello
167
168=item $nodehello = $fcp->client_hello
169
170Executes a ClientHello request and returns it's results.
171
172 {
173 max_file_size => "5f5e100",
174 node => "Fred,0.6,1.46,7050"
175 protocol => "1.2",
176 } 249 };
177 250
178=cut 251 *{"$name\_sync"} = sub {
179 252 splice @_, 1, 0, (my $cv = AnyEvent->condvar);
180$txn->(client_hello => sub { 253 &$sub;
181 my ($self) = @_; 254 $cv->recv
182
183 $self->txn ("client_hello");
184});
185
186=item $txn = $fcp->txn_client_info
187
188=item $nodeinfo = $fcp->client_info
189
190Executes a ClientInfo request and returns it's results.
191
192 {
193 active_jobs => "1f",
194 allocated_memory => "bde0000",
195 architecture => "i386",
196 available_threads => 17,
197 datastore_free => "5ce03400",
198 datastore_max => "2540be400",
199 datastore_used => "1f72bb000",
200 estimated_load => 52,
201 free_memory => "5cc0148",
202 is_transient => "false",
203 java_name => "Java HotSpot(_T_M) Server VM",
204 java_vendor => "http://www.blackdown.org/",
205 java_version => "Blackdown-1.4.1-01",
206 least_recent_timestamp => "f41538b878",
207 max_file_size => "5f5e100",
208 most_recent_timestamp => "f77e2cc520"
209 node_address => "1.2.3.4",
210 node_port => 369,
211 operating_system => "Linux",
212 operating_system_version => "2.4.20",
213 routing_time => "a5",
214 } 255 };
256}
215 257
216=cut 258_txn list_peers => sub {
259 my ($self, $cv, $with_metadata, $with_volatile) = @_;
217 260
218$txn->(client_info => sub { 261 my @res;
219 my ($self) = @_;
220 262
221 $self->txn ("client_info"); 263 $self->send_msg (list_peers =>
264 with_metadata => $with_metadata ? "true" : "false",
265 with_volatile => $with_volatile ? "true" : "false",
266 id_cb => sub {
267 my ($self, $type, $kv, $rdata) = @_;
268
269 if ($type eq "end_list_peers") {
270 $cv->(\@res);
271 1
272 } else {
273 push @res, $kv;
274 0
275 }
276 },
277 );
222}); 278};
223 279
224=item $txn = $fcp->txn_generate_chk ($metadata, $data[, $cipher]) 280_txn list_peer_notes => sub {
281 my ($self, $cv, $node_identifier) = @_;
225 282
226=item $uri = $fcp->generate_chk ($metadata, $data[, $cipher]) 283 $self->send_msg (list_peer_notes =>
284 node_identifier => $node_identifier,
285 id_cb => sub {
286 my ($self, $type, $kv, $rdata) = @_;
227 287
228Calculates a CHK, given the metadata and data. C<$cipher> is either 288 $cv->($kv);
229C<Rijndael> or C<Twofish>, with the latter being the default. 289 1
230 290 },
231=cut 291 );
232
233$txn->(generate_chk => sub {
234 my ($self, $metadata, $data, $cipher) = @_;
235
236 $metadata = Net::FCP::Metadata::build_metadata $metadata;
237
238 $self->txn (generate_chk =>
239 data => "$metadata$data",
240 metadata_length => xeh length $metadata,
241 cipher => $cipher || "Twofish");
242}); 292};
243 293
244=item $txn = $fcp->txn_generate_svk_pair 294_txn watch_global => sub {
295 my ($self, $cv, $enabled, $verbosity_mask) = @_;
245 296
246=item ($public, $private, $crypto) = @{ $fcp->generate_svk_pair } 297 $self->send_msg (watch_global =>
298 enabled => $enabled ? "true" : "false",
299 defined $verbosity_mask ? (verbosity_mask => $verbosity_mask+0) : (),
300 );
247 301
248Creates a new SVK pair. Returns an arrayref with the public key, the 302 $cv->();
249private key and a crypto key, which is just additional entropy.
250
251 [
252 "acLx4dux9fvvABH15Gk6~d3I-yw",
253 "cPoDkDMXDGSMM32plaPZDhJDxSs",
254 "BH7LXCov0w51-y9i~BoB3g",
255 ]
256
257A private key (for inserting) can be constructed like this:
258
259 SSK@<private_key>,<crypto_key>/<name>
260
261It can be used to insert data. The corresponding public key looks like this:
262
263 SSK@<public_key>PAgM,<crypto_key>/<name>
264
265Watch out for the C<PAgM>-part!
266
267=cut
268
269$txn->(generate_svk_pair => sub {
270 my ($self) = @_;
271
272 $self->txn ("generate_svk_pair");
273}); 303};
274 304
275=item $txn = $fcp->txn_invert_private_key ($private) 305_txn list_persistent_requests => sub {
276
277=item $public = $fcp->invert_private_key ($private)
278
279Inverts a private key (returns the public key). C<$private> can be either
280an insert URI (must start with C<freenet:SSK@>) or a raw private key (i.e.
281the private value you get back from C<generate_svk_pair>).
282
283Returns the public key.
284
285=cut
286
287$txn->(invert_private_key => sub {
288 my ($self, $privkey) = @_;
289
290 $self->txn (invert_private_key => private => $privkey);
291});
292
293=item $txn = $fcp->txn_get_size ($uri)
294
295=item $length = $fcp->get_size ($uri)
296
297Finds and returns the size (rounded up to the nearest power of two) of the
298given document.
299
300=cut
301
302$txn->(get_size => sub {
303 my ($self, $uri) = @_; 306 my ($self, $cv) = @_;
304 307
305 $self->txn (get_size => URI => $uri); 308 my %res;
306});
307 309
308=item $txn = $fcp->txn_client_get ($uri [, $htl = 15 [, $removelocal = 0]]) 310 $self->send_msg ("list_persistent_requests");
309 311
310=item ($metadata, $data) = @{ $fcp->client_get ($uri, $htl, $removelocal) 312 push @{ $self->{queue} }, sub {
313 my ($self, $type, $kv, $rdata) = @_;
311 314
312Fetches a (small, as it should fit into memory) key content block from 315 if ($type eq "end_list_persistent_requests") {
313freenet. C<$meta> is a C<Net::FCP::Metadata> object or C<undef>). 316 $cv->(\%res);
317 1
318 } else {
319 my $id = $kv->{identifier};
314 320
315The C<$uri> should begin with C<freenet:>, but the scheme is currently 321 if ($type =~ /^persistent_(get|put|put_dir)$/) {
316added, if missing. 322 $res{$id} = {
317 323 type => $1,
318 my ($meta, $data) = @{ 324 %{ $res{$id} },
319 $fcp->client_get ( 325 %$kv,
320 "freenet:CHK@hdXaxkwZ9rA8-SidT0AN-bniQlgPAwI,XdCDmBuGsd-ulqbLnZ8v~w" 326 };
327 } elsif ($type eq "simple_progress") {
328 delete $kv->{pkt_type}; # save memory
329 push @{ $res{delete $kv->{identifier}}{simple_progress} }, $kv;
330 } else {
331 $res{delete $kv->{identifier}}{delete $kv->{pkt_type}} = $kv;
332 }
333 0
321 ) 334 }
322 }; 335 };
323
324=cut
325
326$txn->(client_get => sub {
327 my ($self, $uri, $htl, $removelocal) = @_;
328
329 $uri =~ s/^freenet://; $uri = "freenet:$uri";
330
331 $self->txn (client_get => URI => $uri, hops_to_live => xeh (defined $htl ? $htl : 15),
332 remove_local_key => $removelocal ? "true" : "false");
333}); 336};
334 337
335=item $txn = $fcp->txn_client_put ($uri, $metadata, $data, $htl, $removelocal) 338_txn remove_request => sub {
339 my ($self, $cv, $global, $identifier) = @_;
336 340
337=item my $uri = $fcp->client_put ($uri, $metadata, $data, $htl, $removelocal); 341 $self->send_msg (remove_request =>
342 global => $global ? "true" : "false",
343 identifier => $identifier,
344 id_cb => sub {
345 my ($self, $type, $kv, $rdata) = @_;
338 346
339Insert a new key. If the client is inserting a CHK, the URI may be 347 $cv->($kv);
340abbreviated as just CHK@. In this case, the node will calculate the 348 1
341CHK. If the key is a private SSK key, the node will calculcate the public 349 },
342key and the resulting public URI. 350 );
343 351
344C<$meta> can be a hash reference (same format as returned by 352 $cv->();
345C<Net::FCP::parse_metadata>) or a string.
346
347The result is an arrayref with the keys C<uri>, C<public_key> and C<private_key>.
348
349=cut
350
351$txn->(client_put => sub {
352 my ($self, $uri, $metadata, $data, $htl, $removelocal) = @_;
353
354 $metadata = Net::FCP::Metadata::build_metadata $metadata;
355 $uri =~ s/^freenet://; $uri = "freenet:$uri";
356
357 $self->txn (client_put => URI => $uri,
358 hops_to_live => xeh (defined $htl ? $htl : 15),
359 remove_local_key => $removelocal ? "true" : "false",
360 data => "$metadata$data", metadata_length => xeh length $metadata);
361}); 353};
362 354
363} # transactions 355_txn modify_persistent_request => sub {
356 my ($self, $cv, $global, $identifier, $client_token, $priority_class) = @_;
357
358 $self->send_msg (modify_persistent_request =>
359 global => $global ? "true" : "false",
360 identifier => $identifier,
361 defined $client_token ? (client_token => $client_token ) : (),
362 defined $priority_class ? (priority_class => $priority_class) : (),
363 id_cb => sub {
364 my ($self, $type, $kv, $rdata) = @_;
365
366 $cv->($kv);
367 1
368 },
369 );
370
371 $cv->();
372};
373
374_txn get_plugin_info => sub {
375 my ($self, $cv, $name, $detailed) = @_;
376
377 $self->send_msg (get_plugin_info =>
378 plugin_name => $name,
379 detailed => $detailed ? "true" : "false",
380 id_cb => sub {
381 my ($self, $type, $kv, $rdata) = @_;
382
383 $cv->($kv);
384 1
385 },
386 );
387
388 $cv->();
389};
364 390
365=back 391=back
366 392
367=head2 THE Net::FCP::Txn CLASS
368
369All requests (or transactions) are executed in a asynchronous way. For
370each request, a C<Net::FCP::Txn> object is created (worse: a tcp
371connection is created, too).
372
373For each request there is actually a different subclass (and it's possible
374to subclass these, although of course not documented).
375
376The most interesting method is C<result>.
377
378=over 4
379
380=cut
381
382package Net::FCP::Txn;
383
384use Fcntl;
385use Socket;
386
387=item new arg => val,...
388
389Creates a new C<Net::FCP::Txn> object. Not normally used.
390
391=cut
392
393sub new {
394 my $class = shift;
395 my $self = bless { @_ }, $class;
396
397 $self->{signal} = AnyEvent->condvar;
398
399 $self->{fcp}{txn}{$self} = $self;
400
401 my $attr = "";
402 my $data = delete $self->{attr}{data};
403
404 while (my ($k, $v) = each %{$self->{attr}}) {
405 $attr .= (Net::FCP::touc $k) . "=$v\012"
406 }
407
408 if (defined $data) {
409 $attr .= sprintf "DataLength=%x\012", length $data;
410 $data = "Data\012$data";
411 } else {
412 $data = "EndMessage\012";
413 }
414
415 socket my $fh, PF_INET, SOCK_STREAM, 0
416 or Carp::croak "unable to create new tcp socket: $!";
417 binmode $fh, ":raw";
418 fcntl $fh, F_SETFL, O_NONBLOCK;
419 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host});
420# and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
421
422 $self->{sbuf} =
423 "\x00\x00\x00\x02"
424 . (Net::FCP::touc $self->{type})
425 . "\012$attr$data";
426
427 #shutdown $fh, 1; # freenet buggy?, well, it's java...
428
429 $self->{fh} = $fh;
430
431 $self->{w} = AnyEvent->io (fh => $fh, poll => 'w', cb => sub { $self->fh_ready_w });
432
433 $self;
434}
435
436=item $txn = $txn->cb ($coderef)
437
438Sets a callback to be called when the request is finished. The coderef
439will be called with the txn as it's sole argument, so it has to call
440C<result> itself.
441
442Returns the txn object, useful for chaining.
443
444Example:
445
446 $fcp->txn_client_get ("freenet:CHK....")
447 ->userdata ("ehrm")
448 ->cb(sub {
449 my $data = shift->result;
450 });
451
452=cut
453
454sub cb($$) {
455 my ($self, $cb) = @_;
456 $self->{cb} = $cb;
457 $self;
458}
459
460=item $txn = $txn->userdata ([$userdata])
461
462Set user-specific data. This is useful in progress callbacks. The data can be accessed
463using C<< $txn->{userdata} >>.
464
465Returns the txn object, useful for chaining.
466
467=cut
468
469sub userdata($$) {
470 my ($self, $data) = @_;
471 $self->{userdata} = $data;
472 $self;
473}
474
475=item $txn->cancel (%attr)
476
477Cancels the operation with a C<cancel> exception and the given attributes
478(consider at least giving the attribute C<reason>).
479
480UNTESTED.
481
482=cut
483
484sub cancel {
485 my ($self, %attr) = @_;
486 $self->throw (Net::FCP::Exception->new (cancel => { %attr }));
487 $self->set_result;
488 $self->eof;
489}
490
491sub fh_ready_w {
492 my ($self) = @_;
493
494 my $len = syswrite $self->{fh}, $self->{sbuf};
495
496 if ($len > 0) {
497 substr $self->{sbuf}, 0, $len, "";
498 unless (length $self->{sbuf}) {
499 fcntl $self->{fh}, F_SETFL, 0;
500 $self->{w} = AnyEvent->io (fh => $self->{fh}, poll => 'r', cb => sub { $self->fh_ready_r });
501 }
502 } elsif (defined $len) {
503 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
504 } else {
505 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
506 }
507}
508
509sub fh_ready_r {
510 my ($self) = @_;
511
512 if (sysread $self->{fh}, $self->{buf}, 16384 + 1024, length $self->{buf}) {
513 for (;;) {
514 if ($self->{datalen}) {
515 #warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d#
516 if (length $self->{buf} >= $self->{datalen}) {
517 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
518 } else {
519 last;
520 }
521 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) {
522 $self->{datalen} = hex $1;
523 #warn "expecting new datachunk $self->{datalen}\n";#d#
524 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) {
525 $self->rcv ($1, {
526 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) }
527 split /\015?\012/, $2
528 });
529 } else {
530 last;
531 }
532 }
533 } else {
534 $self->eof;
535 }
536}
537
538sub rcv {
539 my ($self, $type, $attr) = @_;
540
541 $type = Net::FCP::tolc $type;
542
543 #use PApp::Util; warn PApp::Util::dumpval [$type, $attr];
544
545 if (my $method = $self->can("rcv_$type")) {
546 $method->($self, $attr, $type);
547 } else {
548 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n";
549 }
550}
551
552# used as a default exception thrower
553sub rcv_throw_exception {
554 my ($self, $attr, $type) = @_;
555 $self->throw (Net::FCP::Exception->new ($type, $attr));
556}
557
558*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
559*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
560
561sub throw {
562 my ($self, $exc) = @_;
563
564 $self->{exception} = $exc;
565 $self->set_result;
566 $self->eof; # must be last to avoid loops
567}
568
569sub set_result {
570 my ($self, $result) = @_;
571
572 unless (exists $self->{result}) {
573 $self->{result} = $result;
574 $self->{cb}->($self) if exists $self->{cb};
575 $self->{signal}->broadcast;
576 }
577}
578
579sub eof {
580 my ($self) = @_;
581
582 delete $self->{w};
583 delete $self->{fh};
584
585 delete $self->{fcp}{txn}{$self};
586
587 unless (exists $self->{result}) {
588 $self->throw (Net::FCP::Exception->new (short_data => {
589 reason => "unexpected eof or internal node error",
590 }));
591 }
592}
593
594sub progress {
595 my ($self, $type, $attr) = @_;
596
597 $self->{fcp}->progress ($self, $type, $attr);
598}
599
600=item $result = $txn->result
601
602Waits until a result is available and then returns it.
603
604This waiting is (depending on your event model) not very efficient, as it
605is done outside the "mainloop". The biggest problem, however, is that it's
606blocking one thread of execution. Try to use the callback mechanism, if
607possible, and call result from within the callback (or after is has been
608run), as then no waiting is necessary.
609
610=cut
611
612sub result {
613 my ($self) = @_;
614
615 $self->{signal}->wait while !exists $self->{result};
616
617 die $self->{exception} if $self->{exception};
618
619 return $self->{result};
620}
621
622package Net::FCP::Txn::ClientHello;
623
624use base Net::FCP::Txn;
625
626sub rcv_node_hello {
627 my ($self, $attr) = @_;
628
629 $self->set_result ($attr);
630}
631
632package Net::FCP::Txn::ClientInfo;
633
634use base Net::FCP::Txn;
635
636sub rcv_node_info {
637 my ($self, $attr) = @_;
638
639 $self->set_result ($attr);
640}
641
642package Net::FCP::Txn::GenerateCHK;
643
644use base Net::FCP::Txn;
645
646sub rcv_success {
647 my ($self, $attr) = @_;
648
649 $self->set_result ($attr->{uri});
650}
651
652package Net::FCP::Txn::GenerateSVKPair;
653
654use base Net::FCP::Txn;
655
656sub rcv_success {
657 my ($self, $attr) = @_;
658 $self->set_result ([$attr->{public_key}, $attr->{private_key}, $attr->{crypto_key}]);
659}
660
661package Net::FCP::Txn::InvertPrivateKey;
662
663use base Net::FCP::Txn;
664
665sub rcv_success {
666 my ($self, $attr) = @_;
667 $self->set_result ($attr->{public_key});
668}
669
670package Net::FCP::Txn::GetSize;
671
672use base Net::FCP::Txn;
673
674sub rcv_success {
675 my ($self, $attr) = @_;
676 $self->set_result (hex $attr->{length});
677}
678
679package Net::FCP::Txn::GetPut;
680
681# base class for get and put
682
683use base Net::FCP::Txn;
684
685*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
686*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
687
688sub rcv_restarted {
689 my ($self, $attr, $type) = @_;
690
691 delete $self->{datalength};
692 delete $self->{metalength};
693 delete $self->{data};
694
695 $self->progress ($type, $attr);
696}
697
698package Net::FCP::Txn::ClientGet;
699
700use base Net::FCP::Txn::GetPut;
701
702*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
703
704sub rcv_data {
705 my ($self, $chunk) = @_;
706
707 $self->{data} .= $chunk;
708
709 $self->progress ("data", { chunk => length $chunk, received => length $self->{data}, total => $self->{datalength} });
710
711 if ($self->{datalength} == length $self->{data}) {
712 my $data = delete $self->{data};
713 my $meta = new Net::FCP::Metadata (substr $data, 0, $self->{metalength}, "");
714
715 $self->set_result ([$meta, $data]);
716 $self->eof;
717 }
718}
719
720sub rcv_data_found {
721 my ($self, $attr, $type) = @_;
722
723 $self->progress ($type, $attr);
724
725 $self->{datalength} = hex $attr->{data_length};
726 $self->{metalength} = hex $attr->{metadata_length};
727}
728
729package Net::FCP::Txn::ClientPut;
730
731use base Net::FCP::Txn::GetPut;
732
733*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
734
735sub rcv_pending {
736 my ($self, $attr, $type) = @_;
737 $self->progress ($type, $attr);
738}
739
740sub rcv_success {
741 my ($self, $attr, $type) = @_;
742 $self->set_result ($attr);
743}
744
745sub rcv_key_collision {
746 my ($self, $attr, $type) = @_;
747 $self->set_result ({ key_collision => 1, %$attr });
748}
749
750=back
751
752=head2 The Net::FCP::Exception CLASS
753
754Any unexpected (non-standard) responses that make it impossible to return
755the advertised result will result in an exception being thrown when the
756C<result> method is called.
757
758These exceptions are represented by objects of this class.
759
760=over 4
761
762=cut
763
764package Net::FCP::Exception;
765
766use overload
767 '""' => sub {
768 "Net::FCP::Exception<<$_[0][0]," . (join ":", %{$_[0][1]}) . ">>";
769 };
770
771=item $exc = new Net::FCP::Exception $type, \%attr
772
773Create a new exception object of the given type (a string like
774C<route_not_found>), and a hashref containing additional attributes
775(usually the attributes of the message causing the exception).
776
777=cut
778
779sub new {
780 my ($class, $type, $attr) = @_;
781
782 bless [Net::FCP::tolc $type, { %$attr }], $class;
783}
784
785=item $exc->type([$type])
786
787With no arguments, returns the exception type. Otherwise a boolean
788indicating wether the exception is of the given type is returned.
789
790=cut
791
792sub type {
793 my ($self, $type) = @_;
794
795 @_ >= 2
796 ? $self->[0] eq $type
797 : $self->[0];
798}
799
800=item $exc->attr([$attr])
801
802With no arguments, returns the attributes. Otherwise the named attribute
803value is returned.
804
805=cut
806
807sub attr {
808 my ($self, $attr) = @_;
809
810 @_ >= 2
811 ? $self->[1]{$attr}
812 : $self->[1];
813}
814
815=back
816
817=head1 SEE ALSO 393=head1 SEE ALSO
818 394
819L<http://freenet.sf.net>. 395L<http://wiki.freenetproject.org/FreenetFCPSpec2Point0>, L<Net::FCP>.
820 396
821=head1 BUGS 397=head1 BUGS
822 398
823=head1 AUTHOR 399=head1 AUTHOR
824 400

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines