ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.7 by root, Sat Aug 1 15:04:30 2009 UTC vs.
Revision 1.21 by root, Tue Aug 4 14:10:51 2009 UTC

28 28
29Despite its simplicity, you can securely message other processes running 29Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 30on the same or other hosts.
31 31
32At the moment, this module family is severly brokena nd underdocumented, 32At the moment, this module family is severly brokena nd underdocumented,
33so do not use. This was uploaded mainly to resreve the CPAN namespace - 33so do not use. This was uploaded mainly to reserve the CPAN namespace -
34stay tuned! 34stay tuned!
35 35
36=head1 CONCEPTS 36=head1 CONCEPTS
37 37
38=over 4 38=over 4
72 72
73=cut 73=cut
74 74
75package AnyEvent::MP; 75package AnyEvent::MP;
76 76
77use AnyEvent::MP::Util ();
78use AnyEvent::MP::Node; 77use AnyEvent::MP::Base;
79use AnyEvent::MP::Transport;
80 78
81use utf8;
82use common::sense; 79use common::sense;
83 80
84use Carp (); 81use Carp ();
85 82
86use AE (); 83use AE ();
87 84
88use base "Exporter"; 85use base "Exporter";
89 86
90our $VERSION = '0.01'; 87our $VERSION = '0.02';
91our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 88our @EXPORT = qw(
92 89 NODE $NODE $PORT snd rcv mon kil _any_
93our $DEFAULT_SECRET; 90 create_port create_port_on
94our $DEFAULT_PORT = "4040"; 91 miniport
95 92 become_slave become_public
96our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 93);
97our $CONNECT_TIMEOUT = 30; # includes handshake
98
99sub default_secret {
100 unless (defined $DEFAULT_SECRET) {
101 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102 sysread $fh, $DEFAULT_SECRET, -s $fh;
103 } else {
104 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
105 }
106 }
107
108 $DEFAULT_SECRET
109}
110 94
111=item NODE / $NODE 95=item NODE / $NODE
112 96
113The C<NODE ()> function and the C<$NODE> variable contain the noderef of 97The C<NODE ()> function and the C<$NODE> variable contain the noderef of
114the local node. The value is initialised by a call to C<become_public> or 98the local node. The value is initialised by a call to C<become_public> or
115C<become_slave>, after which all local port identifiers become invalid. 99C<become_slave>, after which all local port identifiers become invalid.
116 100
117=cut
118
119our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
120our $ID = "a0";
121our $PUBLIC = 0;
122our $NODE;
123our $PORT;
124
125our %NODE; # node id to transport mapping, or "undef", for local node
126our %PORT; # local ports
127our %LISTENER; # local transports
128
129sub NODE() { $NODE }
130
131{
132 use POSIX ();
133 my $nodename = (POSIX::uname)[1];
134 $NODE = "$$\@$nodename";
135}
136
137sub _ANY_() { 1 }
138sub _any_() { \&_ANY_ }
139
140sub add_node {
141 my ($noderef) = @_;
142
143 return $NODE{$noderef}
144 if exists $NODE{$noderef};
145
146 for (split /,/, $noderef) {
147 return $NODE{$noderef} = $NODE{$_}
148 if exists $NODE{$_};
149 }
150
151 # for indirect sends, use a different class
152 my $node = new AnyEvent::MP::Node::Direct $noderef;
153
154 $NODE{$_} = $node
155 for $noderef, split /,/, $noderef;
156
157 $node
158}
159
160=item snd $portid, type => @data 101=item snd $portid, type => @data
161 102
162=item snd $portid, @msg 103=item snd $portid, @msg
163 104
164Send the given message to the given port ID, which can identify either a 105Send the given message to the given port ID, which can identify either
165local or a remote port. 106a local or a remote port, and can be either a string or soemthignt hat
107stringifies a sa port ID (such as a port object :).
166 108
167While the message can be about anything, it is highly recommended to use 109While the message can be about anything, it is highly recommended to use a
168a constant string as first element. 110string as first element (a portid, or some word that indicates a request
111type etc.).
169 112
170The message data effectively becomes read-only after a call to this 113The message data effectively becomes read-only after a call to this
171function: modifying any argument is not allowed and can cause many 114function: modifying any argument is not allowed and can cause many
172problems. 115problems.
173 116
175JSON is used, then only strings, numbers and arrays and hashes consisting 118JSON is used, then only strings, numbers and arrays and hashes consisting
176of those are allowed (no objects). When Storable is used, then anything 119of those are allowed (no objects). When Storable is used, then anything
177that Storable can serialise and deserialise is allowed, and for the local 120that Storable can serialise and deserialise is allowed, and for the local
178node, anything can be passed. 121node, anything can be passed.
179 122
180=cut 123=item $guard = mon $portid, $cb->()
181 124
182sub snd(@) { 125=item $guard = mon $portid, $otherport
126
127=item $guard = mon $portid, $otherport, @msg
128
129Monitor the given port and call the given callback when the port is
130destroyed or connection to it's node is lost.
131
132#TODO
133
134=cut
135
136sub mon {
183 my ($noderef, $port) = split /#/, shift, 2; 137 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
184 138
185 add_node $noderef 139 my $node = AnyEvent::MP::Base::add_node $noderef;
186 unless exists $NODE{$noderef};
187 140
188 $NODE{$noderef}->send (["$port", [@_]]); 141 #TODO: ports must not be references
189} 142 if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
143 if (@_) {
144 # send a kill info message
145 my (@msg) = ($cb, @_);
146 $cb = sub { snd @msg, @_ };
147 } else {
148 # simply kill other port
149 my $port = $cb;
150 $cb = sub { kil $port, @_ };
151 }
152 }
190 153
154 $node->monitor ($port, $cb);
155
156 defined wantarray
157 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
158}
159
160=item $guard = mon_guard $port, $ref, $ref...
161
162Monitors the given C<$port> and keeps the passed references. When the port
163is killed, the references will be freed.
164
165Optionally returns a guard that will stop the monitoring.
166
167This function is useful when you create e.g. timers or other watchers and
168want to free them when the port gets killed:
169
170 $port->rcv (start => sub {
171 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
172 undef $timer if 0.9 < rand;
173 });
174 });
175
176=cut
177
178sub mon_guard {
179 my ($port, @refs) = @_;
180
181 mon $port, sub { 0 && @refs }
182}
183
184=item $local_port = create_port
185
186Create a new local port object. See the next section for allowed methods.
187
188=cut
189
190sub create_port {
191 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
192
193 my $self = bless {
194 id => "$NODE#$id",
195 }, "AnyEvent::MP::Port";
196
197 $AnyEvent::MP::Base::PORT{$id} = sub {
198 unshift @_, $self;
199
200 for (@{ $self->{rc0}{$_[1]} }) {
201 $_ && &{$_->[0]}
202 && undef $_;
203 }
204
205 for (@{ $self->{rcv}{$_[1]} }) {
206 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
207 && &{$_->[0]}
208 && undef $_;
209 }
210
211 for (@{ $self->{any} }) {
212 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
213 && &{$_->[0]}
214 && undef $_;
215 }
216 };
217
218 $self
219}
220
221=item $portid = miniport { my @msg = @_; $finished }
222
223Creates a "mini port", that is, a very lightweight port without any
224pattern matching behind it, and returns its ID.
225
226The block will be called for every message received on the port. When the
227callback returns a true value its job is considered "done" and the port
228will be destroyed. Otherwise it will stay alive.
229
230The message will be passed as-is, no extra argument (i.e. no port id) will
231be passed to the callback.
232
233If you need the local port id in the callback, this works nicely:
234
235 my $port; $port = miniport {
236 snd $otherport, reply => $port;
237 };
238
239=cut
240
241sub miniport(&) {
242 my $cb = shift;
243 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
244
245 $AnyEvent::MP::Base::PORT{$id} = sub {
246 &$cb
247 and kil $id;
248 };
249
250 "$NODE#$id"
251}
252
253package AnyEvent::MP::Port;
254
255=back
256
257=head1 METHODS FOR PORT OBJECTS
258
259=over 4
260
261=item "$port"
262
263A port object stringifies to its port ID, so can be used directly for
264C<snd> operations.
265
266=cut
267
268use overload
269 '""' => sub { $_[0]{id} },
270 fallback => 1;
271
272sub TO_JSON { $_[0]{id} }
273
191=item rcv $portid, type => $callback->(@msg) 274=item $port->rcv (type => $callback->($port, @msg))
192 275
193=item rcv $portid, $smartmatch => $callback->(@msg) 276=item $port->rcv ($smartmatch => $callback->($port, @msg))
194 277
195=item rcv $portid, [$smartmatch...] => $callback->(@msg) 278=item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
196 279
197Register a callback on the port identified by C<$portid>, which I<must> be 280Register a callback on the given port.
198a local port.
199 281
200The callback has to return a true value when its work is done, after 282The callback has to return a true value when its work is done, after
201which is will be removed, or a false value in which case it will stay 283which is will be removed, or a false value in which case it will stay
202registered. 284registered.
203 285
213also the most efficient match (by far). 295also the most efficient match (by far).
214 296
215=cut 297=cut
216 298
217sub rcv($@) { 299sub rcv($@) {
218 my ($port, $match, $cb) = @_; 300 my ($self, $match, $cb) = @_;
219
220 my $port = $PORT{$port}
221 or do {
222 my ($noderef, $lport) = split /#/, $port;
223 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
224 or Carp::croak "$port: can only rcv on local ports";
225
226 $PORT{$lport}
227 or Carp::croak "$port: port does not exist";
228
229 $PORT{$port} = $PORT{$lport} # also return
230 };
231 301
232 if (!ref $match) { 302 if (!ref $match) {
233 push @{ $port->{rc0}{$match} }, [$cb]; 303 push @{ $self->{rc0}{$match} }, [$cb];
234 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 304 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
235 my ($type, @match) = @$match; 305 my ($type, @match) = @$match;
236 @match 306 @match
237 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 307 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
238 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 308 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
239 } else { 309 } else {
240 push @{ $port->{any} }, [$cb, $match]; 310 push @{ $self->{any} }, [$cb, $match];
241 } 311 }
242} 312}
243 313
244sub _inject { 314=item $port->register ($name)
245 my ($port, $msg) = @{+shift};
246 315
247 $port = $PORT{$port} 316Registers the given port under the well known name C<$name>. If the name
248 or return; 317already exists it is replaced.
249 318
250 @_ = @$msg; 319A port can only be registered under one well known name.
251 320
252 for (@{ $port->{rc0}{$msg->[0]} }) { 321=cut
253 $_ && &{$_->[0]}
254 && undef $_;
255 }
256 322
257 for (@{ $port->{rcv}{$msg->[0]} }) { 323sub register {
258 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] 324 my ($self, $name) = @_;
259 && &{$_->[0]}
260 && undef $_;
261 }
262 325
263 for (@{ $port->{any} }) { 326 $self->{wkname} = $name;
264 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] 327 $AnyEvent::MP::Base::WKP{$name} = "$self";
265 && &{$_->[0]}
266 && undef $_;
267 }
268} 328}
269 329
270sub normalise_noderef($) { 330=item $port->destroy
331
332Explicitly destroy/remove/nuke/vaporise the port.
333
334Ports are normally kept alive by their mere existance alone, and need to
335be destroyed explicitly.
336
337=cut
338
339sub destroy {
271 my ($noderef) = @_; 340 my ($self) = @_;
272 341
273 my $cv = AE::cv; 342 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
274 my @res;
275 343
276 $cv->begin (sub { 344 AnyEvent::MP::Base::kil $self->{id};
277 my %seen;
278 my @refs;
279 for (sort { $a->[0] <=> $b->[0] } @res) {
280 push @refs, $_->[1] unless $seen{$_->[1]}++
281 }
282 shift->send (join ",", @refs);
283 });
284
285 $noderef = $DEFAULT_PORT unless length $noderef;
286
287 my $idx;
288 for my $t (split /,/, $noderef) {
289 my $pri = ++$idx;
290
291 #TODO: this should be outside normalise_noderef and in become_public
292 if ($t =~ /^\d*$/) {
293 my $nodename = (POSIX::uname)[1];
294
295 $cv->begin;
296 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
297 for (@_) {
298 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
299 push @res, [
300 $pri += 1e-5,
301 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
302 ];
303 }
304 $cv->end;
305 };
306
307# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
308#
309# for (@ipv4) {
310# push @res, [
311# $pri,
312# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
313# ];
314# }
315 } else {
316 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
317 or Carp::croak "$t: unparsable transport descriptor";
318
319 $cv->begin;
320 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
321 for (@_) {
322 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
323 push @res, [
324 $pri += 1e-5,
325 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
326 ];
327 }
328 $cv->end;
329 }
330 }
331 }
332
333 $cv->end;
334
335 $cv
336} 345}
337 346
338sub become_public { 347=back
339 return if $PUBLIC;
340 348
341 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 349=head1 FUNCTIONS FOR NODES
342 my @args = @_;
343 350
344 $NODE = (normalise_noderef $noderef)->recv; 351=over 4
345 352
346 for my $t (split /,/, $NODE) { 353=item mon $noderef, $callback->($noderef, $status, $)
347 $NODE{$t} = $NODE{""};
348 354
349 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 355Monitors the given noderef.
350 356
351 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 357=item become_public endpoint...
352 @args,
353 on_error => sub {
354 die "on_error<@_>\n";#d#
355 },
356 on_connect => sub {
357 my ($tp) = @_;
358 358
359 $NODE{$tp->{remote_id}} = $_[0]; 359Tells the node to become a public node, i.e. reachable from other nodes.
360 },
361 sub {
362 my ($tp) = @_;
363 360
364 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; 361If no arguments are given, or the first argument is C<undef>, then
365 }, 362AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
366 ; 363local nodename resolves to.
367 }
368 364
369 $PUBLIC = 1; 365Otherwise the first argument must be an array-reference with transport
370} 366endpoints ("ip:port", "hostname:port") or port numbers (in which case the
367local nodename is used as hostname). The endpoints are all resolved and
368will become the node reference.
369
370=cut
371 371
372=back 372=back
373 373
374=head1 NODE MESSAGES 374=head1 NODE MESSAGES
375 375
380 380
381=over 4 381=over 4
382 382
383=cut 383=cut
384 384
385############################################################################# 385=item wkp => $name, @reply
386# self node code
387 386
388sub _new_port($) { 387Replies with the port ID of the specified well-known port, or C<undef>.
389 my ($name) = @_;
390
391 my ($noderef, $portname) = split /#/, $name;
392
393 $PORT{$name} =
394 $PORT{$portname} = {
395 names => [$name, $portname],
396 };
397}
398
399$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
400_new_port "";
401 388
402=item devnull => ... 389=item devnull => ...
403 390
404Generic data sink/CPU heat conversion. 391Generic data sink/CPU heat conversion.
405 392
406=cut
407
408rcv "", devnull => sub { () };
409
410=item relay => $port, @msg 393=item relay => $port, @msg
411 394
412Simply forwards the message to the given port. 395Simply forwards the message to the given port.
413
414=cut
415
416rcv "", relay => sub { \&snd; () };
417 396
418=item eval => $string[ @reply] 397=item eval => $string[ @reply]
419 398
420Evaluates the given string. If C<@reply> is given, then a message of the 399Evaluates the given string. If C<@reply> is given, then a message of the
421form C<@reply, $@, @evalres> is sent. 400form C<@reply, $@, @evalres> is sent.
422 401
423Example: crash another node. 402Example: crash another node.
424 403
425 snd $othernode, eval => "exit"; 404 snd $othernode, eval => "exit";
426 405
427=cut
428
429rcv "", eval => sub {
430 my (undef, $string, @reply) = @_;
431 my @res = eval $string;
432 snd @reply, "$@", @res if @reply;
433 ()
434};
435
436=item time => @reply 406=item time => @reply
437 407
438Replies the the current node time to C<@reply>. 408Replies the the current node time to C<@reply>.
439 409
440Example: tell the current node to send the current time to C<$myport> in a 410Example: tell the current node to send the current time to C<$myport> in a
441C<timereply> message. 411C<timereply> message.
442 412
443 snd $NODE, time => $myport, timereply => 1, 2; 413 snd $NODE, time => $myport, timereply => 1, 2;
444 # => snd $myport, timereply => 1, 2, <time> 414 # => snd $myport, timereply => 1, 2, <time>
445 415
446=cut
447
448rcv "", time => sub { shift; snd @_, AE::time; () };
449
450=back 416=back
451 417
452=head1 SEE ALSO 418=head1 SEE ALSO
453 419
454L<AnyEvent>. 420L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines