ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.105 by root, Fri Mar 23 13:46:03 2012 UTC vs.
Revision 1.106 by root, Fri Mar 23 21:16:25 2012 UTC

4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::MP::Kernel; 7 use AnyEvent::MP::Kernel;
8 8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
9=head1 DESCRIPTION 19=head1 DESCRIPTION
10 20
11This module provides most of the basic functionality of AnyEvent::MP, 21This module implements most of the inner workings of AnyEvent::MP. It
12exposed through higher level interfaces such as L<AnyEvent::MP> and 22offers mostly lower-level functions that deal with network connectivity
13L<Coro::MP>. 23and special requests.
14 24
15This module is mainly of interest when knowledge about connectivity, 25You normally interface with AnyEvent::MP through a higher level interface
16connected nodes etc. is sought. 26such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27with using the functions from this module.
17 28
18=head1 GLOBALS AND FUNCTIONS 29=head1 GLOBALS AND FUNCTIONS
19 30
20=over 4 31=over 4
21 32
32use AnyEvent::MP::Node; 43use AnyEvent::MP::Node;
33use AnyEvent::MP::Transport; 44use AnyEvent::MP::Transport;
34 45
35use base "Exporter"; 46use base "Exporter";
36 47
37our @EXPORT_OK = qw( 48# for re-export in AnyEvent::MP and Coro::MP
38 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39);
40
41our @EXPORT = qw( 49our @EXPORT_API = qw(
42 add_node load_func snd_to_func snd_on eval_on 50 NODE $NODE
43
44 NODE $NODE node_of snd kil port_is_local
45 configure 51 configure
46 up_nodes mon_nodes node_is_up 52 node_of port_is_local
53 snd kil
47 db_set db_del 54 db_set db_del
48 db_mon db_family db_keys db_values 55 db_mon db_family db_keys db_values
56);
57
58our @EXPORT_OK = (
59 # these are internal
60 qw(
61 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62 add_node load_func
63 ),
64 @EXPORT_API,
65);
66
67our @EXPORT = qw(
68 snd_to_func snd_on eval_on
69 port_is_local
70 up_nodes mon_nodes node_is_up
49); 71);
50 72
51sub load_func($) { 73sub load_func($) {
52 my $func = $_[0]; 74 my $func = $_[0];
53 75
77 99
78sub nonce62($) { 100sub nonce62($) {
79 join "", map $alnum[rand 62], 1 .. $_[0] 101 join "", map $alnum[rand 62], 1 .. $_[0]
80} 102}
81 103
82sub gen_uniq {
83 my $now = AE::now;
84 (join "",
85 map $alnum[$_],
86 $$ / 62 % 62,
87 $$ % 62,
88 (int $now ) % 62,
89 (int $now * 100) % 62,
90 (int $now * 10000) % 62,
91 ) . nonce62 4;
92}
93
94our $CONFIG; # this node's configuration 104our $CONFIG; # this node's configuration
95our $SECURE; 105our $SECURE;
96 106
97our $RUNIQ; # remote uniq value 107our $RUNIQ; # remote uniq value
98our $UNIQ; # per-process/node unique cookie 108our $UNIQ; # per-process/node unique cookie
109our %BINDS; 119our %BINDS;
110our $BINDS; # our listeners, as arrayref 120our $BINDS; # our listeners, as arrayref
111 121
112our $SRCNODE; # holds the sending node _object_ during _inject 122our $SRCNODE; # holds the sending node _object_ during _inject
113 123
114sub _init_names { 124# initialise names for non-networked operation
125{
115 # ~54 bits, for local port names, lowercase $ID appended 126 # ~54 bits, for local port names, lowercase $ID appended
116 $UNIQ = gen_uniq; 127 my $now = AE::now;
128 $UNIQ =
129 (join "",
130 map $alnum[$_],
131 $$ / 62 % 62,
132 $$ % 62,
133 (int $now ) % 62,
134 (int $now * 100) % 62,
135 (int $now * 10000) % 62,
136 ) . nonce62 4
137 ;
117 138
118 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes 139 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
119 $RUNIQ = nonce62 10; 140 $RUNIQ = nonce62 10;
120 $RUNIQ =~ s/(.)$/\U$1/; 141 $RUNIQ =~ s/(.)$/\U$1/;
121 142
122 $NODE = "anon/$RUNIQ"; 143 $NODE = "";
123} 144}
124
125_init_names;
126 145
127sub NODE() { 146sub NODE() {
128 $NODE 147 $NODE
129} 148}
130 149
185 204
186 ($NODE{$nodeid} || add_node $nodeid) 205 ($NODE{$nodeid} || add_node $nodeid)
187 ->{send} (["$portid", @_]); 206 ->{send} (["$portid", @_]);
188} 207}
189 208
190=item $is_local = port_is_local $port
191
192Returns true iff the port is a local port.
193
194=cut
195
196sub port_is_local($) { 209sub port_is_local($) {
197 my ($nodeid, undef) = split /#/, $_[0], 2; 210 my ($nodeid, undef) = split /#/, $_[0], 2;
198 211
199 $NODE{$nodeid} == $NODE{""} 212 $nodeid eq $NODE
200} 213}
201 214
202=item snd_to_func $node, $func, @args 215=item snd_to_func $node, $func, @args
203 216
204Expects a node ID and a name of a function. Asynchronously tries to call 217Expects a node ID and a name of a function. Asynchronously tries to call
256} 269}
257 270
258############################################################################# 271#############################################################################
259# node monitoring and info 272# node monitoring and info
260 273
261=item node_is_up $nodeid 274=item $bool = node_is_up $nodeid
262 275
263Returns true if the given node is "up", that is, the kernel thinks it has 276Returns true if the given node is "up", that is, the kernel thinks it has
264a working connection to it. 277a working connection to it.
265 278
266If the node is up, returns C<1>. If the node is currently connecting or 279If the node is up, returns C<1>. If the node is currently connecting or
272sub node_is_up($) { 285sub node_is_up($) {
273 ($NODE{$_[0]} or return)->{transport} 286 ($NODE{$_[0]} or return)->{transport}
274 ? 1 : 0 287 ? 1 : 0
275} 288}
276 289
277=item up_nodes 290=item @nodes = up_nodes
278 291
279Return the node IDs of all nodes that are currently connected (excluding 292Return the node IDs of all nodes that are currently connected (excluding
280the node itself). 293the node itself).
281 294
282=cut 295=cut
419 # empty messages are keepalives or similar devnull-applications 432 # empty messages are keepalives or similar devnull-applications
420 }, 433 },
421); 434);
422 435
423# the node port 436# the node port
424$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; 437$NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
425$PORT{""} = sub { 438$PORT{""} = sub {
426 my $tag = shift; 439 my $tag = shift;
427 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; 440 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
428 AE::log die => "error processing node message from $SRCNODE: $@" if $@; 441 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
429}; 442};
934 947
935sub configure(@) { 948sub configure(@) {
936 unshift @_, "profile" if @_ & 1; 949 unshift @_, "profile" if @_ & 1;
937 my (%kv) = @_; 950 my (%kv) = @_;
938 951
939 delete $NODE{$NODE}; # we do not support doing stuff before configure
940 _init_names;
941
942 my $profile = delete $kv{profile}; 952 my $profile = delete $kv{profile};
943 953
944 $profile = nodename 954 $profile = nodename
945 unless defined $profile; 955 unless defined $profile;
946 956
949 $SECURE = $CONFIG->{secure}; 959 $SECURE = $CONFIG->{secure};
950 960
951 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; 961 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
952 962
953 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; 963 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
964
965 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
954 966
955 $NODE = $node; 967 $NODE = $node;
956 968
957 $NODE =~ s/%n/nodename/ge; 969 $NODE =~ s/%n/nodename/ge;
958 970
959 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) { 971 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
960 # nodes with randomised node names do not need randomised port names 972 # nodes with randomised node names do not need randomised port names
961 $UNIQ = ""; 973 $UNIQ = "";
962 } 974 }
963 975
964 $NODE{$NODE} = $NODE{""}; 976 $node_obj->{id} = $NODE;
965 $NODE{$NODE}{id} = $NODE; 977 $NODE{$NODE} = $node_obj;
966 978
967 my $seeds = $CONFIG->{seeds}; 979 my $seeds = $CONFIG->{seeds};
968 my $binds = $CONFIG->{binds}; 980 my $binds = $CONFIG->{binds};
969 981
970 $binds ||= ["*"]; 982 $binds ||= ["*"];

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines