… | |
… | |
1047 | irc backend running. |
1047 | irc backend running. |
1048 | |
1048 | |
1049 | That's plenty of possibilities you can use - it's all up to you how you |
1049 | That's plenty of possibilities you can use - it's all up to you how you |
1050 | structure your application. |
1050 | structure your application. |
1051 | |
1051 | |
|
|
1052 | =head1 PART 4: Coro::MP - selective receive |
|
|
1053 | |
|
|
1054 | Not all problems lend themselves naturally to an event-based solution: |
|
|
1055 | sometimes things are easier if you can decide in what order you want to |
|
|
1056 | receive messages, irregardless of the order in which they were sent. |
|
|
1057 | |
|
|
1058 | In these cases, L<Coro::MP> can provide a nice solution: instead of |
|
|
1059 | registering callbacks for each message type, C<Coro::MP> attached a |
|
|
1060 | (coro-) thread to a port. The thread can then opt to selectively receive |
|
|
1061 | messages it is interested in. Other messages are not lost, but queued, and |
|
|
1062 | can be received at a later time. |
|
|
1063 | |
|
|
1064 | The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a seperate |
|
|
1065 | module. It is, however, tightly integrated into C<AnyEvent::MP> - the |
|
|
1066 | ports it creates are fully compatible to C<AnyEvent::MP> ports. |
|
|
1067 | |
|
|
1068 | In fact, C<Coro::MP> is more of an extension than a separate module: all |
|
|
1069 | functions exported by C<AnyEvent::MP> are exported by it as well. |
|
|
1070 | |
|
|
1071 | To illustrate how programing with C<Coro::MP> looks like, consider the |
|
|
1072 | following (slightly contrived) example: Let's implement a server that |
|
|
1073 | accepts a C<< (write_file =>, $port, $path) >> message with a (source) |
|
|
1074 | port and a filename, followed by as many C<< (data => $port, $data) >> |
|
|
1075 | messages as required to fill the file, followed by an empty C<< (data => |
|
|
1076 | $port) >> message. |
|
|
1077 | |
|
|
1078 | The server only writes a single file at a time, other requests will stay |
|
|
1079 | in the queue until the current file has been finished. |
|
|
1080 | |
|
|
1081 | Here is an example implementation that uses L<Coro::AIO> and largely |
|
|
1082 | ignores error handling: |
|
|
1083 | |
|
|
1084 | my $ioserver = port_async { |
|
|
1085 | while () { |
|
|
1086 | my ($tag, $port, $path) = get_cond; |
|
|
1087 | |
|
|
1088 | $tag eq "write_file" |
|
|
1089 | or die "only write_file messages expected"; |
|
|
1090 | |
|
|
1091 | my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666 |
|
|
1092 | or die "$path: $!"; |
|
|
1093 | |
|
|
1094 | while () { |
|
|
1095 | my (undef, undef, $data) = get_cond { |
|
|
1096 | $_[0] eq "data" && $_[1] eq $port |
|
|
1097 | } 5 |
|
|
1098 | or die "timeout waiting for data message from $port\n"; |
|
|
1099 | |
|
|
1100 | length $data or last; |
|
|
1101 | |
|
|
1102 | aio_write $fh, undef, undef, $data, 0; |
|
|
1103 | }; |
|
|
1104 | } |
|
|
1105 | }; |
|
|
1106 | |
|
|
1107 | mon $ioserver, sub { |
|
|
1108 | warn "ioserver was killed: @_\n"; |
|
|
1109 | }; |
|
|
1110 | |
|
|
1111 | Let's go through it part by part. |
|
|
1112 | |
|
|
1113 | my $ioserver = port_async { |
|
|
1114 | |
|
|
1115 | Ports cna be created by attaching a thread to an existing port via |
|
|
1116 | C<rcv_async>, or as here by calling C<port_async> with the code to exeucte |
|
|
1117 | as a thread. The C<async> component comes from the fact that threads are |
|
|
1118 | created using the C<Coro::async> function. |
|
|
1119 | |
|
|
1120 | The thread runs in a normal port context (so C<$SELF> is set). In |
|
|
1121 | addition, when the thread returns, it will be C<kil> I<normally>, i.e. |
|
|
1122 | without a reason argument. |
|
|
1123 | |
|
|
1124 | while () { |
|
|
1125 | my ($tag, $port, $path) = get_cond; |
|
|
1126 | or die "only write_file messages expected"; |
|
|
1127 | |
|
|
1128 | The thread is supposed to serve many file writes, which is why it executes |
|
|
1129 | in a loop. The first thing it does is fetch the next message, using |
|
|
1130 | C<get_cond>, the "conditional message get". Without a condition, it simply |
|
|
1131 | fetches the next message from the queue, which I<must> be a C<write_file> |
|
|
1132 | message. |
|
|
1133 | |
|
|
1134 | The message contains the C<$path> to the file, which is then created: |
|
|
1135 | |
|
|
1136 | my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666 |
|
|
1137 | or die "$path: $!"; |
|
|
1138 | |
|
|
1139 | Then we enter a loop again, to serve as many C<data> messages as |
|
|
1140 | neccessary: |
|
|
1141 | |
|
|
1142 | while () { |
|
|
1143 | my (undef, undef, $data) = get_cond { |
|
|
1144 | $_[0] eq "data" && $_[1] eq $port |
|
|
1145 | } 5 |
|
|
1146 | or die "timeout waiting for data message from $port\n"; |
|
|
1147 | |
|
|
1148 | This time, the condition is not empty, but instead a code block: similarly |
|
|
1149 | to grep, the code block will be called with C<@_> set to each message in |
|
|
1150 | the queue, and it has to return whether it wants to receive the message or |
|
|
1151 | not. |
|
|
1152 | |
|
|
1153 | In this case we are interested in C<data> messages (C<< $_[0] eq "data" |
|
|
1154 | >>), whose first element is the source port (C<< $_[1] eq $port >>). |
|
|
1155 | |
|
|
1156 | The condition must be this strict, as it is possible to receive both |
|
|
1157 | C<write_file> messages and C<data> messages from other ports while we |
|
|
1158 | handle the file writing. |
|
|
1159 | |
|
|
1160 | The lone C<5> at the end is a timeout - when no matching message is |
|
|
1161 | received within C<5> seconds, we assume an error and C<die>. |
|
|
1162 | |
|
|
1163 | When an empty C<data> message is received we are done and can close the |
|
|
1164 | file (which is done automatically as C<$fh> goes out of scope): |
|
|
1165 | |
|
|
1166 | length $data or last; |
|
|
1167 | |
|
|
1168 | Otherwise we need to write the data: |
|
|
1169 | |
|
|
1170 | aio_write $fh, undef, undef, $data, 0; |
|
|
1171 | |
|
|
1172 | That's basically it. Note that every process should ahve some kind of |
|
|
1173 | supervisor. In our case, the supervisor simply prints any error message: |
|
|
1174 | |
|
|
1175 | mon $ioserver, sub { |
|
|
1176 | warn "ioserver was killed: @_\n"; |
|
|
1177 | }; |
|
|
1178 | |
|
|
1179 | Here is a usage example: |
|
|
1180 | |
|
|
1181 | port_async { |
|
|
1182 | snd $ioserver, write_file => $SELF, "/tmp/unsafe"; |
|
|
1183 | snd $ioserver, data => $SELF, "abc\n"; |
|
|
1184 | snd $ioserver, data => $SELF, "def\n"; |
|
|
1185 | snd $ioserver, data => $SELF; |
|
|
1186 | }; |
|
|
1187 | |
|
|
1188 | The messages are sent without any flow control or acknowledgement (feel |
|
|
1189 | free to improve). Also, the source port does not actually need to be a |
|
|
1190 | port - any unique ID will do - but port identifiers happen to be a simple |
|
|
1191 | source of network-wide unique IDs. |
|
|
1192 | |
|
|
1193 | Apart from C<get_cond> as seen above, there are other ways to receive |
|
|
1194 | messages. The C<write_file> message above could also selectively be |
|
|
1195 | received using a C<get> call: |
|
|
1196 | |
|
|
1197 | my ($port, $path) = get "write_file"; |
|
|
1198 | |
|
|
1199 | This is simpler, but when some other code part sends an unexpected message |
|
|
1200 | to the C<$ioserver> it will stay in the queue forever. As a rule of thumb, |
|
|
1201 | every threaded port should have a "fetch next message unconditionally" |
|
|
1202 | somewhere, to avoid filling up the queue. |
|
|
1203 | |
|
|
1204 | It is also possible to switch-like C<get_conds>: |
|
|
1205 | |
|
|
1206 | get_cond { |
|
|
1207 | $_[0] eq "msg1" and return sub { |
|
|
1208 | my (undef, @msg1_data) = @_; |
|
|
1209 | ...; |
|
|
1210 | }; |
|
|
1211 | |
|
|
1212 | $_[0] eq "msg2" and return sub { |
|
|
1213 | my (undef, @msg2_data) = @_; |
|
|
1214 | ...; |
|
|
1215 | }; |
|
|
1216 | |
|
|
1217 | die "unexpected message $_[0] received"; |
|
|
1218 | }; |
|
|
1219 | |
1052 | =head1 THE END |
1220 | =head1 THE END |
1053 | |
1221 | |
1054 | This is the end of this introduction, but hopefully not the end of |
1222 | This is the end of this introduction, but hopefully not the end of |
1055 | your career als AEMP user. I hope the tutorial was enough to make the |
1223 | your career als AEMP user. I hope the tutorial was enough to make the |
1056 | basic concepts clear. Keep in mind that distributed programming is not |
1224 | basic concepts clear. Keep in mind that distributed programming is not |
… | |
… | |
1061 | |
1229 | |
1062 | L<AnyEvent::MP> |
1230 | L<AnyEvent::MP> |
1063 | |
1231 | |
1064 | L<AnyEvent::MP::Global> |
1232 | L<AnyEvent::MP::Global> |
1065 | |
1233 | |
|
|
1234 | L<Coro::MP> |
|
|
1235 | |
1066 | L<AnyEvent> |
1236 | L<AnyEvent> |
1067 | |
1237 | |
1068 | =head1 AUTHOR |
1238 | =head1 AUTHOR |
1069 | |
1239 | |
1070 | Robin Redeker <elmex@ta-sa.org> |
1240 | Robin Redeker <elmex@ta-sa.org> |