ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.9
Committed: Tue Aug 4 21:06:47 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_1
Changes since 1.8: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
# Module version.
our $VERSION = '0.01';

# Symbols exported by default through Exporter.
# NOTE(review): become_slave is exported, but no definition of it is
# visible in this file - confirm that it is provided elsewhere.
our @EXPORT = (
   # shared state and node bookkeeping
   qw(%NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node),

   # messaging primitives
   qw(NODE $NODE node_of snd kil _any_),

   # node role switching
   qw(become_slave become_public),
);

# Cache for default_secret; stays undefined until that sub is first called.
our $DEFAULT_SECRET;

our $CONNECT_INTERVAL =  5; # new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # includes handshake
46    
47 root 1.4 =item $AnyEvent::MP::Base::WARN
48    
49     This code reference is called with an error or warning message, e.g. when a
50     connection could not be created, authorisation failed and so on.
51    
52     The default simply logs the message to STDERR.
53    
54     =cut
55    
# Default warning handler: takes the message as first argument, removes
# one trailing newline (if any) and emits the message via warn with
# exactly one newline appended, so warn does not add file/line info.
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;

   warn $text . "\n";
};
61    
# nonce $octets
#
# Returns a string of $octets random octets.
#
# The octets are read from /dev/urandom when it can be opened. sysread
# may legally return fewer octets than requested (or fail), so the read
# is repeated until the buffer is full. Whatever is still missing
# afterwards - everything, when /dev/urandom does not exist - is drawn
# from perl's rand, which is seeded once per process from the time, the
# pid and a checksum over the output of some system commands.
#
# NOTE: the rand based fallback is not cryptographically strong.
sub nonce($) {
   my $want  = $_[0];
   my $nonce = "";

   # three-argument open, so the path is never parsed for mode characters
   if (open my $fh, "<", "/dev/urandom") {
      while ((my $missing = $want - length $nonce) > 0) {
         # append at the current end of the buffer; stop on error or EOF
         sysread ($fh, $nonce, $missing, length $nonce)
            or last;
      }
   }

   if ((my $missing = $want - length $nonce) > 0) {
      # no (or not enough) kernel randomness - fall back to rand
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $missing;
   }

   $nonce
}
79    
# default_secret
#
# Returns the default shared secret, computing and caching it in
# $DEFAULT_SECRET on first use: the contents of $ENV{HOME}/.aemp-secret
# if that file can be opened and read, otherwise a fresh 32 octet nonce.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      # three-argument open: characters in $ENV{HOME} can never be
      # interpreted as an open mode
      if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
         # a failed read must not leave a bogus secret behind
         defined (sysread $fh, $DEFAULT_SECRET, -s $fh)
            or undef $DEFAULT_SECRET;
      }

      # no secret file, or it could not be read: use a random secret
      $DEFAULT_SECRET = nonce (32)
         unless defined $DEFAULT_SECRET;
   }

   $DEFAULT_SECRET
}
91    
# gen_uniq
#
# Builds a short printable cookie from the process id (packed as a BER
# compressed integer) and the current time (packed as a 32 bit
# big-endian integer), base64-encoded without line breaks and without
# the trailing "=" padding.
sub gen_uniq {
   my $cookie = MIME::Base64::encode_base64 ((pack "wN", $$, time), "");

   $cookie =~ s/=+$//; # strip base64 padding

   return $cookie;
}
98    
our $UNIQ   = gen_uniq ();             # per-process/node unique cookie
our $ID     = "a";                     # NOTE(review): presumably the seed for generated ids - confirm against users
our $PUBLIC = 0;                       # set to 1 by become_public
our $NODE   = unpack "H*", nonce (16); # noderef of this node: initially the hex form of a 16 octet nonce

# noderef => node object, as registered by add_node; the local node is
# stored under "" and under its own noderef
our %NODE;

# local ports
our (%PORT, %PORT_DATA);

our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %REG; # registered port names

our %LISTENER; # listening address => server object, filled by become_public

our $SRCNODE; # holds the sending node during _inject
115    
# NODE
#
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
   return $NODE;
}
119    
# node_of $portid
#
# Returns the noderef part of a port id, i.e. everything before the
# first "#" (the whole string if it contains no "#").
sub node_of($) {
   # limit of 2: only the first "#" acts as separator
   my $noderef = (split /#/, $_[0], 2)[0];

   $noderef
}
125 root 1.1
# Marker sub; it simply returns 1.
# NOTE(review): presumably only its address matters, as a wildcard
# marker for message matching - confirm against the callers of _any_.
sub _ANY_() {
   1
}

# Returns a code reference to the _ANY_ marker sub.
sub _any_() {
   \&_ANY_
}
128    
# _inject $port, @msg
#
# Delivers a message to a local port: looks up the callback registered
# in %PORT under the first argument and calls it with the remaining
# arguments. Returns nothing if no such port is registered.
sub _inject {
   my $handler = $PORT{+shift}
      or return;

   # &$handler without parens passes on the (already shifted) @_
   &$handler
}
132    
# add_node $noderef
#
# Returns the node object for $noderef, creating and registering it in
# %NODE if necessary. $noderef may be a comma-separated list of
# addresses: if any single address is already registered, that node is
# reused and also registered under the full noderef. Otherwise a new
# AnyEvent::MP::Node::Direct is created and registered under the full
# noderef as well as under each individual address.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @addresses = split /,/, $noderef;

   for (@addresses) {
      return $NODE{$noderef} = $NODE{$_}
         if exists $NODE{$_};
   }

   # for indirect sends, use a different class
   # (explicit method call instead of the ambiguous "new CLASS ARGS" form)
   my $node = AnyEvent::MP::Node::Direct->new ($noderef);

   $NODE{$_} = $node
      for $noderef, @addresses;

   $node
}
152    
# snd $portid, @msg
#
# Sends @msg to the port $portid ("noderef#port"): the node object is
# looked up in %NODE (and created via add_node if missing), then handed
# the message as [$port, @msg] through its send method.
sub snd(@) {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send ([$port, @_]);
}
159    
# kil $portid, @reason
#
# Asks the node owning the port $portid ("noderef#port") to kill that
# port: the node object is looked up in %NODE (and created via add_node
# if missing) and its kill method is called with the port and @reason.
sub kil(@) {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ($port, @_);
}
166    
# become_public @noderefs
#
# Turns this node into a public node, once: the arguments are joined
# with "," and normalised into the new $NODE, and for every address in
# it the local node is registered in %NODE and a server is started via
# AnyEvent::MP::Transport::mp_server and remembered in %LISTENER.
# Further calls return immediately.
sub become_public {
   $PUBLIC and return;

   my @args    = @_;
   my $noderef = join ",", @args;

   # normalise_noderef hands back an object whose recv yields the result
   $NODE = AnyEvent::MP::Node::normalise_noderef ($noderef)->recv;

   # NOTE(review): presumably invoked by the server with the transport
   # object of each accepted connection - confirm against mp_server.
   my $on_accept = sub {
      my ($tp) = @_;

      # TODO: urgs
      my $node = add_node ($tp->{remote_node});
      $node->{trial}{accept} = $tp;
   };

   for my $address (split /,/, $NODE) {
      # the local node answers under each of its public addresses
      $NODE{$address} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($address);

      $LISTENER{$address}
         = AnyEvent::MP::Transport::mp_server ($host, $port, @args, $on_accept);
   }

   $PUBLIC = 1;
}
194    
195     #############################################################################
196     # self node code
197    
# Requests understood by the node port (the port named ""): maps the
# request name to its handler. Handlers receive the remaining message
# elements in @_; $SRCNODE holds the node the request came from.
#
# Handlers that send a reply treat the remaining elements as the reply
# destination and only reply when one was supplied. Without it, snd
# would interpret the payload itself as a port id and, via add_node,
# could register a node named after e.g. a timestamp.
our %node_req = (
   # monitoring
   mon0 => sub { # disable monitoring
      my $portid = shift;
      my $node   = $SRCNODE;
      $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my $portid = shift;
      my $node   = $SRCNODE; # copy: $SRCNODE differs when the callback runs
      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         $node->send (["", kil => $portid, @_]);
      });
   },
   kil => sub {
      # run (and forget) the local monitor callbacks for the killed port
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # well-known-port lookup
   lookup => sub {
      my $name = shift;
      my $port = $REG{$name};
      #TODO: check validity
      snd @_, $port
         if @_; # only with a reply port
   },

   # relay message to another node / generic echo
   relay => sub {
      &snd
         if @_; # nothing to relay without a destination
   },

   # random garbage
   eval => sub {
      # SECURITY NOTE(review): this evaluates a string taken from a
      # received message, so whoever can deliver a message to the node
      # port can run arbitrary perl code. Left as is, because removing
      # it would change the protocol - consider gating or dropping it.
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::time ()
         if @_; # only with a reply port
   },
   devnull => sub {
      #
   },
);
244    
# Register the local node, both under the empty noderef and under its
# own noderef (explicit method call instead of "new CLASS ARGS").
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The port named "" is the node port: the first message element selects
# the handler in %node_req, which is called with the remaining elements;
# unknown requests are silently ignored.
$PORT{""} = sub { &{ $node_req{+shift} or return } };
247 root 1.1
248 root 1.4 =back
249    
250 root 1.1 =head1 SEE ALSO
251    
252     L<AnyEvent::MP>.
253    
254     =head1 AUTHOR
255    
256     Marc Lehmann <schmorp@schmorp.de>
257     http://home.schmorp.de/
258    
259     =cut
260    
261     1
262