ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Base.pm (file contents):
Revision 1.12 by root, Wed Aug 5 19:55:58 2009 UTC vs.
Revision 1.13 by root, Wed Aug 5 22:40:51 2009 UTC

95 $uniq = MIME::Base64::encode_base64 $uniq, ""; 95 $uniq = MIME::Base64::encode_base64 $uniq, "";
96 $uniq =~ s/=+$//; 96 $uniq =~ s/=+$//;
97 $uniq 97 $uniq
98} 98}
99 99
100our $NODE = unpack "H*", nonce 16;
100our $UNIQ = gen_uniq; # per-process/node unique cookie 101our $UNIQ = gen_uniq; # per-process/node unique cookie
101our $ID = "a"; 102our $ID = "a";
102our $PUBLIC = 0; 103our $PUBLIC = 0;
103our $NODE = unpack "H*", nonce 16; 104our $SLAVE = 0;
104 105
105our %NODE; # node id to transport mapping, or "undef", for local node 106our %NODE; # node id to transport mapping, or "undef", for local node
106our (%PORT, %PORT_DATA); # local ports 107our (%PORT, %PORT_DATA); # local ports
107 108
108our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb) 109our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
125} 126}
126 127
127sub _ANY_() { 1 } 128sub _ANY_() { 1 }
128sub _any_() { \&_ANY_ } 129sub _any_() { \&_ANY_ }
129 130
131sub TRACE() { 1 }
132
130sub _inject { 133sub _inject {
134 warn "$SRCNODE->{noderef} -> @_\n" if TRACE;#d#
131 &{ $PORT{+shift} or return }; 135 &{ $PORT{+shift} or return };
132} 136}
133 137
134sub add_node { 138sub add_node {
135 my ($noderef) = @_; 139 my ($noderef) = @_;
141 return $NODE{$noderef} = $NODE{$_} 145 return $NODE{$noderef} = $NODE{$_}
142 if exists $NODE{$_}; 146 if exists $NODE{$_};
143 } 147 }
144 148
145 # new node, check validity 149 # new node, check validity
150 my $node;
146 151
152 if ($noderef =~ /^slave\/.+$/) {
153 $node = new AnyEvent::MP::Node::Slave $noderef;
154
155 } else {
147 for (split /,/, $noderef) { 156 for (split /,/, $noderef) {
148 my ($host, $port) = AnyEvent::Socket::parse_hostport $_ 157 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
149 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; 158 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
150 159
151 $port > 0 160 $port > 0
152 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"; 161 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
153 162
154 AnyEvent::Socket::parse_address $host 163 AnyEvent::Socket::parse_address $host
155 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; 164 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
156 } 165 }
157 166
158 # TODO: for indirect sends, use a different class 167 # TODO: for indirect sends, use a different class
159 my $node = new AnyEvent::MP::Node::Direct $noderef; 168 $node = new AnyEvent::MP::Node::Direct $noderef;
169 }
160 170
161 $NODE{$_} = $node 171 $NODE{$_} = $node
162 for $noderef, split /,/, $noderef; 172 for $noderef, split /,/, $noderef;
163 173
164 $node 174 $node
165} 175}
166 176
167sub snd(@) { 177sub snd(@) {
168 my ($noderef, $port) = split /#/, shift, 2; 178 my ($noderef, $portid) = split /#/, shift, 2;
179
180 warn "$noderef <- $portid @_\n" if TRACE;#d#
169 181
170 ($NODE{$noderef} || add_node $noderef) 182 ($NODE{$noderef} || add_node $noderef)
171 ->send ([$port, @_]); 183 ->send (["$portid", @_]);
172} 184}
173 185
174sub kil(@) { 186sub kil(@) {
175 my ($noderef, $port) = split /#/, shift, 2; 187 my ($noderef, $portid) = split /#/, shift, 2;
176 188
177 length $port or Carp::cluck "yuk\n";#d#
178 length $port 189 length $portid
179 or Carp::croak "$noderef: killing a node port is not allowed, caught"; 190 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
180 191
181 ($NODE{$noderef} || add_node $noderef) 192 ($NODE{$noderef} || add_node $noderef)
182 ->kill ($port, @_); 193 ->kill ("$portid", @_);
183} 194}
184 195
185sub resolve_node($) { 196sub resolve_node($) {
186 my ($noderef) = @_; 197 my ($noderef) = @_;
187 198
252 263
253sub initialise_node($;@) { 264sub initialise_node($;@) {
254 my ($noderef, @others) = @_; 265 my ($noderef, @others) = @_;
255 266
256 if ($noderef =~ /^slave\/(.*)$/) { 267 if ($noderef =~ /^slave\/(.*)$/) {
268 $SLAVE = AE::cv;
257 my $name = $1; 269 my $name = $1;
258 $name = $UNIQ unless length $name; 270 $name = $UNIQ unless length $name;
259 $noderef = AE::cv; 271 $noderef = AE::cv;
260 $noderef->send ("slave/$name"); 272 $noderef->send ("slave/$name");
273
274 @others
275 or Carp::croak "seed nodes must be specified for slave nodes";
276
261 } else { 277 } else {
278 $PUBLIC = 1;
262 $noderef = resolve_node $noderef; 279 $noderef = resolve_node $noderef;
263 } 280 }
264 281
265 @others = map $_->recv, map +(resolve_node $_), @others; 282 @others = map $_->recv, map +(resolve_node $_), @others;
266 283
280 $node->{trial}{accept} = $tp; 297 $node->{trial}{accept} = $tp;
281 }, 298 },
282 ; 299 ;
283 } 300 }
284 301
285 $PUBLIC = 1; 302 (add_node $_)->connect for @others;
303
304 if ($SLAVE) {
305 $SLAVE->recv;
306 $SLAVE = 1;
307 }
286} 308}
287 309
288############################################################################# 310#############################################################################
289# self node code 311# self node code
290 312
291our %node_req = ( 313our %node_req = (
314 # internal services
315
292 # monitoring 316 # monitoring
293 mon0 => sub { # disable monitoring 317 mon0 => sub { # disable monitoring
294 my $portid = shift; 318 my $portid = shift;
295 my $node = $SRCNODE; 319 my $node = $SRCNODE;
296 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid}); 320 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
306 my $cbs = delete $SRCNODE->{lmon}{+shift} 330 my $cbs = delete $SRCNODE->{lmon}{+shift}
307 or return; 331 or return;
308 332
309 $_->(@_) for @$cbs; 333 $_->(@_) for @$cbs;
310 }, 334 },
335 # node changed its name (for slave nodes)
336 iam => sub {
337 $SRCNODE->{noderef} = $_[0];
338 $NODE{$_[0]} = $SRCNODE;
339 },
340
341 # public services
311 342
312 # well-known-port lookup 343 # well-known-port lookup
313 lookup => sub { 344 lookup => sub {
314 my $name = shift; 345 my $name = shift;
315 my $port = $REG{$name}; 346 my $port = $REG{$name};

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines