ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.12 by root, Sat Aug 8 04:02:48 2015 UTC vs.
Revision 1.18 by root, Thu Dec 3 19:07:57 2015 UTC

61 61
62use common::sense; 62use common::sense;
63 63
64use Carp; 64use Carp;
65 65
66our $VERSION = '0.3'; 66our $VERSION = 0.4;
67 67
68use Scalar::Util (); 68use Scalar::Util ();
69 69
70use AnyEvent; 70use AnyEvent;
71use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
73 75
74sub touc($) { 76sub touc($) {
75 local $_ = shift; 77 local $_ = shift;
76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
77 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
96 98
97=cut 99=cut
98 100
99sub new { 101sub new {
100 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
101 my $self = bless { 106 my $self = bless {
102 host => $ENV{FREDHOST} || "127.0.0.1", 107 host => $ENV{FREDHOST} || "127.0.0.1",
103 port => $ENV{FREDPORT} || 9481, 108 port => $ENV{FREDPORT} || 9481,
104 timeout => 3600 * 2, 109 timeout => 3600 * 2,
110 keepalive => 9 * 60,
105 name => time.rand.rand.rand, # lame 111 name => time.rand.rand.rand, # lame
106 @_, 112 @_,
107 queue => [], 113 queue => [],
108 req => {}, 114 req => {},
115 prefix => "..:aefcpid:$rand:",
109 id => "a0", 116 idseq => "a0",
110 }, $class; 117 }, $class;
111 118
112 { 119 {
113 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or die "protocol error, expected message end, got $1\n";
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
114 190
115 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
116 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
117 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
118 on_error => sub { 194 on_error => sub {
119 warn "@_\n";#d# 195 warn "$self->{host}: $_[2]\n";#d#
120 exit 1; 196 exit 1;
121 }, 197 },
122 on_read => sub { $self->on_read (@_) }, 198 on_read => $on_read,
123 on_eof => $self->{on_eof} || sub { }; 199 on_eof => $self->{on_eof} || sub { },
200 ;
124 201
125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 202 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
126 } 203 }
127 204
128 $self->send_msg (client_hello => 205 $self->send_msg (client_hello =>
131 ); 208 );
132 209
133 $self 210 $self
134} 211}
135 212
213sub identifier {
214 $_[0]{prefix} . ++$_[0]{idseq}
215}
216
136sub send_msg { 217sub send_msg {
137 my ($self, $type, %kv) = @_; 218 my ($self, $type, %kv) = @_;
138 219
139 my $data = delete $kv{data}; 220 my $data = delete $kv{data};
140 221
141 if (exists $kv{id_cb}) { 222 if (exists $kv{id_cb}) {
142 my $id = $kv{identifier} ||= ++$self->{id}; 223 my $id = $kv{identifier} ||= $self->identifier;
143 $self->{id}{$id} = delete $kv{id_cb}; 224 $self->{id}{$id} = delete $kv{id_cb};
144 } 225 }
145 226
146 my $msg = (touc $type) . "\012" 227 my $msg = (touc $type) . "\012"
147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 228 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
224 305
225 if (my $cb = $PERSISTENT_TYPE{$type}) { 306 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier}; 307 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {}; 308 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv); 309 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra); 310 $self->recv (request_changed => $kv, $type, @extra);
230 } 311 }
231 312
232 my $on = $self->{on}; 313 my $on = $self->{on};
233 for (0 .. $#$on) { 314 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) { 315 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
243 } else { 324 } else {
244 $self->default_recv ($type, $kv, @extra); 325 $self->default_recv ($type, $kv, @extra);
245 } 326 }
246} 327}
247 328
248sub on_read {
249 my ($self) = @_;
250
251 my $type;
252 my %kv;
253 my $rdata;
254
255 my $hdr_cb; $hdr_cb = sub {
256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
257 my ($k, $v) = ($1, $2);
258 my @k = split /\./, tolc $k;
259 my $ro = \\%kv;
260
261 while (@k) {
262 my $k = shift @k;
263 if ($k =~ /^\d+$/) {
264 $ro = \$$ro->[$k];
265 } else {
266 $ro = \$$ro->{$k};
267 }
268 }
269
270 $$ro = $v;
271
272 $_[0]->push_read (line => $hdr_cb);
273 } elsif ($_[1] eq "Data") {
274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
275 $rdata = \$_[1];
276 $self->recv ($type, \%kv, $rdata);
277 });
278 } elsif ($_[1] eq "EndMessage") {
279 $self->recv ($type, \%kv);
280 } else {
281 die "protocol error, expected message end, got $_[1]\n";#d#
282 }
283 };
284
285 $self->{hdl}->push_read (line => sub {
286 $type = tolc $_[1];
287 $_[0]->push_read (line => $hdr_cb);
288 });
289}
290
291sub default_recv { 329sub default_recv {
292 my ($self, $type, $kv, $rdata) = @_; 330 my ($self, $type, $kv, $rdata) = @_;
293 331
294 if ($type eq "node_hello") { 332 if ($type eq "node_hello") {
295 $self->{node_hello} = $kv; 333 $self->{node_hello} = $kv;
371 my ($name, $sub) = @_; 409 my ($name, $sub) = @_;
372 410
373 *{$name} = sub { 411 *{$name} = sub {
374 my $cv = AE::cv; 412 my $cv = AE::cv;
375 413
376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) }; 414 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
377 &$sub; 415 &$sub;
378 $cv->recv 416 $cv->recv
379 }; 417 };
380 418
381 *{"$name\_"} = sub { 419 *{"$name\_"} = sub {
382 my ($ok, $err) = pop; 420 my ($ok, $err) = pop;
383 421
384 if (ARRAY:: eq ref $ok) { 422 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok; 423 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) { 424 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) }; 425 $err = sub { $ok->croak ($_[0]{extra_description}) };
388 } else { 426 } else {
389 my $bt = Carp::longmess ""; 427 my $bt = Carp::longmess "";
390 $err = sub { 428 $err = sub {
391 die "$_[0]{extra_description}$bt"; 429 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
392 }; 430 };
393 } 431 }
394 432
395 $ok ||= $NOP_CB; 433 $ok ||= $NOP_CB;
396 434
517 ); 555 );
518 556
519 $self->on (sub { 557 $self->on (sub {
520 my ($self, $type, $kv, @extra) = @_; 558 my ($self, $type, $kv, @extra) = @_;
521 559
560 $guard if 0;
561
522 if ($kv->{identifier} eq $identifier) { 562 if ($kv->{identifier} eq $identifier) {
523 if ($type eq "persistent_request_modified") { 563 if ($type eq "persistent_request_modified") {
524 $ok->($kv); 564 $ok->($kv);
525 return; 565 return;
526 } elsif ($type eq "protocol_error") { 566 } elsif ($type eq "protocol_error") {
539=cut 579=cut
540 580
541_txn get_plugin_info => sub { 581_txn get_plugin_info => sub {
542 my ($self, $ok, $err, $name, $detailed) = @_; 582 my ($self, $ok, $err, $name, $detailed) = @_;
543 583
584 my $id = $self->identifier;
585
544 $self->send_msg (get_plugin_info => 586 $self->send_msg (get_plugin_info =>
587 identifier => $id,
545 plugin_name => $name, 588 plugin_name => $name,
546 detailed => $detailed ? "true" : "false", 589 detailed => $detailed ? "true" : "false",
547 id_cb => sub {
548 my ($self, $type, $kv, $rdata) = @_;
549
550 $ok->($kv);
551 1
552 },
553 ); 590 );
591 $self->on (sub {
592 my ($self, $type, $kv) = @_;
593
594 if ($kv->{identifier} eq $id) {
595 if ($type eq "get_plugin_info") {
596 $ok->($kv);
597 } else {
598 $err->($kv, $type);
599 }
600 return;
601 }
602
603 1
604 });
554}; 605};
555 606
556=item $status = $fcp->client_get ($uri, $identifier, %kv) 607=item $status = $fcp->client_get ($uri, $identifier, %kv)
557 608
558%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 609%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).
564=cut 615=cut
565 616
566_txn client_get => sub { 617_txn client_get => sub {
567 my ($self, $ok, $err, $uri, $identifier, %kv) = @_; 618 my ($self, $ok, $err, $uri, $identifier, %kv) = @_;
568 619
620 $self->serialise ($identifier => sub {
621 my ($self, $guard) = @_;
622
569 $self->send_msg (client_get => 623 $self->send_msg (client_get =>
570 %kv, 624 %kv,
571 uri => $uri, 625 uri => $uri,
572 identifier => $identifier, 626 identifier => $identifier,
627 );
628
629 $self->on (sub {
630 my ($self, $type, $kv, @extra) = @_;
631
632 $guard if 0;
633
634 if ($kv->{identifier} eq $identifier) {
635 if ($type eq "persistent_get") {
636 $ok->($kv);
637 return;
638 } elsif ($type eq "protocol_error") {
639 $err->($kv);
640 return;
641 }
642 }
643
644 1
645 });
573 ); 646 });
574
575 $ok->();
576}; 647};
577 648
578=item $status = $fcp->remove_request ($identifier[, $global]) 649=item $status = $fcp->remove_request ($identifier[, $global])
579 650
580Remove the request with the given isdentifier. Returns true if successful, 651Remove the request with the given isdentifier. Returns true if successful,
592 identifier => $identifier, 663 identifier => $identifier,
593 global => $global ? "true" : "false", 664 global => $global ? "true" : "false",
594 ); 665 );
595 $self->on (sub { 666 $self->on (sub {
596 my ($self, $type, $kv, @extra) = @_; 667 my ($self, $type, $kv, @extra) = @_;
668
669 $guard if 0;
597 670
598 if ($kv->{identifier} eq $identifier) { 671 if ($kv->{identifier} eq $identifier) {
599 if ($type eq "persistent_request_removed") { 672 if ($type eq "persistent_request_removed") {
600 $ok->(1); 673 $ok->(1);
601 return; 674 return;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines