ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.139 by root, Thu Mar 22 20:07:31 2012 UTC vs.
Revision 1.153 by root, Sat Nov 2 01:30:49 2019 UTC

47 # execute callbacks in $SELF port context 47 # execute callbacks in $SELF port context
48 my $timer = AE::timer 1, 0, psub { 48 my $timer = AE::timer 1, 0, psub {
49 die "kill the port, delayed"; 49 die "kill the port, delayed";
50 }; 50 };
51 51
52=head1 CURRENT STATUS 52 # distributed database - modification
53 db_set $family => $subkey [=> $value] # add a subkey
54 db_del $family => $subkey... # delete one or more subkeys
55 db_reg $family => $port [=> $value] # register a port
53 56
54 bin/aemp - stable. 57 # distributed database - queries
55 AnyEvent::MP - stable API, should work. 58 db_family $family => $cb->(\%familyhash)
56 AnyEvent::MP::Intro - explains most concepts. 59 db_keys $family => $cb->(\@keys)
57 AnyEvent::MP::Kernel - mostly stable API. 60 db_values $family => $cb->(\@values)
58 AnyEvent::MP::Global - stable API. 61
62 # distributed database - monitoring a family
63 db_mon $family => $cb->(\%familyhash, \@added, \@changed, \@deleted)
59 64
60=head1 DESCRIPTION 65=head1 DESCRIPTION
61 66
62This module (-family) implements a simple message passing framework. 67This module (-family) implements a simple message passing framework.
63 68
113each other. To do this, nodes should listen on one or more local transport 118each other. To do this, nodes should listen on one or more local transport
114endpoints - binds. 119endpoints - binds.
115 120
116Currently, only standard C<ip:port> specifications can be used, which 121Currently, only standard C<ip:port> specifications can be used, which
117specify TCP ports to listen on. So a bind is basically just a tcp socket 122specify TCP ports to listen on. So a bind is basically just a tcp socket
118in listening mode thta accepts conenctions form other nodes. 123in listening mode that accepts connections from other nodes.
119 124
120=item seed nodes 125=item seed nodes
121 126
122When a node starts, it knows nothing about the network it is in - it 127When a node starts, it knows nothing about the network it is in - it
123needs to connect to at least one other node that is already in the 128needs to connect to at least one other node that is already in the
124network. These other nodes are called "seed nodes". 129network. These other nodes are called "seed nodes".
125 130
126Seed nodes themselves are not special - they are seed nodes only because 131Seed nodes themselves are not special - they are seed nodes only because
127some other node I<uses> them as such, but any node can be used as seed 132some other node I<uses> them as such, but any node can be used as seed
128node for other nodes, and eahc node cna use a different set of seed nodes. 133node for other nodes, and eahc node can use a different set of seed nodes.
129 134
130In addition to discovering the network, seed nodes are also used to 135In addition to discovering the network, seed nodes are also used to
131maintain the network - all nodes using the same seed node form are part of 136maintain the network - all nodes using the same seed node are part of the
132the same network. If a network is split into multiple subnets because e.g. 137same network. If a network is split into multiple subnets because e.g. the
133the network link between the parts goes down, then using the same seed 138network link between the parts goes down, then using the same seed nodes
134nodes for all nodes ensures that eventually the subnets get merged again. 139for all nodes ensures that eventually the subnets get merged again.
135 140
136Seed nodes are expected to be long-running, and at least one seed node 141Seed nodes are expected to be long-running, and at least one seed node
137should always be available. They should also be relatively responsive - a 142should always be available. They should also be relatively responsive - a
138seed node that blocks for long periods will slow down everybody else. 143seed node that blocks for long periods will slow down everybody else.
139 144
163 168
164Any node that loads the L<AnyEvent::MP::Global> module becomes a global 169Any node that loads the L<AnyEvent::MP::Global> module becomes a global
165node and tries to keep connections to all other nodes. So while it can 170node and tries to keep connections to all other nodes. So while it can
166make sense to make every node "global" in small networks, it usually makes 171make sense to make every node "global" in small networks, it usually makes
167sense to only make seed nodes into global nodes in large networks (nodes 172sense to only make seed nodes into global nodes in large networks (nodes
168keep connections to seed nodes and global nodes, so makign them the same 173keep connections to seed nodes and global nodes, so making them the same
169reduces overhead). 174reduces overhead).
170 175
171=back 176=back
172 177
173=head1 VARIABLES/FUNCTIONS 178=head1 VARIABLES/FUNCTIONS
178 183
179package AnyEvent::MP; 184package AnyEvent::MP;
180 185
181use AnyEvent::MP::Config (); 186use AnyEvent::MP::Config ();
182use AnyEvent::MP::Kernel; 187use AnyEvent::MP::Kernel;
183use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID); 188use AnyEvent::MP::Kernel qw(
189 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
190 add_node load_func
191
192 NODE $NODE
193 configure
194 node_of port_is_local
195 snd kil
196 db_set db_del
197 db_mon db_family db_keys db_values
198);
184 199
185use common::sense; 200use common::sense;
186 201
187use Carp (); 202use Carp ();
188 203
189use AE (); 204use AnyEvent ();
190use Guard (); 205use Guard ();
191 206
192use base "Exporter"; 207use base "Exporter";
193 208
194our $VERSION = $AnyEvent::MP::Config::VERSION; 209our $VERSION = '2.02'; # also in MP/Config.pm
195 210
196our @EXPORT = qw( 211our @EXPORT = qw(
197 NODE $NODE *SELF node_of after
198 configure 212 configure
213
214 NODE $NODE
215 *SELF
216
217 node_of port_is_local
218
219 snd kil
199 snd rcv mon mon_guard kil psub peval spawn cal 220 port rcv mon mon_guard psub peval spawn cal
200 port
201 db_set db_del db_reg 221 db_set db_del db_reg
202 db_mon db_family db_keys db_values 222 db_mon db_family db_keys db_values
223
224 after
203); 225);
204 226
205our $SELF; 227our $SELF;
206 228
207sub _self_die() { 229sub _self_die() {
218 240
219=item $nodeid = node_of $port 241=item $nodeid = node_of $port
220 242
221Extracts and returns the node ID from a port ID or a node ID. 243Extracts and returns the node ID from a port ID or a node ID.
222 244
245=item $is_local = port_is_local $port
246
247Returns true iff the port is a local port.
248
223=item configure $profile, key => value... 249=item configure $profile, key => value...
224 250
225=item configure key => value... 251=item configure key => value...
226 252
227Before a node can talk to other nodes on the network (i.e. enter 253Before a node can talk to other nodes on the network (i.e. enter
238=over 4 264=over 4
239 265
240=item norc => $boolean (default false) 266=item norc => $boolean (default false)
241 267
242If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not> 268If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
243be consulted - all configuraiton options must be specified in the 269be consulted - all configuration options must be specified in the
244C<configure> call. 270C<configure> call.
245 271
246=item force => $boolean (default false) 272=item force => $boolean (default false)
247 273
248IF true, then the values specified in the C<configure> will take 274IF true, then the values specified in the C<configure> will take
249precedence over any values configured via the rc file. The default is for 275precedence over any values configured via the rc file. The default is for
250the rc file to override any options specified in the program. 276the rc file to override any options specified in the program.
251
252=item secure => $pass->(@msg)
253
254In addition to specifying a boolean, you can specify a code reference that
255is called for every code execution attempt - the execution request is
256granted iff the callback returns a true value.
257
258Most of the time the callback should look only at
259C<$AnyEvent::MP::Kernel::SRCNODE> to make a decision, and not at the
260actual message (which can be about anything, and is mostly provided for
261diagnostic purposes).
262
263See F<semp setsecure> for more info.
264 277
265=back 278=back
266 279
267=over 4 280=over 4
268 281
297 310
298=item step 2, bind listener sockets 311=item step 2, bind listener sockets
299 312
300The next step is to look up the binds in the profile, followed by binding 313The next step is to look up the binds in the profile, followed by binding
301aemp protocol listeners on all binds specified (it is possible and valid 314aemp protocol listeners on all binds specified (it is possible and valid
302to have no binds, meaning that the node cannot be contacted form the 315to have no binds, meaning that the node cannot be contacted from the
303outside. This means the node cannot talk to other nodes that also have no 316outside. This means the node cannot talk to other nodes that also have no
304binds, but it can still talk to all "normal" nodes). 317binds, but it can still talk to all "normal" nodes).
305 318
306If the profile does not specify a binds list, then a default of C<*> is 319If the profile does not specify a binds list, then a default of C<*> is
307used, meaning the node will bind on a dynamically-assigned port on every 320used, meaning the node will bind on a dynamically-assigned port on every
404=cut 417=cut
405 418
406sub rcv($@); 419sub rcv($@);
407 420
408my $KILME = sub { 421my $KILME = sub {
409 (my $tag = substr $_[0], 0, 30) =~ s/([\x20-\x7e])/./g; 422 (my $tag = substr $_[0], 0, 30) =~ s/([^\x20-\x7e])/./g;
410 kil $SELF, unhandled_message => "no callback found for message '$tag'"; 423 kil $SELF, unhandled_message => "no callback found for message '$tag'";
411}; 424};
412 425
413sub port(;&) { 426sub port(;&) {
414 my $id = $UNIQ . ++$ID; 427 my $id = $UNIQ . ++$ID;
472 485
473sub rcv($@) { 486sub rcv($@) {
474 my $port = shift; 487 my $port = shift;
475 my ($nodeid, $portid) = split /#/, $port, 2; 488 my ($nodeid, $portid) = split /#/, $port, 2;
476 489
477 $NODE{$nodeid} == $NODE{""} 490 $nodeid eq $NODE
478 or Carp::croak "$port: rcv can only be called on local ports, caught"; 491 or Carp::croak "$port: rcv can only be called on local ports, caught";
479 492
480 while (@_) { 493 while (@_) {
481 if (ref $_[0]) { 494 if (ref $_[0]) {
482 if (my $self = $PORT_DATA{$portid}) { 495 if (my $self = $PORT_DATA{$portid}) {
525 $port 538 $port
526} 539}
527 540
528=item peval $port, $coderef[, @args] 541=item peval $port, $coderef[, @args]
529 542
530Evaluates the given C<$codref> within the contetx of C<$port>, that is, 543Evaluates the given C<$codref> within the context of C<$port>, that is,
531when the code throews an exception the C<$port> will be killed. 544when the code throws an exception the C<$port> will be killed.
532 545
533Any remaining args will be passed to the callback. Any return values will 546Any remaining args will be passed to the callback. Any return values will
534be returned to the caller. 547be returned to the caller.
535 548
536This is useful when you temporarily want to execute code in the context of 549This is useful when you temporarily want to execute code in the context of
1041=item db_values $family => $cb->(\@values) 1054=item db_values $family => $cb->(\@values)
1042 1055
1043Same as C<db_family>, except it only queries the family I<values> and passes them 1056Same as C<db_family>, except it only queries the family I<values> and passes them
1044as array reference to the callback. 1057as array reference to the callback.
1045 1058
1046=item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted) 1059=item $guard = db_mon $family => $cb->(\%familyhash, \@added, \@changed, \@deleted)
1047 1060
1048Creates a monitor on the given database family. Each time a key is set 1061Creates a monitor on the given database family. Each time a key is
1049or or is deleted the callback is called with a hash containing the 1062set or is deleted the callback is called with a hash containing the
1050database family and three lists of added, changed and deleted subkeys, 1063database family and three lists of added, changed and deleted subkeys,
1051respectively. If no keys have changed then the array reference might be 1064respectively. If no keys have changed then the array reference might be
1052C<undef> or even missing. 1065C<undef> or even missing.
1053 1066
1054If not called in void context, a guard object is returned that, when 1067If not called in void context, a guard object is returned that, when
1082 return unless %$family; 1095 return unless %$family;
1083 undef $guard; 1096 undef $guard;
1084 print "My::Module::workers now nonempty\n"; 1097 print "My::Module::workers now nonempty\n";
1085 }; 1098 };
1086 1099
1087Example: print all changes to the family "AnyRvent::Fantasy::Module". 1100Example: print all changes to the family "AnyEvent::Fantasy::Module".
1088 1101
1089 my $guard = db_mon AnyRvent::Fantasy::Module => sub { 1102 my $guard = db_mon AnyEvent::Fantasy::Module => sub {
1090 my ($family, $a, $c, $d) = @_; 1103 my ($family, $a, $c, $d) = @_;
1091 1104
1092 print "+$_=$family->{$_}\n" for @$a; 1105 print "+$_=$family->{$_}\n" for @$a;
1093 print "*$_=$family->{$_}\n" for @$c; 1106 print "*$_=$family->{$_}\n" for @$c;
1094 print "-$_=$family->{$_}\n" for @$d; 1107 print "-$_=$family->{$_}\n" for @$d;
1145filter messages without dequeuing them. 1158filter messages without dequeuing them.
1146 1159
1147This is not a philosophical difference, but simply stems from AnyEvent::MP 1160This is not a philosophical difference, but simply stems from AnyEvent::MP
1148being event-based, while Erlang is process-based. 1161being event-based, while Erlang is process-based.
1149 1162
1150You cna have a look at L<Coro::MP> for a more Erlang-like process model on 1163You can have a look at L<Coro::MP> for a more Erlang-like process model on
1151top of AEMP and Coro threads. 1164top of AEMP and Coro threads.
1152 1165
1153=item * Erlang sends are synchronous, AEMP sends are asynchronous. 1166=item * Erlang sends are synchronous, AEMP sends are asynchronous.
1154 1167
1155Sending messages in Erlang is synchronous and blocks the process until 1168Sending messages in Erlang is synchronous and blocks the process until
1156a conenction has been established and the message sent (and so does not 1169a connection has been established and the message sent (and so does not
1157need a queue that can overflow). AEMP sends return immediately, connection 1170need a queue that can overflow). AEMP sends return immediately, connection
1158establishment is handled in the background. 1171establishment is handled in the background.
1159 1172
1160=item * Erlang suffers from silent message loss, AEMP does not. 1173=item * Erlang suffers from silent message loss, AEMP does not.
1161 1174
1267 1280
1268=over 4 1281=over 4
1269 1282
1270=item AnyEvent::MP::Global no longer has group management functions. 1283=item AnyEvent::MP::Global no longer has group management functions.
1271 1284
1285At least not officially - the grp_* functions are still exported and might
1286work, but they will be removed in some later release.
1287
1272AnyEvent::MP now comes with a distributed database that is more 1288AnyEvent::MP now comes with a distributed database that is more
1273powerful. Its database families map closely to port groups, but the API 1289powerful. Its database families map closely to port groups, but the API
1274has changed (the functions are also now exported by AnyEvent::MP). Here is 1290has changed (the functions are also now exported by AnyEvent::MP). Here is
1275a rough porting guide: 1291a rough porting guide:
1276 1292
1297C<grp_mon> can be replaced by C<db_mon> with minor changes - C<db_mon> 1313C<grp_mon> can be replaced by C<db_mon> with minor changes - C<db_mon>
1298passes a hash as first argument, and an extra C<$chg> argument that can be 1314passes a hash as first argument, and an extra C<$chg> argument that can be
1299ignored: 1315ignored:
1300 1316
1301 db_mon $group => sub { 1317 db_mon $group => sub {
1302 my ($ports, $add, $chg, $lde) = @_; 1318 my ($ports, $add, $chg, $del) = @_;
1303 $ports = [keys %$ports]; 1319 $ports = [keys %$ports];
1304 1320
1305 # now $ports, $add and $del are the same as 1321 # now $ports, $add and $del are the same as
1306 # were originally passed by grp_mon. 1322 # were originally passed by grp_mon.
1307 ... 1323 ...
1360 1376
1361=back 1377=back
1362 1378
1363=head1 LOGGING 1379=head1 LOGGING
1364 1380
1365AnyEvent::MP does not normally log anything by itself, but sinc eit is the 1381AnyEvent::MP does not normally log anything by itself, but since it is the
1366root of the contetx hierarchy for AnyEvent::MP modules, it will receive 1382root of the context hierarchy for AnyEvent::MP modules, it will receive
1367all log messages by submodules. 1383all log messages by submodules.
1368 1384
1369=head1 SEE ALSO 1385=head1 SEE ALSO
1370 1386
1371L<AnyEvent::MP::Intro> - a gentle introduction. 1387L<AnyEvent::MP::Intro> - a gentle introduction.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines