ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Net-FCP/FCP.pm
(Generate patch)

Comparing Net-FCP/FCP.pm (file contents):
Revision 1.10 by root, Tue Sep 9 06:22:58 2003 UTC vs.
Revision 1.13 by root, Wed Sep 10 05:06:16 2003 UTC

44=cut 44=cut
45 45
46package Net::FCP; 46package Net::FCP;
47 47
48use Carp; 48use Carp;
49use IO::Socket::INET;
50 49
51$VERSION = 0.05; 50$VERSION = 0.05;
52 51
53no warnings; 52no warnings;
54 53
62 if (/^event=(\w+)$/) { 61 if (/^event=(\w+)$/) {
63 $EVENT = "Net::FCP::Event::$1"; 62 $EVENT = "Net::FCP::Event::$1";
64 } 63 }
65 } 64 }
66 eval "require $EVENT"; 65 eval "require $EVENT";
66 die $@ if $@;
67} 67}
68 68
69sub touc($) { 69sub touc($) {
70 local $_ = shift; 70 local $_ = shift;
71 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/; 71 1 while s/((?:^|_)(?:svk|chk|uri)(?:_|$))/\U$1/;
121 my $hdr = $meta->{version} = {}; 121 my $hdr = $meta->{version} = {};
122 122
123 for (;;) { 123 for (;;) {
124 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) { 124 while ($data =~ /\G([^=\015\012]+)=([^\015\012]*)\015?\012/gc) {
125 my ($k, $v) = ($1, $2); 125 my ($k, $v) = ($1, $2);
126 $hdr->{tolc $k} = $v; 126 my @p = split /\./, tolc $k, 3;
127
128 $hdr->{$p[0]} = $v if @p == 1; # lamest code I ever wrote
129 $hdr->{$p[0]}{$p[1]} = $v if @p == 2;
130 $hdr->{$p[0]}{$p[1]}{$p[3]} = $v if @p == 3;
131 die "FATAL: 4+ dot metadata" if @p >= 4;
127 } 132 }
128 133
129 if ($data =~ /\GEndPart\015?\012/gc) { 134 if ($data =~ /\GEndPart\015?\012/gc) {
135 # nop
130 } elsif ($data =~ /\GEnd\015?\012/gc) { 136 } elsif ($data =~ /\GEnd\015?\012/gc) {
131 last; 137 last;
132 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) { 138 } elsif ($data =~ /\G([A-Za-z0-9.\-]+)\015?\012/gcs) {
133 push @{$meta->{tolc $1}}, $hdr = {}; 139 push @{$meta->{tolc $1}}, $hdr = {};
134 } elsif ($data =~ /\G(.*)/gcs) { 140 } elsif ($data =~ /\G(.*)/gcs) {
156sub new { 162sub new {
157 my $class = shift; 163 my $class = shift;
158 my $self = bless { @_ }, $class; 164 my $self = bless { @_ }, $class;
159 165
160 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1"; 166 $self->{host} ||= $ENV{FREDHOST} || "127.0.0.1";
161 $self->{port} ||= $ENV{FREDPORt} || 8481; 167 $self->{port} ||= $ENV{FREDPORT} || 8481;
162 168
163 $self->{nodehello} = $self->client_hello 169 #$self->{nodehello} = $self->client_hello
164 or croak "unable to get nodehello from node\n"; 170 # or croak "unable to get nodehello from node\n";
165 171
166 $self; 172 $self;
167} 173}
168 174
169sub progress { 175sub progress {
172} 178}
173 179
174=item $txn = $fcp->txn(type => attr => val,...) 180=item $txn = $fcp->txn(type => attr => val,...)
175 181
176The low-level interface to transactions. Don't use it. 182The low-level interface to transactions. Don't use it.
183
184Here are some examples of using transactions:
185
186The blocking case, no (visible) transactions involved:
187
188 my $nodehello = $fcp->client_hello;
189
190A transaction used in a blocking fashion:
191
192 my $txn = $fcp->txn_client_hello;
193 ...
194 my $nodehello = $txn->result;
195
196Or shorter:
197
198 my $nodehello = $fcp->txn_client_hello->result;
199
200Setting callbacks:
201
202 $fcp->txn_client_hello->cb(
203 sub { my $nodehello => $_[0]->result }
204 );
177 205
178=cut 206=cut
179 207
180sub txn { 208sub txn {
181 my ($self, $type, %attr) = @_; 209 my ($self, $type, %attr) = @_;
340=cut 368=cut
341 369
342_txn client_get => sub { 370_txn client_get => sub {
343 my ($self, $uri, $htl, $removelocal) = @_; 371 my ($self, $uri, $htl, $removelocal) = @_;
344 372
345 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local => $removelocal*1); 373 $self->txn (client_get => URI => $uri, hops_to_live => ($htl || 15), remove_local_key => $removelocal ? "true" : "false");
346}; 374};
347 375
348=item MISSING: ClientPut 376=item MISSING: ClientPut
349 377
350=back 378=back
364 392
365=cut 393=cut
366 394
367package Net::FCP::Txn; 395package Net::FCP::Txn;
368 396
397use Fcntl;
398use Socket;
399
369=item new arg => val,... 400=item new arg => val,...
370 401
371Creates a new C<Net::FCP::Txn> object. Not normally used. 402Creates a new C<Net::FCP::Txn> object. Not normally used.
372 403
373=cut 404=cut
374 405
375sub new { 406sub new {
376 my $class = shift; 407 my $class = shift;
377 my $self = bless { @_ }, $class; 408 my $self = bless { @_ }, $class;
409
410 $self->{signal} = $EVENT->new_signal;
411
412 $self->{fcp}{txn}{$self} = $self;
378 413
379 my $attr = ""; 414 my $attr = "";
380 my $data = delete $self->{attr}{data}; 415 my $data = delete $self->{attr}{data};
381 416
382 while (my ($k, $v) = each %{$self->{attr}}) { 417 while (my ($k, $v) = each %{$self->{attr}}) {
388 $data = "Data\012$data"; 423 $data = "Data\012$data";
389 } else { 424 } else {
390 $data = "EndMessage\012"; 425 $data = "EndMessage\012";
391 } 426 }
392 427
393 my $fh = new IO::Socket::INET 428 socket my $fh, PF_INET, SOCK_STREAM, 0
394 PeerHost => $self->{fcp}{host}, 429 or Carp::croak "unable to create new tcp socket: $!";
395 PeerPort => $self->{fcp}{port}
396 or Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
397
398 binmode $fh, ":raw"; 430 binmode $fh, ":raw";
431 fcntl $fh, F_SETFL, O_NONBLOCK;
432 connect $fh, (sockaddr_in $self->{fcp}{port}, inet_aton $self->{fcp}{host})
433 and !$!{EWOULDBLOCK}
434 and !$!{EINPROGRESS}
435 and Carp::croak "FCP::txn: unable to connect to $self->{fcp}{host}:$self->{fcp}{port}: $!\n";
399 436
400 if (0) { 437 $self->{sbuf} =
401 print 438 "\x00\x00\x00\x02"
402 Net::FCP::touc $self->{type}, "\012",
403 $attr,
404 $data, "\012";
405 }
406
407 print $fh
408 "\x00\x00", "\x00\x02", # SESSID, PRESID
409 Net::FCP::touc $self->{type}, "\012", 439 . Net::FCP::touc $self->{type}
410 $attr, 440 . "\012$attr$data";
411 $data;
412 441
413 #$fh->shutdown (1); # freenet buggy?, well, it's java... 442 #$fh->shutdown (1); # freenet buggy?, well, it's java...
414 443
415 $self->{fh} = $fh; 444 $self->{fh} = $fh;
416 445
417 $EVENT->reg_r_cb ($self); 446 $self->{w} = $EVENT->new_from_fh ($fh)->cb(sub { $self->fh_ready_w })->poll(0, 1, 1);
418 447
419 $self; 448 $self;
420} 449}
421 450
451=item $txn = $txn->cb ($coderef)
452
453Sets a callback to be called when the request is finished. The coderef
454will be called with the txn as it's sole argument, so it has to call
455C<result> itself.
456
457Returns the txn object, useful for chaining.
458
459Example:
460
461 $fcp->txn_client_get ("freenet:CHK....")
462 ->userdata ("ehrm")
463 ->cb(sub {
464 my $data = shift->result;
465 });
466
467=cut
468
469sub cb($$) {
470 my ($self, $cb) = @_;
471 $self->{cb} = $cb;
472 $self;
473}
474
422=item $userdata = $txn->userdata ([$userdata]) 475=item $txn = $txn->userdata ([$userdata])
423 476
424Get and/or set user-specific data. This is useful in progress callbacks. 477Set user-specific data. This is useful in progress callbacks. The data can be accessed
478using C<< $txn->{userdata} >>.
425 479
426=cut 480Returns the txn object, useful for chaining.
427 481
482=cut
483
428sub userdata($;$) { 484sub userdata($$) {
429 my ($self, $data) = @_; 485 my ($self, $data) = @_;
430 $self->{userdata} = $data if @_ >= 2; 486 $self->{userdata} = $data;
431 $self->{userdata}; 487 $self;
432} 488}
433 489
434sub fh_ready { 490sub fh_ready_w {
491 my ($self) = @_;
492
493 my $len = syswrite $self->{fh}, $self->{sbuf};
494
495 if ($len > 0) {
496 substr $self->{sbuf}, 0, $len, "";
497 unless (length $self->{sbuf}) {
498 fcntl $self->{fh}, F_SETFL, 0;
499 $self->{w}->cb(sub { $self->fh_ready_r })->poll (1, 0, 1);
500 }
501 } elsif (defined $len) {
502 $self->throw (Net::FCP::Exception->new (network_error => { reason => "unexpected end of file while writing" }));
503 } else {
504 $self->throw (Net::FCP::Exception->new (network_error => { reason => "$!" }));
505 }
506}
507
508sub fh_ready_r {
435 my ($self) = @_; 509 my ($self) = @_;
436 510
437 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) { 511 if (sysread $self->{fh}, $self->{buf}, 65536, length $self->{buf}) {
438 for (;;) { 512 for (;;) {
439 if ($self->{datalen}) { 513 if ($self->{datalen}) {
514 #warn "expecting new datachunk $self->{datalen}, got ".(length $self->{buf})."\n";#d#
440 if (length $self->{buf} >= $self->{datalen}) { 515 if (length $self->{buf} >= $self->{datalen}) {
441 $self->rcv_data (substr $self->{buf}, 0, $self->{datalen}, ""); 516 $self->rcv_data (substr $self->{buf}, 0, delete $self->{datalen}, "");
442 } else { 517 } else {
443 last; 518 last;
444 } 519 }
445 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) { 520 } elsif ($self->{buf} =~ s/^DataChunk\015?\012Length=([0-9a-fA-F]+)\015?\012Data\015?\012//) {
446 $self->{datalen} = hex $1; 521 $self->{datalen} = hex $1;
522 #warn "expecting new datachunk $self->{datalen}\n";#d#
447 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) { 523 } elsif ($self->{buf} =~ s/^([a-zA-Z]+)\015?\012(?:(.+?)\015?\012)?EndMessage\015?\012//s) {
448 $self->rcv ($1, { 524 $self->rcv ($1, {
449 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) } 525 map { my ($a, $b) = split /=/, $_, 2; ((Net::FCP::tolc $a), $b) }
450 split /\015?\012/, $2 526 split /\015?\012/, $2
451 }); 527 });
452 } else { 528 } else {
453 last; 529 last;
454 } 530 }
455 } 531 }
456 } else { 532 } else {
457 $EVENT->unreg_r_cb ($self);
458 delete $self->{fh};
459 $self->eof; 533 $self->eof;
460 } 534 }
461} 535}
462 536
463sub rcv_data { 537sub rcv_data {
480 } else { 554 } else {
481 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n"; 555 warn "received unexpected reply type '$type' for '$self->{type}', ignoring\n";
482 } 556 }
483} 557}
484 558
559# used as a default exception thrower
560sub rcv_throw_exception {
561 my ($self, $attr, $type) = @_;
562 $self->throw (new Net::FCP::Exception $type, $attr);
563}
564
565*rcv_failed = \&Net::FCP::Txn::rcv_throw_exception;
566*rcv_format_error = \&Net::FCP::Txn::rcv_throw_exception;
567
485sub throw { 568sub throw {
486 my ($self, $exc) = @_; 569 my ($self, $exc) = @_;
487 570
488 $self->{exception} = $exc; 571 $self->{exception} = $exc;
489 $self->set_result (1); 572 $self->set_result (1);
573 $self->eof; # must be last to avoid loops
490} 574}
491 575
492sub set_result { 576sub set_result {
493 my ($self, $result) = @_; 577 my ($self, $result) = @_;
494 578
495 $self->{result} = $result unless exists $self->{result}; 579 unless (exists $self->{result}) {
580 $self->{result} = $result;
581 $self->{cb}->($self) if exists $self->{cb};
582 $self->{signal}->send;
583 }
496} 584}
497 585
498sub eof { 586sub eof {
499 my ($self) = @_; 587 my ($self) = @_;
500 $self->set_result; 588
589 delete $self->{w};
590 delete $self->{fh};
591
592 delete $self->{fcp}{txn}{$self};
593
594 $self->set_result; # just in case
501} 595}
502 596
503sub progress { 597sub progress {
504 my ($self, $type, $attr) = @_; 598 my ($self, $type, $attr) = @_;
505 $self->{fcp}->progress ($self, $type, $attr); 599 $self->{fcp}->progress ($self, $type, $attr);
515=cut 609=cut
516 610
517sub result { 611sub result {
518 my ($self) = @_; 612 my ($self) = @_;
519 613
520 $EVENT->wait_event while !exists $self->{result}; 614 $self->{signal}->wait while !exists $self->{result};
521 615
522 die $self->{exception} if $self->{exception}; 616 die $self->{exception} if $self->{exception};
523 617
524 return $self->{result}; 618 return $self->{result};
525}
526
527sub DESTROY {
528 $EVENT->unreg_r_cb ($_[0]);
529 #$EVENT->unreg_w_cb ($_[0]);
530} 619}
531 620
532package Net::FCP::Txn::ClientHello; 621package Net::FCP::Txn::ClientHello;
533 622
534use base Net::FCP::Txn; 623use base Net::FCP::Txn;
587 my ($self, $attr) = @_; 676 my ($self, $attr) = @_;
588 677
589 $self->set_result ($attr->{Length}); 678 $self->set_result ($attr->{Length});
590} 679}
591 680
681package Net::FCP::Txn::GetPut;
682
683# base class for get and put
684
685use base Net::FCP::Txn;
686
687*rcv_uri_error = \&Net::FCP::Txn::rcv_throw_exception;
688*rcv_route_not_found = \&Net::FCP::Txn::rcv_throw_exception;
689
690sub rcv_restarted {
691 my ($self, $attr, $type) = @_;
692
693 delete $self->{datalength};
694 delete $self->{metalength};
695 delete $self->{data};
696
697 $self->progress ($type, $attr);
698}
699
592package Net::FCP::Txn::ClientGet; 700package Net::FCP::Txn::ClientGet;
593 701
594use base Net::FCP::Txn; 702use base Net::FCP::Txn::GetPut;
703
704*rcv_data_not_found = \&Net::FCP::Txn::rcv_throw_exception;
595 705
596sub rcv_data_found { 706sub rcv_data_found {
597 my ($self, $attr, $type) = @_; 707 my ($self, $attr, $type) = @_;
598 708
599 $self->progress ($type, $attr); 709 $self->progress ($type, $attr);
600 710
601 $self->{datalength} = hex $attr->{data_length}; 711 $self->{datalength} = hex $attr->{data_length};
602 $self->{metalength} = hex $attr->{metadata_length}; 712 $self->{metalength} = hex $attr->{metadata_length};
603} 713}
604 714
605sub rcv_route_not_found { 715sub eof {
606 my ($self, $attr, $type) = @_; 716 my ($self) = @_;
607 717
608 $self->throw (new Net::FCP::Exception $type, $attr); 718 if ($self->{datalength} == length $self->{data}) {
609} 719 my $data = delete $self->{data};
720 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, "";
610 721
611sub rcv_data_not_found { 722 $self->set_result ([$meta, $data]);
612 my ($self, $attr, $type) = @_; 723 } elsif (!exists $self->{result}) {
613 724 $self->throw (Net::FCP::Exception->new (short_data => {
614 $self->throw (new Net::FCP::Exception $type, $attr); 725 reason => "unexpected eof or internal node error",
726 received => length $self->{data},
727 expected => $self->{datalength},
728 }));
729 }
615} 730}
616 731
617sub rcv_format_error { 732package Net::FCP::Txn::ClientPut;
618 my ($self, $attr, $type) = @_;
619 733
620 $self->throw (new Net::FCP::Exception $type, $attr); 734use base Net::FCP::Txn::GetPut;
621}
622 735
623sub rcv_restarted { 736*rcv_size_error = \&Net::FCP::Txn::rcv_throw_exception;
737*rcv_key_collision = \&Net::FCP::Txn::rcv_throw_exception;
738
739sub rcv_pending {
624 my ($self, $attr, $type) = @_; 740 my ($self, $attr, $type) = @_;
625 $self->progress ($type, $attr); 741 $self->progress ($type, $attr);
626} 742}
627 743
628sub eof { 744sub rcv_success {
629 my ($self) = @_; 745 my ($self, $attr, $type) = @_;
630
631 my $data = delete $self->{data};
632 my $meta = Net::FCP::parse_metadata substr $data, 0, $self->{metalength}, "";
633
634 $self->set_result ([$meta, $data]); 746 $self->set_result ($attr);
635} 747}
636 748
637package Net::FCP::Exception; 749package Net::FCP::Exception;
638 750
639use overload 751use overload
642 }; 754 };
643 755
644sub new { 756sub new {
645 my ($class, $type, $attr) = @_; 757 my ($class, $type, $attr) = @_;
646 758
647 bless [$type, { %$attr }], $class; 759 bless [Net::FCP::tolc $type, { %$attr }], $class;
648} 760}
649 761
650=back 762=back
651 763
652=head1 SEE ALSO 764=head1 SEE ALSO

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines