ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.55
Committed: Thu Mar 8 21:37:51 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.54: +15 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4
34 root 1.1 use AnyEvent::MP;
35     use AnyEvent::MP::Kernel;
36 root 1.20
37 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
38    
39 root 1.8 #############################################################################
40 root 1.44 # node protocol parts for global nodes
41 root 1.8
42 root 1.49 package AnyEvent::MP::Kernel;
43 root 1.8
44 root 1.49 # TODO: this is ugly (classical use vars vs. our),
45     # maybe this should go into MP::Kernel
46 root 1.26
47 root 1.49 our %NODE;
48     our $NODE;
49 root 1.46
50 root 1.49 our $GLOBAL;
51     our $MASTER;
52     our $MASTER_MON;
53     our $MASTER_TIMER;
54 root 1.46
55 root 1.49 our %GLOBAL_SLAVE;
56 root 1.26
57 root 1.49 our %GLOBAL_DB; # global db
58     our %LOCAL_DBS; # local databases of other global nodes
59     our %LOCAL_DB; # this node database
60 root 1.46
61 root 1.49 our $SRCNODE;
62     our %NODE_REQ;
63 root 1.46
64 root 1.49 # only in global code
65 root 1.51 our %GLOBAL_MON; # monitors {family}
66 root 1.47
67 root 1.49 sub other_globals() {
68     grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69     }
70 root 1.47
71 root 1.49 # broadcasts a message to all other global nodes
72     sub g_broadcast {
73     snd $_, @_
74     for other_globals;
75     }
76 root 1.46
77 root 1.49 # add/replace a key in the database
78     sub g_add($$$$) {
79     $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
80     $GLOBAL_DB {$_[1]}{$_[2]} = $_[3];
81 root 1.46
82 root 1.49 g_broadcast g_add => $_[1] => $_[2] => $_[3]
83     if exists $GLOBAL_SLAVE{$_[0]};
84 root 1.46
85 root 1.51 # tell subscribers we have added or replaced a key
86     snd $_ => g_chg2 => $_[1], $_[2], $_[3]
87     for keys %{ $GLOBAL_MON{$_[1]} };
88 root 1.49 }
89 root 1.46
90 root 1.49 # delete a key from the database
91     sub g_del($$$) {
92     delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};
93 root 1.51 delete $LOCAL_DBS{$_[0]}{$_[1]} unless %{ $LOCAL_DBS{$_[0]}{$_[1]} };
94 root 1.49
95     g_broadcast g_del => $_[1] => $_[2]
96     if exists $GLOBAL_SLAVE{$_[0]};
97    
98     delete $GLOBAL_DB{$_[1]}{$_[2]};
99 root 1.51 delete $GLOBAL_DB{$_[1]} unless %{ $GLOBAL_DB{$_[1]} }; # could be moved below for
100 root 1.49
101     # check if other node maybe still has the key, then we don't delete, but add
102     for (values %LOCAL_DBS) {
103     if (exists $_->{$_[1]}{$_[2]}) {
104 root 1.51 my $val = $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
105    
106     # tell subscribers we have added or replaced a key
107     snd $_ => g_chg2 => $_[1], $_[2], $val
108     for keys %{ $GLOBAL_MON{$_[1]} };
109    
110 root 1.49 last;
111 root 1.46 }
112     }
113    
114 root 1.51 # tell subscribers key is actually gone
115     snd $_ => g_chg2 => $_[1], $_[2]
116     for keys %{ $GLOBAL_MON{$_[1]} };
117     }
118    
119     # set the whole (node-local) database - previous value must be empty
120     sub g_set($$) {
121     my ($node, $db) = @_;
122    
123     while (my ($f, $k) = each %$db) {
124     g_add $node, $f => $_ => delete $k->{$_}
125     for keys %$k;
126     }
127 root 1.49 }
128 root 1.46
129 root 1.49 # delete all keys from a database
130     sub g_clr($) {
131     my ($node) = @_;
132    
133     my $db = $LOCAL_DBS{$node};
134     while (my ($f, $k) = each %$db) {
135     g_del $node, $f => $_
136     for keys %$k;
137 root 1.46 }
138    
139 root 1.49 delete $LOCAL_DBS{$node};
140     }
141    
142     # gather node databases from slaves
143 root 1.46
144 root 1.49 # other node wants to make us the master
145     $NODE_REQ{g_slave} = sub {
146     my ($db) = @_;
147 root 1.46
148 root 1.49 my $node = $SRCNODE->{id};
149     undef $GLOBAL_SLAVE{$node};
150     g_set $node, $db;
151     };
152 root 1.46
153 root 1.49 $NODE_REQ{g_add} = sub {
154     &g_add ($SRCNODE->{id}, @_);
155     };
156 root 1.46
157 root 1.49 $NODE_REQ{g_del} = sub {
158     &g_del ($SRCNODE->{id}, @_);
159     };
160 root 1.46
161 root 1.49 $NODE_REQ{g_set} = sub {
162     g_set $SRCNODE->{id}, $_[0];
163     };
164 root 1.46
165 root 1.49 $NODE_REQ{g_find} = sub {
166     my ($node) = @_;
167 root 1.46
168 root 1.49 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
169     };
170 root 1.46
171 root 1.55 $NODE_REQ{g_db_family} = sub {
172     my ($family, $id) = @_;
173     snd $SRCNODE->{id}, g_reply => $id, $GLOBAL_DB{$family} || {};
174     };
175    
176     $NODE_REQ{g_db_keys} = sub {
177     my ($family, $id) = @_;
178     snd $SRCNODE->{id}, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
179     };
180    
181     $NODE_REQ{g_db_values} = sub {
182     my ($family, $id) = @_;
183     snd $SRCNODE->{id}, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
184     };
185    
186 root 1.49 # monitoring
187 root 1.47
188 root 1.49 sub g_slave_disconnect($) {
189     my ($node) = @_;
190 root 1.47
191 root 1.49 g_clr $node;
192 root 1.47
193 root 1.49 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
194     while (my ($f, $fv) = each %$mon) {
195     delete $GLOBAL_MON{$f}{$_}
196     for keys %$fv;
197 root 1.51
198     delete $GLOBAL_MON{$f}
199     unless %{ $GLOBAL_MON{$f} };
200 root 1.47 }
201     }
202 root 1.49 }
203    
204 root 1.51 # g_mon0 family - stop monitoring
205 root 1.49 $NODE_REQ{g_mon0} = sub {
206 root 1.51 delete $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
207     delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
208    
209     delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
210 root 1.49 };
211    
212     # g_mon1 family key - start monitoring
213     $NODE_REQ{g_mon1} = sub {
214 root 1.51 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
215     undef $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
216 root 1.49
217 root 1.51 snd $SRCNODE->{id}, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
218 root 1.49 };
219 root 1.47
220 root 1.49 #############################################################################
221     # switch to global mode
222    
223     $GLOBAL = 1;
224     $MASTER = $NODE;
225     undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
226    
227     # regularly try to connect to global nodes - maybe use seeding code?
228     $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
229     (add_node $_)->connect
230     for keys %{ $GLOBAL_DB{"'g"} };
231     };
232    
233     # instantly connect to other global nodes when we learn of them
234     # so we don't have to wait for the timer.
235     #TODO
236 root 1.47 # $GLOBAL_MON{"'g"}{""}{""} = sub {
237     # (add_node $_[1])->connect;
238     # };
239    
240 root 1.49 # delete slaves on node-down
241     # clear slave db on node-down
242     $MASTER_MON = mon_nodes sub {
243     g_slave_disconnect $_[0] unless $_[1];
244     };
245    
246     # tell everybody who connects that we are a global node
247     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
248     $_[0]{local_greeting}{global} = 1;
249     };
250    
251     # connect from a global node
252     sub g_global_connect {
253     my ($node) = @_;
254    
255     # we need to set this currently, as to avoid race conditions
256     # because it takes a while until the other global node tells us it is global.
257    
258     undef $GLOBAL_DB{"'g"}{$node};
259     undef $LOCAL_DBS{$node}{"'g"}{$node};
260    
261     # global nodes send all local databases, merged,
262     # as their local database to global nodes
263     my %db;
264    
265     for (values %LOCAL_DBS) {
266     while (my ($f, $fv) = each %$_) {
267     while (my ($k, $kv) = each %$fv) {
268     $db{$f}{$k} = $kv;
269 root 1.46 }
270     }
271 root 1.47 }
272    
273 root 1.49 snd $node => g_set => \%db;
274 root 1.4 }
275    
276 root 1.49 # send our database to every global node that connects
277     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
278     return unless $_[0]{remote_greeting}{global};
279    
280     g_global_connect $_[0]{remote_node};
281     };
282    
283     # tell our master that we are global now
284     for (values %NODE) {
285     if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
286     snd $_->{id} => "g_global";
287     g_global_connect $_->{id};
288     }
289 root 1.20 }
290    
291 root 1.49 $NODE_REQ{g_global} = sub {
292     g_slave_disconnect $SRCNODE->{id};
293     $SRCNODE->{transport}{remote_greeting}{global} = 1;
294     g_global_connect $SRCNODE->{id};
295     };
296 root 1.20
297 root 1.54 # send the local db to themaster - outselves - to prime our global_db
298 root 1.53 snd $MASTER, g_set => \%LOCAL_DB;
299 root 1.49 # now add us to the set of global nodes
300     db_set "'g" => $NODE => undef;
301 root 1.3
302 root 1.1 =back
303    
304     =head1 SEE ALSO
305    
306     L<AnyEvent::MP>.
307    
308     =head1 AUTHOR
309    
310     Marc Lehmann <schmorp@schmorp.de>
311     http://home.schmorp.de/
312    
313     =cut
314    
315     1
316