ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Net-FCP/FCP.pm
(Generate patch)

Comparing Net-FCP/FCP.pm (file contents):
Revision 1.8 by root, Mon Sep 8 01:47:31 2003 UTC vs.
Revision 1.12 by root, Wed Sep 10 04:50:44 2003 UTC

17of what the messages do. I am too lazy to document all this here. 17of what the messages do. I am too lazy to document all this here.
18 18
19=head1 WARNING 19=head1 WARNING
20 20
21This module is alpha. While it probably won't destroy (much :) of your 21This module is alpha. While it probably won't destroy (much :) of your
22data, it currently works only with the Event module (alkthough the event 22data, it currently falls short of what it should provide (intelligent uri
23mechanism is fully pluggable). 23following, splitfile downloads, healing...)
24
25=head2 IMPORT TAGS
26
27Nothing much can be "imported" from this module right now. There are,
28however, certain "import tags" that can be used to select the event model
29to be used.
30
31Event models are implemented as modules under the C<Net::FCP::Event::xyz>
32class, where C<xyz> is the event model to use. The default is C<Event> (or
33later C<Auto>).
34
35The import tag to use is named C<event=xyz>, e.g. C<event=Event>,
36C<event=Glib> etc.
37
38You should specify the event module to use only in the main program.
24 39
25=head2 THE Net::FCP CLASS 40=head2 THE Net::FCP CLASS
26 41
27=over 4 42=over 4
28 43
29=cut 44=cut
30 45
31package Net::FCP; 46package Net::FCP;
32 47
33use Carp; 48use Carp;
34use IO::Socket::INET;
35 49
36$VERSION = 0.04; 50$VERSION = 0.05;
37 51
38sub event_reg_cb { 52no warnings;
39 my ($obj) = @_;
40 require Event;
41 53
42 $obj->{eventdata} = Event->io ( 54our $EVENT = Net::FCP::Event::Auto::;
43 fd => $obj->{fh}, 55$EVENT = Net::FCP::Event::Event;#d#
44 poll => 'r', 56
45 cb => sub { 57sub import {
46 $obj->fh_ready; 58 shift;
59
60 for (@_) {
61 if (/^event=(\w+)$/) {
62 $EVENT = "Net::FCP::Event::$1";
47 }, 63 }
48 ); 64 }
65 eval "require $EVENT";
66 die $@ if $@;
49} 67}
50
51sub event_unreg_cb {
52 $_[0]{eventdata}
53 and (delete $_[0]{eventdata})->cancel;
54}
55
56sub event_wait_cb {
57 Event::one_event();
58}
59
60$regcb = \&event_reg_cb;
61$unregcb = \&event_unreg_cb;
62$waitcb = \&event_wait_cb;
63 68
64sub touc($) { 69sub touc($) {
65 local $_ = shift; 70 local $_ = shift;
66 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 71 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/;
67 s/(?:^|_)(.)/\U$1/g; 72 s/(?:^|_)(.)/\U$1/g;
116 my $hdr = $meta->{version} = {}; 121 my $hdr = $meta->{version} = {};
117 122
118 for (;;) { 123 for (;;) {
119 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) { 124 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) {
120 my ($k, $v) = ($1, $2); 125 my ($k, $v) = ($1, $2);
121 $hdr->{tolc $k} = $v; 126 my @p = split /\./, tolc $k, 3;
127
128 $hdr->{$p[0]} = $v if @p == 1; # lamest code I ever wrote
129 $hdr->{$p[0]}{$p[1]} = $v if @p == 2;
130 $hdr->{$p[0]}{$p[1]}{$p[3]} = $v if @p == 3;
131 die "FATAL: 4+ dot metadata" if @p >= 4;
122 } 132 }
123 133
124 if ($data =~ /\GEndPart\015?\012/gc) { 134 if ($data =~ /\GEndPart\015?\012/gc) {
135 # nop
125 } elsif ($data =~ /\GEnd\015?\012/gc) { 136 } elsif ($data =~ /\GEnd\015?\012/gc) {
126 last; 137 last;
127 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) { 138 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) {
128 push @{$meta->{tolc $1}}, $hdr = {}; 139 push @{$meta->{tolc $1}}, $hdr = {};
129 } elsif ($data =~ /\G(.*)/gcs) { 140 } elsif ($data =~ /\G(.*)/gcs) {
151sub new { 162sub new {
152 my $class = shift; 163 my $class = shift;
153 my $self = bless { @_ }, $class; 164 my $self = bless { @_ }, $class;
154 165
155 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 166 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
156 $self->{port} ||= $ENV{FREDPORt} || 8481; 167 $self->{port} ||= $ENV{FREDPORT} || 8481;
157 168
158 $self->{nodehello} = $self->client_hello 169 #$self->{nodehello} = $self->client_hello
159 or croak "unable to get nodehello from node\n"; 170 # or croak "unable to get nodehello from node\n";
160 171
161 $self; 172 $self;
162} 173}
163 174
175sub progress {
176 my ($self, $txn, $type, $attr) = @_;
177 warn "progress<$txn,$type," . (join ":", %$attr) . ">\n";
178}
179
164=item $txn = $fcp->txn(type => attr => val,...) 180=item $txn = $fcp->txn(type => attr => val,...)
165 181
166The low-level interface to transactions. Don't use it. 182The low-level interface to transactions. Don't use it.
183
184Here are some examples of using transactions:
185
186The blocking case, no (visible) transactions involved:
187
188 my $nodehello = $fcp->client_hello;
189
190A transaction used in a blocking fashion:
191
192 my $txn = $fcp->txn_client_hello;
193 ...
194 my $nodehello = $txn->result;
195
196Or shorter:
197
198 my $nodehello = $fcp->txn_client_hello->result;
199
200Setting callbacks:
201
202 $fcp->txn_client_hello->cb(
203 sub { my $nodehello => $_[0]->result }
204 );
167 205
168=cut 206=cut
169 207
170sub txn { 208sub txn {
171 my ($self, $type, %attr) = @_; 209 my ($self, $type, %attr) = @_;
330=cut 368=cut
331 369
332_txn client_get => sub { 370_txn client_get => sub {
333 my ($self, $uri, $htl, $removelocal) = @_; 371 my ($self, $uri, $htl, $removelocal) = @_;
334 372
335 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local => $removelocal*1); 373 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local_key => $removelocal ? "true" : "false");
336}; 374};
337 375
338=item MISSING: ClientPut 376=item MISSING: ClientPut
339 377
340=back 378=back
354 392
355=cut 393=cut
356 394
357package Net::FCP::Txn; 395package Net::FCP::Txn;
358 396
397use Fcntl;
398use Socket;
399
359=item new arg => val,... 400=item new arg => val,...
360 401
361Creates a new C<Net::FCP::Txn> object. Not normally used. 402Creates a new C<Net::FCP::Txn> object. Not normally used.
362 403
363=cut 404=cut
364 405
365sub new { 406sub new {
366 my $class = shift; 407 my $class = shift;
367 my $self = bless { @_ }, $class; 408 my $self = bless { @_ }, $class;
409
410 $self->{signal} = $EVENT->new_signal;
411
412 $self->{fcp}{txn}{$self} = $self;
368 413
369 my $attr = ""; 414 my $attr = "";
370 my $data = delete $self->{attr}{data}; 415 my $data = delete $self->{attr}{data};
371 416
372 while (my ($k, $v) = each %{$self->{attr}}) { 417 while (my ($k, $v) = each %{$self->{attr}}) {
378 $data = "Data\012$data"; 423 $data = "Data\012$data";
379 } else { 424 } else {
380 $data = "EndMessage\012"; 425 $data = "EndMessage\012";
381 } 426 }
382 427
383 my $fh = new IO::Socket::INET 428 socket my $fh, PF_INET, SOCK_STREAM, 0
384 PeerHost => $self->{fcp}{host}, 429 or Carp::croak "unable to create new tcp socket: $!";
385 PeerPort => $self->{fcp}{port}
386 or Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
387
388 binmode $fh, ":raw"; 430 binmode $fh, ":raw";
431 fcntl $fh, F_SETFL, O_NONBLOCK;
432 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host})
433 and !$!{EWOULDBLOCK}
434 and !$!{EINPROGRESS}
435 and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
389 436
390 if (0) { 437 $self->{sbuf} =
391 print 438 "\x00\x00\x00\x02"
392 Net::FCP::touc $self->{type}, "\012",
393 $attr,
394 $data, "\012";
395 }
396
397 print $fh
398 "\x00\x00", "\x00\x02", # SESSID, PRESID
399 Net::FCP::touc $self->{type}, "\012", 439 . Net::FCP::touc $self->{type}
400 $attr, 440 . "\012$attr$data";
401 $data;
402 441
403 #$fh->shutdown (1); # freenet buggy?, well, it's java... 442 #$fh->shutdown (1); # freenet buggy?, well, it's java...
404 443
405 $self->{fh} = $fh; 444 $self->{fh} = $fh;
406 445
407 $Net::FCP::regcb->($self); 446 $self->{w} = $EVENT->new_from_fh ($fh)->cb(sub { $self->fh_ready_w })->poll(0, 1, 1);
408 447
409 $self; 448 $self;
410} 449}
411 450
451=item $txn = $txn->cb ($coderef)
452
453Sets a callback to be called when the request is finished. The coderef
454will be called with the txn as it's sole argument, so it has to call
455C<result> itself.
456
457Returns the txn object, useful for chaining.
458
459Example:
460
461 $fcp->txn_client_get ("freenet:CHK....")
462 ->userdata ("ehrm")
463 ->cb(sub {
464 my $data = shift->result;
465 });
466
467=cut
468
469sub cb($$) {
470 my ($self, $cb) = @_;
471 $self->{cb} = $cb;
472 $self;
473}
474
475=item $txn = $txn->userdata ([$userdata])
476
477Set user-specific data. This is useful in progress callbacks. The data can be accessed
478using C<< $txn->{userdata} >>.
479
480Returns the txn object, useful for chaining.
481
482=cut
483
484sub userdata($$) {
485 my ($self, $data) = @_;
486 $self->{userdata} = $data;
487 $self;
488}
489
412sub fh_ready { 490sub fh_ready_w {
491 my ($self) = @_;
492
493 my $len = syswrite $self->{fh}, $self->{sbuf};
494
495 if ($len > 0) {
496 substr $self->{sbuf}, 0, $len, "";
497 unless (length $self->{sbuf}) {
498 fcntl $self->{fh}, F_SETFL, 0;
499 $self->{w}->cb(sub { $self->fh_ready_r })->poll (1, 0, 1);
500 }
501 } elsif (defined $len) {
502 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
503 } else {
504 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
505 }
506}
507
508sub fh_ready_r {
413 my ($self) = @_; 509 my ($self) = @_;
414 510
415 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) { 511 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) {
416 for (;;) { 512 for (;;) {
417 if ($self->{datalen}) { 513 if ($self->{datalen}) {
514 warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d#
418 if (length $self->{buf} >= $self->{datalen}) { 515 if (length $self->{buf} >= $self->{datalen}) {
419 $self->rcv_data (substr $self->{buf}, 0, $self->{datalen}, ""); 516 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
420 } else { 517 } else {
421 last; 518 last;
422 } 519 }
423 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) { 520 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) {
424 $self->{datalen} = hex $1; 521 $self->{datalen} = hex $1;
522 warn "expecting new datachunk $self->{datalen}\n";#d#
425 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) { 523 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) {
426 $self->rcv ($1, { 524 $self->rcv ($1, {
427 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) } 525 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) }
428 split /\015?\012/, $2 526 split /\015?\012/, $2
429 }); 527 });
430 } else { 528 } else {
431 last; 529 last;
432 } 530 }
433 } 531 }
434 } else { 532 } else {
435 $Net::FCP::unregcb->($self);
436 delete $self->{fh};
437 $self->eof; 533 $self->eof;
438 } 534 }
439} 535}
440 536
441sub rcv_data { 537sub rcv_data {
442 my ($self, $chunk) = @_; 538 my ($self, $chunk) = @_;
443 539
444 $self->{data} .= $chunk; 540 $self->{data} .= $chunk;
541
542 $self->progress ("data", { chunk => length $chunk, total => length $self->{data}, end => $self->{datalength} });
445} 543}
446 544
447sub rcv { 545sub rcv {
448 my ($self, $type, $attr) = @_; 546 my ($self, $type, $attr) = @_;
449 547
456 } else { 554 } else {
457 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n"; 555 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n";
458 } 556 }
459} 557}
460 558
559# used as a default exception thrower
560sub rcv_throw_exception {
561 my ($self, $attr, $type) = @_;
562 $self->throw (new Net::FCP::Exception $type, $attr);
563}
564
565*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
566*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
567
568sub throw {
569 my ($self, $exc) = @_;
570
571 $self->{exception} = $exc;
572 $self->set_result (1);
573 $self->eof; # must be last to avoid loops
574}
575
461sub set_result { 576sub set_result {
462 my ($self, $result) = @_; 577 my ($self, $result) = @_;
463 578
464 $self->{result} = $result unless exists $self->{result}; 579 unless (exists $self->{result}) {
580 $self->{result} = $result;
581 $self->{cb}->($self) if exists $self->{cb};
582 $self->{signal}->send;
583 }
465} 584}
466 585
467sub eof { 586sub eof {
468 my ($self) = @_; 587 my ($self) = @_;
469 $self->set_result; 588
589 delete $self->{w};
590 delete $self->{fh};
591
592 delete $self->{fcp}{txn}{$self};
593
594 $self->set_result; # just in case
595}
596
597sub progress {
598 my ($self, $type, $attr) = @_;
599 $self->{fcp}->progress ($self, $type, $attr);
470} 600}
471 601
472=item $result = $txn->result 602=item $result = $txn->result
473 603
474Waits until a result is available and then returns it. 604Waits until a result is available and then returns it.
479=cut 609=cut
480 610
481sub result { 611sub result {
482 my ($self) = @_; 612 my ($self) = @_;
483 613
484 $Net::FCP::waitcb->() while !exists $self->{result}; 614 $self->{signal}->wait while !exists $self->{result};
615
616 die $self->{exception} if $self->{exception};
485 617
486 return $self->{result}; 618 return $self->{result};
487}
488
489sub DESTROY {
490 $Net::FCP::unregcb->($_[0]);
491} 619}
492 620
493package Net::FCP::Txn::ClientHello; 621package Net::FCP::Txn::ClientHello;
494 622
495use base Net::FCP::Txn; 623use base Net::FCP::Txn;
548 my ($self, $attr) = @_; 676 my ($self, $attr) = @_;
549 677
550 $self->set_result ($attr->{Length}); 678 $self->set_result ($attr->{Length});
551} 679}
552 680
681package Net::FCP::Txn::GetPut;
682
683# base class for get and put
684
685use base Net::FCP::Txn;
686
687*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
688*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
689
690sub rcv_restarted {
691 my ($self, $attr, $type) = @_;
692
693 delete $self->{datalength};
694 delete $self->{metalength};
695 delete $self->{data};
696
697 $self->progress ($type, $attr);
698}
699
553package Net::FCP::Txn::ClientGet; 700package Net::FCP::Txn::ClientGet;
554 701
555use base Net::FCP::Txn; 702use base Net::FCP::Txn::GetPut;
703
704*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
556 705
557sub rcv_data_found { 706sub rcv_data_found {
558 my ($self, $attr) = @_; 707 my ($self, $attr, $type) = @_;
708
709 $self->progress ($type, $attr);
559 710
560 $self->{datalength} = hex $attr->{data_length}; 711 $self->{datalength} = hex $attr->{data_length};
561 $self->{metalength} = hex $attr->{metadata_length}; 712 $self->{metalength} = hex $attr->{metadata_length};
562} 713}
563 714
564sub rcv_restarted {
565 # nop, maybe feedback
566}
567
568sub eof { 715sub eof {
569 my ($self) = @_; 716 my ($self) = @_;
570 717
718 if ($self->{datalength} == length $self->{data}) {
571 my $data = delete $self->{data}; 719 my $data = delete $self->{data};
572 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, ""; 720 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, "";
573 721
574 $self->set_result ([$meta, $data]); 722 $self->set_result ([$meta, $data]);
723 } elsif (!exists $self->{result}) {
724 $self->throw (Net::FCP::Exception->new (short_data => {
725 reason => "unexpected eof or internal node error",
726 received => length $self->{data},
727 expected => $self->{datalength},
728 }));
729 }
730}
731
732package Net::FCP::Txn::ClientPut;
733
734use base Net::FCP::Txn::GetPut;
735
736*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
737*rcv_key_collision = \&Net::FCP::Txn::rcv_throw_exception;
738
739sub rcv_pending {
740 my ($self, $attr, $type) = @_;
741 $self->progress ($type, $attr);
742}
743
744sub rcv_success {
745 my ($self, $attr, $type) = @_;
746 $self->set_result ($attr);
747}
748
749package Net::FCP::Exception;
750
751use overload
752 '""' => sub {
753 "Net::FCP::Exception<<$_[0][0]," . (join ":", %{$_[0][1]}) . ">>\n";
754 };
755
756sub new {
757 my ($class, $type, $attr) = @_;
758
759 bless [Net::FCP::tolc $type, { %$attr }], $class;
575} 760}
576 761
577=back 762=back
578 763
579=head1 SEE ALSO 764=head1 SEE ALSO

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines