ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.4
Committed: Sun Oct 28 03:51:24 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.3: +48 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #!/usr/bin/perl
2    
3     use Coro;
4     use Coro::Handle;
5     use Coro::Event;
6     use Coro::Semaphore;
7     use Coro::Channel;
8     use Coro::Signal;
9 root 1.2 use Coro::RWLock;
10 root 1.1
11 root 1.4 use Set::Scalar;
12    
13 root 1.2 use Fcntl;
14 root 1.1
15 root 1.2 use constant PROTVERSION => 1;
16    
17     $VERSION = 0.1;
18    
19 root 1.3 require "config.pl";
20    
21     use folder;
22     use vc;
23    
24     $v = $VERBOSE;
25 root 1.2
26     # WARNING:
27     # Content-Length headers are deliberately being ignored. They
28     # are broken by design and will never be supported
29 root 1.1
30     # TODO: real message-id parsing
31    
32     $|=1;
33    
34     my $ecnt;
35    
36     sub slog {
37     if ($_[0] <= $v) {
38 root 1.2 print STDERR "[$SLAVE] ", $_[1];
39 root 1.1 }
40     }
41    
42 root 1.3 $lockdisk = new Coro::Semaphore;
43 root 1.2
44     # give up a/the timeslice
45     sub give {
46     Coro::Event::do_timer(after => 0);
47     }
48    
49     my $quit = new Coro::RWLock;
50    
51     sub rwlock::DESTROY {
52     $quit->unlock;
53     }
54    
55     sub quit_guard {
56     $quit->rdlock;
57     bless [], rwlock;
58     }
59    
60     sub find_folders {
61     my @folders;
62    
63     opendir my $dh, $PREFIX
64     or die "$PREFIX: $!";
65    
66     while (defined (my $folder = readdir $dh)) {
67     next if $folder =~ /^\./;
68     next unless -f "$PREFIX/$folder";
69     push @folders, $folder;
70     }
71    
72     @folders;
73     }
74    
75     my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
76    
77     sub sync_folder {
78     my $name = $_[0];
79    
80     my $quit_guard = quit_guard;
81     async {
82     my $guard = $sync_folder->guard;
83 root 1.3 my $vc = new vc;
84 root 1.2
85 root 1.3 my $folder = new folder name => $name;
86 root 1.2
87 root 1.3 $vc->snd("open", $name);
88     $vc->snd("mtime");
89 root 1.2
90     $folder->check;
91     $folder->read_mdif;
92 root 1.3
93 root 1.4 my $ctime = $folder->{host}{$OTHERNAME} || -1;
94 root 1.3 my $rtime = $vc->rcv;
95 root 1.1
96 root 1.4 $vc->snd("diff", $ctime);
97    
98     my (%ladd, %ldel, %radd, %rdel);
99    
100     my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
101     $diff = $vc->rcv;
102    
103     while () {
104     if ($diff >= 0 and (!@diff or $diff <= $diff[0][0])) {
105     my @add = split /\0/, $vc->rcv;
106     my @del = split /\0/, $vc->rcv;
107     $diff = $vc->rcv;
108     slog 0, "applying remote diff $diff\n";
109     for (@del) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; }
110     for (@add) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; }
111     } elsif (@diff) {
112     slog 0, "applying local diff $diff[0][0]\n";
113     for (@{$diff[0][2]}) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; }
114     for (@{$diff[0][1]}) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; }
115     shift @diff;
116     } else {
117     slog 0, "no more diffing\n";
118     last;
119     }
120     }
121    
122     slog 0, "LADD ".join(" ", keys %ladd)."\n";
123     slog 0, "LDEL ".join(" ", keys %ldel)."\n";
124     slog 0, "RADD ".join(" ", keys %radd)."\n";
125     slog 0, "RDEL ".join(" ", keys %rdel)."\n";
126    
127     #$folder->{host}{$OTHERNAME} = time;
128     #$vc->snd("setctime", time);
129 root 1.1
130 root 1.2 undef $quit_guard;
131 root 1.1 }
132     }
133    
134 root 1.2 sub main {
135 root 1.3 my $vc = new vc;
136    
137 root 1.2 # time checking done symmetrically
138     {
139     my $time = time;
140 root 1.3 $vc->snd("time");
141     my $othertime = $vc->rcv;
142 root 1.2 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
143     or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
144     }
145 root 1.3 #Coro::Event::do_timer(after => 60);#d#
146    
147     $vc->snd("name");
148     $OTHERNAME = $vc->rcv;
149 root 1.1
150 root 1.2 if ($SLAVE) {
151     #
152     } else {
153 root 1.3 $vc->snd("list");
154     for (split /\0/, $vc->rcv) {
155 root 1.2 if (!-e "$PREFIX/$_") {
156     slog 2, "created new empty folder $_\n";
157     sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
158     }
159     }
160 root 1.1
161 root 1.2 for my $folder (find_folders) {
162     sync_folder $folder;
163 root 1.1 }
164 root 1.2
165     $quit->wrlock;
166 root 1.3
167     vc::snd_quit;
168     }
169     }
170    
171     sub serve_folder {
172     my $vc = shift;
173     my $name = $vc->rcv;
174     my $folder = new folder name => $name;
175    
176     slog 8, "serving folder $name\n";
177    
178     $folder->check;
179     $folder->read_mdif;
180    
181     while (my $msg = $vc->rcv) {
182     if ($msg eq "mtime") {
183     $vc->snd($folder->{mtime});
184 root 1.4 } elsif ($msg eq "diff") {
185     my $time = $vc->rcv;
186     for (@{$folder->{diff}}) {
187     next if $_->[0] <= $time;
188     $vc->snd($_->[0],
189     (join "\0", @{$_->[1]}),
190     (join "\0", @{$_->[2]}),
191     );
192     }
193     $vc->snd(-1);
194     } elsif ($msg eq "setctime") {
195     $folder->{host}{$OTHERNAME} = $vc->rcv;
196 root 1.3 } else {
197     die "protocol error, unknown folder command ($msg)\n";
198     }
199     }
200     }
201    
202     sub serve {
203     my $vc = shift;
204    
205     slog 8, "new connection $vc->{port}\n";
206    
207     while (my $msg = $vc->rcv) {
208     if ($msg eq "name") {
209     $vc->snd($::MYNAME);
210     } elsif ($msg eq "pri") {
211     $self->{pri} = $vc->rcv;
212     } elsif ($msg eq "time") {
213     $vc->snd(time);
214     } elsif ($msg eq "list") {
215     $vc->snd(join "\0", find_folders);
216     } elsif ($msg eq "open") {
217     serve_folder($vc);
218     } else {
219     die "protocol error, unknown command ($msg)\n";
220     }
221 root 1.1 }
222     }
223    
224     if ($SLAVE) {
225     $HOSTID = "slave";
226 root 1.2 async \&main;
227 root 1.3 vc::multiplex unblock \*STDIN, $SLAVE;
228 root 1.1 } else {
229     $HOSTID = "master";
230     {
231     use Socket;
232     socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
233     if (fork == 0) {
234 root 1.2 close S1;
235 root 1.1 open STDIN, "<&S2" or die;
236     open STDOUT, ">&S2" or die;
237     exec $0, "--slave";
238     exit 255;
239     }
240 root 1.2 close S2;
241     async \&main;
242 root 1.3 vc::multiplex unblock \*S1, $SLAVE;
243 root 1.1 }
244     }
245