ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.13 by root, Sat Aug 8 04:07:28 2015 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

61 61
62use common::sense; 62use common::sense;
63 63
64use Carp; 64use Carp;
65 65
66our $VERSION = '0.3'; 66our $VERSION = 0.4;
67 67
68use Scalar::Util (); 68use Scalar::Util ();
69 69
70use AnyEvent; 70use AnyEvent;
71use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
73 75
74sub touc($) { 76sub touc($) {
75 local $_ = shift; 77 local $_ = shift;
76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
77 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
96 98
97=cut 99=cut
98 100
99sub new { 101sub new {
100 my $class = shift; 102 my $class = shift;
103
104 my $rand = join "", map chr 0x21 + rand 94, 1..40; # ~ 262 bits entropy
105
101 my $self = bless { 106 my $self = bless {
102 host => $ENV{FREDHOST} || "127.0.0.1", 107 host => $ENV{FREDHOST} || "127.0.0.1",
103 port => $ENV{FREDPORT} || 9481, 108 port => $ENV{FREDPORT} || 9481,
104 timeout => 3600 * 2, 109 timeout => 3600 * 2,
110 keepalive => 9 * 60,
105 name => time.rand.rand.rand, # lame 111 name => time.rand.rand.rand, # lame
106 @_, 112 @_,
107 queue => [], 113 queue => [],
108 req => {}, 114 req => {},
115 prefix => "..:aefcpid:$rand:",
109 id => "a0", 116 idseq => "a0",
110 }, $class; 117 }, $class;
111 118
112 { 119 {
113 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
114 190
115 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
116 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
117 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
118 on_error => sub { 196 on_error => sub {
119 warn "@_\n";#d# 197 $self->fatal ($_[2]);
120 exit 1;
121 }, 198 },
122 on_read => sub { $self->on_read (@_) }, 199 ;
123 on_eof => $self->{on_eof} || sub { };
124 200
125 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
126 } 202 }
127 203
128 $self->send_msg (client_hello => 204 $self->send_msg (client_hello =>
131 ); 207 );
132 208
133 $self 209 $self
134} 210}
135 211
212sub fatal {
213 my ($self, $msg) = @_;
214
215 $self->{hdl}->shutdown;
216 delete $self->{kw};
217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
223}
224
225sub identifier {
226 $_[0]{prefix} . ++$_[0]{idseq}
227}
228
136sub send_msg { 229sub send_msg {
137 my ($self, $type, %kv) = @_; 230 my ($self, $type, %kv) = @_;
138 231
139 my $data = delete $kv{data}; 232 my $data = delete $kv{data};
140 233
141 if (exists $kv{id_cb}) { 234 if (exists $kv{id_cb}) {
142 my $id = $kv{identifier} ||= ++$self->{id}; 235 my $id = $kv{identifier} ||= $self->identifier;
143 $self->{id}{$id} = delete $kv{id_cb}; 236 $self->{id}{$id} = delete $kv{id_cb};
144 } 237 }
145 238
146 my $msg = (touc $type) . "\012" 239 my $msg = (touc $type) . "\012"
147 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv; 240 . join "", map +(touc $_) . "=$kv{$_}\012", keys %kv;
224 317
225 if (my $cb = $PERSISTENT_TYPE{$type}) { 318 if (my $cb = $PERSISTENT_TYPE{$type}) {
226 my $id = $kv->{identifier}; 319 my $id = $kv->{identifier};
227 my $req = $_[0]{req}{$id} ||= {}; 320 my $req = $_[0]{req}{$id} ||= {};
228 $cb->($self, $req, $kv); 321 $cb->($self, $req, $kv);
229 $self->recv (request_change => $kv, $type, @extra); 322 $self->recv (request_changed => $kv, $type, @extra);
230 } 323 }
231 324
232 my $on = $self->{on}; 325 my $on = $self->{on};
233 for (0 .. $#$on) { 326 for (0 .. $#$on) {
234 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) { 327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
243 } else { 336 } else {
244 $self->default_recv ($type, $kv, @extra); 337 $self->default_recv ($type, $kv, @extra);
245 } 338 }
246} 339}
247 340
248sub on_read {
249 my ($self) = @_;
250
251 my $type;
252 my %kv;
253 my $rdata;
254
255 my $hdr_cb; $hdr_cb = sub {
256 if ($_[1] =~ /^([^=]+)=(.*)$/) {
257 my ($k, $v) = ($1, $2);
258 my @k = split /\./, tolc $k;
259 my $ro = \\%kv;
260
261 while (@k) {
262 my $k = shift @k;
263 if ($k =~ /^\d+$/) {
264 $ro = \$$ro->[$k];
265 } else {
266 $ro = \$$ro->{$k};
267 }
268 }
269
270 $$ro = $v;
271
272 $_[0]->push_read (line => $hdr_cb);
273 } elsif ($_[1] eq "Data") {
274 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
275 $rdata = \$_[1];
276 $self->recv ($type, \%kv, $rdata);
277 });
278 } elsif ($_[1] eq "EndMessage") {
279 $self->recv ($type, \%kv);
280 } else {
281 die "protocol error, expected message end, got $_[1]\n";#d#
282 }
283 };
284
285 $self->{hdl}->push_read (line => sub {
286 $type = tolc $_[1];
287 $_[0]->push_read (line => $hdr_cb);
288 });
289}
290
291sub default_recv { 341sub default_recv {
292 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
293 343
294 if ($type eq "node_hello") { 344 if ($type eq "node_hello") {
295 $self->{node_hello} = $kv; 345 $self->{node_hello} = $kv;
371 my ($name, $sub) = @_; 421 my ($name, $sub) = @_;
372 422
373 *{$name} = sub { 423 *{$name} = sub {
374 my $cv = AE::cv; 424 my $cv = AE::cv;
375 425
376 splice @_, 1, 0, $cv, sub { $cv->throw ($_[0]{extra_description}) }; 426 splice @_, 1, 0, $cv, sub { $cv->croak ($_[0]{extra_description}) };
377 &$sub; 427 &$sub;
378 $cv->recv 428 $cv->recv
379 }; 429 };
380 430
381 *{"$name\_"} = sub { 431 *{"$name\_"} = sub {
382 my ($ok, $err) = pop; 432 my ($ok, $err) = pop;
383 433
384 if (ARRAY:: eq ref $ok) { 434 if (ARRAY:: eq ref $ok) {
385 ($ok, $err) = @$ok; 435 ($ok, $err) = @$ok;
386 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) { 436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
387 $err = sub { $ok->throw ($_[0]{extra_description}) }; 437 $err = sub { $ok->croak ($_[0]{extra_description}) };
388 } else { 438 } else {
389 my $bt = Carp::longmess ""; 439 my $bt = Carp::longmess "";
390 $err = sub { 440 $err = sub {
391 die "$_[0]{extra_description}$bt"; 441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
392 }; 442 };
393 } 443 }
394 444
395 $ok ||= $NOP_CB; 445 $ok ||= $NOP_CB;
396 446
541=cut 591=cut
542 592
543_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
544 my ($self, $ok, $err, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
545 595
596 my $id = $self->identifier;
597
546 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
547 plugin_name => $name, 600 plugin_name => $name,
548 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
549 id_cb => sub {
550 my ($self, $type, $kv, $rdata) = @_;
551
552 $ok->($kv);
553 1
554 },
555 ); 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
556}; 617};
557 618
558=item $status = $fcp->client_get ($uri, $identifier, %kv) 619=item $status = $fcp->client_get ($uri, $identifier, %kv)
559 620
560%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines