ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.8 by root, Sun Aug 2 14:44:37 2009 UTC vs.
Revision 1.21 by root, Tue Aug 4 14:10:51 2009 UTC

28 28
29Despite its simplicity, you can securely message other processes running 29Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 30on the same or other hosts.
31 31
32At the moment, this module family is severly brokena nd underdocumented, 32At the moment, this module family is severly brokena nd underdocumented,
33so do not use. This was uploaded mainly to resreve the CPAN namespace - 33so do not use. This was uploaded mainly to reserve the CPAN namespace -
34stay tuned! 34stay tuned!
35 35
36=head1 CONCEPTS 36=head1 CONCEPTS
37 37
38=over 4 38=over 4
82 82
83use AE (); 83use AE ();
84 84
85use base "Exporter"; 85use base "Exporter";
86 86
87our $VERSION = '0.01'; 87our $VERSION = '0.02';
88our @EXPORT = qw( 88our @EXPORT = qw(
89 NODE $NODE $PORT snd rcv _any_ 89 NODE $NODE $PORT snd rcv mon kil _any_
90 create_port create_port_on 90 create_port create_port_on
91 miniport
91 become_slave become_public 92 become_slave become_public
92); 93);
93 94
94=item NODE / $NODE 95=item NODE / $NODE
95 96
117JSON is used, then only strings, numbers and arrays and hashes consisting 118JSON is used, then only strings, numbers and arrays and hashes consisting
118of those are allowed (no objects). When Storable is used, then anything 119of those are allowed (no objects). When Storable is used, then anything
119that Storable can serialise and deserialise is allowed, and for the local 120that Storable can serialise and deserialise is allowed, and for the local
120node, anything can be passed. 121node, anything can be passed.
121 122
123=item $guard = mon $portid, $cb->()
124
125=item $guard = mon $portid, $otherport
126
127=item $guard = mon $portid, $otherport, @msg
128
129Monitor the given port and call the given callback when the port is
130destroyed or connection to it's node is lost.
131
132#TODO
133
134=cut
135
136sub mon {
137 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
138
139 my $node = AnyEvent::MP::Base::add_node $noderef;
140
141 #TODO: ports must not be references
142 if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
143 if (@_) {
144 # send a kill info message
145 my (@msg) = ($cb, @_);
146 $cb = sub { snd @msg, @_ };
147 } else {
148 # simply kill other port
149 my $port = $cb;
150 $cb = sub { kil $port, @_ };
151 }
152 }
153
154 $node->monitor ($port, $cb);
155
156 defined wantarray
157 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
158}
159
160=item $guard = mon_guard $port, $ref, $ref...
161
162Monitors the given C<$port> and keeps the passed references. When the port
163is killed, the references will be freed.
164
165Optionally returns a guard that will stop the monitoring.
166
167This function is useful when you create e.g. timers or other watchers and
168want to free them when the port gets killed:
169
170 $port->rcv (start => sub {
171 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
172 undef $timer if 0.9 < rand;
173 });
174 });
175
176=cut
177
178sub mon_guard {
179 my ($port, @refs) = @_;
180
181 mon $port, sub { 0 && @refs }
182}
183
122=item $local_port = create_port 184=item $local_port = create_port
123 185
124Create a new local port object. See the next section for allowed methods. 186Create a new local port object. See the next section for allowed methods.
125 187
126=cut 188=cut
127 189
128sub create_port { 190sub create_port {
129 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; 191 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
130 192
131 my $self = bless { 193 my $self = bless {
132 id => "$NODE#$id", 194 id => "$NODE#$id",
133 names => [$id],
134 }, "AnyEvent::MP::Port"; 195 }, "AnyEvent::MP::Port";
135 196
136 $AnyEvent::MP::Base::PORT{$id} = sub { 197 $AnyEvent::MP::Base::PORT{$id} = sub {
137 unshift @_, $self; 198 unshift @_, $self;
138 199
155 }; 216 };
156 217
157 $self 218 $self
158} 219}
159 220
221=item $portid = miniport { my @msg = @_; $finished }
222
223Creates a "mini port", that is, a very lightweight port without any
224pattern matching behind it, and returns its ID.
225
226The block will be called for every message received on the port. When the
227callback returns a true value its job is considered "done" and the port
228will be destroyed. Otherwise it will stay alive.
229
230The message will be passed as-is, no extra argument (i.e. no port id) will
231be passed to the callback.
232
233If you need the local port id in the callback, this works nicely:
234
235 my $port; $port = miniport {
236 snd $otherport, reply => $port;
237 };
238
239=cut
240
241sub miniport(&) {
242 my $cb = shift;
243 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
244
245 $AnyEvent::MP::Base::PORT{$id} = sub {
246 &$cb
247 and kil $id;
248 };
249
250 "$NODE#$id"
251}
252
160package AnyEvent::MP::Port; 253package AnyEvent::MP::Port;
161 254
162=back 255=back
163 256
164=head1 METHODS FOR PORT OBJECTS 257=head1 METHODS FOR PORT OBJECTS
173=cut 266=cut
174 267
175use overload 268use overload
176 '""' => sub { $_[0]{id} }, 269 '""' => sub { $_[0]{id} },
177 fallback => 1; 270 fallback => 1;
271
272sub TO_JSON { $_[0]{id} }
178 273
179=item $port->rcv (type => $callback->($port, @msg)) 274=item $port->rcv (type => $callback->($port, @msg))
180 275
181=item $port->rcv ($smartmatch => $callback->($port, @msg)) 276=item $port->rcv ($smartmatch => $callback->($port, @msg))
182 277
234 329
235=item $port->destroy 330=item $port->destroy
236 331
237Explicitly destroy/remove/nuke/vaporise the port. 332Explicitly destroy/remove/nuke/vaporise the port.
238 333
239Ports are normally kept alive by there mere existance alone, and need to 334Ports are normally kept alive by their mere existance alone, and need to
240be destroyed explicitly. 335be destroyed explicitly.
241 336
242=cut 337=cut
243 338
244sub destroy { 339sub destroy {
245 my ($self) = @_; 340 my ($self) = @_;
246 341
247 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; 342 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
248 343
249 delete $AnyEvent::MP::Base::PORT{$_} 344 AnyEvent::MP::Base::kil $self->{id};
250 for @{ $self->{names} };
251} 345}
252 346
253=back 347=back
254 348
255=head1 FUNCTIONS FOR NODES 349=head1 FUNCTIONS FOR NODES

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines