ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.14 by root, Sat Aug 8 14:09:47 2015 UTC vs.
Revision 1.19 by root, Tue Jun 7 18:53:23 2016 UTC

61 61
62use common::sense; 62use common::sense;
63 63
64use Carp; 64use Carp;
65 65
66our $VERSION = '0.3'; 66our $VERSION = 0.4;
67 67
68use Scalar::Util (); 68use Scalar::Util ();
69 69
70use AnyEvent; 70use AnyEvent;
71use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
73 75
74sub touc($) { 76sub touc($) {
75 local $_ = shift; 77 local $_ = shift;
76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
77 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
103 105
104 my $self = bless { 106 my $self = bless {
105 host => $ENV{FREDHOST} || "127.0.0.1", 107 host => $ENV{FREDHOST} || "127.0.0.1",
106 port => $ENV{FREDPORT} || 9481, 108 port => $ENV{FREDPORT} || 9481,
107 timeout => 3600 * 2, 109 timeout => 3600 * 2,
110 keepalive => 9 * 60,
108 name => time.rand.rand.rand, # lame 111 name => time.rand.rand.rand, # lame
109 @_, 112 @_,
110 queue => [], 113 queue => [],
111 req => {}, 114 req => {},
112 prefix => "..:aefcpid-$rand:", 115 prefix => "..:aefcpid:$rand:",
113 idseq => "a0", 116 idseq => "a0",
114 }, $class; 117 }, $class;
115 118
116 { 119 {
117 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or return $self->fatal ("protocol error, expected message end, got $1\n");
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
118 190
119 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
120 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
121 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
194 on_read => $on_read,
195 on_eof => $self->{on_eof},
122 on_error => sub { 196 on_error => sub {
123 warn "@_\n";#d# 197 $self->fatal ($_[2]);
124 exit 1;
125 }, 198 },
126 on_read => sub { $self->on_read (@_) }, 199 ;
127 on_eof => $self->{on_eof} || sub { };
128 200
129 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 201 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
130 } 202 }
131 203
132 $self->send_msg (client_hello => 204 $self->send_msg (client_hello =>
133 name => $self->{name}, 205 name => $self->{name},
134 expected_version => "2.0", 206 expected_version => "2.0",
135 ); 207 );
136 208
137 $self 209 $self
210}
211
212sub fatal {
213 my ($self, $msg) = @_;
214
215 $self->{hdl}->shutdown;
216 delete $self->{kw};
217
218 if ($self->{on_error}) {
219 $self->{on_error}->($msg);
220 } else {
221 die $msg;
222 }
138} 223}
139 224
140sub identifier { 225sub identifier {
141 $_[0]{prefix} . ++$_[0]{idseq} 226 $_[0]{prefix} . ++$_[0]{idseq}
142} 227}
232 317
233 if (my $cb = $PERSISTENT_TYPE{$type}) { 318 if (my $cb = $PERSISTENT_TYPE{$type}) {
234 my $id = $kv->{identifier}; 319 my $id = $kv->{identifier};
235 my $req = $_[0]{req}{$id} ||= {}; 320 my $req = $_[0]{req}{$id} ||= {};
236 $cb->($self, $req, $kv); 321 $cb->($self, $req, $kv);
237 $self->recv (request_change => $kv, $type, @extra); 322 $self->recv (request_changed => $kv, $type, @extra);
238 } 323 }
239 324
240 my $on = $self->{on}; 325 my $on = $self->{on};
241 for (0 .. $#$on) { 326 for (0 .. $#$on) {
242 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) { 327 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
249 $cb->($self, $type, $kv, @extra) 334 $cb->($self, $type, $kv, @extra)
250 and shift @{ $self->{queue} }; 335 and shift @{ $self->{queue} };
251 } else { 336 } else {
252 $self->default_recv ($type, $kv, @extra); 337 $self->default_recv ($type, $kv, @extra);
253 } 338 }
254}
255
256sub on_read {
257 my ($self) = @_;
258
259 my $type;
260 my %kv;
261 my $rdata;
262
263 my $hdr_cb; $hdr_cb = sub {
264 if ($_[1] =~ /^([^=]+)=(.*)$/) {
265 my ($k, $v) = ($1, $2);
266 my @k = split /\./, tolc $k;
267 my $ro = \\%kv;
268
269 while (@k) {
270 my $k = shift @k;
271 if ($k =~ /^\d+$/) {
272 $ro = \$$ro->[$k];
273 } else {
274 $ro = \$$ro->{$k};
275 }
276 }
277
278 $$ro = $v;
279
280 $_[0]->push_read (line => $hdr_cb);
281 } elsif ($_[1] eq "Data") {
282 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
283 $rdata = \$_[1];
284 $self->recv ($type, \%kv, $rdata);
285 });
286 } elsif ($_[1] eq "EndMessage") {
287 $self->recv ($type, \%kv);
288 } else {
289 die "protocol error, expected message end, got $_[1]\n";#d#
290 }
291 };
292
293 $self->{hdl}->push_read (line => sub {
294 $type = tolc $_[1];
295 $_[0]->push_read (line => $hdr_cb);
296 });
297} 339}
298 340
299sub default_recv { 341sub default_recv {
300 my ($self, $type, $kv, $rdata) = @_; 342 my ($self, $type, $kv, $rdata) = @_;
301 343
394 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) { 436 } elsif (UNIVERSAL::isa $ok, AnyEvent::CondVar::) {
395 $err = sub { $ok->croak ($_[0]{extra_description}) }; 437 $err = sub { $ok->croak ($_[0]{extra_description}) };
396 } else { 438 } else {
397 my $bt = Carp::longmess ""; 439 my $bt = Carp::longmess "";
398 $err = sub { 440 $err = sub {
399 die "$_[0]{extra_description}$bt"; 441 die "$_[0]{code_description} ($_[0]{extra_description})$bt";
400 }; 442 };
401 } 443 }
402 444
403 $ok ||= $NOP_CB; 445 $ok ||= $NOP_CB;
404 446
549=cut 591=cut
550 592
551_txn get_plugin_info => sub { 593_txn get_plugin_info => sub {
552 my ($self, $ok, $err, $name, $detailed) = @_; 594 my ($self, $ok, $err, $name, $detailed) = @_;
553 595
596 my $id = $self->identifier;
597
554 $self->send_msg (get_plugin_info => 598 $self->send_msg (get_plugin_info =>
599 identifier => $id,
555 plugin_name => $name, 600 plugin_name => $name,
556 detailed => $detailed ? "true" : "false", 601 detailed => $detailed ? "true" : "false",
557 id_cb => sub {
558 my ($self, $type, $kv, $rdata) = @_;
559
560 $ok->($kv);
561 1
562 },
563 ); 602 );
603 $self->on (sub {
604 my ($self, $type, $kv) = @_;
605
606 if ($kv->{identifier} eq $id) {
607 if ($type eq "get_plugin_info") {
608 $ok->($kv);
609 } else {
610 $err->($kv, $type);
611 }
612 return;
613 }
614
615 1
616 });
564}; 617};
565 618
566=item $status = $fcp->client_get ($uri, $identifier, %kv) 619=item $status = $fcp->client_get ($uri, $identifier, %kv)
567 620
568%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>). 621%kv can contain (L<http://wiki.freenetproject.org/FCP2p0ClientGet>).

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines