ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.4
Committed: Mon Aug 3 15:40:53 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.3: +15 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use AnyEvent::MP::Node;
24     use AnyEvent::MP::Transport;
25    
26     use common::sense;
27    
28     use Carp ();
29    
30     use AE ();
31    
32     use base "Exporter";
33    
34     our $VERSION = '0.01';
35     our @EXPORT = qw(NODE $NODE snd _any_ become_slave become_public);
36    
37     our $DEFAULT_SECRET;
38    
39     our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
40     our $CONNECT_TIMEOUT = 30; # includes handshake
41    
42 root 1.4 =item $AnyEvent::MP::Base::WARN
43    
44     This value is called with an error or warning message, when e.g. a connection
45     could not be created, authorisation failed and so on.
46    
47     The default simply logs the message to STDERR.
48    
49     =cut
50    
51 root 1.3 our $WARN = sub {
52     warn "$_[0]\n";
53     };
54    
55 root 1.1 sub nonce($) {
56     my $nonce;
57    
58     if (open my $fh, "</dev/urandom") {
59     sysread $fh, $nonce, $_[0];
60     } else {
61     # shit...
62     our $nonce_init;
63     unless ($nonce_init++) {
64     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
65     }
66    
67     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
68     }
69    
70     $nonce
71     }
72    
73     sub default_secret {
74     unless (defined $DEFAULT_SECRET) {
75     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
76     sysread $fh, $DEFAULT_SECRET, -s $fh;
77     } else {
78 root 1.2 $DEFAULT_SECRET = nonce 32;
79 root 1.1 }
80     }
81    
82     $DEFAULT_SECRET
83     }
84    
85     our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
86     our $ID = "a";
87     our $PUBLIC = 0;
88     our $NODE = $$;
89     our $PORT;
90    
91     our %NODE; # node id to transport mapping, or "undef", for local node
92     our %PORT; # local ports
93     our %WKP;
94     our %LISTENER; # local transports
95    
96     sub NODE() { $NODE }
97    
98     sub _ANY_() { 1 }
99     sub _any_() { \&_ANY_ }
100    
101     sub _inject {
102     ($PORT{$_[0][0]} or return)->(@{$_[0][1]});
103     }
104    
105     sub add_node {
106     my ($noderef) = @_;
107    
108     return $NODE{$noderef}
109     if exists $NODE{$noderef};
110    
111     for (split /,/, $noderef) {
112     return $NODE{$noderef} = $NODE{$_}
113     if exists $NODE{$_};
114     }
115    
116     # for indirect sends, use a different class
117     my $node = new AnyEvent::MP::Node::Direct $noderef;
118    
119     $NODE{$_} = $node
120     for $noderef, split /,/, $noderef;
121    
122     $node
123     }
124    
125     sub snd(@) {
126     my ($noderef, $port) = split /#/, shift, 2;
127    
128     add_node $noderef
129     unless exists $NODE{$noderef};
130    
131     $NODE{$noderef}->send (["$port", [@_]]);
132     }
133    
134     sub become_public {
135     return if $PUBLIC;
136    
137     my $noderef = join ",", @_;
138     my @args = @_;
139    
140     $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv;
141    
142     for my $t (split /,/, $NODE) {
143     $NODE{$t} = $NODE{""};
144    
145     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
146    
147     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
148     @args,
149     on_error => sub {
150     die "on_error<@_>\n";#d#
151     },
152     on_connect => sub {
153     my ($tp) = @_;
154    
155     $NODE{$tp->{remote_id}} = $_[0];
156     },
157     sub {
158     my ($tp) = @_;
159    
160     $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
161     },
162     ;
163     }
164    
165     $PUBLIC = 1;
166     }
167    
168     #############################################################################
169     # self node code
170    
171     $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
172     $PORT{""} = sub {
173     given (shift) {
174     when ("wkp") {
175     my $wkname = shift;
176     snd @_, $WKP{$wkname};
177     }
178     when ("relay") {
179     &snd;
180     }
181     when ("eval") {
182     my @res = eval shift;
183     snd @_, "$@", @res if @_;
184     }
185     when ("time") {
186     snd @_, AE::time;
187     }
188     when ("devnull") {
189     #
190     }
191     }
192     };
193    
194 root 1.4 =back
195    
196 root 1.1 =head1 SEE ALSO
197    
198     L<AnyEvent::MP>.
199    
200     =head1 AUTHOR
201    
202     Marc Lehmann <schmorp@schmorp.de>
203     http://home.schmorp.de/
204    
205     =cut
206    
207     1
208