ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.6
Committed: Mon Aug 17 03:33:18 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.5: +11 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8     # -OR-
9     aemp addservice AnyEvent::MP::Global::
10    
11     =head1 DESCRIPTION
12    
13     This module provides an assortment of network-global functions: group name
14     registration and non-local locks.
15    
16     It will also try to build and maintain a full mesh of all network nodes.
17    
18 root 1.3 While it isn't mandatory to run the global services, running them on one
19     node will automatically start them on all nodes.
20    
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37    
38     our $VERSION = $AnyEvent::MP::VERSION;
39    
40 root 1.4 our %port; # noderef => our rendezvous port spawned on that node
41     our %lreg; # local registry, group name => [port ids registered on this node]
42     our %lmon; # local registry monitoring, "$group\x00$port" => value returned by mon
43     our %greg; # global registry, group name => [port ids, from any node]
44    
45 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); # executed once, when this module is loaded
46    
# Remove every entry belonging to node $noderef from all groups of the
# global registry (%greg). Group arrays are filtered in place, so a group
# may end up empty but its key stays in %greg.
sub unreg_groups($) {
   my ($noderef) = @_;

   # matches ids that are exactly $noderef or that start with "$noderef#"
   my $on_node = qr/^\Q$noderef\E(?:#|$)/;

   for my $members (values %greg) {
      my @remaining;

      for my $id (@$members) {
         push @remaining, $id
            unless $id =~ $on_node;
      }

      @$members = @remaining;
   }
}
56    
# Merge a registry received from another node into the global registry.
#
#   $noderef - the node the registry came from (currently not used here)
#   $lreg    - hash ref, group name => array ref of port ids
#
# The ports are appended to whatever %greg already holds for each group.
sub set_groups($$) {
   my ($noderef, $lreg) = @_;

   for my $group (keys %$lreg) {
      push @{ $greg{$group} }, @{ $lreg->{$group} };
   }
}
64    
65     =item $guard = register $port, $group
66    
67     Register the given (local!) port in the named global group C<$group>.
68    
69     The port will be unregistered automatically when the port is destroyed.
70    
71     When not called in void context, a guard object is returned that will
72     also unregister the port from the group when it is destroyed.
73    
74     =cut
75    
# Add $port (from any node) to $group in the global registry, creating the
# group on first use. Also installed as the "reg1" message handler.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];

   push @$members, $port;
}
82    
# Remove every occurrence of $port (from any node) from $group in the
# global registry. Also installed as the "reg0" message handler.
sub _unregister {
   my ($port, $group) = @_;

   # as before, an unknown group simply ends up as an empty list
   my $members = $greg{$group} ||= [];

   @$members = grep { $_ ne $port } @$members;
}
89    
# Unregister a local port from $group: forget its monitor entry, remove it
# from the local and the global registry, and send a "reg0" message to
# every rendezvous port in %port so the other nodes drop it as well.
sub unregister {
   my ($port, $group) = @_;

   # drop the value stored by register() for this group/port pair
   delete $lmon{"$group\x00$port"};

   my $locals = $lreg{$group} ||= [];
   @$locals = grep { $_ ne $port } @$locals;

   _unregister($port, $group);

   for my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
102    
# Register the local port $port in the global group $group.
#
# The port is recorded in the local registry (%lreg) and the global
# registry (%greg), and a "reg1" message is sent to every rendezvous port
# in %port. A monitor on $port calls unregister() for this port/group pair
# (presumably when the port is destroyed - see the AnyEvent::MP docs for mon).
#
# Croaks if $port is not a local port.
#
# Returns a guard object that unregisters the port when destroyed, unless
# called in void context, in which case nothing is returned.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   # keep the value returned by mon so unregister() can drop it again
   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is undef only in void context; it is defined-but-false in
   # scalar context, so a plain truth test would deny scalar callers
   # ("my $guard = register ...") their guard.
   return unless defined wantarray;

   AnyEvent::Util::guard { unregister $port, $group }
}
120    
# Establish our rendezvous port on node $noderef, unless we already have
# one or the node is ourselves, and send it our view of the network.
sub start_node {
   my ($noderef) = @_;

   # already connected, or it is ourselves - do not connect
   return if exists($port{$noderef}) || $noderef eq $NODE;

   # establish connection: connect() on the other side receives
   # (0, $NODE) as ($version, $noderef)
   my $remote = spawn($noderef, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$noderef} = $remote;

   # callback for the remote port: forget everything we know about the node
   mon($remote, sub {
      unreg_groups($noderef);
      delete $port{$noderef};
   });

   # ask the other side to connect to all nodes known to us ...
   snd($remote, connect_nodes => up_nodes());
   # ... and hand it our local registry
   snd($remote, set => \%lreg);
}
137 root 1.3
# Entry point spawned by other nodes (see start_node): runs as a port on
# this node and receives ($version, $noderef) from the connecting node.
sub connect {
   my ($version, $noderef) = @_;

   # monitor the connecting node, silently kill this port from the callback
   mon $noderef, psub { kil $SELF };

   # "connect_nodes": connect to, and start the service on, each listed node
   my $on_connect_nodes = sub {
      for my $peer (@_) {
         connect_node($peer);
         start_node($peer);
      }
   };

   # "set": merge the registry sent by the connecting node
   my $on_set = sub {
      my ($groups) = @_;

      set_groups($noderef, $groups);
   };

   rcv $SELF,
      connect_nodes => $on_connect_nodes,
      set           => $on_set,
      reg1          => \&_register,
      reg0          => \&_unregister,
   ;
}
159 root 1.1
# Node status callback: start the global service on a node when $is_up is
# true, otherwise drop all of that node's entries from the global registry.
sub mon_node {
   my ($noderef, $is_up) = @_;

   return start_node($noderef)
      if $is_up;

   unreg_groups($noderef);
}
170    
# treat every node returned by up_nodes as having just come up ...
for my $noderef (up_nodes()) {
   mon_node($noderef, 1);
}

# ... and register mon_node as the callback for later node status changes
mon_nodes(\&mon_node);
175    
176     =back
177    
178     =head1 SEE ALSO
179    
180     L<AnyEvent::MP>.
181    
182     =head1 AUTHOR
183    
184     Marc Lehmann <schmorp@schmorp.de>
185     http://home.schmorp.de/
186    
187     =cut
188    
189     1
190