… | |
… | |
36 | use base "Exporter"; |
36 | use base "Exporter"; |
37 | |
37 | |
38 | our $VERSION = '0.8'; |
38 | our $VERSION = '0.8'; |
39 | our @EXPORT = qw( |
39 | our @EXPORT = qw( |
40 | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID |
40 | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID |
41 | connect_node add_node load_func snd_to_func snd_on eval_on |
41 | add_node load_func snd_to_func snd_on eval_on |
42 | |
42 | |
43 | NODE $NODE node_of snd kil |
43 | NODE $NODE node_of snd kil |
44 | port_is_local |
44 | port_is_local |
45 | resolve_node initialise_node |
45 | resolve_node initialise_node |
46 | known_nodes up_nodes mon_nodes node_is_known node_is_up |
46 | known_nodes up_nodes mon_nodes node_is_known node_is_up |
… | |
… | |
108 | } |
108 | } |
109 | |
109 | |
110 | $nonce |
110 | $nonce |
111 | } |
111 | } |
112 | |
112 | |
113 | sub asciibits($) { |
113 | sub alnumbits($) { |
114 | my $data = $_[0]; |
114 | my $data = $_[0]; |
115 | |
115 | |
116 | if (eval "use Math::GMP 2.05; 1") { |
116 | if (eval "use Math::GMP 2.05; 1") { |
117 | $data = Math::GMP::get_str_gmp ( |
117 | $data = Math::GMP::get_str_gmp ( |
118 | (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)), |
118 | (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)), |
… | |
… | |
127 | |
127 | |
128 | $data |
128 | $data |
129 | } |
129 | } |
130 | |
130 | |
131 | sub gen_uniq { |
131 | sub gen_uniq { |
132 | asciibits pack "wNa*", $$, time, nonce 2 |
132 | alnumbits pack "wNa*", $$, time, nonce 2 |
133 | } |
133 | } |
134 | |
134 | |
135 | =item $AnyEvent::MP::Kernel::PUBLIC |
135 | =item $AnyEvent::MP::Kernel::PUBLIC |
136 | |
136 | |
137 | A boolean indicating whether this is a full/public node, which can create |
137 | A boolean indicating whether this is a public node, which can create and |
138 | and accept direct connections form othe rnodes. |
138 | accept direct connections from other nodes. |
139 | |
|
|
140 | =item $AnyEvent::MP::Kernel::SLAVE |
|
|
141 | |
|
|
142 | A boolean indicating whether this node is a slave node, i.e. does most of it's |
|
|
143 | message sending/receiving through some master node. |
|
|
144 | |
|
|
145 | =item $AnyEvent::MP::Kernel::MASTER |
|
|
146 | |
|
|
147 | Defined only in slave mode, in which case it contains the noderef of the |
|
|
148 | master node. |
|
|
149 | |
139 | |
150 | =cut |
140 | =cut |
151 | |
141 | |
152 | our $CONFIG; # this node's configuration |
142 | our $CONFIG; # this node's configuration |
153 | our $PUBLIC = 0; |
|
|
154 | our $SLAVE = ""; |
|
|
155 | our $MASTER; # master noderef when $SLAVE |
|
|
156 | |
143 | |
157 | our $NODE = asciibits nonce 16; |
144 | our $RUNIQ = alnumbits nonce 16;; # remote uniq value |
158 | our $NODEID = $NODE; # same as NODE, except slave nodes have no @master part |
|
|
159 | our $RUNIQ = $NODE; # remote uniq value |
|
|
160 | our $UNIQ = gen_uniq; # per-process/node unique cookie |
145 | our $UNIQ = gen_uniq; # per-process/node unique cookie |
|
|
146 | our $NODE = "anon/$RUNIQ"; |
161 | our $ID = "a"; |
147 | our $ID = "a"; |
162 | |
148 | |
163 | our %NODE; # node id to transport mapping, or "undef", for local node |
149 | our %NODE; # node id to transport mapping, or "undef", for local node |
164 | our (%PORT, %PORT_DATA); # local ports |
150 | our (%PORT, %PORT_DATA); # local ports |
165 | |
151 | |
166 | our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb) |
152 | our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb) |
167 | our %LMON; # monitored _local_ ports |
153 | our %LMON; # monitored _local_ ports |
168 | |
154 | |
169 | our %LISTENER; |
155 | our %LISTENER; |
|
|
156 | our $LISTENER; # our listeners, as arrayref |
170 | |
157 | |
171 | our $SRCNODE; # holds the sending node during _inject |
158 | our $SRCNODE; # holds the sending node during _inject |
172 | |
159 | |
173 | sub NODE() { |
160 | sub NODE() { |
174 | $NODE |
161 | $NODE |
175 | } |
162 | } |
176 | |
163 | |
177 | sub node_of($) { |
164 | sub node_of($) { |
178 | my ($noderef, undef) = split /#/, $_[0], 2; |
165 | my ($node, undef) = split /#/, $_[0], 2; |
179 | |
166 | |
180 | $noderef |
167 | $node |
181 | } |
168 | } |
182 | |
169 | |
183 | BEGIN { |
170 | BEGIN { |
184 | *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE} |
171 | *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE} |
185 | ? sub () { 1 } |
172 | ? sub () { 1 } |
186 | : sub () { 0 }; |
173 | : sub () { 0 }; |
187 | } |
174 | } |
188 | |
175 | |
189 | sub _inject { |
176 | sub _inject { |
190 | warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d# |
177 | warn "RCV $SRCNODE->{id} -> @_\n" if TRACE && @_;#d# |
191 | &{ $PORT{+shift} or return }; |
178 | &{ $PORT{+shift} or return }; |
192 | } |
179 | } |
193 | |
180 | |
194 | # this function adds a node-ref, so you can send stuff to it |
181 | # this function adds a node-ref, so you can send stuff to it |
195 | # it is basically the central routing component. |
182 | # it is basically the central routing component. |
196 | sub add_node { |
183 | sub add_node { |
197 | my ($noderef) = @_; |
184 | my ($node) = @_; |
198 | |
185 | |
199 | $NODE{$noderef} ||= do { |
|
|
200 | # new node, check validity |
|
|
201 | my $node; |
|
|
202 | |
|
|
203 | if ($noderef =~ /^slave\/.+$/) { |
|
|
204 | # slave node without routing part -> direct connection |
|
|
205 | # only really valid from transports |
|
|
206 | $node = new AnyEvent::MP::Node::Direct $noderef; |
186 | $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node |
207 | |
|
|
208 | } else { |
|
|
209 | # direct node (or slave node without routing part) |
|
|
210 | |
|
|
211 | for (split /,/, $noderef) { |
|
|
212 | my ($host, $port) = AnyEvent::Socket::parse_hostport $_ |
|
|
213 | or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; |
|
|
214 | |
|
|
215 | $port > 0 |
|
|
216 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains invalid port)"; |
|
|
217 | |
|
|
218 | AnyEvent::Socket::parse_address $host |
|
|
219 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; |
|
|
220 | } |
|
|
221 | |
|
|
222 | $node = new AnyEvent::MP::Node::Direct $noderef; |
|
|
223 | } |
|
|
224 | |
|
|
225 | $node |
|
|
226 | } |
|
|
227 | } |
|
|
228 | |
|
|
229 | sub connect_node { |
|
|
230 | &add_node->connect; |
|
|
231 | } |
187 | } |
232 | |
188 | |
233 | sub snd(@) { |
189 | sub snd(@) { |
234 | my ($noderef, $portid) = split /#/, shift, 2; |
190 | my ($nodeid, $portid) = split /#/, shift, 2; |
235 | |
191 | |
236 | warn "SND $noderef <- $portid @_\n" if TRACE;#d# |
192 | warn "SND $nodeid <- $portid @_\n" if TRACE;#d# |
237 | |
193 | |
238 | ($NODE{$noderef} || add_node $noderef) |
194 | ($NODE{$nodeid} || add_node $nodeid) |
239 | ->{send} (["$portid", @_]); |
195 | ->{send} (["$portid", @_]); |
240 | } |
196 | } |
241 | |
197 | |
242 | =item $is_local = port_is_local $port |
198 | =item $is_local = port_is_local $port |
243 | |
199 | |
244 | Returns true iff the port is a local port. |
200 | Returns true iff the port is a local port. |
245 | |
201 | |
246 | =cut |
202 | =cut |
247 | |
203 | |
248 | sub port_is_local($) { |
204 | sub port_is_local($) { |
249 | my ($noderef, undef) = split /#/, $_[0], 2; |
205 | my ($nodeid, undef) = split /#/, $_[0], 2; |
250 | |
206 | |
251 | $NODE{$noderef} == $NODE{""} |
207 | $NODE{$nodeid} == $NODE{""} |
252 | } |
208 | } |
253 | |
209 | |
254 | =item snd_to_func $node, $func, @args |
210 | =item snd_to_func $node, $func, @args |
255 | |
211 | |
256 | Expects a noderef and a name of a function. Asynchronously tries to call |
212 | Expects a node ID and a name of a function. Asynchronously tries to call |
257 | this function with the given arguments on that node. |
213 | this function with the given arguments on that node. |
258 | |
214 | |
259 | This function can be used to implement C<spawn>-like interfaces. |
215 | This function can be used to implement C<spawn>-like interfaces. |
260 | |
216 | |
261 | =cut |
217 | =cut |
262 | |
218 | |
263 | sub snd_to_func($$;@) { |
219 | sub snd_to_func($$;@) { |
264 | my $noderef = shift; |
220 | my $nodeid = shift; |
265 | |
221 | |
266 | ($NODE{$noderef} || add_node $noderef) |
222 | ($NODE{$nodeid} || add_node $nodeid) |
267 | ->send (["", @_]); |
223 | ->send (["", @_]); |
268 | } |
224 | } |
269 | |
225 | |
270 | =item snd_on $node, @msg |
226 | =item snd_on $node, @msg |
271 | |
227 | |
… | |
… | |
289 | my $node = shift; |
245 | my $node = shift; |
290 | snd $node, eval => @_; |
246 | snd $node, eval => @_; |
291 | } |
247 | } |
292 | |
248 | |
293 | sub kil(@) { |
249 | sub kil(@) { |
294 | my ($noderef, $portid) = split /#/, shift, 2; |
250 | my ($nodeid, $portid) = split /#/, shift, 2; |
295 | |
251 | |
296 | length $portid |
252 | length $portid |
297 | or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"; |
253 | or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; |
298 | |
254 | |
299 | ($NODE{$noderef} || add_node $noderef) |
255 | ($NODE{$nodeid} || add_node $nodeid) |
300 | ->kill ("$portid", @_); |
256 | ->kill ("$portid", @_); |
301 | } |
257 | } |
302 | |
258 | |
303 | sub _nodename { |
259 | sub _nodename { |
304 | require POSIX; |
260 | require POSIX; |
305 | (POSIX::uname ())[1] |
261 | (POSIX::uname ())[1] |
306 | } |
262 | } |
307 | |
263 | |
308 | sub resolve_node($) { |
264 | sub _resolve($) { |
309 | my ($noderef) = @_; |
265 | my ($nodeid) = @_; |
310 | |
266 | |
311 | my $cv = AE::cv; |
267 | my $cv = AE::cv; |
312 | my @res; |
268 | my @res; |
313 | |
269 | |
314 | $cv->begin (sub { |
270 | $cv->begin (sub { |
315 | my %seen; |
271 | my %seen; |
316 | my @refs; |
272 | my @refs; |
317 | for (sort { $a->[0] <=> $b->[0] } @res) { |
273 | for (sort { $a->[0] <=> $b->[0] } @res) { |
318 | push @refs, $_->[1] unless $seen{$_->[1]}++ |
274 | push @refs, $_->[1] unless $seen{$_->[1]}++ |
319 | } |
275 | } |
320 | shift->send (join ",", @refs); |
276 | shift->send (@refs); |
321 | }); |
277 | }); |
322 | |
278 | |
323 | $noderef = $DEFAULT_PORT unless length $noderef; |
279 | $nodeid = $DEFAULT_PORT unless length $nodeid; |
324 | |
280 | |
325 | my $idx; |
281 | my $idx; |
326 | for my $t (split /,/, $noderef) { |
282 | for my $t (split /,/, $nodeid) { |
327 | my $pri = ++$idx; |
283 | my $pri = ++$idx; |
328 | |
284 | |
329 | $t = length $t ? _nodename . ":$t" : _nodename |
285 | $t = length $t ? _nodename . ":$t" : _nodename |
330 | if $t =~ /^\d*$/; |
286 | if $t =~ /^\d*$/; |
331 | |
287 | |
… | |
… | |
348 | $cv->end; |
304 | $cv->end; |
349 | |
305 | |
350 | $cv |
306 | $cv |
351 | } |
307 | } |
352 | |
308 | |
353 | sub initialise_node(@) { |
309 | sub initialise_node($;%) { |
354 | my ($noderef, @others) = @_; |
310 | my ($profile) = @_; |
355 | |
311 | |
|
|
312 | $profile = _nodename |
|
|
313 | unless defined $profile; |
|
|
314 | |
356 | $CONFIG = AnyEvent::MP::Config::find_profile |
315 | $CONFIG = AnyEvent::MP::Config::find_profile $profile; |
357 | +(defined $noderef ? $noderef : _nodename); |
316 | $NODE = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} |
358 | |
317 | : $profile eq "anon/" ? $NODE |
359 | $noderef = $CONFIG->{noderef} |
318 | : $profile; |
360 | if exists $CONFIG->{noderef}; |
|
|
361 | |
|
|
362 | push @others, @{ $CONFIG->{seeds} }; |
|
|
363 | |
|
|
364 | @others = map $_->recv, map +(resolve_node $_), @others; |
|
|
365 | |
|
|
366 | if ($noderef =~ /^slave\/(.*)$/) { |
|
|
367 | my $name = $1; |
|
|
368 | $name = $NODE unless length $name; |
|
|
369 | |
|
|
370 | @others |
|
|
371 | or Carp::croak "seed nodes must be specified for slave nodes"; |
|
|
372 | |
|
|
373 | $SLAVE = 1; |
|
|
374 | $NODE = "slave/$name"; |
|
|
375 | |
|
|
376 | } else { |
|
|
377 | $PUBLIC = 1; |
|
|
378 | $NODE = (resolve_node $noderef)->recv; |
|
|
379 | } |
|
|
380 | |
319 | |
381 | $NODE{$NODE} = $NODE{""}; |
320 | $NODE{$NODE} = $NODE{""}; |
382 | $NODE{$NODE}{noderef} = $NODE; |
321 | $NODE{$NODE}{id} = $NODE; |
383 | |
322 | |
384 | unless ($SLAVE) { |
323 | my $seeds = $CONFIG->{seeds}; |
385 | for my $t (split /,/, $NODE) { |
324 | my $binds = $CONFIG->{binds}; |
|
|
325 | |
|
|
326 | $binds ||= [$NODE]; |
|
|
327 | |
|
|
328 | $WARN->(8, "node $NODE starting up."); |
|
|
329 | |
|
|
330 | for (map _resolve $_, @$binds) { |
|
|
331 | for my $bind ($_->recv) { |
386 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
332 | my ($host, $port) = AnyEvent::Socket::parse_hostport $bind |
|
|
333 | or Carp::croak "$bind: unparsable local bind address"; |
387 | |
334 | |
388 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
335 | $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port; |
389 | sub { |
336 | push @$LISTENER, $bind; |
390 | my ($tp) = @_; |
|
|
391 | |
|
|
392 | # TODO: urgs |
|
|
393 | my $node = add_node $tp->{remote_node}; |
|
|
394 | $node->{trial}{accept} = $tp; |
|
|
395 | }, |
|
|
396 | ; |
|
|
397 | } |
337 | } |
398 | } |
338 | } |
399 | |
339 | |
400 | for (@others) { |
340 | # the global service is mandatory currently |
401 | my $node = add_node $_; |
341 | require AnyEvent::MP::Global; |
402 | $node->{autoconnect} = 1; |
342 | |
403 | $node->connect; |
343 | # connect to all seednodes |
404 | } |
344 | AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds); |
405 | |
345 | |
406 | for (@{ $CONFIG->{services} }) { |
346 | for (@{ $CONFIG->{services} }) { |
407 | if (s/::$//) { |
347 | if (s/::$//) { |
408 | eval "require $_"; |
348 | eval "require $_"; |
409 | die $@ if $@; |
349 | die $@ if $@; |
410 | } else { |
350 | } else { |
411 | (load_func $_)->(); |
351 | (load_func $_)->(); |
412 | } |
352 | } |
413 | } |
353 | } |
414 | |
|
|
415 | # slave nodes need global |
|
|
416 | require AnyEvent::MP::Global |
|
|
417 | if $SLAVE; |
|
|
418 | } |
354 | } |
419 | |
355 | |
420 | ############################################################################# |
356 | ############################################################################# |
421 | # node monitoring and info |
357 | # node monitoring and info |
422 | |
358 | |
… | |
… | |
427 | |
363 | |
428 | values %node; |
364 | values %node; |
429 | } |
365 | } |
430 | |
366 | |
431 | sub _public_nodes { |
367 | sub _public_nodes { |
432 | grep $_->{noderef} !~ /^slave\//, _uniq_nodes |
368 | &_uniq_nodes |
433 | } |
369 | } |
434 | |
370 | |
435 | =item node_is_known $noderef |
371 | =item node_is_known $nodeid |
436 | |
372 | |
437 | Returns true iff the given node is currently known to the system. |
373 | Returns true iff the given node is currently known to the system. |
438 | |
374 | |
439 | =cut |
375 | =cut |
440 | |
376 | |
441 | sub node_is_known($) { |
377 | sub node_is_known($) { |
442 | exists $NODE{$_[0]} |
378 | exists $NODE{$_[0]} |
443 | } |
379 | } |
444 | |
380 | |
445 | =item node_is_up $noderef |
381 | =item node_is_up $nodeid |
446 | |
382 | |
447 | Returns true if the given node is "up", that is, the kernel thinks it has |
383 | Returns true if the given node is "up", that is, the kernel thinks it has |
448 | a working connection to it. |
384 | a working connection to it. |
449 | |
385 | |
450 | If the node is known but not currently connected, returns C<0>. If the |
386 | If the node is known but not currently connected, returns C<0>. If the |
… | |
… | |
457 | ? 1 : 0 |
393 | ? 1 : 0 |
458 | } |
394 | } |
459 | |
395 | |
460 | =item known_nodes |
396 | =item known_nodes |
461 | |
397 | |
462 | Returns the noderefs of all public nodes connected to this node, including |
398 | Returns the node IDs of all public nodes connected to this node, including |
463 | itself. |
399 | itself. |
464 | |
400 | |
465 | =cut |
401 | =cut |
466 | |
402 | |
467 | sub known_nodes { |
403 | sub known_nodes { |
468 | map $_->{noderef}, _public_nodes |
404 | map $_->{id}, _public_nodes |
469 | } |
405 | } |
470 | |
406 | |
471 | =item up_nodes |
407 | =item up_nodes |
472 | |
408 | |
473 | Return the noderefs of all public nodes that are currently connected |
409 | Return the node IDs of all public nodes that are currently connected |
474 | (excluding the node itself). |
410 | (excluding the node itself). |
475 | |
411 | |
476 | =cut |
412 | =cut |
477 | |
413 | |
478 | sub up_nodes { |
414 | sub up_nodes { |
479 | map $_->{noderef}, grep $_->{transport}, _public_nodes |
415 | map $_->{id}, grep $_->{transport}, _public_nodes |
480 | } |
416 | } |
481 | |
417 | |
482 | =item $guard = mon_nodes $callback->($noderef, $is_up, @reason) |
418 | =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason) |
483 | |
419 | |
484 | Registers a callback that is called each time a node goes up (connection |
420 | Registers a callback that is called each time a node goes up (connection |
485 | is established) or down (connection is lost). |
421 | is established) or down (connection is lost). |
486 | |
422 | |
487 | Node up messages can only be followed by node down messages for the same |
423 | Node up messages can only be followed by node down messages for the same |
… | |
… | |
504 | |
440 | |
505 | sub _inject_nodeevent($$;@) { |
441 | sub _inject_nodeevent($$;@) { |
506 | my ($node, $up, @reason) = @_; |
442 | my ($node, $up, @reason) = @_; |
507 | |
443 | |
508 | for my $cb (values %MON_NODES) { |
444 | for my $cb (values %MON_NODES) { |
509 | eval { $cb->($node->{noderef}, $up, @reason); 1 } |
445 | eval { $cb->($node->{id}, $up, @reason); 1 } |
510 | or $WARN->(1, $@); |
446 | or $WARN->(1, $@); |
511 | } |
447 | } |
512 | |
448 | |
513 | $WARN->(7, "$node->{noderef} is " . ($up ? "up" : "down") . " (@reason)"); |
449 | $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)"); |
514 | } |
450 | } |
515 | |
451 | |
516 | ############################################################################# |
452 | ############################################################################# |
517 | # self node code |
453 | # self node code |
518 | |
454 | |