ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.2 by root, Fri Jul 31 20:55:46 2009 UTC vs.
Revision 1.3 by root, Sat Aug 1 07:11:45 2009 UTC

37 37
38A port is something you can send messages to with the C<snd> function, and 38A port is something you can send messages to with the C<snd> function, and
39you can register C<rcv> handlers with. All C<rcv> handlers will receive 39you can register C<rcv> handlers with. All C<rcv> handlers will receive
40messages they match, messages will not be queued. 40messages they match, messages will not be queued.
41 41
42=item port id - C<pid@host#portname> 42=item port id - C<noderef#portname>
43 43
44A port id is always the node id, a hash-mark (C<#>) as separator, followed 44A port id is always the noderef, a hash-mark (C<#>) as separator, followed
45by a port name. 45by a port name (a printable string of unspecified format).
46
47A port name can be a well known port (basically an identifier/bareword),
48or a generated name, consisting of node id, a dot (C<.>), and an
49identifier.
50 46
51=item node 47=item node
52 48
53A node is a single process containing at least one port - the node 49A node is a single process containing at least one port - the node
54port. You can send messages to node ports to let them create new ports, 50port. You can send messages to node ports to let them create new ports,
55among other things. 51among other things.
56 52
57Initially, nodes are either private (single-process only) or hidden 53Initially, nodes are either private (single-process only) or hidden
58(connected to a father node only). Only when they epxlicitly "go public" 54(connected to a master node only). Only when they epxlicitly "become
59can you send them messages form unrelated other nodes. 55public" can you send them messages from unrelated other nodes.
60 56
61Public nodes automatically connect to all other public nodes in a network 57=item noderef - C<host:port,host:port...>, C<id@noderef, C<id>
62when they connect, creating a full mesh.
63 58
64=item node id - C<host:port>, C<id@host>, C<id>
65
66A node ID is a string that either uniquely identifies a given node (For 59A noderef is a string that either uniquely identifies a given node (for
67private and hidden nodes), or contains a recipe on how to reach a given 60private and hidden nodes), or contains a recipe on how to reach a given
68node (for public nodes). 61node (for public nodes).
69 62
70=back 63=back
71 64
72=head1 FUNCTIONS 65=head1 VARIABLES/FUNCTIONS
73 66
74=over 4 67=over 4
75 68
76=cut 69=cut
77 70
109 } 102 }
110 103
111 $DEFAULT_SECRET 104 $DEFAULT_SECRET
112} 105}
113 106
107=item NODE / $NODE
108
109The C<NODE ()> function and the C<$NODE> variable contain the noderef of
110the local node. The value is initialised by a call to C<become_public> or
111C<become_slave>, after which all local port identifiers become invalid.
112
113=cut
114
114our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie 115our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
115our $PUBLIC = 0; 116our $PUBLIC = 0;
116our $NODE; 117our $NODE;
117our $PORT; 118our $PORT;
118 119
149 for $noderef, split /,/, $noderef; 150 for $noderef, split /,/, $noderef;
150 151
151 $node 152 $node
152} 153}
153 154
155=item snd $portid, type => @data
156
157=item snd $portid, @msg
158
159Send the given message to the given port ID, which can identify either a
160local or a remote port.
161
162While the message can be about anything, it is highly recommended to use
163a constant string as first element.
164
165The message data effectively becomes read-only after a call to this
166function: modifying any argument is not allowed and can cause many
167problems.
168
169The type of data you can transfer depends on the transport protocol: when
170JSON is used, then only strings, numbers and arrays and hashes consisting
171of those are allowed (no objects). When Storable is used, then anything
172that Storable can serialise and deserialise is allowed, and for the local
173node, anything can be passed.
174
175=cut
176
154sub snd($@) { 177sub snd($@) {
155 my ($noderef, $port) = split /#/, shift, 2; 178 my ($noderef, $port) = split /#/, shift, 2;
156 179
157 add_node $noderef 180 add_node $noderef
158 unless exists $NODE{$noderef}; 181 unless exists $NODE{$noderef};
159 182
160 $NODE{$noderef}->send ([$port, [@_]]); 183 $NODE{$noderef}->send (["$port", [@_]]);
184}
185
186=item rcv $portid, type => $callback->(@msg)
187
188=item rcv $portid, $smartmatch => $callback->(@msg)
189
190=item rcv $portid, [$smartmatch...] => $callback->(@msg)
191
192Register a callback on the port identified by C<$portid>, which I<must> be
193a local port.
194
195The callback has to return a true value when its work is done, after
196which is will be removed, or a false value in which case it will stay
197registered.
198
199If the match is an array reference, then it will be matched against the
200first elements of the message, otherwise only the first element is being
201matched.
202
203Any element in the match that is specified as C<_any_> (a function
204exported by this module) matches any single element of the message.
205
206While not required, it is highly recommended that the first matching
207element is a string identifying the message. The one-string-only match is
208also the most efficient match (by far).
209
210=cut
211
212sub rcv($@) {
213 my ($port, $match, $cb) = @_;
214
215 my $port = $PORT{$port}
216 or do {
217 my ($noderef, $lport) = split /#/, $port;
218 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
219 or Carp::croak "$port: can only rcv on local ports";
220
221 $PORT{$lport}
222 or Carp::croak "$port: port does not exist";
223
224 $PORT{$port} = $PORT{$lport} # also return
225 };
226
227 if (!ref $match) {
228 push @{ $port->{rc0}{$match} }, [$cb];
229 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
230 my ($type, @match) = @$match;
231 @match
232 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match]
233 : push @{ $port->{rc0}{$match->[0]} }, [$cb];
234 } else {
235 push @{ $port->{any} }, [$cb, $match];
236 }
161} 237}
162 238
163sub _inject { 239sub _inject {
164 my ($port, $msg) = @{+shift}; 240 my ($port, $msg) = @{+shift};
165 241
166 $port = $PORT{$port} 242 $port = $PORT{$port}
167 or return; 243 or return;
168 244
169 use Data::Dumper; 245 @_ = @$msg;
170 warn Dumper $msg; 246
247 for (@{ $port->{rc0}{$msg->[0]} }) {
248 $_ && &{$_->[0]}
249 && undef $_;
250 }
251
252 for (@{ $port->{rcv}{$msg->[0]} }) {
253 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1]
254 && &{$_->[0]}
255 && undef $_;
256 }
257
258 for (@{ $port->{any} }) {
259 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1]
260 && &{$_->[0]}
261 && undef $_;
262 }
171} 263}
172 264
173sub normalise_noderef($) { 265sub normalise_noderef($) {
174 my ($noderef) = @_; 266 my ($noderef) = @_;
175 267
244 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 336 my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
245 my @args = @_; 337 my @args = @_;
246 338
247 $NODE = (normalise_noderef $noderef)->recv; 339 $NODE = (normalise_noderef $noderef)->recv;
248 340
249 my $self = new AnyEvent::MP::Node::Self noderef => $NODE;
250
251 $NODE{""} = $self; # empty string == local node
252
253 for my $t (split /,/, $NODE) { 341 for my $t (split /,/, $NODE) {
254 $NODE{$t} = $self; 342 $NODE{$t} = $NODE{""};
255 343
256 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 344 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
257 345
258 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 346 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
259 @args, 347 @args,
274 } 362 }
275 363
276 $PUBLIC = 1; 364 $PUBLIC = 1;
277} 365}
278 366
367#############################################################################
368# self node code
369
370sub _new_port($) {
371 my ($name) = @_;
372
373 my ($noderef, $portname) = split /#/, $name;
374
375 $PORT{$name} =
376 $PORT{$portname} = {
377 names => [$name, $portname],
378 };
379}
380
381$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
382_new_port "";
383
384rcv "", relay => \&snd;
385
279=back 386=back
280 387
281=head1 SEE ALSO 388=head1 SEE ALSO
282 389
283L<AnyEvent>. 390L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines