ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.8
Committed: Tue Aug 4 18:33:30 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.7: +28 -14 lines
Log Message:
*** empty log message ***

File Contents

# Content
=head1 NAME

AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP

=head1 SYNOPSIS

   # use AnyEvent::MP or Coro::MP instead

=head1 DESCRIPTION

This module provides most of the basic functionality of AnyEvent::MP,
exposed through higher level interfaces such as L<AnyEvent::MP> and
L<Coro::MP>.

=head1 GLOBALS

=over 4

=cut
20
21 package AnyEvent::MP::Base;
22
23 use common::sense;
24 use Carp ();
25 use MIME::Base64 ();
26
27 use AE ();
28
29 use AnyEvent::MP::Node;
30 use AnyEvent::MP::Transport;
31
32 use base "Exporter";
33
# Module version.
our $VERSION = '0.01';

# Symbols exported by default (via Exporter).
# NOTE(review): become_slave is exported but no sub of that name is defined
# in this file -- confirm whether it is provided elsewhere.
our @EXPORT = (
   qw(%NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node),
   qw(NODE $NODE node_of snd kil _any_),
   qw(become_slave become_public),
);

# Shared secret; stays undefined until default_secret fills it in.
our $DEFAULT_SECRET;

our $CONNECT_INTERVAL =  5; # new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # includes handshake
46
=item $AnyEvent::MP::Base::WARN

This code reference is called with an error or warning message when, for
example, a connection could not be created, authorisation failed, and so on.

The default simply logs the message to STDERR.

=cut
55
# Default warning handler: strips one trailing newline from the message (if
# any) and re-adds exactly one, so that warn never appends "at ... line ...".
our $WARN = sub {
   my ($text) = @_;

   $text =~ s/\n$//;

   warn $text . "\n";
};
61
# nonce $len
#
# Returns a string of $len random octets.  The octets are read from
# /dev/urandom when that can be opened and read completely; otherwise they
# are generated with perl's rand, which is seeded once per process from the
# time, the pid and a checksum over the output of two process/network
# listing commands.
sub nonce($) {
   my ($len) = @_;

   if (open my $fh, "<", "/dev/urandom") {
      my $nonce = "";

      # sysread may return fewer octets than requested, so keep reading
      # until the buffer is full or the read fails or hits end-of-file
      while (length ($nonce) < $len) {
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got;
      }

      return $nonce
         if length ($nonce) == $len;
   }

   # no usable /dev/urandom, fall back to rand
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
79
# default_secret
#
# Returns $DEFAULT_SECRET, initialising it on first use: the contents of
# $HOME/.aemp-secret are used when that file can be opened and read (the
# contents are taken verbatim), otherwise a random 32-octet nonce is
# generated.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      my $home = $ENV{HOME};

      # three-argument open, so the path is never interpreted as a mode
      if (defined $home && open (my $fh, "<", "$home/.aemp-secret")) {
         my $secret;

         # only accept the file when the read itself succeeded
         $DEFAULT_SECRET = $secret
            if defined sysread $fh, $secret, -s $fh;
      }

      $DEFAULT_SECRET = nonce (32)
         unless defined $DEFAULT_SECRET;
   }

   $DEFAULT_SECRET
}
91
# gen_uniq
#
# Builds a short printable cookie from the process id (BER-compressed) and
# the current time (32 bit, big-endian): the packed octets are
# base64-encoded without line breaks and the "=" padding is removed.
sub gen_uniq {
   my $packed = pack "wN", $$, time;

   my $cookie = MIME::Base64::encode_base64 ($packed, "");
   $cookie =~ s/=+$//;

   return $cookie;
}
98
# identity of this process/node
our $UNIQ   = gen_uniq (); # per-process/node unique cookie
our $ID     = "a";         # NOTE(review): presumably a seed for generated ids -- not used in this file, confirm
our $PUBLIC = 0;           # set to 1 by become_public once it has run
our $NODE   = $$;          # noderef of the local node: the pid until become_public replaces it

# registries
our %NODE;      # noderef => node object, the key "" refers to the local node
our %PORT;      # local port id => callback (called by _inject)
our %PORT_DATA; # NOTE(review): presumably per-port data -- not used in this file, confirm

our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %REG; # registered port names (name => port, see the lookup request)

our %LISTENER; # endpoint => value returned by mp_server (filled by become_public)

our $SRCNODE; # holds the sending node during _inject
115
# NODE
#
# Returns the noderef of the local node, i.e. the current value of $NODE.
sub NODE() {
   return $NODE;
}
119
# node_of $portid
#
# Returns the noderef part of a port id, i.e. everything before the first
# "#".  A port id without "#" is returned unchanged; the empty string
# yields undef.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   return $parts[0];
}
125
# _ANY_ is a constant sub that returns 1; _any_ returns a reference to it.
# NOTE(review): the code reference is presumably used as a unique
# "match anything" marker by callers -- not visible in this file, confirm.
sub _ANY_() { 1 }

sub _any_() {
   return \&_ANY_;
}
128
# _inject $portid, @msg
#
# Delivers a message to a local port: the callback registered for $portid
# in %PORT is called with the remaining arguments and its result is
# returned.  When no callback is registered, nothing happens.
sub _inject {
   my $portid = shift;

   my $handler = $PORT{$portid}
      or return;

   &$handler; # called without parens: the callee sees our (shifted) @_
}
132
# add_node $noderef
#
# Returns the node object for $noderef, creating and registering one when
# needed.  $noderef may be a comma-separated list of endpoints:
#
# - a noderef that is already registered in %NODE is returned as is
# - otherwise, if one of its endpoints is registered, the full noderef is
#   registered as an alias for that node
# - otherwise a new AnyEvent::MP::Node::Direct object is created and
#   registered under the full noderef and under each endpoint
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   for (split /,/, $noderef) {
      return $NODE{$noderef} = $NODE{$_}
         if exists $NODE{$_};
   }

   # for indirect sends, use a different class
   # (explicit method call instead of the ambiguous "new Class ARGS" syntax)
   my $node = AnyEvent::MP::Node::Direct->new ($noderef);

   $NODE{$_} = $node
      for $noderef, split /,/, $noderef;

   $node
}
152
# snd $portid, @msg
#
# Sends a message to a port.  $portid has the form "noderef#port"; the node
# object is taken from %NODE (or created via add_node) and its send method
# is called with [$port, @msg].  Returns whatever send returns.
sub snd(@) {
   my $dest = shift;
   my ($noderef, $port) = split /#/, $dest, 2;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send ([$port, @_]);
}
159
# kil $portid, @reason
#
# Kills a port.  $portid has the form "noderef#port"; the node object is
# taken from %NODE (or created via add_node) and its kill method is called
# with ($port, @reason).  Returns whatever kill returns.
sub kil(@) {
   my $dest = shift;
   my ($noderef, $port) = split /#/, $dest, 2;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ($port, @_);
}
166
# become_public @noderefs
#
# Turns this node into a public node, unless that has already happened
# ($PUBLIC is true).  The arguments are joined with "," and passed to
# normalise_noderef; the value its result delivers via ->recv becomes the
# new $NODE.  For every endpoint in $NODE a listener is started with
# mp_server and the endpoint is registered as an alias for the local node.
# NOTE(review): the original arguments are also passed on to mp_server --
# confirm that this is intended.
sub become_public {
   return if $PUBLIC;

   my @args    = @_;
   my $noderef = join ",", @args;

   $NODE = AnyEvent::MP::Node::normalise_noderef ($noderef)->recv;

   for my $endpoint (split /,/, $NODE) {
      # every endpoint we listen on refers to the local node object
      $NODE{$endpoint} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint);

      $LISTENER{$endpoint} = AnyEvent::MP::Transport::mp_server (
         $host, $port, @args,
         sub {
            my ($tp) = @_;

            # TODO: urgs
            # remember the accepted transport on the node of the remote side
            my $node = add_node ($tp->{remote_node});
            $node->{trial}{accept} = $tp;
         },
      );
   }

   $PUBLIC = 1;
}
194
#############################################################################
# self node code

# Request handlers of the local node, keyed by request name.  Each handler
# receives the request arguments in @_; $SRCNODE is the node object of the
# sender.
our %node_req = (
   # monitoring
   mon0 => sub { # disable monitoring
      my $portid = shift;
      my $src    = $SRCNODE;

      # forget the callback stored by mon1 and unregister it
      $NODE{""}->unmonitor ($portid, delete $src->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my $portid = shift;
      my $src    = $SRCNODE;

      # the callback is remembered on the sending node so that mon0 can
      # remove it again; when called, it sends a kil request back
      $NODE{""}->monitor ($portid, $src->{rmon}{$portid} = sub {
         $src->send (["", kil => $portid, @_]);
      });
   },
   kil => sub {
      # run (and forget) all callbacks registered for the given port
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # well-known-port lookup
   lookup => sub {
      my $name = shift;
      my $port = $REG{$name};
      #TODO: check validity
      snd (@_, $port);
   },

   # relay message to another node / generic echo
   relay => sub {
      snd (@_);
   },

   # random garbage
   eval => sub {
      # SECURITY: this evaluates a string supplied by the requesting node
      # as perl code, i.e. any node that can send this request can run
      # arbitrary code in this process.
      my @res = eval shift;
      # reply (error string first, then the results) if a reply port was given
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
);
244
# The local node is registered under the empty noderef ...
# (explicit method call instead of the ambiguous "new Class ARGS" syntax)
$NODE{""} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# ... and its request port under the empty port id: the first message
# element selects the handler in %node_req, the remaining elements become
# the handler's arguments.  Unknown requests are ignored.
$PORT{""} = sub {
   my $handler = $node_req{+shift}
      or return;

   &$handler; # called without parens: the handler sees our (shifted) @_
};
247
=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut
260
1; # a module file must end with a true value for require/use to succeed
262