ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.7
Committed: Tue Aug 4 14:10:51 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.6: +6 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
# Module version and the symbols exported by default.
# NOTE(review): become_slave is exported but no definition is visible in
# this file - confirm it is provided elsewhere.
our $VERSION = '0.01';
our @EXPORT  = qw(NODE $NODE snd kil _any_ become_slave become_public);

# Cached default secret; filled in lazily by default_secret.
our $DEFAULT_SECRET;

# Connection timing, in seconds.
our $CONNECT_INTERVAL = 5;  # new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # includes handshake
44    
45 root 1.4 =item $AnyEvent::MP::Base::WARN
46    
47     This code reference is called with an error or warning message as its
48     first argument, e.g. when a connection could not be created or authorisation failed.
49    
50     The default simply logs the message to STDERR.
51    
52     =cut
53    
# Default warning hook: passes its first argument, followed by a newline,
# to warn.
our $WARN = sub {
    my ($message) = @_;

    warn "$message\n";
};
57    
# nonce $octets
#
# Returns a string of exactly $octets random bytes.
#
# The bytes are read from /dev/urandom when that device can be opened and
# delivers enough data; otherwise they are generated with perl's rand(),
# which is seeded once per process.
sub nonce($) {
    my ($octets) = @_;
    my $nonce = "";

    # three-argument open: the mode can never be taken from the path
    if (open my $fh, "<", "/dev/urandom") {
        binmode $fh;

        # sysread may deliver fewer bytes than requested, so keep reading
        # until the buffer is full or the device reports an error/EOF
        while (length ($nonce) < $octets) {
            my $got = sysread $fh, $nonce, $octets - length ($nonce), length ($nonce);
            last unless $got;
        }

        close $fh;
    }

    return $nonce
        if length ($nonce) == $octets;

    # no usable random device: fall back to rand()
    our $nonce_init;
    unless ($nonce_init++) {
        # srand is a named unary operator and binds tighter than "^", so the
        # parentheses are required - without them only "time" is used as the
        # seed and the pid and the command output checksum are thrown away
        srand (time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all");
    }

    join "", map +(chr rand 256), 1 .. $octets
}
75    
# default_secret
#
# Returns the default secret, computing it on first use and caching it in
# $DEFAULT_SECRET: the contents of $ENV{HOME}/.aemp-secret when that file
# can be opened and read, otherwise 32 random octets from nonce.
sub default_secret {
    unless (defined $DEFAULT_SECRET) {
        # three-argument open keeps the mode separate from the path, so
        # unusual characters in $HOME cannot change how the file is opened
        if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
            # a failed read must not leave a half-initialised secret behind
            defined (sysread $fh, $DEFAULT_SECRET, -s $fh)
                or undef $DEFAULT_SECRET;

            close $fh;
        }

        # no secret file, or it could not be read: make up a random secret
        $DEFAULT_SECRET = nonce (32)
            unless defined $DEFAULT_SECRET;
    }

    $DEFAULT_SECRET
}
87    
# gen_uniq
#
# Builds a short printable cookie from the process id and the current time:
# both are packed ("w" = BER compressed integer, "N" = 32 bit big-endian),
# base64 encoded without line breaks, and stripped of the "=" padding.
sub gen_uniq {
    my $packed = pack "wN", $$, time;
    my $cookie = MIME::Base64::encode_base64 ($packed, "");

    # "=" only ever occurs as trailing padding in base64 output
    $cookie =~ tr/=//d;

    return $cookie;
}
94    
our $UNIQ   = gen_uniq (); # per-process/node unique cookie
our $ID     = "a";
our $PUBLIC = 0;           # set to 1 by become_public
our $NODE   = $$;          # the pid, until become_public stores the normalised noderef
our $PORT;

our %NODE; # node id to transport mapping, or "undef", for local node
our %PORT; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %WKP;      # looked up by name in the "wkp" node request
our %LISTENER; # local transports

our $SRCNODE; # holds the sending node during _inject
111    
# NODE - returns the current value of $NODE.
sub NODE() {
    return $NODE;
}
113    
# _ANY_ - always returns 1.
sub _ANY_() {
    return 1;
}

# _any_ - returns a reference to _ANY_.
sub _any_() {
    return \&_ANY_;
}
116    
# _inject $portid, @message
#
# Looks up the local port $portid in %PORT and calls its callback with the
# remaining arguments; returns nothing when no such port exists.
sub _inject {
    my $portid   = shift;
    my $callback = $PORT{$portid}
        or return;

    # "&$callback;" (no parentheses) hands the current @_ - now without the
    # port id - to the callback
    &$callback;
}
120    
# add_node $noderef
#
# Returns the node object registered in %NODE for $noderef, creating it
# when necessary.
#
# $noderef is split on commas; when any of the parts is already known, its
# node object is reused and also registered under the full $noderef.
# Otherwise a new AnyEvent::MP::Node::Direct is created and registered under
# the full $noderef and under every part.
sub add_node {
    my ($noderef) = @_;

    return $NODE{$noderef}
        if exists $NODE{$noderef};

    my @parts = split /,/, $noderef;

    for my $part (@parts) {
        return $NODE{$noderef} = $NODE{$part}
            if exists $NODE{$part};
    }

    # for indirect sends, use a different class
    # (direct method call instead of the fragile "new Class ARGS" syntax)
    my $node = AnyEvent::MP::Node::Direct->new ($noderef);

    $NODE{$_} = $node
        for $noderef, @parts;

    $node
}
140    
# snd $portid, @message
#
# $portid has the form "noderef#port" (split at the first "#"). The message
# [$port, @message] is handed to the ->send method of the node object for
# noderef, which is created via add_node when not yet known.
sub snd(@) {
    my $destination = shift;
    my ($noderef, $port) = split /#/, $destination, 2;

    my $node = $NODE{$noderef} || add_node ($noderef);

    $node->send ([$port, @_]);
}
147    
# kil $portid, @reason
#
# $portid has the form "noderef#port" (split at the first "#"). Calls
# ->kill ($port, @reason) on the node object for noderef, which is created
# via add_node when not yet known.
#
# The prototype is (@), like snd: with the previous ($) prototype only a
# single argument could be passed when calling kil by name, so the @_
# forwarded to ->kill was always empty. Single-argument calls are unaffected.
sub kil(@) {
    my $destination = shift;
    my ($noderef, $port) = split /#/, $destination, 2;

    ($NODE{$noderef} || add_node ($noderef))
        ->kill ($port, @_);
}
154    
# become_public @args
#
# Does nothing when $PUBLIC is already set. Otherwise:
#  - joins @args with "," and stores the result of
#    AnyEvent::MP::Node::normalise_noderef (...)->recv in $NODE,
#  - maps every comma-separated part of $NODE to the local node object
#    ($NODE{""}) and starts an mp_server for it, remembered in %LISTENER,
#  - sets $PUBLIC to 1.
sub become_public {
    return if $PUBLIC;

    my @args    = @_;
    my $noderef = join ",", @args;

    $NODE = AnyEvent::MP::Node::normalise_noderef ($noderef)->recv;

    for my $endpoint (split /,/, $NODE) {
        $NODE{$endpoint} = $NODE{""};

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint);

        # callback handed to mp_server; presumably invoked with each
        # transport the server accepts - confirm against mp_server
        my $on_transport = sub {
            my ($transport) = @_;

            # TODO: urgs
            my $node = add_node ($transport->{remote_node});
            $node->{trial}{accept} = $transport;
        };

        $LISTENER{$endpoint}
            = AnyEvent::MP::Transport::mp_server ($host, $port, @args, $on_transport);
    }

    $PUBLIC = 1;
}
182    
183     #############################################################################
184     # self node code
185    
# Requests understood by the node port (port id ""): the key is the request
# name, the value the handler that is called with the rest of the message.
# $SRCNODE refers to the sending node while a handler runs.
our %node_req = (
    # monitoring
    mon0 => sub { # disable monitoring
        my ($portid) = @_;
        my $node = $SRCNODE;

        # drop the callback remembered by mon1 and unregister it locally
        $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
    },
    mon1 => sub { # enable monitoring
        my ($portid) = @_;
        my $node = $SRCNODE;

        # when the monitor fires, send a "kil" request back to the sender
        my $notify = sub {
            $node->send (["", kil => $portid]);
        };

        $node->{rmon}{$portid} = $notify;
        $NODE{""}->monitor ($portid, $notify);
    },
    kil => sub {
        my ($portid) = @_;

        # run, once, every callback registered for this port of the sender
        my $cbs = delete $SRCNODE->{lmon}{$portid}
            or return;

        $_->() for @$cbs;
    },

    # well-known-port lookup
    wkp => sub {
        my $wkname = shift;

        # the remaining arguments are the reply address
        snd (@_, $WKP{$wkname});
    },

    # relay message to another node / generic echo
    relay => sub {
        &snd; # passes the current @_ on unchanged
    },

    # random garbage
    # SECURITY(review): "eval" compiles and runs code received in a message;
    # anybody who can send to the node port can execute arbitrary perl.
    eval => sub {
        my @res = eval shift;

        # reply with the error text and the results, if a reply address is left
        snd (@_, "$@", @res) if @_;
    },
    time => sub {
        snd (@_, AE::time ());
    },
    devnull => sub {
        # intentionally does nothing
    },
);
230    
# the local node is registered under the empty node id
# (direct method call instead of the fragile "new Class ARGS" syntax)
$NODE{""} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# the node port (port id "") looks up the first message element in
# %node_req and calls the handler with the rest; unknown requests are ignored
$PORT{""} = sub {
    my $handler = $node_req{+shift}
        or return;

    &$handler; # no parentheses: the handler gets the remaining @_
};
233 root 1.1
234 root 1.4 =back
235    
236 root 1.1 =head1 SEE ALSO
237    
238     L<AnyEvent::MP>.
239    
240     =head1 AUTHOR
241    
242     Marc Lehmann <schmorp@schmorp.de>
243     http://home.schmorp.de/
244    
245     =cut
246    
247     1
248