ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.7
Committed: Tue Aug 4 14:10:51 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.6: +6 -10 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Base;
22
23 use common::sense;
24 use Carp ();
25 use MIME::Base64 ();
26
27 use AE ();
28
29 use AnyEvent::MP::Node;
30 use AnyEvent::MP::Transport;
31
32 use base "Exporter";
33
34 our $VERSION = '0.01';
35 our @EXPORT = qw(
36 NODE $NODE snd kil _any_
37 become_slave become_public
38 );
39
40 our $DEFAULT_SECRET;
41
42 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
43 our $CONNECT_TIMEOUT = 30; # includes handshake
44
45 =item $AnyEvent::MP::Base::WARN
46
47 This value is called with an error or warning message, when e.g. a connection
48 could not be created, authorisation failed and so on.
49
50 The default simply logs the message to STDERR.
51
52 =cut
53
54 our $WARN = sub {
55 warn "$_[0]\n";
56 };
57
58 sub nonce($) {
59 my $nonce;
60
61 if (open my $fh, "</dev/urandom") {
62 sysread $fh, $nonce, $_[0];
63 } else {
64 # shit...
65 our $nonce_init;
66 unless ($nonce_init++) {
67 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
68 }
69
70 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
71 }
72
73 $nonce
74 }
75
76 sub default_secret {
77 unless (defined $DEFAULT_SECRET) {
78 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
79 sysread $fh, $DEFAULT_SECRET, -s $fh;
80 } else {
81 $DEFAULT_SECRET = nonce 32;
82 }
83 }
84
85 $DEFAULT_SECRET
86 }
87
88 sub gen_uniq {
89 my $uniq = pack "wN", $$, time;
90 $uniq = MIME::Base64::encode_base64 $uniq, "";
91 $uniq =~ s/=+$//;
92 $uniq
93 }
94
95 our $UNIQ = gen_uniq; # per-process/node unique cookie
96 our $ID = "a";
97 our $PUBLIC = 0;
98 our $NODE = $$;
99 our $PORT;
100
101 our %NODE; # node id to transport mapping, or "undef", for local node
102 our %PORT; # local ports
103
104 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
105 our %LMON; # monitored _local_ ports
106
107 our %WKP;
108 our %LISTENER; # local transports
109
110 our $SRCNODE; # holds the sending node during _inject
111
112 sub NODE() { $NODE }
113
114 sub _ANY_() { 1 }
115 sub _any_() { \&_ANY_ }
116
117 sub _inject {
118 &{ $PORT{+shift} or return };
119 }
120
121 sub add_node {
122 my ($noderef) = @_;
123
124 return $NODE{$noderef}
125 if exists $NODE{$noderef};
126
127 for (split /,/, $noderef) {
128 return $NODE{$noderef} = $NODE{$_}
129 if exists $NODE{$_};
130 }
131
132 # for indirect sends, use a different class
133 my $node = new AnyEvent::MP::Node::Direct $noderef;
134
135 $NODE{$_} = $node
136 for $noderef, split /,/, $noderef;
137
138 $node
139 }
140
141 sub snd(@) {
142 my ($noderef, $port) = split /#/, shift, 2;
143
144 ($NODE{$noderef} || add_node $noderef)
145 ->send ([$port, @_]);
146 }
147
148 sub kil($) {
149 my ($noderef, $port) = split /#/, shift, 2;
150
151 ($NODE{$noderef} || add_node $noderef)
152 ->kill ($port, @_);
153 }
154
155 sub become_public {
156 return if $PUBLIC;
157
158 my $noderef = join ",", @_;
159 my @args = @_;
160
161 $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv;
162
163 for my $t (split /,/, $NODE) {
164 $NODE{$t} = $NODE{""};
165
166 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
167
168 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
169 @args,
170 sub {
171 my ($tp) = @_;
172
173 # TODO: urgs
174 my $node = add_node $tp->{remote_node};
175 $node->{trial}{accept} = $tp;
176 },
177 ;
178 }
179
180 $PUBLIC = 1;
181 }
182
183 #############################################################################
184 # self node code
185
186 our %node_req = (
187 # monitoring
188 mon0 => sub { # disable monitoring
189 my $portid = shift;
190 my $node = $SRCNODE;
191 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
192 },
193 mon1 => sub { # enable monitoring
194 my $portid = shift;
195 my $node = $SRCNODE;
196 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
197 $node->send (["", kil => $portid]);
198 });
199 },
200 kil => sub {
201 my $cbs = delete $SRCNODE->{lmon}{$_[0]}
202 or return;
203
204 $_->() for @$cbs;
205 },
206
207 # well-known-port lookup
208 wkp => sub {
209 my $wkname = shift;
210 snd @_, $WKP{$wkname};
211 },
212
213 # relay message to another node / generic echo
214 relay => sub {
215 &snd;
216 },
217
218 # random garbage
219 eval => sub {
220 my @res = eval shift;
221 snd @_, "$@", @res if @_;
222 },
223 time => sub {
224 snd @_, AE::time;
225 },
226 devnull => sub {
227 #
228 },
229 );
230
231 $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
232 $PORT{""} = sub { &{ $node_req{+shift} or return } };
233
234 =back
235
236 =head1 SEE ALSO
237
238 L<AnyEvent::MP>.
239
240 =head1 AUTHOR
241
242 Marc Lehmann <schmorp@schmorp.de>
243 http://home.schmorp.de/
244
245 =cut
246
247 1
248