ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.20 by root, Thu Aug 27 07:12:48 2009 UTC vs.
Revision 1.21 by root, Thu Aug 27 21:29:37 2009 UTC

36use base "Exporter"; 36use base "Exporter";
37 37
38our $VERSION = '0.8'; 38our $VERSION = '0.8';
39our @EXPORT = qw( 39our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID 40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 connect_node add_node load_func snd_to_func snd_on eval_on 41 add_node load_func snd_to_func snd_on eval_on
42 42
43 NODE $NODE node_of snd kil 43 NODE $NODE node_of snd kil
44 port_is_local 44 port_is_local
45 resolve_node initialise_node 45 resolve_node initialise_node
46 known_nodes up_nodes mon_nodes node_is_known node_is_up 46 known_nodes up_nodes mon_nodes node_is_known node_is_up
108 } 108 }
109 109
110 $nonce 110 $nonce
111} 111}
112 112
113sub asciibits($) { 113sub alnumbits($) {
114 my $data = $_[0]; 114 my $data = $_[0];
115 115
116 if (eval "use Math::GMP 2.05; 1") { 116 if (eval "use Math::GMP 2.05; 1") {
117 $data = Math::GMP::get_str_gmp ( 117 $data = Math::GMP::get_str_gmp (
118 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)), 118 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
127 127
128 $data 128 $data
129} 129}
130 130
131sub gen_uniq { 131sub gen_uniq {
132 asciibits pack "wNa*", $$, time, nonce 2 132 alnumbits pack "wNa*", $$, time, nonce 2
133} 133}
134 134
135=item $AnyEvent::MP::Kernel::PUBLIC 135=item $AnyEvent::MP::Kernel::PUBLIC
136 136
137A boolean indicating whether this is a full/public node, which can create 137A boolean indicating whether this is a public node, which can create and
138and accept direct connections form othe rnodes. 138accept direct connections from other nodes.
139
140=item $AnyEvent::MP::Kernel::SLAVE
141
142A boolean indicating whether this node is a slave node, i.e. does most of it's
143message sending/receiving through some master node.
144
145=item $AnyEvent::MP::Kernel::MASTER
146
147Defined only in slave mode, in which case it contains the noderef of the
148master node.
149 139
150=cut 140=cut
151 141
152our $CONFIG; # this node's configuration 142our $CONFIG; # this node's configuration
153our $PUBLIC = 0;
154our $SLAVE = "";
155our $MASTER; # master noderef when $SLAVE
156 143
157our $NODE = asciibits nonce 16; 144our $RUNIQ = alnumbits nonce 16;; # remote uniq value
158our $NODEID = $NODE; # same as NODE, except slave nodes have no @master part
159our $RUNIQ = $NODE; # remote uniq value
160our $UNIQ = gen_uniq; # per-process/node unique cookie 145our $UNIQ = gen_uniq; # per-process/node unique cookie
146our $NODE = "anon/$RUNIQ";
161our $ID = "a"; 147our $ID = "a";
162 148
163our %NODE; # node id to transport mapping, or "undef", for local node 149our %NODE; # node id to transport mapping, or "undef", for local node
164our (%PORT, %PORT_DATA); # local ports 150our (%PORT, %PORT_DATA); # local ports
165 151
166our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb) 152our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
167our %LMON; # monitored _local_ ports 153our %LMON; # monitored _local_ ports
168 154
169our %LISTENER; 155our %LISTENER;
156our $LISTENER; # our listeners, as arrayref
170 157
171our $SRCNODE; # holds the sending node during _inject 158our $SRCNODE; # holds the sending node during _inject
172 159
173sub NODE() { 160sub NODE() {
174 $NODE 161 $NODE
175} 162}
176 163
177sub node_of($) { 164sub node_of($) {
178 my ($noderef, undef) = split /#/, $_[0], 2; 165 my ($node, undef) = split /#/, $_[0], 2;
179 166
180 $noderef 167 $node
181} 168}
182 169
183BEGIN { 170BEGIN {
184 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE} 171 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
185 ? sub () { 1 } 172 ? sub () { 1 }
186 : sub () { 0 }; 173 : sub () { 0 };
187} 174}
188 175
189sub _inject { 176sub _inject {
190 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d# 177 warn "RCV $SRCNODE->{id} -> @_\n" if TRACE && @_;#d#
191 &{ $PORT{+shift} or return }; 178 &{ $PORT{+shift} or return };
192} 179}
193 180
194# this function adds a node-ref, so you can send stuff to it 181# this function adds a node-ref, so you can send stuff to it
195# it is basically the central routing component. 182# it is basically the central routing component.
196sub add_node { 183sub add_node {
197 my ($noderef) = @_; 184 my ($node) = @_;
198 185
199 $NODE{$noderef} ||= do {
200 # new node, check validity
201 my $node;
202
203 if ($noderef =~ /^slave\/.+$/) {
204 # slave node without routing part -> direct connection
205 # only really valid from transports
206 $node = new AnyEvent::MP::Node::Direct $noderef; 186 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
207
208 } else {
209 # direct node (or slave node without routing part)
210
211 for (split /,/, $noderef) {
212 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
213 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
214
215 $port > 0
216 or Carp::croak "$noderef: not a resolved node reference ('$_' contains invalid port)";
217
218 AnyEvent::Socket::parse_address $host
219 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
220 }
221
222 $node = new AnyEvent::MP::Node::Direct $noderef;
223 }
224
225 $node
226 }
227}
228
229sub connect_node {
230 &add_node->connect;
231} 187}
232 188
233sub snd(@) { 189sub snd(@) {
234 my ($noderef, $portid) = split /#/, shift, 2; 190 my ($nodeid, $portid) = split /#/, shift, 2;
235 191
236 warn "SND $noderef <- $portid @_\n" if TRACE;#d# 192 warn "SND $nodeid <- $portid @_\n" if TRACE;#d#
237 193
238 ($NODE{$noderef} || add_node $noderef) 194 ($NODE{$nodeid} || add_node $nodeid)
239 ->{send} (["$portid", @_]); 195 ->{send} (["$portid", @_]);
240} 196}
241 197
242=item $is_local = port_is_local $port 198=item $is_local = port_is_local $port
243 199
244Returns true iff the port is a local port. 200Returns true iff the port is a local port.
245 201
246=cut 202=cut
247 203
248sub port_is_local($) { 204sub port_is_local($) {
249 my ($noderef, undef) = split /#/, $_[0], 2; 205 my ($nodeid, undef) = split /#/, $_[0], 2;
250 206
251 $NODE{$noderef} == $NODE{""} 207 $NODE{$nodeid} == $NODE{""}
252} 208}
253 209
254=item snd_to_func $node, $func, @args 210=item snd_to_func $node, $func, @args
255 211
256Expects a noderef and a name of a function. Asynchronously tries to call 212Expects a node ID and a name of a function. Asynchronously tries to call
257this function with the given arguments on that node. 213this function with the given arguments on that node.
258 214
259This function can be used to implement C<spawn>-like interfaces. 215This function can be used to implement C<spawn>-like interfaces.
260 216
261=cut 217=cut
262 218
263sub snd_to_func($$;@) { 219sub snd_to_func($$;@) {
264 my $noderef = shift; 220 my $nodeid = shift;
265 221
266 ($NODE{$noderef} || add_node $noderef) 222 ($NODE{$nodeid} || add_node $nodeid)
267 ->send (["", @_]); 223 ->send (["", @_]);
268} 224}
269 225
270=item snd_on $node, @msg 226=item snd_on $node, @msg
271 227
289 my $node = shift; 245 my $node = shift;
290 snd $node, eval => @_; 246 snd $node, eval => @_;
291} 247}
292 248
293sub kil(@) { 249sub kil(@) {
294 my ($noderef, $portid) = split /#/, shift, 2; 250 my ($nodeid, $portid) = split /#/, shift, 2;
295 251
296 length $portid 252 length $portid
297 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"; 253 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
298 254
299 ($NODE{$noderef} || add_node $noderef) 255 ($NODE{$nodeid} || add_node $nodeid)
300 ->kill ("$portid", @_); 256 ->kill ("$portid", @_);
301} 257}
302 258
303sub _nodename { 259sub _nodename {
304 require POSIX; 260 require POSIX;
305 (POSIX::uname ())[1] 261 (POSIX::uname ())[1]
306} 262}
307 263
308sub resolve_node($) { 264sub _resolve($) {
309 my ($noderef) = @_; 265 my ($nodeid) = @_;
310 266
311 my $cv = AE::cv; 267 my $cv = AE::cv;
312 my @res; 268 my @res;
313 269
314 $cv->begin (sub { 270 $cv->begin (sub {
315 my %seen; 271 my %seen;
316 my @refs; 272 my @refs;
317 for (sort { $a->[0] <=> $b->[0] } @res) { 273 for (sort { $a->[0] <=> $b->[0] } @res) {
318 push @refs, $_->[1] unless $seen{$_->[1]}++ 274 push @refs, $_->[1] unless $seen{$_->[1]}++
319 } 275 }
320 shift->send (join ",", @refs); 276 shift->send (@refs);
321 }); 277 });
322 278
323 $noderef = $DEFAULT_PORT unless length $noderef; 279 $nodeid = $DEFAULT_PORT unless length $nodeid;
324 280
325 my $idx; 281 my $idx;
326 for my $t (split /,/, $noderef) { 282 for my $t (split /,/, $nodeid) {
327 my $pri = ++$idx; 283 my $pri = ++$idx;
328 284
329 $t = length $t ? _nodename . ":$t" : _nodename 285 $t = length $t ? _nodename . ":$t" : _nodename
330 if $t =~ /^\d*$/; 286 if $t =~ /^\d*$/;
331 287
348 $cv->end; 304 $cv->end;
349 305
350 $cv 306 $cv
351} 307}
352 308
353sub initialise_node(@) { 309sub initialise_node($;%) {
354 my ($noderef, @others) = @_; 310 my ($profile) = @_;
355 311
312 $profile = _nodename
313 unless defined $profile;
314
356 $CONFIG = AnyEvent::MP::Config::find_profile 315 $CONFIG = AnyEvent::MP::Config::find_profile $profile;
357 +(defined $noderef ? $noderef : _nodename); 316 $NODE = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid}
358 317 : $profile eq "anon/" ? $NODE
359 $noderef = $CONFIG->{noderef} 318 : $profile;
360 if exists $CONFIG->{noderef};
361
362 push @others, @{ $CONFIG->{seeds} };
363
364 @others = map $_->recv, map +(resolve_node $_), @others;
365
366 if ($noderef =~ /^slave\/(.*)$/) {
367 my $name = $1;
368 $name = $NODE unless length $name;
369
370 @others
371 or Carp::croak "seed nodes must be specified for slave nodes";
372
373 $SLAVE = 1;
374 $NODE = "slave/$name";
375
376 } else {
377 $PUBLIC = 1;
378 $NODE = (resolve_node $noderef)->recv;
379 }
380 319
381 $NODE{$NODE} = $NODE{""}; 320 $NODE{$NODE} = $NODE{""};
382 $NODE{$NODE}{noderef} = $NODE; 321 $NODE{$NODE}{id} = $NODE;
383 322
384 unless ($SLAVE) { 323 my $seeds = $CONFIG->{seeds};
385 for my $t (split /,/, $NODE) { 324 my $binds = $CONFIG->{binds};
325
326 $binds ||= [$NODE];
327
328 $WARN->(8, "node $NODE starting up.");
329
330 for (map _resolve $_, @$binds) {
331 for my $bind ($_->recv) {
386 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 332 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
333 or Carp::croak "$bind: unparsable local bind address";
387 334
388 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 335 $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port;
389 sub { 336 push @$LISTENER, $bind;
390 my ($tp) = @_;
391
392 # TODO: urgs
393 my $node = add_node $tp->{remote_node};
394 $node->{trial}{accept} = $tp;
395 },
396 ;
397 } 337 }
398 } 338 }
399 339
400 for (@others) { 340 # the global service is mandatory currently
401 my $node = add_node $_; 341 require AnyEvent::MP::Global;
402 $node->{autoconnect} = 1; 342
403 $node->connect; 343 # connect to all seednodes
404 } 344 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
405 345
406 for (@{ $CONFIG->{services} }) { 346 for (@{ $CONFIG->{services} }) {
407 if (s/::$//) { 347 if (s/::$//) {
408 eval "require $_"; 348 eval "require $_";
409 die $@ if $@; 349 die $@ if $@;
410 } else { 350 } else {
411 (load_func $_)->(); 351 (load_func $_)->();
412 } 352 }
413 } 353 }
414
415 # slave nodes need global
416 require AnyEvent::MP::Global
417 if $SLAVE;
418} 354}
419 355
420############################################################################# 356#############################################################################
421# node monitoring and info 357# node monitoring and info
422 358
427 363
428 values %node; 364 values %node;
429} 365}
430 366
431sub _public_nodes { 367sub _public_nodes {
432 grep $_->{noderef} !~ /^slave\//, _uniq_nodes 368 &_uniq_nodes
433} 369}
434 370
435=item node_is_known $noderef 371=item node_is_known $nodeid
436 372
437Returns true iff the given node is currently known to the system. 373Returns true iff the given node is currently known to the system.
438 374
439=cut 375=cut
440 376
441sub node_is_known($) { 377sub node_is_known($) {
442 exists $NODE{$_[0]} 378 exists $NODE{$_[0]}
443} 379}
444 380
445=item node_is_up $noderef 381=item node_is_up $nodeid
446 382
447Returns true if the given node is "up", that is, the kernel thinks it has 383Returns true if the given node is "up", that is, the kernel thinks it has
448a working connection to it. 384a working connection to it.
449 385
450If the node is known but not currently connected, returns C<0>. If the 386If the node is known but not currently connected, returns C<0>. If the
457 ? 1 : 0 393 ? 1 : 0
458} 394}
459 395
460=item known_nodes 396=item known_nodes
461 397
462Returns the noderefs of all public nodes connected to this node, including 398Returns the node IDs of all public nodes connected to this node, including
463itself. 399itself.
464 400
465=cut 401=cut
466 402
467sub known_nodes { 403sub known_nodes {
468 map $_->{noderef}, _public_nodes 404 map $_->{id}, _public_nodes
469} 405}
470 406
471=item up_nodes 407=item up_nodes
472 408
473Return the noderefs of all public nodes that are currently connected 409Return the node IDs of all public nodes that are currently connected
474(excluding the node itself). 410(excluding the node itself).
475 411
476=cut 412=cut
477 413
478sub up_nodes { 414sub up_nodes {
479 map $_->{noderef}, grep $_->{transport}, _public_nodes 415 map $_->{id}, grep $_->{transport}, _public_nodes
480} 416}
481 417
482=item $guard = mon_nodes $callback->($noderef, $is_up, @reason) 418=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
483 419
484Registers a callback that is called each time a node goes up (connection 420Registers a callback that is called each time a node goes up (connection
485is established) or down (connection is lost). 421is established) or down (connection is lost).
486 422
487Node up messages can only be followed by node down messages for the same 423Node up messages can only be followed by node down messages for the same
504 440
505sub _inject_nodeevent($$;@) { 441sub _inject_nodeevent($$;@) {
506 my ($node, $up, @reason) = @_; 442 my ($node, $up, @reason) = @_;
507 443
508 for my $cb (values %MON_NODES) { 444 for my $cb (values %MON_NODES) {
509 eval { $cb->($node->{noderef}, $up, @reason); 1 } 445 eval { $cb->($node->{id}, $up, @reason); 1 }
510 or $WARN->(1, $@); 446 or $WARN->(1, $@);
511 } 447 }
512 448
513 $WARN->(7, "$node->{noderef} is " . ($up ? "up" : "down") . " (@reason)"); 449 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
514} 450}
515 451
516############################################################################# 452#############################################################################
517# self node code 453# self node code
518 454

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines