| 1 |
#!/usr/bin/perl |
| 2 |
|
| 3 |
use Coro; |
| 4 |
use Coro::Handle; |
| 5 |
use Coro::Event; |
| 6 |
use Coro::Semaphore; |
| 7 |
use Coro::Channel; |
| 8 |
use Coro::Signal; |
| 9 |
use Coro::RWLock; |
| 10 |
|
| 11 |
use Set::Scalar; |
| 12 |
|
| 13 |
use Fcntl; |
| 14 |
|
| 15 |
our $VERSION = 0.1; |
| 16 |
|
| 17 |
our $NOW = time; |
| 18 |
|
| 19 |
require "config.pl"; |
| 20 |
|
| 21 |
use folder; |
| 22 |
use vc; |
| 23 |
|
| 24 |
# Verbosity threshold consulted by slog(); $VERBOSE is not set in this
# file (presumably by config.pl -- confirm).
$v = $VERBOSE;

# WARNING:
# Content-Length headers are deliberately being ignored. They
# are broken by design and will never be supported

# TODO: real message-id parsing

# Autoflush the currently selected output handle.
$| = 1;

my $ecnt;
| 35 |
|
| 36 |
# Write a log message to STDERR, prefixed with "[$SLAVE] ".
#
#   $_[0] - level of the message; it is printed only when the level is
#           less than or equal to the global verbosity $v
#   $_[1] - message text (the caller supplies any trailing newline)
sub slog {
    my ($level, $message) = @_;

    print STDERR "[$SLAVE] ", $message
        if $level <= $v;
}
| 41 |
|
| 42 |
# Semaphore kept in the package global $lockdisk. It is not used in this
# file; presumably the folder code takes it around disk access -- confirm.
$lockdisk = Coro::Semaphore->new;
| 43 |
|
| 44 |
# Give up the current timeslice by waiting on a Coro::Event timer with a
# delay of zero.
sub give {
    Coro::Event::do_timer('after', 0);
}
| 48 |
|
| 49 |
# Read/write lock used for shutdown: quit_guard() takes a read lock, and
# main() takes the write lock before sending the quit message.
my $quit = Coro::RWLock->new;
| 50 |
|
| 51 |
# Destructor for the guard objects handed out by quit_guard(): releases
# the lock on $quit when the guard goes away.
sub rwlock::DESTROY {
    $quit->unlock();
}
| 54 |
|
| 55 |
# Take a read lock on $quit and return a guard object blessed into the
# "rwlock" package; destroying the guard releases the lock again.
sub quit_guard {
    $quit->rdlock();

    my $guard = bless [], 'rwlock';
    $guard;
}
| 59 |
|
| 60 |
# List the folders stored directly under $PREFIX.
#
# Returns the names (without directory part) of every directory entry that
# passes the -f test, skipping names that start with "." or end in "~".
# The order is whatever readdir yields. Dies if $PREFIX cannot be opened.
sub find_folders {
    opendir my $dh, $PREFIX
        or die "$PREFIX: $!";

    my @folders = grep {
        !/^\./ and !/~$/ and -f "$PREFIX/$_"
    } readdir $dh;

    @folders;
}
| 75 |
|
| 76 |
# Limits how many folders are synchronised at once; each sync_folder()
# coroutine holds one unit through a guard.
my $sync_folder = Coro::Semaphore->new(3); # max 3 folders in parallel
| 77 |
|
| 78 |
# Offer a list of local messages to the peer over a separate virtual
# circuit created with pri => 1.
#
#   $folder - folder object the message bodies are fetched from
#   $avc    - already-open circuit on which the "offer" command and the
#             port of the new circuit are announced
#   $diff   - array ref of message ids to offer; undef means "nothing"
#
# Returns the coroutine doing the transfer so the caller can join it.
sub sync_offer {
    my ($folder, $avc, $diff) = @_;

    # Normalise a missing list explicitly instead of relying on the
    # (non-strict) behaviour of dereferencing undef.
    my @ids = @{ $diff || [] };

    my $vc = vc->create(pri => 1);
    $avc->snd("offer", $vc->{port});

    async {
        $vc->rcv; # need to synchronize, argl. should open on other side
        $vc->snd(join "\0", @ids);

        # the peer replies with the ids it already has
        my %dup;
        @dup{split /\0/, $vc->rcv} = ();

        # send the remaining messages in the order they were offered
        for my $id (@ids) {
            $vc->snd($folder->fetch($id)) unless exists $dup{$id};
        }

        # anything but end-of-circuit here means the peer is out of step
        defined $vc->rcv and die "protocol error, expected close";
    };
}
| 96 |
|
| 97 |
# Synchronise one local folder with its counterpart on the peer.
#
#   $_[0] - folder name
#
# The work runs in a new coroutine, which is returned. A read lock on
# $quit (taken through quit_guard) is held until that coroutine reaches
# its end, and $sync_folder limits how many folders run at once.
sub sync_folder {
    my $name = $_[0];

    my $quit_guard = quit_guard();
    async {
        my $guard = $sync_folder->guard;
        my $vc = vc->create;

        my $folder = folder->new(name => $name);

        $vc->snd("open", $name);
        $folder->read_mdif;

        # last change time recorded for the peer; -1 asks for everything
        my $ctime = $folder->{host}{$OTHERNAME} || -1;

        $vc->snd("diff", $ctime);

        # Planned action per message id:
        #   0b00 - local del
        #   0b01 - local add
        #   0b10 - remote del
        #   0b11 - remote add
        my %action;

        # local change records newer than $ctime: [stamp, \@added, \@deleted]
        my @local = grep { $_->[0] > $ctime } @{$folder->{diff}};

        # Merge the peer's records with ours, ordered by their leading
        # field, so that a later record overrides an earlier one for the
        # same id. The peer terminates its list with a negative value.
        # $remote is lexical so concurrently running folder coroutines
        # cannot clobber each other's state.
        my $remote = $vc->rcv;
        while () {
            # undef would compare as 0 below and make this loop spin forever
            defined $remote
                or die "protocol error, connection closed during diff\n";

            if ($remote >= 0 and (!@local or $remote < $local[0][0])) {
                $action{$_} = 0b01 for split /\0/, $vc->rcv; # add
                $action{$_} = 0b00 for split /\0/, $vc->rcv; # del

                $remote = $vc->rcv;
            } elsif (@local) {
                $action{$_} = 0b11 for @{$local[0][1]};
                $action{$_} = 0b10 for @{$local[0][2]};

                shift @local;
            } else {
                last;
            }
        }

        # append or update, depending on whether there are messages to be deleted
        $vc->snd("begin");
        $folder->begin_update;

        # group the ids by action code; every bucket exists even when empty
        my @todo = ([], [], [], []);
        while (my ($id, $code) = each %action) {
            push @{$todo[$code]}, $id;
            slog 0, "DIFF $id : $code\n";#d#
        }

        $vc->snd("delete", join "\0", @{$todo[2]});
        $folder->delete(@{$todo[0]});

        # offer ours
        my $offer_coro = sync_offer($folder, $vc, $todo[3]);

        # request theirs
        {
            my @send = grep { !$folder->exists($_) } @{$todo[1]};
            $vc->snd("send", join "\0", @send);
            $folder->append($_, $vc->rcv) for @send;
            $vc->snd("-"); # sync
        }

        slog 0, "waiting...\n";#d#
        $offer_coro->join;

        # sanity check
        # NOTE(review): on a mismatch the mdif is written and the error is
        # only logged; the update still proceeds -- confirm this is intended.
        $vc->snd("inventory");
        if ($folder->inventory ne $vc->rcv) {
            $folder->write_mdif;
            slog 0, "FATAL: folder inventory mismatch after update\n";
        }

        $vc->snd("end");
        $folder->end_update;

        # exchange change times so the next run only transfers newer records
        $vc->snd("setctime", $folder->{ctime});

        $vc->snd("mtime");
        $folder->{host}{$OTHERNAME} = $vc->rcv;

        $vc->snd("close");
        $folder->close;

        # release the read lock on $quit
        undef $quit_guard;
    }
}
| 186 |
|
| 187 |
# Client-side driver, run as a coroutine on both master and slave.
#
# Checks that the clocks of both hosts agree to within $::MAXTIMEDIFF and
# exchanges host names. On the master it then creates an empty file for
# every folder the peer lists but which is missing locally, starts a sync
# for every local folder, takes the write lock on $quit (each sync holds a
# read lock on it through quit_guard) and finally calls vc::snd_quit.
# Dies if the time difference is too large.
sub main {
    my $vc = vc->create;

    # time checking done symmetrically
    {
        my $time = time;
        $vc->snd("time");
        my $othertime = $vc->rcv;
        # compare the peer's clock with the midpoint of our request window
        abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }

    $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;

    if ($SLAVE) {
        #
    } else {
        $vc->snd("list");
        for my $name (split /\0/, $vc->rcv) {
            next if -e "$PREFIX/$name";

            # Report the real outcome instead of ignoring a failed sysopen.
            if (sysopen(my $fh, "$PREFIX/$name", O_RDWR|O_CREAT, 0666)) {
                slog 2, "created new empty folder $name\n";
            } else {
                slog 0, "unable to create folder $PREFIX/$name: $!\n";
            }
        }

        for my $folder (find_folders()) {
            sync_folder($folder);
        }

        $quit->wrlock;

        vc::snd_quit();
    }
}
| 221 |
|
| 222 |
# Server side of a folder session, entered after an "open" command.
#
#   $vc - circuit the session runs on; the folder name is read from it
#
# Reads commands from $vc until rcv returns a false value. Dies on an
# unknown command.
sub serve_folder {
    my $vc = shift;
    my $name = $vc->rcv;
    my $folder = folder->new(name => $name);

    slog 8, "serving folder $name\n";

    $folder->read_mdif;

    while (my $msg = $vc->rcv) {
        if ($msg eq "mtime") {
            $vc->snd($folder->{mtime});
        } elsif ($msg eq "inventory") {
            $vc->snd($folder->inventory);
        } elsif ($msg eq "diff") {
            # send every change record newer than the given time, then -1
            # as the end marker
            my $time = $vc->rcv;
            for my $rec (@{$folder->{diff}}) {
                next if $rec->[0] <= $time;
                $vc->snd($rec->[0],
                         (join "\0", @{$rec->[1]}),
                         (join "\0", @{$rec->[2]}),
                        );
            }
            $vc->snd(-1);
        } elsif ($msg eq "begin") {
            $folder->begin_update;
        } elsif ($msg eq "delete") {
            $folder->delete(split /\0/, $vc->rcv);
        } elsif ($msg eq "offer") {
            # the peer offers messages on a separate circuit; handle it in
            # its own coroutine so this command loop keeps running
            my $ovc = vc->catch(port => $vc->rcv);
            async {
                my @offer;
                {
                    my @dup;

                    $ovc->snd("-"); # synchronize
                    for (split /\0/, $ovc->rcv) {
                        if ($folder->exists($_)) {
                            push @dup, $_;
                        } else {
                            push @offer, $_;
                        }
                    }

                    # tell the peer which ids it need not send
                    $ovc->snd(join "\0", @dup);
                }

                # now we'll get everything in @offer, in order
                $folder->append($_, $ovc->rcv) for @offer;
                $ovc->close;
            };
        } elsif ($msg eq "send") {
            $vc->pri(1);
            $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
            $vc->rcv; # sync
            $vc->pri(0);
        } elsif ($msg eq "end") {
            $folder->end_update;
        } elsif ($msg eq "setctime") {
            $folder->{host}{$OTHERNAME} = $vc->rcv;
        } elsif ($msg eq "close") {
            $folder->close;
        } else {
            die "protocol error, unknown folder command ($msg)\n";
        }
    }
}
| 291 |
|
| 292 |
# Top-level command loop for one incoming circuit.
#
#   $vc - the circuit to serve
#
# Handles "setname", "pri", "time", "list" and "open" until rcv returns a
# false value; "open" hands the circuit over to serve_folder(). Dies on any
# other command.
sub serve {
    my $vc = shift;

    slog 8, "new connection $vc->{port}\n";

    # one handler per known command
    my %handler = (
        setname => sub { $vc->snd($MYNAME); $OTHERNAME = $vc->rcv },
        pri     => sub { $vc->{pri} = $vc->rcv },
        time    => sub { $vc->snd(time) },
        list    => sub { $vc->snd(join "\0", find_folders()) },
        open    => sub { serve_folder($vc) },
    );

    while (my $msg = $vc->rcv) {
        my $run = $handler{$msg}
            or die "protocol error, unknown command ($msg)\n";
        $run->();
    }
}
| 314 |
|
| 315 |
# Program entry. A slave talks to its master over STDIN. A master creates
# a socket pair, forks a child that re-executes this script with "--slave"
# on the other end of the pair, and talks to it over S1. In both roles
# main() is started as a coroutine before control passes to vc::multiplex.
if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    vc::multiplex unblock \*STDIN, $SLAVE;
} else {
    $HOSTID = "master";
    {
        use Socket;
        socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC
            or die "socketpair: $!";

        # fork returns undef on failure; "fork == 0" would treat that as
        # the child and turn the master itself into the slave.
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            # child: wire the socket to STDIN/STDOUT and become the slave
            close S1;
            open STDIN, "<&S2" or die;
            open STDOUT, ">&S2" or die;
            exec $0, "--slave";
            exit 255; # only reached when exec fails
        }
        close S2;
        async \&main;
        vc::multiplex unblock \*S1, $SLAVE;
    }
}
| 336 |
|