#!/usr/bin/perl
# nagios: -epn

=head1 NAME

send_to_msg - A nagios check which grabs messages from the queue and
sends them to the messaging system using STOMP

=head1 SYNOPSIS

B<send_to_msg> [-v] [ --dir I<qdirectory> ] [ --prefix I<destination
     prefix>] [ --batch I<BATCH_SIZE> ] --broker-cache I<FILENAME> |
     --broker I<URI>  [ --config-cache I<file> ] 
     [ --config-table I<table-name> ] [ --config-prefix I<destination prefix> ]
     [ --alarm-dir I<qdirectory> ] [ --alarm-prefix I<destination prefix> ]
     [ --error-dir I<qdirectory> ] [ --persistent ]

=head1 DESCRIPTION

B<send_to_msg> is called as a Nagios check. It reads WLCG Monitoring
WG compatible I<metricOutput> messages from the queue, submits them
to the messaging system, and then exits. Furthermore, it reads the
generated Nagios configuration from an SQLite-based cache and
publishes it to the messaging system.

If not provided, the directory defaults to
C</var/spool/msg-nagios-bridge/outgoing>.

If configuration cache file is not provided, it defaults to:
C</var/cache/msg/config-cache/config.db>. Configuration table
defaults to: C<config_outgoing>.

If not provided, the directory with alarms defaults to
C</var/spool/msg-nagios-bridge/outgoing_alarms>.

In case it cannot send a message, a warning is issued and the message
is moved to the "error queue", which can be controlled using
I<--error-dir> (default:
C</var/spool/msg-nagios-bridge/undelivered-messages>).

The broker to publish to is either provided directly on the command
line (I<--broker>), or cached in a file (I<--broker-cache>) which
contains a list of broker URLs, one per line. It can be created by
the I<msg-brokers> script. The broker on the command line takes
precedence over those specified in the cache file.

If not provided, the destination defaults to
C</topic/grid.probe.metricOutput.EGEE>.  A destination is created by:
  * append the role (e.g. C<Site>, C<ROC>, C<Project>, C<VO>)
  * append C<.SITE_NAME> to the prefix.

Note that since C<'.'> is a hierarchical separator in the messaging
system, we replace all instances of C<'.'> with
C<'_'>. E.g. C<egee.srce.hr> becomes C<egee_srce_hr>.

=head1 SEE ALSO

GridMon::MsgCache(3)
GridMon::ConfigCache(3)
handle_service_check(1)
msg-brokers(1)

=cut

use strict;
use warnings;
use Getopt::Long;
use Socket;
use IPC::DirQueue;
use Net::STOMP::Client;
use GridMon::MsgCache;
use GridMon::MetricOutput;
use GridMon::ConfigCache;
use JSON qw(to_json);
use Nagios::Plugin;
use Messaging::Message;
use Messaging::Message::Queue;
use Sys::Hostname qw(hostname);

use No::Worries::Date qw(date_string);
use GridMon::Nagios qw(nagios_debug);
use No::Worries::Syslog qw(syslog_open); 

# Route die() and warn() through the No::Worries handlers. The handlers
# are referenced by fully qualified name, so their modules are loaded
# explicitly here instead of relying on some other module having pulled
# them in (require is a no-op when they are already loaded).
require No::Worries::Die;
require No::Worries::Warn;
$SIG{__DIE__}  = \&No::Worries::Die::handler;
$SIG{__WARN__} = \&No::Worries::Warn::handler;

# open syslog under our own identity, then record that we started
syslog_open(ident => "send_to_msg", facility => "user");
nagios_debug("started");

# Standard variables used in Nagios::Plugin constructor
use constant PROGNAME => "$0";
use constant VERSION => '1.2';
# blurb shown in the plugin help: this script sends, it does not import
use constant DESCRIPTION => 'Check sends messages, alarms and configuration from the local queues to the messaging system using STOMP.';
use constant EXTRA_DESC => "";
use constant LICENSE => 'This nagios plugin is free software, and comes with ABSOLUTELY NO WARRANTY.
It may be used, redistributed and/or modified under the terms of the GNU
General Public Licence (see http://www.fsf.org/licensing/licenses/gpl.txt).';
use constant SHORTNAME => 'SendToMsg';
# plain-text usage (no POD markup, since it is printed verbatim); lists
# every option registered with add_arg() further down
use constant USAGE => "usage: $0 [ -v ] [ --dir <qdirectory> ] [ --prefix <destination prefix> ]
  [ --batch <BATCH_SIZE> ] [ --broker-cache <FILENAME> ] [ --broker <URI> ]
  [ --config-cache <file> ] [ --config-table <table-name> ] [ --config-prefix <destination prefix> ]
  [ --alarm-dir <qdirectory> ] [ --alarm-prefix <destination prefix> ]
  [ --error-dir <qdirectory> ] [ --persistent ]\n";

# defaults for the command line options
use constant DEFAULT_BATCH_SIZE => 10000;
use constant DEFAULT_DIR => '/var/spool/msg-nagios-bridge/outgoing';
use constant DEFAULT_PREFIX => '/topic/grid.probe.metricOutput.EGEE';
use constant DEFAULT_ALARM_DIR => '/var/spool/msg-nagios-bridge/outgoing_alarms';
use constant DEFAULT_ALARM_PREFIX => '/topic/grid.probe.notification';
use constant DEFAULT_CONFIG_CACHE => '/var/cache/msg/config-cache/config.db';
use constant DEFAULT_CONFIG_TABLE => 'config_outgoing';
use constant DEFAULT_CONFIG_PREFIX => '/topic/grid.config.metricOutput.EGEE';
use constant DEFAULT_ERROR_DIR => '/var/spool/msg-nagios-bridge/undelivered-messages';
# path of the outgoing message queue (not yet configurable on the command line)
use constant DEFAULT_OUTGOING_DIR => '/var/spool/msg-nagios-bridge/outgoing-messages';
use constant DEFAULT_BROKER_CACHE => '/var/cache/msg/broker-cache-file/broker-list';
# value handed to next_message() when polling the legacy queues
use constant TIMEOUT => 0.1;

# internal header fields used to track failed sends; they are stripped
# again before a message is filed in the error queue
use constant HEADER_FAILED_TIME  => "send2msg-last-failed-sent-time";
use constant HEADER_FAILED_ERROR => "send2msg-last-failed-sent-error";
use constant HEADER_FAILED_COUNT => "send2msg-failed-sent-count";

# global variables: the queues/caches by name, the per-category counters,
# the Nagios plugin object, the STOMP client and its printable address
our(%Cache, %Count, $Plugin, $Broker, $BrokerString);
%Count = (messages => 0, alarms => 0, configs => 0, errors => 0);

#
# report a critical error [FIXME: call it when die()?]
#
# Cancels the pending alarm and leaves through the plugin object with
# the CRITICAL status and the given text.
#

sub critical ($) {
    my $reason = shift(@_);

    # make sure the timeout handler cannot fire while we are exiting
    alarm(0);
    $Plugin->nagios_exit(CRITICAL, $reason);
}

#
# store a message that cannot be delivered [FIXME: dead code to be removed]
#
# Takes the error text followed by the frame options (header fields plus
# an optional "body"), wraps them in a Messaging::Message and hands the
# result to file_message().
#

sub store_undelivered_message ($%) {
    my($error, %header) = @_;

    # collapse the error text onto a single line without trailing space
    foreach my $text ($error) {
        $text =~ s/\s+/ /g;
        $text =~ s/\s+$//;
    }
    # the body travels separately from the header fields
    my $body = delete($header{body}) || "";
    my $undelivered = Messaging::Message->new(body => $body, header => \%header);
    $undelivered->header_field(HEADER_FAILED_ERROR, $error);
    file_message($undelivered);
}

#
# send a message reliably (using a receipt) [FIXME: dead code to be removed]
#
# Takes the frame options (header fields plus "body"). Returns 1 once
# the broker has acknowledged the frame with a receipt. On any failure
# (error while sending, or receipt still outstanding after the wait) the
# message is handed to store_undelivered_message() and 0 is returned.
#

sub send_with_receipt (%) {
    my(%option) = @_;

    if ($Plugin->opts->verbose) {
        printf(STDERR "Sending message to %s\n", $option{destination} || "???");
        print(STDERR $option{body});
    }
    eval {
        $option{receipt} = $Broker->uuid();
        $Broker->send(%option);
        $Broker->wait_for_receipts(timeout => 1);
        # the wait above simply returns when it times out, so explicitly
        # check that the receipt for this very frame has been received;
        # otherwise an unacknowledged message would be counted as sent
        die("no receipt received from the broker for this message\n")
            if grep { $_ eq $option{receipt} } $Broker->receipts();
    };
    if ($@) {
        store_undelivered_message($@, %option);
        return(0);
    }
    return(1);
}

#
# send a message, storing failure information in the header
#
# Takes a Messaging::Message object. Returns 1 once the broker has
# acknowledged the message with a receipt. Returns 0 on any failure
# (error while sending, or receipt still outstanding after the wait); in
# that case the message header records the failure time, the one-line
# error text and the incremented failure count so that the caller can
# decide between retrying later and filing the message.
#

sub send_message ($) {
    my($msg) = @_;
    my(%option, $error, $count);

    if ($Plugin->opts->verbose) {
        printf(STDERR "Sending message to %s\n", $msg->header_field("destination") || "???");
    }
    # the frame to send is the message header plus its body
    %option = %{ $msg->header() };
    $option{body} = $msg->body();
    eval {
        $option{receipt} = $Broker->uuid();
        $Broker->send(%option);
        $Broker->wait_for_receipts(timeout => 1);
        # the wait above simply returns when it times out, so explicitly
        # check that the receipt for this very frame has been received;
        # otherwise an unacknowledged message would be counted as sent
        # (the price is a possible duplicate if the receipt was only late)
        die("no receipt received from the broker for this message\n")
            if grep { $_ eq $option{receipt} } $Broker->receipts();
    };
    return(1) unless $@;
    # failure: normalize the error text to a single line...
    $error = $@;
    $error =~ s/\s+/ /g;
    $error =~ s/\s+$//;
    # ... and remember what happened in the message itself
    $count = $msg->header_field(HEADER_FAILED_COUNT) || 0;
    $msg->header_field(HEADER_FAILED_TIME, time());
    $msg->header_field(HEADER_FAILED_ERROR, $error);
    $msg->header_field(HEADER_FAILED_COUNT, $count+1);
    return(0);
}

#
# file a message in case it could not be sent
#
# Counts one error, then wraps the failed message (serialized with
# jsonify(), without our internal send2msg-* header fields) into a JSON
# report carrying the host name, a timestamp, the error text and the
# component name, and adds that report to the error queue.
#

sub file_message ($) {
    my($msg) = @_;

    $Count{errors}++;
    my %report;
    $report{host} = hostname();
    $report{ts} = date_string();
    # read the error text now: it lives in a header field removed just below
    $report{error} = $msg->header_field(HEADER_FAILED_ERROR);
    $report{component} = "send_to_msg";
    # remove the internal data that we put in the header
    my $header = $msg->header();
    delete(@{$header}{ grep { /^send2msg-/ } keys(%{$header}) });
    $report{message} = $msg->jsonify();
    my $wrapper = Messaging::Message->new(
        header => {
            "destination" => "/queue/grid.error.messages",
            "persistent"  => "true",
            "expires"     => 1000 * (time() + 604800), # 1 week
            "mig-type"    => "message-with-error",
        },
        body => to_json(\%report),
        text => 1,
    );
    my $element = $Cache{errors}->add_message($wrapper);
    return unless $Plugin->opts->verbose;
    printf(STDERR "Filed message as %s/%s\n", $Cache{errors}->path(), $element);
}

#
# Nagios setup
#
# Creates the plugin object, registers every command line option of this
# script with its help text and default, then parses the command line.
#

$Plugin = Nagios::Plugin->new(usage     => USAGE,
                              shortname => SHORTNAME,
                              version   => VERSION,
                              blurb     => DESCRIPTION,
                              extra     => EXTRA_DESC,
                              license   => LICENSE,
                              plugin    => PROGNAME);
# the batch size is compared numerically against the counters, so only
# integers are accepted on the command line
$Plugin->add_arg(
    spec => 'batch=i',
    help => "--batch\n   Max number of messages that will be processed.\n   (default: ".DEFAULT_BATCH_SIZE.")",
    required => 0,
    default => DEFAULT_BATCH_SIZE
);
$Plugin->add_arg(
    spec => 'dir=s',
    help => "--dir\n   Queue directory.\n   (default: ".DEFAULT_DIR.")",
    required => 0,
    default => DEFAULT_DIR
);
$Plugin->add_arg(
    spec => 'prefix=s',
    help => "--prefix\n   Topic prefix for probe results.\n   (default: ".DEFAULT_PREFIX.")",
    required => 0,
    default => DEFAULT_PREFIX
);
$Plugin->add_arg(
    spec => 'config-cache=s',
    help => "--config-cache\n   Configuration cache file (SQLite database).\n   (default: ".DEFAULT_CONFIG_CACHE.")",
    required => 0,
    default => DEFAULT_CONFIG_CACHE
);
$Plugin->add_arg(
    spec => 'config-table=s',
    help => "--config-table\n   Configuration table in database.\n   (default: ".DEFAULT_CONFIG_TABLE.")",
    required => 0,
    default => DEFAULT_CONFIG_TABLE
);
$Plugin->add_arg(
    spec => 'config-prefix=s',
    help => "--config-prefix\n   Topic prefix for configuration.\n   (default: ".DEFAULT_CONFIG_PREFIX.")",
    required => 0,
    default => DEFAULT_CONFIG_PREFIX
);
$Plugin->add_arg(
    spec => 'alarm-dir=s',
    help => "--alarm-dir\n   Queue directory for alarms.\n   (default: ".DEFAULT_ALARM_DIR.")",
    required => 0,
    default => DEFAULT_ALARM_DIR
);
$Plugin->add_arg(
    spec => 'alarm-prefix=s',
    help => "--alarm-prefix\n   Topic prefix for alarms.\n   (default: ".DEFAULT_ALARM_PREFIX.")",
    required => 0,
    default => DEFAULT_ALARM_PREFIX
);
$Plugin->add_arg(
    spec => 'error-dir=s',
    help => "--error-dir\n   Queue directory for messages that triggered errors.\n   (default: ".DEFAULT_ERROR_DIR.")",
    required => 0,
    default => DEFAULT_ERROR_DIR
);
$Plugin->add_arg(
    spec => 'broker-cache=s',
    help => "--broker-cache\n   MSG broker cache file.\n   (default: ".DEFAULT_BROKER_CACHE.")",
    required => 0,
    default => DEFAULT_BROKER_CACHE
);
# no default on purpose: when --broker is absent, the broker cache is used
$Plugin->add_arg(
    spec => 'broker=s',
    help => "--broker\n   MSG broker address. Overrides brokers from cache file.\n   (default: undef)",
    required => 0
);
$Plugin->add_arg(
    spec => 'persistent',
    help => "--persistent\n   If defined send_to_msg will send persistent messages to all topics/queues.\n   (default: undef)",
    required => 0
);

$Plugin->getopts();

# handlers turning a timeout or a TERM signal into a proper Nagios exit
my $on_alarm = sub {
    $Plugin->nagios_die("Timeout while fetching results.");
};
my $on_term = sub {
    $Plugin->nagios_die("Plugin received TERM signal.");
};

local $SIG{ALRM} = $on_alarm;
local $SIG{TERM} = $on_term;

# arm the watchdog with the plugin timeout option
alarm($Plugin->opts->timeout);

#
# caches setup
#
# Fills %Cache with the objects used below: the legacy "messages" and
# "alarms" caches, the "errors" and "outgoing" message queues and the
# "configs" cache. Any failure ends the check with a CRITICAL status.
#

{
    # run one constructor under eval, store its result in %Cache and
    # report an exception as a critical error naming what was being built
    my $create = sub {
        my($slot, $what, $constructor) = @_;

        eval {
            $Cache{$slot} = $constructor->();
        };
        critical("Error creating $what: $@") if $@;
    };

    $create->("messages", "messages cache object", sub {
        GridMon::MsgCache->new({ dir => $Plugin->opts->get('dir') });
    });

    $create->("alarms", "alarms cache object", sub {
        GridMon::MsgCache->new({ dir => $Plugin->opts->get('alarm-dir') });
    });

    $create->("errors", "error message queue", sub {
        Messaging::Message::Queue->new(type => 'DQS', path => $Plugin->opts->get("error-dir"));
    });

    # this constructor is not run under eval: only a false result is checked
    $Cache{configs} = GridMon::ConfigCache->new({
        CACHE_FILE  => $Plugin->opts->get('config-cache'),
        CACHE_TABLE => $Plugin->opts->get('config-table'),
    });
    critical("Error creating configs cache object") unless $Cache{configs};

    # FIXME: after the transition phase, this should get the same command
    # line option as the main outgoing directory (i.e. --dir)
    $create->("outgoing", "outgoing message queue", sub {
        Messaging::Message::Queue->new(type => 'DQS', path => DEFAULT_OUTGOING_DIR);
    });
}

#
# connecting
#
# Creates the STOMP client, either from the --broker URI or from the
# broker cache file, installs the callback handling ERROR frames, builds
# the printable broker address ($BrokerString) and finally connects.
# Any failure ends the check with a CRITICAL status.
#

eval {
    my($path, $addr);

    if (defined($Plugin->opts->broker)) {
        # an explicit broker takes precedence over the cache file
        print "Connecting to ".$Plugin->opts->broker."\n" if $Plugin->opts->verbose;
        $Broker = Net::STOMP::Client->new(uri => $Plugin->opts->broker);
    } else {
        $path = $Plugin->opts->get('broker-cache');
        die("Broker URI is not defined and broker cache file is empty: $path\n")
            unless -s $path;
        print "Using broker cache file: $path\n" if $Plugin->opts->verbose;
        $Broker = Net::STOMP::Client->new(uri => "file:$path");
    }
    # turn every ERROR frame into an exception with as much context as
    # the frame provides
    $Broker->error_callback(sub {
        my($self, $frame) = @_;
        my($text);
        $text = $frame->body() || $frame->header("message") || "???";
        if ($frame->header("destination")) {
            $text .= " on destination '" . $frame->header("destination") . "'";
        } else {
            $text .= ". No destination was specified in the original message";
        }
        $text .= " with receipt-id '" . $frame->header("receipt-id") . "'"
            if $frame->header("receipt-id");
        die("STOMP ERROR: $text\n");
    });
    # prefer the reverse-resolved name of the broker; inet_aton() gives
    # undef when the host cannot be mapped to an IPv4 address, in which
    # case the reverse lookup is skipped and the host is used as is
    $addr = inet_aton($Broker->host());
    $BrokerString = (defined($addr) && gethostbyaddr($addr, AF_INET)) || $Broker->host();
    $BrokerString .= ":" . $Broker->port();
    print "Connected to $BrokerString\n" if $Plugin->opts->verbose;
};
if ($@) {
    critical("Error creating Net::STOMP::Client object: $@");
}

eval {
    $Broker->connect();
};
if ($@) {
    critical("Error connecting to broker $BrokerString: $@");
}

#
# sending messages [FIXME: dead code to be removed]
#
# Drains the legacy "messages" cache until it is empty or the batch size
# is reached. A message needs both a 'sitename' and a 'role' to get a
# destination; otherwise it is stored as undelivered.
#

{
    my $failures = 0;
    my $job;

    while ($Count{messages} < $Plugin->opts->batch) {
        eval {
            $job = $Cache{messages}->next_message(TIMEOUT);
        };
        if ($@) {
            # tolerate three fetch errors, give up on the fourth one
            critical("Error fetching messages from queue: $@")
                if ++$failures > 3;
            next;
        }
        last unless $job;
        # now we do have something to send
        my %frame = %{ $job->{metadata} };
        $frame{body} = $job->get_data();
        $frame{persistent} = "true"
            if $Plugin->opts->persistent;
        if (not $frame{sitename}) {
            store_undelivered_message("Missing 'sitename' header", %frame);
        } elsif (not $frame{role}) {
            store_undelivered_message("Missing 'role' header", %frame);
        } else {
            # '.' is a separator in destinations: change '.' to '_'
            my $site = $frame{sitename};
            $site =~ s/\./_/g;
            $frame{destination} = $Plugin->opts->prefix . ".$frame{role}.$site";
            $Count{messages}++ if send_with_receipt(%frame);
        }
        # sent or stored, this one is done
        $job->finish();
    }
}

#
# sending alarms [FIXME: dead code to be removed]
#
# Drains the legacy "alarms" cache until it is empty or the batch size
# (shared with the messages sent so far) is reached. Alarms without a
# destination of their own go to the alarm prefix.
#

{
    my $failures = 0;
    my $job;

    while ($Count{messages} + $Count{alarms} < $Plugin->opts->batch) {
        eval {
            $job = $Cache{alarms}->next_message(TIMEOUT);
        };
        if ($@) {
            # tolerate three fetch errors, give up on the fourth one
            critical("Error fetching alarms from queue: $@")
                if ++$failures > 3;
            next;
        }
        last unless $job;
        # now we do have something to send
        my %frame = %{ $job->{metadata} };
        $frame{body} = $job->get_data();
        if ($Plugin->opts->persistent) {
            $frame{persistent} = "true";
        }
        if (not defined($frame{destination})) {
            $frame{destination} = $Plugin->opts->get('alarm-prefix');
        }
        $Count{alarms}++ if send_with_receipt(%frame);
        $job->finish();
    }
}

#
# sending configs [FIXME: dead code to be removed]
#
# Turns every updated configuration tuple into a message. Complete
# tuples (with 'sitename' and 'role') are added to the outgoing queue,
# the others are filed as errors. Each handled tuple is then marked as
# processed in the configuration cache.
#

{
    my($ok, $updates) = $Cache{configs}->getUpdated();

    critical($updates) unless $ok;
    critical("unexpected GridMon::ConfigCache result: $updates")
        unless ref($updates) eq "ARRAY";
    foreach my $entry (@{$updates}) {
        last unless $Count{messages} + $Count{alarms} + $Count{configs} < $Plugin->opts->batch;
        my $msg = Messaging::Message->new();
        $msg->body($entry->{config});
        if ($Plugin->opts->persistent) {
            $msg->header_field("persistent", "true");
        }
        # find out what, if anything, prevents building the destination
        my $problem;
        if (not $entry->{sitename}) {
            $problem = "missing sitename";
        } elsif (not $entry->{role}) {
            $problem = "missing role";
        }
        if ($problem) {
            $msg->header_field(HEADER_FAILED_ERROR, $problem);
            file_message($msg);
        } else {
            # '.' is a separator in destinations: change '.' to '_'
            my $site = $entry->{sitename};
            $site =~ s/\./_/g;
            $msg->header_field("destination", $Plugin->opts->get("config-prefix") . ".$entry->{role}.$site");
            # so far so good, we add it to the outgoing queue...
            $Cache{outgoing}->add_message($msg);
            $Count{configs}++;
        }
        # in any case we are done with this one...
        $Cache{configs}->updateProcessed($entry->{sitename}, $entry->{hostname});
    }
}

#
# sending messages
#
# Walks through the outgoing queue until its end or until the batch size
# is reached. Elements that cannot be locked are skipped, and so are
# messages whose last failed send is less than 300 seconds old. A message
# that fails is added back to the queue, unless it has failed three times
# or more, in which case it is filed as an error.
#

{
    my $queue = $Cache{outgoing};

    # purge() is only called while the queue holds fewer than 1000 elements
    $queue->purge() if $queue->count() < 1000;
    my $element = $queue->first();
    while ($element) {
        last unless $Count{messages} + $Count{alarms} + $Count{configs} < $Plugin->opts->batch;
        next unless $queue->lock($element);
        my $msg = $queue->get_message($element);
        my $failed_at = $msg->header_field(HEADER_FAILED_TIME);
        if ($failed_at and $failed_at > time() - 300) {
            # too fresh failed sent, keep it for later
            $queue->unlock($element);
            next;
        }
        if ($Plugin->opts->persistent) {
            $msg->header_field("persistent", "true");
        }
        if (send_message($msg)) {
            # successful sent
            $Count{messages}++;
        } elsif ($msg->header_field(HEADER_FAILED_COUNT) >= 3) {
            # failed too many times
            file_message($msg);
        } else {
            # failed but may still succeed later: add it back
            $queue->add_message($msg);
        }
        # in any case, the message has been processed...
        $queue->remove($element);
    } continue {
        # also reached via "next", so that skipped elements advance too
        $element = $queue->next();
    }
}

#
# disconnecting
#
# A failure to disconnect is reported as a critical error.
#

eval { $Broker->disconnect() };
critical("Error disconnecting from broker: $@") if $@;

#
# Nagios cleanup
#
# Cancels the timeout, publishes the four counters as performance data
# and exits with a summary: WARNING when at least one error was counted,
# OK otherwise.
#

alarm(0);

foreach my $label (qw(messages alarms configs errors)) {
    $Plugin->add_perfdata( label => $label, value => $Count{$label} );
}

# only the categories with a non-zero counter appear in the summary
my @done = map  { $Count{$_->[0]} . " " . $_->[1] }
           grep { $Count{$_->[0]} }
           ([ messages => "messages" ],
            [ alarms   => "alarms" ],
            [ configs  => "configurations" ]);
my $message = @done
    ? "Successfully sent " . join(", ", @done)
    : "No messages have been sent";
$message .= " to $BrokerString.";
$message .= " Failed to send $Count{errors} times." if $Count{errors};
$Plugin->nagios_exit($Count{errors} ? WARNING : OK, $message);
