ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Net-FCP/FCP.pm
(Generate patch)

Comparing Net-FCP/FCP.pm (file contents):
Revision 1.9 by root, Tue Sep 9 06:13:18 2003 UTC vs.
Revision 1.13 by root, Wed Sep 10 05:06:16 2003 UTC

44=cut 44=cut
45 45
46package Net::FCP; 46package Net::FCP;
47 47
48use Carp; 48use Carp;
49use IO::Socket::INET;
50 49
51$VERSION = 0.04; 50$VERSION = 0.05;
51
52no warnings;
52 53
53our $EVENT = Net::FCP::Event::Auto::; 54our $EVENT = Net::FCP::Event::Auto::;
54$EVENT = Net::FCP::Event::Event::;#d# 55$EVENT = Net::FCP::Event::Event;#d#
55 56
56sub import { 57sub import {
57 shift; 58 shift;
58 59
59 for (@_) { 60 for (@_) {
60 if (/^event=(\w+)$/) { 61 if (/^event=(\w+)$/) {
61 $EVENT = "Net::FCP::Event::$1"; 62 $EVENT = "Net::FCP::Event::$1";
62 } 63 }
63 } 64 }
64 eval "require $EVENT"; 65 eval "require $EVENT";
66 die $@ if $@;
65} 67}
66 68
67sub touc($) { 69sub touc($) {
68 local $_ = shift; 70 local $_ = shift;
69 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 71 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/;
119 my $hdr = $meta->{version} = {}; 121 my $hdr = $meta->{version} = {};
120 122
121 for (;;) { 123 for (;;) {
122 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) { 124 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) {
123 my ($k, $v) = ($1, $2); 125 my ($k, $v) = ($1, $2);
124 $hdr->{tolc $k} = $v; 126 my @p = split /\./, tolc $k, 3;
127
128 $hdr->{$p[0]} = $v if @p == 1; # lamest code I ever wrote
129 $hdr->{$p[0]}{$p[1]} = $v if @p == 2;
130 $hdr->{$p[0]}{$p[1]}{$p[3]} = $v if @p == 3;
131 die "FATAL: 4+ dot metadata" if @p >= 4;
125 } 132 }
126 133
127 if ($data =~ /\GEndPart\015?\012/gc) { 134 if ($data =~ /\GEndPart\015?\012/gc) {
135 # nop
128 } elsif ($data =~ /\GEnd\015?\012/gc) { 136 } elsif ($data =~ /\GEnd\015?\012/gc) {
129 last; 137 last;
130 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) { 138 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) {
131 push @{$meta->{tolc $1}}, $hdr = {}; 139 push @{$meta->{tolc $1}}, $hdr = {};
132 } elsif ($data =~ /\G(.*)/gcs) { 140 } elsif ($data =~ /\G(.*)/gcs) {
154sub new { 162sub new {
155 my $class = shift; 163 my $class = shift;
156 my $self = bless { @_ }, $class; 164 my $self = bless { @_ }, $class;
157 165
158 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 166 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
159 $self->{port} ||= $ENV{FREDPORt} || 8481; 167 $self->{port} ||= $ENV{FREDPORT} || 8481;
160 168
161 $self->{nodehello} = $self->client_hello 169 #$self->{nodehello} = $self->client_hello
162 or croak "unable to get nodehello from node\n"; 170 # or croak "unable to get nodehello from node\n";
163 171
164 $self; 172 $self;
165} 173}
166 174
167sub progress { 175sub progress {
170} 178}
171 179
172=item $txn = $fcp->txn(type => attr => val,...) 180=item $txn = $fcp->txn(type => attr => val,...)
173 181
174The low-level interface to transactions. Don't use it. 182The low-level interface to transactions. Don't use it.
183
184Here are some examples of using transactions:
185
186The blocking case, no (visible) transactions involved:
187
188 my $nodehello = $fcp->client_hello;
189
190A transaction used in a blocking fashion:
191
192 my $txn = $fcp->txn_client_hello;
193 ...
194 my $nodehello = $txn->result;
195
196Or shorter:
197
198 my $nodehello = $fcp->txn_client_hello->result;
199
200Setting callbacks:
201
202 $fcp->txn_client_hello->cb(
203 sub { my $nodehello => $_[0]->result }
204 );
175 205
176=cut 206=cut
177 207
178sub txn { 208sub txn {
179 my ($self, $type, %attr) = @_; 209 my ($self, $type, %attr) = @_;
338=cut 368=cut
339 369
340_txn client_get => sub { 370_txn client_get => sub {
341 my ($self, $uri, $htl, $removelocal) = @_; 371 my ($self, $uri, $htl, $removelocal) = @_;
342 372
343 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local => $removelocal*1); 373 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local_key => $removelocal ? "true" : "false");
344}; 374};
345 375
346=item MISSING: ClientPut 376=item MISSING: ClientPut
347 377
348=back 378=back
362 392
363=cut 393=cut
364 394
365package Net::FCP::Txn; 395package Net::FCP::Txn;
366 396
397use Fcntl;
398use Socket;
399
367=item new arg => val,... 400=item new arg => val,...
368 401
369Creates a new C<Net::FCP::Txn> object. Not normally used. 402Creates a new C<Net::FCP::Txn> object. Not normally used.
370 403
371=cut 404=cut
372 405
373sub new { 406sub new {
374 my $class = shift; 407 my $class = shift;
375 my $self = bless { @_ }, $class; 408 my $self = bless { @_ }, $class;
409
410 $self->{signal} = $EVENT->new_signal;
411
412 $self->{fcp}{txn}{$self} = $self;
376 413
377 my $attr = ""; 414 my $attr = "";
378 my $data = delete $self->{attr}{data}; 415 my $data = delete $self->{attr}{data};
379 416
380 while (my ($k, $v) = each %{$self->{attr}}) { 417 while (my ($k, $v) = each %{$self->{attr}}) {
386 $data = "Data\012$data"; 423 $data = "Data\012$data";
387 } else { 424 } else {
388 $data = "EndMessage\012"; 425 $data = "EndMessage\012";
389 } 426 }
390 427
391 my $fh = new IO::Socket::INET 428 socket my $fh, PF_INET, SOCK_STREAM, 0
392 PeerHost => $self->{fcp}{host}, 429 or Carp::croak "unable to create new tcp socket: $!";
393 PeerPort => $self->{fcp}{port}
394 or Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
395
396 binmode $fh, ":raw"; 430 binmode $fh, ":raw";
431 fcntl $fh, F_SETFL, O_NONBLOCK;
432 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host})
433 and !$!{EWOULDBLOCK}
434 and !$!{EINPROGRESS}
435 and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
397 436
398 if (0) { 437 $self->{sbuf} =
399 print 438 "\x00\x00\x00\x02"
400 Net::FCP::touc $self->{type}, "\012",
401 $attr,
402 $data, "\012";
403 }
404
405 print $fh
406 "\x00\x00", "\x00\x02", # SESSID, PRESID
407 Net::FCP::touc $self->{type}, "\012", 439 . Net::FCP::touc $self->{type}
408 $attr, 440 . "\012$attr$data";
409 $data;
410 441
411 #$fh->shutdown (1); # freenet buggy?, well, it's java... 442 #$fh->shutdown (1); # freenet buggy?, well, it's java...
412 443
413 $self->{fh} = $fh; 444 $self->{fh} = $fh;
414 445
415 $EVENT->reg_r_cb ($self); 446 $self->{w} = $EVENT->new_from_fh ($fh)->cb(sub { $self->fh_ready_w })->poll(0, 1, 1);
416 447
417 $self; 448 $self;
418} 449}
419 450
451=item $txn = $txn->cb ($coderef)
452
453Sets a callback to be called when the request is finished. The coderef
454will be called with the txn as it's sole argument, so it has to call
455C<result> itself.
456
457Returns the txn object, useful for chaining.
458
459Example:
460
461 $fcp->txn_client_get ("freenet:CHK....")
462 ->userdata ("ehrm")
463 ->cb(sub {
464 my $data = shift->result;
465 });
466
467=cut
468
469sub cb($$) {
470 my ($self, $cb) = @_;
471 $self->{cb} = $cb;
472 $self;
473}
474
420=item $userdata = $txn->userdata ([$userdata]) 475=item $txn = $txn->userdata ([$userdata])
421 476
422Get and/or set user-specific data. This is useful in progress callbacks. 477Set user-specific data. This is useful in progress callbacks. The data can be accessed
478using C<< $txn->{userdata} >>.
423 479
424=cut 480Returns the txn object, useful for chaining.
425 481
482=cut
483
426sub userdata($;$) { 484sub userdata($$) {
427 my ($self, $data) = @_; 485 my ($self, $data) = @_;
428 $self->{userdata} = $data if @_ >= 2; 486 $self->{userdata} = $data;
429 $self->{userdata}; 487 $self;
430} 488}
431 489
432sub fh_ready { 490sub fh_ready_w {
491 my ($self) = @_;
492
493 my $len = syswrite $self->{fh}, $self->{sbuf};
494
495 if ($len > 0) {
496 substr $self->{sbuf}, 0, $len, "";
497 unless (length $self->{sbuf}) {
498 fcntl $self->{fh}, F_SETFL, 0;
499 $self->{w}->cb(sub { $self->fh_ready_r })->poll (1, 0, 1);
500 }
501 } elsif (defined $len) {
502 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
503 } else {
504 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
505 }
506}
507
508sub fh_ready_r {
433 my ($self) = @_; 509 my ($self) = @_;
434 510
435 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) { 511 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) {
436 for (;;) { 512 for (;;) {
437 if ($self->{datalen}) { 513 if ($self->{datalen}) {
514 #warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d#
438 if (length $self->{buf} >= $self->{datalen}) { 515 if (length $self->{buf} >= $self->{datalen}) {
439 $self->rcv_data (substr $self->{buf}, 0, $self->{datalen}, ""); 516 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
440 } else { 517 } else {
441 last; 518 last;
442 } 519 }
443 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) { 520 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) {
444 $self->{datalen} = hex $1; 521 $self->{datalen} = hex $1;
522 #warn "expecting new datachunk $self->{datalen}\n";#d#
445 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) { 523 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) {
446 $self->rcv ($1, { 524 $self->rcv ($1, {
447 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) } 525 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) }
448 split /\015?\012/, $2 526 split /\015?\012/, $2
449 }); 527 });
450 } else { 528 } else {
451 last; 529 last;
452 } 530 }
453 } 531 }
454 } else { 532 } else {
455 $EVENT->unreg_r_cb ($self);
456 delete $self->{fh};
457 $self->eof; 533 $self->eof;
458 } 534 }
459} 535}
460 536
461sub rcv_data { 537sub rcv_data {
478 } else { 554 } else {
479 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n"; 555 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n";
480 } 556 }
481} 557}
482 558
559# used as a default exception thrower
560sub rcv_throw_exception {
561 my ($self, $attr, $type) = @_;
562 $self->throw (new Net::FCP::Exception $type, $attr);
563}
564
565*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
566*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
567
483sub throw { 568sub throw {
484 my ($self, $exc) = @_; 569 my ($self, $exc) = @_;
485 570
486 $self->{exception} = $exc; 571 $self->{exception} = $exc;
487 $self->set_result (1); 572 $self->set_result (1);
573 $self->eof; # must be last to avoid loops
488} 574}
489 575
490sub set_result { 576sub set_result {
491 my ($self, $result) = @_; 577 my ($self, $result) = @_;
492 578
493 $self->{result} = $result unless exists $self->{result}; 579 unless (exists $self->{result}) {
580 $self->{result} = $result;
581 $self->{cb}->($self) if exists $self->{cb};
582 $self->{signal}->send;
583 }
494} 584}
495 585
496sub eof { 586sub eof {
497 my ($self) = @_; 587 my ($self) = @_;
498 $self->set_result; 588
589 delete $self->{w};
590 delete $self->{fh};
591
592 delete $self->{fcp}{txn}{$self};
593
594 $self->set_result; # just in case
499} 595}
500 596
501sub progress { 597sub progress {
502 my ($self, $type, $attr) = @_; 598 my ($self, $type, $attr) = @_;
503 $self->{fcp}->progress ($self, $type, $attr); 599 $self->{fcp}->progress ($self, $type, $attr);
513=cut 609=cut
514 610
515sub result { 611sub result {
516 my ($self) = @_; 612 my ($self) = @_;
517 613
518 $EVENT->wait_event while !exists $self->{result}; 614 $self->{signal}->wait while !exists $self->{result};
519 615
520 die $self->{exception} if $self->{exception}; 616 die $self->{exception} if $self->{exception};
521 617
522 return $self->{result}; 618 return $self->{result};
523}
524
525sub DESTROY {
526 $EVENT->unreg_r_cb ($_[0]);
527 #$EVENT->unreg_w_cb ($_[0]);
528} 619}
529 620
530package Net::FCP::Txn::ClientHello; 621package Net::FCP::Txn::ClientHello;
531 622
532use base Net::FCP::Txn; 623use base Net::FCP::Txn;
585 my ($self, $attr) = @_; 676 my ($self, $attr) = @_;
586 677
587 $self->set_result ($attr->{Length}); 678 $self->set_result ($attr->{Length});
588} 679}
589 680
681package Net::FCP::Txn::GetPut;
682
683# base class for get and put
684
685use base Net::FCP::Txn;
686
687*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
688*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
689
690sub rcv_restarted {
691 my ($self, $attr, $type) = @_;
692
693 delete $self->{datalength};
694 delete $self->{metalength};
695 delete $self->{data};
696
697 $self->progress ($type, $attr);
698}
699
590package Net::FCP::Txn::ClientGet; 700package Net::FCP::Txn::ClientGet;
591 701
592use base Net::FCP::Txn; 702use base Net::FCP::Txn::GetPut;
703
704*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
593 705
594sub rcv_data_found { 706sub rcv_data_found {
595 my ($self, $attr, $type) = @_; 707 my ($self, $attr, $type) = @_;
596 708
597 $self->progress ($type, $attr); 709 $self->progress ($type, $attr);
598 710
599 $self->{datalength} = hex $attr->{data_length}; 711 $self->{datalength} = hex $attr->{data_length};
600 $self->{metalength} = hex $attr->{metadata_length}; 712 $self->{metalength} = hex $attr->{metadata_length};
601} 713}
602 714
603sub rcv_route_not_found { 715sub eof {
604 my ($self, $attr, $type) = @_; 716 my ($self) = @_;
605 717
606 $self->throw (new Net::FCP::Exception $type, $attr); 718 if ($self->{datalength} == length $self->{data}) {
607} 719 my $data = delete $self->{data};
720 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, "";
608 721
609sub rcv_data_not_found { 722 $self->set_result ([$meta, $data]);
610 my ($self, $attr, $type) = @_; 723 } elsif (!exists $self->{result}) {
611 724 $self->throw (Net::FCP::Exception->new (short_data => {
612 $self->throw (new Net::FCP::Exception $type, $attr); 725 reason => "unexpected eof or internal node error",
726 received => length $self->{data},
727 expected => $self->{datalength},
728 }));
729 }
613} 730}
614 731
615sub rcv_format_error { 732package Net::FCP::Txn::ClientPut;
616 my ($self, $attr, $type) = @_;
617 733
618 $self->throw (new Net::FCP::Exception $type, $attr); 734use base Net::FCP::Txn::GetPut;
619}
620 735
621sub rcv_restarted { 736*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
737*rcv_key_collision = \&Net::FCP::Txn::rcv_throw_exception;
738
739sub rcv_pending {
622 my ($self, $attr, $type) = @_; 740 my ($self, $attr, $type) = @_;
623 $self->progress ($type, $attr); 741 $self->progress ($type, $attr);
624} 742}
625 743
626sub eof { 744sub rcv_success {
627 my ($self) = @_; 745 my ($self, $attr, $type) = @_;
628
629 my $data = delete $self->{data};
630 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, "";
631
632 $self->set_result ([$meta, $data]); 746 $self->set_result ($attr);
633} 747}
634 748
635package Net::FCP::Exception; 749package Net::FCP::Exception;
636 750
637use overload 751use overload
640 }; 754 };
641 755
642sub new { 756sub new {
643 my ($class, $type, $attr) = @_; 757 my ($class, $type, $attr) = @_;
644 758
645 bless [$type, { %$attr }], $class; 759 bless [Net::FCP::tolc $type, { %$attr }], $class;
646} 760}
647 761
648=back 762=back
649 763
650=head1 SEE ALSO 764=head1 SEE ALSO

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines