ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP.pm (file contents):
Revision 1.3 by root, Sat Aug 1 07:11:45 2009 UTC vs.
Revision 1.19 by root, Mon Aug 3 21:37:19 2009 UTC

27This module (-family) implements a simple message passing framework. 27This module (-family) implements a simple message passing framework.
28 28
29Despite its simplicity, you can securely message other processes running 29Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 30on the same or other hosts.
31 31
32At the moment, this module family is severly brokena nd underdocumented,
33so do not use. This was uploaded mainly to resreve the CPAN namespace -
34stay tuned!
35
32=head1 CONCEPTS 36=head1 CONCEPTS
33 37
34=over 4 38=over 4
35 39
36=item port 40=item port
52 56
53Initially, nodes are either private (single-process only) or hidden 57Initially, nodes are either private (single-process only) or hidden
54(connected to a master node only). Only when they epxlicitly "become 58(connected to a master node only). Only when they epxlicitly "become
55public" can you send them messages from unrelated other nodes. 59public" can you send them messages from unrelated other nodes.
56 60
57=item noderef - C<host:port,host:port...>, C<id@noderef, C<id> 61=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
58 62
59A noderef is a string that either uniquely identifies a given node (for 63A noderef is a string that either uniquely identifies a given node (for
60private and hidden nodes), or contains a recipe on how to reach a given 64private and hidden nodes), or contains a recipe on how to reach a given
61node (for public nodes). 65node (for public nodes).
62 66
68 72
69=cut 73=cut
70 74
71package AnyEvent::MP; 75package AnyEvent::MP;
72 76
73use AnyEvent::MP::Util ();
74use AnyEvent::MP::Node; 77use AnyEvent::MP::Base;
75use AnyEvent::MP::Transport;
76 78
77use utf8;
78use common::sense; 79use common::sense;
79 80
80use Carp (); 81use Carp ();
81 82
82use AE (); 83use AE ();
83 84
84use base "Exporter"; 85use base "Exporter";
85 86
86our $VERSION = '0.0'; 87our $VERSION = '0.02';
87our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 88our @EXPORT = qw(
88 89 NODE $NODE $PORT snd rcv mon del _any_
89our $DEFAULT_SECRET; 90 create_port create_port_on
90our $DEFAULT_PORT = "4040"; 91 create_miniport
91 92 become_slave become_public
92our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 93);
93our $CONNECT_TIMEOUT = 30; # includes handshake
94
95sub default_secret {
96 unless (defined $DEFAULT_SECRET) {
97 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
98 sysread $fh, $DEFAULT_SECRET, -s $fh;
99 } else {
100 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
101 }
102 }
103
104 $DEFAULT_SECRET
105}
106 94
107=item NODE / $NODE 95=item NODE / $NODE
108 96
109The C<NODE ()> function and the C<$NODE> variable contain the noderef of 97The C<NODE ()> function and the C<$NODE> variable contain the noderef of
110the local node. The value is initialised by a call to C<become_public> or 98the local node. The value is initialised by a call to C<become_public> or
111C<become_slave>, after which all local port identifiers become invalid. 99C<become_slave>, after which all local port identifiers become invalid.
112 100
113=cut
114
115our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
116our $PUBLIC = 0;
117our $NODE;
118our $PORT;
119
120our %NODE; # node id to transport mapping, or "undef", for local node
121our %PORT; # local ports
122our %LISTENER; # local transports
123
124sub NODE() { $NODE }
125
126{
127 use POSIX ();
128 my $nodename = (POSIX::uname)[1];
129 $NODE = "$$\@$nodename";
130}
131
132sub _ANY_() { 1 }
133sub _any_() { \&_ANY_ }
134
135sub add_node {
136 my ($noderef) = @_;
137
138 return $NODE{$noderef}
139 if exists $NODE{$noderef};
140
141 for (split /,/, $noderef) {
142 return $NODE{$noderef} = $NODE{$_}
143 if exists $NODE{$_};
144 }
145
146 # for indirect sends, use a different class
147 my $node = new AnyEvent::MP::Node::Direct $noderef;
148
149 $NODE{$_} = $node
150 for $noderef, split /,/, $noderef;
151
152 $node
153}
154
155=item snd $portid, type => @data 101=item snd $portid, type => @data
156 102
157=item snd $portid, @msg 103=item snd $portid, @msg
158 104
159Send the given message to the given port ID, which can identify either a 105Send the given message to the given port ID, which can identify either
160local or a remote port. 106a local or a remote port, and can be either a string or soemthignt hat
107stringifies a sa port ID (such as a port object :).
161 108
162While the message can be about anything, it is highly recommended to use 109While the message can be about anything, it is highly recommended to use a
163a constant string as first element. 110string as first element (a portid, or some word that indicates a request
111type etc.).
164 112
165The message data effectively becomes read-only after a call to this 113The message data effectively becomes read-only after a call to this
166function: modifying any argument is not allowed and can cause many 114function: modifying any argument is not allowed and can cause many
167problems. 115problems.
168 116
170JSON is used, then only strings, numbers and arrays and hashes consisting 118JSON is used, then only strings, numbers and arrays and hashes consisting
171of those are allowed (no objects). When Storable is used, then anything 119of those are allowed (no objects). When Storable is used, then anything
172that Storable can serialise and deserialise is allowed, and for the local 120that Storable can serialise and deserialise is allowed, and for the local
173node, anything can be passed. 121node, anything can be passed.
174 122
175=cut 123=item mon $portid, sub { }
176 124
177sub snd($@) { 125#TODO monitor the given port
126
127=cut
128
129sub mon {
178 my ($noderef, $port) = split /#/, shift, 2; 130 my ($noderef, $port) = split /#/, shift, 2;
179 131
180 add_node $noderef 132 my $node = AnyEvent::MP::Base::add_node $noderef;
181 unless exists $NODE{$noderef};
182 133
183 $NODE{$noderef}->send (["$port", [@_]]); 134 my $cb = shift;
184}
185 135
136 $node->monitor ($port, $cb);
137
138 defined wantarray
139 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
140}
141
142=item $local_port = create_port
143
144Create a new local port object. See the next section for allowed methods.
145
146=cut
147
148sub create_port {
149 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
150
151 my $self = bless {
152 id => "$NODE#$id",
153 names => [$id],
154 }, "AnyEvent::MP::Port";
155
156 $AnyEvent::MP::Base::PORT{$id} = sub {
157 unshift @_, $self;
158
159 for (@{ $self->{rc0}{$_[1]} }) {
160 $_ && &{$_->[0]}
161 && undef $_;
162 }
163
164 for (@{ $self->{rcv}{$_[1]} }) {
165 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
166 && &{$_->[0]}
167 && undef $_;
168 }
169
170 for (@{ $self->{any} }) {
171 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
172 && &{$_->[0]}
173 && undef $_;
174 }
175 };
176
177 $self
178}
179
180=item $portid = miniport { my @msg = @_; $finished }
181
182Creates a "mini port", that is, a very lightweight port without any
183pattern matching behind it, and returns its ID.
184
185The block will be called for every message received on the port. When the
186callback returns a true value its job is considered "done" and the port
187will be destroyed. Otherwise it will stay alive.
188
189The message will be passed as-is, no extra argument (i.e. no port id) will
190be passed to the callback.
191
192If you need the local port id in the callback, this works nicely:
193
194 my $port; $port = miniport {
195 snd $otherport, reply => $port;
196 };
197
198=cut
199
200sub miniport(&) {
201 my $cb = shift;
202 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
203
204 $AnyEvent::MP::Base::PORT{$id} = sub {
205 &$cb
206 and del $id;
207 };
208
209 "$NODE#$id"
210}
211
212package AnyEvent::MP::Port;
213
214=back
215
216=head1 METHODS FOR PORT OBJECTS
217
218=over 4
219
220=item "$port"
221
222A port object stringifies to its port ID, so can be used directly for
223C<snd> operations.
224
225=cut
226
227use overload
228 '""' => sub { $_[0]{id} },
229 fallback => 1;
230
231sub TO_JSON { $_[0]{id} }
232
186=item rcv $portid, type => $callback->(@msg) 233=item $port->rcv (type => $callback->($port, @msg))
187 234
188=item rcv $portid, $smartmatch => $callback->(@msg) 235=item $port->rcv ($smartmatch => $callback->($port, @msg))
189 236
190=item rcv $portid, [$smartmatch...] => $callback->(@msg) 237=item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
191 238
192Register a callback on the port identified by C<$portid>, which I<must> be 239Register a callback on the given port.
193a local port.
194 240
195The callback has to return a true value when its work is done, after 241The callback has to return a true value when its work is done, after
196which is will be removed, or a false value in which case it will stay 242which is will be removed, or a false value in which case it will stay
197registered. 243registered.
198 244
208also the most efficient match (by far). 254also the most efficient match (by far).
209 255
210=cut 256=cut
211 257
212sub rcv($@) { 258sub rcv($@) {
213 my ($port, $match, $cb) = @_; 259 my ($self, $match, $cb) = @_;
214
215 my $port = $PORT{$port}
216 or do {
217 my ($noderef, $lport) = split /#/, $port;
218 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
219 or Carp::croak "$port: can only rcv on local ports";
220
221 $PORT{$lport}
222 or Carp::croak "$port: port does not exist";
223
224 $PORT{$port} = $PORT{$lport} # also return
225 };
226 260
227 if (!ref $match) { 261 if (!ref $match) {
228 push @{ $port->{rc0}{$match} }, [$cb]; 262 push @{ $self->{rc0}{$match} }, [$cb];
229 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 263 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
230 my ($type, @match) = @$match; 264 my ($type, @match) = @$match;
231 @match 265 @match
232 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 266 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
233 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 267 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
234 } else { 268 } else {
235 push @{ $port->{any} }, [$cb, $match]; 269 push @{ $self->{any} }, [$cb, $match];
236 } 270 }
237} 271}
238 272
239sub _inject { 273=item $port->register ($name)
240 my ($port, $msg) = @{+shift};
241 274
242 $port = $PORT{$port} 275Registers the given port under the well known name C<$name>. If the name
243 or return; 276already exists it is replaced.
244 277
245 @_ = @$msg; 278A port can only be registered under one well known name.
246 279
247 for (@{ $port->{rc0}{$msg->[0]} }) { 280=cut
248 $_ && &{$_->[0]}
249 && undef $_;
250 }
251 281
252 for (@{ $port->{rcv}{$msg->[0]} }) { 282sub register {
253 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1]
254 && &{$_->[0]}
255 && undef $_;
256 }
257
258 for (@{ $port->{any} }) {
259 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1]
260 && &{$_->[0]}
261 && undef $_;
262 }
263}
264
265sub normalise_noderef($) {
266 my ($noderef) = @_;
267
268 my $cv = AE::cv;
269 my @res;
270
271 $cv->begin (sub {
272 my %seen;
273 my @refs;
274 for (sort { $a->[0] <=> $b->[0] } @res) {
275 push @refs, $_->[1] unless $seen{$_->[1]}++
276 }
277 shift->send (join ",", @refs);
278 });
279
280 $noderef = $DEFAULT_PORT unless length $noderef;
281
282 my $idx;
283 for my $t (split /,/, $noderef) {
284 my $pri = ++$idx;
285
286 #TODO: this should be outside normalise_noderef and in become_public
287 if ($t =~ /^\d*$/) {
288 my $nodename = (POSIX::uname)[1];
289
290 $cv->begin;
291 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
292 for (@_) {
293 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
294 push @res, [
295 $pri += 1e-5,
296 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
297 ];
298 }
299 $cv->end;
300 };
301
302# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
303#
304# for (@ipv4) {
305# push @res, [
306# $pri,
307# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
308# ];
309# }
310 } else {
311 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
312 or Carp::croak "$t: unparsable transport descriptor";
313
314 $cv->begin;
315 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
316 for (@_) {
317 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
318 push @res, [
319 $pri += 1e-5,
320 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
321 ];
322 }
323 $cv->end;
324 }
325 }
326 }
327
328 $cv->end;
329
330 $cv
331}
332
333sub become_public {
334 return if $PUBLIC;
335
336 my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
337 my @args = @_;
338
339 $NODE = (normalise_noderef $noderef)->recv;
340
341 for my $t (split /,/, $NODE) {
342 $NODE{$t} = $NODE{""};
343
344 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
345
346 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
347 @args,
348 on_error => sub {
349 die "on_error<@_>\n";#d#
350 },
351 on_connect => sub {
352 my ($tp) = @_;
353
354 $NODE{$tp->{remote_id}} = $_[0];
355 },
356 sub {
357 my ($tp) = @_;
358
359 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
360 },
361 ;
362 }
363
364 $PUBLIC = 1;
365}
366
367#############################################################################
368# self node code
369
370sub _new_port($) {
371 my ($name) = @_; 283 my ($self, $name) = @_;
372 284
373 my ($noderef, $portname) = split /#/, $name; 285 $self->{wkname} = $name;
374 286 $AnyEvent::MP::Base::WKP{$name} = "$self";
375 $PORT{$name} =
376 $PORT{$portname} = {
377 names => [$name, $portname],
378 };
379} 287}
380 288
381$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; 289=item $port->destroy
382_new_port "";
383 290
384rcv "", relay => \&snd; 291Explicitly destroy/remove/nuke/vaporise the port.
292
293Ports are normally kept alive by there mere existance alone, and need to
294be destroyed explicitly.
295
296=cut
297
298sub destroy {
299 my ($self) = @_;
300
301 AnyEvent::MP::Base::del $self->{id};
302
303 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
304
305 delete $AnyEvent::MP::Base::PORT{$_}
306 for @{ $self->{names} };
307}
308
309=back
310
311=head1 FUNCTIONS FOR NODES
312
313=over 4
314
315=item mon $noderef, $callback->($noderef, $status, $)
316
317Monitors the given noderef.
318
319=item become_public endpoint...
320
321Tells the node to become a public node, i.e. reachable from other nodes.
322
323If no arguments are given, or the first argument is C<undef>, then
324AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
325local nodename resolves to.
326
327Otherwise the first argument must be an array-reference with transport
328endpoints ("ip:port", "hostname:port") or port numbers (in which case the
329local nodename is used as hostname). The endpoints are all resolved and
330will become the node reference.
331
332=cut
333
334=back
335
336=head1 NODE MESSAGES
337
338Nodes understand the following messages sent to them. Many of them take
339arguments called C<@reply>, which will simply be used to compose a reply
340message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
341the remaining arguments are simply the message data.
342
343=over 4
344
345=cut
346
347=item wkp => $name, @reply
348
349Replies with the port ID of the specified well-known port, or C<undef>.
350
351=item devnull => ...
352
353Generic data sink/CPU heat conversion.
354
355=item relay => $port, @msg
356
357Simply forwards the message to the given port.
358
359=item eval => $string[ @reply]
360
361Evaluates the given string. If C<@reply> is given, then a message of the
362form C<@reply, $@, @evalres> is sent.
363
364Example: crash another node.
365
366 snd $othernode, eval => "exit";
367
368=item time => @reply
369
370Replies the the current node time to C<@reply>.
371
372Example: tell the current node to send the current time to C<$myport> in a
373C<timereply> message.
374
375 snd $NODE, time => $myport, timereply => 1, 2;
376 # => snd $myport, timereply => 1, 2, <time>
385 377
386=back 378=back
387 379
388=head1 SEE ALSO 380=head1 SEE ALSO
389 381

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines