ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FCP/FCP.pm
(Generate patch)

Comparing AnyEvent-FCP/FCP.pm (file contents):
Revision 1.15 by root, Fri Aug 14 03:33:13 2015 UTC vs.
Revision 1.18 by root, Thu Dec 3 19:07:57 2015 UTC

61 61
62use common::sense; 62use common::sense;
63 63
64use Carp; 64use Carp;
65 65
66our $VERSION = '0.3'; 66our $VERSION = 0.4;
67 67
68use Scalar::Util (); 68use Scalar::Util ();
69 69
70use AnyEvent; 70use AnyEvent;
71use AnyEvent::Handle; 71use AnyEvent::Handle;
72use AnyEvent::Util (); 72use AnyEvent::Util ();
73
74our %TOLC; # tolc cache
73 75
74sub touc($) { 76sub touc($) {
75 local $_ = shift; 77 local $_ = shift;
76 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/; 78 1 while s/((?:^|_)(?:svk|chk|uri|fcp|ds|mime|dda)(?:_|$))/\U$1/;
77 s/(?:^|_)(.)/\U$1/g; 79 s/(?:^|_)(.)/\U$1/g;
103 105
104 my $self = bless { 106 my $self = bless {
105 host => $ENV{FREDHOST} || "127.0.0.1", 107 host => $ENV{FREDHOST} || "127.0.0.1",
106 port => $ENV{FREDPORT} || 9481, 108 port => $ENV{FREDPORT} || 9481,
107 timeout => 3600 * 2, 109 timeout => 3600 * 2,
110 keepalive => 9 * 60,
108 name => time.rand.rand.rand, # lame 111 name => time.rand.rand.rand, # lame
109 @_, 112 @_,
110 queue => [], 113 queue => [],
111 req => {}, 114 req => {},
112 prefix => "..:aefcpid:$rand:", 115 prefix => "..:aefcpid:$rand:",
114 }, $class; 117 }, $class;
115 118
116 { 119 {
117 Scalar::Util::weaken (my $self = $self); 120 Scalar::Util::weaken (my $self = $self);
118 121
122 $self->{kw} = AE::timer $self->{keepalive}, $self->{keepalive}, sub {
123 $self->{hdl}->push_write ("\n");
124 };
125
126 our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>;
127
128 # these are declared here for performance reasons
129 my ($k, $v, $type);
130 my $rdata;
131
132 my $on_read = sub {
133 my ($hdl) = @_;
134
135 # we only carve out whole messages here
136 while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) {
137 # remember end marker
138 $rdata = $1 eq "Data"
139 or $1 eq "EndMessage"
140 or die "protocol error, expected message end, got $1\n";
141
142 my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0];
143
144 substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg
145
146 $type = shift @lines;
147 $type = ($TOLC{$type} ||= tolc $type);
148
149 my %kv;
150
151 for (@lines) {
152 ($k, $v) = split /=/, $_, 2;
153 $k = ($TOLC{$k} ||= tolc $k);
154
155 if ($k =~ /\./) {
156 # generic, slow case
157 my @k = split /\./, $k;
158 my $ro = \\%kv;
159
160 while (@k) {
161 $k = shift @k;
162 if ($k =~ /^\d+$/) {
163 $ro = \$$ro->[$k];
164 } else {
165 $ro = \$$ro->{$k};
166 }
167 }
168
169 $$ro = $v;
170
171 next;
172 }
173
174 # special comon case, for performance only
175 $kv{$k} = $v;
176 }
177
178 if ($rdata) {
179 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
180 $rdata = \$_[1];
181 $self->recv ($type, \%kv, $rdata);
182 });
183
184 last; # do not tgry to parse more messages
185 } else {
186 $self->recv ($type, \%kv);
187 }
188 }
189 };
190
119 $self->{hdl} = new AnyEvent::Handle 191 $self->{hdl} = new AnyEvent::Handle
120 connect => [$self->{host} => $self->{port}], 192 connect => [$self->{host} => $self->{port}],
121 timeout => $self->{timeout}, 193 timeout => $self->{timeout},
122 on_error => sub { 194 on_error => sub {
123 warn "@_\n";#d# 195 warn "$self->{host}: $_[2]\n";#d#
124 exit 1; 196 exit 1;
125 }, 197 },
126 on_read => sub { $self->on_read (@_) }, 198 on_read => $on_read,
127 on_eof => $self->{on_eof} || sub { }; 199 on_eof => $self->{on_eof} || sub { },
200 ;
128 201
129 Scalar::Util::weaken ($self->{hdl}{fcp} = $self); 202 Scalar::Util::weaken ($self->{hdl}{fcp} = $self);
130 } 203 }
131 204
132 $self->send_msg (client_hello => 205 $self->send_msg (client_hello =>
232 305
233 if (my $cb = $PERSISTENT_TYPE{$type}) { 306 if (my $cb = $PERSISTENT_TYPE{$type}) {
234 my $id = $kv->{identifier}; 307 my $id = $kv->{identifier};
235 my $req = $_[0]{req}{$id} ||= {}; 308 my $req = $_[0]{req}{$id} ||= {};
236 $cb->($self, $req, $kv); 309 $cb->($self, $req, $kv);
237 $self->recv (request_change => $kv, $type, @extra); 310 $self->recv (request_changed => $kv, $type, @extra);
238 } 311 }
239 312
240 my $on = $self->{on}; 313 my $on = $self->{on};
241 for (0 .. $#$on) { 314 for (0 .. $#$on) {
242 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) { 315 unless (my $res = $on->[$_]($self, $type, $kv, @extra)) {
249 $cb->($self, $type, $kv, @extra) 322 $cb->($self, $type, $kv, @extra)
250 and shift @{ $self->{queue} }; 323 and shift @{ $self->{queue} };
251 } else { 324 } else {
252 $self->default_recv ($type, $kv, @extra); 325 $self->default_recv ($type, $kv, @extra);
253 } 326 }
254}
255
256sub on_read {
257 my ($self) = @_;
258
259 my $type;
260 my %kv;
261 my $rdata;
262
263 my $hdr_cb; $hdr_cb = sub {
264 if ($_[1] =~ /^([^=]+)=(.*)$/) {
265 my ($k, $v) = ($1, $2);
266 my @k = split /\./, tolc $k;
267 my $ro = \\%kv;
268
269 while (@k) {
270 my $k = shift @k;
271 if ($k =~ /^\d+$/) {
272 $ro = \$$ro->[$k];
273 } else {
274 $ro = \$$ro->{$k};
275 }
276 }
277
278 $$ro = $v;
279
280 $_[0]->push_read (line => $hdr_cb);
281 } elsif ($_[1] eq "Data") {
282 $_[0]->push_read (chunk => delete $kv{data_length}, sub {
283 $rdata = \$_[1];
284 $self->recv ($type, \%kv, $rdata);
285 });
286 } elsif ($_[1] eq "EndMessage") {
287 $self->recv ($type, \%kv);
288 } else {
289 die "protocol error, expected message end, got $_[1]\n";#d#
290 }
291 };
292
293 $self->{hdl}->push_read (line => sub {
294 $type = tolc $_[1];
295 $_[0]->push_read (line => $hdr_cb);
296 });
297} 327}
298 328
299sub default_recv { 329sub default_recv {
300 my ($self, $type, $kv, $rdata) = @_; 330 my ($self, $type, $kv, $rdata) = @_;
301 331

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines