ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP.pm (file contents):
Revision 1.6 by root, Sat Aug 1 10:02:33 2009 UTC vs.
Revision 1.20 by root, Mon Aug 3 22:05:55 2009 UTC

72 72
73=cut 73=cut
74 74
75package AnyEvent::MP; 75package AnyEvent::MP;
76 76
77use AnyEvent::MP::Util ();
78use AnyEvent::MP::Node; 77use AnyEvent::MP::Base;
79use AnyEvent::MP::Transport;
80 78
81use utf8;
82use common::sense; 79use common::sense;
83 80
84use Carp (); 81use Carp ();
85 82
86use AE (); 83use AE ();
87 84
88use base "Exporter"; 85use base "Exporter";
89 86
90our $VERSION = '0.01'; 87our $VERSION = '0.02';
91our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); 88our @EXPORT = qw(
92 89 NODE $NODE $PORT snd rcv mon del _any_
93our $DEFAULT_SECRET; 90 create_port create_port_on
94our $DEFAULT_PORT = "4040"; 91 create_miniport
95 92 become_slave become_public
96our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 93);
97our $CONNECT_TIMEOUT = 30; # includes handshake
98
99sub default_secret {
100 unless (defined $DEFAULT_SECRET) {
101 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102 sysread $fh, $DEFAULT_SECRET, -s $fh;
103 } else {
104 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
105 }
106 }
107
108 $DEFAULT_SECRET
109}
110 94
111=item NODE / $NODE 95=item NODE / $NODE
112 96
113The C<NODE ()> function and the C<$NODE> variable contain the noderef of 97The C<NODE ()> function and the C<$NODE> variable contain the noderef of
114the local node. The value is initialised by a call to C<become_public> or 98the local node. The value is initialised by a call to C<become_public> or
115C<become_slave>, after which all local port identifiers become invalid. 99C<become_slave>, after which all local port identifiers become invalid.
116 100
117=cut
118
119our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
120our $ID = "a0";
121our $PUBLIC = 0;
122our $NODE;
123our $PORT;
124
125our %NODE; # node id to transport mapping, or "undef", for local node
126our %PORT; # local ports
127our %LISTENER; # local transports
128
129sub NODE() { $NODE }
130
131{
132 use POSIX ();
133 my $nodename = (POSIX::uname)[1];
134 $NODE = "$$\@$nodename";
135}
136
137sub _ANY_() { 1 }
138sub _any_() { \&_ANY_ }
139
140sub add_node {
141 my ($noderef) = @_;
142
143 return $NODE{$noderef}
144 if exists $NODE{$noderef};
145
146 for (split /,/, $noderef) {
147 return $NODE{$noderef} = $NODE{$_}
148 if exists $NODE{$_};
149 }
150
151 # for indirect sends, use a different class
152 my $node = new AnyEvent::MP::Node::Direct $noderef;
153
154 $NODE{$_} = $node
155 for $noderef, split /,/, $noderef;
156
157 $node
158}
159
160=item snd $portid, type => @data 101=item snd $portid, type => @data
161 102
162=item snd $portid, @msg 103=item snd $portid, @msg
163 104
164Send the given message to the given port ID, which can identify either a 105Send the given message to the given port ID, which can identify either
165local or a remote port. 106a local or a remote port, and can be either a string or soemthignt hat
107stringifies a sa port ID (such as a port object :).
166 108
167While the message can be about anything, it is highly recommended to use 109While the message can be about anything, it is highly recommended to use a
168a constant string as first element. 110string as first element (a portid, or some word that indicates a request
111type etc.).
169 112
170The message data effectively becomes read-only after a call to this 113The message data effectively becomes read-only after a call to this
171function: modifying any argument is not allowed and can cause many 114function: modifying any argument is not allowed and can cause many
172problems. 115problems.
173 116
175JSON is used, then only strings, numbers and arrays and hashes consisting 118JSON is used, then only strings, numbers and arrays and hashes consisting
176of those are allowed (no objects). When Storable is used, then anything 119of those are allowed (no objects). When Storable is used, then anything
177that Storable can serialise and deserialise is allowed, and for the local 120that Storable can serialise and deserialise is allowed, and for the local
178node, anything can be passed. 121node, anything can be passed.
179 122
180=cut 123=item $guard = mon $portid, $cb->()
181 124
182sub snd(@) { 125Monitor the given port and call the given callback when the port is
126destroyed or connection to it's node is lost.
127
128#TODO
129
130=cut
131
132sub mon {
183 my ($noderef, $port) = split /#/, shift, 2; 133 my ($noderef, $port) = split /#/, shift, 2;
184 134
185 add_node $noderef 135 my $node = AnyEvent::MP::Base::add_node $noderef;
186 unless exists $NODE{$noderef};
187 136
188 $NODE{$noderef}->send (["$port", [@_]]); 137 my $cb = shift;
189}
190 138
139 $node->monitor ($port, $cb);
140
141 defined wantarray
142 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
143}
144
145=item $local_port = create_port
146
147Create a new local port object. See the next section for allowed methods.
148
149=cut
150
151sub create_port {
152 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
153
154 my $self = bless {
155 id => "$NODE#$id",
156 names => [$id],
157 }, "AnyEvent::MP::Port";
158
159 $AnyEvent::MP::Base::PORT{$id} = sub {
160 unshift @_, $self;
161
162 for (@{ $self->{rc0}{$_[1]} }) {
163 $_ && &{$_->[0]}
164 && undef $_;
165 }
166
167 for (@{ $self->{rcv}{$_[1]} }) {
168 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
169 && &{$_->[0]}
170 && undef $_;
171 }
172
173 for (@{ $self->{any} }) {
174 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
175 && &{$_->[0]}
176 && undef $_;
177 }
178 };
179
180 $self
181}
182
183=item $portid = miniport { my @msg = @_; $finished }
184
185Creates a "mini port", that is, a very lightweight port without any
186pattern matching behind it, and returns its ID.
187
188The block will be called for every message received on the port. When the
189callback returns a true value its job is considered "done" and the port
190will be destroyed. Otherwise it will stay alive.
191
192The message will be passed as-is, no extra argument (i.e. no port id) will
193be passed to the callback.
194
195If you need the local port id in the callback, this works nicely:
196
197 my $port; $port = miniport {
198 snd $otherport, reply => $port;
199 };
200
201=cut
202
203sub miniport(&) {
204 my $cb = shift;
205 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
206
207 $AnyEvent::MP::Base::PORT{$id} = sub {
208 &$cb
209 and del $id;
210 };
211
212 "$NODE#$id"
213}
214
215package AnyEvent::MP::Port;
216
217=back
218
219=head1 METHODS FOR PORT OBJECTS
220
221=over 4
222
223=item "$port"
224
225A port object stringifies to its port ID, so can be used directly for
226C<snd> operations.
227
228=cut
229
230use overload
231 '""' => sub { $_[0]{id} },
232 fallback => 1;
233
234sub TO_JSON { $_[0]{id} }
235
191=item rcv $portid, type => $callback->(@msg) 236=item $port->rcv (type => $callback->($port, @msg))
192 237
193=item rcv $portid, $smartmatch => $callback->(@msg) 238=item $port->rcv ($smartmatch => $callback->($port, @msg))
194 239
195=item rcv $portid, [$smartmatch...] => $callback->(@msg) 240=item $port->rcv ([$smartmatch...] => $callback->($port, @msg))
196 241
197Register a callback on the port identified by C<$portid>, which I<must> be 242Register a callback on the given port.
198a local port.
199 243
200The callback has to return a true value when its work is done, after 244The callback has to return a true value when its work is done, after
201which is will be removed, or a false value in which case it will stay 245which is will be removed, or a false value in which case it will stay
202registered. 246registered.
203 247
213also the most efficient match (by far). 257also the most efficient match (by far).
214 258
215=cut 259=cut
216 260
217sub rcv($@) { 261sub rcv($@) {
218 my ($port, $match, $cb) = @_; 262 my ($self, $match, $cb) = @_;
219
220 my $port = $PORT{$port}
221 or do {
222 my ($noderef, $lport) = split /#/, $port;
223 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
224 or Carp::croak "$port: can only rcv on local ports";
225
226 $PORT{$lport}
227 or Carp::croak "$port: port does not exist";
228
229 $PORT{$port} = $PORT{$lport} # also return
230 };
231 263
232 if (!ref $match) { 264 if (!ref $match) {
233 push @{ $port->{rc0}{$match} }, [$cb]; 265 push @{ $self->{rc0}{$match} }, [$cb];
234 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 266 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
235 my ($type, @match) = @$match; 267 my ($type, @match) = @$match;
236 @match 268 @match
237 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] 269 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
238 : push @{ $port->{rc0}{$match->[0]} }, [$cb]; 270 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
239 } else { 271 } else {
240 push @{ $port->{any} }, [$cb, $match]; 272 push @{ $self->{any} }, [$cb, $match];
241 } 273 }
242} 274}
243 275
244sub _inject { 276=item $port->register ($name)
245 my ($port, $msg) = @{+shift};
246 277
247 $port = $PORT{$port} 278Registers the given port under the well known name C<$name>. If the name
248 or return; 279already exists it is replaced.
249 280
250 @_ = @$msg; 281A port can only be registered under one well known name.
251 282
252 for (@{ $port->{rc0}{$msg->[0]} }) { 283=cut
253 $_ && &{$_->[0]}
254 && undef $_;
255 }
256 284
257 for (@{ $port->{rcv}{$msg->[0]} }) { 285sub register {
258 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] 286 my ($self, $name) = @_;
259 && &{$_->[0]}
260 && undef $_;
261 }
262 287
263 for (@{ $port->{any} }) { 288 $self->{wkname} = $name;
264 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] 289 $AnyEvent::MP::Base::WKP{$name} = "$self";
265 && &{$_->[0]}
266 && undef $_;
267 }
268} 290}
269 291
270sub normalise_noderef($) { 292=item $port->destroy
293
294Explicitly destroy/remove/nuke/vaporise the port.
295
296Ports are normally kept alive by there mere existance alone, and need to
297be destroyed explicitly.
298
299=cut
300
301sub destroy {
271 my ($noderef) = @_; 302 my ($self) = @_;
272 303
273 my $cv = AE::cv; 304 AnyEvent::MP::Base::del $self->{id};
274 my @res;
275 305
276 $cv->begin (sub { 306 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
277 my %seen;
278 my @refs;
279 for (sort { $a->[0] <=> $b->[0] } @res) {
280 push @refs, $_->[1] unless $seen{$_->[1]}++
281 }
282 shift->send (join ",", @refs);
283 });
284 307
285 $noderef = $DEFAULT_PORT unless length $noderef; 308 delete $AnyEvent::MP::Base::PORT{$_}
286 309 for @{ $self->{names} };
287 my $idx;
288 for my $t (split /,/, $noderef) {
289 my $pri = ++$idx;
290
291 #TODO: this should be outside normalise_noderef and in become_public
292 if ($t =~ /^\d*$/) {
293 my $nodename = (POSIX::uname)[1];
294
295 $cv->begin;
296 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
297 for (@_) {
298 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
299 push @res, [
300 $pri += 1e-5,
301 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
302 ];
303 }
304 $cv->end;
305 };
306
307# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
308#
309# for (@ipv4) {
310# push @res, [
311# $pri,
312# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
313# ];
314# }
315 } else {
316 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
317 or Carp::croak "$t: unparsable transport descriptor";
318
319 $cv->begin;
320 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
321 for (@_) {
322 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
323 push @res, [
324 $pri += 1e-5,
325 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
326 ];
327 }
328 $cv->end;
329 }
330 }
331 }
332
333 $cv->end;
334
335 $cv
336} 310}
337 311
338sub become_public { 312=back
339 return if $PUBLIC;
340 313
341 my $noderef = join ",", ref $_[0] ? @{+shift} : shift; 314=head1 FUNCTIONS FOR NODES
342 my @args = @_;
343 315
344 $NODE = (normalise_noderef $noderef)->recv; 316=over 4
345 317
346 for my $t (split /,/, $NODE) { 318=item mon $noderef, $callback->($noderef, $status, $)
347 $NODE{$t} = $NODE{""};
348 319
349 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 320Monitors the given noderef.
350 321
351 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 322=item become_public endpoint...
352 @args,
353 on_error => sub {
354 die "on_error<@_>\n";#d#
355 },
356 on_connect => sub {
357 my ($tp) = @_;
358 323
359 $NODE{$tp->{remote_id}} = $_[0]; 324Tells the node to become a public node, i.e. reachable from other nodes.
360 },
361 sub {
362 my ($tp) = @_;
363 325
364 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; 326If no arguments are given, or the first argument is C<undef>, then
365 }, 327AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
366 ; 328local nodename resolves to.
367 }
368 329
369 $PUBLIC = 1; 330Otherwise the first argument must be an array-reference with transport
370} 331endpoints ("ip:port", "hostname:port") or port numbers (in which case the
332local nodename is used as hostname). The endpoints are all resolved and
333will become the node reference.
334
335=cut
371 336
372=back 337=back
373 338
374=head1 NODE MESSAGES 339=head1 NODE MESSAGES
375 340
380 345
381=over 4 346=over 4
382 347
383=cut 348=cut
384 349
385############################################################################# 350=item wkp => $name, @reply
386# self node code
387 351
388sub _new_port($) { 352Replies with the port ID of the specified well-known port, or C<undef>.
389 my ($name) = @_;
390 353
391 my ($noderef, $portname) = split /#/, $name; 354=item devnull => ...
392 355
393 $PORT{$name} = 356Generic data sink/CPU heat conversion.
394 $PORT{$portname} = {
395 names => [$name, $portname],
396 };
397}
398
399$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
400_new_port "";
401 357
402=item relay => $port, @msg 358=item relay => $port, @msg
403 359
404Simply forwards the message to the given port. 360Simply forwards the message to the given port.
405
406=cut
407
408rcv "", relay => \&snd;
409 361
410=item eval => $string[ @reply] 362=item eval => $string[ @reply]
411 363
412Evaluates the given string. If C<@reply> is given, then a message of the 364Evaluates the given string. If C<@reply> is given, then a message of the
413form C<@reply, $@, @evalres> is sent. 365form C<@reply, $@, @evalres> is sent.
414 366
415Example: crash another node. 367Example: crash another node.
416 368
417 snd $othernode, eval => "exit"; 369 snd $othernode, eval => "exit";
418 370
419=cut
420
421rcv "", eval => sub {
422 my (undef, $string, @reply) = @_;
423 my @res = eval $string;
424 snd @reply, "$@", @res if @reply;
425};
426
427=item time => @reply 371=item time => @reply
428 372
429Replies the the current node time to C<@reply>. 373Replies the the current node time to C<@reply>.
430 374
431Example: tell the current node to send the current time to C<$myport> in a 375Example: tell the current node to send the current time to C<$myport> in a
432C<timereply> message. 376C<timereply> message.
433 377
434 snd $NODE, time => $myport, timereply => 1, 2; 378 snd $NODE, time => $myport, timereply => 1, 2;
435 # => snd $myport, timereply => 1, 2, <time> 379 # => snd $myport, timereply => 1, 2, <time>
436 380
437=cut
438
439rcv "", time => sub { shift; snd @_, AE::time };
440
441=back 381=back
442 382
443=head1 SEE ALSO 383=head1 SEE ALSO
444 384
445L<AnyEvent>. 385L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines