ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/LogCatcher.pm
(Generate patch)

Comparing AnyEvent-MP/MP/LogCatcher.pm (file contents):
Revision 1.1 by root, Sun Sep 6 00:13:21 2009 UTC vs.
Revision 1.7 by root, Thu Mar 22 22:57:50 2012 UTC

6 6
7 use AnyEvent::MP::LogCatcher; 7 use AnyEvent::MP::LogCatcher;
8 8
9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11This relatively simple module overrides C<$AnyEvent::MP::Kernel::WARN> on 11This relatively simple module attaches itself to the
12every node and sends all log messages to the node running this service. 12C<$AnyEvent::Log::COLLECT> context on every node and sends all log
13messages to the node showing interest via the C<catch> function.
13 14
14No attempt to buffer log messages on connection loss, or retransmit lost 15No attempt to buffer log messages on connection loss, or to retransmit
15messages, is done. 16lost messages, is done.
16 17
17=head1 GLOBALS AND FUNCTIONS 18=head1 GLOBALS AND FUNCTIONS
18 19
19=over 4 20=over 4
20 21
25use common::sense; 26use common::sense;
26use Carp (); 27use Carp ();
27use POSIX (); 28use POSIX ();
28 29
29use AnyEvent (); 30use AnyEvent ();
31use AnyEvent::Log ();
30use AnyEvent::Util (); 32use AnyEvent::Util ();
31 33
32use AnyEvent::MP; 34use AnyEvent::MP;
33use AnyEvent::MP::Kernel; 35use AnyEvent::MP::Kernel;
34 36
35use base "Exporter"; 37use base "Exporter";
36 38
37$AnyEvent::MP::Kernel::WARN->(7, "starting log catcher service."); 39AE::log 7 => "starting log catcher service.";
38 40
39our $LOGLEVEL; 41our $LOGLEVEL;
42our $MON;
43our $PROPAGATE = 1; # set to one when messages ought to be send to remote nodes
40our %lport; # local logging ports 44our %LPORT; # local logging ports
41 45
42# other nodes connect via this 46# other nodes connect via this
43sub connect { 47sub connect {
44 my ($version, $rport, $loglevel) = @_; 48 my ($version, $rport, $loglevel) = @_;
45 49
46 my $cb = sub { 50 # context to catch log messages
51 my $ctx = new AnyEvent::Log::Ctx
52 title => "AnyEvent::MP::LogCatcher",
53 level => $loglevel,
54 log_cb => sub {
47 snd $rport, @_ 55 snd $rport, @{ $_[0] }
48 if $_[0] <= $loglevel; 56 if $PROPAGATE;
57 },
58 fmt_cb => sub {
59 [$_[0], $_[1]->title, $_[2], $_[3]]
60 },
61 ;
62
63 $AnyEvent::Log::COLLECT->attach ($ctx);
64
65 # monitor them, silently die if they die
66 mon $rport, sub {
67 $AnyEvent::Log::COLLECT->detach ($ctx);
49 }; 68 };
50 69
51 rcv $SELF, sub { };#d# 70 AE::log 8 => "starting to propagate log messages to $rport";
52
53 push @AnyEvent::MP::Kernel::WARN, $cb;
54
55 # monitor them, silently die
56 mon $rport, sub {
57 @AnyEvent::MP::Kernel::WARN = grep $_ != $cb, @AnyEvent::MP::Kernel::WARN;
58 };
59} 71}
60 72
61sub mon_node { 73sub mon_node {
62 my ($node, $is_up) = @_; 74 my ($node) = @_;
63 75
64 return unless $is_up; 76 # don't log messages from ourselves
77 return if $node eq $NODE;
65 78
66 my $lport = $lport{$node} = port { 79 $LPORT{$node} ||= do {
80 my $lport = port {
67 my ($level, $msg) = @_; 81 my ($time, $ctx, $level, $msg) = @_;
68 82
69 $msg =~ s/\n$//; 83 $level = 2 if $level < 2; # do not exit just because others do so
70 84
71 printf STDERR "%s [%s] <%d> %s\n", 85 my $diff = AE::now - $time;
72 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time), 86 $diff = (abs $diff) < 1e-3 ? "" : sprintf "%+.3fs", $diff;
73 $node,
74 $level,
75 $msg;
76 };
77 87
78 # establish connection 88 local $PROPAGATE; # do not propagate to other nodes
79 my $rport = spawn $node, "AnyEvent::MP::LogCatcher::connect", 0, $lport, $LOGLEVEL; 89 (AnyEvent::Log::ctx $ctx)->log ($level, "[$node$diff] $msg");
90 };
80 91
81 mon $rport, $lport;
82 mon $lport, sub { 92 mon $lport, sub {
83 warn "$lport $rport <@_>\n";#d# 93 delete $LPORT{$node}
94 or return; # do not monitor if node is not there
95 AE::log error => "@_"
96 if @_; # log error if there really was one
97 mon_node ($node); # try to reocnnect
98 };
99
100 # establish connection
101 AnyEvent::MP::Kernel::snd_to_func $node, "AnyEvent::MP::LogCatcher::connect", 0, $lport, $LOGLEVEL;
102
103 mon $node, $lport;
104
105 $lport
84 }; 106 }
85} 107}
108
109=item AnyEvent::MP::LogCatcher::catch [$level]
110
111Starts catching all log messages from all nodes with level C<$level> or
112lower. If the C<$level> is C<undef>, then stop catching all messages
113again.
114
115Example: start a node that catches all messages (you might have to specify
116a suitable profile name).
117
118 AE_VERBOSE=9 aemp run profilename services '[["AnyEvent::MP::LogCatcher::catch",9]]'
119
120=cut
86 121
87sub catch { 122sub catch {
88 $LOGLEVEL = $_[0]; 123 $LOGLEVEL = $_[0];
89 kil $_, "restart" for values %lport; 124 kil $_ for values %LPORT;
90 %lport = (); 125 %LPORT = ();
91 126
92 return unless defined $LOGLEVEL; 127 return unless defined $LOGLEVEL;
93 128
94 mon_node $_, 1 129 $MON = db_mon "'l" => sub {
95 for up_nodes; 130 my ($family, $add, $chg, $del) = @_;
96 131
97 mon_nodes \&mon_node; 132 kil delete $LPORT{$_}
133 for @$del;
134
135 mon_node $_
136 for @$add;
137 };
138
98 () 139 ()
99} 140}
100 141
101=back 142=back
143
144=head1 LOGGING
145
146AnyEvent::MP::LogCatcher logs messages from remote nodes. It logs them
147into the original logging context and prepends the origin node name
148and, if the time difference is larger than 1e-4 seconds, also the time
149difference between local time and origin time.
102 150
103=head1 SEE ALSO 151=head1 SEE ALSO
104 152
105L<AnyEvent::MP>. 153L<AnyEvent::MP>.
106 154

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines