ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.5 by root, Sat Aug 1 07:44:02 2009 UTC vs.
Revision 1.20 by root, Mon Aug 3 22:05:55 2009 UTC

27This module (-family) implements a simple message passing framework. 27This module (-family) implements a simple message passing framework.
28 28
29Despite its simplicity, you can securely message other processes running 29Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 30on the same or other hosts.
31 31
32At the moment, this module family is severly brokena nd underdocumented,
33so do not use. This was uploaded mainly to resreve the CPAN namespace -
34stay tuned!
35
32=head1 CONCEPTS 36=head1 CONCEPTS
33 37
34=over 4 38=over 4
35 39
36=item port 40=item port
68 72
69=cut 73=cut
70 74
71package AnyEvent::MP; 75package AnyEvent::MP;
72 76
73use AnyEvent::MP::Util ();
74use AnyEvent::MP::Node; 77use AnyEvent::MP::Base;
75use AnyEvent::MP::Transport;
76 78
77use utf8;
78use common::sense; 79use common::sense;
79 80
80use Carp (); 81use Carp ();
81 82
82use AE (); 83use AE ();
83 84
84use base "Exporter"; 85use base "Exporter";
85 86
86our $VERSION = '0.0'; 87our $VERSION = '0.02';
87our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 88our @EXPORT = qw(
88 89 NODE $NODE $PORT snd rcv mon del _any_
89our $DEFAULT_SECRET; 90 create_port create_port_on
90our $DEFAULT_PORT = "4040"; 91 create_miniport
91 92 become_slave become_public
92our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 93);
93our $CONNECT_TIMEOUT = 30; # includes handshake
94
95sub default_secret {
96 unless (defined $DEFAULT_SECRET) {
97 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
98 sysread $fh, $DEFAULT_SECRET, -s $fh;
99 } else {
100 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
101 }
102 }
103
104 $DEFAULT_SECRET
105}
106 94
107=item NODE / $NODE 95=item NODE / $NODE
108 96
109The C<NODE ()> function and the C<$NODE> variable contain the noderef of 97The C<NODE ()> function and the C<$NODE> variable contain the noderef of
110the local node. The value is initialised by a call to C<become_public> or 98the local node. The value is initialised by a call to C<become_public> or
111C<become_slave>, after which all local port identifiers become invalid. 99C<become_slave>, after which all local port identifiers become invalid.
112 100
113=cut
114
115our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
116our $PUBLIC = 0;
117our $NODE;
118our $PORT;
119
120our %NODE; # node id to transport mapping, or "undef", for local node
121our %PORT; # local ports
122our %LISTENER; # local transports
123
124sub NODE() { $NODE }
125
126{
127 use POSIX ();
128 my $nodename = (POSIX::uname)[1];
129 $NODE = "$$\@$nodename";
130}
131
132sub _ANY_() { 1 }
133sub _any_() { \&_ANY_ }
134
135sub add_node {
136 my ($noderef) = @_;
137
138 return $NODE{$noderef}
139 if exists $NODE{$noderef};
140
141 for (split /,/, $noderef) {
142 return $NODE{$noderef} = $NODE{$_}
143 if exists $NODE{$_};
144 }
145
146 # for indirect sends, use a different class
147 my $node = new AnyEvent::MP::Node::Direct $noderef;
148
149 $NODE{$_} = $node
150 for $noderef, split /,/, $noderef;
151
152 $node
153}
154
155=item snd $portid, type => @data 101=item snd $portid, type => @data
156 102
157=item snd $portid, @msg 103=item snd $portid, @msg
158 104
159Send the given message to the given port ID, which can identify either a 105Send the given message to the given port ID, which can identify either
160local or a remote port. 106a local or a remote port, and can be either a string or soemthignt hat
107stringifies a sa port ID (such as a port object :).
161 108
162While the message can be about anything, it is highly recommended to use 109While the message can be about anything, it is highly recommended to use a
163a constant string as first element. 110string as first element (a portid, or some word that indicates a request
111type etc.).
164 112
165The message data effectively becomes read-only after a call to this 113The message data effectively becomes read-only after a call to this
166function: modifying any argument is not allowed and can cause many 114function: modifying any argument is not allowed and can cause many
167problems. 115problems.
168 116
170JSON is used, then only strings, numbers and arrays and hashes consisting 118JSON is used, then only strings, numbers and arrays and hashes consisting
171of those are allowed (no objects). When Storable is used, then anything 119of those are allowed (no objects). When Storable is used, then anything
172that Storable can serialise and deserialise is allowed, and for the local 120that Storable can serialise and deserialise is allowed, and for the local
173node, anything can be passed. 121node, anything can be passed.
174 122
175=cut 123=item $guard = mon $portid, $cb->()
176 124
177sub snd(@) { 125Monitor the given port and call the given callback when the port is
126destroyed or connection to it's node is lost.
127
128#TODO
129
130=cut
131
132sub mon {
178 my ($noderef, $port) = split /#/, shift, 2; 133 my ($noderef, $port) = split /#/, shift, 2;
179 134
180 add_node $noderef 135 my $node = AnyEvent::MP::Base::add_node $noderef;
181 unless exists $NODE{$noderef};
182 136
183 $NODE{$noderef}->send (["$port", [@_]]); 137 my $cb = shift;
184}
185 138
139 $node->monitor ($port, $cb);
140
141 defined wantarray
142 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
143}
144
145=item $local_port = create_port
146
147Create a new local port object. See the next section for allowed methods.
148
149=cut
150
151sub create_port {
152 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
153
154 my $self = bless {
155 id => "$NODE#$id",
156 names => [$id],
157 }, "AnyEvent::MP::Port";
158
159 $AnyEvent::MP::Base::PORT{$id} = sub {
160 unshift @_, $self;
161
162 for (@{ $self->{rc0}{$_[1]} }) {
163 $_ && &{$_->[0]}
164 && undef $_;
165 }
166
167 for (@{ $self->{rcv}{$_[1]} }) {
168 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
169 && &{$_->[0]}
170 && undef $_;
171 }
172
173 for (@{ $self->{any} }) {
174 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
175 && &{$_->[0]}
176 && undef $_;
177 }
178 };
179
180 $self
181}
182
183=item $portid = miniport { my @msg = @_; $finished }
184
185Creates a "mini port", that is, a very lightweight port without any
186pattern matching behind it, and returns its ID.
187
188The block will be called for every message received on the port. When the
189callback returns a true value its job is considered "done" and the port
190will be destroyed. Otherwise it will stay alive.
191
192The message will be passed as-is, no extra argument (i.e. no port id) will
193be passed to the callback.
194
195If you need the local port id in the callback, this works nicely:
196
197 my $port; $port = miniport {
198 snd $otherport, reply => $port;
199 };
200
201=cut
202
203sub miniport(&) {
204 my $cb = shift;
205 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
206
207 $AnyEvent::MP::Base::PORT{$id} = sub {
208 &$cb
209 and del $id;
210 };
211
212 "$NODE#$id"
213}
214
215package AnyEvent::MP::Port;
216
217=back
218
219=head1 METHODS FOR PORT OBJECTS
220
221=over 4
222
223=item "$port"
224
225A port object stringifies to its port ID, so can be used directly for
226C<snd> operations.
227
228=cut
229
230use overload
231 '""' => sub { $_[0]{id} },
232 fallback => 1;
233
234sub TO_JSON { $_[0]{id} }
235
186=item rcv $portid, type => $callback->(@msg) 236=item $port->rcv (type => $callback->($port, @msg))
187 237
188=item rcv $portid, $smartmatch => $callback->(@msg) 238=item $port->rcv ($smartmatch => $callback->($port, @msg))
189 239
190=item rcv $portid, [$smartmatch...] => $callback->(@msg) 240=item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
191 241
192Register a callback on the port identified by C<$portid>, which I<must> be 242Register a callback on the given port.
193a local port.
194 243
195The callback has to return a true value when its work is done, after 244The callback has to return a true value when its work is done, after
196which is will be removed, or a false value in which case it will stay 245which is will be removed, or a false value in which case it will stay
197registered. 246registered.
198 247
208also the most efficient match (by far). 257also the most efficient match (by far).
209 258
210=cut 259=cut
211 260
212sub rcv($@) { 261sub rcv($@) {
213 my ($port, $match, $cb) = @_; 262 my ($self, $match, $cb) = @_;
214
215 my $port = $PORT{$port}
216 or do {
217 my ($noderef, $lport) = split /#/, $port;
218 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
219 or Carp::croak "$port: can only rcv on local ports";
220
221 $PORT{$lport}
222 or Carp::croak "$port: port does not exist";
223
224 $PORT{$port} = $PORT{$lport} # also return
225 };
226 263
227 if (!ref $match) { 264 if (!ref $match) {
228 push @{ $port->{rc0}{$match} }, [$cb]; 265 push @{ $self->{rc0}{$match} }, [$cb];
229 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 266 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
230 my ($type, @match) = @$match; 267 my ($type, @match) = @$match;
231 @match 268 @match
232 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 269 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
233 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 270 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
234 } else { 271 } else {
235 push @{ $port->{any} }, [$cb, $match]; 272 push @{ $self->{any} }, [$cb, $match];
236 } 273 }
237} 274}
238 275
239sub _inject { 276=item $port->register ($name)
240 my ($port, $msg) = @{+shift};
241 277
242 $port = $PORT{$port} 278Registers the given port under the well known name C<$name>. If the name
243 or return; 279already exists it is replaced.
244 280
245 @_ = @$msg; 281A port can only be registered under one well known name.
246 282
247 for (@{ $port->{rc0}{$msg->[0]} }) { 283=cut
248 $_ && &{$_->[0]}
249 && undef $_;
250 }
251 284
252 for (@{ $port->{rcv}{$msg->[0]} }) { 285sub register {
253 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] 286 my ($self, $name) = @_;
254 && &{$_->[0]}
255 && undef $_;
256 }
257 287
258 for (@{ $port->{any} }) { 288 $self->{wkname} = $name;
259 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] 289 $AnyEvent::MP::Base::WKP{$name} = "$self";
260 && &{$_->[0]}
261 && undef $_;
262 }
263} 290}
264 291
265sub normalise_noderef($) { 292=item $port->destroy
293
294Explicitly destroy/remove/nuke/vaporise the port.
295
296Ports are normally kept alive by there mere existance alone, and need to
297be destroyed explicitly.
298
299=cut
300
301sub destroy {
266 my ($noderef) = @_; 302 my ($self) = @_;
267 303
268 my $cv = AE::cv; 304 AnyEvent::MP::Base::del $self->{id};
269 my @res;
270 305
271 $cv->begin (sub { 306 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
272 my %seen;
273 my @refs;
274 for (sort { $a->[0] <=> $b->[0] } @res) {
275 push @refs, $_->[1] unless $seen{$_->[1]}++
276 }
277 shift->send (join ",", @refs);
278 });
279 307
280 $noderef = $DEFAULT_PORT unless length $noderef; 308 delete $AnyEvent::MP::Base::PORT{$_}
281 309 for @{ $self->{names} };
282 my $idx;
283 for my $t (split /,/, $noderef) {
284 my $pri = ++$idx;
285
286 #TODO: this should be outside normalise_noderef and in become_public
287 if ($t =~ /^\d*$/) {
288 my $nodename = (POSIX::uname)[1];
289
290 $cv->begin;
291 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
292 for (@_) {
293 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
294 push @res, [
295 $pri += 1e-5,
296 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
297 ];
298 }
299 $cv->end;
300 };
301
302# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
303#
304# for (@ipv4) {
305# push @res, [
306# $pri,
307# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
308# ];
309# }
310 } else {
311 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
312 or Carp::croak "$t: unparsable transport descriptor";
313
314 $cv->begin;
315 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
316 for (@_) {
317 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
318 push @res, [
319 $pri += 1e-5,
320 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
321 ];
322 }
323 $cv->end;
324 }
325 }
326 }
327
328 $cv->end;
329
330 $cv
331} 310}
332 311
333sub become_public { 312=back
334 return if $PUBLIC;
335 313
336 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 314=head1 FUNCTIONS FOR NODES
337 my @args = @_;
338 315
339 $NODE = (normalise_noderef $noderef)->recv; 316=over 4
340 317
341 for my $t (split /,/, $NODE) { 318=item mon $noderef, $callback->($noderef, $status, $)
342 $NODE{$t} = $NODE{""};
343 319
344 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 320Monitors the given noderef.
345 321
346 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 322=item become_public endpoint...
347 @args,
348 on_error => sub {
349 die "on_error<@_>\n";#d#
350 },
351 on_connect => sub {
352 my ($tp) = @_;
353 323
354 $NODE{$tp->{remote_id}} = $_[0]; 324Tells the node to become a public node, i.e. reachable from other nodes.
355 },
356 sub {
357 my ($tp) = @_;
358 325
359 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; 326If no arguments are given, or the first argument is C<undef>, then
360 }, 327AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
361 ; 328local nodename resolves to.
362 }
363 329
364 $PUBLIC = 1; 330Otherwise the first argument must be an array-reference with transport
365} 331endpoints ("ip:port", "hostname:port") or port numbers (in which case the
332local nodename is used as hostname). The endpoints are all resolved and
333will become the node reference.
334
335=cut
366 336
367=back 337=back
368 338
369=head1 NODE MESSAGES 339=head1 NODE MESSAGES
370 340
375 345
376=over 4 346=over 4
377 347
378=cut 348=cut
379 349
380############################################################################# 350=item wkp => $name, @reply
381# self node code
382 351
383sub _new_port($) { 352Replies with the port ID of the specified well-known port, or C<undef>.
384 my ($name) = @_;
385 353
386 my ($noderef, $portname) = split /#/, $name; 354=item devnull => ...
387 355
388 $PORT{$name} = 356Generic data sink/CPU heat conversion.
389 $PORT{$portname} = {
390 names => [$name, $portname],
391 };
392}
393
394$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
395_new_port "";
396 357
397=item relay => $port, @msg 358=item relay => $port, @msg
398 359
399Simply forwards the message to the given port. 360Simply forwards the message to the given port.
400
401=cut
402
403rcv "", relay => \&snd;
404 361
405=item eval => $string[ @reply] 362=item eval => $string[ @reply]
406 363
407Evaluates the given string. If C<@reply> is given, then a message of the 364Evaluates the given string. If C<@reply> is given, then a message of the
408form C<@reply, $@, @evalres> is sent. 365form C<@reply, $@, @evalres> is sent.
409 366
410Example: crash another node. 367Example: crash another node.
411 368
412 snd $othernode, eval => "exit"; 369 snd $othernode, eval => "exit";
413 370
414=cut
415
416rcv "", eval => sub {
417 my (undef, $string, @reply) = @_;
418 my @res = eval $string;
419 snd @reply, "$@", @res if @reply;
420};
421
422=item time => @reply 371=item time => @reply
423 372
424Replies the the current node time to C<@reply>. 373Replies the the current node time to C<@reply>.
425 374
426Example: tell the current node to send the current time to C<$myport> in a 375Example: tell the current node to send the current time to C<$myport> in a
427C<timereply> message. 376C<timereply> message.
428 377
429 snd $NODE, time => $myport, timereply => 1, 2; 378 snd $NODE, time => $myport, timereply => 1, 2;
430 # => snd $myport, timereply => 1, 2, <time> 379 # => snd $myport, timereply => 1, 2, <time>
431 380
432=cut
433
434rcv "", time => sub { shift; snd @_, AE::time };
435
436=back 381=back
437 382
438=head1 SEE ALSO 383=head1 SEE ALSO
439 384
440L<AnyEvent>. 385L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines