ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.5
Committed: Mon Aug 3 21:35:03 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.4: +72 -35 lines
Log Message:
first round of monitoring

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Base;
22
23 use AnyEvent::MP::Node;
24 use AnyEvent::MP::Transport;
25
26 use common::sense;
27
28 use Carp ();
29
30 use AE ();
31
32 use base "Exporter";
33
our $VERSION = '0.01';

# symbols exported by default into every package that uses this module
our @EXPORT = qw(
   NODE $NODE
   snd del
   _any_
   become_slave become_public
);

# cache for default_secret, undef until that function has run once
our $DEFAULT_SECRET;

# new connect every 5s, at least
our $CONNECT_INTERVAL = 5;
# includes handshake
our $CONNECT_TIMEOUT = 30;
44
45 =item $AnyEvent::MP::Base::WARN
46
47 This value is called with an error or warning message, when e.g. a connection
48 could not be created, authorisation failed and so on.
49
50 The default simply logs the message to STDERR.
51
52 =cut
53
# default warning handler: write the message, followed by a newline,
# through perl's warn (which goes to STDERR unless redirected)
our $WARN = sub {
   my ($msg) = @_;

   warn "$msg\n";
};
57
# nonce $length
#
# Returns a string of exactly $length random octets.
#
# The octets are read from /dev/urandom when that device can be opened
# and delivers enough data. Otherwise they come from perl's rand, which
# is seeded once per process from the time, the pid and the output of
# some external commands - this fallback is NOT cryptographically strong.
sub nonce($) {
   my ($len) = @_;
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may return fewer octets than requested, so append
      # until the buffer is full or the read fails/hits end of file
      while (length ($nonce) < $len) {
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last;
      }

      return $nonce
         if length ($nonce) == $len;
   }

   # no (usable) /dev/urandom, fall back to perl's own generator
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
75
# default_secret
#
# Returns the shared secret used by default, computing it on first use
# and caching it in $DEFAULT_SECRET afterwards.
#
# The secret is the content of the file ".aemp-secret" in the directory
# named by $ENV{HOME}. When HOME is not set, or the file cannot be opened
# or read, a random 32 octet secret is generated instead.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      my $home = $ENV{HOME};
      my $secret;

      # three-argument open, so the path can never be taken as a mode,
      # and no lookup relative to "/" when HOME is unset
      if (defined $home and open my $fh, "<", "$home/.aemp-secret") {
         binmode $fh;

         my $buf = "";
         my $got = sysread $fh, $buf, -s $fh;

         $secret = $buf
            if defined $got;
      }

      $DEFAULT_SECRET = defined $secret ? $secret : nonce (32);
   }

   $DEFAULT_SECRET
}
87
# per-process/node unique cookie, built from the pid and the start time
our $UNIQ = sprintf ("%x.%x", $$, time);

our $ID     = "a"; # NOTE(review): not used in this file, presumably a port id counter - confirm
our $PUBLIC = 0;   # set to 1 once become_public has run
our $NODE   = $$;  # noderef of the local node, the pid until become_public replaces it
our $PORT;         # NOTE(review): not used in this file - confirm purpose

# noderef (and each of its comma-separated parts) to node object mapping,
# the key "" is the local node
our %NODE;
# local ports, port id to callback
our %PORT;

# local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %RMON;
# monitored _local_ ports, port id to a hash of callbacks run by del
our %LMON;

# well-known port names, looked up by the "wkp" node request
our %WKP;
# local transports, keyed by the address being listened on
our %LISTENER;

# holds the sending node during _inject
our $SRCNODE;
104
# returns the current value of $NODE, the noderef of the local node
sub NODE() {
   $NODE
}

# _ANY_ is a constant true sub, _any_ returns a reference to it, so the
# result of _any_ can be recognised by comparing code references
sub _ANY_() {
   1
}

sub _any_() {
   \&_ANY_
}
109
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the callback registered
# for $portid in %PORT and calls it with the remaining arguments.
# Messages for unknown ports are silently dropped.
sub _inject {
   my $cb = $PORT{+shift}
      or return;

   # call with the current @_, which now holds only the message
   &$cb;
}
113
# add_node $noderef
#
# Returns the node object for $noderef, creating and registering it
# when necessary.
#
# A noderef is a comma-separated list of addresses. When the full
# noderef is unknown but one of its addresses already maps to a node,
# that node is reused and also registered under the full noderef.
# Otherwise a new AnyEvent::MP::Node::Direct object is created and
# registered under the full noderef and under each single address.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   for (split /,/, $noderef) {
      return $NODE{$noderef} = $NODE{$_}
         if exists $NODE{$_};
   }

   # for indirect sends, use a different class
   # (explicit class method call, indirect object syntax is ambiguous)
   my $node = AnyEvent::MP::Node::Direct->new ($noderef);

   $NODE{$_} = $node
      for $noderef, split /,/, $noderef;

   $node
}
133
# snd $portid, @msg
#
# Sends a message to a port. $portid has the form "noderef#port"; the
# node for the noderef is looked up in %NODE (and created via add_node
# when unknown), then asked to send [$port, @msg].
sub snd(@) {
   my $dest = shift;
   my ($noderef, $port) = split /#/, $dest, 2;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send ([$port, @_]);
}
140
# del $portid
#
# Deletes a local port. $portid has the form "noderef#port", only the
# port part is used. The port is removed from %PORT, and every monitor
# callback registered for it in %LMON is removed and then called.
sub del($) {
   my ($portid) = @_;
   my (undef, $port) = split /#/, $portid, 2;

   delete $PORT{$port};

   my $mon = delete $LMON{$port}
      or return;

   for my $cb (values %$mon) {
      $cb->();
   }
}
151
# become_public @addresses
#
# Makes the local node public, does nothing when it already is.
#
# The arguments are joined with commas into a noderef, which is
# normalised and stored in $NODE. For each address in the normalised
# noderef the local node is registered in %NODE and a server is started
# via AnyEvent::MP::Transport::mp_server, remembered in %LISTENER.
sub become_public {
   return if $PUBLIC;

   my @args    = @_;
   my $noderef = join ",", @args;

   # normalise_noderef returns an object whose recv yields the result
   $NODE = AnyEvent::MP::Node::normalise_noderef($noderef)->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport($t);

      # called with the transport object of a newly accepted connection
      my $on_accept = sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      };

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server($host, $port, @args, $on_accept);
   }

   $PUBLIC = 1;
}
179
180 #############################################################################
181 # self node code
182
# Requests understood by the local node itself, request name to handler.
# Each handler is called with the remaining elements of the message,
# while $SRCNODE holds the node the message came from.
our %node_req = (
   # monitoring

   # disable monitoring of a local port for the sending node
   mon0 => sub {
      my ($portid) = @_;
      my $node = $SRCNODE;

      my $cb = delete $node->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },

   # enable monitoring: the sending node gets an "exit" request for
   # the port when the monitor callback fires
   mon1 => sub {
      my ($portid) = @_;
      my $node = $SRCNODE;

      my $notify = sub {
         $node->send (["", exit => $portid]);
      };

      $node->{rmon}{$portid} = $notify;
      $NODE{""}->monitor ($portid, $notify);
   },

   # a port monitored on the sending node is gone: run and forget the
   # callbacks registered for it
   exit => sub {
      my ($portid) = @_;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->() for @$cbs;
   },

   # well-known-port lookup: reply with the %WKP entry for the name
   wkp => sub {
      my ($wkname, @reply) = @_;

      snd (@reply, $WKP{$wkname});
   },

   # relay message to another node / generic echo
   relay => sub {
      snd (@_);
   },

   # random garbage

   # WARNING(security): evaluates a string taken from the message as
   # perl code and optionally replies with the error and the results
   eval => sub {
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },

   # reply with the current time
   time => sub {
      snd (@_, AE::time ());
   },

   # accept and ignore anything
   devnull => sub {
      #
   },
);
227
# the local node is always registered under the empty noderef
# (explicit class method call, indirect object syntax is ambiguous)
$NODE{""} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# the port "" handles requests to the node itself: the first message
# element selects the handler in %node_req, which is called with the
# remaining elements; unknown requests are silently ignored
$PORT{""} = sub {
   my $handler = $node_req{+shift}
      or return;

   &$handler;
};
230
231 =back
232
233 =head1 SEE ALSO
234
235 L<AnyEvent::MP>.
236
237 =head1 AUTHOR
238
239 Marc Lehmann <schmorp@schmorp.de>
240 http://home.schmorp.de/
241
242 =cut
243
244 1
245