ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.10 by root, Sun Aug 2 18:05:43 2009 UTC vs.
Revision 1.21 by root, Tue Aug 4 14:10:51 2009 UTC

28 28
29Despite its simplicity, you can securely message other processes running 29Despite its simplicity, you can securely message other processes running
30on the same or other hosts. 30on the same or other hosts.
31 31
32At the moment, this module family is severly brokena nd underdocumented, 32At the moment, this module family is severly brokena nd underdocumented,
33so do not use. This was uploaded mainly to resreve the CPAN namespace - 33so do not use. This was uploaded mainly to reserve the CPAN namespace -
34stay tuned! 34stay tuned!
35 35
36=head1 CONCEPTS 36=head1 CONCEPTS
37 37
38=over 4 38=over 4
84 84
85use base "Exporter"; 85use base "Exporter";
86 86
87our $VERSION = '0.02'; 87our $VERSION = '0.02';
88our @EXPORT = qw( 88our @EXPORT = qw(
89 NODE $NODE $PORT snd rcv _any_ 89 NODE $NODE $PORT snd rcv mon kil _any_
90 create_port create_port_on 90 create_port create_port_on
91 miniport
91 become_slave become_public 92 become_slave become_public
92); 93);
93 94
94=item NODE / $NODE 95=item NODE / $NODE
95 96
117JSON is used, then only strings, numbers and arrays and hashes consisting 118JSON is used, then only strings, numbers and arrays and hashes consisting
118of those are allowed (no objects). When Storable is used, then anything 119of those are allowed (no objects). When Storable is used, then anything
119that Storable can serialise and deserialise is allowed, and for the local 120that Storable can serialise and deserialise is allowed, and for the local
120node, anything can be passed. 121node, anything can be passed.
121 122
123=item $guard = mon $portid, $cb->()
124
125=item $guard = mon $portid, $otherport
126
127=item $guard = mon $portid, $otherport, @msg
128
129Monitor the given port and call the given callback when the port is
130destroyed or connection to it's node is lost.
131
132#TODO
133
134=cut
135
136sub mon {
137 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
138
139 my $node = AnyEvent::MP::Base::add_node $noderef;
140
141 #TODO: ports must not be references
142 if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
143 if (@_) {
144 # send a kill info message
145 my (@msg) = ($cb, @_);
146 $cb = sub { snd @msg, @_ };
147 } else {
148 # simply kill other port
149 my $port = $cb;
150 $cb = sub { kil $port, @_ };
151 }
152 }
153
154 $node->monitor ($port, $cb);
155
156 defined wantarray
157 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
158}
159
160=item $guard = mon_guard $port, $ref, $ref...
161
162Monitors the given C<$port> and keeps the passed references. When the port
163is killed, the references will be freed.
164
165Optionally returns a guard that will stop the monitoring.
166
167This function is useful when you create e.g. timers or other watchers and
168want to free them when the port gets killed:
169
170 $port->rcv (start => sub {
171 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
172 undef $timer if 0.9 < rand;
173 });
174 });
175
176=cut
177
178sub mon_guard {
179 my ($port, @refs) = @_;
180
181 mon $port, sub { 0 && @refs }
182}
183
122=item $local_port = create_port 184=item $local_port = create_port
123 185
124Create a new local port object. See the next section for allowed methods. 186Create a new local port object. See the next section for allowed methods.
125 187
126=cut 188=cut
127 189
128sub create_port { 190sub create_port {
129 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; 191 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
130 192
131 my $self = bless { 193 my $self = bless {
132 id => "$NODE#$id", 194 id => "$NODE#$id",
133 names => [$id],
134 }, "AnyEvent::MP::Port"; 195 }, "AnyEvent::MP::Port";
135 196
136 $AnyEvent::MP::Base::PORT{$id} = sub { 197 $AnyEvent::MP::Base::PORT{$id} = sub {
137 unshift @_, $self; 198 unshift @_, $self;
138 199
155 }; 216 };
156 217
157 $self 218 $self
158} 219}
159 220
160=item $portid = create_miniport { } 221=item $portid = miniport { my @msg = @_; $finished }
161 222
162Creates a "mini port", that is, a port without much #TODO 223Creates a "mini port", that is, a very lightweight port without any
224pattern matching behind it, and returns its ID.
163 225
164=cut 226The block will be called for every message received on the port. When the
227callback returns a true value its job is considered "done" and the port
228will be destroyed. Otherwise it will stay alive.
165 229
230The message will be passed as-is, no extra argument (i.e. no port id) will
231be passed to the callback.
232
233If you need the local port id in the callback, this works nicely:
234
235 my $port; $port = miniport {
236 snd $otherport, reply => $port;
237 };
238
239=cut
240
166sub create_miniport(&) { 241sub miniport(&) {
167 my $cb = shift; 242 my $cb = shift;
168 my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; 243 my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++;
169 244
170 $AnyEvent::MP::Base::PORT{$id} = sub { 245 $AnyEvent::MP::Base::PORT{$id} = sub {
171 &$cb 246 &$cb
172 and delete $AnyEvent::MP::Base::PORT{$id}; 247 and kil $id;
173 }; 248 };
174 249
175 "$NODE#$id" 250 "$NODE#$id"
176} 251}
177 252
191=cut 266=cut
192 267
193use overload 268use overload
194 '""' => sub { $_[0]{id} }, 269 '""' => sub { $_[0]{id} },
195 fallback => 1; 270 fallback => 1;
271
272sub TO_JSON { $_[0]{id} }
196 273
197=item $port->rcv (type => $callback->($port, @msg)) 274=item $port->rcv (type => $callback->($port, @msg))
198 275
199=item $port->rcv ($smartmatch => $callback->($port, @msg)) 276=item $port->rcv ($smartmatch => $callback->($port, @msg))
200 277
252 329
253=item $port->destroy 330=item $port->destroy
254 331
255Explicitly destroy/remove/nuke/vaporise the port. 332Explicitly destroy/remove/nuke/vaporise the port.
256 333
257Ports are normally kept alive by there mere existance alone, and need to 334Ports are normally kept alive by their mere existance alone, and need to
258be destroyed explicitly. 335be destroyed explicitly.
259 336
260=cut 337=cut
261 338
262sub destroy { 339sub destroy {
263 my ($self) = @_; 340 my ($self) = @_;
264 341
265 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; 342 delete $AnyEvent::MP::Base::WKP{ $self->{wkname} };
266 343
267 delete $AnyEvent::MP::Base::PORT{$_} 344 AnyEvent::MP::Base::kil $self->{id};
268 for @{ $self->{names} };
269} 345}
270 346
271=back 347=back
272 348
273=head1 FUNCTIONS FOR NODES 349=head1 FUNCTIONS FOR NODES

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines