ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.5
Committed: Sun Aug 16 05:02:24 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.4: +8 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8     # -OR-
9     aemp addservice AnyEvent::MP::Global::
10    
11     =head1 DESCRIPTION
12    
13     This module provides an assortment of network-global functions: group name
14     registration and non-local locks.
15    
16     It will also try to build and maintain a full mesh of all network nodes.
17    
18 root 1.3 While it isn't mandatory to run the global services, running it on one
19     node will automatically run it on all nodes.
20    
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37    
38     our $VERSION = $AnyEvent::MP::VERSION;
39    
# module-wide state, shared by all functions below
our (
   %port, # our rendezvous port on the other side, noderef => port
   %lreg, # local registry, name => [pid...]
   %lmon, # local registry monitoring, "name\x00pid" => mon
   %greg, # global registry, name => [pid...]
);

# announce ourselves as soon as the module is loaded
$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
46    
# remove every port belonging to the given node from all global groups.
# a port belongs to the node when its id is the noderef itself or
# starts with "noderef#".
sub unreg_groups($) {
   my $node = shift;

   my $owned = qr/^\Q$node\E(?:#|$)/;

   for my $members (values %greg) {
      # keep only the ports that are not owned by $node
      @$members = grep { !/$owned/ } @$members;
   }
}
56    
# merge the registry sent by $noderef into the global registry.
#
# $lreg is expected to be a hash reference of the form name => [pid...];
# anything else is silently ignored. only ports owned by $noderef (its
# noderef itself or "noderef#...") are taken over, as the sender may
# transmit more than its own ports and everything else is already tracked
# via the respective owner. callers are expected to have removed the old
# entries of $noderef (unreg_groups) beforehand, so that no duplicates arise.
sub set_groups($$) {
   my ($noderef, $lreg) = @_;

   return unless ref $lreg eq "HASH";

   # same ownership test as used by unreg_groups
   my $qr = qr/^\Q$noderef\E(?:#|$)/;

   for my $group (keys %$lreg) {
      my $ports = $lreg->{$group};
      next unless ref $ports eq "ARRAY";

      my @own = grep $_ =~ $qr, @$ports;

      # do not create empty groups
      push @{ $greg{$group} }, @own
         if @own;
   }
}
62    
63     =item $guard = register $port, $group
64    
65     Register the given (local!) port in the named global group C<$group>.
66    
67     The port will be unregistered automatically when the port is destroyed.
68    
69     When not called in void context, then a guard object will be returned that
70     will also cause the name to be unregistered when destroyed.
71    
72     =cut
73    
# add a port (local, or announced by another node via "reg1") to the
# global registry entry of the given group, creating the group on demand.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
80    
# remove a port (from any node) from the global registry entry of the
# given group. counterpart to _register, and also the handler for the
# "reg0" message installed by connect.
sub _unregister {
   my ($port, $group) = @_;

   # unknown group: nothing to remove, and do not create it either
   my $members = $greg{$group}
      or return;

   @$members = grep $_ ne $port, @$members;
}

# unregister local port: drops the monitor installed by register, removes
# the port from the local and the global registry and tells all connected
# nodes about it.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};
   @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };

   _unregister $port, $group;

   # reg0 is handled by _unregister on the other side
   snd $_, reg0 => $port, $group
      for values %port;
}
93    
# register local port: croaks unless $port is a local port, otherwise
# records it in the local and global registries and announces it to all
# connected nodes. returns a guard that unregisters the port again when
# called in non-void context.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   # the monitor takes care of unregistering when the port goes away
   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   # reg1 is handled by _register on the other side
   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is undef in void context only, but false (and defined) in
   # scalar context, so definedness is what has to be tested to also hand
   # out a guard for "my $guard = register ...".
   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
111    
# set up the rendezvous port on the given node, unless that was already
# done or the node is ourselves.
sub start_node {
   my $node = shift;

   # do not connect to ourselves, and connect only once per node
   return if $node eq $NODE || exists $port{$node};

   # establish connection: runs connect (0, $NODE) on the other node
   my $remote = spawn ($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $remote;

   # once the monitor on the remote port fires, forget the groups of
   # the node as well as the port itself
   mon ($remote, sub {
      unreg_groups ($node);
      delete $port{$node};
   });

   # request any other nodes possibly known to us
   snd ($remote, connect_nodes => up_nodes ());
   # and hand over the registry as known here
   snd ($remote, set => \%greg);
}
128 root 1.3
# other nodes connect via this: start_node on the other side spawns this
# function with a version number (currently unused here) and the noderef
# of the connecting node.
sub connect {
   my (undef, $peer) = @_;

   # monitor them, silently die
   mon ($peer, psub { kil ($SELF) });

   # message handlers for the requests sent by start_node/register/unregister
   my @handlers = (
      connect_nodes => sub {
         for my $node (@_) {
            connect_node ($node);
            start_node ($node);
         }
      },
      set => sub {
         my ($groups) = @_;

         # replace whatever we knew about the peer
         unreg_groups ($peer);
         set_groups ($peer, $groups);
      },
      reg1 => \&_register,
      reg0 => \&_unregister,
   );

   rcv ($SELF, @handlers);
}
151 root 1.1
# node status handler: connects to nodes that are up and drops the group
# registrations of nodes that are not.
sub mon_node {
   my ($node, $up) = @_;

   $up
      ? start_node ($node)
      : unreg_groups ($node);
}
162    
# treat all nodes that are up at load time as freshly seen
for my $node (up_nodes ()) {
   mon_node ($node, 1);
}

# presumably keeps mon_node informed about later status changes
mon_nodes (\&mon_node);
167    
168     =back
169    
170     =head1 SEE ALSO
171    
172     L<AnyEvent::MP>.
173    
174     =head1 AUTHOR
175    
176     Marc Lehmann <schmorp@schmorp.de>
177     http://home.schmorp.de/
178    
179     =cut
180    
181     1
182