ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.7 by root, Sat Aug 1 15:04:30 2009 UTC vs.
Revision 1.10 by root, Sun Aug 2 18:05:43 2009 UTC

72 72
73=cut 73=cut
74 74
75package AnyEvent::MP; 75package AnyEvent::MP;
76 76
77use AnyEvent::MP::Util ();
78use AnyEvent::MP::Node; 77use AnyEvent::MP::Base;
79use AnyEvent::MP::Transport;
80 78
81use utf8;
82use common::sense; 79use common::sense;
83 80
84use Carp (); 81use Carp ();
85 82
86use AE (); 83use AE ();
87 84
88use base "Exporter"; 85use base "Exporter";
89 86
90our $VERSION = '0.01'; 87our $VERSION = '0.02';
91our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 88our @EXPORT = qw(
92 89 NODE $NODE $PORT snd rcv _any_
93our $DEFAULT_SECRET; 90 create_port create_port_on
94our $DEFAULT_PORT = "4040"; 91 become_slave become_public
95 92);
96our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
97our $CONNECT_TIMEOUT = 30; # includes handshake
98
99sub default_secret {
100 unless (defined $DEFAULT_SECRET) {
101 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102 sysread $fh, $DEFAULT_SECRET, -s $fh;
103 } else {
104 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
105 }
106 }
107
108 $DEFAULT_SECRET
109}
110 93
111=item NODE / $NODE 94=item NODE / $NODE
112 95
113The C<NODE ()> function and the C<$NODE> variable contain the noderef of 96The C<NODE ()> function and the C<$NODE> variable contain the noderef of
114the local node. The value is initialised by a call to C<become_public> or 97the local node. The value is initialised by a call to C<become_public> or
115C<become_slave>, after which all local port identifiers become invalid. 98C<become_slave>, after which all local port identifiers become invalid.
116 99
117=cut
118
119our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
120our $ID = "a0";
121our $PUBLIC = 0;
122our $NODE;
123our $PORT;
124
125our %NODE; # node id to transport mapping, or "undef", for local node
126our %PORT; # local ports
127our %LISTENER; # local transports
128
129sub NODE() { $NODE }
130
131{
132 use POSIX ();
133 my $nodename = (POSIX::uname)[1];
134 $NODE = "$$\@$nodename";
135}
136
137sub _ANY_() { 1 }
138sub _any_() { \&_ANY_ }
139
140sub add_node {
141 my ($noderef) = @_;
142
143 return $NODE{$noderef}
144 if exists $NODE{$noderef};
145
146 for (split /,/, $noderef) {
147 return $NODE{$noderef} = $NODE{$_}
148 if exists $NODE{$_};
149 }
150
151 # for indirect sends, use a different class
152 my $node = new AnyEvent::MP::Node::Direct $noderef;
153
154 $NODE{$_} = $node
155 for $noderef, split /,/, $noderef;
156
157 $node
158}
159
160=item snd $portid, type => @data 100=item snd $portid, type => @data
161 101
162=item snd $portid, @msg 102=item snd $portid, @msg
163 103
164Send the given message to the given port ID, which can identify either a 104Send the given message to the given port ID, which can identify either
165local or a remote port. 105a local or a remote port, and can be either a string or soemthignt hat
106stringifies a sa port ID (such as a port object :).
166 107
167While the message can be about anything, it is highly recommended to use 108While the message can be about anything, it is highly recommended to use a
168a constant string as first element. 109string as first element (a portid, or some word that indicates a request
110type etc.).
169 111
170The message data effectively becomes read-only after a call to this 112The message data effectively becomes read-only after a call to this
171function: modifying any argument is not allowed and can cause many 113function: modifying any argument is not allowed and can cause many
172problems. 114problems.
173 115
175JSON is used, then only strings, numbers and arrays and hashes consisting 117JSON is used, then only strings, numbers and arrays and hashes consisting
176of those are allowed (no objects). When Storable is used, then anything 118of those are allowed (no objects). When Storable is used, then anything
177that Storable can serialise and deserialise is allowed, and for the local 119that Storable can serialise and deserialise is allowed, and for the local
178node, anything can be passed. 120node, anything can be passed.
179 121
180=cut 122=item $local_port = create_port
181 123
182sub snd(@) { 124Create a new local port object. See the next section for allowed methods.
183 my ($noderef, $port) = split /#/, shift, 2;
184 125
185 add_node $noderef 126=cut
186 unless exists $NODE{$noderef};
187 127
188 $NODE{$noderef}->send (["$port", [@_]]); 128sub create_port {
189} 129 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID;
190 130
131 my $self = bless {
132 id => "$NODE#$id",
133 names => [$id],
134 }, "AnyEvent::MP::Port";
135
136 $AnyEvent::MP::Base::PORT{$id} = sub {
137 unshift @_, $self;
138
139 for (@{ $self->{rc0}{$_[1]} }) {
140 $_ && &{$_->[0]}
141 && undef $_;
142 }
143
144 for (@{ $self->{rcv}{$_[1]} }) {
145 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
146 && &{$_->[0]}
147 && undef $_;
148 }
149
150 for (@{ $self->{any} }) {
151 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
152 && &{$_->[0]}
153 && undef $_;
154 }
155 };
156
157 $self
158}
159
160=item $portid = create_miniport { }
161
162Creates a "mini port", that is, a port without much #TODO
163
164=cut
165
166sub create_miniport(&) {
167 my $cb = shift;
168 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID;
169
170 $AnyEvent::MP::Base::PORT{$id} = sub {
171 &$cb
172 and delete $AnyEvent::MP::Base::PORT{$id};
173 };
174
175 "$NODE#$id"
176}
177
178package AnyEvent::MP::Port;
179
180=back
181
182=head1 METHODS FOR PORT OBJECTS
183
184=over 4
185
186=item "$port"
187
188A port object stringifies to its port ID, so can be used directly for
189C<snd> operations.
190
191=cut
192
193use overload
194 '""' => sub { $_[0]{id} },
195 fallback => 1;
196
191=item rcv $portid, type => $callback->(@msg) 197=item $port->rcv (type => $callback->($port, @msg))
192 198
193=item rcv $portid, $smartmatch => $callback->(@msg) 199=item $port->rcv ($smartmatch => $callback->($port, @msg))
194 200
195=item rcv $portid, [$smartmatch...] => $callback->(@msg) 201=item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
196 202
197Register a callback on the port identified by C<$portid>, which I<must> be 203Register a callback on the given port.
198a local port.
199 204
200The callback has to return a true value when its work is done, after 205The callback has to return a true value when its work is done, after
201which is will be removed, or a false value in which case it will stay 206which is will be removed, or a false value in which case it will stay
202registered. 207registered.
203 208
213also the most efficient match (by far). 218also the most efficient match (by far).
214 219
215=cut 220=cut
216 221
217sub rcv($@) { 222sub rcv($@) {
218 my ($port, $match, $cb) = @_; 223 my ($self, $match, $cb) = @_;
219
220 my $port = $PORT{$port}
221 or do {
222 my ($noderef, $lport) = split /#/, $port;
223 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
224 or Carp::croak "$port: can only rcv on local ports";
225
226 $PORT{$lport}
227 or Carp::croak "$port: port does not exist";
228
229 $PORT{$port} = $PORT{$lport} # also return
230 };
231 224
232 if (!ref $match) { 225 if (!ref $match) {
233 push @{ $port->{rc0}{$match} }, [$cb]; 226 push @{ $self->{rc0}{$match} }, [$cb];
234 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 227 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
235 my ($type, @match) = @$match; 228 my ($type, @match) = @$match;
236 @match 229 @match
237 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 230 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
238 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 231 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
239 } else { 232 } else {
240 push @{ $port->{any} }, [$cb, $match]; 233 push @{ $self->{any} }, [$cb, $match];
241 } 234 }
242} 235}
243 236
244sub _inject { 237=item $port->register ($name)
245 my ($port, $msg) = @{+shift};
246 238
247 $port = $PORT{$port} 239Registers the given port under the well known name C<$name>. If the name
248 or return; 240already exists it is replaced.
249 241
250 @_ = @$msg; 242A port can only be registered under one well known name.
251 243
252 for (@{ $port->{rc0}{$msg->[0]} }) { 244=cut
253 $_ && &{$_->[0]}
254 && undef $_;
255 }
256 245
257 for (@{ $port->{rcv}{$msg->[0]} }) { 246sub register {
258 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] 247 my ($self, $name) = @_;
259 && &{$_->[0]}
260 && undef $_;
261 }
262 248
263 for (@{ $port->{any} }) { 249 $self->{wkname} = $name;
264 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] 250 $AnyEvent::MP::Base::WKP{$name} = "$self";
265 && &{$_->[0]}
266 && undef $_;
267 }
268} 251}
269 252
270sub normalise_noderef($) { 253=item $port->destroy
254
255Explicitly destroy/remove/nuke/vaporise the port.
256
257Ports are normally kept alive by there mere existance alone, and need to
258be destroyed explicitly.
259
260=cut
261
262sub destroy {
271 my ($noderef) = @_; 263 my ($self) = @_;
272 264
273 my $cv = AE::cv; 265 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
274 my @res;
275 266
276 $cv->begin (sub { 267 delete $AnyEvent::MP::Base::PORT{$_}
277 my %seen; 268 for @{ $self->{names} };
278 my @refs;
279 for (sort { $a->[0] <=> $b->[0] } @res) {
280 push @refs, $_->[1] unless $seen{$_->[1]}++
281 }
282 shift->send (join ",", @refs);
283 });
284
285 $noderef = $DEFAULT_PORT unless length $noderef;
286
287 my $idx;
288 for my $t (split /,/, $noderef) {
289 my $pri = ++$idx;
290
291 #TODO: this should be outside normalise_noderef and in become_public
292 if ($t =~ /^\d*$/) {
293 my $nodename = (POSIX::uname)[1];
294
295 $cv->begin;
296 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
297 for (@_) {
298 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
299 push @res, [
300 $pri += 1e-5,
301 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
302 ];
303 }
304 $cv->end;
305 };
306
307# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
308#
309# for (@ipv4) {
310# push @res, [
311# $pri,
312# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
313# ];
314# }
315 } else {
316 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
317 or Carp::croak "$t: unparsable transport descriptor";
318
319 $cv->begin;
320 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
321 for (@_) {
322 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
323 push @res, [
324 $pri += 1e-5,
325 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
326 ];
327 }
328 $cv->end;
329 }
330 }
331 }
332
333 $cv->end;
334
335 $cv
336} 269}
337 270
338sub become_public { 271=back
339 return if $PUBLIC;
340 272
341 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 273=head1 FUNCTIONS FOR NODES
342 my @args = @_;
343 274
344 $NODE = (normalise_noderef $noderef)->recv; 275=over 4
345 276
346 for my $t (split /,/, $NODE) { 277=item mon $noderef, $callback->($noderef, $status, $)
347 $NODE{$t} = $NODE{""};
348 278
349 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 279Monitors the given noderef.
350 280
351 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 281=item become_public endpoint...
352 @args,
353 on_error => sub {
354 die "on_error<@_>\n";#d#
355 },
356 on_connect => sub {
357 my ($tp) = @_;
358 282
359 $NODE{$tp->{remote_id}} = $_[0]; 283Tells the node to become a public node, i.e. reachable from other nodes.
360 },
361 sub {
362 my ($tp) = @_;
363 284
364 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; 285If no arguments are given, or the first argument is C<undef>, then
365 }, 286AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
366 ; 287local nodename resolves to.
367 }
368 288
369 $PUBLIC = 1; 289Otherwise the first argument must be an array-reference with transport
370} 290endpoints ("ip:port", "hostname:port") or port numbers (in which case the
291local nodename is used as hostname). The endpoints are all resolved and
292will become the node reference.
293
294=cut
371 295
372=back 296=back
373 297
374=head1 NODE MESSAGES 298=head1 NODE MESSAGES
375 299
380 304
381=over 4 305=over 4
382 306
383=cut 307=cut
384 308
385############################################################################# 309=item wkp => $name, @reply
386# self node code
387 310
388sub _new_port($) { 311Replies with the port ID of the specified well-known port, or C<undef>.
389 my ($name) = @_;
390
391 my ($noderef, $portname) = split /#/, $name;
392
393 $PORT{$name} =
394 $PORT{$portname} = {
395 names => [$name, $portname],
396 };
397}
398
399$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
400_new_port "";
401 312
402=item devnull => ... 313=item devnull => ...
403 314
404Generic data sink/CPU heat conversion. 315Generic data sink/CPU heat conversion.
405 316
406=cut
407
408rcv "", devnull => sub { () };
409
410=item relay => $port, @msg 317=item relay => $port, @msg
411 318
412Simply forwards the message to the given port. 319Simply forwards the message to the given port.
413
414=cut
415
416rcv "", relay => sub { \&snd; () };
417 320
418=item eval => $string[ @reply] 321=item eval => $string[ @reply]
419 322
420Evaluates the given string. If C<@reply> is given, then a message of the 323Evaluates the given string. If C<@reply> is given, then a message of the
421form C<@reply, $@, @evalres> is sent. 324form C<@reply, $@, @evalres> is sent.
422 325
423Example: crash another node. 326Example: crash another node.
424 327
425 snd $othernode, eval => "exit"; 328 snd $othernode, eval => "exit";
426 329
427=cut
428
429rcv "", eval => sub {
430 my (undef, $string, @reply) = @_;
431 my @res = eval $string;
432 snd @reply, "$@", @res if @reply;
433 ()
434};
435
436=item time => @reply 330=item time => @reply
437 331
438Replies the the current node time to C<@reply>. 332Replies the the current node time to C<@reply>.
439 333
440Example: tell the current node to send the current time to C<$myport> in a 334Example: tell the current node to send the current time to C<$myport> in a
441C<timereply> message. 335C<timereply> message.
442 336
443 snd $NODE, time => $myport, timereply => 1, 2; 337 snd $NODE, time => $myport, timereply => 1, 2;
444 # => snd $myport, timereply => 1, 2, <time> 338 # => snd $myport, timereply => 1, 2, <time>
445 339
446=cut
447
448rcv "", time => sub { shift; snd @_, AE::time; () };
449
450=back 340=back
451 341
452=head1 SEE ALSO 342=head1 SEE ALSO
453 343
454L<AnyEvent>. 344L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines