ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.4 by root, Sat Aug 1 07:36:30 2009 UTC vs.
Revision 1.22 by root, Tue Aug 4 18:33:30 2009 UTC

4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::MP; 7 use AnyEvent::MP;
8 8
9 NODE # returns this node identifier
10 $NODE # contains this node identifier 9 $NODE # contains this node's noderef
10 NODE # returns this node's noderef
11 NODE $port # returns the noderef of the port
11 12
12 snd $port, type => data...; 13 snd $port, type => data...;
14
15 $SELF # receiving/own port id in rcv callbacks
13 16
14 rcv $port, smartmatch => $cb->($port, @msg); 17 rcv $port, smartmatch => $cb->($port, @msg);
15 18
16 # examples: 19 # examples:
17 rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; 20 rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
27This module (-family) implements a simple message passing framework. 30This module (-family) implements a simple message passing framework.
28 31
29Despite its simplicity, you can securely message other processes running 32Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 33on the same or other hosts.
31 34
35At the moment, this module family is severly brokena nd underdocumented,
36so do not use. This was uploaded mainly to reserve the CPAN namespace -
37stay tuned!
38
32=head1 CONCEPTS 39=head1 CONCEPTS
33 40
34=over 4 41=over 4
35 42
36=item port 43=item port
52 59
53Initially, nodes are either private (single-process only) or hidden 60Initially, nodes are either private (single-process only) or hidden
54(connected to a master node only). Only when they epxlicitly "become 61(connected to a master node only). Only when they epxlicitly "become
55public" can you send them messages from unrelated other nodes. 62public" can you send them messages from unrelated other nodes.
56 63
57=item noderef - C<host:port,host:port...>, C<id@noderef, C<id> 64=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
58 65
59A noderef is a string that either uniquely identifies a given node (for 66A noderef is a string that either uniquely identifies a given node (for
60private and hidden nodes), or contains a recipe on how to reach a given 67private and hidden nodes), or contains a recipe on how to reach a given
61node (for public nodes). 68node (for public nodes).
62 69
68 75
69=cut 76=cut
70 77
71package AnyEvent::MP; 78package AnyEvent::MP;
72 79
73use AnyEvent::MP::Util ();
74use AnyEvent::MP::Node; 80use AnyEvent::MP::Base;
75use AnyEvent::MP::Transport;
76 81
77use utf8;
78use common::sense; 82use common::sense;
79 83
80use Carp (); 84use Carp ();
81 85
82use AE (); 86use AE ();
83 87
84use base "Exporter"; 88use base "Exporter";
85 89
86our $VERSION = '0.0'; 90our $VERSION = '0.02';
87our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 91our @EXPORT = qw(
92 NODE $NODE *SELF node_of _any_
93 become_slave become_public
94 snd rcv mon kil reg psub
95 port
96);
88 97
89our $DEFAULT_SECRET; 98our $SELF;
90our $DEFAULT_PORT = "4040";
91 99
92our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 100sub _self_die() {
93our $CONNECT_TIMEOUT = 30; # includes handshake 101 my $msg = $@;
94 102 $msg =~ s/\n+$// unless ref $msg;
95sub default_secret { 103 kil $SELF, die => $msg;
96 unless (defined $DEFAULT_SECRET) {
97 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
98 sysread $fh, $DEFAULT_SECRET, -s $fh;
99 } else {
100 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
101 }
102 }
103
104 $DEFAULT_SECRET
105} 104}
106 105
107=item NODE / $NODE 106=item $thisnode = NODE / $NODE
108 107
109The C<NODE ()> function and the C<$NODE> variable contain the noderef of 108The C<NODE> function returns, and the C<$NODE> variable contains
110the local node. The value is initialised by a call to C<become_public> or 109the noderef of the local node. The value is initialised by a call
111C<become_slave>, after which all local port identifiers become invalid. 110to C<become_public> or C<become_slave>, after which all local port
111identifiers become invalid.
112 112
113=cut 113=item $noderef = node_of $portid
114 114
115our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie 115Extracts and returns the noderef from a portid or a noderef.
116our $PUBLIC = 0;
117our $NODE;
118our $PORT;
119 116
120our %NODE; # node id to transport mapping, or "undef", for local node 117=item $SELF
121our %PORT; # local ports
122our %LISTENER; # local transports
123 118
124sub NODE() { $NODE } 119Contains the current port id while executing C<rcv> callbacks or C<psub>
120blocks.
125 121
126{ 122=item SELF, %SELF, @SELF...
127 use POSIX ();
128 my $nodename = (POSIX::uname)[1];
129 $NODE = "$$\@$nodename";
130}
131 123
132sub _ANY_() { 1 } 124Due to some quirks in how perl exports variables, it is impossible to
133sub _any_() { \&_ANY_ } 125just export C<$SELF>, all the symbols called C<SELF> are exported by this
134 126module, but only C<$SELF> is currently used.
135sub add_node {
136 my ($noderef) = @_;
137
138 return $NODE{$noderef}
139 if exists $NODE{$noderef};
140
141 for (split /,/, $noderef) {
142 return $NODE{$noderef} = $NODE{$_}
143 if exists $NODE{$_};
144 }
145
146 # for indirect sends, use a different class
147 my $node = new AnyEvent::MP::Node::Direct $noderef;
148
149 $NODE{$_} = $node
150 for $noderef, split /,/, $noderef;
151
152 $node
153}
154 127
155=item snd $portid, type => @data 128=item snd $portid, type => @data
156 129
157=item snd $portid, @msg 130=item snd $portid, @msg
158 131
159Send the given message to the given port ID, which can identify either a 132Send the given message to the given port ID, which can identify either
160local or a remote port. 133a local or a remote port, and can be either a string or soemthignt hat
134stringifies a sa port ID (such as a port object :).
161 135
162While the message can be about anything, it is highly recommended to use 136While the message can be about anything, it is highly recommended to use a
163a constant string as first element. 137string as first element (a portid, or some word that indicates a request
138type etc.).
164 139
165The message data effectively becomes read-only after a call to this 140The message data effectively becomes read-only after a call to this
166function: modifying any argument is not allowed and can cause many 141function: modifying any argument is not allowed and can cause many
167problems. 142problems.
168 143
170JSON is used, then only strings, numbers and arrays and hashes consisting 145JSON is used, then only strings, numbers and arrays and hashes consisting
171of those are allowed (no objects). When Storable is used, then anything 146of those are allowed (no objects). When Storable is used, then anything
172that Storable can serialise and deserialise is allowed, and for the local 147that Storable can serialise and deserialise is allowed, and for the local
173node, anything can be passed. 148node, anything can be passed.
174 149
175=cut 150=item kil $portid[, @reason]
176 151
177sub snd(@) { 152Kill the specified port with the given C<@reason>.
153
154If no C<@reason> is specified, then the port is killed "normally" (linked
155ports will not be kileld, or even notified).
156
157Otherwise, linked ports get killed with the same reason (second form of
158C<mon>, see below).
159
160Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
161will be reported as reason C<< die => $@ >>.
162
163Transport/communication errors are reported as C<< transport_error =>
164$message >>.
165
166=item $guard = mon $portid, $cb->(@reason)
167
168=item $guard = mon $portid, $otherport
169
170=item $guard = mon $portid, $otherport, @msg
171
172Monitor the given port and do something when the port is killed.
173
174In the first form, the callback is simply called with any number
175of C<@reason> elements (no @reason means that the port was deleted
176"normally"). Note also that I<< the callback B<must> never die >>, so use
177C<eval> if unsure.
178
179In the second form, the other port will be C<kil>'ed with C<@reason>, iff
180a @reason was specified, i.e. on "normal" kils nothing happens, while
181under all other conditions, the other port is killed with the same reason.
182
183In the last form, a message of the form C<@msg, @reason> will be C<snd>.
184
185Example: call a given callback when C<$port> is killed.
186
187 mon $port, sub { warn "port died because of <@_>\n" };
188
189Example: kill ourselves when C<$port> is killed abnormally.
190
191 mon $port, $self;
192
193Example: send us a restart message another C<$port> is killed.
194
195 mon $port, $self => "restart";
196
197=cut
198
199sub mon {
178 my ($noderef, $port) = split /#/, shift, 2; 200 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
179 201
180 add_node $noderef 202 my $node = $NODE{$noderef} || add_node $noderef;
181 unless exists $NODE{$noderef};
182 203
183 $NODE{$noderef}->send (["$port", [@_]]); 204 #TODO: ports must not be references
184} 205 if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
206 if (@_) {
207 # send a kill info message
208 my (@msg) = ($cb, @_);
209 $cb = sub { snd @msg, @_ };
210 } else {
211 # simply kill other port
212 my $port = $cb;
213 $cb = sub { kil $port, @_ if @_ };
214 }
215 }
185 216
217 $node->monitor ($port, $cb);
218
219 defined wantarray
220 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
221}
222
223=item $guard = mon_guard $port, $ref, $ref...
224
225Monitors the given C<$port> and keeps the passed references. When the port
226is killed, the references will be freed.
227
228Optionally returns a guard that will stop the monitoring.
229
230This function is useful when you create e.g. timers or other watchers and
231want to free them when the port gets killed:
232
233 $port->rcv (start => sub {
234 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
235 undef $timer if 0.9 < rand;
236 });
237 });
238
239=cut
240
241sub mon_guard {
242 my ($port, @refs) = @_;
243
244 mon $port, sub { 0 && @refs }
245}
246
247=item $local_port = port
248
249Create a new local port object that supports message matching.
250
251=item $portid = port { my @msg = @_; $finished }
252
253Creates a "mini port", that is, a very lightweight port without any
254pattern matching behind it, and returns its ID.
255
256The block will be called for every message received on the port. When the
257callback returns a true value its job is considered "done" and the port
258will be destroyed. Otherwise it will stay alive.
259
260The message will be passed as-is, no extra argument (i.e. no port id) will
261be passed to the callback.
262
263If you need the local port id in the callback, this works nicely:
264
265 my $port; $port = miniport {
266 snd $otherport, reply => $port;
267 };
268
269=cut
270
271sub port(;&) {
272 my $id = "$UNIQ." . $ID++;
273 my $port = "$NODE#$id";
274
275 if (@_) {
276 my $cb = shift;
277 $PORT{$id} = sub {
278 local $SELF = $port;
279 eval {
280 &$cb
281 and kil $id;
282 };
283 _self_die if $@;
284 };
285 } else {
286 my $self = bless {
287 id => "$NODE#$id",
288 }, "AnyEvent::MP::Port";
289
290 $PORT_DATA{$id} = $self;
291 $PORT{$id} = sub {
292 local $SELF = $port;
293
294 eval {
295 for (@{ $self->{rc0}{$_[0]} }) {
296 $_ && &{$_->[0]}
297 && undef $_;
298 }
299
300 for (@{ $self->{rcv}{$_[0]} }) {
301 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
302 && &{$_->[0]}
303 && undef $_;
304 }
305
306 for (@{ $self->{any} }) {
307 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
308 && &{$_->[0]}
309 && undef $_;
310 }
311 };
312 _self_die if $@;
313 };
314 }
315
316 $port
317}
318
319=item reg $portid, $name
320
321Registers the given port under the name C<$name>. If the name already
322exists it is replaced.
323
324A port can only be registered under one well known name.
325
326A port automatically becomes unregistered when it is killed.
327
328=cut
329
330sub reg(@) {
331 my ($portid, $name) = @_;
332
333 $REG{$name} = $portid;
334}
335
186=item rcv $portid, type => $callback->(@msg) 336=item rcv $portid, tagstring => $callback->(@msg), ...
187 337
188=item rcv $portid, $smartmatch => $callback->(@msg) 338=item rcv $portid, $smartmatch => $callback->(@msg), ...
189 339
190=item rcv $portid, [$smartmatch...] => $callback->(@msg) 340=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
191 341
192Register a callback on the port identified by C<$portid>, which I<must> be 342Register callbacks to be called on matching messages on the given port.
193a local port.
194 343
195The callback has to return a true value when its work is done, after 344The callback has to return a true value when its work is done, after
196which is will be removed, or a false value in which case it will stay 345which is will be removed, or a false value in which case it will stay
197registered. 346registered.
198 347
348The global C<$SELF> (exported by this module) contains C<$portid> while
349executing the callback.
350
351Runtime errors wdurign callback execution will result in the port being
352C<kil>ed.
353
199If the match is an array reference, then it will be matched against the 354If the match is an array reference, then it will be matched against the
200first elements of the message, otherwise only the first element is being 355first elements of the message, otherwise only the first element is being
201matched. 356matched.
202 357
203Any element in the match that is specified as C<_any_> (a function 358Any element in the match that is specified as C<_any_> (a function
208also the most efficient match (by far). 363also the most efficient match (by far).
209 364
210=cut 365=cut
211 366
212sub rcv($@) { 367sub rcv($@) {
213 my ($port, $match, $cb) = @_;
214
215 my $port = $PORT{$port}
216 or do {
217 my ($noderef, $lport) = split /#/, $port; 368 my ($noderef, $port) = split /#/, shift, 2;
218 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef} 369
370 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
219 or Carp::croak "$port: can only rcv on local ports"; 371 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
220 372
221 $PORT{$lport} 373 my $self = $PORT_DATA{$port}
222 or Carp::croak "$port: port does not exist"; 374 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
223
224 $PORT{$port} = $PORT{$lport} # also return
225 };
226 375
376 "AnyEvent::MP::Port" eq ref $self
377 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
378
379 while (@_) {
380 my ($match, $cb) = splice @_, 0, 2;
381
227 if (!ref $match) { 382 if (!ref $match) {
228 push @{ $port->{rc0}{$match} }, [$cb]; 383 push @{ $self->{rc0}{$match} }, [$cb];
229 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 384 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
230 my ($type, @match) = @$match; 385 my ($type, @match) = @$match;
231 @match 386 @match
232 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 387 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
233 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 388 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
234 } else {
235 push @{ $port->{any} }, [$cb, $match];
236 }
237}
238
239sub _inject {
240 my ($port, $msg) = @{+shift};
241
242 $port = $PORT{$port}
243 or return;
244
245 @_ = @$msg;
246
247 for (@{ $port->{rc0}{$msg->[0]} }) {
248 $_ && &{$_->[0]}
249 && undef $_;
250 }
251
252 for (@{ $port->{rcv}{$msg->[0]} }) {
253 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1]
254 && &{$_->[0]}
255 && undef $_;
256 }
257
258 for (@{ $port->{any} }) {
259 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1]
260 && &{$_->[0]}
261 && undef $_;
262 }
263}
264
265sub normalise_noderef($) {
266 my ($noderef) = @_;
267
268 my $cv = AE::cv;
269 my @res;
270
271 $cv->begin (sub {
272 my %seen;
273 my @refs;
274 for (sort { $a->[0] <=> $b->[0] } @res) {
275 push @refs, $_->[1] unless $seen{$_->[1]}++
276 }
277 shift->send (join ",", @refs);
278 });
279
280 $noderef = $DEFAULT_PORT unless length $noderef;
281
282 my $idx;
283 for my $t (split /,/, $noderef) {
284 my $pri = ++$idx;
285
286 #TODO: this should be outside normalise_noderef and in become_public
287 if ($t =~ /^\d*$/) {
288 my $nodename = (POSIX::uname)[1];
289
290 $cv->begin;
291 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
292 for (@_) {
293 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
294 push @res, [
295 $pri += 1e-5,
296 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
297 ];
298 }
299 $cv->end;
300 };
301
302# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
303#
304# for (@ipv4) {
305# push @res, [
306# $pri,
307# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
308# ];
309# }
310 } else { 389 } else {
311 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" 390 push @{ $self->{any} }, [$cb, $match];
312 or Carp::croak "$t: unparsable transport descriptor";
313
314 $cv->begin;
315 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
316 for (@_) {
317 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
318 push @res, [
319 $pri += 1e-5,
320 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
321 ];
322 }
323 $cv->end;
324 }
325 } 391 }
326 } 392 }
327
328 $cv->end;
329
330 $cv
331} 393}
332 394
333sub become_public { 395=item $closure = psub { BLOCK }
334 return if $PUBLIC;
335 396
336 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 397Remembers C<$SELF> and creates a closure out of the BLOCK. When the
337 my @args = @_; 398closure is executed, sets up the environment in the same way as in C<rcv>
399callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
338 400
339 $NODE = (normalise_noderef $noderef)->recv; 401This is useful when you register callbacks from C<rcv> callbacks:
340 402
341 for my $t (split /,/, $NODE) { 403 rcv delayed_reply => sub {
342 $NODE{$t} = $NODE{""}; 404 my ($delay, @reply) = @_;
343 405 my $timer = AE::timer $delay, 0, psub {
344 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 406 snd @reply, $SELF;
345
346 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
347 @args,
348 on_error => sub {
349 die "on_error<@_>\n";#d#
350 },
351 on_connect => sub {
352 my ($tp) = @_;
353
354 $NODE{$tp->{remote_id}} = $_[0];
355 },
356 sub {
357 my ($tp) = @_;
358
359 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
360 },
361 ; 407 };
408 };
409
410=cut
411
412sub psub(&) {
413 my $cb = shift;
414
415 my $port = $SELF
416 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
417
418 sub {
419 local $SELF = $port;
420
421 if (wantarray) {
422 my @res = eval { &$cb };
423 _self_die if $@;
424 @res
425 } else {
426 my $res = eval { &$cb };
427 _self_die if $@;
428 $res
429 }
362 } 430 }
363
364 $PUBLIC = 1;
365} 431}
366 432
367=back 433=back
368 434
435=head1 FUNCTIONS FOR NODES
436
437=over 4
438
439=item become_public endpoint...
440
441Tells the node to become a public node, i.e. reachable from other nodes.
442
443If no arguments are given, or the first argument is C<undef>, then
444AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
445local nodename resolves to.
446
447Otherwise the first argument must be an array-reference with transport
448endpoints ("ip:port", "hostname:port") or port numbers (in which case the
449local nodename is used as hostname). The endpoints are all resolved and
450will become the node reference.
451
452=cut
453
454=back
455
369=head1 NODE MESSAGES 456=head1 NODE MESSAGES
370 457
371Nodes understand the following messages sent to them: 458Nodes understand the following messages sent to them. Many of them take
459arguments called C<@reply>, which will simply be used to compose a reply
460message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
461the remaining arguments are simply the message data.
372 462
373=over 4 463=over 4
374 464
375=cut 465=cut
376 466
377############################################################################# 467=item lookup => $name, @reply
378# self node code
379 468
380sub _new_port($) { 469Replies with the port ID of the specified well-known port, or C<undef>.
381 my ($name) = @_;
382 470
383 my ($noderef, $portname) = split /#/, $name; 471=item devnull => ...
384 472
385 $PORT{$name} = 473Generic data sink/CPU heat conversion.
386 $PORT{$portname} = {
387 names => [$name, $portname],
388 };
389}
390
391$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
392_new_port "";
393 474
394=item relay => $port, @msg 475=item relay => $port, @msg
395 476
396Simply forwards the message to the given port. 477Simply forwards the message to the given port.
397 478
398=cut
399
400rcv "", relay => \&snd;
401
402=item eval => $string[ @reply] 479=item eval => $string[ @reply]
403 480
404Evaluates the given string. If C<@reply> is given, then a message of the 481Evaluates the given string. If C<@reply> is given, then a message of the
405form C<@reply, $@, @evalres> is sent (C<$reply[0]> is the port to reply to). 482form C<@reply, $@, @evalres> is sent.
406 483
407=cut 484Example: crash another node.
408 485
409rcv "", eval => sub { 486 snd $othernode, eval => "exit";
410 my (undef, $string, @reply) = @_;
411 my @res = eval $string;
412 snd @reply, "$@", @res if @reply;
413};
414 487
415=item time => @reply 488=item time => @reply
416 489
417Replies the the current node time to C<@reply>. 490Replies the the current node time to C<@reply>.
418 491
419=cut 492Example: tell the current node to send the current time to C<$myport> in a
493C<timereply> message.
420 494
421rcv "", time => sub { shift; snd @_, AE::time }; 495 snd $NODE, time => $myport, timereply => 1, 2;
496 # => snd $myport, timereply => 1, 2, <time>
422 497
423=back 498=back
424 499
425=head1 SEE ALSO 500=head1 SEE ALSO
426 501

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines