1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::MP - multi-processing/message-passing framework |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::MP; |
8 |
|
9 |
$NODE # contains this node's noderef |
10 |
NODE # returns this node's noderef |
11 |
NODE $port # returns the noderef of the port |
12 |
|
13 |
snd $port, type => data...; |
14 |
|
15 |
$SELF # receiving/own port id in rcv callbacks |
16 |
|
17 |
rcv $port, smartmatch => $cb->($port, @msg); |
18 |
|
19 |
# examples: |
20 |
rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; |
21 |
rcv $port1, pong => sub { warn "pong received\n" }; |
22 |
snd $port2, ping => $port1; |
23 |
|
24 |
# more, smarter, matches (_any_ is exported by this module) |
25 |
rcv $port, [child_died => $pid] => sub { ... |
26 |
rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 |
27 |
|
28 |
=head1 DESCRIPTION |
29 |
|
30 |
This module (-family) implements a simple message passing framework. |
31 |
|
32 |
Despite its simplicity, you can securely message other processes running |
33 |
on the same or other hosts. |
34 |
|
35 |
For an introduction to this module family, see the L<AnyEvent::MP::Intro> |
36 |
manual page. |
37 |
|
38 |
At the moment, this module family is severly broken and underdocumented, |
39 |
so do not use. This was uploaded mainly to reserve the CPAN namespace - |
40 |
stay tuned! The basic API should be finished, however. |
41 |
|
42 |
=head1 CONCEPTS |
43 |
|
44 |
=over 4 |
45 |
|
46 |
=item port |
47 |
|
48 |
A port is something you can send messages to with the C<snd> function, and |
49 |
you can register C<rcv> handlers with. All C<rcv> handlers will receive |
50 |
messages they match, messages will not be queued. |
51 |
|
52 |
=item port id - C<noderef#portname> |
53 |
|
54 |
A port id is always the noderef, a hash-mark (C<#>) as separator, followed |
55 |
by a port name (a printable string of unspecified format). |
56 |
|
57 |
=item node |
58 |
|
59 |
A node is a single process containing at least one port - the node |
60 |
port. You can send messages to node ports to let them create new ports, |
61 |
among other things. |
62 |
|
63 |
Initially, nodes are either private (single-process only) or hidden |
64 |
(connected to a master node only). Only when they epxlicitly "become |
65 |
public" can you send them messages from unrelated other nodes. |
66 |
|
67 |
=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id> |
68 |
|
69 |
A noderef is a string that either uniquely identifies a given node (for |
70 |
private and hidden nodes), or contains a recipe on how to reach a given |
71 |
node (for public nodes). |
72 |
|
73 |
=back |
74 |
|
75 |
=head1 VARIABLES/FUNCTIONS |
76 |
|
77 |
=over 4 |
78 |
|
79 |
=cut |
80 |
|
81 |
package AnyEvent::MP; |
82 |
|
83 |
use AnyEvent::MP::Base; |
84 |
|
85 |
use common::sense; |
86 |
|
87 |
use Carp (); |
88 |
|
89 |
use AE (); |
90 |
|
91 |
use base "Exporter"; |
92 |
|
93 |
our $VERSION = '0.02'; |
94 |
our @EXPORT = qw( |
95 |
NODE $NODE *SELF node_of _any_ |
96 |
become_slave become_public |
97 |
snd rcv mon kil reg psub |
98 |
port |
99 |
); |
100 |
|
101 |
our $SELF; |
102 |
|
103 |
sub _self_die() { |
104 |
my $msg = $@; |
105 |
$msg =~ s/\n+$// unless ref $msg; |
106 |
kil $SELF, die => $msg; |
107 |
} |
108 |
|
109 |
=item $thisnode = NODE / $NODE |
110 |
|
111 |
The C<NODE> function returns, and the C<$NODE> variable contains |
112 |
the noderef of the local node. The value is initialised by a call |
113 |
to C<become_public> or C<become_slave>, after which all local port |
114 |
identifiers become invalid. |
115 |
|
116 |
=item $noderef = node_of $portid |
117 |
|
118 |
Extracts and returns the noderef from a portid or a noderef. |
119 |
|
120 |
=item $SELF |
121 |
|
122 |
Contains the current port id while executing C<rcv> callbacks or C<psub> |
123 |
blocks. |
124 |
|
125 |
=item SELF, %SELF, @SELF... |
126 |
|
127 |
Due to some quirks in how perl exports variables, it is impossible to |
128 |
just export C<$SELF>, all the symbols called C<SELF> are exported by this |
129 |
module, but only C<$SELF> is currently used. |
130 |
|
131 |
=item snd $portid, type => @data |
132 |
|
133 |
=item snd $portid, @msg |
134 |
|
135 |
Send the given message to the given port ID, which can identify either |
136 |
a local or a remote port, and can be either a string or soemthignt hat |
137 |
stringifies a sa port ID (such as a port object :). |
138 |
|
139 |
While the message can be about anything, it is highly recommended to use a |
140 |
string as first element (a portid, or some word that indicates a request |
141 |
type etc.). |
142 |
|
143 |
The message data effectively becomes read-only after a call to this |
144 |
function: modifying any argument is not allowed and can cause many |
145 |
problems. |
146 |
|
147 |
The type of data you can transfer depends on the transport protocol: when |
148 |
JSON is used, then only strings, numbers and arrays and hashes consisting |
149 |
of those are allowed (no objects). When Storable is used, then anything |
150 |
that Storable can serialise and deserialise is allowed, and for the local |
151 |
node, anything can be passed. |
152 |
|
153 |
=item kil $portid[, @reason] |
154 |
|
155 |
Kill the specified port with the given C<@reason>. |
156 |
|
157 |
If no C<@reason> is specified, then the port is killed "normally" (linked |
158 |
ports will not be kileld, or even notified). |
159 |
|
160 |
Otherwise, linked ports get killed with the same reason (second form of |
161 |
C<mon>, see below). |
162 |
|
163 |
Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks |
164 |
will be reported as reason C<< die => $@ >>. |
165 |
|
166 |
Transport/communication errors are reported as C<< transport_error => |
167 |
$message >>. |
168 |
|
169 |
=item $guard = mon $portid, $cb->(@reason) |
170 |
|
171 |
=item $guard = mon $portid, $otherport |
172 |
|
173 |
=item $guard = mon $portid, $otherport, @msg |
174 |
|
175 |
Monitor the given port and do something when the port is killed. |
176 |
|
177 |
In the first form, the callback is simply called with any number |
178 |
of C<@reason> elements (no @reason means that the port was deleted |
179 |
"normally"). Note also that I<< the callback B<must> never die >>, so use |
180 |
C<eval> if unsure. |
181 |
|
182 |
In the second form, the other port will be C<kil>'ed with C<@reason>, iff |
183 |
a @reason was specified, i.e. on "normal" kils nothing happens, while |
184 |
under all other conditions, the other port is killed with the same reason. |
185 |
|
186 |
In the last form, a message of the form C<@msg, @reason> will be C<snd>. |
187 |
|
188 |
Example: call a given callback when C<$port> is killed. |
189 |
|
190 |
mon $port, sub { warn "port died because of <@_>\n" }; |
191 |
|
192 |
Example: kill ourselves when C<$port> is killed abnormally. |
193 |
|
194 |
mon $port, $self; |
195 |
|
196 |
Example: send us a restart message another C<$port> is killed. |
197 |
|
198 |
mon $port, $self => "restart"; |
199 |
|
200 |
=cut |
201 |
|
202 |
sub mon { |
203 |
my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); |
204 |
|
205 |
my $node = $NODE{$noderef} || add_node $noderef; |
206 |
|
207 |
#TODO: ports must not be references |
208 |
if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { |
209 |
if (@_) { |
210 |
# send a kill info message |
211 |
my (@msg) = ($cb, @_); |
212 |
$cb = sub { snd @msg, @_ }; |
213 |
} else { |
214 |
# simply kill other port |
215 |
my $port = $cb; |
216 |
$cb = sub { kil $port, @_ if @_ }; |
217 |
} |
218 |
} |
219 |
|
220 |
$node->monitor ($port, $cb); |
221 |
|
222 |
defined wantarray |
223 |
and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } |
224 |
} |
225 |
|
226 |
=item $guard = mon_guard $port, $ref, $ref... |
227 |
|
228 |
Monitors the given C<$port> and keeps the passed references. When the port |
229 |
is killed, the references will be freed. |
230 |
|
231 |
Optionally returns a guard that will stop the monitoring. |
232 |
|
233 |
This function is useful when you create e.g. timers or other watchers and |
234 |
want to free them when the port gets killed: |
235 |
|
236 |
$port->rcv (start => sub { |
237 |
my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { |
238 |
undef $timer if 0.9 < rand; |
239 |
}); |
240 |
}); |
241 |
|
242 |
=cut |
243 |
|
244 |
sub mon_guard { |
245 |
my ($port, @refs) = @_; |
246 |
|
247 |
mon $port, sub { 0 && @refs } |
248 |
} |
249 |
|
250 |
=item lnk $port1, $port2 |
251 |
|
252 |
Link two ports. This is simply a shorthand for: |
253 |
|
254 |
mon $port1, $port2; |
255 |
mon $port2, $port1; |
256 |
|
257 |
It means that if either one is killed abnormally, the other one gets |
258 |
killed as well. |
259 |
|
260 |
=item $local_port = port |
261 |
|
262 |
Create a new local port object that supports message matching. |
263 |
|
264 |
=item $portid = port { my @msg = @_; $finished } |
265 |
|
266 |
Creates a "mini port", that is, a very lightweight port without any |
267 |
pattern matching behind it, and returns its ID. |
268 |
|
269 |
The block will be called for every message received on the port. When the |
270 |
callback returns a true value its job is considered "done" and the port |
271 |
will be destroyed. Otherwise it will stay alive. |
272 |
|
273 |
The message will be passed as-is, no extra argument (i.e. no port id) will |
274 |
be passed to the callback. |
275 |
|
276 |
If you need the local port id in the callback, this works nicely: |
277 |
|
278 |
my $port; $port = miniport { |
279 |
snd $otherport, reply => $port; |
280 |
}; |
281 |
|
282 |
=cut |
283 |
|
284 |
sub port(;&) { |
285 |
my $id = "$UNIQ." . $ID++; |
286 |
my $port = "$NODE#$id"; |
287 |
|
288 |
if (@_) { |
289 |
my $cb = shift; |
290 |
$PORT{$id} = sub { |
291 |
local $SELF = $port; |
292 |
eval { |
293 |
&$cb |
294 |
and kil $id; |
295 |
}; |
296 |
_self_die if $@; |
297 |
}; |
298 |
} else { |
299 |
my $self = bless { |
300 |
id => "$NODE#$id", |
301 |
}, "AnyEvent::MP::Port"; |
302 |
|
303 |
$PORT_DATA{$id} = $self; |
304 |
$PORT{$id} = sub { |
305 |
local $SELF = $port; |
306 |
|
307 |
eval { |
308 |
for (@{ $self->{rc0}{$_[0]} }) { |
309 |
$_ && &{$_->[0]} |
310 |
&& undef $_; |
311 |
} |
312 |
|
313 |
for (@{ $self->{rcv}{$_[0]} }) { |
314 |
$_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] |
315 |
&& &{$_->[0]} |
316 |
&& undef $_; |
317 |
} |
318 |
|
319 |
for (@{ $self->{any} }) { |
320 |
$_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] |
321 |
&& &{$_->[0]} |
322 |
&& undef $_; |
323 |
} |
324 |
}; |
325 |
_self_die if $@; |
326 |
}; |
327 |
} |
328 |
|
329 |
$port |
330 |
} |
331 |
|
332 |
=item reg $portid, $name |
333 |
|
334 |
Registers the given port under the name C<$name>. If the name already |
335 |
exists it is replaced. |
336 |
|
337 |
A port can only be registered under one well known name. |
338 |
|
339 |
A port automatically becomes unregistered when it is killed. |
340 |
|
341 |
=cut |
342 |
|
343 |
sub reg(@) { |
344 |
my ($portid, $name) = @_; |
345 |
|
346 |
$REG{$name} = $portid; |
347 |
} |
348 |
|
349 |
=item rcv $portid, tagstring => $callback->(@msg), ... |
350 |
|
351 |
=item rcv $portid, $smartmatch => $callback->(@msg), ... |
352 |
|
353 |
=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... |
354 |
|
355 |
Register callbacks to be called on matching messages on the given port. |
356 |
|
357 |
The callback has to return a true value when its work is done, after |
358 |
which is will be removed, or a false value in which case it will stay |
359 |
registered. |
360 |
|
361 |
The global C<$SELF> (exported by this module) contains C<$portid> while |
362 |
executing the callback. |
363 |
|
364 |
Runtime errors wdurign callback execution will result in the port being |
365 |
C<kil>ed. |
366 |
|
367 |
If the match is an array reference, then it will be matched against the |
368 |
first elements of the message, otherwise only the first element is being |
369 |
matched. |
370 |
|
371 |
Any element in the match that is specified as C<_any_> (a function |
372 |
exported by this module) matches any single element of the message. |
373 |
|
374 |
While not required, it is highly recommended that the first matching |
375 |
element is a string identifying the message. The one-string-only match is |
376 |
also the most efficient match (by far). |
377 |
|
378 |
=cut |
379 |
|
380 |
sub rcv($@) { |
381 |
my ($noderef, $port) = split /#/, shift, 2; |
382 |
|
383 |
($NODE{$noderef} || add_node $noderef) == $NODE{""} |
384 |
or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; |
385 |
|
386 |
my $self = $PORT_DATA{$port} |
387 |
or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; |
388 |
|
389 |
"AnyEvent::MP::Port" eq ref $self |
390 |
or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; |
391 |
|
392 |
while (@_) { |
393 |
my ($match, $cb) = splice @_, 0, 2; |
394 |
|
395 |
if (!ref $match) { |
396 |
push @{ $self->{rc0}{$match} }, [$cb]; |
397 |
} elsif (("ARRAY" eq ref $match && !ref $match->[0])) { |
398 |
my ($type, @match) = @$match; |
399 |
@match |
400 |
? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] |
401 |
: push @{ $self->{rc0}{$match->[0]} }, [$cb]; |
402 |
} else { |
403 |
push @{ $self->{any} }, [$cb, $match]; |
404 |
} |
405 |
} |
406 |
} |
407 |
|
408 |
=item $closure = psub { BLOCK } |
409 |
|
410 |
Remembers C<$SELF> and creates a closure out of the BLOCK. When the |
411 |
closure is executed, sets up the environment in the same way as in C<rcv> |
412 |
callbacks, i.e. runtime errors will cause the port to get C<kil>ed. |
413 |
|
414 |
This is useful when you register callbacks from C<rcv> callbacks: |
415 |
|
416 |
rcv delayed_reply => sub { |
417 |
my ($delay, @reply) = @_; |
418 |
my $timer = AE::timer $delay, 0, psub { |
419 |
snd @reply, $SELF; |
420 |
}; |
421 |
}; |
422 |
|
423 |
=cut |
424 |
|
425 |
sub psub(&) { |
426 |
my $cb = shift; |
427 |
|
428 |
my $port = $SELF |
429 |
or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; |
430 |
|
431 |
sub { |
432 |
local $SELF = $port; |
433 |
|
434 |
if (wantarray) { |
435 |
my @res = eval { &$cb }; |
436 |
_self_die if $@; |
437 |
@res |
438 |
} else { |
439 |
my $res = eval { &$cb }; |
440 |
_self_die if $@; |
441 |
$res |
442 |
} |
443 |
} |
444 |
} |
445 |
|
446 |
=back |
447 |
|
448 |
=head1 FUNCTIONS FOR NODES |
449 |
|
450 |
=over 4 |
451 |
|
452 |
=item become_public endpoint... |
453 |
|
454 |
Tells the node to become a public node, i.e. reachable from other nodes. |
455 |
|
456 |
If no arguments are given, or the first argument is C<undef>, then |
457 |
AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the |
458 |
local nodename resolves to. |
459 |
|
460 |
Otherwise the first argument must be an array-reference with transport |
461 |
endpoints ("ip:port", "hostname:port") or port numbers (in which case the |
462 |
local nodename is used as hostname). The endpoints are all resolved and |
463 |
will become the node reference. |
464 |
|
465 |
=cut |
466 |
|
467 |
=back |
468 |
|
469 |
=head1 NODE MESSAGES |
470 |
|
471 |
Nodes understand the following messages sent to them. Many of them take |
472 |
arguments called C<@reply>, which will simply be used to compose a reply |
473 |
message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and |
474 |
the remaining arguments are simply the message data. |
475 |
|
476 |
=over 4 |
477 |
|
478 |
=cut |
479 |
|
480 |
=item lookup => $name, @reply |
481 |
|
482 |
Replies with the port ID of the specified well-known port, or C<undef>. |
483 |
|
484 |
=item devnull => ... |
485 |
|
486 |
Generic data sink/CPU heat conversion. |
487 |
|
488 |
=item relay => $port, @msg |
489 |
|
490 |
Simply forwards the message to the given port. |
491 |
|
492 |
=item eval => $string[ @reply] |
493 |
|
494 |
Evaluates the given string. If C<@reply> is given, then a message of the |
495 |
form C<@reply, $@, @evalres> is sent. |
496 |
|
497 |
Example: crash another node. |
498 |
|
499 |
snd $othernode, eval => "exit"; |
500 |
|
501 |
=item time => @reply |
502 |
|
503 |
Replies the the current node time to C<@reply>. |
504 |
|
505 |
Example: tell the current node to send the current time to C<$myport> in a |
506 |
C<timereply> message. |
507 |
|
508 |
snd $NODE, time => $myport, timereply => 1, 2; |
509 |
# => snd $myport, timereply => 1, 2, <time> |
510 |
|
511 |
=back |
512 |
|
513 |
=head1 SEE ALSO |
514 |
|
515 |
L<AnyEvent>. |
516 |
|
517 |
=head1 AUTHOR |
518 |
|
519 |
Marc Lehmann <schmorp@schmorp.de> |
520 |
http://home.schmorp.de/ |
521 |
|
522 |
=cut |
523 |
|
524 |
1 |
525 |
|