… | |
… | |
212 | use AnyEvent::MP; |
212 | use AnyEvent::MP; |
213 | |
213 | |
214 | configure nodeid => "eg_receiver/%u", binds => ["*:4040"]; |
214 | configure nodeid => "eg_receiver/%u", binds => ["*:4040"]; |
215 | |
215 | |
216 | my $port = port; |
216 | my $port = port; |
217 | my $db_guard = db_reg eg_receivers => $port; |
217 | db_set eg_receivers => $port; |
218 | |
218 | |
219 | rcv $port, test => sub { |
219 | rcv $port, test => sub { |
220 | my ($data, $reply_port) = @_; |
220 | my ($data, $reply_port) = @_; |
221 | |
221 | |
222 | print "Received data: " . $data . "\n"; |
222 | print "Received data: " . $data . "\n"; |
… | |
… | |
225 | AnyEvent->condvar->recv; |
225 | AnyEvent->condvar->recv; |
226 | |
226 | |
227 | =head3 AnyEvent::MP::Global |
227 | =head3 AnyEvent::MP::Global |
228 | |
228 | |
229 | Now, that wasn't too bad, was it? OK, let's step through the new functions |
229 | Now, that wasn't too bad, was it? OK, let's step through the new functions |
230 | and modules that have been used. |
230 | that have been used. |
231 | |
|
|
232 | For starters, there is now an additional module being |
|
|
233 | used: L<AnyEvent::MP::Global>. This module provides us with a I<global |
|
|
234 | registry>, which lets us register ports in groups that are visible on all |
|
|
235 | I<nodes> in a network. |
|
|
236 | |
|
|
237 | What is this useful for? Well, the I<port IDs> are random-looking strings, |
|
|
238 | assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance, |
|
|
239 | so we don't know which I<port ID> to send messages to, especially when the |
|
|
240 | message is to be passed between different I<nodes> (or UNIX processes). To |
|
|
241 | find the right I<port> of another I<node> in the network we will need |
|
|
242 | to communicate this somehow to the sender. And exactly that is what |
|
|
243 | L<AnyEvent::MP::Global> provides. |
|
|
244 | |
|
|
245 | Especially in larger, more anonymous networks this is handy: imagine you |
|
|
246 | have a few database backends, a few web front-ends and some processing |
|
|
247 | distributed over a number of hosts: all of these would simply register |
|
|
248 | themselves in the appropriate group, and your web front-ends can start to |
|
|
249 | find some database backend. |
|
|
250 | |
231 | |
251 | =head3 C<configure> and Joining and Maintaining the Network |
232 | =head3 C<configure> and Joining and Maintaining the Network |
252 | |
233 | |
253 | Now, let's have a look at the new function, C<configure>: |
234 | First let's have a look at C<configure>: |
254 | |
235 | |
255 | configure nodeid => "eg_receiver", binds => ["*:4040"]; |
236 | configure nodeid => "eg_receiver/%u", binds => ["*:4040"]; |
256 | |
237 | |
257 | Before we are able to send messages to other nodes we have to initialise |
238 | Before we are able to send messages to other nodes we have to initialise |
258 | ourself to become a "distributed node". Initialising a node means naming |
239 | ourself to become a "distributed node". Initialising a node means naming |
259 | the node, optionally binding some TCP listeners so that other nodes can |
240 | the node and binding some TCP listeners so that other nodes can |
260 | contact it and connecting to a predefined set of seed addresses so the |
241 | contact it. |
261 | node can discover the existing network - and the existing network can |
242 | |
262 | discover the node! |
243 | Additionally, to actually link all nodes in a network together, you can |
|
|
244 | specify a number of seed addresses, which will be used by the node to |
|
|
245 | connect itself into an existing network, as we will see shortly. |
263 | |
246 | |
264 | All of this (and more) can be passed to the C<configure> function - later |
247 | All of this (and more) can be passed to the C<configure> function - later |
265 | we will see how we can do all this without even passing anything to |
248 | we will see how we can do all this without even passing anything to |
266 | C<configure>! |
249 | C<configure>! |
267 | |
250 | |
268 | The first parameter, C<nodeid>, specified the node ID (in this case |
251 | The first parameter, C<nodeid>, specified the node ID (in this case |
269 | C<eg_receiver> - the default is to use the node name of the current host, |
252 | C<eg_receiver/%u> - the default is to use the node name of the current |
270 | but for this example we want to be able to run many nodes on the same |
253 | host plus C</%u>, which goves the node a name with a random suffix to |
271 | machine). Node IDs need to be unique within the network and can be almost |
254 | make it unique, but for this example we want the node to have a bit more |
272 | any string - if you don't care, you can specify a node ID of C<anon/> |
255 | personality, and name it C<eg_receiver> with a random suffix. |
273 | which will then be replaced by a random node name. |
256 | |
|
|
257 | Why the random suffix? Node IDs need to be unique within the network and |
|
|
258 | appending a random suffix is the easiest way to do that. |
274 | |
259 | |
275 | The second parameter, C<binds>, specifies a list of C<address:port> pairs |
260 | The second parameter, C<binds>, specifies a list of C<address:port> pairs |
276 | to bind TCP listeners on. The special "address" of C<*> means to bind on |
261 | to bind TCP listeners on. The special "address" of C<*> means to bind on |
277 | every local IP address (this might not work on every OS, so it should not |
262 | every local IP address (this might not work on every OS, so explicit IP |
278 | be used unless you know it works). |
263 | addresses are best). |
279 | |
264 | |
280 | The reason to bind on a TCP port is not just that other nodes can connect |
265 | The reason to bind on a TCP port is not just that other nodes can connect |
281 | to us: if no binds are specified, the node will still bind on a dynamic |
266 | to us: if no binds are specified, the node will still bind on a dynamic |
282 | port on all local addresses - but in this case we won't know the port, and |
267 | port on all local addresses - but in this case we won't know the port, and |
283 | cannot tell other nodes to connect to it as seed node. |
268 | cannot tell other nodes to connect to it as seed node. |
284 | |
269 | |
285 | A I<seed> is a (fixed) TCP address of some other node in the network. To |
270 | Now, a I<seed> is simply the TCP address of some other node in the |
286 | explain the need for seeds we have to look at the topology of a typical |
271 | network, often the same string as used for the C<binds> parameter of the |
287 | L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>, |
272 | other node. The need for seeds is easy to explain: I<somehow> the nodes |
288 | here an example with 4 nodes: |
273 | of an aemp network have to find each other, and often this means over the |
|
|
274 | internet. So broadcasts are out. |
289 | |
275 | |
290 | N1--N2 |
276 | Instead, a node usually specifies the addresses of a few (for redundancy) |
291 | | \/ | |
277 | other nodes, some of which should be up. Two nodes can set each other as |
292 | | /\ | |
278 | seeds without any issues. You could even specify all nodes as seeds for |
293 | N3--N4 |
279 | all nodes, for total redundancy. But the common case is to have some more |
|
|
280 | or less central, stable servers running seed services for other nodes. |
294 | |
281 | |
295 | Now imagine another node - C<N5> - wants to connect itself to that network: |
282 | All you need to do to ensure that an AnyEvent::MP network connects |
|
|
283 | together is to make sure that all connections from nodes to their seed |
|
|
284 | nodes I<somehow> span the whole network. The simplest way to do that would |
|
|
285 | be for all nodes to specify a single node as seed node, and you would get |
|
|
286 | a star topology. If you specify all nodes as seed nodes, you get a fully |
|
|
287 | meshed network (that's what previous releases of AnyEvent::MP actually |
|
|
288 | did). |
296 | |
289 | |
297 | N1--N2 |
290 | A node tries to keep connections open to all of it's seed nodes at all |
298 | | \/ | N5 |
291 | times, while other connections are made on demand only. |
299 | | /\ | |
|
|
300 | N3--N4 |
|
|
301 | |
292 | |
302 | The new node needs to know the I<binds> of all nodes already |
293 | All of this ensures that the network stays one network - even if all the |
303 | connected. Exactly this is what the I<seeds> are for: Let's assume that |
294 | nodes in one half of the net are separated from the nodes in the other |
304 | the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This |
295 | half by some network problem, once that is over, they will eventually |
305 | causes it to connect to C<N2>: |
296 | become a single network again. |
306 | |
297 | |
307 | N1--N2____ |
298 | In addition to creating the network, a node also expects the seed nodes to |
308 | | \/ | N5 |
299 | run the shared database service - if need be, by automatically starting it, |
309 | | /\ | |
300 | so you don't normally need to configure this explicitly. |
310 | N3--N4 |
|
|
311 | |
301 | |
312 | C<N2> then tells C<N5> about the I<binds> of the other nodes it is |
302 | #TODO# later?#d# |
313 | connected to, and C<N5> creates the rest of the connections: |
|
|
314 | |
|
|
315 | /--------\ |
|
|
316 | N1--N2____| |
|
|
317 | | \/ | N5 |
|
|
318 | | /\ | /| |
|
|
319 | N3--N4--- | |
|
|
320 | \________/ |
|
|
321 | |
|
|
322 | All done: C<N5> is now happily connected to the rest of the network. |
|
|
323 | |
|
|
324 | Apart form the obvious function - joining the network - seed nodes fulfill |
|
|
325 | another very important function: the connections created by connecting |
|
|
326 | to seed nodes are used to keep the network together - by trying to keep |
|
|
327 | connections to all seed nodes active, the network ensures that it will not |
|
|
328 | split into multiple networks without connection to each other. |
|
|
329 | |
|
|
330 | This means that the graph created by all seed node connections must span |
|
|
331 | the whole network, in some way. |
|
|
332 | |
|
|
333 | There are many ways of doing this - the most simple is probably to use |
|
|
334 | a single set of one or more seednodes as seednodes for all nodes in the |
|
|
335 | network - this creates a "hub" of seednodes that connect to each other, |
|
|
336 | and "leaf" nodes that connect to the nodes in the hub, keeping everything |
|
|
337 | together. |
|
|
338 | |
|
|
339 | The process of joining a network takes time, during which the node is |
303 | The process of joining a network takes time, during which the node |
340 | already running. This also means it takes time until the node is fully |
304 | is already running. This means it takes time until the node is |
341 | connected, and global groups and other information is available. The best |
305 | fully connected, and information about services in the network are |
342 | way to deal with this is to either retry regularly until you found the |
306 | available. This is why most AnyEvent::MP programs start by waiting a while |
343 | resource you were looking for, or to only start services on demand after a |
307 | until the information they need is available. |
344 | node has become available. |
308 | |
|
|
309 | We will see how this is done later, in the sender program. |
345 | |
310 | |
346 | =head3 Registering the Receiver |
311 | =head3 Registering the Receiver |
347 | |
312 | |
348 | Coming back to our example, we have now introduced the basic purpose of |
313 | Coming back to our example, after the node has been configured for network |
349 | L<AnyEvent::MP::Global> and C<configure> and its use of profiles. We |
314 | access, it is time to publish some service, namely the receive service. |
350 | also set up our profiles for later use and now we will finally continue |
|
|
351 | talking about the receiver. |
|
|
352 | |
315 | |
353 | Let's look at the next line(s): |
316 | For that, let's look at the next lines: |
354 | |
317 | |
355 | my $port = port; |
318 | my $port = port; |
356 | my $db_guard = db_reg eg_receivers => $port; |
319 | db_set eg_receivers => $port; |
357 | |
320 | |
358 | The C<port> function has already been discussed. It simply creates a new |
321 | The C<port> function has already been discussed. It simply creates a new |
359 | I<port> and returns the I<port ID>. The C<grp_reg> function, however, is |
322 | I<port> and returns the I<port ID>. The C<db_reg> function, however, is |
360 | new: The first argument is the name of a I<global group>, and the second |
323 | new: The first argument is the name of a I<database family> and the second |
361 | argument is the I<port ID> to register in that group. |
324 | argument is the name of a I<subkey> within that family. The third argument |
|
|
325 | would be the I<value> to be associated with the family and subkey, but, |
|
|
326 | since it is missing, it will simply be C<undef>. |
362 | |
327 | |
363 | You can choose the name of such a I<global group> freely (prefixing your |
328 | Ok, what's this weird tlak about families you wonder - AnyEvent::MP comes |
364 | package name is I<highly recommended> however and might be enforce din |
329 | with a distributed database. This database runs on so-called "global" |
365 | future versions!). The purpose of such a group is to store a set of port |
330 | nodes, which usually are the seed nodes of your network. The database |
366 | IDs. This set is made available throughout the L<AnyEvent::MP> network, |
331 | structure is "simply" a hash of hashes of values. |
367 | so that each node can see which ports belong to that group. |
|
|
368 | |
332 | |
369 | Later we will see how the sender looks for the ports in this global |
333 | In other words, if the database were stored in C<%DB>, then the C<db_set> |
370 | group to send messages to them. |
334 | function more or less would do this: |
|
|
335 | |
|
|
336 | $DB{eg_receivers}{$port} = undef; |
|
|
337 | |
|
|
338 | So the ominous "family" selects a hash in the database, and the "subkey" |
|
|
339 | is simply the key in this hash. And C<db_set> very much works like an |
|
|
340 | assignment. |
|
|
341 | |
|
|
342 | The family namespace is shared by all nodes in a network, so the names |
|
|
343 | should be reasonably unique, for example, they could start with the name |
|
|
344 | of your module, or the name of the program. |
|
|
345 | |
|
|
346 | The purpose behind adding this key to the database is that the sender can |
|
|
347 | look it up and find our port. We will shortly see how. |
371 | |
348 | |
372 | The last step in the example is to set up a receiver callback for those |
349 | The last step in the example is to set up a receiver callback for those |
373 | messages, just as was discussed in the first example. We again match |
350 | messages, just as was discussed in the first example. We again match |
374 | for the tag C<test>. The difference is that this time we don't exit the |
351 | for the tag C<test>. The difference is that this time we don't exit the |
375 | application after receiving the first message. Instead we continue to wait |
352 | application after receiving the first message. Instead we continue to wait |
… | |
… | |
382 | use AnyEvent; |
359 | use AnyEvent; |
383 | use AnyEvent::MP; |
360 | use AnyEvent::MP; |
384 | |
361 | |
385 | configure nodeid => "eg_sender/%u", seeds => ["*:4040"]; |
362 | configure nodeid => "eg_sender/%u", seeds => ["*:4040"]; |
386 | |
363 | |
387 | my $find_timer = |
364 | my $guard = db_mon eg_receivers => sub { |
388 | AnyEvent->timer (after => 0, interval => 1, cb => sub { |
365 | my ($family, $keys) = @_; |
389 | my $ports = grp_get "eg_receivers" |
366 | return unless %$family; |
390 | or return; |
|
|
391 | |
367 | |
392 | snd $_, test => time |
368 | # now there are some receivers, send them a message |
393 | for @$ports; |
369 | snd $_ => test => time, keys %$family |
394 | }); |
370 | for keys %$family; |
|
|
371 | }; |
395 | |
372 | |
396 | AnyEvent->condvar->recv; |
373 | AnyEvent->condvar->recv; |
397 | |
374 | |
398 | It's even less code. The C<configure> serves the same purpose as in the |
375 | It's even less code. The C<configure> serves the same purpose as in the |
399 | receiver, but instead of specifying binds we specify a list of seeds - |
376 | receiver, but instead of specifying binds we specify a list of seeds - |
400 | which happens to be the same as the binds used by the receiver, which |
377 | the seed happens to be the same as the bind used by the receiver, which |
401 | becomes our seed node. |
378 | therefore becomes our seed node. |
402 | |
379 | |
403 | Next we set up a timer that repeatedly (every second) calls this chunk of |
380 | Remember the part about having to wait till things become available? After |
404 | code: |
381 | configure returns, nothing has been done yet - the node is not connected |
|
|
382 | to the network, knows nothing about the database contents, and it can take |
|
|
383 | ages (for a computer :) for this situation to change. |
405 | |
384 | |
406 | my $ports = grp_get "eg_receivers" |
385 | Therefore, the sender waits, in this case by using the C<db_mon> |
407 | or return; |
386 | function. This function registers an interest in a specific database |
|
|
387 | family (C<eg_receivers>). Each time something inside the family changes |
|
|
388 | (a key is added, changed or deleted), it will call our callback with the |
|
|
389 | family hash as first argument, and the list of keys as second argument. |
408 | |
390 | |
409 | snd $_, test => time |
391 | The callback only checks whether the C<%$family> has is empty - in this |
410 | for @$ports; |
392 | case it does nothing. |
411 | |
393 | |
412 | The only new function here is the C<grp_get> function of |
394 | Eventually the family will, however, contain the port we set in the |
413 | L<AnyEvent::MP::Global>. It searches in the global group named |
395 | sender. Then it will send a message to it and any other receiver in the |
414 | C<eg_receivers> for ports. If none are found, it returns C<undef>, which |
396 | group. |
415 | makes our code return instantly and wait for the next round, as nobody is |
|
|
416 | interested in our message. |
|
|
417 | |
397 | |
418 | As soon as the receiver application has connected and the information |
398 | You can experiment by having multiple receivers - you have to change the |
419 | about the newly added port in the receiver has propagated to the sender |
399 | "binds" parameter in the receiver to the seeds used in the sender to start |
420 | node, C<grp_get> returns an array reference that contains the I<port ID> of |
400 | up additional receivers, but then you can start as many as you like. If |
421 | the receiver I<port(s)>. |
401 | you specify proper IP addresses for the seeds, you can even run them on |
|
|
402 | different computers. |
422 | |
403 | |
423 | We then just send a message with a tag and the current time to every |
404 | Each time you start the sender, it will send a message to all receivers it |
424 | I<port> in the global group. |
405 | finds (you have to interrupt it manualy afterwards). |
|
|
406 | |
|
|
407 | Things you could try include using C<PERL_ANYEVENT_MP_TRACE=1> to see |
|
|
408 | which messages are exchanged, or starting the sender first and see how |
|
|
409 | long it takes it to find the receiver. |
425 | |
410 | |
426 | =head3 Splitting Network Configuration and Application Code |
411 | =head3 Splitting Network Configuration and Application Code |
427 | |
412 | |
|
|
413 | #TODO# |
428 | OK, so far, this works. In the real world, however, the person configuring |
414 | OK, so far, this works. In the real world, however, the person configuring |
429 | your application to run on a specific network (the end user or network |
415 | your application to run on a specific network (the end user or network |
430 | administrator) is often different to the person coding the application. |
416 | administrator) is often different to the person coding the application. |
431 | |
417 | |
432 | Or to put it differently: the arguments passed to configure are usually |
418 | Or to put it differently: the arguments passed to configure are usually |