ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.3
Committed: Mon Aug 3 14:47:25 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.2: +10 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =cut
16
17 package AnyEvent::MP::Base;
18
19 use AnyEvent::MP::Node;
20 use AnyEvent::MP::Transport;
21
22 use common::sense;
23
24 use Carp ();
25
26 use AE ();
27
28 use base "Exporter";
29
# module version
our $VERSION = '0.01';

# symbols exported by default (via the Exporter base class)
our @EXPORT = qw(
   NODE $NODE
   snd _any_
   become_slave become_public
);

# cached shared secret, filled in lazily by default_secret
our $DEFAULT_SECRET;

# connection timing knobs, in seconds - not consumed in this file,
# presumably read by the transport/node code (TODO confirm)
our $CONNECT_INTERVAL =  5; # new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # includes handshake

# warning callback: passes its first argument, newline-terminated, to warn
our $WARN = sub {
   my ($msg) = @_;

   warn "$msg\n";
};
41
# nonce $octets
#
# Returns a string of $octets random octets.
#
# The octets are read from /dev/urandom when it can be opened and delivers
# the full amount. Otherwise they are generated with perl's rand (which is
# NOT cryptographically strong), seeded once per process from the time, the
# pid and a checksum over the output of some system commands.
sub nonce($) {
   my ($octets) = @_;

   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      # sysread may legally return fewer octets than requested, so keep
      # reading (appending at the current end) until we have enough
      while (length ($nonce) < $octets) {
         my $got = sysread $fh, $nonce, $octets - length ($nonce), length ($nonce);

         last unless $got; # error or eof - use the fallback below
      }

      return $nonce
         if length ($nonce) == $octets;
   }

   # no usable kernel random source
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $octets
}
59
# default_secret
#
# Returns the default shared secret, computing and caching it in
# $DEFAULT_SECRET on first use.
#
# The secret is the verbatim content of $HOME/.aemp-secret (nothing is
# stripped, so a trailing newline is part of the secret). When HOME is not
# set, or the file cannot be opened or read, a random 32 octet nonce is
# used instead.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      my $secret;

      # without HOME there is no per-user file - do not probe "/.aemp-secret"
      if (defined $ENV{HOME} and open (my $fh, "<", "$ENV{HOME}/.aemp-secret")) {
         my $buf;
         my $got = sysread $fh, $buf, -s $fh;

         # only trust the buffer when the read itself succeeded
         $secret = $buf
            if defined $got;
      }

      $DEFAULT_SECRET = defined $secret ? $secret : nonce (32);
   }

   $DEFAULT_SECRET
}
71
# identity of the local node
our $UNIQ   = sprintf ("%x.%x", $$, time); # per-process/node unique cookie
our $NODE   = $$; # local noderef - starts out as the pid, replaced by become_public
our $PUBLIC = 0;  # set to 1 once become_public has run
our $ID     = "a"; # not used in this file
our $PORT;         # not used in this file

# registries
our %NODE;     # node id to transport mapping, or "undef", for local node
our %PORT;     # local ports
our %WKP;      # looked up by name in the "wkp" request of the "" port
our %LISTENER; # local transports
82
# NODE
#
# Returns the current value of the package variable $NODE.
sub NODE() {
   $NODE
}
84
# _ANY_ always returns 1.
sub _ANY_() {
   1
}

# _any_ returns a code reference to _ANY_ - presumably used as a unique
# marker value that can be recognised by reference comparison (TODO confirm)
sub _any_() {
   \&_ANY_
}
87
# _inject [$portid, \@args]
#
# Delivers a message to a local port: looks up the callback registered for
# $portid in %PORT and calls it with the elements of @args. Messages for
# ports without a registered (true) callback are silently dropped.
sub _inject {
   my $handler = $PORT{$_[0][0]}
      or return;

   $handler->(@{ $_[0][1] });
}
91
# add_node $noderef
#
# Returns the node object for $noderef, registering it in %NODE if needed.
#
# $noderef is a comma-separated list of addresses. If the full noderef is
# already known, its entry is returned. Otherwise, if any single address is
# already known, the full noderef becomes an alias for that entry. Only when
# nothing matches is a new AnyEvent::MP::Node::Direct created, which is then
# registered under the full noderef and under every address in it.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @addresses = split /,/, $noderef;

   for my $address (@addresses) {
      return $NODE{$noderef} = $NODE{$address}
         if exists $NODE{$address};
   }

   # for indirect sends, use a different class
   my $node = AnyEvent::MP::Node::Direct->new ($noderef);

   $NODE{$_} = $node
      for $noderef, @addresses;

   $node
}
111
# snd $destination, @message
#
# Sends @message to $destination, which has the form "noderef#port" (split
# at the first "#"). The node is registered via add_node when it is not yet
# present in %NODE, then the message is handed to the node's send method as
# [$port, [@message]].
sub snd(@) {
   my $destination = shift;

   my ($noderef, $port) = split /#/, $destination, 2;

   exists $NODE{$noderef}
      or add_node ($noderef);

   $NODE{$noderef}->send (["$port", [@_]]);
}
120
# become_public @addresses
#
# Turns this process into a public node, once: subsequent calls return
# immediately.
#
# The arguments are joined with "," into a noderef, which is normalised via
# AnyEvent::MP::Node::normalise_noderef and stored in $NODE. For every
# address in the normalised noderef, the local node object is registered
# under that address in %NODE and a listener is created with
# AnyEvent::MP::Transport::mp_server, stored in %LISTENER.
#
# NOTE(review): the original arguments are also passed through to mp_server
# unchanged - how they are interpreted there is not visible in this file.
sub become_public {
   $PUBLIC
      and return;

   my @args    = @_;
   my $noderef = join ",", @args;

   $NODE = AnyEvent::MP::Node::normalise_noderef ($noderef)->recv;

   for my $address (split /,/, $NODE) {
      # every listening address is an alias for the local node
      $NODE{$address} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($address);

      $LISTENER{$address} = AnyEvent::MP::Transport::mp_server (
         $host, $port,
         @args,
         on_error => sub {
            die "on_error<@_>\n";#d#
         },
         on_connect => sub {
            my ($tp) = @_;

            # register the transport under the id of the remote node
            $NODE{$tp->{remote_id}} = $tp;
         },
         sub {
            my ($tp) = @_;

            # register the transport under its peer address
            $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
         },
      );
   }

   $PUBLIC = 1;
}
154
#############################################################################
# self node code

# the node object for this process, registered under the empty noderef
$NODE{""} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# handler for the port with the empty name. the first message element
# selects the request type, the remaining elements are its arguments:
#
#    wkp => $name, @reply     send $WKP{$name}, appended to @reply
#    relay => @message        pass @message on to snd
#    eval => $string, @reply  eval $string, and, if @reply is non-empty,
#                             send "$@" and the results appended to @reply
#    time => @reply           send the current AE::time appended to @reply
#    devnull => ...           ignore the message
#
# messages of any other type are ignored.
$PORT{""} = sub {
   my $type = shift;

   return
      unless defined $type;

   if ($type eq "wkp") {
      my $wkname = shift;
      snd (@_, $WKP{$wkname});
   } elsif ($type eq "relay") {
      snd (@_);
   } elsif ($type eq "eval") {
      # SECURITY: this string-evals code taken verbatim from a message, so
      # anybody able to deliver messages to this port can run arbitrary
      # perl code inside this process.
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   } elsif ($type eq "time") {
      snd (@_, AE::time ());
   } elsif ($type eq "devnull") {
      #
   }
};
180
181 =head1 SEE ALSO
182
183 L<AnyEvent::MP>.
184
185 =head1 AUTHOR
186
187 Marc Lehmann <schmorp@schmorp.de>
188 http://home.schmorp.de/
189
190 =cut
191
192 1
193