ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.4
Committed: Mon Aug 3 15:40:53 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.3: +15 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Base;
22
23 use AnyEvent::MP::Node;
24 use AnyEvent::MP::Transport;
25
26 use common::sense;
27
28 use Carp ();
29
30 use AE ();
31
32 use base "Exporter";
33
34 our $VERSION = '0.01';
35 our @EXPORT = qw(NODE $NODE snd _any_ become_slave become_public);
36
37 our $DEFAULT_SECRET;
38
39 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
40 our $CONNECT_TIMEOUT = 30; # includes handshake
41
42 =item $AnyEvent::MP::Base::WARN
43
44 This value is called with an error or warning message, when e.g. a connection
45 could not be created, authorisation failed and so on.
46
47 The default simply logs the message to STDERR.
48
49 =cut
50
51 our $WARN = sub {
52 warn "$_[0]\n";
53 };
54
55 sub nonce($) {
56 my $nonce;
57
58 if (open my $fh, "</dev/urandom") {
59 sysread $fh, $nonce, $_[0];
60 } else {
61 # shit...
62 our $nonce_init;
63 unless ($nonce_init++) {
64 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
65 }
66
67 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
68 }
69
70 $nonce
71 }
72
73 sub default_secret {
74 unless (defined $DEFAULT_SECRET) {
75 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
76 sysread $fh, $DEFAULT_SECRET, -s $fh;
77 } else {
78 $DEFAULT_SECRET = nonce 32;
79 }
80 }
81
82 $DEFAULT_SECRET
83 }
84
85 our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
86 our $ID = "a";
87 our $PUBLIC = 0;
88 our $NODE = $$;
89 our $PORT;
90
91 our %NODE; # node id to transport mapping, or "undef", for local node
92 our %PORT; # local ports
93 our %WKP;
94 our %LISTENER; # local transports
95
96 sub NODE() { $NODE }
97
98 sub _ANY_() { 1 }
99 sub _any_() { \&_ANY_ }
100
101 sub _inject {
102 ($PORT{$_[0][0]} or return)->(@{$_[0][1]});
103 }
104
105 sub add_node {
106 my ($noderef) = @_;
107
108 return $NODE{$noderef}
109 if exists $NODE{$noderef};
110
111 for (split /,/, $noderef) {
112 return $NODE{$noderef} = $NODE{$_}
113 if exists $NODE{$_};
114 }
115
116 # for indirect sends, use a different class
117 my $node = new AnyEvent::MP::Node::Direct $noderef;
118
119 $NODE{$_} = $node
120 for $noderef, split /,/, $noderef;
121
122 $node
123 }
124
125 sub snd(@) {
126 my ($noderef, $port) = split /#/, shift, 2;
127
128 add_node $noderef
129 unless exists $NODE{$noderef};
130
131 $NODE{$noderef}->send (["$port", [@_]]);
132 }
133
134 sub become_public {
135 return if $PUBLIC;
136
137 my $noderef = join ",", @_;
138 my @args = @_;
139
140 $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv;
141
142 for my $t (split /,/, $NODE) {
143 $NODE{$t} = $NODE{""};
144
145 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
146
147 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
148 @args,
149 on_error => sub {
150 die "on_error<@_>\n";#d#
151 },
152 on_connect => sub {
153 my ($tp) = @_;
154
155 $NODE{$tp->{remote_id}} = $_[0];
156 },
157 sub {
158 my ($tp) = @_;
159
160 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
161 },
162 ;
163 }
164
165 $PUBLIC = 1;
166 }
167
168 #############################################################################
169 # self node code
170
171 $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
172 $PORT{""} = sub {
173 given (shift) {
174 when ("wkp") {
175 my $wkname = shift;
176 snd @_, $WKP{$wkname};
177 }
178 when ("relay") {
179 &snd;
180 }
181 when ("eval") {
182 my @res = eval shift;
183 snd @_, "$@", @res if @_;
184 }
185 when ("time") {
186 snd @_, AE::time;
187 }
188 when ("devnull") {
189 #
190 }
191 }
192 };
193
194 =back
195
196 =head1 SEE ALSO
197
198 L<AnyEvent::MP>.
199
200 =head1 AUTHOR
201
202 Marc Lehmann <schmorp@schmorp.de>
203 http://home.schmorp.de/
204
205 =cut
206
207 1
208