ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Intro.pod
(Generate patch)

Comparing AnyEvent-MP/MP/Intro.pod (file contents):
Revision 1.41 by root, Tue Oct 6 18:38:25 2009 UTC vs.
Revision 1.42 by root, Tue Oct 6 19:39:04 2009 UTC

1047irc backend running. 1047irc backend running.
1048 1048
1049That's plenty of possibilities you can use - it's all up to you how you 1049That's plenty of possibilities you can use - it's all up to you how you
1050structure your application. 1050structure your application.
1051 1051
1052=head1 PART 4: Coro::MP - selective receive
1053
1054Not all problems lend themselves naturally to an event-based solution:
1055sometimes things are easier if you can decide in what order you want to
1056receive messages, irregardless of the order in which they were sent.
1057
1058In these cases, L<Coro::MP> can provide a nice solution: instead of
1059registering callbacks for each message type, C<Coro::MP> attached a
1060(coro-) thread to a port. The thread can then opt to selectively receive
1061messages it is interested in. Other messages are not lost, but queued, and
1062can be received at a later time.
1063
1064The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a seperate
1065module. It is, however, tightly integrated into C<AnyEvent::MP> - the
1066ports it creates are fully compatible to C<AnyEvent::MP> ports.
1067
1068In fact, C<Coro::MP> is more of an extension than a separate module: all
1069functions exported by C<AnyEvent::MP> are exported by it as well.
1070
1071To illustrate how programing with C<Coro::MP> looks like, consider the
1072following (slightly contrived) example: Let's implement a server that
1073accepts a C<< (write_file =>, $port, $path) >> message with a (source)
1074port and a filename, followed by as many C<< (data => $port, $data) >>
1075messages as required to fill the file, followed by an empty C<< (data =>
1076$port) >> message.
1077
1078The server only writes a single file at a time, other requests will stay
1079in the queue until the current file has been finished.
1080
1081Here is an example implementation that uses L<Coro::AIO> and largely
1082ignores error handling:
1083
1084 my $ioserver = port_async {
1085 while () {
1086 my ($tag, $port, $path) = get_cond;
1087
1088 $tag eq "write_file"
1089 or die "only write_file messages expected";
1090
1091 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1092 or die "$path: $!";
1093
1094 while () {
1095 my (undef, undef, $data) = get_cond {
1096 $_[0] eq "data" && $_[1] eq $port
1097 } 5
1098 or die "timeout waiting for data message from $port\n";
1099
1100 length $data or last;
1101
1102 aio_write $fh, undef, undef, $data, 0;
1103 };
1104 }
1105 };
1106
1107 mon $ioserver, sub {
1108 warn "ioserver was killed: @_\n";
1109 };
1110
1111Let's go through it part by part.
1112
1113 my $ioserver = port_async {
1114
1115Ports cna be created by attaching a thread to an existing port via
1116C<rcv_async>, or as here by calling C<port_async> with the code to exeucte
1117as a thread. The C<async> component comes from the fact that threads are
1118created using the C<Coro::async> function.
1119
1120The thread runs in a normal port context (so C<$SELF> is set). In
1121addition, when the thread returns, it will be C<kil> I<normally>, i.e.
1122without a reason argument.
1123
1124 while () {
1125 my ($tag, $port, $path) = get_cond;
1126 or die "only write_file messages expected";
1127
1128The thread is supposed to serve many file writes, which is why it executes
1129in a loop. The first thing it does is fetch the next message, using
1130C<get_cond>, the "conditional message get". Without a condition, it simply
1131fetches the next message from the queue, which I<must> be a C<write_file>
1132message.
1133
1134The message contains the C<$path> to the file, which is then created:
1135
1136 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1137 or die "$path: $!";
1138
1139Then we enter a loop again, to serve as many C<data> messages as
1140neccessary:
1141
1142 while () {
1143 my (undef, undef, $data) = get_cond {
1144 $_[0] eq "data" && $_[1] eq $port
1145 } 5
1146 or die "timeout waiting for data message from $port\n";
1147
1148This time, the condition is not empty, but instead a code block: similarly
1149to grep, the code block will be called with C<@_> set to each message in
1150the queue, and it has to return whether it wants to receive the message or
1151not.
1152
1153In this case we are interested in C<data> messages (C<< $_[0] eq "data"
1154>>), whose first element is the source port (C<< $_[1] eq $port >>).
1155
1156The condition must be this strict, as it is possible to receive both
1157C<write_file> messages and C<data> messages from other ports while we
1158handle the file writing.
1159
1160The lone C<5> at the end is a timeout - when no matching message is
1161received within C<5> seconds, we assume an error and C<die>.
1162
1163When an empty C<data> message is received we are done and can close the
1164file (which is done automatically as C<$fh> goes out of scope):
1165
1166 length $data or last;
1167
1168Otherwise we need to write the data:
1169
1170 aio_write $fh, undef, undef, $data, 0;
1171
1172That's basically it. Note that every process should ahve some kind of
1173supervisor. In our case, the supervisor simply prints any error message:
1174
1175 mon $ioserver, sub {
1176 warn "ioserver was killed: @_\n";
1177 };
1178
1179Here is a usage example:
1180
1181 port_async {
1182 snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1183 snd $ioserver, data => $SELF, "abc\n";
1184 snd $ioserver, data => $SELF, "def\n";
1185 snd $ioserver, data => $SELF;
1186 };
1187
1188The messages are sent without any flow control or acknowledgement (feel
1189free to improve). Also, the source port does not actually need to be a
1190port - any unique ID will do - but port identifiers happen to be a simple
1191source of network-wide unique IDs.
1192
1193Apart from C<get_cond> as seen above, there are other ways to receive
1194messages. The C<write_file> message above could also selectively be
1195received using a C<get> call:
1196
1197 my ($port, $path) = get "write_file";
1198
1199This is simpler, but when some other code part sends an unexpected message
1200to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
1201every threaded port should have a "fetch next message unconditionally"
1202somewhere, to avoid filling up the queue.
1203
1204It is also possible to switch-like C<get_conds>:
1205
1206 get_cond {
1207 $_[0] eq "msg1" and return sub {
1208 my (undef, @msg1_data) = @_;
1209 ...;
1210 };
1211
1212 $_[0] eq "msg2" and return sub {
1213 my (undef, @msg2_data) = @_;
1214 ...;
1215 };
1216
1217 die "unexpected message $_[0] received";
1218 };
1219
1052=head1 THE END 1220=head1 THE END
1053 1221
1054This is the end of this introduction, but hopefully not the end of 1222This is the end of this introduction, but hopefully not the end of
1055your career als AEMP user. I hope the tutorial was enough to make the 1223your career als AEMP user. I hope the tutorial was enough to make the
1056basic concepts clear. Keep in mind that distributed programming is not 1224basic concepts clear. Keep in mind that distributed programming is not
1061 1229
1062L<AnyEvent::MP> 1230L<AnyEvent::MP>
1063 1231
1064L<AnyEvent::MP::Global> 1232L<AnyEvent::MP::Global>
1065 1233
1234L<Coro::MP>
1235
1066L<AnyEvent> 1236L<AnyEvent>
1067 1237
1068=head1 AUTHOR 1238=head1 AUTHOR
1069 1239
1070 Robin Redeker <elmex@ta-sa.org> 1240 Robin Redeker <elmex@ta-sa.org>

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines