ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Net-FCP/FCP.pm
(Generate patch)

Comparing Net-FCP/FCP.pm (file contents):
Revision 1.6 by root, Mon Sep 8 00:36:44 2003 UTC vs.
Revision 1.13 by root, Wed Sep 10 05:06:16 2003 UTC

17of what the messages do. I am too lazy to document all this here. 17of what the messages do. I am too lazy to document all this here.
18 18
19=head1 WARNING 19=head1 WARNING
20 20
21This module is alpha. While it probably won't destroy (much :) of your 21This module is alpha. While it probably won't destroy (much :) of your
22data, it currently works only with the Event module (alkthough the event 22data, it currently falls short of what it should provide (intelligent uri
23mechanism is fully pluggable). 23following, splitfile downloads, healing...)
24
25=head2 IMPORT TAGS
26
27Nothing much can be "imported" from this module right now. There are,
28however, certain "import tags" that can be used to select the event model
29to be used.
30
31Event models are implemented as modules under the C<Net::FCP::Event::xyz>
32class, where C<xyz> is the event model to use. The default is C<Event> (or
33later C<Auto>).
34
35The import tag to use is named C<event=xyz>, e.g. C<event=Event>,
36C<event=Glib> etc.
37
38You should specify the event module to use only in the main program.
24 39
25=head2 THE Net::FCP CLASS 40=head2 THE Net::FCP CLASS
26 41
27=over 4 42=over 4
28 43
29=cut 44=cut
30 45
31package Net::FCP; 46package Net::FCP;
32 47
33use Carp; 48use Carp;
34use IO::Socket::INET;
35 49
36$VERSION = 0.03; 50$VERSION = 0.05;
37 51
38sub event_reg_cb { 52no warnings;
39 my ($obj) = @_;
40 require Event;
41 53
42 $obj->{eventdata} = Event->io ( 54our $EVENT = Net::FCP::Event::Auto::;
43 fd => $obj->{fh}, 55$EVENT = Net::FCP::Event::Event;#d#
44 poll => 'r', 56
45 cb => sub { 57sub import {
46 $obj->fh_ready; 58 shift;
59
60 for (@_) {
61 if (/^event=(\w+)$/) {
62 $EVENT = "Net::FCP::Event::$1";
47 }, 63 }
48 ); 64 }
65 eval "require $EVENT";
66 die $@ if $@;
49} 67}
50
51sub event_unreg_cb {
52 $_[0]{eventdata}
53 and (delete $_[0]{eventdata})->cancel;
54}
55
56sub event_wait_cb {
57 Event::one_event();
58}
59
60$regcb = \&event_reg_cb;
61$unregcb = \&event_unreg_cb;
62$waitcb = \&event_wait_cb;
63 68
64sub touc($) { 69sub touc($) {
65 local $_ = shift; 70 local $_ = shift;
66 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 71 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/;
67 s/(?:^|_)(.)/\U$1/g; 72 s/(?:^|_)(.)/\U$1/g;
72 local $_ = shift; 77 local $_ = shift;
73 s/(?<=[a-z])(?=[A-Z])/_/g; 78 s/(?<=[a-z])(?=[A-Z])/_/g;
74 lc $_; 79 lc $_;
75} 80}
76 81
82=item $meta = Net::FCP::parse_metadata $string
83
84Parse a metadata string and return it.
85
86The metadata will be a hashref with key C<version> (containing
87the mandatory version header entries).
88
89All other headers are represented by arrayrefs (they can be repeated).
90
91Since this is confusing, here is a rather verbose example of a parsed
92manifest:
93
94 (
95 version => { revision => 1 },
96 document => [
97 {
98 "info.format" => "image/jpeg",
99 name => "background.jpg",
100 "redirect.target" => "freenet:CHK\@ZcagI,ra726bSw"
101 },
102 {
103 "info.format" => "text/html",
104 name => ".next",
105 "redirect.target" => "freenet:SSK\@ilUPAgM/TFEE/3"
106 },
107 {
108 "info.format" => "text/html",
109 "redirect.target" => "freenet:CHK\@8M8Po8ucwI,8xA"
110 }
111 ]
112 )
113
114=cut
115
116sub parse_metadata {
117 my $meta;
118
119 my $data = shift;
120 if ($data =~ /^Version\015?\012/gc) {
121 my $hdr = $meta->{version} = {};
122
123 for (;;) {
124 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) {
125 my ($k, $v) = ($1, $2);
126 my @p = split /\./, tolc $k, 3;
127
128 $hdr->{$p[0]} = $v if @p == 1; # lamest code I ever wrote
129 $hdr->{$p[0]}{$p[1]} = $v if @p == 2;
130 $hdr->{$p[0]}{$p[1]}{$p[3]} = $v if @p == 3;
131 die "FATAL: 4+ dot metadata" if @p >= 4;
132 }
133
134 if ($data =~ /\GEndPart\015?\012/gc) {
135 # nop
136 } elsif ($data =~ /\GEnd\015?\012/gc) {
137 last;
138 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) {
139 push @{$meta->{tolc $1}}, $hdr = {};
140 } elsif ($data =~ /\G(.*)/gcs) {
141 die "metadata format error ($1)";
142 }
143 }
144 }
145
146 #$meta->{tail} = substr $data, pos $data;
147
148 $meta;
149}
150
77=item $fcp = new Net::FCP [host => $host][, port => $port] 151=item $fcp = new Net::FCP [host => $host][, port => $port]
78 152
79Create a new virtual FCP connection to the given host and port (default 153Create a new virtual FCP connection to the given host and port (default
80127.0.0.1:8481, or the environment variables C<FREDHOST> and C<FREDPORT>). 154127.0.0.1:8481, or the environment variables C<FREDHOST> and C<FREDPORT>).
81 155
88sub new { 162sub new {
89 my $class = shift; 163 my $class = shift;
90 my $self = bless { @_ }, $class; 164 my $self = bless { @_ }, $class;
91 165
92 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 166 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
93 $self->{port} ||= $ENV{FREDPORt} || 8481; 167 $self->{port} ||= $ENV{FREDPORT} || 8481;
94 168
95 $self->{nodehello} = $self->client_hello 169 #$self->{nodehello} = $self->client_hello
96 or croak "unable to get nodehello from node\n"; 170 # or croak "unable to get nodehello from node\n";
97 171
98 $self; 172 $self;
99} 173}
100 174
175sub progress {
176 my ($self, $txn, $type, $attr) = @_;
177 warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
178}
179
101=item $txn = $fcp->txn(type => attr => val,...) 180=item $txn = $fcp->txn(type => attr => val,...)
102 181
103The low-level interface to transactions. Don't use it. 182The low-level interface to transactions. Don't use it.
183
184Here are some examples of using transactions:
185
186The blocking case, no (visible) transactions involved:
187
188 my $nodehello = $fcp->client_hello;
189
190A transaction used in a blocking fashion:
191
192 my $txn = $fcp->txn_client_hello;
193 ...
194 my $nodehello = $txn->result;
195
196Or shorter:
197
198 my $nodehello = $fcp->txn_client_hello->result;
199
200Setting callbacks:
201
202 $fcp->txn_client_hello->cb(
203 sub { my $nodehello => $_[0]->result }
204 );
104 205
105=cut 206=cut
106 207
107sub txn { 208sub txn {
108 my ($self, $type, %attr) = @_; 209 my ($self, $type, %attr) = @_;
187=cut 288=cut
188 289
189_txn generate_chk => sub { 290_txn generate_chk => sub {
190 my ($self, $metadata, $data) = @_; 291 my ($self, $metadata, $data) = @_;
191 292
192 $self->txn (generate_chk => data => "$data$metadata", meta_data_length => length $metadata); 293 $self->txn (generate_chk => data => "$data$metadata", metadata_length => length $metadata);
193}; 294};
194 295
195=item $txn = $fcp->txn_generate_svk_pair 296=item $txn = $fcp->txn_generate_svk_pair
196 297
197=item ($public, $private) = @{ $fcp->generate_svk_pair } 298=item ($public, $private) = @{ $fcp->generate_svk_pair }
248 $self->txn (get_size => URI => $uri); 349 $self->txn (get_size => URI => $uri);
249}; 350};
250 351
251=item $txn = $fcp->txn_client_get ($uri [, $htl = 15 [, $removelocal = 0]]) 352=item $txn = $fcp->txn_client_get ($uri [, $htl = 15 [, $removelocal = 0]])
252 353
253=item ($data, $metadata) = @{ $fcp->client_get ($uri, $htl, $removelocal) 354=item ($metadata, $data) = @{ $fcp->client_get ($uri, $htl, $removelocal)
254 355
255Fetches a (small, as it should fit into memory) file from freenet. 356Fetches a (small, as it should fit into memory) file from
357freenet. C<$meta> is the metadata (as returned by C<parse_metadata> or
358C<undef>).
256 359
257Due to the overhead, a better method to download big fiels should be used. 360Due to the overhead, a better method to download big files should be used.
258 361
259 my ($data, $meta) = @{ 362 my ($meta, $data) = @{
260 $fcp->client_get ( 363 $fcp->client_get (
261 "freenet:CHK@hdXaxkwZ9rA8-SidT0AN-bniQlgPAwI,XdCDmBuGsd-ulqbLnZ8v~w" 364 "freenet:CHK@hdXaxkwZ9rA8-SidT0AN-bniQlgPAwI,XdCDmBuGsd-ulqbLnZ8v~w"
262 ) 365 )
263 }; 366 };
264 367
265=cut 368=cut
266 369
267_txn client_get => sub { 370_txn client_get => sub {
268 my ($self, $uri, $htl, $removelocal) = @_; 371 my ($self, $uri, $htl, $removelocal) = @_;
269 372
270 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local => $removelocal*1); 373 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local_key => $removelocal ? "true" : "false");
271}; 374};
272 375
273=item MISSING: ClientPut 376=item MISSING: ClientPut
274 377
275=back 378=back
289 392
290=cut 393=cut
291 394
292package Net::FCP::Txn; 395package Net::FCP::Txn;
293 396
397use Fcntl;
398use Socket;
399
294=item new arg => val,... 400=item new arg => val,...
295 401
296Creates a new C<Net::FCP::Txn> object. Not normally used. 402Creates a new C<Net::FCP::Txn> object. Not normally used.
297 403
298=cut 404=cut
299 405
300sub new { 406sub new {
301 my $class = shift; 407 my $class = shift;
302 my $self = bless { @_ }, $class; 408 my $self = bless { @_ }, $class;
409
410 $self->{signal} = $EVENT->new_signal;
411
412 $self->{fcp}{txn}{$self} = $self;
303 413
304 my $attr = ""; 414 my $attr = "";
305 my $data = delete $self->{attr}{data}; 415 my $data = delete $self->{attr}{data};
306 416
307 while (my ($k, $v) = each %{$self->{attr}}) { 417 while (my ($k, $v) = each %{$self->{attr}}) {
313 $data = "Data\012$data"; 423 $data = "Data\012$data";
314 } else { 424 } else {
315 $data = "EndMessage\012"; 425 $data = "EndMessage\012";
316 } 426 }
317 427
318 my $fh = new IO::Socket::INET 428 socket my $fh, PF_INET, SOCK_STREAM, 0
319 PeerHost => $self->{fcp}{host}, 429 or Carp::croak "unable to create new tcp socket: $!";
320 PeerPort => $self->{fcp}{port}
321 or Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
322
323 binmode $fh, ":raw"; 430 binmode $fh, ":raw";
431 fcntl $fh, F_SETFL, O_NONBLOCK;
432 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host})
433 and !$!{EWOULDBLOCK}
434 and !$!{EINPROGRESS}
435 and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
324 436
325 if (0) { 437 $self->{sbuf} =
326 print 438 "\x00\x00\x00\x02"
327 Net::FCP::touc $self->{type}, "\012",
328 $attr,
329 $data, "\012";
330 }
331
332 print $fh
333 "\x00\x00", "\x00\x02", # SESSID, PRESID
334 Net::FCP::touc $self->{type}, "\012", 439 . Net::FCP::touc $self->{type}
335 $attr, 440 . "\012$attr$data";
336 $data;
337 441
338 #$fh->shutdown (1); # freenet buggy?, well, it's java... 442 #$fh->shutdown (1); # freenet buggy?, well, it's java...
339 443
340 $self->{fh} = $fh; 444 $self->{fh} = $fh;
341 445
342 $Net::FCP::regcb->($self); 446 $self->{w} = $EVENT->new_from_fh ($fh)->cb(sub { $self->fh_ready_w })->poll(0, 1, 1);
343 447
344 $self; 448 $self;
345} 449}
346 450
451=item $txn = $txn->cb ($coderef)
452
453Sets a callback to be called when the request is finished. The coderef
454will be called with the txn as it's sole argument, so it has to call
455C<result> itself.
456
457Returns the txn object, useful for chaining.
458
459Example:
460
461 $fcp->txn_client_get ("freenet:CHK....")
462 ->userdata ("ehrm")
463 ->cb(sub {
464 my $data = shift->result;
465 });
466
467=cut
468
469sub cb($$) {
470 my ($self, $cb) = @_;
471 $self->{cb} = $cb;
472 $self;
473}
474
475=item $txn = $txn->userdata ([$userdata])
476
477Set user-specific data. This is useful in progress callbacks. The data can be accessed
478using C<< $txn->{userdata} >>.
479
480Returns the txn object, useful for chaining.
481
482=cut
483
484sub userdata($$) {
485 my ($self, $data) = @_;
486 $self->{userdata} = $data;
487 $self;
488}
489
347sub fh_ready { 490sub fh_ready_w {
491 my ($self) = @_;
492
493 my $len = syswrite $self->{fh}, $self->{sbuf};
494
495 if ($len > 0) {
496 substr $self->{sbuf}, 0, $len, "";
497 unless (length $self->{sbuf}) {
498 fcntl $self->{fh}, F_SETFL, 0;
499 $self->{w}->cb(sub { $self->fh_ready_r })->poll (1, 0, 1);
500 }
501 } elsif (defined $len) {
502 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
503 } else {
504 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
505 }
506}
507
508sub fh_ready_r {
348 my ($self) = @_; 509 my ($self) = @_;
349 510
350 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) { 511 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) {
351 for (;;) { 512 for (;;) {
352 if ($self->{datalen}) { 513 if ($self->{datalen}) {
514 #warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d#
353 if (length $self->{buf} >= $self->{datalen}) { 515 if (length $self->{buf} >= $self->{datalen}) {
354 $self->rcv_data (substr $self->{buf}, 0, $self->{datalen}, ""); 516 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
355 } else { 517 } else {
356 last; 518 last;
357 } 519 }
358 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) { 520 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) {
359 $self->{datalen} = hex $1; 521 $self->{datalen} = hex $1;
522 #warn "expecting new datachunk $self->{datalen}\n";#d#
360 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(.*?)\015?\012EndMessage\015?\012//s) { 523 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) {
361 $self->rcv ($1, { 524 $self->rcv ($1, {
362 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) } 525 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) }
363 split /\015?\012/, $2 526 split /\015?\012/, $2
364 }); 527 });
365 } else { 528 } else {
366 last; 529 last;
367 } 530 }
368 } 531 }
369 } else { 532 } else {
370 $Net::FCP::unregcb->($self);
371 delete $self->{fh};
372 $self->eof; 533 $self->eof;
373 } 534 }
374} 535}
375 536
376sub rcv_data { 537sub rcv_data {
377 my ($self, $chunk) = @_; 538 my ($self, $chunk) = @_;
378 539
379 $self->{data} .= $chunk; 540 $self->{data} .= $chunk;
541
542 $self->progress ("data", { chunk => length $chunk, total => length $self->{data}, end => $self->{datalength} });
380} 543}
381 544
382sub rcv { 545sub rcv {
383 my ($self, $type, $attr) = @_; 546 my ($self, $type, $attr) = @_;
384 547
391 } else { 554 } else {
392 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n"; 555 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n";
393 } 556 }
394} 557}
395 558
559# used as a default exception thrower
560sub rcv_throw_exception {
561 my ($self, $attr, $type) = @_;
562 $self->throw (new Net::FCP::Exception $type, $attr);
563}
564
565*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
566*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
567
568sub throw {
569 my ($self, $exc) = @_;
570
571 $self->{exception} = $exc;
572 $self->set_result (1);
573 $self->eof; # must be last to avoid loops
574}
575
396sub set_result { 576sub set_result {
397 my ($self, $result) = @_; 577 my ($self, $result) = @_;
398 578
399 $self->{result} = $result unless exists $self->{result}; 579 unless (exists $self->{result}) {
580 $self->{result} = $result;
581 $self->{cb}->($self) if exists $self->{cb};
582 $self->{signal}->send;
583 }
400} 584}
401 585
402sub eof { 586sub eof {
403 my ($self) = @_; 587 my ($self) = @_;
404 $self->set_result; 588
589 delete $self->{w};
590 delete $self->{fh};
591
592 delete $self->{fcp}{txn}{$self};
593
594 $self->set_result; # just in case
595}
596
597sub progress {
598 my ($self, $type, $attr) = @_;
599 $self->{fcp}->progress ($self, $type, $attr);
405} 600}
406 601
407=item $result = $txn->result 602=item $result = $txn->result
408 603
409Waits until a result is available and then returns it. 604Waits until a result is available and then returns it.
414=cut 609=cut
415 610
416sub result { 611sub result {
417 my ($self) = @_; 612 my ($self) = @_;
418 613
419 $Net::FCP::waitcb->() while !exists $self->{result}; 614 $self->{signal}->wait while !exists $self->{result};
615
616 die $self->{exception} if $self->{exception};
420 617
421 return $self->{result}; 618 return $self->{result};
422}
423
424sub DESTROY {
425 $Net::FCP::unregcb->($_[0]);
426} 619}
427 620
428package Net::FCP::Txn::ClientHello; 621package Net::FCP::Txn::ClientHello;
429 622
430use base Net::FCP::Txn; 623use base Net::FCP::Txn;
483 my ($self, $attr) = @_; 676 my ($self, $attr) = @_;
484 677
485 $self->set_result ($attr->{Length}); 678 $self->set_result ($attr->{Length});
486} 679}
487 680
681package Net::FCP::Txn::GetPut;
682
683# base class for get and put
684
685use base Net::FCP::Txn;
686
687*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
688*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
689
690sub rcv_restarted {
691 my ($self, $attr, $type) = @_;
692
693 delete $self->{datalength};
694 delete $self->{metalength};
695 delete $self->{data};
696
697 $self->progress ($type, $attr);
698}
699
488package Net::FCP::Txn::ClientGet; 700package Net::FCP::Txn::ClientGet;
489 701
490use base Net::FCP::Txn; 702use base Net::FCP::Txn::GetPut;
703
704*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
491 705
492sub rcv_data_found { 706sub rcv_data_found {
493 my ($self, $attr) = @_; 707 my ($self, $attr, $type) = @_;
708
709 $self->progress ($type, $attr);
494 710
495 $self->{datalength} = hex $attr->{data_length}; 711 $self->{datalength} = hex $attr->{data_length};
496 $self->{metalength} = hex $attr->{meta_data_length}; 712 $self->{metalength} = hex $attr->{metadata_length};
497} 713}
498 714
499sub eof { 715sub eof {
500 my ($self) = @_; 716 my ($self) = @_;
501 #use PApp::Util; warn PApp::Util::dumpval $self; 717
718 if ($self->{datalength} == length $self->{data}) {
502 my $data = delete $self->{data}; 719 my $data = delete $self->{data};
720 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, "";
721
722 $self->set_result ([$meta, $data]);
723 } elsif (!exists $self->{result}) {
724 $self->throw (Net::FCP::Exception->new (short_data => {
725 reason => "unexpected eof or internal node error",
726 received => length $self->{data},
727 expected => $self->{datalength},
728 }));
729 }
730}
731
732package Net::FCP::Txn::ClientPut;
733
734use base Net::FCP::Txn::GetPut;
735
736*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
737*rcv_key_collision = \&Net::FCP::Txn::rcv_throw_exception;
738
739sub rcv_pending {
740 my ($self, $attr, $type) = @_;
741 $self->progress ($type, $attr);
742}
743
744sub rcv_success {
745 my ($self, $attr, $type) = @_;
503 $self->set_result ([ 746 $self->set_result ($attr);
504 (substr $data, 0, $self->{datalength}-$self->{metalength}), 747}
505 (substr $data, $self->{datalength}-$self->{metalength}), 748
506 ]); 749package Net::FCP::Exception;
750
751use overload
752 '""' => sub {
753 "Net::FCP::Exception<<$_[0][0]," . (join ":", %{$_[0][1]}) . ">>\n";
754 };
755
756sub new {
757 my ($class, $type, $attr) = @_;
758
759 bless [Net::FCP::tolc $type, { %$attr }], $class;
507} 760}
508 761
509=back 762=back
510 763
511=head1 SEE ALSO 764=head1 SEE ALSO

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines