… | |
… | |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent::MP; |
7 | use AnyEvent::MP; |
8 | |
8 | |
9 | NODE # returns this node identifier |
|
|
10 | $NODE # contains this node identifier |
9 | $NODE # contains this node's noderef |
|
|
10 | NODE # returns this node's noderef |
|
|
11 | NODE $port # returns the noderef of the port |
11 | |
12 | |
12 | snd $port, type => data...; |
13 | snd $port, type => data...; |
|
|
14 | |
|
|
15 | $SELF # receiving/own port id in rcv callbacks |
13 | |
16 | |
14 | rcv $port, smartmatch => $cb->($port, @msg); |
17 | rcv $port, smartmatch => $cb->($port, @msg); |
15 | |
18 | |
16 | # examples: |
19 | # examples: |
17 | rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; |
20 | rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; |
… | |
… | |
84 | |
87 | |
85 | use base "Exporter"; |
88 | use base "Exporter"; |
86 | |
89 | |
87 | our $VERSION = '0.02'; |
90 | our $VERSION = '0.02'; |
88 | our @EXPORT = qw( |
91 | our @EXPORT = qw( |
89 | NODE $NODE $PORT snd rcv mon kil _any_ |
92 | NODE $NODE *SELF node_of _any_ |
90 | create_port create_port_on |
|
|
91 | miniport |
|
|
92 | become_slave become_public |
93 | become_slave become_public |
|
|
94 | snd rcv mon kil reg psub |
|
|
95 | port |
93 | ); |
96 | ); |
94 | |
97 | |
|
|
98 | our $SELF; |
|
|
99 | |
|
|
100 | sub _self_die() { |
|
|
101 | my $msg = $@; |
|
|
102 | $msg =~ s/\n+$// unless ref $msg; |
|
|
103 | kil $SELF, die => $msg; |
|
|
104 | } |
|
|
105 | |
95 | =item NODE / $NODE |
106 | =item $thisnode = NODE / $NODE |
96 | |
107 | |
97 | The C<NODE ()> function and the C<$NODE> variable contain the noderef of |
108 | The C<NODE> function returns, and the C<$NODE> variable contains |
98 | the local node. The value is initialised by a call to C<become_public> or |
109 | the noderef of the local node. The value is initialised by a call |
99 | C<become_slave>, after which all local port identifiers become invalid. |
110 | to C<become_public> or C<become_slave>, after which all local port |
|
|
111 | identifiers become invalid. |
|
|
112 | |
|
|
113 | =item $noderef = node_of $portid |
|
|
114 | |
|
|
115 | Extracts and returns the noderef from a portid or a noderef. |
|
|
116 | |
|
|
117 | =item $SELF |
|
|
118 | |
|
|
119 | Contains the current port id while executing C<rcv> callbacks or C<psub> |
|
|
120 | blocks. |
|
|
121 | |
|
|
122 | =item SELF, %SELF, @SELF... |
|
|
123 | |
|
|
124 | Due to some quirks in how perl exports variables, it is impossible to |
|
|
125 | just export C<$SELF>, all the symbols called C<SELF> are exported by this |
|
|
126 | module, but only C<$SELF> is currently used. |
100 | |
127 | |
101 | =item snd $portid, type => @data |
128 | =item snd $portid, type => @data |
102 | |
129 | |
103 | =item snd $portid, @msg |
130 | =item snd $portid, @msg |
104 | |
131 | |
… | |
… | |
118 | JSON is used, then only strings, numbers and arrays and hashes consisting |
145 | JSON is used, then only strings, numbers and arrays and hashes consisting |
119 | of those are allowed (no objects). When Storable is used, then anything |
146 | of those are allowed (no objects). When Storable is used, then anything |
120 | that Storable can serialise and deserialise is allowed, and for the local |
147 | that Storable can serialise and deserialise is allowed, and for the local |
121 | node, anything can be passed. |
148 | node, anything can be passed. |
122 | |
149 | |
|
|
150 | =item kil $portid[, @reason] |
|
|
151 | |
|
|
152 | Kill the specified port with the given C<@reason>. |
|
|
153 | |
|
|
154 | If no C<@reason> is specified, then the port is killed "normally" (linked |
|
|
155 | ports will not be kileld, or even notified). |
|
|
156 | |
|
|
157 | Otherwise, linked ports get killed with the same reason (second form of |
|
|
158 | C<mon>, see below). |
|
|
159 | |
|
|
160 | Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks |
|
|
161 | will be reported as reason C<< die => $@ >>. |
|
|
162 | |
|
|
163 | Transport/communication errors are reported as C<< transport_error => |
|
|
164 | $message >>. |
|
|
165 | |
123 | =item $guard = mon $portid, $cb->() |
166 | =item $guard = mon $portid, $cb->(@reason) |
124 | |
167 | |
125 | =item $guard = mon $portid, $otherport |
168 | =item $guard = mon $portid, $otherport |
126 | |
169 | |
127 | =item $guard = mon $portid, $otherport, @msg |
170 | =item $guard = mon $portid, $otherport, @msg |
128 | |
171 | |
129 | Monitor the given port and call the given callback when the port is |
172 | Monitor the given port and do something when the port is killed. |
130 | destroyed or connection to it's node is lost. |
|
|
131 | |
173 | |
132 | #TODO |
174 | In the first form, the callback is simply called with any number |
|
|
175 | of C<@reason> elements (no @reason means that the port was deleted |
|
|
176 | "normally"). Note also that I<< the callback B<must> never die >>, so use |
|
|
177 | C<eval> if unsure. |
|
|
178 | |
|
|
179 | In the second form, the other port will be C<kil>'ed with C<@reason>, iff |
|
|
180 | a @reason was specified, i.e. on "normal" kils nothing happens, while |
|
|
181 | under all other conditions, the other port is killed with the same reason. |
|
|
182 | |
|
|
183 | In the last form, a message of the form C<@msg, @reason> will be C<snd>. |
|
|
184 | |
|
|
185 | Example: call a given callback when C<$port> is killed. |
|
|
186 | |
|
|
187 | mon $port, sub { warn "port died because of <@_>\n" }; |
|
|
188 | |
|
|
189 | Example: kill ourselves when C<$port> is killed abnormally. |
|
|
190 | |
|
|
191 | mon $port, $self; |
|
|
192 | |
|
|
193 | Example: send us a restart message another C<$port> is killed. |
|
|
194 | |
|
|
195 | mon $port, $self => "restart"; |
133 | |
196 | |
134 | =cut |
197 | =cut |
135 | |
198 | |
136 | sub mon { |
199 | sub mon { |
137 | my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); |
200 | my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); |
138 | |
201 | |
139 | my $node = AnyEvent::MP::Base::add_node $noderef; |
202 | my $node = $NODE{$noderef} || add_node $noderef; |
140 | |
203 | |
141 | #TODO: ports must not be references |
204 | #TODO: ports must not be references |
142 | if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { |
205 | if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { |
143 | if (@_) { |
206 | if (@_) { |
144 | # send a kill info message |
207 | # send a kill info message |
145 | my (@msg) = ($cb, @_); |
208 | my (@msg) = ($cb, @_); |
146 | $cb = sub { snd @msg, @_ }; |
209 | $cb = sub { snd @msg, @_ }; |
147 | } else { |
210 | } else { |
148 | # simply kill other port |
211 | # simply kill other port |
149 | my $port = $cb; |
212 | my $port = $cb; |
150 | $cb = sub { kil $port, @_ }; |
213 | $cb = sub { kil $port, @_ if @_ }; |
151 | } |
214 | } |
152 | } |
215 | } |
153 | |
216 | |
154 | $node->monitor ($port, $cb); |
217 | $node->monitor ($port, $cb); |
155 | |
218 | |
… | |
… | |
179 | my ($port, @refs) = @_; |
242 | my ($port, @refs) = @_; |
180 | |
243 | |
181 | mon $port, sub { 0 && @refs } |
244 | mon $port, sub { 0 && @refs } |
182 | } |
245 | } |
183 | |
246 | |
184 | =item $local_port = create_port |
247 | =item $local_port = port |
185 | |
248 | |
186 | Create a new local port object. See the next section for allowed methods. |
249 | Create a new local port object that supports message matching. |
187 | |
250 | |
188 | =cut |
|
|
189 | |
|
|
190 | sub create_port { |
|
|
191 | my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++; |
|
|
192 | |
|
|
193 | my $self = bless { |
|
|
194 | id => "$NODE#$id", |
|
|
195 | }, "AnyEvent::MP::Port"; |
|
|
196 | |
|
|
197 | $AnyEvent::MP::Base::PORT{$id} = sub { |
|
|
198 | unshift @_, $self; |
|
|
199 | |
|
|
200 | for (@{ $self->{rc0}{$_[1]} }) { |
|
|
201 | $_ && &{$_->[0]} |
|
|
202 | && undef $_; |
|
|
203 | } |
|
|
204 | |
|
|
205 | for (@{ $self->{rcv}{$_[1]} }) { |
|
|
206 | $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] |
|
|
207 | && &{$_->[0]} |
|
|
208 | && undef $_; |
|
|
209 | } |
|
|
210 | |
|
|
211 | for (@{ $self->{any} }) { |
|
|
212 | $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] |
|
|
213 | && &{$_->[0]} |
|
|
214 | && undef $_; |
|
|
215 | } |
|
|
216 | }; |
|
|
217 | |
|
|
218 | $self |
|
|
219 | } |
|
|
220 | |
|
|
221 | =item $portid = miniport { my @msg = @_; $finished } |
251 | =item $portid = port { my @msg = @_; $finished } |
222 | |
252 | |
223 | Creates a "mini port", that is, a very lightweight port without any |
253 | Creates a "mini port", that is, a very lightweight port without any |
224 | pattern matching behind it, and returns its ID. |
254 | pattern matching behind it, and returns its ID. |
225 | |
255 | |
226 | The block will be called for every message received on the port. When the |
256 | The block will be called for every message received on the port. When the |
… | |
… | |
236 | snd $otherport, reply => $port; |
266 | snd $otherport, reply => $port; |
237 | }; |
267 | }; |
238 | |
268 | |
239 | =cut |
269 | =cut |
240 | |
270 | |
241 | sub miniport(&) { |
271 | sub port(;&) { |
|
|
272 | my $id = "$UNIQ." . $ID++; |
|
|
273 | my $port = "$NODE#$id"; |
|
|
274 | |
|
|
275 | if (@_) { |
242 | my $cb = shift; |
276 | my $cb = shift; |
243 | my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++; |
277 | $PORT{$id} = sub { |
244 | |
278 | local $SELF = $port; |
245 | $AnyEvent::MP::Base::PORT{$id} = sub { |
279 | eval { |
246 | &$cb |
280 | &$cb |
247 | and kil $id; |
281 | and kil $id; |
|
|
282 | }; |
|
|
283 | _self_die if $@; |
|
|
284 | }; |
|
|
285 | } else { |
|
|
286 | my $self = bless { |
|
|
287 | id => "$NODE#$id", |
|
|
288 | }, "AnyEvent::MP::Port"; |
|
|
289 | |
|
|
290 | $PORT_DATA{$id} = $self; |
|
|
291 | $PORT{$id} = sub { |
|
|
292 | local $SELF = $port; |
|
|
293 | |
|
|
294 | eval { |
|
|
295 | for (@{ $self->{rc0}{$_[0]} }) { |
|
|
296 | $_ && &{$_->[0]} |
|
|
297 | && undef $_; |
|
|
298 | } |
|
|
299 | |
|
|
300 | for (@{ $self->{rcv}{$_[0]} }) { |
|
|
301 | $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] |
|
|
302 | && &{$_->[0]} |
|
|
303 | && undef $_; |
|
|
304 | } |
|
|
305 | |
|
|
306 | for (@{ $self->{any} }) { |
|
|
307 | $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] |
|
|
308 | && &{$_->[0]} |
|
|
309 | && undef $_; |
|
|
310 | } |
|
|
311 | }; |
|
|
312 | _self_die if $@; |
|
|
313 | }; |
248 | }; |
314 | } |
249 | |
315 | |
250 | "$NODE#$id" |
316 | $port |
251 | } |
317 | } |
252 | |
318 | |
253 | package AnyEvent::MP::Port; |
319 | =item reg $portid, $name |
254 | |
320 | |
255 | =back |
321 | Registers the given port under the name C<$name>. If the name already |
|
|
322 | exists it is replaced. |
256 | |
323 | |
257 | =head1 METHODS FOR PORT OBJECTS |
324 | A port can only be registered under one well known name. |
258 | |
325 | |
259 | =over 4 |
326 | A port automatically becomes unregistered when it is killed. |
260 | |
327 | |
261 | =item "$port" |
|
|
262 | |
|
|
263 | A port object stringifies to its port ID, so can be used directly for |
|
|
264 | C<snd> operations. |
|
|
265 | |
|
|
266 | =cut |
328 | =cut |
267 | |
329 | |
268 | use overload |
330 | sub reg(@) { |
269 | '""' => sub { $_[0]{id} }, |
331 | my ($portid, $name) = @_; |
270 | fallback => 1; |
|
|
271 | |
332 | |
272 | sub TO_JSON { $_[0]{id} } |
333 | $REG{$name} = $portid; |
|
|
334 | } |
273 | |
335 | |
274 | =item $port->rcv (type => $callback->($port, @msg)) |
336 | =item rcv $portid, tagstring => $callback->(@msg), ... |
275 | |
337 | |
276 | =item $port->rcv ($smartmatch => $callback->($port, @msg)) |
338 | =item rcv $portid, $smartmatch => $callback->(@msg), ... |
277 | |
339 | |
278 | =item $port->rcv ([$smartmatch...] => $callback->($port, @msg)) |
340 | =item rcv $portid, [$smartmatch...] => $callback->(@msg), ... |
279 | |
341 | |
280 | Register a callback on the given port. |
342 | Register callbacks to be called on matching messages on the given port. |
281 | |
343 | |
282 | The callback has to return a true value when its work is done, after |
344 | The callback has to return a true value when its work is done, after |
283 | which is will be removed, or a false value in which case it will stay |
345 | which is will be removed, or a false value in which case it will stay |
284 | registered. |
346 | registered. |
285 | |
347 | |
|
|
348 | The global C<$SELF> (exported by this module) contains C<$portid> while |
|
|
349 | executing the callback. |
|
|
350 | |
|
|
351 | Runtime errors wdurign callback execution will result in the port being |
|
|
352 | C<kil>ed. |
|
|
353 | |
286 | If the match is an array reference, then it will be matched against the |
354 | If the match is an array reference, then it will be matched against the |
287 | first elements of the message, otherwise only the first element is being |
355 | first elements of the message, otherwise only the first element is being |
288 | matched. |
356 | matched. |
289 | |
357 | |
290 | Any element in the match that is specified as C<_any_> (a function |
358 | Any element in the match that is specified as C<_any_> (a function |
… | |
… | |
295 | also the most efficient match (by far). |
363 | also the most efficient match (by far). |
296 | |
364 | |
297 | =cut |
365 | =cut |
298 | |
366 | |
299 | sub rcv($@) { |
367 | sub rcv($@) { |
300 | my ($self, $match, $cb) = @_; |
368 | my ($noderef, $port) = split /#/, shift, 2; |
301 | |
369 | |
|
|
370 | ($NODE{$noderef} || add_node $noderef) == $NODE{""} |
|
|
371 | or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; |
|
|
372 | |
|
|
373 | my $self = $PORT_DATA{$port} |
|
|
374 | or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; |
|
|
375 | |
|
|
376 | "AnyEvent::MP::Port" eq ref $self |
|
|
377 | or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; |
|
|
378 | |
|
|
379 | while (@_) { |
|
|
380 | my ($match, $cb) = splice @_, 0, 2; |
|
|
381 | |
302 | if (!ref $match) { |
382 | if (!ref $match) { |
303 | push @{ $self->{rc0}{$match} }, [$cb]; |
383 | push @{ $self->{rc0}{$match} }, [$cb]; |
304 | } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { |
384 | } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { |
305 | my ($type, @match) = @$match; |
385 | my ($type, @match) = @$match; |
306 | @match |
386 | @match |
307 | ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] |
387 | ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] |
308 | : push @{ $self->{rc0}{$match->[0]} }, [$cb]; |
388 | : push @{ $self->{rc0}{$match->[0]} }, [$cb]; |
309 | } else { |
389 | } else { |
310 | push @{ $self->{any} }, [$cb, $match]; |
390 | push @{ $self->{any} }, [$cb, $match]; |
|
|
391 | } |
311 | } |
392 | } |
312 | } |
393 | } |
313 | |
394 | |
314 | =item $port->register ($name) |
395 | =item $closure = psub { BLOCK } |
315 | |
396 | |
316 | Registers the given port under the well known name C<$name>. If the name |
397 | Remembers C<$SELF> and creates a closure out of the BLOCK. When the |
317 | already exists it is replaced. |
398 | closure is executed, sets up the environment in the same way as in C<rcv> |
|
|
399 | callbacks, i.e. runtime errors will cause the port to get C<kil>ed. |
318 | |
400 | |
319 | A port can only be registered under one well known name. |
401 | This is useful when you register callbacks from C<rcv> callbacks: |
320 | |
402 | |
321 | =cut |
403 | rcv delayed_reply => sub { |
|
|
404 | my ($delay, @reply) = @_; |
|
|
405 | my $timer = AE::timer $delay, 0, psub { |
|
|
406 | snd @reply, $SELF; |
|
|
407 | }; |
|
|
408 | }; |
322 | |
409 | |
323 | sub register { |
|
|
324 | my ($self, $name) = @_; |
|
|
325 | |
|
|
326 | $self->{wkname} = $name; |
|
|
327 | $AnyEvent::MP::Base::WKP{$name} = "$self"; |
|
|
328 | } |
|
|
329 | |
|
|
330 | =item $port->destroy |
|
|
331 | |
|
|
332 | Explicitly destroy/remove/nuke/vaporise the port. |
|
|
333 | |
|
|
334 | Ports are normally kept alive by their mere existance alone, and need to |
|
|
335 | be destroyed explicitly. |
|
|
336 | |
|
|
337 | =cut |
410 | =cut |
338 | |
411 | |
339 | sub destroy { |
412 | sub psub(&) { |
340 | my ($self) = @_; |
413 | my $cb = shift; |
341 | |
414 | |
342 | delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; |
415 | my $port = $SELF |
|
|
416 | or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; |
343 | |
417 | |
344 | AnyEvent::MP::Base::kil $self->{id}; |
418 | sub { |
|
|
419 | local $SELF = $port; |
|
|
420 | |
|
|
421 | if (wantarray) { |
|
|
422 | my @res = eval { &$cb }; |
|
|
423 | _self_die if $@; |
|
|
424 | @res |
|
|
425 | } else { |
|
|
426 | my $res = eval { &$cb }; |
|
|
427 | _self_die if $@; |
|
|
428 | $res |
|
|
429 | } |
|
|
430 | } |
345 | } |
431 | } |
346 | |
432 | |
347 | =back |
433 | =back |
348 | |
434 | |
349 | =head1 FUNCTIONS FOR NODES |
435 | =head1 FUNCTIONS FOR NODES |
350 | |
436 | |
351 | =over 4 |
437 | =over 4 |
352 | |
|
|
353 | =item mon $noderef, $callback->($noderef, $status, $) |
|
|
354 | |
|
|
355 | Monitors the given noderef. |
|
|
356 | |
438 | |
357 | =item become_public endpoint... |
439 | =item become_public endpoint... |
358 | |
440 | |
359 | Tells the node to become a public node, i.e. reachable from other nodes. |
441 | Tells the node to become a public node, i.e. reachable from other nodes. |
360 | |
442 | |
… | |
… | |
380 | |
462 | |
381 | =over 4 |
463 | =over 4 |
382 | |
464 | |
383 | =cut |
465 | =cut |
384 | |
466 | |
385 | =item wkp => $name, @reply |
467 | =item lookup => $name, @reply |
386 | |
468 | |
387 | Replies with the port ID of the specified well-known port, or C<undef>. |
469 | Replies with the port ID of the specified well-known port, or C<undef>. |
388 | |
470 | |
389 | =item devnull => ... |
471 | =item devnull => ... |
390 | |
472 | |