ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.50
Committed: Sat Mar 3 13:07:19 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.49: +3 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although
19 the global service is currently started by default anyway.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
# announce startup through the kernel's WARN hook (called with 7 as its
# first argument)
$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# node protocol parts for global nodes

package AnyEvent::MP::Kernel;

# TODO: this is ugly (classical use vars vs. our),
# maybe this should go into MP::Kernel

our (%NODE, $NODE, $LISTENER);

our ($GLOBAL, $MASTER, $MASTER_MON, $MASTER_TIMER);

# {node id} - the key exists for every node treated as a slave; once that
# node monitors something the value holds {family}{key}
our %GLOBAL_SLAVE;

our (
   %GLOBAL_DB, # global db, {family}{key} = value
   %LOCAL_DBS, # local databases of other global nodes, {node id}{family}{key}
   %LOCAL_DB,  # this node database
);

our ($SRCNODE, %NODE_REQ);

# only in global code
our %GLOBAL_MON; # monitors {family}{"" or key}{node id}
67
# returns the node ids found under the "'g" family of %GLOBAL_DB, leaving
# out our own id ($NODE) and every node for which node_is_up is false
sub other_globals() {
   grep { $_ ne $NODE and node_is_up($_) }
      keys %{ $GLOBAL_DB{"'g"} }
}
71
# sends the message given in @_ to every node returned by other_globals
sub g_broadcast {
   for my $peer (other_globals()) {
      snd($peer, @_);
   }
}
77
# notifies monitoring nodes about a change to a single database entry
#
# called by g_add and g_del with their own argument list:
#   $_[0]  id of the node whose database changed (not used here)
#   $_[1]  family
#   $_[2]  key
#   $_[3]  new value - only passed for additions
#
# every node registered in %GLOBAL_MON for this family/key, for the whole
# family ("" key) or for everything ("" family and key) receives a g_chg2
# message: "g_chg2 family key value" for an addition and "g_chg2 family key"
# for a deletion.
sub g_mon_check {
   warn "g_mon_check<@_>\n";#d#

   # "|| {}" keeps the dereference valid when nothing is registered
   my %node = (
      %{ $GLOBAL_MON{$_[1]}{$_[2]} || {} },
      %{ $GLOBAL_MON{$_[1]}{""   } || {} },
      %{ $GLOBAL_MON{""   }{""   } || {} },
   );

   # the value is the fourth argument, so it is only present when @_ > 3;
   # testing "@_ > 2" was always true and reported deletions as
   # "set to undef"
   my @value = @_ > 3 ? $_[3] : ();

   snd($_, g_chg2 => $_[1], $_[2], @value)
      for keys %node;
}
90
# add/replace a key in the database
#
# arguments: id of the node the entry belongs to, family, key, value.
# the value is stored both in that node's local database and in the global
# database; if the node is a slave of ours the change is broadcast to the
# other global nodes. finally the monitors are checked.
sub g_add($$$$) {
   my ($node, $family, $key, $value) = @_;

   $GLOBAL_DB{$family}{$key}        = $value;
   $LOCAL_DBS{$node}{$family}{$key} = $value;

   if (exists $GLOBAL_SLAVE{$node}) {
      g_broadcast(g_add => $family, $key, $value);
   }

   warn "g_add<@_>\n";#d#

   # hand the unmodified argument list on to the monitor check
   g_mon_check(@_);
}
102
# delete a key from the database
#
# arguments: id of the node the entry belonged to, family, key.
# the key is removed from that node's local database and the deletion is
# broadcast if the node is a slave of ours. the global database entry is
# removed as well, unless some local database still has the key - then its
# value is put back into the global database.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   delete $LOCAL_DBS{$node}{$family}{$key};

   if (exists $GLOBAL_SLAVE{$node}) {
      g_broadcast(g_del => $family, $key);
   }

   delete $GLOBAL_DB{$family}{$key};

   # check if other node maybe still has the key, then we don't delete, but add
   for my $db (values %LOCAL_DBS) {
      next unless exists $db->{$family}{$key};

      $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};
      last;
   }

   warn "g_del<@_>\n";#d#

   # hand the unmodified argument list on to the monitor check
   g_mon_check(@_);
}
123
# delete all keys from a database
#
# every family/key stored in the local database of $node is removed via
# g_del, then the (now empty) local database itself is dropped.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   for my $family (keys %$db) {
      my $entries = $db->{$family};

      for my $key (keys %$entries) {
         g_del($node, $family, $key);
      }
   }

   delete $LOCAL_DBS{$node};
}
136
# set the whole (node-local) database - previous value must be empty
#
# $db is a {family}{key} = value hash; each entry is removed from $db and
# added for $node through g_add, so the inner hashes of $db are empty
# afterwards.
sub g_set($$) {
   my ($node, $db) = @_;

   for my $family (keys %$db) {
      my $entries = $db->{$family};

      for my $key (keys %$entries) {
         my $value = delete $entries->{$key};
         g_add($node, $family, $key, $value);
      }
   }
}
146
# gather node databases from slaves

# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   my ($db) = @_;
   my $slave = $SRCNODE->{id};

   # only the existence of the entry matters here: g_add and g_del test
   # it with "exists" to decide whether to broadcast
   $GLOBAL_SLAVE{$slave} = undef;

   g_set($slave, $db);
};
157
# g_add family key value - add/replace an entry on behalf of the sending node
$NODE_REQ{g_add} = sub {
   my $sender = $SRCNODE->{id};

   # the & call bypasses g_add's prototype, so @_ is passed on as a list
   &g_add($sender, @_);
};
161
# g_del family key - delete an entry on behalf of the sending node
$NODE_REQ{g_del} = sub {
   my $sender = $SRCNODE->{id};

   # the & call bypasses g_del's prototype, so @_ is passed on as a list
   &g_del($sender, @_);
};
165
# g_set db - the sending node transmits its whole database
$NODE_REQ{g_set} = sub {
   my ($db) = @_;

   g_set($SRCNODE->{id}, $db);
};
169
# g_find node - reply to the sender with "g_found node value", where value
# is whatever the global database stores for that node in the "'l" family
$NODE_REQ{g_find} = sub {
   my ($node) = @_;

   my $requester = $SRCNODE->{id};
   my $found     = $GLOBAL_DB{"'l"}{$node};

   snd($requester, g_found => $node, $found);
};
175
176 # monitoring
177
# forgets everything we know about a node that went away
#
# the node's local database is cleared (via g_clr), its slave entry is
# removed and every monitor it had registered is dropped.
sub g_slave_disconnect($) {
   my ($node) = @_;

   g_clr($node);

   # %GLOBAL_SLAVE{node} mirrors the node's monitors as {family}{key}
   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      while (my ($family, $keys) = each %$mon) {
         # monitors are registered as $GLOBAL_MON{family}{key}{node} (see
         # g_mon1/g_mon0), so only this node's entry may go - deleting
         # $GLOBAL_MON{family}{key} as a whole would also drop the
         # monitors of every other node watching the same key
         delete $GLOBAL_MON{$family}{$_}{$node}
            for keys %$keys;
      }
   }
}
190
# g_mon0 family key - stop monitoring
#
# removes the sender's registration from %GLOBAL_MON as well as the mirror
# entry kept in %GLOBAL_SLAVE
$NODE_REQ{g_mon0} = sub {
   my ($family, $key) = @_;
   my $peer = $SRCNODE->{id};

   delete $GLOBAL_MON{$family}{$key}{$peer};
   delete $GLOBAL_SLAVE{$peer}{$family}{$key};
};
196
# g_mon1 family key - start monitoring
#
# registers the sender in both %GLOBAL_SLAVE and %GLOBAL_MON and answers
# with "g_chg1 family key current", where current is the single value when a
# key was given and the whole family hash otherwise
$NODE_REQ{g_mon1} = sub {
   my ($family, $key) = @_;
   my $peer = $SRCNODE->{id};

   $GLOBAL_SLAVE{$peer}{$family}{$key} = undef;
   $GLOBAL_MON{$family}{$key}{$peer}   = undef;

   my $current = defined $key
      ? $GLOBAL_DB{$family}{$key}
      : $GLOBAL_DB{$family};

   snd($peer, g_chg1 => $family, $key, $current);
};
205
206 #############################################################################
207 # switch to global mode
208
# from now on this node acts as a global node and as its own master
($GLOBAL, $MASTER) = (1, $NODE);

# we are our own master (and slave)
$GLOBAL_SLAVE{$NODE} = undef;

# our own entry in %LOCAL_DBS starts out as a shallow copy of %LOCAL_DB
$LOCAL_DBS{$NODE} = { %LOCAL_DB };
213
# regularly try to connect to global nodes - maybe use seeding code?
# the timer starts immediately and uses the configured monitor_timeout as
# its interval
$MASTER_TIMER = AE::timer(
   0,
   $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout},
   sub {
      for my $id (keys %{ $GLOBAL_DB{"'g"} }) {
         add_node($id)->connect;
      }
   },
);
219
220 # instantly connect to other global nodes when we learn of them
221 # so we don't have to wait for the timer.
222 #TODO
223 # $GLOBAL_MON{"'g"}{""}{""} = sub {
224 # (add_node $_[1])->connect;
225 # };
226
# delete slaves on node-down
# clear slave db on node-down
# (the callback's second argument is true unless the node went down)
$MASTER_MON = mon_nodes(sub {
   my ($node, $is_up) = @_;

   g_slave_disconnect($node) unless $is_up;
});
232
# tell everybody who connects that we are a global node, by setting the
# "global" flag in the local greeting of the transport
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};
237
# connect from a global node
#
# marks $node as a global node and sends it a g_set message containing the
# union of all databases in %LOCAL_DBS.
sub g_global_connect {
   my ($node) = @_;

   # we need to set this currently, as to avoid race conditions
   # because it takes a while until the other global node tells us it is global.
   $GLOBAL_DB{"'g"}{$node}          = undef;
   $LOCAL_DBS{$node}{"'g"}{$node}   = undef;

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $local (values %LOCAL_DBS) {
      for my $family (keys %$local) {
         my $entries = $local->{$family};

         # when several databases have the same family/key, the one
         # visited last wins
         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd($node, g_set => \%merged);
}
262
# send our database to every global node that connects - nodes whose
# greeting did not have the "global" flag are ignored
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   g_global_connect($transport->{remote_node})
      if $transport->{remote_greeting}{global};
};
269
# tell our master that we are global now: every known node that has a
# transport whose remote greeting carries the "global" flag gets a g_global
# message, followed by our merged database
for my $peer (values %NODE) {
   next unless $peer->{transport}
            && $peer->{transport}{remote_greeting}{global};

   snd($peer->{id}, "g_global");
   g_global_connect($peer->{id});
}
277
# g_global - the sending node tells us it has become a global node: drop
# what we hold for it as a slave, flag its greeting as global and send it
# our merged database
$NODE_REQ{g_global} = sub {
   my $peer = $SRCNODE->{id};

   g_slave_disconnect($peer);
   $SRCNODE->{transport}{remote_greeting}{global} = 1;
   g_global_connect($peer);
};
283
# now add us to the set of global nodes (family "'g", key is our node id,
# the value is undef)
db_set("'g", $NODE, undef);
286
287 =back
288
289 =head1 SEE ALSO
290
291 L<AnyEvent::MP>.
292
293 =head1 AUTHOR
294
295 Marc Lehmann <schmorp@schmorp.de>
296 http://home.schmorp.de/
297
298 =cut
299
300 1
301