ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.2
Committed: Sun Aug 2 14:52:41 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_02
Changes since 1.1: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9     =cut
10    
11     package AnyEvent::MP::Base;
12    
13     use AnyEvent::MP::Node;
14     use AnyEvent::MP::Transport;
15    
16     use common::sense;
17    
18     use Carp ();
19    
20     use AE ();
21    
22     use base "Exporter";
23    
24     our $VERSION = '0.01';
25     our @EXPORT = qw(NODE $NODE snd _any_ become_slave become_public);
26    
27     our $DEFAULT_SECRET;
28    
29     our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
30     our $CONNECT_TIMEOUT = 30; # includes handshake
31    
32     sub nonce($) {
33     my $nonce;
34    
35     if (open my $fh, "</dev/urandom") {
36     sysread $fh, $nonce, $_[0];
37     } else {
38     # shit...
39     our $nonce_init;
40     unless ($nonce_init++) {
41     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
42     }
43    
44     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
45     }
46    
47     $nonce
48     }
49    
50     sub default_secret {
51     unless (defined $DEFAULT_SECRET) {
52     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
53     sysread $fh, $DEFAULT_SECRET, -s $fh;
54     } else {
55 root 1.2 $DEFAULT_SECRET = nonce 32;
56 root 1.1 }
57     }
58    
59     $DEFAULT_SECRET
60     }
61    
62     our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
63     our $ID = "a";
64     our $PUBLIC = 0;
65     our $NODE = $$;
66     our $PORT;
67    
68     our %NODE; # node id to transport mapping, or "undef", for local node
69     our %PORT; # local ports
70     our %WKP;
71     our %LISTENER; # local transports
72    
73     sub NODE() { $NODE }
74    
75     sub _ANY_() { 1 }
76     sub _any_() { \&_ANY_ }
77    
78     sub _inject {
79     ($PORT{$_[0][0]} or return)->(@{$_[0][1]});
80     }
81    
82     sub add_node {
83     my ($noderef) = @_;
84    
85     return $NODE{$noderef}
86     if exists $NODE{$noderef};
87    
88     for (split /,/, $noderef) {
89     return $NODE{$noderef} = $NODE{$_}
90     if exists $NODE{$_};
91     }
92    
93     # for indirect sends, use a different class
94     my $node = new AnyEvent::MP::Node::Direct $noderef;
95    
96     $NODE{$_} = $node
97     for $noderef, split /,/, $noderef;
98    
99     $node
100     }
101    
102     sub snd(@) {
103     my ($noderef, $port) = split /#/, shift, 2;
104    
105     add_node $noderef
106     unless exists $NODE{$noderef};
107    
108     $NODE{$noderef}->send (["$port", [@_]]);
109     }
110    
111     sub become_public {
112     return if $PUBLIC;
113    
114     my $noderef = join ",", @_;
115     my @args = @_;
116    
117     $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv;
118    
119     for my $t (split /,/, $NODE) {
120     $NODE{$t} = $NODE{""};
121    
122     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
123    
124     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
125     @args,
126     on_error => sub {
127     die "on_error<@_>\n";#d#
128     },
129     on_connect => sub {
130     my ($tp) = @_;
131    
132     $NODE{$tp->{remote_id}} = $_[0];
133     },
134     sub {
135     my ($tp) = @_;
136    
137     $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
138     },
139     ;
140     }
141    
142     $PUBLIC = 1;
143     }
144    
145     #############################################################################
146     # self node code
147    
148     $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
149     $PORT{""} = sub {
150     given (shift) {
151     when ("wkp") {
152     my $wkname = shift;
153     snd @_, $WKP{$wkname};
154     }
155     when ("relay") {
156     &snd;
157     }
158     when ("eval") {
159     my @res = eval shift;
160     snd @_, "$@", @res if @_;
161     }
162     when ("time") {
163     snd @_, AE::time;
164     }
165     when ("devnull") {
166     #
167     }
168     }
169     };
170    
171     =head1 SEE ALSO
172    
173     L<AnyEvent::MP>.
174    
175     =head1 AUTHOR
176    
177     Marc Lehmann <schmorp@schmorp.de>
178     http://home.schmorp.de/
179    
180     =cut
181    
182     1
183