#!/usr/bin/perl
#
# Check imports messages from dir queue to Nagios as passive checks
# Copyright (c) 2009 Emir Imamagic
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

use strict;
use warnings;
use Nagios::Plugin;
use GridMon::MsgCache;
use GridMon::Nagios::Passive;
use Messaging::Message;
use Messaging::Message::Queue;

use GridMon::Nagios qw(nagios_debug);
use No::Worries::Syslog qw(syslog_open);

# Install the No::Worries handlers for Perl warnings and fatal errors.
# The handler packages are loaded explicitly: this file only imports
# No::Worries::Syslog, so without these require lines the two code
# references below would resolve only if some other module happened to
# load No::Worries::Warn and No::Worries::Die as a side effect.
require No::Worries::Warn;
require No::Worries::Die;
$SIG{__WARN__} = \&No::Worries::Warn::handler;
$SIG{__DIE__}  = \&No::Worries::Die::handler;

# Open syslog under the "recv_from_queue" identity (facility "user"),
# then emit the start-up debug message.
syslog_open(ident => "recv_from_queue", facility => "user");
nagios_debug("started");

# Standard values handed to the Nagios::Plugin constructor.
use constant PROGNAME => $0;
use constant VERSION => '1.0';
# Blurb shown in the plugin help ("massages" typo corrected).
use constant DESCRIPTION => 'Check imports messages from dir queue to Nagios as passive checks.';
use constant EXTRA_DESC => "";
use constant LICENSE => 'This nagios plugin is free software, and comes with ABSOLUTELY NO WARRANTY.
It may be used, redistributed and/or modified under the terms of the GNU
General Public Licence (see http://www.fsf.org/licensing/licenses/gpl.txt).
Copyright (c) 2009 Emir Imamagic';
use constant SHORTNAME => 'RecvFromQueue';
# Usage line printed verbatim at runtime, so it uses plain <placeholders>
# rather than POD I<...> markup, which would be shown literally.
use constant USAGE => "usage: $0 [ -v ] [ --dir <qdirectory> ] [ --batch <BATCH_SIZE> ]\n";

# Defaults for the --dir and --batch options.
use constant DEFAULT_QUEUE_DIR => '/var/spool/msg-nagios-bridge/incoming';
use constant DEFAULT_BATCH_SIZE => 1000;
# Value passed to next_message() in the legacy fetch loop
# (presumably seconds -- confirm against GridMon::MsgCache).
use constant TIMEOUT => 0.1;

# Create the Nagios::Plugin instance from the constants declared above.
my $plugin = Nagios::Plugin->new (usage => USAGE,
                                  shortname => SHORTNAME,
                                  version => VERSION,
                                  blurb => DESCRIPTION,
                                  extra => EXTRA_DESC,
                                  license => LICENSE,
                                  plugin  => PROGNAME);

# --dir: directory holding the incoming message queue.
$plugin->add_arg(
    spec => 'dir=s',
    help => "--dir\n   Queue directory.\n   (default: ".DEFAULT_QUEUE_DIR.")",
    required => 0,
    default => DEFAULT_QUEUE_DIR
);

# --batch: upper bound on the number of messages imported per run.
# Declared as an integer option (=i) because the value is only ever used in
# numeric comparisons against the message counter; a non-numeric value is
# now rejected when options are parsed instead of silently comparing as 0.
$plugin->add_arg(
    spec => 'batch=i',
    help => "--batch\n   Max number of messages that will be processed.\n   (default: ".DEFAULT_BATCH_SIZE.")",
    required => 0,
    default => DEFAULT_BATCH_SIZE
);

# Parse the command line.
$plugin->getopts();

# Turn a TERM signal or an expired alarm into a nagios_die() call on the
# plugin object, each with its own message.
local $SIG{TERM} = sub { $plugin->nagios_die("Plugin received TERM signal.") };
local $SIG{ALRM} = sub { $plugin->nagios_die("Timeout while fetching results.") };

# Arm the alarm using the plugin's timeout option.
alarm $plugin->opts->timeout;

# State shared by the two import loops further down.
my ($cache, $message, $name);
my $count = 0;           # number of messages imported so far
my $failureCount = 0;    # number of failed fetches in the legacy loop

# Open the legacy message cache on the queue directory; a failure here is
# reported to Nagios as CRITICAL after cancelling the pending alarm.
$cache = eval { GridMon::MsgCache->new({ dir => $plugin->opts->dir }) };
if ($@) {
    alarm(0);
    $plugin->nagios_exit(CRITICAL, "Error creating Cache object: $@");
}

# Object used to submit passive check results to Nagios.
my $nagios = GridMon::Nagios::Passive->new();

# FIXME: old code that should get removed at some point
# Legacy path: fetch messages one at a time from the MsgCache object and
# publish each as a passive result, until the batch limit is reached or
# next_message() returns nothing.
while ($count < $plugin->opts->batch) {
    eval { $message = $cache->next_message(TIMEOUT) };
    if ($@) {
        # The first three fetch failures (counted over the whole run) are
        # retried; the fourth one is reported as CRITICAL.
        next if ++$failureCount <= 3;
        $plugin->nagios_exit(CRITICAL, "Error fetching messages from queue: $@");
    }

    # No message returned: the legacy queue is drained.
    last unless $message;

    print STDERR "Importing result: " . $message->get_data()
        if $plugin->opts->verbose;

    unless ($nagios->publishPassiveResultString($message->get_data())) {
        alarm(0);
        $plugin->nagios_exit(CRITICAL, "Error importing to Nagios after $count messages: $nagios->{ERROR}");
    }

    # Mark the message as handled only after it was published successfully.
    $message->finish($message);
    $count++;
}

# Current path: open the same directory as a 'DQS' message queue and import
# its elements, sharing the batch counter with the legacy loop above.
$cache = Messaging::Message::Queue->new(type => 'DQS', path => $plugin->opts->dir);

# Purge is only attempted while the queue holds fewer than 1000 elements.
if ($cache->count() < 1000) {
    $cache->purge();
}

$name = $cache->first();
while ($name) {
    last if $count >= $plugin->opts->batch;

    # Elements that cannot be locked are skipped.
    if ($cache->lock($name)) {
        $message = $cache->get_message($name);
        if (!$nagios->publishPassiveResultString($message->body())) {
            # NOTE(review): the element is left locked on this failure path;
            # confirm that relying on a later purge to release it is intended.
            alarm(0);
            $plugin->nagios_exit(CRITICAL, "Error importing to Nagios after $count messages: $nagios->{ERROR}");
        }
        $count++;
        $cache->remove($name);
    }
}
continue {
    # Advance to the next element, also after a skipped one.
    $name = $cache->next();
}

# Both loops finished without a fatal error: cancel the alarm, record the
# number of imported messages as perfdata and exit with status OK.
alarm(0);
$plugin->add_perfdata(label => "messages", value => $count);
$plugin->nagios_exit(
    OK,
    $count == 0
        ? "No messages imported."
        : "Successfully imported $count messages."
);

=head1 NAME

recv_from_queue - A nagios check which gathers check results from queue and imports them to Nagios as passive checks

=head1 SYNOPSIS

B<recv_from_queue> [-v] [ --dir I<qdirectory> ] [ --batch I<BATCH_SIZE> ]

=head1 DESCRIPTION

B<recv_from_queue> is called as a Nagios check. It reads WLCG
Monitoring WG compatible I<metricOutput> messages from the queue, and
imports them to Nagios as passive checks.

If not provided, the directory defaults to
C</var/spool/msg-nagios-bridge/incoming>.

=cut
