/cvs/AnyEvent-MP/MP.pm
Revision: 1.20
Committed: Mon Aug 3 22:05:55 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.19: +5 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.2 NODE # returns this node identifier
10     $NODE # contains this node identifier
11    
12     snd $port, type => data...;
13    
14     rcv $port, smartmatch => $cb->($port, @msg);
15    
16     # examples:
17     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18     rcv $port1, pong => sub { warn "pong received\n" };
19     snd $port2, ping => $port1;
20    
21     # more, smarter, matches (_any_ is exported by this module)
22     rcv $port, [child_died => $pid] => sub { ...
23     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24    
25 root 1.1 =head1 DESCRIPTION
26    
27 root 1.2 This module (-family) implements a simple message passing framework.
28    
29     Despite its simplicity, you can securely message other processes running
30     on the same or other hosts.
31    
32 root 1.6 At the moment, this module family is severely broken and underdocumented,
33     so do not use it. This was uploaded mainly to reserve the CPAN namespace -
34     stay tuned!
35    
36 root 1.2 =head1 CONCEPTS
37    
38     =over 4
39    
40     =item port
41    
42     A port is something you can send messages to with the C<snd> function, and
43     you can register C<rcv> handlers with. All C<rcv> handlers will receive
44     messages they match; messages are not queued.
45    
46 root 1.3 =item port id - C<noderef#portname>
47 root 1.2
48 root 1.3 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
49     by a port name (a printable string of unspecified format).
50 root 1.2
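As a minimal sketch of the format described above, a port id can be taken apart with a two-field C<split>, which is also how the C<mon> function in this file separates the noderef from the port name. The helper name C<parse_port_id> and the sample id are assumptions made for illustration only.

```perl
use strict;
use warnings;

# Split a port id of the form "noderef#portname" into its two parts.
# The limit of 2 keeps any further "#" characters inside the port name.
sub parse_port_id {
    my ($portid) = @_;
    my ($noderef, $portname) = split /#/, $portid, 2;
    return ($noderef, $portname);
}

# Hypothetical port id, used only to show the split.
my ($noderef, $portname) = parse_port_id("10.0.0.1:4040#example.1");
print "noderef=$noderef port=$portname\n";
```
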
51     =item node
52    
53     A node is a single process containing at least one port - the node
54     port. You can send messages to node ports to let them create new ports,
55     among other things.
56    
57     Initially, nodes are either private (single-process only) or hidden
58 root 1.3 (connected to a master node only). Only when they explicitly "become
59     public" can you send them messages from unrelated other nodes.
60 root 1.2
61 root 1.5 =item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
62 root 1.2
63 root 1.3 A noderef is a string that either uniquely identifies a given node (for
64 root 1.2 private and hidden nodes), or contains a recipe on how to reach a given
65     node (for public nodes).
66    
67     =back
68    
69 root 1.3 =head1 VARIABLES/FUNCTIONS
70 root 1.2
71     =over 4
72    
73 root 1.1 =cut
74    
75     package AnyEvent::MP;
76    
77 root 1.8 use AnyEvent::MP::Base;
78 root 1.2
79 root 1.1 use common::sense;
80    
81 root 1.2 use Carp ();
82    
83 root 1.1 use AE ();
84    
85 root 1.2 use base "Exporter";
86    
87 root 1.9 our $VERSION = '0.02';
88 root 1.8 our @EXPORT = qw(
89 root 1.18 NODE $NODE $PORT snd rcv mon del _any_
90 root 1.8 create_port create_port_on
91 root 1.11 create_miniport
92 root 1.8 become_slave become_public
93     );
94 root 1.2
95 root 1.3 =item NODE / $NODE
96    
97     The C<NODE ()> function and the C<$NODE> variable contain the noderef of
98     the local node. The value is initialised by a call to C<become_public> or
99     C<become_slave>, after which all local port identifiers become invalid.
100    
101     =item snd $portid, type => @data
102    
103     =item snd $portid, @msg
104    
105 root 1.8 Send the given message to the given port ID, which can identify either
106     a local or a remote port, and can be either a string or something that
107     stringifies as a port ID (such as a port object :).
108    
109     While the message can be about anything, it is highly recommended to use a
110     string as first element (a portid, or some word that indicates a request
111     type etc.).
112 root 1.3
113     The message data effectively becomes read-only after a call to this
114     function: modifying any argument is not allowed and can cause many
115     problems.
116    
117     The type of data you can transfer depends on the transport protocol: when
118     JSON is used, then only strings, numbers and arrays and hashes consisting
119     of those are allowed (no objects). When Storable is used, then anything
120     that Storable can serialise and deserialise is allowed, and for the local
121     node, anything can be passed.
122    
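The JSON restriction can be illustrated with a short sketch. It assumes the core C<JSON::PP> module as the encoder, which this document does not name, and uses a hypothetical class C<Example::Thing>; the point is only that plain data structures survive a round trip while a blessed object is rejected by default.

```perl
use strict;
use warnings;
use JSON::PP ();

# Plain strings, numbers, arrays and hashes round-trip without trouble.
my $json = JSON::PP->new->canonical;
my $wire = $json->encode(["ping", { count => 3, tags => ["a", "b"] }]);
my $msg  = $json->decode($wire);
print "type=$msg->[0] count=$msg->[1]{count}\n";

# A blessed object is rejected by default: encode() dies.
my $object = bless { id => "n1#p1" }, "Example::Thing";   # hypothetical class
my $ok = eval { $json->encode(["ping", $object]); 1 };
print "object encoded: ", ($ok ? "yes" : "no"), "\n";
```
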
123 root 1.20 =item $guard = mon $portid, $cb->()
124 root 1.18
125 root 1.20 Monitor the given port and call the given callback when the port is
126     destroyed or the connection to its node is lost.
127    
128     #TODO
129 root 1.18
130     =cut
131    
132     sub mon {
133     my ($noderef, $port) = split /#/, shift, 2;
134    
135     my $node = AnyEvent::MP::Base::add_node $noderef;
136    
137     my $cb = shift;
138    
139     $node->monitor ($port, $cb);
140    
141     defined wantarray
142     and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
143     }
144    
145 root 1.8 =item $local_port = create_port
146    
147     Create a new local port object. See the next section for allowed methods.
148    
149 root 1.3 =cut
150    
151 root 1.8 sub create_port {
152 root 1.16 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
153 root 1.8
154     my $self = bless {
155     id => "$NODE#$id",
156     names => [$id],
157     }, "AnyEvent::MP::Port";
158    
159     $AnyEvent::MP::Base::PORT{$id} = sub {
160     unshift @_, $self;
161    
162     for (@{ $self->{rc0}{$_[1]} }) {
163     $_ && &{$_->[0]}
164     && undef $_;
165     }
166    
167     for (@{ $self->{rcv}{$_[1]} }) {
168     $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
169     && &{$_->[0]}
170     && undef $_;
171     }
172 root 1.2
173 root 1.8 for (@{ $self->{any} }) {
174     $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
175     && &{$_->[0]}
176     && undef $_;
177     }
178     };
179 root 1.2
180 root 1.8 $self
181 root 1.3 }
182    
183 root 1.15 =item $portid = miniport { my @msg = @_; $finished }
184 root 1.10
185 root 1.15 Creates a "mini port", that is, a very lightweight port without any
186     pattern matching behind it, and returns its ID.
187    
188     The block will be called for every message received on the port. When the
189     callback returns a true value its job is considered "done" and the port
190     will be destroyed. Otherwise it will stay alive.
191    
192 root 1.17 The message will be passed as-is, no extra argument (i.e. no port id) will
193 root 1.15 be passed to the callback.
194    
195     If you need the local port id in the callback, this works nicely:
196    
197     my $port; $port = miniport {
198     snd $otherport, reply => $port;
199     };
200 root 1.10
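The "done" rule described above can be sketched in isolation. This is not the module's code: the table C<%PORT> and the helpers C<make_miniport> and C<deliver> are hypothetical stand-ins, and silently dropping messages for removed ports is an assumption of the sketch.

```perl
use strict;
use warnings;

my %PORT;        # hypothetical local port table: port name => handler
my $next_id = 0;

# Register a callback as a port; remove the port once the callback returns true.
sub make_miniport {
    my ($cb) = @_;
    my $id = "mini." . $next_id++;
    $PORT{$id} = sub {
        delete $PORT{$id} if $cb->(@_);
    };
    return $id;
}

# Deliver a message to a local port; messages to unknown ports are dropped.
sub deliver {
    my ($id, @msg) = @_;
    my $handler = $PORT{$id} or return 0;
    $handler->(@msg);
    return 1;
}

my @seen;
my $port = make_miniport(sub {
    push @seen, $_[0];
    return $_[0] eq "done";    # true => the port is finished
});

deliver($port, "work");
deliver($port, "done");
deliver($port, "late");        # port no longer exists, so this is dropped

print "seen: @seen\n";
```

Returning a false value from the callback keeps the port registered, which is why the first message does not end its life.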
201     =cut
202    
203 root 1.15 sub miniport(&) {
204 root 1.10 my $cb = shift;
205 root 1.16 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
206 root 1.10
207     $AnyEvent::MP::Base::PORT{$id} = sub {
208     &$cb
209 root 1.19 and del $id;
210 root 1.10 };
211    
212     "$NODE#$id"
213     }
214    
215 root 1.8 package AnyEvent::MP::Port;
216    
217     =back
218    
219     =head1 METHODS FOR PORT OBJECTS
220    
221     =over 4
222    
223     =item "$port"
224    
225     A port object stringifies to its port ID, so can be used directly for
226     C<snd> operations.
227    
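The stringification can be demonstrated with a small stand-alone class. C<Example::Port> is a hypothetical stand-in for a port object, and the use of C<JSON::PP> with C<convert_blessed> is an assumption about how a C<TO_JSON> method like the one in this file might be picked up by an encoder.

```perl
package Example::Port;   # hypothetical stand-in for a port object

use strict;
use warnings;
use overload
    '""'     => sub { $_[0]{id} },
    fallback => 1;

sub new     { my ($class, $id) = @_; return bless { id => $id }, $class }
sub TO_JSON { $_[0]{id} }

package main;

use JSON::PP ();

my $port = Example::Port->new("n1#example.1");

# String context yields the port id.
print "port id: $port\n";

# With convert_blessed enabled, JSON::PP calls TO_JSON for blessed values.
my $wire = JSON::PP->new->convert_blessed->encode(["ping", $port]);
print "$wire\n";
```
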
228     =cut
229    
230     use overload
231     '""' => sub { $_[0]{id} },
232     fallback => 1;
233    
234 root 1.18 sub TO_JSON { $_[0]{id} }
235    
236 root 1.8 =item $port->rcv (type => $callback->($port, @msg))
237 root 1.3
238 root 1.8 =item $port->rcv ($smartmatch => $callback->($port, @msg))
239 root 1.3
240 root 1.8 =item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
241 root 1.3
242 root 1.8 Register a callback on the given port.
243 root 1.3
244     The callback has to return a true value when its work is done, after
245     which it will be removed, or a false value in which case it will stay
246     registered.
247    
248     If the match is an array reference, then it will be matched against the
249     first elements of the message, otherwise only the first element is being
250     matched.
251    
252     Any element in the match that is specified as C<_any_> (a function
253     exported by this module) matches any single element of the message.
254    
255     While not required, it is highly recommended that the first matching
256     element is a string identifying the message. The one-string-only match is
257     also the most efficient match (by far).
258    
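The matching rules above can be approximated without smartmatch. The sketch below is a simplification, not the module's implementation: it compares elements as strings, C<$ANY> is a hypothetical sentinel standing in for C<_any_>, and C<msg_matches> is an illustrative name.

```perl
use strict;
use warnings;

# Hypothetical wildcard sentinel standing in for the module's _any_.
my $ANY = \"any";

# Return true if @msg matches $pattern under the rules described above.
sub msg_matches {
    my ($pattern, @msg) = @_;

    # A non-array pattern is compared against the first element only.
    my @want = ref($pattern) eq "ARRAY" ? @$pattern : ($pattern);
    return 0 if @want > @msg;

    for my $i (0 .. $#want) {
        # The wildcard matches any single element.
        next if ref($want[$i]) && $want[$i] == $ANY;
        return 0 unless defined $msg[$i] && $msg[$i] eq $want[$i];
    }
    return 1;
}

print msg_matches("ping", "ping", "n1#p1")             ? "match\n" : "no match\n";
print msg_matches([$ANY, $ANY, 3], "x", "y", 3)        ? "match\n" : "no match\n";
print msg_matches([child_died => 42], "child_died", 7) ? "match\n" : "no match\n";
```
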
259     =cut
260    
261     sub rcv($@) {
262 root 1.8 my ($self, $match, $cb) = @_;
263 root 1.3
264     if (!ref $match) {
265 root 1.8 push @{ $self->{rc0}{$match} }, [$cb];
266 root 1.3 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
267     my ($type, @match) = @$match;
268     @match
269 root 1.8 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
270     : push @{ $self->{rc0}{$match->[0]} }, [$cb];
271 root 1.3 } else {
272 root 1.8 push @{ $self->{any} }, [$cb, $match];
273 root 1.3 }
274 root 1.2 }
275    
276 root 1.8 =item $port->register ($name)
277 root 1.2
278 root 1.8 Registers the given port under the well known name C<$name>. If the name
279     already exists it is replaced.
280 root 1.2
281 root 1.8 A port can only be registered under one well known name.
282 root 1.3
283 root 1.8 =cut
284 root 1.3
285 root 1.8 sub register {
286     my ($self, $name) = @_;
287 root 1.3
288 root 1.8 $self->{wkname} = $name;
289     $AnyEvent::MP::Base::WKP{$name} = "$self";
290 root 1.1 }
291    
292 root 1.8 =item $port->destroy
293 root 1.2
294 root 1.8 Explicitly destroy/remove/nuke/vaporise the port.
295 root 1.2
296 root 1.8 Ports are normally kept alive by their mere existence alone, and need to
297     be destroyed explicitly.
298 root 1.2
299 root 1.8 =cut
300 root 1.1
301 root 1.8 sub destroy {
302     my ($self) = @_;
303 root 1.1
304 root 1.18 AnyEvent::MP::Base::del $self->{id};
305    
306 root 1.8 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
307 root 1.2
308 root 1.8 delete $AnyEvent::MP::Base::PORT{$_}
309     for @{ $self->{names} };
310 root 1.2 }
311    
312 root 1.8 =back
313    
314     =head1 FUNCTIONS FOR NODES
315    
316     =over 4
317 root 1.2
318 root 1.8 =item mon $noderef, $callback->($noderef, $status, $)
319 root 1.2
320 root 1.8 Monitors the given noderef.
321 root 1.2
322 root 1.8 =item become_public endpoint...
323    
324     Tells the node to become a public node, i.e. reachable from other nodes.
325    
326     If no arguments are given, or the first argument is C<undef>, then
327     AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
328     local nodename resolves to.
329    
330     Otherwise the first argument must be an array-reference with transport
331     endpoints ("ip:port", "hostname:port") or port numbers (in which case the
332     local nodename is used as hostname). The endpoints are all resolved and
333     will become the node reference.
334 root 1.2
335 root 1.8 =cut
336 root 1.1
337 root 1.4 =back
338    
339     =head1 NODE MESSAGES
340    
341 root 1.5 Nodes understand the following messages sent to them. Many of them take
342     arguments called C<@reply>, which will simply be used to compose a reply
343     message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
344     the remaining arguments are simply the message data.
345 root 1.4
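How a C<@reply> list turns into a reply message can be sketched as follows. The helper C<compose_reply> and the port id are hypothetical; the example follows the shape of the C<time> message documented in this section, where the result is appended to the reply data.

```perl
use strict;
use warnings;

# Turn a @reply list plus result values into (destination port, message).
sub compose_reply {
    my ($reply, @result) = @_;
    my ($port, @msg) = (@$reply, @result);
    return ($port, @msg);
}

# Reply to a port with a "timereply" message carrying the current time.
my @reply = ("n1#myport", "timereply", 1, 2);   # hypothetical port id
my ($port, @msg) = compose_reply(\@reply, time);
print "snd $port, @msg\n";
```
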
346     =over 4
347    
348     =cut
349    
350 root 1.8 =item wkp => $name, @reply
351 root 1.3
352 root 1.8 Replies with the port ID of the specified well-known port, or C<undef>.
353 root 1.3
354 root 1.7 =item devnull => ...
355    
356     Generic data sink/CPU heat conversion.
357    
358 root 1.4 =item relay => $port, @msg
359    
360     Simply forwards the message to the given port.
361    
362     =item eval => $string[, @reply]
363    
364     Evaluates the given string. If C<@reply> is given, then a message of the
365 root 1.5 form C<@reply, $@, @evalres> is sent.
366    
367     Example: crash another node.
368    
369     snd $othernode, eval => "exit";
370 root 1.4
371     =item time => @reply
372    
373     Replies with the current node time to C<@reply>.
374    
375 root 1.5 Example: tell the current node to send the current time to C<$myport> in a
376     C<timereply> message.
377    
378     snd $NODE, time => $myport, timereply => 1, 2;
379     # => snd $myport, timereply => 1, 2, <time>
380    
381 root 1.2 =back
382    
383 root 1.1 =head1 SEE ALSO
384    
385     L<AnyEvent>.
386    
387     =head1 AUTHOR
388    
389     Marc Lehmann <schmorp@schmorp.de>
390     http://home.schmorp.de/
391    
392     =cut
393    
394     1
395