ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP.pm (file contents):
Revision 1.4 by root, Sat Aug 1 07:36:30 2009 UTC vs.
Revision 1.10 by root, Sun Aug 2 18:05:43 2009 UTC

27This module (-family) implements a simple message passing framework. 27This module (-family) implements a simple message passing framework.
28 28
29Despite its simplicity, you can securely message other processes running 29Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 30on the same or other hosts.
31 31
32At the moment, this module family is severly brokena nd underdocumented,
33so do not use. This was uploaded mainly to resreve the CPAN namespace -
34stay tuned!
35
32=head1 CONCEPTS 36=head1 CONCEPTS
33 37
34=over 4 38=over 4
35 39
36=item port 40=item port
52 56
53Initially, nodes are either private (single-process only) or hidden 57Initially, nodes are either private (single-process only) or hidden
54(connected to a master node only). Only when they epxlicitly "become 58(connected to a master node only). Only when they epxlicitly "become
55public" can you send them messages from unrelated other nodes. 59public" can you send them messages from unrelated other nodes.
56 60
57=item noderef - C<host:port,host:port...>, C<id@noderef, C<id> 61=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
58 62
59A noderef is a string that either uniquely identifies a given node (for 63A noderef is a string that either uniquely identifies a given node (for
60private and hidden nodes), or contains a recipe on how to reach a given 64private and hidden nodes), or contains a recipe on how to reach a given
61node (for public nodes). 65node (for public nodes).
62 66
68 72
69=cut 73=cut
70 74
71package AnyEvent::MP; 75package AnyEvent::MP;
72 76
73use AnyEvent::MP::Util ();
74use AnyEvent::MP::Node; 77use AnyEvent::MP::Base;
75use AnyEvent::MP::Transport;
76 78
77use utf8;
78use common::sense; 79use common::sense;
79 80
80use Carp (); 81use Carp ();
81 82
82use AE (); 83use AE ();
83 84
84use base "Exporter"; 85use base "Exporter";
85 86
86our $VERSION = '0.0'; 87our $VERSION = '0.02';
87our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 88our @EXPORT = qw(
88 89 NODE $NODE $PORT snd rcv _any_
89our $DEFAULT_SECRET; 90 create_port create_port_on
90our $DEFAULT_PORT = "4040"; 91 become_slave become_public
91 92);
92our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
93our $CONNECT_TIMEOUT = 30; # includes handshake
94
95sub default_secret {
96 unless (defined $DEFAULT_SECRET) {
97 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
98 sysread $fh, $DEFAULT_SECRET, -s $fh;
99 } else {
100 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
101 }
102 }
103
104 $DEFAULT_SECRET
105}
106 93
107=item NODE / $NODE 94=item NODE / $NODE
108 95
109The C<NODE ()> function and the C<$NODE> variable contain the noderef of 96The C<NODE ()> function and the C<$NODE> variable contain the noderef of
110the local node. The value is initialised by a call to C<become_public> or 97the local node. The value is initialised by a call to C<become_public> or
111C<become_slave>, after which all local port identifiers become invalid. 98C<become_slave>, after which all local port identifiers become invalid.
112 99
113=cut
114
115our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
116our $PUBLIC = 0;
117our $NODE;
118our $PORT;
119
120our %NODE; # node id to transport mapping, or "undef", for local node
121our %PORT; # local ports
122our %LISTENER; # local transports
123
124sub NODE() { $NODE }
125
126{
127 use POSIX ();
128 my $nodename = (POSIX::uname)[1];
129 $NODE = "$$\@$nodename";
130}
131
132sub _ANY_() { 1 }
133sub _any_() { \&_ANY_ }
134
135sub add_node {
136 my ($noderef) = @_;
137
138 return $NODE{$noderef}
139 if exists $NODE{$noderef};
140
141 for (split /,/, $noderef) {
142 return $NODE{$noderef} = $NODE{$_}
143 if exists $NODE{$_};
144 }
145
146 # for indirect sends, use a different class
147 my $node = new AnyEvent::MP::Node::Direct $noderef;
148
149 $NODE{$_} = $node
150 for $noderef, split /,/, $noderef;
151
152 $node
153}
154
155=item snd $portid, type => @data 100=item snd $portid, type => @data
156 101
157=item snd $portid, @msg 102=item snd $portid, @msg
158 103
159Send the given message to the given port ID, which can identify either a 104Send the given message to the given port ID, which can identify either
160local or a remote port. 105a local or a remote port, and can be either a string or soemthignt hat
106stringifies a sa port ID (such as a port object :).
161 107
162While the message can be about anything, it is highly recommended to use 108While the message can be about anything, it is highly recommended to use a
163a constant string as first element. 109string as first element (a portid, or some word that indicates a request
110type etc.).
164 111
165The message data effectively becomes read-only after a call to this 112The message data effectively becomes read-only after a call to this
166function: modifying any argument is not allowed and can cause many 113function: modifying any argument is not allowed and can cause many
167problems. 114problems.
168 115
170JSON is used, then only strings, numbers and arrays and hashes consisting 117JSON is used, then only strings, numbers and arrays and hashes consisting
171of those are allowed (no objects). When Storable is used, then anything 118of those are allowed (no objects). When Storable is used, then anything
172that Storable can serialise and deserialise is allowed, and for the local 119that Storable can serialise and deserialise is allowed, and for the local
173node, anything can be passed. 120node, anything can be passed.
174 121
175=cut 122=item $local_port = create_port
176 123
177sub snd(@) { 124Create a new local port object. See the next section for allowed methods.
178 my ($noderef, $port) = split /#/, shift, 2;
179 125
180 add_node $noderef 126=cut
181 unless exists $NODE{$noderef};
182 127
183 $NODE{$noderef}->send (["$port", [@_]]); 128sub create_port {
184} 129 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID;
185 130
131 my $self = bless {
132 id => "$NODE#$id",
133 names => [$id],
134 }, "AnyEvent::MP::Port";
135
136 $AnyEvent::MP::Base::PORT{$id} = sub {
137 unshift @_, $self;
138
139 for (@{ $self->{rc0}{$_[1]} }) {
140 $_ && &{$_->[0]}
141 && undef $_;
142 }
143
144 for (@{ $self->{rcv}{$_[1]} }) {
145 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
146 && &{$_->[0]}
147 && undef $_;
148 }
149
150 for (@{ $self->{any} }) {
151 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
152 && &{$_->[0]}
153 && undef $_;
154 }
155 };
156
157 $self
158}
159
160=item $portid = create_miniport { }
161
162Creates a "mini port", that is, a port without much #TODO
163
164=cut
165
166sub create_miniport(&) {
167 my $cb = shift;
168 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID;
169
170 $AnyEvent::MP::Base::PORT{$id} = sub {
171 &$cb
172 and delete $AnyEvent::MP::Base::PORT{$id};
173 };
174
175 "$NODE#$id"
176}
177
178package AnyEvent::MP::Port;
179
180=back
181
182=head1 METHODS FOR PORT OBJECTS
183
184=over 4
185
186=item "$port"
187
188A port object stringifies to its port ID, so can be used directly for
189C<snd> operations.
190
191=cut
192
193use overload
194 '""' => sub { $_[0]{id} },
195 fallback => 1;
196
186=item rcv $portid, type => $callback->(@msg) 197=item $port->rcv (type => $callback->($port, @msg))
187 198
188=item rcv $portid, $smartmatch => $callback->(@msg) 199=item $port->rcv ($smartmatch => $callback->($port, @msg))
189 200
190=item rcv $portid, [$smartmatch...] => $callback->(@msg) 201=item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
191 202
192Register a callback on the port identified by C<$portid>, which I<must> be 203Register a callback on the given port.
193a local port.
194 204
195The callback has to return a true value when its work is done, after 205The callback has to return a true value when its work is done, after
196which is will be removed, or a false value in which case it will stay 206which is will be removed, or a false value in which case it will stay
197registered. 207registered.
198 208
208also the most efficient match (by far). 218also the most efficient match (by far).
209 219
210=cut 220=cut
211 221
212sub rcv($@) { 222sub rcv($@) {
213 my ($port, $match, $cb) = @_; 223 my ($self, $match, $cb) = @_;
214
215 my $port = $PORT{$port}
216 or do {
217 my ($noderef, $lport) = split /#/, $port;
218 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
219 or Carp::croak "$port: can only rcv on local ports";
220
221 $PORT{$lport}
222 or Carp::croak "$port: port does not exist";
223
224 $PORT{$port} = $PORT{$lport} # also return
225 };
226 224
227 if (!ref $match) { 225 if (!ref $match) {
228 push @{ $port->{rc0}{$match} }, [$cb]; 226 push @{ $self->{rc0}{$match} }, [$cb];
229 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 227 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
230 my ($type, @match) = @$match; 228 my ($type, @match) = @$match;
231 @match 229 @match
232 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 230 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
233 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 231 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
234 } else { 232 } else {
235 push @{ $port->{any} }, [$cb, $match]; 233 push @{ $self->{any} }, [$cb, $match];
236 } 234 }
237} 235}
238 236
239sub _inject { 237=item $port->register ($name)
240 my ($port, $msg) = @{+shift};
241 238
242 $port = $PORT{$port} 239Registers the given port under the well known name C<$name>. If the name
243 or return; 240already exists it is replaced.
244 241
245 @_ = @$msg; 242A port can only be registered under one well known name.
246 243
247 for (@{ $port->{rc0}{$msg->[0]} }) { 244=cut
248 $_ && &{$_->[0]}
249 && undef $_;
250 }
251 245
252 for (@{ $port->{rcv}{$msg->[0]} }) { 246sub register {
253 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] 247 my ($self, $name) = @_;
254 && &{$_->[0]}
255 && undef $_;
256 }
257 248
258 for (@{ $port->{any} }) { 249 $self->{wkname} = $name;
259 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] 250 $AnyEvent::MP::Base::WKP{$name} = "$self";
260 && &{$_->[0]}
261 && undef $_;
262 }
263} 251}
264 252
265sub normalise_noderef($) { 253=item $port->destroy
254
255Explicitly destroy/remove/nuke/vaporise the port.
256
257Ports are normally kept alive by there mere existance alone, and need to
258be destroyed explicitly.
259
260=cut
261
262sub destroy {
266 my ($noderef) = @_; 263 my ($self) = @_;
267 264
268 my $cv = AE::cv; 265 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
269 my @res;
270 266
271 $cv->begin (sub { 267 delete $AnyEvent::MP::Base::PORT{$_}
272 my %seen; 268 for @{ $self->{names} };
273 my @refs;
274 for (sort { $a->[0] <=> $b->[0] } @res) {
275 push @refs, $_->[1] unless $seen{$_->[1]}++
276 }
277 shift->send (join ",", @refs);
278 });
279
280 $noderef = $DEFAULT_PORT unless length $noderef;
281
282 my $idx;
283 for my $t (split /,/, $noderef) {
284 my $pri = ++$idx;
285
286 #TODO: this should be outside normalise_noderef and in become_public
287 if ($t =~ /^\d*$/) {
288 my $nodename = (POSIX::uname)[1];
289
290 $cv->begin;
291 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
292 for (@_) {
293 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
294 push @res, [
295 $pri += 1e-5,
296 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
297 ];
298 }
299 $cv->end;
300 };
301
302# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
303#
304# for (@ipv4) {
305# push @res, [
306# $pri,
307# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
308# ];
309# }
310 } else {
311 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
312 or Carp::croak "$t: unparsable transport descriptor";
313
314 $cv->begin;
315 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
316 for (@_) {
317 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
318 push @res, [
319 $pri += 1e-5,
320 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
321 ];
322 }
323 $cv->end;
324 }
325 }
326 }
327
328 $cv->end;
329
330 $cv
331} 269}
332 270
333sub become_public { 271=back
334 return if $PUBLIC;
335 272
336 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 273=head1 FUNCTIONS FOR NODES
337 my @args = @_;
338 274
339 $NODE = (normalise_noderef $noderef)->recv; 275=over 4
340 276
341 for my $t (split /,/, $NODE) { 277=item mon $noderef, $callback->($noderef, $status, $)
342 $NODE{$t} = $NODE{""};
343 278
344 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 279Monitors the given noderef.
345 280
346 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 281=item become_public endpoint...
347 @args,
348 on_error => sub {
349 die "on_error<@_>\n";#d#
350 },
351 on_connect => sub {
352 my ($tp) = @_;
353 282
354 $NODE{$tp->{remote_id}} = $_[0]; 283Tells the node to become a public node, i.e. reachable from other nodes.
355 },
356 sub {
357 my ($tp) = @_;
358 284
359 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; 285If no arguments are given, or the first argument is C<undef>, then
360 }, 286AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
361 ; 287local nodename resolves to.
362 }
363 288
364 $PUBLIC = 1; 289Otherwise the first argument must be an array-reference with transport
365} 290endpoints ("ip:port", "hostname:port") or port numbers (in which case the
291local nodename is used as hostname). The endpoints are all resolved and
292will become the node reference.
293
294=cut
366 295
367=back 296=back
368 297
369=head1 NODE MESSAGES 298=head1 NODE MESSAGES
370 299
371Nodes understand the following messages sent to them: 300Nodes understand the following messages sent to them. Many of them take
301arguments called C<@reply>, which will simply be used to compose a reply
302message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
303the remaining arguments are simply the message data.
372 304
373=over 4 305=over 4
374 306
375=cut 307=cut
376 308
377############################################################################# 309=item wkp => $name, @reply
378# self node code
379 310
380sub _new_port($) { 311Replies with the port ID of the specified well-known port, or C<undef>.
381 my ($name) = @_;
382 312
383 my ($noderef, $portname) = split /#/, $name; 313=item devnull => ...
384 314
385 $PORT{$name} = 315Generic data sink/CPU heat conversion.
386 $PORT{$portname} = {
387 names => [$name, $portname],
388 };
389}
390
391$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
392_new_port "";
393 316
394=item relay => $port, @msg 317=item relay => $port, @msg
395 318
396Simply forwards the message to the given port. 319Simply forwards the message to the given port.
397 320
398=cut
399
400rcv "", relay => \&snd;
401
402=item eval => $string[ @reply] 321=item eval => $string[ @reply]
403 322
404Evaluates the given string. If C<@reply> is given, then a message of the 323Evaluates the given string. If C<@reply> is given, then a message of the
405form C<@reply, $@, @evalres> is sent (C<$reply[0]> is the port to reply to). 324form C<@reply, $@, @evalres> is sent.
406 325
407=cut 326Example: crash another node.
408 327
409rcv "", eval => sub { 328 snd $othernode, eval => "exit";
410 my (undef, $string, @reply) = @_;
411 my @res = eval $string;
412 snd @reply, "$@", @res if @reply;
413};
414 329
415=item time => @reply 330=item time => @reply
416 331
417Replies the the current node time to C<@reply>. 332Replies the the current node time to C<@reply>.
418 333
419=cut 334Example: tell the current node to send the current time to C<$myport> in a
335C<timereply> message.
420 336
421rcv "", time => sub { shift; snd @_, AE::time }; 337 snd $NODE, time => $myport, timereply => 1, 2;
338 # => snd $myport, timereply => 1, 2, <time>
422 339
423=back 340=back
424 341
425=head1 SEE ALSO 342=head1 SEE ALSO
426 343

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines