Файловый менеджер - Редактировать - /usr/sbin/amavis-services
Назад
#!/usr/bin/perl -T

#------------------------------------------------------------------------------
# This is amavis-services, a set of supervisor processes for amavisd-new.
#
# Author: Mark Martinec <Mark.Martinec@ijs.si>
#
# Copyright (c) 2012-2014, Mark Martinec
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of the Jozef Stefan Institute.

# (the above license is the 2-clause BSD license, also known as
#  a "Simplified BSD License", and pertains to this program only)
#
# Patches and problem reports are welcome.
# The latest version of this program is available at:
#   http://www.ijs.si/software/amavisd/
#------------------------------------------------------------------------------

# Strictures. The program runs in taint mode (-T on the #! line), and
# 'use re "taint"' keeps regexp captures tainted unless explicitly disabled.
use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';

use vars qw($VERSION);  $VERSION = 2.008002;

use Errno qw(ESRCH ENOENT);
use POSIX qw(strftime);
use Time::HiRes ();
use Unix::Syslog qw(:macros :subs);

# Package globals: program identification and user-configurable settings.
use vars qw($myproduct_name $myversion_id $myversion_date $myversion);
use vars qw($MYHOME $idle_ttl $active_ttl $log_level);
use vars qw($syslog_ident $syslog_facility);
use vars qw($inner_sock_specs $outer_sock_specs $snmp_sock_specs);

# Set at compile time so that the configuration section below can use them.
BEGIN {
  $myproduct_name = 'amavis-services';
  $myversion_id = '2.9.0';  $myversion_date = '20140506';
  $myversion = "$myproduct_name-$myversion_id ($myversion_date)";
}

### USER CONFIGURABLE:

$log_level = 0;  # 0..5
$syslog_facility = LOG_MAIL;
$syslog_ident = $myproduct_name;

$MYHOME = '/var/lib/amavis';

# A socket to which amavisd child processes report their data.
# should match one of the sockets in @zmq_sockets in amavisd.conf
$inner_sock_specs = "ipc://$MYHOME/amavisd-zmq.sock";

# A socket to which we forward summarized amavisd data.
# should match a socket of the same name in amavis-status
$outer_sock_specs = "tcp://127.0.0.1:23232";

# A socket on which we accept SNMP queries and respond to.
# should match a socket of the same name in amavisd-snmp-subagent-zmq
$snmp_sock_specs = "tcp://127.0.0.1:23233";  # tcp://*:23233

$idle_ttl = 4*60*60;  # idle children are sent a SIGTERM
                      #   after this many seconds
$active_ttl = 15*60;  # stuck active children are sent a SIGTERM
                      #   after this many seconds

### END OF USER CONFIGURABLE

# Age thresholds (in seconds) used for the ProcBusy<N> gauges.
use vars qw(@age_slots);
BEGIN {
  @age_slots = (
    0.1, 0.2, 0.5, 1, 2, 4, 8, 15, 30,                  # seconds
    1*60, 2*60, 4*60, 8*60, 15*60, 30*60,               # minutes
    1*3600, 2*3600, 4*3600, 8*3600, 15*3600, 30*3600);  # hours
}

# Select whichever ZeroMQ binding is installed and paper over the v2/v3 API
# differences by installing zmq_sendmsg / zmq_recvmsg / zmq_sendstr aliases.
# Dies at compile time if none of the three supported modules can be loaded.
use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);
BEGIN {
  my($zmq_major, $zmq_minor, $zmq_patch);
  if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ3';  # new interface module to zmq v3 or libxs
    ZMQ::LibZMQ3->import;
    ZMQ::Constants->import(qw(:all));
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
  # *zmq_sendmsg   [native]                   # (socket,msgobj,flags)
  # *zmq_recvmsg   [native]                   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send($_[0], $_[1], length $_[1], $_[2]||0);
      $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ2';  # new interface module to zmq v2
    ZMQ::LibZMQ2->import;
    ZMQ::Constants->import(qw(:all));
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
    # zmq v2/v3 incompatibile renaming
    *zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send;  # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv;  # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);
      $rv == -1 ? undef : $rv;
    };
  } elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
    $zmq_mod_name = 'ZeroMQ';  # old interface module to zmq v2
    ZeroMQ::Raw->import;
    ZeroMQ::Constants->import(qw(:all));
    ($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
    # zmq v2/v3 incompatibile renaming
    *zmq_sendmsg = \&ZeroMQ::Raw::zmq_send;   # (socket,msgobj,flags)
    *zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv;   # (socket,flags) -> msgobj
    *zmq_sendstr = sub {                      # (socket,string,flags)
      my $rv = zmq_send(@_);
      $rv == -1 ? undef : $rv;
    };
  } else {
    die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
  }
  $zmq_mod_version = $zmq_mod_name->VERSION;
  $zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
  1;
}

# Returns a one-line description of the ZMQ binding and library in use.
#
sub zmq_version {
  sprintf("%s %s, lib %s",
          $zmq_mod_name, $zmq_mod_version, $zmq_lib_version);
}

# Receives one message part from a socket into the caller's buffer ($_[1]),
# optionally storing it at the given offset within that buffer.
# Arguments: (socket, buffer, offset).
# Returns the length of the received part in scalar context, or a pair
# (length, more) in list context, where 'more' is the ZMQ_RCVMORE flag.
# On failure the buffer is truncated at the offset and an undef / empty list
# is returned.
#
sub zmq_recvstr {  # (socket,buffer,offset) -> (len,more)
  my $sock = $_[0];
  my $offset = $_[2] || 0;
  my $zm = zmq_recvmsg($sock);  # a copy of a received msg obj
  if (!$zm) { substr($_[1],$offset) = ''; return }
  ($offset ? substr($_[1],$offset) : $_[1]) = zmq_msg_data($zm);
  my $len = length($_[1]) - $offset;
  zmq_msg_close($zm);
  return $len  if !wantarray;
  my $more = zmq_getsockopt($sock, ZMQ_RCVMORE);
  if ($more == -1) { substr($_[1],$offset) = ''; return }
  ($len, $more);
}

# File-scoped state shared by the task subroutines and the main program.
my($interrupted, $syslog_open, $foreground);
my($zmq_ctx, $inner_sock, $outer_sock, $snmp_sock);

# Multiplier converting milliseconds into the unit expected by zmq_poll.
my $zmq_poll_units = 1000;  # milliseconds since zmq v3
$zmq_poll_units *= 1000  if $zmq_lib_version =~ /^[012]\./;  # microseconds

# Return untainted copy of a string (argument can be a string or a string ref)
#
sub untaint($) {
  return undef  if !defined $_[0];  # must return undef even in a list context!
  no re 'taint';
  local $1;  # avoids Perl taint bug: tainted global $1 propagates taintedness
  (ref($_[0]) ? ${$_[0]} : $_[0]) =~ /^(.*)\z/s;
  $1;
}

# Returns true if a message of the given level would currently be logged.
#
sub ll($) {
  my($level) = @_;
  $level <= $log_level;
}

# Logs a message if its level does not exceed $log_level.
# Arguments: (level, message_or_format, sprintf_args...).
# Writes to STDERR until syslog has been opened, and to syslog afterwards,
# mapping the level onto a syslog priority.
#
sub do_log($$;@) {
# my($level,$errmsg,@args) = @_;
  my $level = shift;
  if ($level <= $log_level) {
    my $errmsg = shift;
    # treat $errmsg as sprintf format string if additional arguments provided
    $errmsg = sprintf($errmsg,@_)  if @_;
    if (!$syslog_open) {
      $errmsg .= "\n";
      print STDERR $errmsg;  # print ignoring I/O status, except SIGPIPE
    } else {
      my $prio = $level >=  3 ? LOG_DEBUG  # most frequent first
               : $level >=  1 ? LOG_INFO
               : $level >=  0 ? LOG_NOTICE
               : $level >= -1 ? LOG_WARNING
               :                LOG_ERR;
      syslog($prio, "%s", $errmsg);
    }
  }
}

# Applies one 'am.st' status message to the table of process states.
# Arguments: (ref to a hash of process states keyed by PID, ref to message).
# The message has whitespace-separated fields: channel, PID, time, state and
# an optional task id. A state of 'FLUSH' empties the table, 'exiting' and
# 'purged' remove the PID, anything else creates or updates its record.
# Messages not matching the 'am.st <digits> ' prefix are ignored.
# Always returns 1.
#
sub process_message($$) {
  my($process_states_ref,$msgstr_ref) = @_;
  if (!$msgstr_ref || !defined $$msgstr_ref) {
    # should not happen (except on a failure of zmq_recvmsg)
  } elsif ($$msgstr_ref =~ /^am\.st \d+\s+/s) {
    my($subscription_chan, $pid, $time, $state, $task_id) =
      split(' ', $$msgstr_ref);
    if ($state eq 'FLUSH') {
      do_log(0, "childproc_minder: FLUSH process states");
      %$process_states_ref = ();  # flush all kept state (e.g. on a restart)
    } elsif ($state eq 'exiting' || $state eq 'purged') {
      delete $process_states_ref->{$pid};  # may or may not exist
    } else {
      $state = ' '  if $state eq '-';
      my $p = $process_states_ref->{$pid};
      if ($p) {
        $p->{state} = $state;
        $p->{task_id} = $task_id;
      } else {  # new process appeared
        $process_states_ref->{$pid} = $p = {
          state => $state,
          task_id => $task_id,
          timestamp => undef,
          base_timestamp => undef,
        };
      }
      my $now = Time::HiRes::time;
      if ($time > 1e9) {  # Unix time in seconds with fraction (> Y2000)
        $p->{base_timestamp} = $p->{timestamp} = $time;
      } elsif (!$p->{base_timestamp}) {  # delta time but no base
        $p->{timestamp} = $now;
        $p->{base_timestamp} = $p->{timestamp} - $time/1000;  # estimate
      } else {  # delta time since base_timestamp in ms
        $p->{timestamp} = $p->{base_timestamp} + $time/1000;
      }
      $p->{tick} = $now;  # when we last heard from this process
    }
  }
  1;
}

# Walks the table of known processes, verifies (with a signal 0) that each
# one still exists, and terminates those which are overdue: older than
# $idle_ttl when idling, or older than $active_ttl otherwise.
#
# An overdue process is first sent a SIGTERM. If it is still there once the
# retry interval (initially 20 s) has elapsed it is sent a SIGKILL, and the
# interval is multiplied by 1.5 on each further attempt; when the interval
# exceeds 600 s the process is given up on and dropped from the table.
# For every process removed from the table a 'purged' status message is
# published on the inner socket.
#
# Argument: ref to a hash of process states keyed by PID (modified in place).
# Returns a pair: (number of processes that vanished on their own,
#                  number of processes confirmed gone after being signalled).
# Dies if publishing a 'purged' message fails.
#
sub check_proc($) {
  my($process_states_ref) = @_;
  do_log(2, "CHECK");
  my $cnt_gone = 0;
  my $cnt_terminated = 0;
  # deleting the entry most recently returned by each() is safe
  while (my($pid,$p) = each %$process_states_ref) {
    my $now = Time::HiRes::time;
    my $age = $now - $p->{base_timestamp};
    my $idling = $p->{task_id} eq '' && $p->{state} =~ /^[. ]\z/s;
    my $overdue = $age > ($idling ? $idle_ttl : $active_ttl);
    my $n;  # number of checked processes (0 or 1)
    if (!$overdue && $now - $p->{tick} < 10) {
      $n = 1;  # recently heard from it, assume it is still there
      do_log(2, "PID %d skipped, recently heared from\n", $pid);
    } elsif (!$overdue && $idling && $p->{last_checked_timestamp} &&
             $now - $p->{last_checked_timestamp} < 20) {
      $n = 1;  # recently checked, idle, assume it is still there
      do_log(2, "PID %d skipped, recently checked\n", $pid);
    } elsif (!$overdue && $p->{last_checked_timestamp} &&
             $now - $p->{last_checked_timestamp} < 10) {
      $n = 1;  # recently checked, busy, assume it is still there
      do_log(2, "PID %d skipped, recently checked\n", $pid);
    } else {
      do_log(2, "PID %d checking\n", $pid);
      $p->{last_checked_timestamp} = $now;
      $n = kill(0,$pid);  # test if the process is still there
      if ($n == 0) {  # ESRCH means there is no such process
        if ($! != ESRCH) {
          # NOTE(review): the entry is purged below even for errors other
          # than ESRCH (e.g. EPERM, where the process may still exist);
          # kept as in the original - confirm this is intended.
          do_log(-1, "Can't check the process %s: %s", $pid,$!);
        } elsif (defined $p->{sig_sent}) {
          $cnt_terminated++;
          do_log(2, "PID %d sucessfully terminated by SIG%s, %s\n",
                    $pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
        } else {
          $cnt_gone++;
          do_log(0, "PID %d went away, %s\n",
                    $pid, $p->{task_id} || $p->{state} );
        }
        delete $process_states_ref->{$pid};
        defined zmq_sendstr($inner_sock,
                            sprintf('am.st %s %014.3f purged', $pid,$now))
          or die "zmq_sendstr failed: $!"
      }
    }
    if ($n == 0) {
      # already dealt with
    } elsif (!$overdue) {
      # life is good
      do_log(2, "PID %d: %s\n", $pid, $p->{task_id} || $p->{state} );
    } elsif (!$p->{sig_sent} ||
             $p->{sig_sent_timestamp} + $p->{sig_sent_retry_in} <= $now) {
      # overdue, terminate or kill, or retry the killing;
      # a retry is only due once the retry interval since the previous
      # signal has elapsed, which gives a SIGTERM its grace period
      if (!$p->{sig_sent}) {
        $p->{sig_sent} = 'TERM';
        $p->{sig_sent_retry_in} = 20;
      } else {
        $p->{sig_sent} = 'KILL';
        $p->{sig_sent_retry_in} *= 1.5;  # increase the wait time for a retry
      }
      $p->{sig_sent_timestamp} = $now;
      if (kill($p->{sig_sent},$pid)) {
        do_log(2, "PID %d SIG%s, %s\n",
                  $pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
      } elsif ($! == ESRCH) {
        # already gone by now, no fuss
      } else {
        warn "Can't $p->{sig_sent} the [$pid]: $!";
      }
      if ($p->{sig_sent_retry_in} > 600) {
        do_log(2, "Giving up on PID %d, %s\n",
                  $pid, $p->{task_id} || $p->{state});
        delete $process_states_ref->{$pid};
        defined zmq_sendstr($inner_sock,
                            sprintf('am.st %s %014.3f purged', $pid,$now))
          or die "zmq_sendstr failed: $!"
      }
    }
  }
  ($cnt_gone, $cnt_terminated);
}

# The childproc_minder process receives information about amavisd child
# processes from an outer socket, forwarded there by a forwarding process.
# Based on updates received, and based on its own processes sanity checks,
# it maintains a state in memory of each amavisd child process.
#
# It must run on the same host as amavisd child processes in order to be
# able to check for lost or crashed processes.
#
# It kills amavisd child processes which are active longer than $active_ttl
# seconds, or are idling for more than $idle_ttl seconds. For killed or lost
# processes it sends 'purged' messages to the inner socket, and periodically
# sends a list of amavisd child process PIDs to the inner socket for the
# benefit of more ephemeral clients.
#
# Takes no arguments and never returns normally: it loops forever, and dies
# on any ZMQ failure. It sets the file-scoped $zmq_ctx, $outer_sock and
# $inner_sock so that the main program can close them on shutdown.
#
sub childproc_minder() {

  do_log(5, "childproc-minder: zmq_init");
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create ZMQ context: $!";

  do_log(5, "childproc-minder: creating outer ZMQ_SUB socket");
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $outer_sock or die "Can't create outer ZMQ socket: $!";

  do_log(5, "childproc-minder: zmq_setsockopt on outer socket");
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $outer_sock_ipv4only = 1;  # a ZMQ default
  # two colons in the socket spec are taken as a sign of an IPv6 address
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt failed: $!";
    $outer_sock_ipv4only = 0;
  }
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  do_log(5, "childproc-minder: connecting to outer zmq socket %s%s",
            $outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_connect($outer_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  do_log(5, "childproc-minder: creating inner ZMQ_PUB socket");
  $inner_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
  $inner_sock or die "Can't create inner ZMQ socket: $!";

  do_log(5, "childproc-minder: zmq_setsockopt on inner socket");
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $inner_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $inner_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $inner_sock_ipv4only = 0;
  }
  # the high-water-mark option was renamed between zmq v2 and v3
  my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
          : defined &ZMQ_HWM    ? ZMQ_HWM() : undef;
  if (defined $hwm) {
    zmq_setsockopt($inner_sock, $hwm, 100) != -1
      or die "zmq_setsockopt HWM failed: $!";
  }

  do_log(5, "childproc-minder: connecting to inner zmq socket %s%s",
            $inner_sock_specs, $inner_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_connect($inner_sock, $inner_sock_specs) != -1
    or die "zmq_connect to $inner_sock_specs failed: $!";

  sleep 1;  # a crude way to avoid a "slow joiner" syndrome  #***

  my $last_checked = 0;
  my $last_proclist_sent = 0;
  my %process_states;  # associative array on pid

  do_log(5, "childproc-minder: sending FLUSH to inner zmq socket");
  my $now = Time::HiRes::time;
  defined zmq_sendstr($inner_sock,
                      sprintf('am.st %s %014.3f FLUSH', $$, $now))
    or die "zmq_sendstr failed: $!";

  do_log(5, "childproc-minder: entering event loop");

  for (;;) {
    # wait up to one second for status updates and apply them
    zmq_poll(
      [
        { socket => $outer_sock,
          events => ZMQ_POLLIN,
          callback => sub {
            my($msgstr, $msgstr_l, $more);
            for (;;) {  # all parts of one multi-part message
              ($msgstr_l,$more) = zmq_recvstr($outer_sock,$msgstr);
              defined $msgstr_l  or die "zmq_recvstr failed: $!";
              do_log(5, "childproc-minder: got %s", $msgstr);
              process_message(\%process_states,\$msgstr);
              last  if !$more;
            }
          },
        },
      ],
      1 * $zmq_poll_units
    ) != -1  or die "zmq_poll failed: $!";

    $now = Time::HiRes::time;
    if ($last_checked + 1 < $now && $last_proclist_sent + 2 < $now) {
      my($cnt_gone, $cnt_terminated) = check_proc(\%process_states);
      $last_checked = $now;

      my(@proc_idle_list, @proc_busy_list);
      my(@num_proc_busy_by_age, %num_proc_busy_by_activity);
      $now = Time::HiRes::time;  # take a fresh reading after check_proc
      for my $pid (sort { $a <=> $b } keys %process_states) {
        my $p = $process_states{$pid};
        my $s = $p->{state};
        if ($p->{task_id} eq '' && $s =~ /^[. ]\z/s) {
          push(@proc_idle_list, $pid);
        } else {
          push(@proc_busy_list, $pid);
          $num_proc_busy_by_age[0]++;  # slot 0 counts all busy processes
          if (defined $p->{base_timestamp}) {
            my $age = $now - $p->{base_timestamp};
            my $j = 1;
            for my $t (@age_slots) {
              if ($age >= $t) { $num_proc_busy_by_age[$j]++ }
              $j++;
            }
          }
          # fold detailed states into the few reported activity classes
          if ($s eq 'm' || $s eq 'd' || $s eq 'F') { $s = 'm' }
          elsif ($s eq 'D' || $s eq 'V' || $s eq 'S') { }
          else { $s = ' ' }
          $num_proc_busy_by_activity{$s}++;
        }
      }

      for my $j (0..@age_slots) {  # age_slots start at 1, zero is extra
        send_gauge("ProcBusy$j", $num_proc_busy_by_age[$j] || 0, 1);
      }
      send_gauge('ProcBusyTransfer', $num_proc_busy_by_activity{'m'}||0, 1);
      send_gauge('ProcBusyDecode',   $num_proc_busy_by_activity{'D'}||0, 1);
      send_gauge('ProcBusyVirus',    $num_proc_busy_by_activity{'V'}||0, 1);
      send_gauge('ProcBusySpam',     $num_proc_busy_by_activity{'S'}||0, 1);
      send_gauge('ProcBusyOther',    $num_proc_busy_by_activity{' '}||0, 1);
      send_count('ProcGone', $cnt_gone, 1)  if $cnt_gone;
      send_gauge('ProcAll',  @proc_busy_list + @proc_idle_list, 1);
      send_gauge('ProcBusy', scalar @proc_busy_list, 1);
      send_gauge('ProcIdle', scalar @proc_idle_list, 0);  # last chunk

      # must not mix different subscription prefixes
      # within the same multi-part message

      my $msg = 'am.proc.busy ' . join(' ',@proc_busy_list);
      do_log(5, "childproc-minder: sending %s", $msg);
      defined zmq_sendstr($inner_sock, $msg)
        or die "zmq_sendstr failed: $!";

      $msg = 'am.proc.idle ' . join(' ',@proc_idle_list);
      do_log(5, "childproc-minder: sending %s", $msg);
      defined zmq_sendstr($inner_sock, $msg)
        or die "zmq_sendstr failed: $!";

      $last_proclist_sent = $now;
    }
  } # forever
  # not reached
}

# Publishes a 32-bit gauge on the inner socket as 'am.nanny <name> G32 <n>'.
# Arguments: (name, value, more); a true 'more' marks the message as
# a non-final part of a multi-part message. Dies if the send fails.
#
sub send_gauge($$;$) {
  my($name,$value,$more) = @_;
  defined zmq_sendstr($inner_sock,
                      sprintf('am.nanny %s G32 %d', $name,$value),
                      $more ? ZMQ_SNDMORE : 0)
    or die "zmq_sendstr (send_gauge $name) failed: $!";
}

# Publishes a 32-bit counter increment on the inner socket as
# 'am.nanny <name> C32 <n>'. Arguments and failure mode as for send_gauge.
#
sub send_count($$;$) {
  my($name,$value,$more) = @_;
  defined zmq_sendstr($inner_sock,
                      sprintf('am.nanny %s C32 %d', $name,$value),
                      $more ? ZMQ_SNDMORE : 0)
    or die "zmq_sendstr (send_count $name) failed: $!";
}

# snmp_responder listens to am.snmp and am.nanny messages reporting the
# SNMP variable updates (as broadcast to the outer socket by a forwarding
# process and child_minder), keeps evidence of a current value of each
# SNMP variable, and also listens on a $snmp_sock_specs socket for queries
# from amavisd-snmp-subagent-zmq, responding to each query by current values
# of SNMP variables.
#
# Takes no arguments and never returns normally: it loops forever, and dies
# on any ZMQ failure. It sets the file-scoped $zmq_ctx, $outer_sock and
# $snmp_sock so that the main program can close them on shutdown.
#
sub snmp_responder() {

  do_log(5, "snmp-responder: zmq_init");
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create ZMQ context: $!";

  do_log(5, "snmp-responder: creating outer ZMQ_SUB socket");
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $outer_sock or die "Can't create outer ZMQ socket: $!";

  do_log(5, "snmp-responder: zmq_setsockopt on outer socket");
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $outer_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $outer_sock_ipv4only = 0;
  }
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.nanny ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  do_log(5, "snmp-responder: connecting to outer zmq socket %s%s",
            $outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_connect($outer_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  do_log(5, "snmp-responder: creating snmp ZMQ_REP socket");
  $snmp_sock = zmq_socket($zmq_ctx, ZMQ_REP);
  $snmp_sock or die "Can't create ZMQ socket: $!";

  do_log(5, "snmp-responder: zmq_setsockopt LINGER on snmp socket");
  zmq_setsockopt($snmp_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $snmp_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $snmp_sock_specs =~ /:[0-9a-f]*:/i) {
    do_log(5, "snmp-responder: zmq_setsockopt IPV4ONLY on snmp socket");
    zmq_setsockopt($snmp_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $snmp_sock_ipv4only = 0;
  }
# my $hwm = defined &ZMQ_SNDHWM ? ZMQ_SNDHWM()
#         : defined &ZMQ_HWM    ? ZMQ_HWM() : undef;
# if (defined $hwm) {
#   do_log(5, "snmp-responder: zmq_setsockopt SNDHWM on snmp socket");
#   zmq_setsockopt($inner_sock, $hwm, 2000) != -1
#     or die "zmq_setsockopt HWM failed: $!";
# }

  do_log(5, "snmp-responder: binding to snmp zmq socket %s%s",
            $snmp_sock_specs, $snmp_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_bind($snmp_sock, $snmp_sock_specs) != -1
    or die "zmq_bind to $snmp_sock_specs failed: $!";

  my(%snmp_var, %snmp_type);

  # (re)creates the variables which are kept locally
  # rather than learned from updates
  my $seed_system_vars = sub {
    $snmp_var{'sysUpTime'} = int(time);  # to be converted to TIM
    $snmp_type{'sysUpTime'} = 'INT';
    $snmp_var{'sysObjectID'} = '1.3.6.1.4.1.15312.2';
    $snmp_type{'sysObjectID'} = 'OID';
    $snmp_var{'sysServices'} = 64;
    $snmp_type{'sysServices'} = 'INT';
  };
  $seed_system_vars->();

  do_log(5, "snmp-responder: entering event loop");

  for (;;) {
    zmq_poll(
      [
        { socket => $snmp_sock,
          events => ZMQ_POLLIN,
          callback => sub {  # listen to queries
            # fetch a query of a form: "am.snmp?" or "am.nanny?"
            my $replied = 0;  # true once a reply to this request went out
            for (;;) {
              my($msgstr, $msgstr_l, $more);
              ($msgstr_l,$more) = zmq_recvstr($snmp_sock,$msgstr);
              defined $msgstr_l  or die "zmq_recvstr failed: $!";
              do_log(5, 'snmp-responder: %sgot "%s"',
                        $more?'M':' ', $msgstr);
              if ($msgstr ne 'am.snmp?' && $msgstr ne 'am.nanny?') {
                do_log(2, 'snmp-responder: ignored "%s"', $msgstr);
              } else {
                my $chan = $msgstr;
                $chan =~ s/\?\z//;
                # Proc* variables belong to am.nanny, the rest to am.snmp
                my $query_nanny = $chan eq 'am.nanny';
                my $response;
                # each variable is sent as one part of a multi-part reply;
                # sending lags one step behind so that the final part
                # can be sent without the SNDMORE flag
                while (my($key,$val) = each(%snmp_var)) {
                  next if $query_nanny ? $key !~ /^Proc/ : $key =~ /^Proc/;
                  my $type = $snmp_type{$key};
                  if (!defined $val) { $type = $val = '?' }
                  if (defined $response) {  # previous
                    do_log(2, 'snmp-responder: sending "%s"', $response);
                    defined zmq_sendstr($snmp_sock,$response, ZMQ_SNDMORE)
                      or die "zmq_sendstr failed: $!";
                  }
                  $response = join(' ', $chan, $key, $type, $val);
                }
                if (defined $response) {
                  defined zmq_sendstr($snmp_sock,$response)
                    or die "zmq_sendstr failed: $!";
                  $replied = 1;
                }
              }
              if (!$more) {
                # A ZMQ_REP socket works in a strict receive/send lockstep:
                # leaving a request unanswered (an unrecognized query, or
                # no variables to report yet) would stop it from ever
                # accepting another query, so send an empty reply instead.
                # NOTE(review): assumes that clients tolerate an empty
                # reply - confirm against amavisd-snmp-subagent-zmq.
                if (!$replied) {
                  defined zmq_sendstr($snmp_sock,'')
                    or die "zmq_sendstr failed: $!";
                }
                last;
              }
            }
          },
        },
        { socket => $outer_sock,
          events => ZMQ_POLLIN,
          callback => sub {  # listen to information updates
            for (;;) {
              my($msgstr, $msgstr_l, $more, $chan, $key, $type, $rest);
              ($msgstr_l,$more) = zmq_recvstr($outer_sock,$msgstr);
              defined $msgstr_l  or die "zmq_recvstr failed: $!";
              ($chan,$key,$type,$rest) = split(' ',$msgstr,4);
              if ($chan ne 'am.snmp' && $chan ne 'am.nanny') {
                do_log(5, "snmp_responder: ignored: %s", $msgstr);
              } elsif (!defined $key) {
                do_log(5, "snmp_responder: ignored, no key: %s", $msgstr);
              } elsif ($key eq 'FLUSH') {
                # amavisd cold start, flush SNMP variables
                do_log(0, "snmp_responder: FLUSH snmp data");
                %snmp_var = ();
                %snmp_type = ();
                $seed_system_vars->();
              } elsif (!$snmp_var{$key}) {  # first seen (or still zero)
                $snmp_var{$key} = $rest;
                $snmp_type{$key} = $type;
              } elsif ($type =~ /^(C32|C64|TIM)\z/) {  # a counter
                $snmp_var{$key} += $rest;
              } else {
                $snmp_var{$key} = $rest;  # string, gauge, absolute value
              }
              last  if !$more;
            }
          },
        },
      ],
      -1,  # blocking (no timeout)
    ) != -1  or die "zmq_poll failed: $!";
  }
  # not reached
}

# msg_forwarder forwards messages from inner ZMQ
# socket to outer ZMQ socket.
# Binding (not connecting) to both sockets provides a single stable point
# in the system.
#
# Amavisd child processes are dynamic and connect to the inner socket,
# supplying information. Similarly the childproc_minder process occasionally
# feeds its supplementaty information updates to this inner socket.
#
# Dynamic clients like amavisd-nanny, amavisd-snmp-subagent, amavisd-agent,
# and a childproc_minder process connect to the outer socket to receive
# information from there.
#
# Takes no arguments and is not expected to return: it dies on any ZMQ
# failure. It sets the file-scoped $zmq_ctx, $inner_sock and $outer_sock so
# that the main program can close them on shutdown.
#
sub msg_forwarder() {

  do_log(5, "msg-forwarder: zmq_init");
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create a ZMQ context";

  # receive from amavisd child processes
  do_log(5, "msg-forwarder: creating inner ZMQ_SUB socket");
  $inner_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $inner_sock or die "Error creating inner ZMQ_SUB socket: $!";

  do_log(5, "msg-forwarder: zmq_setsockopt on inner socket");
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $inner_sock_ipv4only = 1;  # a ZMQ default
  # two colons in the socket spec are taken as a sign of an IPv6 address
  if (defined &ZMQ_IPV4ONLY && $inner_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $inner_sock_ipv4only = 0;
  }
  # an empty prefix subscribes to everything
  zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, '') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.log.') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.proc.') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.nanny ') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";

  do_log(5, "msg-forwarder: binding to inner zmq socket %s%s",
            $inner_sock_specs, $inner_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_bind($inner_sock, $inner_sock_specs) != -1
    or die "zmq_bind to $inner_sock_specs failed: $!";

  # forward to a public outer socket
  # to clients like amavisd-nanny, amavisd-agent, amavisd-snmp-subagent
  do_log(5, "msg-forwarder: creating outer ZMQ_PUB socket");
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
  $outer_sock or die "Error creating outer ZMQ_PUB socket: $!";

  do_log(5, "msg-forwarder: zmq_setsockopt on outer socket");
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
  my $outer_sock_ipv4only = 1;  # a ZMQ default
  if (defined &ZMQ_IPV4ONLY && $outer_sock_specs =~ /:[0-9a-f]*:/i) {
    zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY(), 0) != -1
      or die "zmq_setsockopt IPV4ONLY failed: $!";
    $outer_sock_ipv4only = 0;
  }

  do_log(5, "msg-forwarder: binding to outer zmq socket %s%s",
            $outer_sock_specs, $outer_sock_ipv4only ? '' : ', IPv6 enabled');
  zmq_bind($outer_sock, $outer_sock_specs) != -1
    or die "zmq_bind to $outer_sock_specs failed: $!";

  # start forwarding

# if (0) {
  if ($zmq_mod_name eq 'ZeroMQ' || $zmq_mod_name eq 'ZMQ::LibZMQ2') {
    do_log(5, "msg_forwarder: using a built-in zmq_device for forwarding");
    zmq_device(ZMQ_FORWARDER, $inner_sock, $outer_sock);
  } else {
    # ZMQ_FORWARDER device is no longer available in 3.1
    # 0MQ 3.2.1: zmq_device() deprecated and replaced by zmq_proxy()
    do_log(5, "msg_forwarder: start forwarding");
    my $debug = $foreground && ll(5);
    if ($debug) { $| = 1; print "starting\n" }
    my $cnt = 0;
    for (;;) {  # pass messages
      $cnt++;
      for (;;) {  # pass one multi-part message
        my $zmsg = zmq_recvmsg($inner_sock);  # a copy of a received msg obj
        $zmsg or die "zmq_recvmsg failed: $!";
        my $more = zmq_getsockopt($inner_sock, ZMQ_RCVMORE);
        $more != -1 or die "zmq_getsockopt RCVMORE failed: $!";
      # if ($debug && $zmsg) {
      #   my $str = zmq_msg_data($zmsg);  # copy and return as a perl scalar
      #   printf("%s %s\n", $more?'M':' ', $str)  if 1 || $str =~ /^am\.st /;
      # }
        # the zmq_sendmsg nullifies a message in a $zmsg object
        zmq_sendmsg($outer_sock, $zmsg, $more ? ZMQ_SNDMORE : 0) != -1
          or die "zmq_sendmsg failed: $!";
        zmq_msg_close($zmsg);  # declares a msg obj as no longer required
        last  if !$more;
      }
      if ($debug) {  # a progress indicator when debugging in a foreground
        print '.';
        printf(" %d\n", $cnt)  if $cnt % 100 == 0;
      }
    }
  }
  # not reached
}

# Returns a one-line usage synopsis (without a trailing newline).
#
sub usage() {
  my $me = $0;
  local $1;
  # NOTE(review): this substitution replaces the trailing path component
  # with itself, i.e. it leaves $me unchanged; it was presumably meant to
  # strip the directory part - kept as is, confirm the intent.
  $me =~ s{([^/]*)\z}{$1}s;
  "Usage: $me [-f] [-d log_level] (msg-forwarder|childproc-minder|snmp-responder)";
}

# main program starts here

# checked by the END block; set only when the program deliberately exits
# without having run a task (-h, -V)
my $normal_termination = 0;

# Route uncaught die and warn messages through do_log. A die inside
# an eval ($^S true) is not logged by the hook, it is left to the code
# which inspects $@ after the eval.
$SIG{'__DIE__' } =
  sub { if (!$^S) { my($m) = @_; chomp($m); do_log(-1,"_DIE: %s", $m) } };
$SIG{'__WARN__'} =
  sub { my($m) = @_; chomp($m); do_log(0,"_WARN: %s",$m) };

my $task_name;
$foreground = 0;
my(@argv) = @ARGV;  # preserve @ARGV, may modify @argv
while (@argv >= 2 && $argv[0] =~ /^-[d]\z/ ||
       @argv >= 1 && $argv[0] =~ /^-/) {
  my($opt,$val);
  $opt = shift @argv;
  $val = shift @argv  if $opt !~ /^-[hVf-]\z/;  # these take no arguments
  if ($opt eq '--') {
    last;
  } elsif ($opt eq '-h') {  # -h  (help)
    printf STDERR ("%s\n\n%s\n", $myversion, usage());
    # informational option: done, do not fall through to the
    # "Error parsing a command line" failure below
    $normal_termination = 1;
    exit 0;
  } elsif ($opt eq '-V') {  # -V  (version)
    printf STDERR ("%s\n", $myversion);
    $normal_termination = 1;
    exit 0;
  } elsif ($opt eq '-f') {
    $foreground = 1;
  } elsif ($opt eq '-d') {  # -d log_level or -d SAdbg1,SAdbg2,..,SAdbg3
    $val =~ /^\d+\z/ or die "Bad value for option -d: $val\n";
    $log_level = untaint($val);  # validated above, safe to untaint
  } else {
    printf STDERR ("Error in command line options: %s\n\n%s\n",
                   $opt, usage());
    exit 1;
  }
}

if (@argv == 1 &&
    $argv[0] =~ /^(?:msg-forwarder|childproc-minder|snmp-responder)\z/) {
  $task_name = $argv[0];
} else {
  printf STDERR ("Error parsing a command line %s\n\n%s\n",
                 join(' ',@ARGV), usage());
  exit 1;
}

# log to syslog unless running in a foreground (-f), then log to STDERR
if (!$foreground) {
  openlog($syslog_ident, LOG_PID | LOG_NDELAY, $syslog_facility);
  $syslog_open = 1;
}

do_log(0, "%s task '%s' [%d] started. %s\n",
          $myversion, $task_name, $$, zmq_version());

eval {
  # catch TERM and INT signals for a controlled shutdown
  my $h = sub { $interrupted = $_[0]; die "\n" };
  local $SIG{INT}  = $h;
  local $SIG{TERM} = $h;
  if ($task_name eq 'msg-forwarder') {
    msg_forwarder();
  } elsif ($task_name eq 'childproc-minder') {
    childproc_minder();
  } elsif ($task_name eq 'snmp-responder') {
    snmp_responder();
  } else {
    die "Unrecognized task name: $task_name";
  }
};  # until interrupted
my $eval_stat = $@;  # capture right away, before anything can clobber it

# A die inside the eval above is not reported by the __DIE__ hook (which
# skips exceptions raised within an eval), so a failure of a task must be
# logged here, otherwise its cause would be lost.
my $task_failed = !defined $interrupted && $eval_stat ne '';
if ($task_failed) {
  chomp($eval_stat);
  do_log(-1, "Task '%s' [%d] failed: %s", $task_name, $$, $eval_stat);
}

do_log(0, "Task '%s' [%d] shutting down", $task_name, $$);

# release ZMQ resources; LINGER is zeroed so that closing does not block
# on undelivered messages
if ($inner_sock) {
  do_log(0, "%s closing inner socket", $task_name);
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 0);  # ignoring status
  zmq_close($inner_sock);  # ignoring status
}
if ($outer_sock) {
  do_log(0, "%s closing outer socket", $task_name);
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 0);  # ignoring status
  zmq_close($outer_sock);  # ignoring status
}
if ($snmp_sock) {
  do_log(0, "%s closing SNMP socket", $task_name);
  zmq_setsockopt($snmp_sock, ZMQ_LINGER, 0);  # ignoring status
  zmq_close($snmp_sock);  # ignoring status
}
if ($zmq_ctx) {
  do_log(0, "%s closing context", $task_name);
  zmq_term($zmq_ctx);  # ignoring status
}

# let a supervisor see that the task has failed
exit 1  if $task_failed;

END {
  do_log(0, "Task '%s' [%d] exiting: %s", $task_name, $$, $interrupted)
    if !$normal_termination;
  if ($syslog_open) { closelog(); $syslog_open = 0 }
}
| ver. 1.4 |
Github
|
.
| PHP 7.4.3-4ubuntu2.24 | Генерация страницы: 0.01 |
proxy
|
phpinfo
|
Настройка