… | |
… | |
95 | $uniq = MIME::Base64::encode_base64 $uniq, ""; |
95 | $uniq = MIME::Base64::encode_base64 $uniq, ""; |
96 | $uniq =~ s/=+$//; |
96 | $uniq =~ s/=+$//; |
97 | $uniq |
97 | $uniq |
98 | } |
98 | } |
99 | |
99 | |
|
|
100 | our $NODE = unpack "H*", nonce 16; |
100 | our $UNIQ = gen_uniq; # per-process/node unique cookie |
101 | our $UNIQ = gen_uniq; # per-process/node unique cookie |
101 | our $ID = "a"; |
102 | our $ID = "a"; |
102 | our $PUBLIC = 0; |
103 | our $PUBLIC = 0; |
103 | our $NODE = unpack "H*", nonce 16; |
104 | our $SLAVE = 0; |
104 | |
105 | |
105 | our %NODE; # node id to transport mapping, or "undef", for local node |
106 | our %NODE; # node id to transport mapping, or "undef", for local node |
106 | our (%PORT, %PORT_DATA); # local ports |
107 | our (%PORT, %PORT_DATA); # local ports |
107 | |
108 | |
108 | our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb) |
109 | our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb) |
… | |
… | |
125 | } |
126 | } |
126 | |
127 | |
127 | sub _ANY_() { 1 } |
128 | sub _ANY_() { 1 } |
128 | sub _any_() { \&_ANY_ } |
129 | sub _any_() { \&_ANY_ } |
129 | |
130 | |
|
|
131 | sub TRACE() { 1 } |
|
|
132 | |
130 | sub _inject { |
133 | sub _inject { |
|
|
134 | warn "$SRCNODE->{noderef} -> @_\n" if TRACE;#d# |
131 | &{ $PORT{+shift} or return }; |
135 | &{ $PORT{+shift} or return }; |
132 | } |
136 | } |
133 | |
137 | |
134 | sub add_node { |
138 | sub add_node { |
135 | my ($noderef) = @_; |
139 | my ($noderef) = @_; |
… | |
… | |
141 | return $NODE{$noderef} = $NODE{$_} |
145 | return $NODE{$noderef} = $NODE{$_} |
142 | if exists $NODE{$_}; |
146 | if exists $NODE{$_}; |
143 | } |
147 | } |
144 | |
148 | |
145 | # new node, check validity |
149 | # new node, check validity |
|
|
150 | my $node; |
146 | |
151 | |
|
|
152 | if ($noderef =~ /^slave\/.+$/) { |
|
|
153 | $node = new AnyEvent::MP::Node::Slave $noderef; |
|
|
154 | |
|
|
155 | } else { |
147 | for (split /,/, $noderef) { |
156 | for (split /,/, $noderef) { |
148 | my ($host, $port) = AnyEvent::Socket::parse_hostport $_ |
157 | my ($host, $port) = AnyEvent::Socket::parse_hostport $_ |
149 | or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; |
158 | or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; |
150 | |
159 | |
151 | $port > 0 |
160 | $port > 0 |
152 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"; |
161 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"; |
153 | |
162 | |
154 | AnyEvent::Socket::parse_address $host |
163 | AnyEvent::Socket::parse_address $host |
155 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; |
164 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; |
156 | } |
165 | } |
157 | |
166 | |
158 | # TODO: for indirect sends, use a different class |
167 | # TODO: for indirect sends, use a different class |
159 | my $node = new AnyEvent::MP::Node::Direct $noderef; |
168 | $node = new AnyEvent::MP::Node::Direct $noderef; |
|
|
169 | } |
160 | |
170 | |
161 | $NODE{$_} = $node |
171 | $NODE{$_} = $node |
162 | for $noderef, split /,/, $noderef; |
172 | for $noderef, split /,/, $noderef; |
163 | |
173 | |
164 | $node |
174 | $node |
165 | } |
175 | } |
166 | |
176 | |
167 | sub snd(@) { |
177 | sub snd(@) { |
168 | my ($noderef, $port) = split /#/, shift, 2; |
178 | my ($noderef, $portid) = split /#/, shift, 2; |
|
|
179 | |
|
|
180 | warn "$noderef <- $portid @_\n" if TRACE;#d# |
169 | |
181 | |
170 | ($NODE{$noderef} || add_node $noderef) |
182 | ($NODE{$noderef} || add_node $noderef) |
171 | ->send ([$port, @_]); |
183 | ->send (["$portid", @_]); |
172 | } |
184 | } |
173 | |
185 | |
174 | sub kil(@) { |
186 | sub kil(@) { |
175 | my ($noderef, $port) = split /#/, shift, 2; |
187 | my ($noderef, $portid) = split /#/, shift, 2; |
176 | |
188 | |
177 | length $port or Carp::cluck "yuk\n";#d# |
|
|
178 | length $port |
189 | length $portid |
179 | or Carp::croak "$noderef: killing a node port is not allowed, caught"; |
190 | or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"; |
180 | |
191 | |
181 | ($NODE{$noderef} || add_node $noderef) |
192 | ($NODE{$noderef} || add_node $noderef) |
182 | ->kill ($port, @_); |
193 | ->kill ("$portid", @_); |
183 | } |
194 | } |
184 | |
195 | |
185 | sub resolve_node($) { |
196 | sub resolve_node($) { |
186 | my ($noderef) = @_; |
197 | my ($noderef) = @_; |
187 | |
198 | |
… | |
… | |
252 | |
263 | |
253 | sub initialise_node($;@) { |
264 | sub initialise_node($;@) { |
254 | my ($noderef, @others) = @_; |
265 | my ($noderef, @others) = @_; |
255 | |
266 | |
256 | if ($noderef =~ /^slave\/(.*)$/) { |
267 | if ($noderef =~ /^slave\/(.*)$/) { |
|
|
268 | $SLAVE = AE::cv; |
257 | my $name = $1; |
269 | my $name = $1; |
258 | $name = $UNIQ unless length $name; |
270 | $name = $UNIQ unless length $name; |
259 | $noderef = AE::cv; |
271 | $noderef = AE::cv; |
260 | $noderef->send ("slave/$name"); |
272 | $noderef->send ("slave/$name"); |
|
|
273 | |
|
|
274 | @others |
|
|
275 | or Carp::croak "seed nodes must be specified for slave nodes"; |
|
|
276 | |
261 | } else { |
277 | } else { |
|
|
278 | $PUBLIC = 1; |
262 | $noderef = resolve_node $noderef; |
279 | $noderef = resolve_node $noderef; |
263 | } |
280 | } |
264 | |
281 | |
265 | @others = map $_->recv, map +(resolve_node $_), @others; |
282 | @others = map $_->recv, map +(resolve_node $_), @others; |
266 | |
283 | |
… | |
… | |
280 | $node->{trial}{accept} = $tp; |
297 | $node->{trial}{accept} = $tp; |
281 | }, |
298 | }, |
282 | ; |
299 | ; |
283 | } |
300 | } |
284 | |
301 | |
285 | $PUBLIC = 1; |
302 | (add_node $_)->connect for @others; |
|
|
303 | |
|
|
304 | if ($SLAVE) { |
|
|
305 | $SLAVE->recv; |
|
|
306 | $SLAVE = 1; |
|
|
307 | } |
286 | } |
308 | } |
287 | |
309 | |
288 | ############################################################################# |
310 | ############################################################################# |
289 | # self node code |
311 | # self node code |
290 | |
312 | |
291 | our %node_req = ( |
313 | our %node_req = ( |
|
|
314 | # internal services |
|
|
315 | |
292 | # monitoring |
316 | # monitoring |
293 | mon0 => sub { # disable monitoring |
317 | mon0 => sub { # disable monitoring |
294 | my $portid = shift; |
318 | my $portid = shift; |
295 | my $node = $SRCNODE; |
319 | my $node = $SRCNODE; |
296 | $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid}); |
320 | $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid}); |
… | |
… | |
306 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
330 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
307 | or return; |
331 | or return; |
308 | |
332 | |
309 | $_->(@_) for @$cbs; |
333 | $_->(@_) for @$cbs; |
310 | }, |
334 | }, |
|
|
335 | # node changed its name (for slave nodes) |
|
|
336 | iam => sub { |
|
|
337 | $SRCNODE->{noderef} = $_[0]; |
|
|
338 | $NODE{$_[0]} = $SRCNODE; |
|
|
339 | }, |
|
|
340 | |
|
|
341 | # public services |
311 | |
342 | |
312 | # well-known-port lookup |
343 | # well-known-port lookup |
313 | lookup => sub { |
344 | lookup => sub { |
314 | my $name = shift; |
345 | my $name = shift; |
315 | my $port = $REG{$name}; |
346 | my $port = $REG{$name}; |