ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.7
Committed: Mon Aug 17 03:50:28 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_8
Changes since 1.6: +14 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8     # -OR-
9     aemp addservice AnyEvent::MP::Global::
10    
11     =head1 DESCRIPTION
12    
13     This module provides an assortment of network-global functions: group name
14     registration and non-local locks.
15    
16     It will also try to build and maintain a full mesh of all network nodes.
17    
18 root 1.3 While it isn't mandatory to run the global services, running it on one
19     node will automatically run it on all nodes.
20    
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37    
38     our $VERSION = $AnyEvent::MP::VERSION;
39    
40 root 1.4 our %port; # our rendezvous port on the other side
41     our %lreg; # local registry, name => [pid...]
42     our %lmon; # local rgeistry monitoring name,pid => mon
43     our %greg; # global regstry, name => [pid...]
44    
45 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
46    
47 root 1.4 sub unreg_groups($) {
48     my ($noderef) = @_;
49    
50     my $qr = qr/^\Q$noderef\E(?:#|$)/;
51    
52     for my $group (values %greg) {
53     @$group = grep $_ !~ $qr, @$group;
54     }
55     }
56    
57     sub set_groups($$) {
58     my ($noderef, $lreg) = @_;
59 root 1.5
60 root 1.6 while (my ($k, $v) = each %$lreg) {
61     push @{ $greg{$k} }, @$v;
62     }
63 root 1.4 }
64    
65 root 1.7 =item $ports = find $group
66    
67     Returns all the ports currently registered to the given group (as
68     read-only array reference). When the group has no registered members,
69     return C<undef>.
70    
71     =cut
72    
73     sub find($) {
74     @{ $greg{$_[0]} }
75     ? $greg{$_[0]}
76     : undef
77     }
78    
79 root 1.4 =item $guard = register $port, $group
80    
81     Register the given (local!) port in the named global group C<$group>.
82    
83     The port will be unregistered automatically when the port is destroyed.
84    
85     When not called in void context, then a guard object will be returned that
86     will also cause the name to be unregistered when destroyed.
87    
88     =cut
89    
90     # register port from any node
91     sub _register {
92     my ($port, $group) = @_;
93    
94     push @{ $greg{$group} }, $port;
95     }
96    
97 root 1.6 # unregister port from any node
98     sub _unregister {
99     my ($port, $group) = @_;
100    
101     @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} };
102     }
103    
104 root 1.4 # unregister local port
105     sub unregister {
106     my ($port, $group) = @_;
107    
108     delete $lmon{"$group\x00$port"};
109     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
110    
111     _unregister $port, $group;
112    
113     snd $_, reg0 => $port, $group
114     for values %port;
115     }
116    
117     # register local port
118     sub register($$) {
119     my ($port, $group) = @_;
120    
121     port_is_local $port
122     or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";
123    
124     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
125     push @{ $lreg{$group} }, $port;
126    
127     snd $_, reg1 => $port, $group
128     for values %port;
129    
130     _register $port, $group;
131    
132     wantarray && AnyEvent::Util::guard { unregister $port, $group }
133     }
134    
135     sub start_node {
136     my ($noderef) = @_;
137    
138     return if exists $port{$noderef};
139 root 1.5 return if $noderef eq $NODE; # do not connect to ourselves
140 root 1.4
141     # establish connection
142 root 1.5 my $port = $port{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", 0, $NODE;
143 root 1.4 # request any other nodes possibly known to us
144     mon $port, sub {
145     unreg_groups $noderef;
146     delete $port{$noderef};
147     };
148     snd $port, connect_nodes => up_nodes;
149 root 1.6 snd $port, set => \%lreg;
150 root 1.4 }
151 root 1.3
152 root 1.5 # other nodes connect via this
153 root 1.3 sub connect {
154 root 1.5 my ($version, $noderef) = @_;
155 root 1.1
156 root 1.3 # monitor them, silently die
157     mon $noderef, psub { kil $SELF };
158    
159 root 1.4 rcv $SELF,
160     connect_nodes => sub {
161     for (@_) {
162     connect_node $_;
163     start_node $_;
164     }
165     },
166     set => sub {
167     set_groups $noderef, shift;
168     },
169     reg1 => \&_register,
170     reg0 => \&_unregister,
171     ;
172 root 1.3 }
173 root 1.1
174     sub mon_node {
175     my ($noderef, $is_up) = @_;
176    
177     if ($is_up) {
178 root 1.4 start_node $noderef;
179 root 1.2 } else {
180 root 1.4 unreg_groups $noderef;
181 root 1.1 }
182     #warn "node<$noderef,$is_up>\n";#d#
183     }
184    
185     mon_node $_, 1
186     for up_nodes;
187    
188     mon_nodes \&mon_node;
189    
190     =back
191    
192     =head1 SEE ALSO
193    
194     L<AnyEvent::MP>.
195    
196     =head1 AUTHOR
197    
198     Marc Lehmann <schmorp@schmorp.de>
199     http://home.schmorp.de/
200    
201     =cut
202    
203     1
204