package SOMEConnection;

# Base class for a small AnyEvent-driven TCP server.
#
# State is kept directly in the object hash:
#   $self->{listener}      io watcher on the listening socket
#   $self->{$lid}          per-client hash: socket, read_buffer, write_buffer
#   $self->{$lid . '_r'}   read watcher for client $lid
#   $self->{$lid . '_w'}   write watcher for client $lid (only while data is queued)
# where $lid is the string "peerhost:peerport" built in handle_client.
#
# Subclasses are expected to override handle_data and may override
# client_connect / client_disconnect.

use strict;
use warnings;

use Socket;
use AnyEvent;
use IO::Socket::INET;

# Constructor. All key/value arguments are stored verbatim in the object.
# The optional key 'port' selects the listening port used by start_listener.
sub new {
    my $this  = shift;
    my $class = ref($this) || $this;
    my $self  = {@_};
    bless $self, $class;
    return $self;
}

# True when $! holds an error that only means "try again later".
# Must be called directly after the failing system call.
sub _is_transient_error {
    return $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
}

# Hook: called with the client id after a client has been registered.
sub client_connect {
    my ($self, $lid) = @_;
}

# Hook: called with the client id after a client has been removed.
sub client_disconnect {
    my ($self, $lid) = @_;
}

# Create the listening TCP socket and register an accept watcher.
# Listens on $self->{port} when defined, otherwise on port 1236.
# Dies if the listening socket cannot be created.
sub start_listener {
    my ($self) = @_;

    my $port = defined $self->{port} ? $self->{port} : 1236;

    my $sock = IO::Socket::INET->new(
        Listen    => 5,
        ReuseAddr => 1,
        Reuse     => 1,
        LocalPort => $port,
        Proto     => 'tcp',
    );
    $sock or die "Couldn't create listener: $!";

    $self->{listener} = AnyEvent->io(
        poll => 'r',
        fh   => $sock,
        cb   => sub {
            my $cl = $sock->accept();
            unless ($cl) {
                # A failed accept concerns one connection attempt only;
                # dying inside the event callback would affect the whole loop.
                warn "couldn't accept client: $!";
                return;
            }
            binmode $cl;
            $cl->autoflush(1);
            $self->handle_client($cl);
        },
    );
}

# Queue $data for client $lid and make sure a write watcher is active.
# Does nothing when the client has no read watcher (i.e. is not registered)
# or when there is nothing to send.
sub write_data {
    my ($self, $lid, $data) = @_;

    return unless $self->{$lid . '_r'};
    return unless defined $data && length $data;

    my $cl = $self->{$lid}->{socket};
    $self->{$lid}->{write_buffer} .= $data;

    return if $self->{$lid . '_w'};

    $self->{$lid . '_w'} = AnyEvent->io(
        poll => 'w',
        fh   => $cl,
        cb   => sub {
            my $client = $self->{$lid};

            # Nothing left to send (or client gone): stop polling, otherwise
            # the callback would fire continuously on a writable socket.
            unless ($client
                && defined $client->{write_buffer}
                && length $client->{write_buffer})
            {
                delete $self->{$lid . '_w'};
                return;
            }

            my $len = syswrite $cl, $client->{write_buffer};

            unless (defined $len) {
                # Keep the watcher for "try again" conditions only.
                return if _is_transient_error();
                warn "error when writing data on $lid: $!";
                delete $self->{$lid . '_w'};
                return;
            }

            # Drop what was written; keep the remainder for the next event.
            substr $client->{write_buffer}, 0, $len, '';

            if ($len == 0 || !length $client->{write_buffer}) {
                delete $self->{$lid . '_w'};
            }
        },
    );
}

# Register an accepted client socket: derive its id from peer host/port,
# install the read watcher and call the client_connect hook.
sub handle_client {
    my ($self, $cl) = @_;

    my ($chost, $cport) = ($cl->peerhost(), $cl->peerport());
    my $lid = "$chost:$cport";

    $self->{$lid}->{socket} = $cl;

    $self->{$lid . '_r'} = AnyEvent->io(
        poll => 'r',
        fh   => $cl,
        cb   => sub {
            my $res = sysread $cl, my $data, 1024;

            if ($res) {
                $self->{$lid}->{read_buffer} .= $data;
                # The buffer is passed by reference so handle_data can
                # consume complete packets from its front.
                $self->handle_data($lid, \$self->{$lid}->{read_buffer});
                return;
            }

            if (not defined $res) {
                return if _is_transient_error();
                warn "error when receiving data on $lid: $!";
            }
            else {
                warn "got eof on $lid: $!";
            }
            $self->close_client($lid);
        },
    );

    $self->client_connect($lid);
}

# Abstract: receives the client id and a reference to its read buffer.
# Subclasses must override this.
sub handle_data {
    my ($self, $lid, $buf) = @_;
    die "implement";
}

# Forget everything about client $lid (socket, watchers, buffers) and
# call the client_disconnect hook.
sub close_client {
    my ($self, $lid) = @_;

    eval { delete $self->{$lid}->{socket} };
    delete $self->{$lid . '_r'};
    delete $self->{$lid . '_w'};
    delete $self->{$lid};

    $self->client_disconnect($lid);
}

package JSONConnection;

# Server side of a length-prefixed JSON protocol on top of SOMEConnection.
# Wire format of one packet: "<length> <json>\015\012".
#
# Callbacks read from the object hash:
#   connect_cb->($self, $lid)           optional, on new client
#   packet_cb->($self, $lid, $decoded)  for every complete packet

use strict;
use warnings;

our @ISA = qw/SOMEConnection/;

use JSON::Syck;

# Ids of currently connected clients. This is a package variable, so it is
# shared by all JSONConnection objects in the process.
our %CLIENTS;

# Record the client and notify connect_cb if one was supplied.
sub client_connect {
    my ($self, $lid) = @_;
    $CLIENTS{$lid} = 1;
    $self->{connect_cb}->($self, $lid) if $self->{connect_cb};
}

# Remove the client from the set of known clients.
sub client_disconnect {
    my ($self, $lid) = @_;
    delete $CLIENTS{$lid};
}

# Send $data to every id in %CLIENTS.
sub broadcast {
    my ($self, $data) = @_;
    for my $lid (keys %CLIENTS) {
        $self->send_data($lid, $data);
    }
}

# Serialize $data with JSON::Syck and queue it, length-prefixed, for $lid.
sub send_data {
    my ($self, $lid, $data) = @_;
    # $JSON::Syck::ImplicitUnicode = 0;
    my $dump = JSON::Syck::Dump($data);
    $self->write_data($lid, (length $dump) . " " . $dump . "\015\012");
}

# Consume every complete packet at the front of the buffer referenced by
# $buf and hand the decoded value to packet_cb. Incomplete data stays in the
# buffer until more arrives.
sub handle_data {
    my ($self, $lid, $buf) = @_;

    while ($$buf =~ m/^\s*(\d+) /) {
        # $+[0] is the offset just past the "<length> " prefix.
        my ($prefix_len, $len) = ($+[0], $1);

        return if length($$buf) - $prefix_len < $len;

        my $data = substr $$buf, $prefix_len, $len;
        substr $$buf, 0, $prefix_len + $len, '';

        # $JSON::Syck::ImplicitUnicode = 1;
        $self->{packet_cb}->($self, $lid, JSON::Syck::Load($data));
    }
}

package JSONClientConnection;

# Client side of the same length-prefixed JSON protocol.
#
# Callbacks read from the object hash:
#   packet_cb->($self, $decoded)   for every complete packet
#   disconnect_cb->($message)      on EOF or read error (defaults to a no-op)

use strict;
use warnings;

use JSON::Syck;

# Constructor. Key/value arguments are stored in the object; disconnect_cb
# defaults to an empty sub.
sub new {
    my $this  = shift;
    my $class = ref($this) || $this;
    my $self  = { disconnect_cb => sub { }, @_ };
    bless $self, $class;
    return $self;
}

# True when $! holds an error that only means "try again later".
# Must be called directly after the failing system call.
sub _is_transient_error {
    return $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
}

# Connect to $host:$port and install the read watcher.
# Returns immediately if a socket is already present.
# Dies if the socket cannot be created or is not connected.
sub connect {
    my ($self, $host, $port) = @_;

    $self->{socket} and return;

    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
        Blocking => 0,
    );

    # NOTE(review): with Blocking => 0 a connect that is still in progress
    # presumably also reports "not connected" here - confirm whether that is
    # intended for non-local servers.
    die "couldn't connect to server '$host:$port': $!\n"
        unless $sock && $sock->connected;

    $self->{socket} = $sock;
    $self->{host}   = $host;
    $self->{port}   = $port;

    $self->{r} = AnyEvent->io(
        poll => 'r',
        fh   => $sock,
        cb   => sub {
            my $l = sysread $sock, my $data, 1024;

            if ($l) {
                $self->{read_buffer} .= $data;
                $self->handle_data(\$self->{read_buffer});
                return;
            }

            my $reason;
            if (defined $l) {
                $reason = "EOF from json server '$host:$port'";
            }
            else {
                # The socket is non-blocking; "try again" is not a failure.
                return if _is_transient_error();
                $reason = "Error while reading from json server '$host:$port': $!";
            }

            # Tear down all connection state before notifying, so that the
            # callback may call connect() again and no stale watcher or
            # partial packet survives into a new connection.
            delete @{$self}{qw(r w socket read_buffer write_buffer)};
            $self->{disconnect_cb}->($reason);
        },
    );
}

# Consume every complete packet at the front of the buffer referenced by
# $buf and hand the decoded value to packet_cb. Incomplete data stays in the
# buffer until more arrives.
sub handle_data {
    my ($self, $buf) = @_;

    while ($$buf =~ m/^\s*(\d+) /) {
        # $+[0] is the offset just past the "<length> " prefix.
        my ($prefix_len, $len) = ($+[0], $1);

        return if length($$buf) - $prefix_len < $len;

        my $data = substr $$buf, $prefix_len, $len;
        substr $$buf, 0, $prefix_len + $len, '';

        # $JSON::Syck::ImplicitUnicode = 1;
        $self->{packet_cb}->($self, JSON::Syck::Load($data));
    }
}

# Serialize $data with JSON::Syck and queue it, length-prefixed.
sub send_data {
    my ($self, $data) = @_;
    # $JSON::Syck::ImplicitUnicode = 0;
    my $dump = JSON::Syck::Dump($data);
    $self->write_data((length $dump) . " " . $dump . "\015\012");
}

# Queue $data for the server and make sure a write watcher is active.
# Does nothing when there is no read watcher (i.e. not connected) or when
# there is nothing to send.
sub write_data {
    my ($self, $data) = @_;

    return unless $self->{r};
    return unless defined $data && length $data;

    my $cl = $self->{socket};
    $self->{write_buffer} .= $data;

    return if $self->{w};

    $self->{w} = AnyEvent->io(
        poll => 'w',
        fh   => $cl,
        cb   => sub {
            # Nothing left to send: stop polling, otherwise the callback
            # would fire continuously on a writable socket.
            unless (defined $self->{write_buffer}
                && length $self->{write_buffer})
            {
                delete $self->{w};
                return;
            }

            my $len = syswrite $cl, $self->{write_buffer};

            unless (defined $len) {
                # Keep the watcher for "try again" conditions only.
                return if _is_transient_error();
                warn "error when writing data on $self->{host}:$self->{port}: $!";
                delete $self->{w};
                return;
            }

            # Drop what was written; keep the remainder for the next event.
            substr $self->{write_buffer}, 0, $len, '';

            if ($len == 0 || !length $self->{write_buffer}) {
                delete $self->{w};
            }
        },
    );
}

1;