… | |
… | |
37 | |
37 | |
38 | A port is something you can send messages to with the C<snd> function, and |
38 | A port is something you can send messages to with the C<snd> function, and |
39 | you can register C<rcv> handlers with. All C<rcv> handlers will receive |
39 | you can register C<rcv> handlers with. All C<rcv> handlers will receive |
40 | messages they match, messages will not be queued. |
40 | messages they match, messages will not be queued. |
41 | |
41 | |
42 | =item port id - C<pid@host#portname> |
42 | =item port id - C<noderef#portname> |
43 | |
43 | |
44 | A port id is always the node id, a hash-mark (C<#>) as separator, followed |
44 | A port id is always the noderef, a hash-mark (C<#>) as separator, followed |
45 | by a port name. |
45 | by a port name (a printable string of unspecified format). |
46 | |
|
|
47 | A port name can be a well known port (basically an identifier/bareword), |
|
|
48 | or a generated name, consisting of node id, a dot (C<.>), and an |
|
|
49 | identifier. |
|
|
50 | |
46 | |
51 | =item node |
47 | =item node |
52 | |
48 | |
53 | A node is a single process containing at least one port - the node |
49 | A node is a single process containing at least one port - the node |
54 | port. You can send messages to node ports to let them create new ports, |
50 | port. You can send messages to node ports to let them create new ports, |
55 | among other things. |
51 | among other things. |
56 | |
52 | |
57 | Initially, nodes are either private (single-process only) or hidden |
53 | Initially, nodes are either private (single-process only) or hidden |
58 | (connected to a father node only). Only when they epxlicitly "go public" |
54 | (connected to a master node only). Only when they epxlicitly "become |
59 | can you send them messages form unrelated other nodes. |
55 | public" can you send them messages from unrelated other nodes. |
60 | |
56 | |
61 | Public nodes automatically connect to all other public nodes in a network |
57 | =item noderef - C<host:port,host:port...>, C<id@noderef, C<id> |
62 | when they connect, creating a full mesh. |
|
|
63 | |
58 | |
64 | =item node id - C<host:port>, C<id@host>, C<id> |
|
|
65 | |
|
|
66 | A node ID is a string that either uniquely identifies a given node (For |
59 | A noderef is a string that either uniquely identifies a given node (for |
67 | private and hidden nodes), or contains a recipe on how to reach a given |
60 | private and hidden nodes), or contains a recipe on how to reach a given |
68 | node (for public nodes). |
61 | node (for public nodes). |
69 | |
62 | |
70 | =back |
63 | =back |
71 | |
64 | |
72 | =head1 FUNCTIONS |
65 | =head1 VARIABLES/FUNCTIONS |
73 | |
66 | |
74 | =over 4 |
67 | =over 4 |
75 | |
68 | |
76 | =cut |
69 | =cut |
77 | |
70 | |
… | |
… | |
109 | } |
102 | } |
110 | |
103 | |
111 | $DEFAULT_SECRET |
104 | $DEFAULT_SECRET |
112 | } |
105 | } |
113 | |
106 | |
|
|
107 | =item NODE / $NODE |
|
|
108 | |
|
|
109 | The C<NODE ()> function and the C<$NODE> variable contain the noderef of |
|
|
110 | the local node. The value is initialised by a call to C<become_public> or |
|
|
111 | C<become_slave>, after which all local port identifiers become invalid. |
|
|
112 | |
|
|
113 | =cut |
|
|
114 | |
114 | our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie |
115 | our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie |
115 | our $PUBLIC = 0; |
116 | our $PUBLIC = 0; |
116 | our $NODE; |
117 | our $NODE; |
117 | our $PORT; |
118 | our $PORT; |
118 | |
119 | |
… | |
… | |
149 | for $noderef, split /,/, $noderef; |
150 | for $noderef, split /,/, $noderef; |
150 | |
151 | |
151 | $node |
152 | $node |
152 | } |
153 | } |
153 | |
154 | |
|
|
155 | =item snd $portid, type => @data |
|
|
156 | |
|
|
157 | =item snd $portid, @msg |
|
|
158 | |
|
|
159 | Send the given message to the given port ID, which can identify either a |
|
|
160 | local or a remote port. |
|
|
161 | |
|
|
162 | While the message can be about anything, it is highly recommended to use |
|
|
163 | a constant string as first element. |
|
|
164 | |
|
|
165 | The message data effectively becomes read-only after a call to this |
|
|
166 | function: modifying any argument is not allowed and can cause many |
|
|
167 | problems. |
|
|
168 | |
|
|
169 | The type of data you can transfer depends on the transport protocol: when |
|
|
170 | JSON is used, then only strings, numbers and arrays and hashes consisting |
|
|
171 | of those are allowed (no objects). When Storable is used, then anything |
|
|
172 | that Storable can serialise and deserialise is allowed, and for the local |
|
|
173 | node, anything can be passed. |
|
|
174 | |
|
|
175 | =cut |
|
|
176 | |
154 | sub snd($@) { |
177 | sub snd($@) { |
155 | my ($noderef, $port) = split /#/, shift, 2; |
178 | my ($noderef, $port) = split /#/, shift, 2; |
156 | |
179 | |
157 | add_node $noderef |
180 | add_node $noderef |
158 | unless exists $NODE{$noderef}; |
181 | unless exists $NODE{$noderef}; |
159 | |
182 | |
160 | $NODE{$noderef}->send ([$port, [@_]]); |
183 | $NODE{$noderef}->send (["$port", [@_]]); |
|
|
184 | } |
|
|
185 | |
|
|
186 | =item rcv $portid, type => $callback->(@msg) |
|
|
187 | |
|
|
188 | =item rcv $portid, $smartmatch => $callback->(@msg) |
|
|
189 | |
|
|
190 | =item rcv $portid, [$smartmatch...] => $callback->(@msg) |
|
|
191 | |
|
|
192 | Register a callback on the port identified by C<$portid>, which I<must> be |
|
|
193 | a local port. |
|
|
194 | |
|
|
195 | The callback has to return a true value when its work is done, after |
|
|
196 | which is will be removed, or a false value in which case it will stay |
|
|
197 | registered. |
|
|
198 | |
|
|
199 | If the match is an array reference, then it will be matched against the |
|
|
200 | first elements of the message, otherwise only the first element is being |
|
|
201 | matched. |
|
|
202 | |
|
|
203 | Any element in the match that is specified as C<_any_> (a function |
|
|
204 | exported by this module) matches any single element of the message. |
|
|
205 | |
|
|
206 | While not required, it is highly recommended that the first matching |
|
|
207 | element is a string identifying the message. The one-string-only match is |
|
|
208 | also the most efficient match (by far). |
|
|
209 | |
|
|
210 | =cut |
|
|
211 | |
|
|
212 | sub rcv($@) { |
|
|
213 | my ($port, $match, $cb) = @_; |
|
|
214 | |
|
|
215 | my $port = $PORT{$port} |
|
|
216 | or do { |
|
|
217 | my ($noderef, $lport) = split /#/, $port; |
|
|
218 | "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef} |
|
|
219 | or Carp::croak "$port: can only rcv on local ports"; |
|
|
220 | |
|
|
221 | $PORT{$lport} |
|
|
222 | or Carp::croak "$port: port does not exist"; |
|
|
223 | |
|
|
224 | $PORT{$port} = $PORT{$lport} # also return |
|
|
225 | }; |
|
|
226 | |
|
|
227 | if (!ref $match) { |
|
|
228 | push @{ $port->{rc0}{$match} }, [$cb]; |
|
|
229 | } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { |
|
|
230 | my ($type, @match) = @$match; |
|
|
231 | @match |
|
|
232 | ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] |
|
|
233 | : push @{ $port->{rc0}{$match->[0]} }, [$cb]; |
|
|
234 | } else { |
|
|
235 | push @{ $port->{any} }, [$cb, $match]; |
|
|
236 | } |
161 | } |
237 | } |
162 | |
238 | |
163 | sub _inject { |
239 | sub _inject { |
164 | my ($port, $msg) = @{+shift}; |
240 | my ($port, $msg) = @{+shift}; |
165 | |
241 | |
166 | $port = $PORT{$port} |
242 | $port = $PORT{$port} |
167 | or return; |
243 | or return; |
168 | |
244 | |
169 | use Data::Dumper; |
245 | @_ = @$msg; |
170 | warn Dumper $msg; |
246 | |
|
|
247 | for (@{ $port->{rc0}{$msg->[0]} }) { |
|
|
248 | $_ && &{$_->[0]} |
|
|
249 | && undef $_; |
|
|
250 | } |
|
|
251 | |
|
|
252 | for (@{ $port->{rcv}{$msg->[0]} }) { |
|
|
253 | $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] |
|
|
254 | && &{$_->[0]} |
|
|
255 | && undef $_; |
|
|
256 | } |
|
|
257 | |
|
|
258 | for (@{ $port->{any} }) { |
|
|
259 | $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] |
|
|
260 | && &{$_->[0]} |
|
|
261 | && undef $_; |
|
|
262 | } |
171 | } |
263 | } |
172 | |
264 | |
173 | sub normalise_noderef($) { |
265 | sub normalise_noderef($) { |
174 | my ($noderef) = @_; |
266 | my ($noderef) = @_; |
175 | |
267 | |
… | |
… | |
244 | my $noderef = join ",", ref $_[0] ? @{+shift} : shift; |
336 | my $noderef = join ",", ref $_[0] ? @{+shift} : shift; |
245 | my @args = @_; |
337 | my @args = @_; |
246 | |
338 | |
247 | $NODE = (normalise_noderef $noderef)->recv; |
339 | $NODE = (normalise_noderef $noderef)->recv; |
248 | |
340 | |
249 | my $self = new AnyEvent::MP::Node::Self noderef => $NODE; |
|
|
250 | |
|
|
251 | $NODE{""} = $self; # empty string == local node |
|
|
252 | |
|
|
253 | for my $t (split /,/, $NODE) { |
341 | for my $t (split /,/, $NODE) { |
254 | $NODE{$t} = $self; |
342 | $NODE{$t} = $NODE{""}; |
255 | |
343 | |
256 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
344 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
257 | |
345 | |
258 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
346 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
259 | @args, |
347 | @args, |
… | |
… | |
274 | } |
362 | } |
275 | |
363 | |
276 | $PUBLIC = 1; |
364 | $PUBLIC = 1; |
277 | } |
365 | } |
278 | |
366 | |
|
|
367 | ############################################################################# |
|
|
368 | # self node code |
|
|
369 | |
|
|
370 | sub _new_port($) { |
|
|
371 | my ($name) = @_; |
|
|
372 | |
|
|
373 | my ($noderef, $portname) = split /#/, $name; |
|
|
374 | |
|
|
375 | $PORT{$name} = |
|
|
376 | $PORT{$portname} = { |
|
|
377 | names => [$name, $portname], |
|
|
378 | }; |
|
|
379 | } |
|
|
380 | |
|
|
381 | $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; |
|
|
382 | _new_port ""; |
|
|
383 | |
|
|
384 | rcv "", relay => \&snd; |
|
|
385 | |
279 | =back |
386 | =back |
280 | |
387 | |
281 | =head1 SEE ALSO |
388 | =head1 SEE ALSO |
282 | |
389 | |
283 | L<AnyEvent>. |
390 | L<AnyEvent>. |