ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.7 by root, Mon Aug 17 03:50:28 2009 UTC vs.
Revision 1.8 by root, Thu Aug 27 21:29:37 2009 UTC

28 28
29use common::sense; 29use common::sense;
30use Carp (); 30use Carp ();
31use MIME::Base64 (); 31use MIME::Base64 ();
32 32
33use AnyEvent ();
33use AnyEvent::Util (); 34use AnyEvent::Util ();
34 35
35use AnyEvent::MP; 36use AnyEvent::MP;
36use AnyEvent::MP::Kernel; 37use AnyEvent::MP::Kernel;
37 38
38our $VERSION = $AnyEvent::MP::VERSION; 39our $VERSION = $AnyEvent::MP::VERSION;
40
41our %addr; # port ID => [address...] mapping
39 42
40our %port; # our rendezvous port on the other side 43our %port; # our rendezvous port on the other side
41our %lreg; # local registry, name => [pid...] 44our %lreg; # local registry, name => [pid...]
42our %lmon; # local rgeistry monitoring name,pid => mon 45our %lmon; # local rgeistry monitoring name,pid => mon
43our %greg; # global regstry, name => [pid...] 46our %greg; # global regstry, name => [pid...]
44 47
48our $nodecnt;
49
45$AnyEvent::MP::Kernel::WARN->(7, "starting global service."); 50$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
46 51
52#############################################################################
53# seednodes
54
55our @SEEDS;
56our $SEED_WATCHER;
57
58sub seed_connect {
59 my ($seed) = @_;
60
61 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
62 or Carp::croak "$seed: unparsable seed address";
63
64 # ughhh
65 my $tp; $tp = AnyEvent::MP::Transport::mp_connect $host, $port,
66 release => \&more_seeding,
67 sub {
68 $tp = shift;
69 $tp->{keepalive} = $tp;
70 },
71 ;
72}
73
74sub more_seeding {
75 return if $nodecnt;
76 return unless @SEEDS;
77
78 $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
79
80 seed_connect $SEEDS[rand @SEEDS];
81}
82
83sub set_seeds(@) {
84 @SEEDS = @_;
85
86 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
87
88 seed_connect $_
89 for @SEEDS;
90}
91
92#############################################################################
93
47sub unreg_groups($) { 94sub unreg_groups($) {
48 my ($noderef) = @_; 95 my ($node) = @_;
49 96
50 my $qr = qr/^\Q$noderef\E(?:#|$)/; 97 my $qr = qr/^\Q$node\E(?:#|$)/;
51 98
52 for my $group (values %greg) { 99 for my $group (values %greg) {
53 @$group = grep $_ !~ $qr, @$group; 100 @$group = grep $_ !~ $qr, @$group;
54 } 101 }
55} 102}
56 103
57sub set_groups($$) { 104sub set_groups($$) {
58 my ($noderef, $lreg) = @_; 105 my ($node, $lreg) = @_;
59 106
60 while (my ($k, $v) = each %$lreg) { 107 while (my ($k, $v) = each %$lreg) {
61 push @{ $greg{$k} }, @$v; 108 push @{ $greg{$k} }, @$v;
62 } 109 }
63} 110}
131 178
132 wantarray && AnyEvent::Util::guard { unregister $port, $group } 179 wantarray && AnyEvent::Util::guard { unregister $port, $group }
133} 180}
134 181
135sub start_node { 182sub start_node {
136 my ($noderef) = @_; 183 my ($node) = @_;
137 184
138 return if exists $port{$noderef}; 185 return if exists $port{$node};
139 return if $noderef eq $NODE; # do not connect to ourselves 186 return if $node eq $NODE; # do not connect to ourselves
140 187
141 # establish connection 188 # establish connection
142 my $port = $port{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", 0, $NODE; 189 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
143 # request any other nodes possibly known to us 190
144 mon $port, sub { 191 mon $port, sub {
145 unreg_groups $noderef; 192 unreg_groups $node;
146 delete $port{$noderef}; 193 delete $port{$node};
147 }; 194 };
195
196 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER if @$AnyEvent::MP::Kernel::LISTENER;
148 snd $port, connect_nodes => up_nodes; 197 snd $port, connect_nodes => \%addr if %addr;
149 snd $port, set => \%lreg; 198 snd $port, set => \%lreg if %lreg;
150} 199}
151 200
152# other nodes connect via this 201# other nodes connect via this
153sub connect { 202sub connect {
154 my ($version, $noderef) = @_; 203 my ($version, $node) = @_;
155 204
156 # monitor them, silently die 205 # monitor them, silently die
157 mon $noderef, psub { kil $SELF }; 206 mon $node, psub { kil $SELF };
158 207
159 rcv $SELF, 208 rcv $SELF,
209 addr => sub {
210 my $addresses = shift;
211 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
212 $addr{$node} = $addresses;
213 },
160 connect_nodes => sub { 214 connect_nodes => sub {
161 for (@_) { 215 my ($kv) = @_;
162 connect_node $_; 216
217 use JSON::XS;#d#
218 my $kv_txt = JSON::XS->new->encode ($kv);#d#
219 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
220
221 while (my ($id, $addresses) = each %$kv) {
222 my $node = AnyEvent::MP::Kernel::add_node $id;
223 $node->connect (@$addresses);
163 start_node $_; 224 start_node $id;
164 } 225 }
165 }, 226 },
227 find_node => sub {
228 my ($othernode) = @_;
229
230 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
231 snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} }
232 if $addr{$othernode};
233 },
166 set => sub { 234 set => sub {
167 set_groups $noderef, shift; 235 set_groups $node, shift;
168 }, 236 },
169 reg1 => \&_register, 237 reg1 => \&_register,
170 reg0 => \&_unregister, 238 reg0 => \&_unregister,
171 ; 239 ;
172} 240}
173 241
174sub mon_node { 242sub mon_node {
175 my ($noderef, $is_up) = @_; 243 my ($node, $is_up) = @_;
176 244
177 if ($is_up) { 245 if ($is_up) {
246 ++$nodecnt;
178 start_node $noderef; 247 start_node $node;
179 } else { 248 } else {
249 --$nodecnt;
250 more_seeding unless $nodecnt;
180 unreg_groups $noderef; 251 unreg_groups $node;
252
253 # forget about the node
254 delete $addr{$node};
255 # ask other nodes if they know the node
256 snd $_, find_node => $node
257 for values %port;
181 } 258 }
182 #warn "node<$noderef,$is_up>\n";#d# 259 #warn "node<$node,$is_up>\n";#d#
183} 260}
184 261
185mon_node $_, 1 262mon_node $_, 1
186 for up_nodes; 263 for up_nodes;
187 264

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines