… | |
… | |
1056 | "the other side" from the remote node, and all will be cleaned up just |
1056 | "the other side" from the remote node, and all will be cleaned up just |
1057 | fine. |
1057 | fine. |
1058 | |
1058 | |
1059 | =head2 Services |
1059 | =head2 Services |
1060 | |
1060 | |
1061 | Above it was mentioned that C<spawn> automatically loads modules, and this |
1061 | Above it was mentioned that C<spawn> automatically loads modules. This can |
1062 | can be exploited in various ways. |
1062 | be exploited in various useful ways. |
1063 | |
1063 | |
1064 | Assume for a moment you put the server into a file called |
1064 | Assume for a moment you put the server into a file called |
1065 | F<mymod/chatserver.pm> reachable from the current directory. Then you |
1065 | F<mymod/chatserver.pm> reachable from the current directory. Then you |
1066 | could run a node there with: |
1066 | could run a node there with: |
1067 | |
1067 | |
1068 | aemp run |
1068 | aemp run |
1069 | |
1069 | |
1070 | The other nodes could C<spawn> the server by using |
1070 | The other nodes could C<spawn> the server by using |
1071 | C<mymod::chatserver::client_connect> as init function. |
1071 | C<mymod::chatserver::client_connect> as init function - without any other |
|
|
1072 | configuration. |
1072 | |
1073 | |
1073 | Likewise, when you have some service that starts automatically (similar to |
1074 | Likewise, when you have some service that starts automatically when loaded |
1074 | AnyEvent::MP::Global), then you can configure this service statically: |
1075 | (similar to AnyEvent::MP::Global), then you can configure this service |
|
|
1076 | statically: |
1075 | |
1077 | |
1076 | aemp profile mysrvnode services mymod::service:: |
1078 | aemp profile mysrvnode services mymod::service:: |
1077 | aemp run profile mysrvnode |
1079 | aemp run profile mysrvnode |
1078 | |
1080 | |
1079 | And the module will automatically be loaded in the node, as specifying a |
1081 | And the module will automatically be loaded in the node, as specifying a |
1080 | module name (with C<::>-suffix) will simply load the module, which is then |
1082 | module name (with C<::>-suffix) will simply load the module, which is then |
1081 | free to do whatever it wants. |
1083 | free to do whatever it wants. |
1082 | |
1084 | |
1083 | Of course, you can also do it in the much more standard way by writing |
1085 | Of course, you can also do it in the much more standard way by writing |
1084 | a module (e.g. C<BK::Backend::IRC>), installing it as part of a module |
1086 | a module (e.g. C<BK::Backend::IRC>), installing it as part of a module |
1085 | distribution and then configure nodes, for example, if I want to run the |
1087 | distribution and then configure nodes. For example, if I wanted to run the |
1086 | Bummskraut IRC backend on a machine named "ruth", I could do this: |
1088 | Bummskraut IRC backend on a machine named "ruth", I could do this: |
1087 | |
1089 | |
1088 | aemp profile ruth addservice BK::Backend::IRC:: |
1090 | aemp profile ruth addservice BK::Backend::IRC:: |
1089 | |
1091 | |
1090 | And any F<aemp run> on that host will automatically have the Bummskraut |
1092 | And any F<aemp run> on that host will automatically have the Bummskraut |
1091 | IRC backend running. |
1093 | IRC backend running. |
1092 | |
1094 | |
1093 | That's plenty of possibilities you can use - it's all up to you how you |
1095 | There are plenty of possibilities you can use - it's all up to you how you |
1094 | structure your application. |
1096 | structure your application. |
1095 | |
1097 | |
1096 | =head1 PART 4: Coro::MP - selective receive |
1098 | =head1 PART 4: Coro::MP - selective receive |
1097 | |
1099 | |
1098 | Not all problems lend themselves naturally to an event-based solution: |
1100 | Not all problems lend themselves naturally to an event-based solution: |
1099 | sometimes things are easier if you can decide in what order you want to |
1101 | sometimes things are easier if you can decide in what order you want to |
1100 | receive messages, irregardless of the order in which they were sent. |
1102 | receive messages, regardless of the order in which they were sent. |
1101 | |
1103 | |
1102 | In these cases, L<Coro::MP> can provide a nice solution: instead of |
1104 | In these cases, L<Coro::MP> can provide a nice solution: instead of |
1103 | registering callbacks for each message type, C<Coro::MP> attached a |
1105 | registering callbacks for each message type, C<Coro::MP> attaches a |
1104 | (coro-) thread to a port. The thread can then opt to selectively receive |
1106 | (coro-) thread to a port. The thread can then opt to selectively receive |
1105 | messages it is interested in. Other messages are not lost, but queued, and |
1107 | messages it is interested in. Other messages are not lost, but queued, and |
1106 | can be received at a later time. |
1108 | can be received at a later time. |
1107 | |
1109 | |
1108 | The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate |
1110 | The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate |
… | |
… | |
1150 | |
1152 | |
1151 | mon $ioserver, sub { |
1153 | mon $ioserver, sub { |
1152 | warn "ioserver was killed: @_\n"; |
1154 | warn "ioserver was killed: @_\n"; |
1153 | }; |
1155 | }; |
1154 | |
1156 | |
1155 | Let's go through it part by part. |
1157 | Let's go through it, section by section. |
1156 | |
1158 | |
1157 | my $ioserver = port_async { |
1159 | my $ioserver = port_async { |
1158 | |
1160 | |
1159 | Ports can be created by attaching a thread to an existing port via |
1161 | Ports can be created by attaching a thread to an existing port via |
1160 | C<rcv_async>, or as here by calling C<port_async> with the code to execute |
1162 | C<rcv_async>, or as in this example, by calling C<port_async> with the |
1161 | as a thread. The C<async> component comes from the fact that threads are |
1163 | code to execute as a thread. The C<async> component comes from the fact |
1162 | created using the C<Coro::async> function. |
1164 | that threads are created using the C<Coro::async> function. |
1163 | |
1165 | |
1164 | The thread runs in a normal port context (so C<$SELF> is set). In |
1166 | The thread runs in a normal port context (so C<$SELF> is set). In |
1165 | addition, when the thread returns, it will be C<kil> I<normally>, i.e. |
1167 | addition, when the thread returns, it will be C<kil> I<normally>, i.e. |
1166 | without a reason argument. |
1168 | without a reason argument. |
1167 | |
1169 | |
1168 | while () { |
1170 | while () { |
1169 | my ($tag, $port, $path) = get_cond; |
1171 | my ($tag, $port, $path) = get_cond; |
1170 | or die "only write_file messages expected"; |
1172 | or die "only write_file messages expected"; |
1171 | |
1173 | |
1172 | The thread is supposed to serve many file writes, which is why it executes |
1174 | The thread is supposed to serve many file writes, which is why it |
1173 | in a loop. The first thing it does is fetch the next message, using |
1175 | executes in a loop. The first thing it does is fetch the next message, |
1174 | C<get_cond>, the "conditional message get". Without a condition, it simply |
1176 | using C<get_cond>, the "conditional message get". Without arguments, it |
1175 | fetches the next message from the queue, which I<must> be a C<write_file> |
1177 | merely fetches the I<next> message from the queue, which I<must> be a |
1176 | message. |
1178 | C<write_file> message. |
1177 | |
1179 | |
1178 | The message contains the C<$path> to the file, which is then created: |
1180 | The message contains the C<$path> to the file, which is then created: |
1179 | |
1181 | |
1180 | my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666 |
1182 | my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666 |
1181 | or die "$path: $!"; |
1183 | or die "$path: $!"; |
… | |
… | |
1199 | |
1201 | |
1200 | The condition must be this strict, as it is possible to receive both |
1202 | The condition must be this strict, as it is possible to receive both |
1201 | C<write_file> messages and C<data> messages from other ports while we |
1203 | C<write_file> messages and C<data> messages from other ports while we |
1202 | handle the file writing. |
1204 | handle the file writing. |
1203 | |
1205 | |
1204 | The lone C<5> at the end is a timeout - when no matching message is |
1206 | The lone C<5> argument at the end is a timeout - when no matching message |
1205 | received within C<5> seconds, we assume an error and C<die>. |
1207 | is received within C<5> seconds, we assume an error and C<die>. |
1206 | |
1208 | |
1207 | When an empty C<data> message is received we are done and can close the |
1209 | When an empty C<data> message is received we are done and can close the |
1208 | file (which is done automatically as C<$fh> goes out of scope): |
1210 | file (which is done automatically as C<$fh> goes out of scope): |
1209 | |
1211 | |
1210 | length $data or last; |
1212 | length $data or last; |
1211 | |
1213 | |
1212 | Otherwise we need to write the data: |
1214 | Otherwise we need to write the data: |
1213 | |
1215 | |
1214 | aio_write $fh, undef, undef, $data, 0; |
1216 | aio_write $fh, undef, undef, $data, 0; |
1215 | |
1217 | |
1216 | That's basically it. Note that every process should have some kind of |
1218 | And that's basically it. Note that every port thread should have some |
1217 | supervisor. In our case, the supervisor simply prints any error message: |
1219 | kind of supervisor. In our case, the supervisor simply prints any error |
|
|
1220 | message: |
1218 | |
1221 | |
1219 | mon $ioserver, sub { |
1222 | mon $ioserver, sub { |
1220 | warn "ioserver was killed: @_\n"; |
1223 | warn "ioserver was killed: @_\n"; |
1221 | }; |
1224 | }; |
1222 | |
1225 | |
… | |
… | |
1243 | This is simpler, but when some other code part sends an unexpected message |
1246 | This is simpler, but when some other code part sends an unexpected message |
1244 | to the C<$ioserver> it will stay in the queue forever. As a rule of thumb, |
1247 | to the C<$ioserver> it will stay in the queue forever. As a rule of thumb, |
1245 | every threaded port should have a "fetch next message unconditionally" |
1248 | every threaded port should have a "fetch next message unconditionally" |
1246 | somewhere, to avoid filling up the queue. |
1249 | somewhere, to avoid filling up the queue. |
1247 | |
1250 | |
1248 | It is also possible to switch-like C<get_conds>: |
1251 | Finally, it is also possible to use more switch-like C<get_conds>: |
1249 | |
1252 | |
1250 | get_cond { |
1253 | get_cond { |
1251 | $_[0] eq "msg1" and return sub { |
1254 | $_[0] eq "msg1" and return sub { |
1252 | my (undef, @msg1_data) = @_; |
1255 | my (undef, @msg1_data) = @_; |
1253 | ...; |
1256 | ...; |
… | |
… | |
1264 | =head1 THE END |
1267 | =head1 THE END |
1265 | |
1268 | |
1266 | This is the end of this introduction, but hopefully not the end of |
1269 | This is the end of this introduction, but hopefully not the end of |
1267 | your career as AEMP user. I hope the tutorial was enough to make the |
1270 | your career as AEMP user. I hope the tutorial was enough to make the |
1268 | basic concepts clear. Keep in mind that distributed programming is not |
1271 | basic concepts clear. Keep in mind that distributed programming is not |
1269 | completely trivial, that AnyEvent::MP is still in it's infancy, and I hope |
1272 | completely trivial, in fact, it's pretty complicated. We hope AEMP makes |
1270 | it will be useful to create exciting new applications. |
1273 | it simpler and will be useful to create exciting new applications. |
1271 | |
1274 | |
1272 | =head1 SEE ALSO |
1275 | =head1 SEE ALSO |
1273 | |
1276 | |
1274 | L<AnyEvent::MP> |
1277 | L<AnyEvent::MP> |
1275 | |
1278 | |