#!/usr/bin/perl
# nagios: -epn

##############################################################################
#
# NAME:        send_to_db 
#
# FACILITY:    SAM (Service Availability Monitoring)
#
# COPYRIGHT:
#         Copyright (c) 2009, Members of the EGEE Collaboration.
#         http://www.eu-egee.org/partners/
#         Licensed under the Apache License, Version 2.0.
#         http://www.apache.org/licenses/LICENSE-2.0
#         This software is provided "as is", without warranties
#         or conditions of any kind, either express or implied.
#
# DESCRIPTION:
#
# AUTHORS:     Wojciech Lapka, CERN
#
# CREATED:     01-Oct-2009
#
# NOTES:
#
##############################################################################

=head1 NAME

send_to_db - A nagios check which grabs messages from the queue and sends them to the database

=head1 SYNOPSIS

B<send_to_db> --db_uri I<db_uri> --db_user I<db_user> --db_pwd I<db_password>  [ -v ] [ --dir I<qdirectory> ] 
  [ --batch I<BATCH_SIZE> ]

=head1 DESCRIPTION

B<send_to_db> is called as a Nagios check. It reads WLCG Monitoring WG
compatible I<metricOutput> messages from the queue, and inserts them
to the database, and then exits.

If not provided, the directory defaults to
C</var/spool/nagios2metricstore/data>.

=cut

use strict;
use warnings;
use DBI;
use File::Copy;
use Getopt::Long;
use GridMon::MsgCache;
use Messaging::Message::Queue;
use Messaging::Message;
use Nagios::Plugin;
use GridMon::sgutils qw($SERVICEURI $SERVICETYPE $METRICNAME
$LCG_LOCATION $VONAME $TIMEOUT $OUTPUT_TYPE @METRICS %ERRORS
%OUTPUT_TYPES %COMMANDS &print_revision &support &printOutput
&checkCommands &checkProxy &createProxy &checkHost &printWLCGList
&checkMetric &processCommand);
use GridMon::Nagios qw(nagios_debug);
use No::Worries::Syslog qw(syslog_open);

$SIG{__WARN__} = \&No::Worries::Warn::handler;
$SIG{__DIE__}  = \&No::Worries::Die::handler;

syslog_open(ident => "send_to_db", facility => "user");
nagios_debug("started");

use constant PROGNAME => "$0";
use constant VERSION => '1.1';
use constant DESCRIPTION => 'Check imports massages from dir queue to Metric Results Store.';
use constant DEFAULT_BATCH_SIZE => 20000;
use constant EXTRA_DESC => "";
use constant LICENSE => 'Apache 2.0 : http://www.apache.org/licenses/LICENSE-2.0';
use constant SHORTNAME => 'SendToDb';
use constant TIMEOUT => 0.1;
use constant USAGE => "usage: $0 --db_uri I<db_uri> --db_user I<db_user> --db_pwd I<db_password>  [ -v ] [ --dir I<qdirectory> ] 
  [ --batch I<BATCH_SIZE> ] \n";
# FIXME: after the transition phase, this as well as --dir should be for the new queue...
use constant DEFAULT_DIR => '/var/spool/nagios2metricstore/outgoing';
use constant REJECTED_DIR => '/var/spool/nagios2metricstore/rejected';

my(%columnsOrder, %columnNames);

%columnsOrder = ( 
   1 => "serviceflavour",
   2 => "metricname",
   3 => "metricstatus",
   4 => "summarydata",
   5 => "detailsdata",
   6 => "voname",
   7 => "vofqan",
   8 => "hostname",
   9 => "timestamp",
  10 => "gatheredat",
);

foreach my $key (sort keys(%columnsOrder)) {
  $columnNames{$columnsOrder{$key}} = $key;
}

# Create Nagios::Plugin instance
my $plugin = Nagios::Plugin->new (usage => USAGE,
                                  shortname => SHORTNAME,
                                  version => VERSION,
                                  blurb => DESCRIPTION,
                                  extra => EXTRA_DESC,
                                  license => LICENSE,
                                  plugin  => PROGNAME);

$plugin->add_arg(
    spec => 'batch=s',
    help => "--batch\n   Max number of messages that will be processed.\n   (default: ".DEFAULT_BATCH_SIZE.")",
    required => 0,
    default => DEFAULT_BATCH_SIZE
);                         
$plugin->add_arg(
    spec => 'dir=s',
    help => "--dir\n   Queue directory.\n   (default: ".DEFAULT_DIR.")",
    required => 0,
    default => DEFAULT_DIR
);
$plugin->add_arg(
    spec => 'db_user=s',
    help => "--db_user\n   DB User name.\n",
    required => 1,
);                         
$plugin->add_arg(
    spec => 'db_uri=s',
    help => "--db_uri\n   DB URI (e.g.mrs;host=HOSTNAME).\n",
    required => 1,
);                         
$plugin->add_arg(
    spec => 'db_pwd=s',
    help => "--db_pwd\n   DB password.\n",
    required => 1,
    default => "",
);                         

$plugin->getopts();

my $state = $ERRORS{OK};
$OUTPUT_TYPE = $OUTPUT_TYPES{NAGIOS};

local $SIG{ALRM} = sub {
    $plugin->nagios_die("Timeout occured during send_to_db.");
};

local $SIG{TERM} = sub {
    $plugin->nagios_die("Plugin received TERM signal.");
};

my $cache = GridMon::MsgCache->new({ dir => $plugin->opts->get('dir') });

sub connect_to_db () {
    my($dbh);

    $dbh = DBI->connect('DBI:mysql:'.$plugin->opts->get('db_uri'),
			$plugin->opts->get('db_user'), $plugin->opts->get('db_pwd'),
			{ AutoCommit => 0, RaiseError => 1 } );
    die("Could not connect to database: $DBI::errstr\n") unless $dbh;
    return($dbh);
}

sub prepare_db_sth ($) {
    my($dbh) = @_;
    my($sth);

    $sth = $dbh->prepare("call loadmetricdatatospool(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
    return($sth);
}

sub insert_record ($$) {
  my($sth, $record) = @_;
  my($insertOK);

  $insertOK = 1; 
  foreach my $key (keys %columnNames) {
      if (exists($$record{$key})) {
	  $sth->bind_param($columnNames{$key}, $$record{$key});
      } else {
	  $sth->bind_param($columnNames{$key}, undef);
      }
  }
  eval {
      $sth->execute();
      $sth->finish();
  };
  if ($@) {
      $insertOK = 0;
      warn("Insert failed $@");
  }
  return($insertOK);
}

sub read_record ($) {
  my($msg) = @_;
  my(@lines, $line);
  my($key, $value, $multiline, %record, @tmp);

  $multiline = 0;

  @lines = split(/\n/ , $msg);

  foreach $line (@lines) {
    if ($line ne 'EOT') {
      if ($multiline == 1) { 
        $value .= "\n".$line; 
      } else {
        @tmp = undef;
        @tmp = split(/:/, $line);
        $key = $tmp[0];
        if ($#tmp > 1) {
          shift @tmp;
          $value = join(":", @tmp);
        } else {
          $value = $tmp[1];
        }        

        $key = lc($key);
        $value =~ s/^\s+//;
        if ($key eq 'detailsdata') {
          $multiline = 1;
        } else {
          $record{$key} = $value;
        }
      }
    }
  }
  if ($multiline == 1) { 
    $record{$key} = $value;
  }
  
  return(\%record);
}

sub old_insert_data ($$) { 
    my($dbh, $sth) = @_;
    my($countInserted, $countRejected);

    $countInserted = 0;
    $countRejected = 0;

    while (($countInserted+$countRejected) < $plugin->opts->batch) {
	my($message, $record, $insertOK);

	$message = $cache->next_message(TIMEOUT);
  
	last if !$message;
	$record = read_record($message->get_data());
  
	$insertOK = insert_record($sth, $record);
	print STDERR $message->get_data() if $plugin->opts->verbose;
  
	if ($insertOK == 1) {
	    $message->finish($message);
	    $countInserted += 1;
	} else {
	    copy($message->get_data_path, REJECTED_DIR) or die "File cannot be copied. $@";
	    print STDERR "Insert failed for message:\n". $message->get_data();
	    $message->finish($message);
	    $countRejected += 1;
	}
    }

    return($countInserted, $countRejected);
}

sub insert_data ($$) { 
    my($dbh, $sth) = @_;
    my($inserted, $rejected, $data_queue, $rejected_queue, $name, $msg, $record, $ok);

    $inserted = $rejected = 0;
    $data_queue     = Messaging::Message::Queue->new(type=> 'DQS', path => "/var/spool/nagios2metricstore/data");
    $rejected_queue = Messaging::Message::Queue->new(type=> 'DQS', path => "/var/spool/nagios2metricstore/rejected");
    $data_queue->purge() if $data_queue->count() < 1000;
    for ($name = $data_queue->first(); $name; $name = $data_queue->next()) {
	next unless $data_queue->lock($name);
	$msg = $data_queue->get_message($name);
	$record = read_record($msg->body());
	$ok = insert_record($sth, $record);
	if ($ok) {
	    $inserted++;
	} else {
	    $rejected++;
	    $rejected_queue->add_message($msg);
	}
	$data_queue->remove($name);
	last if $inserted + $rejected >= $plugin->opts->batch;
    }
    return($inserted, $rejected);
}

########################
#
# MAIN
#
########################

my($dbh, $sth, $totalInserted, $totalRejected, $ins, $rej, @msglist);

eval {
    $dbh = connect_to_db();
    $sth = prepare_db_sth($dbh);
};

if ($@) {
    warn("DB connect failed $@\n");
    $plugin->nagios_exit(CRITICAL, $@);
} else {
    $totalInserted = $totalRejected = 0;

    ($ins, $rej) = old_insert_data($dbh, $sth);
    nagios_debug("old queue: inserted=$ins rejected=$rej");
    $totalInserted += $ins;
    $totalRejected += $rej;

    ($ins, $rej) = insert_data($dbh, $sth);
    nagios_debug("new queue: inserted=$ins rejected=$rej");
    $totalInserted += $ins;
    $totalRejected += $rej;

    $dbh->disconnect();
    
    push(@msglist, "Successfully inserted $totalInserted messages.") if $totalInserted;
    push(@msglist, "Rejected $totalRejected messages.") if $totalRejected;
    push(@msglist, "No messages to send.") unless @msglist;
    
    $plugin->add_perfdata( label => "inserts", value => $totalInserted );
    $plugin->add_perfdata( label => "rejects", value => $totalRejected );
    
    if (($totalInserted >= 0 ) and ($totalRejected == 0)) {
	$plugin->nagios_exit(OK, "@msglist");
    } elsif (($totalInserted > 0 ) and ($totalRejected > 0)) {
	$plugin->nagios_exit(WARNING, "@msglist");
    } elsif (($totalInserted == 0 ) and ($totalRejected > 0)) {
	$plugin->nagios_exit(CRITICAL, "@msglist");
    }
}
