#!/opt/bin/perl

=head1 NAME

aemp - AnyEvent::MP utility

=head1 SYNOPSIS

   aemp command args...

   # protocol commands
   aemp snd <port> <arg...>          # send a message
   aemp mon <port>                   # wait till port is killed
   aemp rpc <port> <arg...>          # send message, append reply
   aemp eval <node> <expr...>        # evaluate expression

   # run a node
   aemp run initialise_args...       # run a node

   # node configuration: protocol endpoints
   aemp setnodeid <nodeid>           # configure the real node id
   aemp delnodeid                    # reset node id to default (= inherit)

   # node configuration: secret
   aemp gensecret                    # generate a random shared secret
   aemp setsecret <secret>           # set the shared secret
   aemp delsecret                    # remove the secret (= inherit)

   # node configuration: TLS
   aemp gencert                      # generate a random certificate
   aemp setcert <file>               # set a certificate (key.pem + certificate.pem)
   aemp delcert                      # remove certificate (= inherit)

   # node configuration: seed addresses for bootstrapping
   aemp setseeds <address>,...       # set seeds
   aemp delseeds                     # clear all seeds (= inherit)
   aemp addseed <address>            # add a seed
   aemp delseed <address>            # remove seed

   # node configuration: bind addresses
   aemp setbinds <address>,...       # set binds
   aemp delbinds                     # clear all binds (= inherit)
   aemp addbind <address>            # add a bind address
   aemp delbind <address>            # remove a bind address

   # node configuration: services
   aemp setservices initfunc,...     # set service functions
   aemp delservices                  # clear all services (= inherit)
   aemp addservice <initfunc>        # add an instance of a service
   aemp delservice <initfunc>        # delete one instance of a service

   # profile-specific configuration
   aemp profile <name> <command>...  # apply command to profile only
   aemp delprofile <name>            # eradicate the named profile
   aemp showprofile <name>           # display given profile
   aemp showconfig [<name>]          # display effective config

   # debugging
   aemp trace <nodeid>               # trace the network topology

=head1 DESCRIPTION

With aemp you can configure various aspects of AnyEvent::MP and its
protocol.

You can also start a "default node", a node that only depends on the
static configuration.

=cut
use common::sense;

# The "run" command is handled at compile time: it should come before
# anything else, so all modules will be loaded on each restart.
BEGIN {
    if ($ARGV[0] eq "run") {
        shift;

        # d'oh
        require AnyEvent::Watchdog;

        # now we can load extra modules
        AnyEvent::Watchdog::autorestart (1);
        AnyEvent::Watchdog::heartbeat (300);

        require AnyEvent;
        require AnyEvent::MP;
        AnyEvent::MP::initialise_node (@ARGV);

        # Enter the event loop: EV's own loop when EV is the detected
        # backend, otherwise block on a condvar that is never signalled.
        AnyEvent::detect () eq "AnyEvent::Impl::EV"
            ? EV::loop ()
            : AE::cv ()->recv;
    }
}

use Carp ();
use JSON::XS;

use AnyEvent;
use AnyEvent::Util;

use AnyEvent::MP;
use AnyEvent::MP::Config;

# Run an external command via run_cmd and die if it reports a non-zero
# exit status.
#
# Arguments: exactly those of run_cmd; the first one is the command as an
# array reference (it is interpolated into the error message).
sub my_run_cmd {
    my ($cmd) = @_;

    # &run_cmd without parentheses deliberately forwards the current @_.
    my $cv = &run_cmd;
    my $status = $cv->recv;

    $status
        and die "@$cmd: command failed with exit status $status.";
}

# Generate a self-signed X.509 certificate (RSA 2048 bit, 3650 days,
# subject /CN=AnyEvent::MP) by invoking openssl.
#
# The certificate is captured from stdout and the private key from file
# descriptor 3. Returns both concatenated as one string, certificate first.
sub gen_cert {
    my_run_cmd [qw(openssl req
                      -new -nodes -x509 -days 3650
                      -newkey rsa:2048 -keyout /dev/fd/3
                      -batch -subj /CN=AnyEvent::MP
                )],
        "<", "/dev/null",
        ">" , \my $cert,
        "3>", \my $key,
        "2>", "/dev/null";

    "$cert$key"
}

# $cfg is the complete configuration; $profile is the hash that the
# set*/del*/add* commands modify. It starts out as the global configuration
# and is redirected to a named profile by the "profile" command.
our $cfg     = AnyEvent::MP::Config::config;
our $profile = $cfg;

# Print the network topology reachable from $node, one line per node:
# the node followed by its neighbours (excluding ourselves).
#
# Every newly seen neighbour is asked for its own "up_nodes" list. The
# begin/end counting on the condvar makes recv return once all outstanding
# replies have arrived, or after the 15 second timeout.
sub trace {
    my ($node) = @_;
    my $cv = AE::cv;
    my %seen;

    my $to = AE::timer 15, 0, sub {
        warn "timeout\n";
        $cv->();
    };

    initialise_node "anon/";

    my $reply = port {
        my ($node, @neigh) = @_;

        @neigh = grep $_ ne $NODE, @neigh;

        print +(join " ", $node, @neigh), "\n";

        for (@neigh) {
            unless ($seen{$_}++) {
                $cv->begin;
                snd $_, up_nodes => $SELF => $_;
            }
        }

        $cv->end;
    };

    # Kick things off by feeding the start node to our own reply port.
    $cv->begin;
    snd $reply, seed => $node;

    $cv->recv;
}

# Forward declaration so that "docmd" can be called without parentheses.
sub docmd;

# Dispatch table: subcommand name => handler. Handlers consume their own
# arguments from @ARGV; whatever they leave behind is interpreted by docmd
# as the next subcommand.
#
# Configuration commands bump $cfg->{dirty} after changing anything.
# NOTE(review): presumably AnyEvent::MP::Config persists the configuration
# when this flag is set - the code doing so is not part of this file.
our %CMD = (
    # snd <port> <arg...>: send a message, then wait for a confirmation,
    # the death of the port, or a 5 second timeout, and print the outcome.
    snd => sub {
        my $port = shift @ARGV;
        initialise_node "anon/";

        snd $port, @ARGV;
        @ARGV = ();

        my $cv = AE::cv;
        my $to = AE::timer 5, 0, sub { $cv->("timeout") };

        mon $port, $cv;
        my $reply = port sub { &$cv };
        snd node_of $port, snd => $reply, "message sent successfully";

        print join " ", $cv->recv, "\n";
    },

    # rpc <port> <arg...>: send the message with a reply port appended and
    # print the reply, the monitor result, or "timeout" after 5 seconds.
    rpc => sub {
        my $port = shift @ARGV;
        initialise_node "anon/";

        my $cv = AE::cv;
        my $to = AE::timer 5, 0, sub { $cv->("timeout") };

        snd $port, @ARGV, port { &$cv };
        @ARGV = ();

        mon $port, $cv;

        print join " ", $cv->recv, "\n";
    },

    # mon <port>: block until the monitor on the port fires, then print
    # what it reported.
    mon => sub {
        my $port = shift @ARGV;
        initialise_node "anon/";

        mon $port, my $cv = AE::cv;
        print join " ", $cv->recv, "\n";
    },

    # eval <node> <expr...>: evaluate the expression on the given node.
    # Dies with the returned error if there is one, otherwise prints the
    # results as JSON with the enclosing array brackets stripped.
    eval => sub {
        my $node = node_of shift @ARGV;
        my $expr = join " ", @ARGV;
        @ARGV = ();

        initialise_node "anon/";

        my $cv = AE::cv;
        my $to = AE::timer 5, 0, sub { $cv->("timeout") };

        AnyEvent::MP::Kernel::eval_on $node, $expr, port { &$cv };
        mon $node, $cv;

        my ($err, @res) = $cv->recv;

        die $err if length $err;

        print +(substr JSON::XS->new->encode (\@res), 1, -1), "\n";
    },

    # trace <nodeid>: print the network topology starting at the node.
    trace => sub {
        @ARGV >= 1
            or die "node id missing\n";

        trace shift @ARGV;
    },

    setnodeid => sub {
        # Fixed: this used to complain about a missing "shared secret".
        @ARGV >= 1
            or die "node id missing\n";

        $profile->{nodeid} = shift @ARGV;
        ++$cfg->{dirty};
    },
    delnodeid => sub {
        delete $profile->{nodeid};
        ++$cfg->{dirty};
    },

    setsecret => sub {
        @ARGV >= 1
            or die "shared secret missing\n";

        $profile->{secret} = shift @ARGV;
        ++$cfg->{dirty};
    },
    gensecret => sub {
        $profile->{secret} = AnyEvent::MP::Kernel::alnumbits AnyEvent::MP::Kernel::nonce 64;
        ++$cfg->{dirty};
    },
    delsecret => sub {
        delete $profile->{secret};
        ++$cfg->{dirty};
    },

    # setcert <file>: store the whole contents of the file as certificate.
    setcert => sub {
        @ARGV >= 1
            or die "key+certificate pem filename missing\n";

        # Fixed: consume the filename, otherwise docmd would try to run it
        # as the next subcommand.
        my $certfile = shift @ARGV;

        open my $fh, "<", $certfile
            or die "$certfile: $!";

        local $/; # slurp mode
        $profile->{cert} = <$fh>;
        ++$cfg->{dirty};
    },
    gencert => sub {
        $profile->{cert} = gen_cert;
        ++$cfg->{dirty};
    },
    delcert => sub {
        delete $profile->{cert};
        ++$cfg->{dirty};
    },

    setbinds => sub {
        @ARGV >= 1
            or die "bind addresses missing\n";

        $profile->{binds} = [split /,/, shift @ARGV];
        ++$cfg->{dirty};
    },
    delbinds => sub {
        delete $profile->{binds};
        ++$cfg->{dirty};
    },
    addbind => sub {
        @ARGV >= 1
            or die "bind address missing\n";

        my $bind = shift @ARGV;

        # Remove any existing copy first so the address is listed only once.
        @{ $profile->{binds} } = grep $_ ne $bind, @{ $profile->{binds} };
        push @{ $profile->{binds} }, $bind;
        ++$cfg->{dirty};
    },
    delbind => sub {
        @ARGV >= 1
            or die "bind address missing\n";

        my $bind = shift @ARGV;

        @{ $profile->{binds} } = grep $_ ne $bind, @{ $profile->{binds} };
        ++$cfg->{dirty};
    },

    setseeds => sub {
        @ARGV >= 1
            or die "seed addresses missing\n";

        $profile->{seeds} = [split /,/, shift @ARGV];
        ++$cfg->{dirty};
    },
    delseeds => sub {
        delete $profile->{seeds};
        ++$cfg->{dirty};
    },
    addseed => sub {
        @ARGV >= 1
            or die "seed address missing\n";

        my $seed = shift @ARGV;

        # Remove any existing copy first so the seed is listed only once.
        @{ $profile->{seeds} } = grep $_ ne $seed, @{ $profile->{seeds} };
        push @{ $profile->{seeds} }, $seed;
        ++$cfg->{dirty};
    },
    delseed => sub {
        @ARGV >= 1
            or die "seed address missing\n";

        my $seed = shift @ARGV;

        @{ $profile->{seeds} } = grep $_ ne $seed, @{ $profile->{seeds} };
        ++$cfg->{dirty};
    },

    setservices => sub {
        @ARGV >= 1
            or die "service specifications missing\n";

        $profile->{services} = [split /,/, shift @ARGV];

        # NOTE(review): this discards any further commands on the command
        # line, unlike the other set* commands - confirm this is intended.
        @ARGV = ();
        ++$cfg->{dirty};
    },
    delservices => sub {
        delete $profile->{services};
        ++$cfg->{dirty};
    },
    # Unlike binds and seeds, services may be listed more than once.
    addservice => sub {
        @ARGV >= 1
            or die "service specification missing\n";

        my $service = shift @ARGV;

        push @{ $profile->{services} }, $service;
        ++$cfg->{dirty};
    },
    delservice => sub {
        @ARGV >= 1
            or die "service specification missing\n";

        my $service = shift @ARGV;

        # Remove only the first matching instance.
        for (0 .. $#{ $profile->{services} }) {
            next unless $profile->{services}[$_] eq $service;
            splice @{ $profile->{services} }, $_, 1;
            last;
        }
        ++$cfg->{dirty};
    },

    # profile <name>: make the following commands act on the named profile,
    # creating it if it does not exist yet.
    profile => sub {
        @ARGV >= 1
            or die "profile name is missing\n";

        my $name = shift @ARGV;

        $profile = $cfg->{profile}{$name} ||= {};
        ++$cfg->{dirty};
    },
    delprofile => sub {
        @ARGV >= 1
            or die "profile name is missing\n";

        my $name = shift @ARGV;

        delete $cfg->{profile}{$name};
        ++$cfg->{dirty};
    },
    showprofile => sub {
        @ARGV >= 1
            or die "profile name is missing\n";

        my $name = shift @ARGV;

        print JSON::XS->new->pretty->encode ($cfg->{profile}{$name} || {});
    },
    # showconfig [<name>]: without a name, AnyEvent::MP::Kernel::_nodename
    # supplies the profile name to look up.
    showconfig => sub {
        my $name = @ARGV ? shift @ARGV : AnyEvent::MP::Kernel::_nodename;

        print JSON::XS->new->pretty->encode (AnyEvent::MP::Config::find_profile $name);
    },
);

# Take the next word off @ARGV and run the subcommand of that name.
# Dies if the word is not a key of %CMD.
sub docmd {
    my $cmd = shift @ARGV;

    $CMD{$cmd}
        or die "$cmd: no such aemp command (try man aemp)";

    $CMD{$cmd}();
}

@ARGV
    or die "Usage: aemp subcommand ... (try man aemp)\n";

# Several subcommands may be chained on one command line.
docmd while @ARGV;