… | |
… | |
52 | { |
52 | { |
53 | package AnyEvent::MP::Kernel; |
53 | package AnyEvent::MP::Kernel; |
54 | |
54 | |
55 | # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d# |
55 | # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d# |
56 | |
56 | |
|
|
57 | our %NODE; |
|
|
58 | our $NODE; |
|
|
59 | our $LISTENER; |
|
|
60 | |
57 | our $GLOBAL; |
61 | our $GLOBAL; |
58 | our $MASTER; |
62 | our $MASTER; |
59 | our $NODE_ADDR; |
63 | |
60 | our $GLOBAL_ADDR; |
|
|
61 | our $NODE; |
|
|
62 | our %GLOBAL_SLAVE; |
64 | our %GLOBAL_SLAVE; |
63 | our $GLOBAL_MON; |
65 | our $GLOBAL_MON; |
64 | our $LISTENER; |
|
|
65 | |
66 | |
|
|
67 | our %GLOBAL_DB; # global db |
|
|
68 | our %LOCAL_DBS; # local databases of other global nodes |
|
|
69 | our %LOCAL_DB; # this node database |
|
|
70 | |
|
|
71 | our $SRCNODE; |
|
|
72 | our %node_req; |
|
|
73 | |
|
|
74 | # broadcasts a message to all other global nodes |
|
|
75 | sub g_broadcast { |
|
|
76 | snd $_, @_ |
|
|
77 | for other_globals; |
|
|
78 | } |
|
|
79 | |
|
|
80 | sub g_mon_check { |
|
|
81 | warn "g_mon_check<@_>\n";#d# |
|
|
82 | use Data::Dump; ddx \%GLOBAL_DB;#d# |
|
|
83 | } |
|
|
84 | |
|
|
85 | # add/replace a key in the database |
|
|
86 | sub g_add($$$$) { |
|
|
87 | $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = |
|
|
88 | $GLOBAL_DB {$_[1]}{$_[2]} = $_[3]; |
|
|
89 | |
|
|
90 | g_broadcast g_add => $_[1] => $_[2] => $_[3] |
|
|
91 | if exists $GLOBAL_SLAVE{$_[0]}; |
|
|
92 | |
|
|
93 | warn "g_add<@_>\n";#d# |
|
|
94 | &g_mon_check; |
|
|
95 | } |
|
|
96 | |
|
|
97 | # delete a key from the database |
|
|
98 | sub g_del($$$) { |
|
|
99 | delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]}; |
|
|
100 | |
|
|
101 | g_broadcast g_del => $_[1] => $_[2] |
|
|
102 | if exists $GLOBAL_SLAVE{$_[0]}; |
|
|
103 | |
|
|
104 | delete $GLOBAL_DB{$_[1]}{$_[2]}; |
|
|
105 | |
|
|
106 | # check if other node maybe still has the key, then we don't delete, but add |
|
|
107 | for (values %LOCAL_DBS) { |
|
|
108 | if (exists $_->{$_[1]}{$_[2]}) { |
|
|
109 | $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]}; |
|
|
110 | last; |
|
|
111 | } |
|
|
112 | } |
|
|
113 | |
|
|
114 | warn "g_del<@_>\n";#d# |
|
|
115 | &g_mon_check; |
|
|
116 | } |
|
|
117 | |
|
|
118 | # delete all keys from a database |
|
|
119 | sub g_clr($) { |
|
|
120 | my ($node) = @_; |
|
|
121 | |
|
|
122 | my $db = $LOCAL_DBS{$node}; |
|
|
123 | while (my ($f, $k) = each %$db) { |
|
|
124 | g_del $node, $f => $_ |
|
|
125 | for keys %$k; |
|
|
126 | } |
|
|
127 | |
|
|
128 | delete $LOCAL_DBS{$node}; |
|
|
129 | } |
|
|
130 | |
|
|
131 | # set the whole (node-local) database - previous value must be empty |
|
|
132 | sub g_set($$) { |
|
|
133 | my ($node, $db) = @_; |
|
|
134 | |
|
|
135 | while (my ($f, $k) = each %$db) { |
|
|
136 | g_add $node, $f => $_ => delete $k->{$_} |
|
|
137 | for keys %$k; |
|
|
138 | } |
|
|
139 | } |
|
|
140 | |
|
|
141 | # gather node databases from slaves |
|
|
142 | |
|
|
143 | # other node wants to make us the master |
|
|
144 | $node_req{g_slave} = sub { |
|
|
145 | my ($db) = @_; |
|
|
146 | |
|
|
147 | my $node = $SRCNODE->{id}; |
|
|
148 | undef $GLOBAL_SLAVE{$node}; |
|
|
149 | g_set $node, $db; |
|
|
150 | }; |
|
|
151 | |
|
|
152 | $node_req{g_add} = sub { |
|
|
153 | &g_add ($SRCNODE->{id}, @_); |
|
|
154 | }; |
|
|
155 | |
|
|
156 | $node_req{g_del} = sub { |
|
|
157 | &g_del ($SRCNODE->{id}, @_); |
|
|
158 | }; |
|
|
159 | |
|
|
160 | $node_req{g_set} = sub { |
|
|
161 | g_set $SRCNODE->{id}, $_[0]; |
|
|
162 | }; |
|
|
163 | |
|
|
164 | $node_req{g_find} = sub { |
|
|
165 | my ($node) = @_; |
|
|
166 | |
|
|
167 | snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; |
|
|
168 | }; |
|
|
169 | |
|
|
170 | ############################################################################# |
66 | # switch to global mode |
171 | # switch to global mode |
|
|
172 | |
67 | $GLOBAL = 1; |
173 | $GLOBAL = 1; |
68 | $MASTER = $NODE; |
174 | $MASTER = $NODE; |
|
|
175 | undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) |
69 | undef $GLOBAL_MON; |
176 | undef $GLOBAL_MON; |
70 | |
177 | |
71 | $GLOBAL_ADDR->{$NODE} = $LISTENER; |
178 | # delete slaves on nodw-down |
72 | |
179 | # clear slave db on node-down |
73 | $GLOBAL_MON = mon_nodes sub { |
180 | $GLOBAL_MON = mon_nodes sub { |
74 | return if $_[1]; |
181 | return if $_[1]; |
75 | |
182 | |
76 | delete $NODE_ADDR->{$_[0]}; |
|
|
77 | |
|
|
78 | if (delete $GLOBAL_ADDR->{$_[0]}) { |
183 | delete $GLOBAL_SLAVE{$_[0]}; |
79 | # if the node is global, tell our slaves |
184 | g_clr $_[0]; |
80 | |
185 | ldb_set "'l" => $_[0]; |
81 | our %GLOBAL_SLAVE; # ugh, will be in AnyEvent::MP::Global |
186 | # clear listener and global database entries on node-down |
82 | snd $_, g_del => $_[0] |
187 | #ldb_set "'g" => $_[0]; # really? |
83 | for keys %GLOBAL_SLAVE; |
|
|
84 | } |
|
|
85 | }; |
188 | }; |
86 | |
189 | |
87 | # tell everybody who connects that we are a global node |
190 | # tell everybody who connects that we are a global node |
88 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
191 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
89 | $_[0]{local_greeting}{global} = 1; |
192 | $_[0]{local_greeting}{global} = 1; |
90 | }; |
193 | }; |
91 | |
194 | |
92 | # tell every global node that connects that we are global too |
195 | # send our database to every global node that connects |
93 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
196 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
94 | snd $_[0], g_add => $NODE, $LISTENER |
|
|
95 | if $_[0]{remote_greeting}{global}; |
197 | return unless $_[0]{remote_greeting}{global}; |
|
|
198 | |
|
|
199 | # global nodes send all local databases, merged, |
|
|
200 | # as their local database to global nodes |
|
|
201 | my %db; |
|
|
202 | |
|
|
203 | for (values %LOCAL_DBS) { |
|
|
204 | while (my ($f, $k) = each %$_) { |
|
|
205 | while (my ($kk, $kv) = each %$k) { |
|
|
206 | $db{$f}{$kk} = $kv; |
|
|
207 | } |
|
|
208 | } |
|
|
209 | } |
|
|
210 | |
|
|
211 | snd $_[0]{remote_node} => g_set => \%db; |
|
|
212 | }; |
|
|
213 | |
|
|
214 | # # tell our master else that we are global now |
|
|
215 | # for (values %NODE) { |
|
|
216 | # if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { |
|
|
217 | # snd $_->{id} => "g_global"; |
|
|
218 | # snd $_->{id} => g_set => \%LOCAL_DB; |
|
|
219 | # } |
96 | }; |
220 | # } |
|
|
221 | # |
|
|
222 | #d#d disconnect everybody to bootstrap development grr |
97 | |
223 | |
98 | # tell everybody else that we are global now |
224 | $_->transport_error # remove Self::transport_error |
99 | snd $_ => g_add => $NODE, $LISTENER |
225 | for values %NODE; |
100 | for up_nodes; |
226 | |
|
|
227 | # now add us to the set of global nodes |
|
|
228 | ldb_set "'g" => $NODE => undef; |
|
|
229 | g_set $NODE => \%LOCAL_DB; |
101 | } |
230 | } |
102 | |
231 | |
103 | =item $guard = grp_reg $group, $port |
232 | =item $guard = grp_reg $group, $port |
104 | |
233 | |
105 | Register the given (local!) port in the named global group C<$group>. |
234 | Register the given (local!) port in the named global group C<$group>. |