=head1 NAME AnyEvent::MP::Global - some network-global services =head1 SYNOPSIS use AnyEvent::MP::Global; =head1 DESCRIPTION This module maintains a fully-meshed network between global nodes and tries to have connections with all nodes in the network. It also manages named port groups - ports can register themselves in any number of groups that will be available network-wide, which is great for discovering services. Running it on one node will automatically run it on all nodes, although, at the moment, the global service is started by default anyways. =head1 GLOBALS AND FUNCTIONS =over 4 =cut package AnyEvent::MP::Global; use common::sense; use Carp (); use AnyEvent (); use AnyEvent::MP; use AnyEvent::MP::Kernel; $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); ############################################################################# # node protocol parts for global nodes package AnyEvent::MP::Kernel; # TODO: this is ugly (classical use vars vs. our), # maybe this should go into MP::Kernel our %NODE; our $NODE; our $GLOBAL; our $MASTER; our $MASTER_MON; our $MASTER_TIMER; our %GLOBAL_SLAVE; our %GLOBAL_DB; # global db our %LOCAL_DBS; # local databases of other global nodes our %LOCAL_DB; # this node database our $SRCNODE; our %NODE_REQ; # only in global code our %GLOBAL_MON; # monitors {family} sub other_globals() { grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } } # broadcasts a message to all other global nodes sub g_broadcast { snd $_, @_ for other_globals; } # add/replace a key in the database sub g_add($$$$) { $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = $GLOBAL_DB {$_[1]}{$_[2]} = $_[3]; g_broadcast g_add => $_[1] => $_[2] => $_[3] if exists $GLOBAL_SLAVE{$_[0]}; # tell subscribers we have added or replaced a key snd $_ => g_chg2 => $_[1], $_[2], $_[3] for keys %{ $GLOBAL_MON{$_[1]} }; } # delete a key from the database sub g_del($$$) { delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]}; delete $LOCAL_DBS{$_[0]}{$_[1]} unless %{ $LOCAL_DBS{$_[0]}{$_[1]} }; g_broadcast g_del => $_[1] => $_[2] if exists $GLOBAL_SLAVE{$_[0]}; delete $GLOBAL_DB{$_[1]}{$_[2]}; delete $GLOBAL_DB{$_[1]} unless %{ $GLOBAL_DB{$_[1]} }; # could be moved below for # check if other node maybe still has the key, then we don't delete, but add for (values %LOCAL_DBS) { if (exists $_->{$_[1]}{$_[2]}) { my $val = $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]}; # tell subscribers we have added or replaced a key snd $_ => g_chg2 => $_[1], $_[2], $val for keys %{ $GLOBAL_MON{$_[1]} }; last; } } # tell subscribers key is actually gone snd $_ => g_chg2 => $_[1], $_[2] for keys %{ $GLOBAL_MON{$_[1]} }; } # set the whole (node-local) database - previous value must be empty sub g_set($$) { my ($node, $db) = @_; while (my ($f, $k) = each %$db) { g_add $node, $f => $_ => delete $k->{$_} for keys %$k; } } # delete all keys from a database sub g_clr($) { my ($node) = @_; my $db = $LOCAL_DBS{$node}; while (my ($f, $k) = each %$db) { g_del $node, $f => $_ for keys %$k; } delete $LOCAL_DBS{$node}; } # gather node databases from slaves # other node wants to make us the master $NODE_REQ{g_slave} = sub { my ($db) = @_; my $node = $SRCNODE->{id}; undef $GLOBAL_SLAVE{$node}; g_set $node, $db; }; $NODE_REQ{g_add} = sub { &g_add ($SRCNODE->{id}, @_); }; $NODE_REQ{g_del} = sub { &g_del ($SRCNODE->{id}, @_); }; $NODE_REQ{g_set} = sub { g_set $SRCNODE->{id}, $_[0]; }; $NODE_REQ{g_find} = sub { my ($node) = @_; snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; }; # monitoring sub g_slave_disconnect($) { my ($node) = @_; g_clr $node; if (my $mon = delete $GLOBAL_SLAVE{$node}) { while (my ($f, $fv) = each %$mon) { delete $GLOBAL_MON{$f}{$_} for keys %$fv; delete $GLOBAL_MON{$f} unless %{ $GLOBAL_MON{$f} }; } } } # g_mon0 family - stop monitoring $NODE_REQ{g_mon0} = sub { delete $GLOBAL_MON{$_[0]}{$SRCNODE->{id}}; delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} }; delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}; }; # g_mon1 family key - start monitoring $NODE_REQ{g_mon1} = sub { undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}; undef $GLOBAL_MON{$_[0]}{$SRCNODE->{id}}; snd $SRCNODE->{id}, g_chg1 => $_[0], $GLOBAL_DB{$_[0]}; }; ############################################################################# # switch to global mode $GLOBAL = 1; $MASTER = $NODE; undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) # regularly try to connect to global nodes - maybe use seeding code? $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { (add_node $_)->connect for keys %{ $GLOBAL_DB{"'g"} }; }; # instantly connect to other global nodes when we learn of them # so we don't have to wait for the timer. #TODO # $GLOBAL_MON{"'g"}{""}{""} = sub { # (add_node $_[1])->connect; # }; # delete slaves on node-down # clear slave db on node-down $MASTER_MON = mon_nodes sub { g_slave_disconnect $_[0] unless $_[1]; }; # tell everybody who connects that we are a global node push @AnyEvent::MP::Transport::HOOK_GREET, sub { $_[0]{local_greeting}{global} = 1; }; # connect from a global node sub g_global_connect { my ($node) = @_; # we need to set this currently, as to avoid race conditions # because it takes a while until the other global node tells us it is global. undef $GLOBAL_DB{"'g"}{$node}; undef $LOCAL_DBS{$node}{"'g"}{$node}; # global nodes send all local databases, merged, # as their local database to global nodes my %db; for (values %LOCAL_DBS) { while (my ($f, $fv) = each %$_) { while (my ($k, $kv) = each %$fv) { $db{$f}{$k} = $kv; } } } snd $node => g_set => \%db; } # send our database to every global node that connects push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { return unless $_[0]{remote_greeting}{global}; g_global_connect $_[0]{remote_node}; }; # tell our master that we are global now for (values %NODE) { if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { snd $_->{id} => "g_global"; g_global_connect $_->{id}; } } $NODE_REQ{g_global} = sub { g_slave_disconnect $SRCNODE->{id}; $SRCNODE->{transport}{remote_greeting}{global} = 1; g_global_connect $SRCNODE->{id}; }; snd $MASTER, g_set => \%LOCAL_DB; # now add us to the set of global nodes db_set "'g" => $NODE => undef; =back =head1 SEE ALSO L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ =cut 1