ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.1
Committed: Sun Aug 2 14:44:37 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =cut
10
11 package AnyEvent::MP::Base;
12
13 use AnyEvent::MP::Node;
14 use AnyEvent::MP::Transport;
15
16 use common::sense;
17
18 use Carp ();
19
20 use AE ();
21
22 use base "Exporter";
23
24 our $VERSION = '0.01';
25 our @EXPORT = qw(NODE $NODE snd _any_ become_slave become_public);
26
27 our $DEFAULT_SECRET;
28
29 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
30 our $CONNECT_TIMEOUT = 30; # includes handshake
31
32 sub nonce($) {
33 my $nonce;
34
35 if (open my $fh, "</dev/urandom") {
36 sysread $fh, $nonce, $_[0];
37 } else {
38 # shit...
39 our $nonce_init;
40 unless ($nonce_init++) {
41 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
42 }
43
44 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
45 }
46
47 $nonce
48 }
49
50 sub default_secret {
51 unless (defined $DEFAULT_SECRET) {
52 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
53 sysread $fh, $DEFAULT_SECRET, -s $fh;
54 } else {
55 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
56 }
57 }
58
59 $DEFAULT_SECRET
60 }
61
62 our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
63 our $ID = "a";
64 our $PUBLIC = 0;
65 our $NODE = $$;
66 our $PORT;
67
68 our %NODE; # node id to transport mapping, or "undef", for local node
69 our %PORT; # local ports
70 our %WKP;
71 our %LISTENER; # local transports
72
73 sub NODE() { $NODE }
74
75 sub _ANY_() { 1 }
76 sub _any_() { \&_ANY_ }
77
78 sub _inject {
79 ($PORT{$_[0][0]} or return)->(@{$_[0][1]});
80 }
81
82 sub add_node {
83 my ($noderef) = @_;
84
85 return $NODE{$noderef}
86 if exists $NODE{$noderef};
87
88 for (split /,/, $noderef) {
89 return $NODE{$noderef} = $NODE{$_}
90 if exists $NODE{$_};
91 }
92
93 # for indirect sends, use a different class
94 my $node = new AnyEvent::MP::Node::Direct $noderef;
95
96 $NODE{$_} = $node
97 for $noderef, split /,/, $noderef;
98
99 $node
100 }
101
102 sub snd(@) {
103 my ($noderef, $port) = split /#/, shift, 2;
104
105 add_node $noderef
106 unless exists $NODE{$noderef};
107
108 $NODE{$noderef}->send (["$port", [@_]]);
109 }
110
111 sub become_public {
112 return if $PUBLIC;
113
114 my $noderef = join ",", @_;
115 my @args = @_;
116
117 $NODE = (AnyEvent::MP::Node::normalise_noderef $noderef)->recv;
118
119 for my $t (split /,/, $NODE) {
120 $NODE{$t} = $NODE{""};
121
122 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
123
124 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
125 @args,
126 on_error => sub {
127 die "on_error<@_>\n";#d#
128 },
129 on_connect => sub {
130 my ($tp) = @_;
131
132 $NODE{$tp->{remote_id}} = $_[0];
133 },
134 sub {
135 my ($tp) = @_;
136
137 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
138 },
139 ;
140 }
141
142 $PUBLIC = 1;
143 }
144
145 #############################################################################
146 # self node code
147
148 $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
149 $PORT{""} = sub {
150 given (shift) {
151 when ("wkp") {
152 my $wkname = shift;
153 snd @_, $WKP{$wkname};
154 }
155 when ("relay") {
156 &snd;
157 }
158 when ("eval") {
159 my @res = eval shift;
160 snd @_, "$@", @res if @_;
161 }
162 when ("time") {
163 snd @_, AE::time;
164 }
165 when ("devnull") {
166 #
167 }
168 }
169 };
170
171 =head1 SEE ALSO
172
173 L<AnyEvent::MP>.
174
175 =head1 AUTHOR
176
177 Marc Lehmann <schmorp@schmorp.de>
178 http://home.schmorp.de/
179
180 =cut
181
182 1
183