… | |
… | |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent::MP::Kernel; |
7 | use AnyEvent::MP::Kernel; |
8 | |
8 | |
|
|
9 | $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging |
|
|
10 | |
|
|
11 | snd_to_func $node, $func, @args # send msg to function |
|
|
12 | snd_on $node, @msg # snd message again (relay) |
|
|
13 | eval_on $node, $string[, @reply] # execute perl code on another node |
|
|
14 | |
|
|
15 | node_is_up $nodeid # return true if a node is connected |
|
|
16 | @nodes = up_nodes # return a list of all connected nodes |
|
|
17 | $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs |
|
|
18 | |
9 | =head1 DESCRIPTION |
19 | =head1 DESCRIPTION |
10 | |
20 | |
11 | This module provides most of the basic functionality of AnyEvent::MP, |
21 | This module implements most of the inner workings of AnyEvent::MP. It |
12 | exposed through higher level interfaces such as L<AnyEvent::MP> and |
22 | offers mostly lower-level functions that deal with network connectivity |
13 | L<Coro::MP>. |
23 | and special requests. |
14 | |
24 | |
15 | This module is mainly of interest when knowledge about connectivity, |
25 | You normally interface with AnyEvent::MP through a higher level interface |
16 | connected nodes etc. is sought. |
26 | such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong |
|
|
27 | with using the functions from this module. |
17 | |
28 | |
18 | =head1 GLOBALS AND FUNCTIONS |
29 | =head1 GLOBALS AND FUNCTIONS |
19 | |
30 | |
20 | =over 4 |
31 | =over 4 |
21 | |
32 | |
… | |
… | |
32 | use AnyEvent::MP::Node; |
43 | use AnyEvent::MP::Node; |
33 | use AnyEvent::MP::Transport; |
44 | use AnyEvent::MP::Transport; |
34 | |
45 | |
35 | use base "Exporter"; |
46 | use base "Exporter"; |
36 | |
47 | |
37 | our @EXPORT_OK = qw( |
48 | # for re-export in AnyEvent::MP and Coro::MP |
38 | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID |
|
|
39 | ); |
|
|
40 | |
|
|
41 | our @EXPORT = qw( |
49 | our @EXPORT_API = qw( |
42 | add_node load_func snd_to_func snd_on eval_on |
50 | NODE $NODE |
43 | |
|
|
44 | NODE $NODE node_of snd kil port_is_local |
|
|
45 | configure |
51 | configure |
46 | up_nodes mon_nodes node_is_up |
52 | node_of port_is_local |
|
|
53 | snd kil |
47 | db_set db_del |
54 | db_set db_del |
48 | db_mon db_family db_keys db_values |
55 | db_mon db_family db_keys db_values |
|
|
56 | ); |
|
|
57 | |
|
|
58 | our @EXPORT_OK = ( |
|
|
59 | # these are internal |
|
|
60 | qw( |
|
|
61 | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID |
|
|
62 | add_node load_func |
|
|
63 | ), |
|
|
64 | @EXPORT_API, |
|
|
65 | ); |
|
|
66 | |
|
|
67 | our @EXPORT = qw( |
|
|
68 | snd_to_func snd_on eval_on |
|
|
69 | port_is_local |
|
|
70 | up_nodes mon_nodes node_is_up |
49 | ); |
71 | ); |
50 | |
72 | |
51 | sub load_func($) { |
73 | sub load_func($) { |
52 | my $func = $_[0]; |
74 | my $func = $_[0]; |
53 | |
75 | |
… | |
… | |
77 | |
99 | |
78 | sub nonce62($) { |
100 | sub nonce62($) { |
79 | join "", map $alnum[rand 62], 1 .. $_[0] |
101 | join "", map $alnum[rand 62], 1 .. $_[0] |
80 | } |
102 | } |
81 | |
103 | |
82 | sub gen_uniq { |
|
|
83 | my $now = AE::now; |
|
|
84 | (join "", |
|
|
85 | map $alnum[$_], |
|
|
86 | $$ / 62 % 62, |
|
|
87 | $$ % 62, |
|
|
88 | (int $now ) % 62, |
|
|
89 | (int $now * 100) % 62, |
|
|
90 | (int $now * 10000) % 62, |
|
|
91 | ) . nonce62 4; |
|
|
92 | } |
|
|
93 | |
|
|
94 | our $CONFIG; # this node's configuration |
104 | our $CONFIG; # this node's configuration |
95 | our $SECURE; |
105 | our $SECURE; |
96 | |
106 | |
97 | our $RUNIQ; # remote uniq value |
107 | our $RUNIQ; # remote uniq value |
98 | our $UNIQ; # per-process/node unique cookie |
108 | our $UNIQ; # per-process/node unique cookie |
… | |
… | |
109 | our %BINDS; |
119 | our %BINDS; |
110 | our $BINDS; # our listeners, as arrayref |
120 | our $BINDS; # our listeners, as arrayref |
111 | |
121 | |
112 | our $SRCNODE; # holds the sending node _object_ during _inject |
122 | our $SRCNODE; # holds the sending node _object_ during _inject |
113 | |
123 | |
114 | sub _init_names { |
124 | # initialise names for non-networked operation |
|
|
125 | { |
115 | # ~54 bits, for local port names, lowercase $ID appended |
126 | # ~54 bits, for local port names, lowercase $ID appended |
116 | $UNIQ = gen_uniq; |
127 | my $now = AE::now; |
|
|
128 | $UNIQ = |
|
|
129 | (join "", |
|
|
130 | map $alnum[$_], |
|
|
131 | $$ / 62 % 62, |
|
|
132 | $$ % 62, |
|
|
133 | (int $now ) % 62, |
|
|
134 | (int $now * 100) % 62, |
|
|
135 | (int $now * 10000) % 62, |
|
|
136 | ) . nonce62 4 |
|
|
137 | ; |
117 | |
138 | |
118 | # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes |
139 | # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes |
119 | $RUNIQ = nonce62 10; |
140 | $RUNIQ = nonce62 10; |
120 | $RUNIQ =~ s/(.)$/\U$1/; |
141 | $RUNIQ =~ s/(.)$/\U$1/; |
121 | |
142 | |
122 | $NODE = "anon/$RUNIQ"; |
143 | $NODE = ""; |
123 | } |
144 | } |
124 | |
|
|
125 | _init_names; |
|
|
126 | |
145 | |
127 | sub NODE() { |
146 | sub NODE() { |
128 | $NODE |
147 | $NODE |
129 | } |
148 | } |
130 | |
149 | |
… | |
… | |
185 | |
204 | |
186 | ($NODE{$nodeid} || add_node $nodeid) |
205 | ($NODE{$nodeid} || add_node $nodeid) |
187 | ->{send} (["$portid", @_]); |
206 | ->{send} (["$portid", @_]); |
188 | } |
207 | } |
189 | |
208 | |
190 | =item $is_local = port_is_local $port |
|
|
191 | |
|
|
192 | Returns true iff the port is a local port. |
|
|
193 | |
|
|
194 | =cut |
|
|
195 | |
|
|
196 | sub port_is_local($) { |
209 | sub port_is_local($) { |
197 | my ($nodeid, undef) = split /#/, $_[0], 2; |
210 | my ($nodeid, undef) = split /#/, $_[0], 2; |
198 | |
211 | |
199 | $NODE{$nodeid} == $NODE{""} |
212 | $nodeid eq $NODE |
200 | } |
213 | } |
201 | |
214 | |
202 | =item snd_to_func $node, $func, @args |
215 | =item snd_to_func $node, $func, @args |
203 | |
216 | |
204 | Expects a node ID and a name of a function. Asynchronously tries to call |
217 | Expects a node ID and a name of a function. Asynchronously tries to call |
… | |
… | |
256 | } |
269 | } |
257 | |
270 | |
258 | ############################################################################# |
271 | ############################################################################# |
259 | # node monitoring and info |
272 | # node monitoring and info |
260 | |
273 | |
261 | =item node_is_up $nodeid |
274 | =item $bool = node_is_up $nodeid |
262 | |
275 | |
263 | Returns true if the given node is "up", that is, the kernel thinks it has |
276 | Returns true if the given node is "up", that is, the kernel thinks it has |
264 | a working connection to it. |
277 | a working connection to it. |
265 | |
278 | |
266 | If the node is up, returns C<1>. If the node is currently connecting or |
279 | If the node is up, returns C<1>. If the node is currently connecting or |
… | |
… | |
272 | sub node_is_up($) { |
285 | sub node_is_up($) { |
273 | ($NODE{$_[0]} or return)->{transport} |
286 | ($NODE{$_[0]} or return)->{transport} |
274 | ? 1 : 0 |
287 | ? 1 : 0 |
275 | } |
288 | } |
276 | |
289 | |
277 | =item up_nodes |
290 | =item @nodes = up_nodes |
278 | |
291 | |
279 | Return the node IDs of all nodes that are currently connected (excluding |
292 | Return the node IDs of all nodes that are currently connected (excluding |
280 | the node itself). |
293 | the node itself). |
281 | |
294 | |
282 | =cut |
295 | =cut |
… | |
… | |
419 | # empty messages are keepalives or similar devnull-applications |
432 | # empty messages are keepalives or similar devnull-applications |
420 | }, |
433 | }, |
421 | ); |
434 | ); |
422 | |
435 | |
423 | # the node port |
436 | # the node port |
424 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
437 | $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
425 | $PORT{""} = sub { |
438 | $PORT{""} = sub { |
426 | my $tag = shift; |
439 | my $tag = shift; |
427 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
440 | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; |
428 | AE::log die => "error processing node message from $SRCNODE: $@" if $@; |
441 | AE::log die => "error processing node message from $SRCNODE: $@" if $@; |
429 | }; |
442 | }; |
… | |
… | |
934 | |
947 | |
935 | sub configure(@) { |
948 | sub configure(@) { |
936 | unshift @_, "profile" if @_ & 1; |
949 | unshift @_, "profile" if @_ & 1; |
937 | my (%kv) = @_; |
950 | my (%kv) = @_; |
938 | |
951 | |
939 | delete $NODE{$NODE}; # we do not support doing stuff before configure |
|
|
940 | _init_names; |
|
|
941 | |
|
|
942 | my $profile = delete $kv{profile}; |
952 | my $profile = delete $kv{profile}; |
943 | |
953 | |
944 | $profile = nodename |
954 | $profile = nodename |
945 | unless defined $profile; |
955 | unless defined $profile; |
946 | |
956 | |
… | |
… | |
949 | $SECURE = $CONFIG->{secure}; |
959 | $SECURE = $CONFIG->{secure}; |
950 | |
960 | |
951 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
961 | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; |
952 | |
962 | |
953 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
963 | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; |
|
|
964 | |
|
|
965 | my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure |
954 | |
966 | |
955 | $NODE = $node; |
967 | $NODE = $node; |
956 | |
968 | |
957 | $NODE =~ s/%n/nodename/ge; |
969 | $NODE =~ s/%n/nodename/ge; |
958 | |
970 | |
959 | if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) { |
971 | if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) { |
960 | # nodes with randomised node names do not need randomised port names |
972 | # nodes with randomised node names do not need randomised port names |
961 | $UNIQ = ""; |
973 | $UNIQ = ""; |
962 | } |
974 | } |
963 | |
975 | |
964 | $NODE{$NODE} = $NODE{""}; |
976 | $node_obj->{id} = $NODE; |
965 | $NODE{$NODE}{id} = $NODE; |
977 | $NODE{$NODE} = $node_obj; |
966 | |
978 | |
967 | my $seeds = $CONFIG->{seeds}; |
979 | my $seeds = $CONFIG->{seeds}; |
968 | my $binds = $CONFIG->{binds}; |
980 | my $binds = $CONFIG->{binds}; |
969 | |
981 | |
970 | $binds ||= ["*"]; |
982 | $binds ||= ["*"]; |