ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.51
Committed: Sat Mar 3 19:43:41 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.50: +36 -36 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
37 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
38
39 #############################################################################
40 # node protocol parts for global nodes
41
42 package AnyEvent::MP::Kernel;
43
44 # TODO: this is ugly (classical use vars vs. our),
45 # maybe this should go into MP::Kernel
46
47 our %NODE;
48 our $NODE;
49 our $LISTENER;
50
51 our $GLOBAL;
52 our $MASTER;
53 our $MASTER_MON;
54 our $MASTER_TIMER;
55
56 our %GLOBAL_SLAVE;
57
58 our %GLOBAL_DB; # global db
59 our %LOCAL_DBS; # local databases of other global nodes
60 our %LOCAL_DB; # this node database
61
62 our $SRCNODE;
63 our %NODE_REQ;
64
65 # only in global code
66 our %GLOBAL_MON; # monitors {family}
67
68 sub other_globals() {
69 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
70 }
71
72 # broadcasts a message to all other global nodes
73 sub g_broadcast {
74 snd $_, @_
75 for other_globals;
76 }
77
78 # add/replace a key in the database
79 sub g_add($$$$) {
80 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
81 $GLOBAL_DB {$_[1]}{$_[2]} = $_[3];
82
83 g_broadcast g_add => $_[1] => $_[2] => $_[3]
84 if exists $GLOBAL_SLAVE{$_[0]};
85
86 # tell subscribers we have added or replaced a key
87 snd $_ => g_chg2 => $_[1], $_[2], $_[3]
88 for keys %{ $GLOBAL_MON{$_[1]} };
89 }
90
91 # delete a key from the database
92 sub g_del($$$) {
93 delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};
94 delete $LOCAL_DBS{$_[0]}{$_[1]} unless %{ $LOCAL_DBS{$_[0]}{$_[1]} };
95
96 g_broadcast g_del => $_[1] => $_[2]
97 if exists $GLOBAL_SLAVE{$_[0]};
98
99 delete $GLOBAL_DB{$_[1]}{$_[2]};
100 delete $GLOBAL_DB{$_[1]} unless %{ $GLOBAL_DB{$_[1]} }; # could be moved below for
101
102 # check if other node maybe still has the key, then we don't delete, but add
103 for (values %LOCAL_DBS) {
104 if (exists $_->{$_[1]}{$_[2]}) {
105 my $val = $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
106
107 # tell subscribers we have added or replaced a key
108 snd $_ => g_chg2 => $_[1], $_[2], $val
109 for keys %{ $GLOBAL_MON{$_[1]} };
110
111 last;
112 }
113 }
114
115 # tell subscribers key is actually gone
116 snd $_ => g_chg2 => $_[1], $_[2]
117 for keys %{ $GLOBAL_MON{$_[1]} };
118 }
119
120 # set the whole (node-local) database - previous value must be empty
121 sub g_set($$) {
122 my ($node, $db) = @_;
123
124 while (my ($f, $k) = each %$db) {
125 g_add $node, $f => $_ => delete $k->{$_}
126 for keys %$k;
127 }
128 }
129
130 # delete all keys from a database
131 sub g_clr($) {
132 my ($node) = @_;
133
134 my $db = $LOCAL_DBS{$node};
135 while (my ($f, $k) = each %$db) {
136 g_del $node, $f => $_
137 for keys %$k;
138 }
139
140 delete $LOCAL_DBS{$node};
141 }
142
143 # gather node databases from slaves
144
145 # other node wants to make us the master
146 $NODE_REQ{g_slave} = sub {
147 my ($db) = @_;
148
149 my $node = $SRCNODE->{id};
150 undef $GLOBAL_SLAVE{$node};
151 g_set $node, $db;
152 };
153
154 $NODE_REQ{g_add} = sub {
155 &g_add ($SRCNODE->{id}, @_);
156 };
157
158 $NODE_REQ{g_del} = sub {
159 &g_del ($SRCNODE->{id}, @_);
160 };
161
162 $NODE_REQ{g_set} = sub {
163 g_set $SRCNODE->{id}, $_[0];
164 };
165
166 $NODE_REQ{g_find} = sub {
167 my ($node) = @_;
168
169 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
170 };
171
172 # monitoring
173
174 sub g_slave_disconnect($) {
175 my ($node) = @_;
176
177 g_clr $node;
178
179 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
180 while (my ($f, $fv) = each %$mon) {
181 delete $GLOBAL_MON{$f}{$_}
182 for keys %$fv;
183
184 delete $GLOBAL_MON{$f}
185 unless %{ $GLOBAL_MON{$f} };
186 }
187 }
188 }
189
190 # g_mon0 family - stop monitoring
191 $NODE_REQ{g_mon0} = sub {
192 delete $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
193 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
194
195 delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
196 };
197
198 # g_mon1 family key - start monitoring
199 $NODE_REQ{g_mon1} = sub {
200 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
201 undef $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
202
203 snd $SRCNODE->{id}, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
204 };
205
206 #############################################################################
207 # switch to global mode
208
209 $GLOBAL = 1;
210 $MASTER = $NODE;
211 undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
212 $LOCAL_DBS{$NODE} = { %LOCAL_DB };
213
214 # regularly try to connect to global nodes - maybe use seeding code?
215 $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
216 (add_node $_)->connect
217 for keys %{ $GLOBAL_DB{"'g"} };
218 };
219
220 # instantly connect to other global nodes when we learn of them
221 # so we don't have to wait for the timer.
222 #TODO
223 # $GLOBAL_MON{"'g"}{""}{""} = sub {
224 # (add_node $_[1])->connect;
225 # };
226
227 # delete slaves on node-down
228 # clear slave db on node-down
229 $MASTER_MON = mon_nodes sub {
230 g_slave_disconnect $_[0] unless $_[1];
231 };
232
233 # tell everybody who connects that we are a global node
234 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
235 $_[0]{local_greeting}{global} = 1;
236 };
237
238 # connect from a global node
239 sub g_global_connect {
240 my ($node) = @_;
241
242 # we need to set this currently, as to avoid race conditions
243 # because it takes a while until the other global node tells us it is global.
244
245 undef $GLOBAL_DB{"'g"}{$node};
246 undef $LOCAL_DBS{$node}{"'g"}{$node};
247
248 # global nodes send all local databases, merged,
249 # as their local database to global nodes
250 my %db;
251
252 for (values %LOCAL_DBS) {
253 while (my ($f, $fv) = each %$_) {
254 while (my ($k, $kv) = each %$fv) {
255 $db{$f}{$k} = $kv;
256 }
257 }
258 }
259
260 snd $node => g_set => \%db;
261 }
262
263 # send our database to every global node that connects
264 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
265 return unless $_[0]{remote_greeting}{global};
266
267 g_global_connect $_[0]{remote_node};
268 };
269
270 # tell our master that we are global now
271 for (values %NODE) {
272 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
273 snd $_->{id} => "g_global";
274 g_global_connect $_->{id};
275 }
276 }
277
278 $NODE_REQ{g_global} = sub {
279 g_slave_disconnect $SRCNODE->{id};
280 $SRCNODE->{transport}{remote_greeting}{global} = 1;
281 g_global_connect $SRCNODE->{id};
282 };
283
284 # now add us to the set of global nodes
285 db_set "'g" => $NODE => undef;
286
287 =back
288
289 =head1 SEE ALSO
290
291 L<AnyEvent::MP>.
292
293 =head1 AUTHOR
294
295 Marc Lehmann <schmorp@schmorp.de>
296 http://home.schmorp.de/
297
298 =cut
299
300 1
301