1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::MP - multi-processing/message-passing framework |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::MP; |
8 |
|
9 |
$NODE # contains this node's noderef |
10 |
NODE # returns this node's noderef |
11 |
NODE $port # returns the noderef of the port |
12 |
|
13 |
snd $port, type => data...; |
14 |
|
15 |
$SELF # receiving/own port id in rcv callbacks |
16 |
|
17 |
rcv $port, smartmatch => $cb->($port, @msg); |
18 |
|
19 |
# examples: |
20 |
rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; |
21 |
rcv $port1, pong => sub { warn "pong received\n" }; |
22 |
snd $port2, ping => $port1; |
23 |
|
24 |
# more, smarter, matches (_any_ is exported by this module) |
25 |
rcv $port, [child_died => $pid] => sub { ... |
26 |
rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 |
27 |
|
28 |
=head1 DESCRIPTION |
29 |
|
30 |
This module (-family) implements a simple message passing framework. |
31 |
|
32 |
Despite its simplicity, you can securely message other processes running |
33 |
on the same or other hosts. |
34 |
|
35 |
At the moment, this module family is severly brokena nd underdocumented, |
36 |
so do not use. This was uploaded mainly to reserve the CPAN namespace - |
37 |
stay tuned! |
38 |
|
39 |
=head1 CONCEPTS |
40 |
|
41 |
=over 4 |
42 |
|
43 |
=item port |
44 |
|
45 |
A port is something you can send messages to with the C<snd> function, and |
46 |
you can register C<rcv> handlers with. All C<rcv> handlers will receive |
47 |
messages they match, messages will not be queued. |
48 |
|
49 |
=item port id - C<noderef#portname> |
50 |
|
51 |
A port id is always the noderef, a hash-mark (C<#>) as separator, followed |
52 |
by a port name (a printable string of unspecified format). |
53 |
|
54 |
=item node |
55 |
|
56 |
A node is a single process containing at least one port - the node |
57 |
port. You can send messages to node ports to let them create new ports, |
58 |
among other things. |
59 |
|
60 |
Initially, nodes are either private (single-process only) or hidden |
61 |
(connected to a master node only). Only when they epxlicitly "become |
62 |
public" can you send them messages from unrelated other nodes. |
63 |
|
64 |
=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id> |
65 |
|
66 |
A noderef is a string that either uniquely identifies a given node (for |
67 |
private and hidden nodes), or contains a recipe on how to reach a given |
68 |
node (for public nodes). |
69 |
|
70 |
=back |
71 |
|
72 |
=head1 VARIABLES/FUNCTIONS |
73 |
|
74 |
=over 4 |
75 |
|
76 |
=cut |
77 |
|
78 |
package AnyEvent::MP; |
79 |
|
80 |
use AnyEvent::MP::Base; |
81 |
|
82 |
use common::sense; |
83 |
|
84 |
use Carp (); |
85 |
|
86 |
use AE (); |
87 |
|
88 |
use base "Exporter"; |
89 |
|
90 |
our $VERSION = '0.02'; |
91 |
our @EXPORT = qw( |
92 |
NODE $NODE *SELF node_of _any_ |
93 |
become_slave become_public |
94 |
snd rcv mon kil reg psub |
95 |
port |
96 |
); |
97 |
|
98 |
our $SELF; |
99 |
|
100 |
sub _self_die() { |
101 |
my $msg = $@; |
102 |
$msg =~ s/\n+$// unless ref $msg; |
103 |
kil $SELF, die => $msg; |
104 |
} |
105 |
|
106 |
=item $thisnode = NODE / $NODE |
107 |
|
108 |
The C<NODE> function returns, and the C<$NODE> variable contains |
109 |
the noderef of the local node. The value is initialised by a call |
110 |
to C<become_public> or C<become_slave>, after which all local port |
111 |
identifiers become invalid. |
112 |
|
113 |
=item $noderef = node_of $portid |
114 |
|
115 |
Extracts and returns the noderef from a portid or a noderef. |
116 |
|
117 |
=item $SELF |
118 |
|
119 |
Contains the current port id while executing C<rcv> callbacks or C<psub> |
120 |
blocks. |
121 |
|
122 |
=item SELF, %SELF, @SELF... |
123 |
|
124 |
Due to some quirks in how perl exports variables, it is impossible to |
125 |
just export C<$SELF>, all the symbols called C<SELF> are exported by this |
126 |
module, but only C<$SELF> is currently used. |
127 |
|
128 |
=item snd $portid, type => @data |
129 |
|
130 |
=item snd $portid, @msg |
131 |
|
132 |
Send the given message to the given port ID, which can identify either |
133 |
a local or a remote port, and can be either a string or soemthignt hat |
134 |
stringifies a sa port ID (such as a port object :). |
135 |
|
136 |
While the message can be about anything, it is highly recommended to use a |
137 |
string as first element (a portid, or some word that indicates a request |
138 |
type etc.). |
139 |
|
140 |
The message data effectively becomes read-only after a call to this |
141 |
function: modifying any argument is not allowed and can cause many |
142 |
problems. |
143 |
|
144 |
The type of data you can transfer depends on the transport protocol: when |
145 |
JSON is used, then only strings, numbers and arrays and hashes consisting |
146 |
of those are allowed (no objects). When Storable is used, then anything |
147 |
that Storable can serialise and deserialise is allowed, and for the local |
148 |
node, anything can be passed. |
149 |
|
150 |
=item kil $portid[, @reason] |
151 |
|
152 |
Kill the specified port with the given C<@reason>. |
153 |
|
154 |
If no C<@reason> is specified, then the port is killed "normally" (linked |
155 |
ports will not be kileld, or even notified). |
156 |
|
157 |
Otherwise, linked ports get killed with the same reason (second form of |
158 |
C<mon>, see below). |
159 |
|
160 |
Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks |
161 |
will be reported as reason C<< die => $@ >>. |
162 |
|
163 |
Transport/communication errors are reported as C<< transport_error => |
164 |
$message >>. |
165 |
|
166 |
=item $guard = mon $portid, $cb->(@reason) |
167 |
|
168 |
=item $guard = mon $portid, $otherport |
169 |
|
170 |
=item $guard = mon $portid, $otherport, @msg |
171 |
|
172 |
Monitor the given port and do something when the port is killed. |
173 |
|
174 |
In the first form, the callback is simply called with any number |
175 |
of C<@reason> elements (no @reason means that the port was deleted |
176 |
"normally"). Note also that I<< the callback B<must> never die >>, so use |
177 |
C<eval> if unsure. |
178 |
|
179 |
In the second form, the other port will be C<kil>'ed with C<@reason>, iff |
180 |
a @reason was specified, i.e. on "normal" kils nothing happens, while |
181 |
under all other conditions, the other port is killed with the same reason. |
182 |
|
183 |
In the last form, a message of the form C<@msg, @reason> will be C<snd>. |
184 |
|
185 |
Example: call a given callback when C<$port> is killed. |
186 |
|
187 |
mon $port, sub { warn "port died because of <@_>\n" }; |
188 |
|
189 |
Example: kill ourselves when C<$port> is killed abnormally. |
190 |
|
191 |
mon $port, $self; |
192 |
|
193 |
Example: send us a restart message another C<$port> is killed. |
194 |
|
195 |
mon $port, $self => "restart"; |
196 |
|
197 |
=cut |
198 |
|
199 |
sub mon { |
200 |
my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); |
201 |
|
202 |
my $node = $NODE{$noderef} || add_node $noderef; |
203 |
|
204 |
#TODO: ports must not be references |
205 |
if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { |
206 |
if (@_) { |
207 |
# send a kill info message |
208 |
my (@msg) = ($cb, @_); |
209 |
$cb = sub { snd @msg, @_ }; |
210 |
} else { |
211 |
# simply kill other port |
212 |
my $port = $cb; |
213 |
$cb = sub { kil $port, @_ if @_ }; |
214 |
} |
215 |
} |
216 |
|
217 |
$node->monitor ($port, $cb); |
218 |
|
219 |
defined wantarray |
220 |
and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } |
221 |
} |
222 |
|
223 |
=item $guard = mon_guard $port, $ref, $ref... |
224 |
|
225 |
Monitors the given C<$port> and keeps the passed references. When the port |
226 |
is killed, the references will be freed. |
227 |
|
228 |
Optionally returns a guard that will stop the monitoring. |
229 |
|
230 |
This function is useful when you create e.g. timers or other watchers and |
231 |
want to free them when the port gets killed: |
232 |
|
233 |
$port->rcv (start => sub { |
234 |
my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { |
235 |
undef $timer if 0.9 < rand; |
236 |
}); |
237 |
}); |
238 |
|
239 |
=cut |
240 |
|
241 |
sub mon_guard { |
242 |
my ($port, @refs) = @_; |
243 |
|
244 |
mon $port, sub { 0 && @refs } |
245 |
} |
246 |
|
247 |
=item $local_port = port |
248 |
|
249 |
Create a new local port object that supports message matching. |
250 |
|
251 |
=item $portid = port { my @msg = @_; $finished } |
252 |
|
253 |
Creates a "mini port", that is, a very lightweight port without any |
254 |
pattern matching behind it, and returns its ID. |
255 |
|
256 |
The block will be called for every message received on the port. When the |
257 |
callback returns a true value its job is considered "done" and the port |
258 |
will be destroyed. Otherwise it will stay alive. |
259 |
|
260 |
The message will be passed as-is, no extra argument (i.e. no port id) will |
261 |
be passed to the callback. |
262 |
|
263 |
If you need the local port id in the callback, this works nicely: |
264 |
|
265 |
my $port; $port = miniport { |
266 |
snd $otherport, reply => $port; |
267 |
}; |
268 |
|
269 |
=cut |
270 |
|
271 |
sub port(;&) { |
272 |
my $id = "$UNIQ." . $ID++; |
273 |
my $port = "$NODE#$id"; |
274 |
|
275 |
if (@_) { |
276 |
my $cb = shift; |
277 |
$PORT{$id} = sub { |
278 |
local $SELF = $port; |
279 |
eval { |
280 |
&$cb |
281 |
and kil $id; |
282 |
}; |
283 |
_self_die if $@; |
284 |
}; |
285 |
} else { |
286 |
my $self = bless { |
287 |
id => "$NODE#$id", |
288 |
}, "AnyEvent::MP::Port"; |
289 |
|
290 |
$PORT_DATA{$id} = $self; |
291 |
$PORT{$id} = sub { |
292 |
local $SELF = $port; |
293 |
|
294 |
eval { |
295 |
for (@{ $self->{rc0}{$_[0]} }) { |
296 |
$_ && &{$_->[0]} |
297 |
&& undef $_; |
298 |
} |
299 |
|
300 |
for (@{ $self->{rcv}{$_[0]} }) { |
301 |
$_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] |
302 |
&& &{$_->[0]} |
303 |
&& undef $_; |
304 |
} |
305 |
|
306 |
for (@{ $self->{any} }) { |
307 |
$_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] |
308 |
&& &{$_->[0]} |
309 |
&& undef $_; |
310 |
} |
311 |
}; |
312 |
_self_die if $@; |
313 |
}; |
314 |
} |
315 |
|
316 |
$port |
317 |
} |
318 |
|
319 |
=item reg $portid, $name |
320 |
|
321 |
Registers the given port under the name C<$name>. If the name already |
322 |
exists it is replaced. |
323 |
|
324 |
A port can only be registered under one well known name. |
325 |
|
326 |
A port automatically becomes unregistered when it is killed. |
327 |
|
328 |
=cut |
329 |
|
330 |
sub reg(@) { |
331 |
my ($portid, $name) = @_; |
332 |
|
333 |
$REG{$name} = $portid; |
334 |
} |
335 |
|
336 |
=item rcv $portid, tagstring => $callback->(@msg), ... |
337 |
|
338 |
=item rcv $portid, $smartmatch => $callback->(@msg), ... |
339 |
|
340 |
=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... |
341 |
|
342 |
Register callbacks to be called on matching messages on the given port. |
343 |
|
344 |
The callback has to return a true value when its work is done, after |
345 |
which is will be removed, or a false value in which case it will stay |
346 |
registered. |
347 |
|
348 |
The global C<$SELF> (exported by this module) contains C<$portid> while |
349 |
executing the callback. |
350 |
|
351 |
Runtime errors wdurign callback execution will result in the port being |
352 |
C<kil>ed. |
353 |
|
354 |
If the match is an array reference, then it will be matched against the |
355 |
first elements of the message, otherwise only the first element is being |
356 |
matched. |
357 |
|
358 |
Any element in the match that is specified as C<_any_> (a function |
359 |
exported by this module) matches any single element of the message. |
360 |
|
361 |
While not required, it is highly recommended that the first matching |
362 |
element is a string identifying the message. The one-string-only match is |
363 |
also the most efficient match (by far). |
364 |
|
365 |
=cut |
366 |
|
367 |
sub rcv($@) { |
368 |
my ($noderef, $port) = split /#/, shift, 2; |
369 |
|
370 |
($NODE{$noderef} || add_node $noderef) == $NODE{""} |
371 |
or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; |
372 |
|
373 |
my $self = $PORT_DATA{$port} |
374 |
or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; |
375 |
|
376 |
"AnyEvent::MP::Port" eq ref $self |
377 |
or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; |
378 |
|
379 |
while (@_) { |
380 |
my ($match, $cb) = splice @_, 0, 2; |
381 |
|
382 |
if (!ref $match) { |
383 |
push @{ $self->{rc0}{$match} }, [$cb]; |
384 |
} elsif (("ARRAY" eq ref $match && !ref $match->[0])) { |
385 |
my ($type, @match) = @$match; |
386 |
@match |
387 |
? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] |
388 |
: push @{ $self->{rc0}{$match->[0]} }, [$cb]; |
389 |
} else { |
390 |
push @{ $self->{any} }, [$cb, $match]; |
391 |
} |
392 |
} |
393 |
} |
394 |
|
395 |
=item $closure = psub { BLOCK } |
396 |
|
397 |
Remembers C<$SELF> and creates a closure out of the BLOCK. When the |
398 |
closure is executed, sets up the environment in the same way as in C<rcv> |
399 |
callbacks, i.e. runtime errors will cause the port to get C<kil>ed. |
400 |
|
401 |
This is useful when you register callbacks from C<rcv> callbacks: |
402 |
|
403 |
rcv delayed_reply => sub { |
404 |
my ($delay, @reply) = @_; |
405 |
my $timer = AE::timer $delay, 0, psub { |
406 |
snd @reply, $SELF; |
407 |
}; |
408 |
}; |
409 |
|
410 |
=cut |
411 |
|
412 |
sub psub(&) { |
413 |
my $cb = shift; |
414 |
|
415 |
my $port = $SELF |
416 |
or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; |
417 |
|
418 |
sub { |
419 |
local $SELF = $port; |
420 |
|
421 |
if (wantarray) { |
422 |
my @res = eval { &$cb }; |
423 |
_self_die if $@; |
424 |
@res |
425 |
} else { |
426 |
my $res = eval { &$cb }; |
427 |
_self_die if $@; |
428 |
$res |
429 |
} |
430 |
} |
431 |
} |
432 |
|
433 |
=back |
434 |
|
435 |
=head1 FUNCTIONS FOR NODES |
436 |
|
437 |
=over 4 |
438 |
|
439 |
=item become_public endpoint... |
440 |
|
441 |
Tells the node to become a public node, i.e. reachable from other nodes. |
442 |
|
443 |
If no arguments are given, or the first argument is C<undef>, then |
444 |
AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the |
445 |
local nodename resolves to. |
446 |
|
447 |
Otherwise the first argument must be an array-reference with transport |
448 |
endpoints ("ip:port", "hostname:port") or port numbers (in which case the |
449 |
local nodename is used as hostname). The endpoints are all resolved and |
450 |
will become the node reference. |
451 |
|
452 |
=cut |
453 |
|
454 |
=back |
455 |
|
456 |
=head1 NODE MESSAGES |
457 |
|
458 |
Nodes understand the following messages sent to them. Many of them take |
459 |
arguments called C<@reply>, which will simply be used to compose a reply |
460 |
message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and |
461 |
the remaining arguments are simply the message data. |
462 |
|
463 |
=over 4 |
464 |
|
465 |
=cut |
466 |
|
467 |
=item lookup => $name, @reply |
468 |
|
469 |
Replies with the port ID of the specified well-known port, or C<undef>. |
470 |
|
471 |
=item devnull => ... |
472 |
|
473 |
Generic data sink/CPU heat conversion. |
474 |
|
475 |
=item relay => $port, @msg |
476 |
|
477 |
Simply forwards the message to the given port. |
478 |
|
479 |
=item eval => $string[ @reply] |
480 |
|
481 |
Evaluates the given string. If C<@reply> is given, then a message of the |
482 |
form C<@reply, $@, @evalres> is sent. |
483 |
|
484 |
Example: crash another node. |
485 |
|
486 |
snd $othernode, eval => "exit"; |
487 |
|
488 |
=item time => @reply |
489 |
|
490 |
Replies the the current node time to C<@reply>. |
491 |
|
492 |
Example: tell the current node to send the current time to C<$myport> in a |
493 |
C<timereply> message. |
494 |
|
495 |
snd $NODE, time => $myport, timereply => 1, 2; |
496 |
# => snd $myport, timereply => 1, 2, <time> |
497 |
|
498 |
=back |
499 |
|
500 |
=head1 SEE ALSO |
501 |
|
502 |
L<AnyEvent>. |
503 |
|
504 |
=head1 AUTHOR |
505 |
|
506 |
Marc Lehmann <schmorp@schmorp.de> |
507 |
http://home.schmorp.de/ |
508 |
|
509 |
=cut |
510 |
|
511 |
1 |
512 |
|