ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.51
Committed: Sat Mar 3 19:43:41 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.50: +36 -36 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4
34 root 1.1 use AnyEvent::MP;
35     use AnyEvent::MP::Kernel;
36 root 1.20
# announce that the global service is being loaded; the leading 7 is
# presumably the log level expected by the kernel's WARN callback
&$AnyEvent::MP::Kernel::WARN (7, "starting global service.");
38    
39 root 1.8 #############################################################################
40 root 1.44 # node protocol parts for global nodes
41 root 1.8
package AnyEvent::MP::Kernel;

# TODO: this is ugly (classical use vars vs. our),
# maybe this should go into MP::Kernel

# kernel state: the node table (entries are accessed via {id} and
# {transport} below), the id of this node, and the listener (declared
# here but not referenced anywhere else in this file)
our (%NODE, $NODE, $LISTENER);

# global-mode state, all assigned further down in this file:
#    $GLOBAL       - set to 1 when this file is loaded
#    $MASTER       - set to $NODE (we are our own master)
#    $MASTER_MON   - whatever mon_nodes returns for the node-down callback
#    $MASTER_TIMER - the AE::timer watcher that reconnects to global nodes
our ($GLOBAL, $MASTER, $MASTER_MON, $MASTER_TIMER);

# nodes that made us their master; key is the node id, the value is
# either undef or a hash {family => undef} of families monitored by it
our %GLOBAL_SLAVE;

our %GLOBAL_DB; # global db, {family}{key} => value
our %LOCAL_DBS; # local databases of other global nodes, {node}{family}{key}
our %LOCAL_DB;  # this node database

# $SRCNODE is read by the request handlers as the node a request came
# from (presumably set by the kernel while dispatching - confirm there),
# %NODE_REQ maps request names to their handlers
our ($SRCNODE, %NODE_REQ);

# only in global code
our %GLOBAL_MON; # monitors, {family}{node}
67 root 1.47
# returns the ids of all nodes registered in the "'g" family of the
# global database, except ourselves, for which node_is_up is true
sub other_globals() {
   my @up;

   for my $id (keys %{ $GLOBAL_DB{"'g"} }) {
      next if $id eq $NODE;

      push @up, $id
         if node_is_up ($id);
   }

   @up
}
71 root 1.47
# sends the message given as argument list to every node returned
# by other_globals
sub g_broadcast {
   for my $peer (other_globals ()) {
      snd ($peer, @_);
   }
}
77 root 1.46
# add/replace a key in the database
#
# arguments: node (whose local database the key belongs to), family,
# key, value.
#
# the value is stored both in the per-node database and in the merged
# global database. if the node is one of our slaves, the change is
# forwarded to all other global nodes, and every node monitoring the
# family receives a g_chg2 message.
sub g_add($$$$) {
   my ($node, $family, $key, $value) = @_;

   $LOCAL_DBS{$node}{$family}{$key} =
   $GLOBAL_DB       {$family}{$key} = $value;

   g_broadcast (g_add => $family, $key, $value)
      if exists $GLOBAL_SLAVE{$node};

   # tell subscribers we have added or replaced a key.
   # fetch the subscriber table first: "keys %{ $GLOBAL_MON{$family} }"
   # would create an empty entry for every family nobody monitors, while
   # g_mon0 and g_slave_disconnect take care to remove empty entries.
   if (my $subscribers = $GLOBAL_MON{$family}) {
      snd ($_ => g_chg2 => $family, $key, $value)
         for keys %$subscribers;
   }
}
90 root 1.46
# delete a key from the database
#
# arguments: node (whose local database the key is removed from),
# family, key.
#
# the key is removed from the node's local database. if the node is one
# of our slaves, the deletion is forwarded to all other global nodes.
# when some other local database still has the key, the global database
# takes over that value and subscribers are told about the (possibly
# changed) value, otherwise the key vanishes from the global database
# and subscribers receive a g_chg2 without value.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   # remove from the node-local database, dropping the family when it
   # becomes empty - without creating entries for unknown nodes/families
   if (my $local = $LOCAL_DBS{$node}) {
      if (my $entries = $local->{$family}) {
         delete $entries->{$key};
         delete $local->{$family} unless %$entries;
      }
   }

   g_broadcast (g_del => $family, $key)
      if exists $GLOBAL_SLAVE{$node};

   delete $GLOBAL_DB{$family}{$key};

   # snapshot the subscribers without creating an empty monitor entry
   my @subscribers = keys %{ $GLOBAL_MON{$family} || {} };

   # check if other node maybe still has the key, then we don't delete, but add
   for my $db (values %LOCAL_DBS) {
      # test the family first, a bare two-level exists would create
      # an empty family hash in every local database
      next unless $db->{$family} && exists $db->{$family}{$key};

      my $val = $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};

      # tell subscribers we have added or replaced a key
      snd ($_ => g_chg2 => $family, $key, $val)
         for @subscribers;

      # the key still exists, so we must not fall through
      # to the "key is gone" notification below
      return;
   }

   # nobody else has the key - drop the family if this was its last key
   delete $GLOBAL_DB{$family} unless %{ $GLOBAL_DB{$family} };

   # tell subscribers key is actually gone
   snd ($_ => g_chg2 => $family, $key)
      for @subscribers;
}
119    
# set the whole (node-local) database - previous value must be empty
#
# arguments: node, and a hash reference {family}{key} => value. every
# entry is handed to g_add, and is removed from the passed hash while
# doing so, so the family hashes of the argument are empty afterwards.
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($family, $entries) = each %$db) {
      for my $key (keys %$entries) {
         g_add ($node, $family, $key, delete $entries->{$key});
      }
   }
}
129 root 1.46
# delete all keys from a database
#
# argument: the node whose local database is to be removed. every key
# is deleted via g_del (so the global database and subscribers are
# updated), then the database itself is dropped.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node} || {};

   # iterate over snapshots of the keys: g_del modifies the very hashes
   # we walk here, which is not something an active "each" iterator
   # is guaranteed to survive
   for my $family (keys %$db) {
      for my $key (keys %{ $db->{$family} || {} }) {
         g_del ($node, $family, $key);
      }
   }

   delete $LOCAL_DBS{$node};
}
142    
# gather node databases from slaves

# other node wants to make us the master: register the requesting node
# in %GLOBAL_SLAVE and import the database it sent as first argument
$NODE_REQ{g_slave} = sub {
   my $slave = $SRCNODE->{id};

   $GLOBAL_SLAVE{$slave} = undef;
   g_set ($slave, $_[0]);
};
153 root 1.46
# g_add family key value - requesting node adds/replaces a key
$NODE_REQ{g_add} = sub {
   g_add ($SRCNODE->{id}, $_[0], $_[1], $_[2]);
};
157 root 1.46
# g_del family key - requesting node deletes a key
$NODE_REQ{g_del} = sub {
   g_del ($SRCNODE->{id}, $_[0], $_[1]);
};
161 root 1.46
# g_set db - requesting node sends its whole database
$NODE_REQ{g_set} = sub {
   my ($db) = @_;

   g_set ($SRCNODE->{id}, $db);
};
165 root 1.46
# g_find node - reply to the requesting node with a g_found message
# carrying the node name and whatever the "'l" family of the global
# database stores for it (undef when there is no such entry)
$NODE_REQ{g_find} = sub {
   my ($wanted) = @_;

   my $entry = $GLOBAL_DB{"'l"}{$wanted};

   snd ($SRCNODE->{id}, g_found => $wanted, $entry);
};
171 root 1.46
# monitoring

# forget everything about a (former) slave node: its local database,
# its slave status and all family monitors it registered via g_mon1
sub g_slave_disconnect($) {
   my ($node) = @_;

   # has to run while the node is still listed in %GLOBAL_SLAVE,
   # as g_del only forwards deletions for nodes found there
   g_clr ($node);

   # $GLOBAL_SLAVE{node} is {family => undef} and the monitors are stored
   # as $GLOBAL_MON{family}{node}, so the entry to remove in each family
   # is the node itself (the values of the slave hash carry nothing)
   if (my $families = delete $GLOBAL_SLAVE{$node}) {
      for my $family (keys %$families) {
         my $subscribers = $GLOBAL_MON{$family}
            or next;

         delete $subscribers->{$node};

         delete $GLOBAL_MON{$family}
            unless %$subscribers;
      }
   }
}
189    
# g_mon0 family - stop monitoring
#
# removes the requesting node from the subscribers of the family and
# drops the family from the node's entry in %GLOBAL_SLAVE
$NODE_REQ{g_mon0} = sub {
   my ($family) = @_;
   my $node = $SRCNODE->{id};

   if (my $subscribers = $GLOBAL_MON{$family}) {
      delete $subscribers->{$node};
      delete $GLOBAL_MON{$family} unless %$subscribers;
   }

   # only touch an existing family list: an unconditional delete would
   # create $GLOBAL_SLAVE{$node}, and g_add/g_del treat every node that
   # exists in %GLOBAL_SLAVE as a slave
   delete $GLOBAL_SLAVE{$node}{$family}
      if $GLOBAL_SLAVE{$node};
};
197    
# g_mon1 family - start monitoring
#
# records the subscription in both directions (node => family and
# family => node), then sends the current contents of the family
# (undef if the family does not exist) as g_chg1 to the requesting node
$NODE_REQ{g_mon1} = sub {
   my ($family) = @_;
   my $node = $SRCNODE->{id};

   $GLOBAL_SLAVE{$node}{$family} = undef;
   $GLOBAL_MON{$family}{$node} = undef;

   snd ($node, g_chg1 => $family, $GLOBAL_DB{$family});
};
205 root 1.47
#############################################################################
# switch to global mode

($GLOBAL, $MASTER) = (1, $NODE);

# we are our own master (and slave)
$GLOBAL_SLAVE{$NODE} = undef;

# start out with a shallow copy of our own database - the family
# hashes themselves are shared with %LOCAL_DB
$LOCAL_DBS{$NODE} = { map +($_ => $LOCAL_DB{$_}), keys %LOCAL_DB };
213    
# regularly try to connect to global nodes - maybe use seeding code?
# fires immediately and then every monitor_timeout seconds, asking the
# kernel for a node object for each member of the "'g" family and
# calling connect on it
$MASTER_TIMER = AE::timer (0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
   for my $id (keys %{ $GLOBAL_DB{"'g"} }) {
      my $peer = add_node ($id);
      $peer->connect;
   }
});
219    
220     # instantly connect to other global nodes when we learn of them
221     # so we don't have to wait for the timer.
222     #TODO
223 root 1.47 # $GLOBAL_MON{"'g"}{""}{""} = sub {
224     # (add_node $_[1])->connect;
225     # };
226    
# delete slaves on node-down
# clear slave db on node-down
# (the callback gets the node as first argument and acts only when
# the second argument is false)
$MASTER_MON = mon_nodes (sub {
   my ($node, $is_up) = @_;

   g_slave_disconnect ($node)
      unless $is_up;
});
232    
# tell everybody who connects that we are a global node, by setting
# the "global" flag in the local greeting of the object passed to the hook
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};
237    
# connect from a global node
#
# argument: the id of the global node. registers the node in the "'g"
# family and sends it a g_set message with the union of all local
# databases we know about.
sub g_global_connect {
   my ($node) = @_;

   # we need to set this currently, as to avoid race conditions
   # because it takes a while until the other global node tells us it is global.

   $GLOBAL_DB{"'g"}{$node} = undef;
   $LOCAL_DBS{$node}{"'g"}{$node} = undef;

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $local (values %LOCAL_DBS) {
      for my $family (keys %$local) {
         my $entries = $local->{$family};

         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd ($node, g_set => \%merged);
}
262    
# send our database to every global node that connects; connections
# whose remote greeting lacks the "global" flag are ignored
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   return unless $transport->{remote_greeting}{global};

   g_global_connect ($transport->{remote_node});
};
269    
# tell our master that we are global now: every known node that has a
# transport whose remote greeting carries the "global" flag receives
# a g_global message, followed by our database
for my $peer (values %NODE) {
   my $transport = $peer->{transport}
      or next;

   next unless $transport->{remote_greeting}{global};

   snd ($peer->{id} => "g_global");
   g_global_connect ($peer->{id});
}
277    
# g_global - the requesting node announces that it became a global node:
# drop its slave state, mark its transport as global and send it our
# database
$NODE_REQ{g_global} = sub {
   my $peer = $SRCNODE->{id};

   g_slave_disconnect ($peer);

   $SRCNODE->{transport}{remote_greeting}{global} = 1;

   g_global_connect ($peer);
};
283 root 1.20
# now add us to the set of global nodes, i.e. register our own
# node id (without a value) in the "'g" family
db_set ("'g", $NODE, undef);
286 root 1.3
287 root 1.1 =back
288    
289     =head1 SEE ALSO
290    
291     L<AnyEvent::MP>.
292    
293     =head1 AUTHOR
294    
295     Marc Lehmann <schmorp@schmorp.de>
296     http://home.schmorp.de/
297    
298     =cut
299    
300     1
301