#!/usr/bin/perl
#+##############################################################################
#                                                                              #
# File: msg-to-handler                                                         #
#                                                                              #
# Description: receive messages from brokers and dispatch them to handlers     #
#                                                                              #
#-##############################################################################

#
# modules
#

use strict;
use warnings;
use Config::General;
use Data::Dumper;
use Getopt::Long;
use GridMon::MsgHandler;
use JSON qw(to_json);
use Messaging::Message::Queue;
use Messaging::Message;
use Net::STOMP::Client;
use POSIX qw(setsid);
use Pod::Usage;
use Sys::Hostname qw(hostname);

use No::Worries::Date qw(date_string); 
use No::Worries::Die qw(dief);
use No::Worries::Syslog qw(syslog_open syslog_close syslog_info syslog_debug);
use No::Worries::Warn qw(warnf);
use No::Worries::File qw(file_read);
use No::Worries::PidFile qw(pf_set pf_check pf_touch pf_unset pf_status pf_quit);

use Time::HiRes qw();

# FIXME: optionally save status information to a file (that can be read by Nagios)
# FIXME: flag controlling when syslog is used? (now: daemon => syslog)

#
# constants
#

use constant ALARM_COOKIE     => "AlArM\n";
use constant HANDLER_TIMEOUT  => 10;
use constant BROKER_SLEEP     => 30;
use constant NICELY           =>  0;
use constant SILENTLY         =>  1;
use constant BRUTALLY         =>  2;

#
# global variables
#

our(%Option, %Handler, $UseSyslog, $ErrorQueue, $Broker, $Cleaned, $Aborted, $AliveTime);

#
# report a debugging message
#

sub debug ($$@) {
    my($level, $format, @arguments) = @_;
    my($message);

    return unless $Option{debug} >= $level;
    if ($UseSyslog) {
	if ($level == 0) {
	    syslog_info($format, @arguments);
	} else {
	    syslog_debug($format, @arguments);
	}
	return;
    }
    # we print on stderr when we do not use syslog...
    $message = sprintf($format, @arguments);
    $message =~ s/\s+$//;
    if ($level) {
	print(STDERR "# $message\n");
    } else {
	print(STDERR "$message\n");
    }
}

#
# evaluate some code with an optional timeout
#

sub teval ($$) {
    my($timeout, $code) = @_;
    my(@result);

    if ($timeout) {
	eval {
	    local $SIG{ALRM} = sub { die(ALARM_COOKIE) };
	    alarm($timeout);
	    @result = $code->();
	    alarm(0);
	};
	# we must clear the alarm in case the code died...
	alarm(0);
    } else {
	eval {
	    @result = $code->();
	};
    }
    if ($@) {
	if ($@ eq ALARM_COOKIE) {
	    $@ = "timeout after $timeout seconds";
	} else {
	    chomp($@);
	}
    }
    return(@result);
}

#
# report (and maybe store) a message that could not be handled
#

sub unhandled_message ($$) {
    my($message, $reason) = @_;
    my($id, %object, $msg);

    # FIXME: shall we count such messages? (-> status)
    $id = $message->header("message-id");
    $reason =~ s/\s+/ /g;
    $reason =~ s/\s+$//;
    warnf("could not handle message %s: %s", $id, $reason);
    return unless $ErrorQueue;
    $msg = Messaging::Message->new(
	header => $message->headers(),
	body => $message->body(),
    );
    $object{host} = hostname();
    $object{ts} = date_string();
    $object{error} = $reason;
    $object{component} = "msg-to-handler";
    $object{message} = $msg->jsonify();
    $ErrorQueue->add_message(Messaging::Message->new(
        header => {
	    "destination" => "/queue/grid.error.messages",
	    "persistent"  => "true",
	    "expires"     => 1000 * (time() + 604800), # 1 week
	    "mig-type"    => "message-with-error",
	},
	body => to_json(\%object),
	text => 1,
    ));
}

#
# handle a received signal
#

sub signal_handler ($) {
    my($name) = @_;

    $Aborted++ if $name =~ /^(INT|TERM)$/;
#    error_die("received SIG%s", $name);
    dief("received SIG%s", $name);
}

#
# daemonise the current process: chdir(), background, setsid()...
#

sub daemonise () {
    my($pid, $sid);

    # chdir to a known place
    chdir("/") or dief("cannot chdir(/): %s", $!);
    # fork and let dad die
    $pid = fork();
    dief("cannot fork(): %s", $!) unless defined($pid);
    if ($pid) {
	# the parent does not need to perform any cleanup
	$Cleaned = 1;
	exit(0);
    }
    # create a new session
    $sid = setsid();
    dief("cannot setsid(): %s", $!) if $sid == -1;
    # use syslog
    $SIG{__WARN__} = \&No::Worries::Warn::handler;
    $SIG{__DIE__}  = \&No::Worries::Die::handler;
    syslog_open(ident => "msg-to-handler", facility => "user");
    $UseSyslog = 1;
    # detach std* from any terminal
    if (-t STDIN) {
	open(STDIN, "<", "/dev/null") or dief("cannot re-open stdin: %s", $!);
    }
    if (-t STDOUT) {
	open(STDOUT, ">", "/dev/null") or dief("cannot re-open stdout: %s", $!);
    }
    if (-t STDERR) {
	open(STDERR, ">", "/dev/null") or dief("cannot re-open stderr: %s", $!);
    }
}

#
# initialise
#

sub init () {
    # defaults
    $Cleaned = 1;
    $Data::Dumper::Terse = 1;
    $Option{debug} = 0;
    # options parsing
    GetOptions(
        "broker=s"       => \$Option{broker},
        "broker-cache=s" => \$Option{broker_cache},
        "config=s"       => \$Option{config},
        "daemon"         => \$Option{daemon},
        "debug|d+"       => \$Option{debug},
        "error-dir=s"    => \$Option{error_dir},
        "help|h|?"       => \$Option{help},
        "manual|m"       => \$Option{manual},
        "pid-file=s"     => \$Option{pid_file},
        "ping=i"         => \$Option{ping},
        "quit"           => \$Option{quit},
        "timeout=i"      => \$Option{timeout},
    ) or pod2usage(2);
    pod2usage(1) if $Option{help};
    pod2usage(exitstatus => 0, verbose => 2) if $Option{manual};
    pod2usage(2) if @ARGV;
    # optionally set Net::STOMP::Client debugging level
    $Net::STOMP::Client::Debug::Flags = -1 if $Option{debug} > 2;
}

#
# connect to a message broker and subscribe on behalf of all active handlers
#

sub connect_broker () {
    my($name, %option, $what, $subscribed);

    # clear error counters
    foreach $name (keys(%Handler)) {
	$Handler{$name}{errors} = 0;
    }
    # connect
    teval($Option{timeout}, sub {
	%option = (uri => $Option{broker});
	$option{timeout} = $Option{timeout} if $Option{timeout};
	$Broker = Net::STOMP::Client->new(%option);
	$Broker->message_callback(\&message_handler);
	$Broker->connect();
    });
    if ($@) {
	warnf("cannot connect to broker %s", $@);
	$Broker = undef;
	return();
    }
    debug(0, "connected to broker %s:%d", $Broker->host(), $Broker->port());
    # subscribe
    $subscribed = 0;
    foreach $name (keys(%Handler)) {
	next unless $Handler{$name}{active};
	%option = %{ $Handler{$name}{subscribe} };
	$what = join(" ", map("$_:$option{$_}", sort(keys(%option))));
	$option{id} = $name;
	$option{ack} = "client";
	$option{receipt} = $name . "-" . $Broker->uuid();
	teval($Option{timeout}, sub {
	    $Broker->subscribe(%option);
	});
	if ($@) {
	    warnf("cannot subscribe for %s to %s: %s", $name, $what, $@);
	    # FIXME: what shall we do exactly here?
	    #  - if broker error: disconnect & retry
	    #  - if handler config error: unregister it
	    disconnect_broker(SILENTLY);
	    return();
	}
	debug(1, "subscribed for %s to %s", $name, $what);
	$subscribed++;
    }
    # wait for the subscription receipts
    teval($Option{timeout}, sub {
	%option = ();
	$option{timeout} = $Option{timeout} if $Option{timeout};
	$Broker->wait_for_receipts(%option);
    });
    if ($@) {
	warnf("cannot check subscription receipts: %s", $@);
	disconnect_broker(SILENTLY);
	return();
    }
    if ($Broker->receipts()) {
	warnf("missing subscription receipts: %s", join(" ", $Broker->receipts()));
	disconnect_broker(SILENTLY);
	return();
    }
    # so far so good
    debug(0, "successfully registered %d handlers", $subscribed);
    $AliveTime = time();
    return($Broker);
}

#
# disconnect from message broker
#

sub disconnect_broker ($) {
    my($how) = @_;

    unless ($how == BRUTALLY) {
	teval($Option{timeout}, sub {
	    $Broker->disconnect();
	});
	if ($@) {
	    warnf("cannot disconnect from broker: %s", $@) unless $how == SILENTLY;
	} else {
	    debug(0, "disconnected from broker");
	}
    }
    $Broker = undef;
}

#
# check all the declared handlers
#

sub check_handlers () {
    my($name, $test, %loaded, $active);

    foreach $name (keys(%Handler)) {
	# check handler class
	$test = $Handler{$name}{class};
	dief("missing class for handler %s", $name) unless $test;
	dief("invalid class for handler %s: %s", $name, $test)
	    unless not ref($test) and $test =~ /^(\w+::)*\w+$/;
	unless ($loaded{$test}) {
	    eval("require $test");
	    dief("buggy class for handler %s: %s", $name, $test) if $@;
	    $loaded{$test}++;
	}
	# check optional instance options
	$test = $Handler{$name}{instance};
	if ($test) {
	    dief("invalid instance options for handler %s", $name)
		unless ref($test) eq "HASH";
	} else {
	    $Handler{$name}{instance} = {};
	}
	# check subscribe options hash
	$test = $Handler{$name}{subscribe};
	dief("invalid subscribe options for handler %s", $name)
	    unless $test and ref($test) eq "HASH" and keys(%$test);
	# check unexpected information
	foreach $test (keys(%{ $Handler{$name} })) {
	    next if $test =~ /^(class|instance|subscribe|active)$/;
	    dief("unexpected configuration for handler %s: %s", $name, $test);
	}
	# check active flag
	next unless $Handler{$name}{active} or not defined($Handler{$name}{active});
	$active++;
	# instantiate this handler
	$Handler{$name}{active} = $Handler{$name}{class}->new(%{ $Handler{$name}{instance} });
    }
    dief("aborting: no active handlers") unless $active;
}

#
# load the configuration file
#

sub load_config ($) {
    my($path) = @_;
    my($config, %config, $name, $option, $broker_options);

    dief("invalid configuration path: %s", $path)
	unless -f $path and -s _;
    eval {
	$config = Config::General->new(
	    -ConfigFile        => $path,
	    -AllowMultiOptions => "yes", 
	    -InterPolateVars   => "yes", 
	    -InterPolateEnv    => "yes",
	    -UseApacheInclude  => "yes",
	    -IncludeGlob       => "yes",
	);
	%config = $config->getall();
    };
    dief("cannot parse %s: %s", $path, $@) if $@;
    # handle <option>
    if ($config{option}) {
	dief("invalid <option> block in %s", $path)
	    unless ref($config{option}) eq "HASH";
	foreach $option (keys(%Option)) {
	    next unless $option =~ /^broker/;
	    $broker_options++ if defined($Option{$option});
	}
	foreach $name (keys(%{ $config{option} })) {
	    dief("unexpected option in %s: %s", $path, $name)
		unless $name =~ /^(broker|broker-cache|error-dir|pid-file|ping|timeout)$/
		   and not ref($config{option}{$name});
	    ($option = $name) =~ s/-/_/;
	    next if defined($Option{$option});
	    next if $option =~ /^broker/ and $broker_options;
	    $Option{$option} = $config{option}{$name};
	}
	delete($config{option});
    }
    # handle <handler>
    if ($config{handler}) {
	dief("invalid <handler> block in %s", $path)
	    unless ref($config{handler}) eq "HASH";
	foreach $name (keys(%{ $config{handler} })) {
	    dief("unexpected handler in %s: %s", $path, $name)
		unless $name =~ /^[\w\-\.]+$/ and ref($config{handler}{$name}) eq "HASH";
	    $Handler{$name} = $config{handler}{$name};
	}
	delete($config{handler});
    }
    # complain if we see extra configuration information
    if (keys(%config)) {
	($name) = keys(%config);
	dief("unexpected configuration in %s: %s", $path, $name);
    }
}

#
# setup everything
#

# sub pf_get ($) {                                                                        
#     my($path) = @_;                                                                     
#     my($contents);                                                             
#                                                                                         
#     return("") unless -e $path;                                                         
#     $contents = file_read($path) or return();                                           
#     if ($contents =~ /\A(\d+)\Z/) {                                                     
#         # only the pid                                                                  
#         return($1);                                                                     
#     }                                                                                   
#     if ($contents =~ /\A(\d+)\s+([a-z]+)\Z/) {                                          
#         # pid + action                                                                  
#         return($1, $2) if wantarray();                                                  
#         return($1);                                                                     
#     }                                                                                   
#     # unexpected                                                                        
#     warnf("unexpected pid file contents in %s: %s", $path, $contents);           
#     return();                                                                           
# }     


sub setup () {
    my(@status, $signal);

    load_config($Option{config}) if $Option{config};
    if ($Option{quit}) {
	    dief("missing --pid-file option for --quit") unless $Option{pid_file};
        @status = pf_status($Option{pid_file});
	    if ($status[0] == 1) {
            debug(0, "%s, told to quit", $status[1]);
            pf_quit($Option{pid_file});
	    } else {
	        debug(0, "msg-to-handler does not seem to be running");
	    }
	    exit(0);
    }
    if ($Option{debug} >= 2) {
	print(STDERR "# %Option = ", Dumper(\%Option));
	print(STDERR "# %Handler = ", Dumper(\%Handler));
    }
    check_handlers();
    if ($Option{broker}) {
	dief("options --broker and --broker-cache are mutually exclusive")
	    if $Option{broker_cache};
    } elsif ($Option{broker_cache}) {
	dief("invalid path for --broker-cache: %s", $Option{broker_cache})
	    unless substr($Option{broker_cache}, 0, 1) eq "/"
	       and -f $Option{broker_cache} and -s _;
	$Option{broker} = "file:$Option{broker_cache}";
    } else {
	dief("one of --broker or --broker-cache must be specified");
    }
    if ($Option{error_dir}) {
	$ErrorQueue = Messaging::Message::Queue->new(type => 'DQS', path => $Option{error_dir});
    }
    foreach $signal (qw(HUP INT PIPE TERM)) {
	$SIG{$signal} = \&signal_handler;
    }
    $Cleaned = 0;
    if ($Option{daemon}) {
	daemonise();
	pf_set($Option{pid_file}) if $Option{pid_file};
    }
    debug(0, "started");
}

#
# handle one message probe
#

sub probe_handler ($$) {
    my($broker, $message) = @_;
    my($name, %reply);

    foreach $name (qw(reply-to monitor.test
                      monitor.test.clientname monitor.test.serverid)) {
	$reply{$name} = $message->header($name);
	return("incomplete probe: missing $name header field")
	    unless $reply{$name};
    }
    $reply{destination} = delete($reply{"reply-to"});
    $reply{"monitor.test.timestamp"} = Time::HiRes::time();
    teval($Option{timeout}, sub {
	$broker->send(%reply);
    });
    return("cannot send probe reply: $@") if $@;
    # so far so good...
    return();
}

#
# handle one message frame
#

sub message_handler ($$) {
    my($broker, $message) = @_;
    my($time, $error, $name, $test, $client, $status, $reason);

    debug(1, "received message %s", $message->header("message-id") || "<unknown-id>");
    $time = Time::HiRes::time();
    $AliveTime = $time;
    $name = $message->header("subscription");
    unless ($name) {
	$error = "missing subscription header";
	goto the_end;
    }
    unless ($Handler{$name}) {
	$error = "unexpected subscription header: $name";
	goto the_end;
    }
    unless ($Handler{$name}{active}) {
	# this may happen while we de-activate a handler; we do nothing,
	# including not sending the ack
	debug(1, "ignored message for inactive handler %s", $name);
	return($broker);
    }
    $test = $message->header("monitor.test");
    $client = $message->header("monitor.test.clientname");
    if ($test and $client and $client eq "msg-to-handler") {
	# this is a monitoring probe and not a real message
	debug(1, "message is monitoring probe %s", $test);
	$error = probe_handler($broker, $message);
	goto the_end;
    }
    debug(1, "calling handler %s", $name);
    teval(HANDLER_TIMEOUT, sub {
	($status, $reason) =
	    $Handler{$name}{active}->handle($message->headers(), $message->body());
    });
    if ($@) {
	$error = "handler died: $@";
	$Handler{$name}{errors} += $error =~ /timeout after \d+ seconds/ ? 10 : 100;
    } else {
	$reason ||= "?";
	if ($status == GridMon::MsgHandler::SUCCESS) {
	    # good!
	    $Handler{$name}{errors}-- if $Handler{$name}{errors} > 0;
	} elsif ($status == GridMon::MsgHandler::WARNING) {
	    $error = "handler warning: $reason";
	    $Handler{$name}{errors} += 1;
	} elsif ($status == GridMon::MsgHandler::ERROR) {
	    $error = "handler error: $reason";
	    $Handler{$name}{errors} += 100;
	} else {
	    $error = "unexpected handler status: $status";
	    $Handler{$name}{errors} += 100;
	}
    }
  the_end:
    if ($error) {
	unhandled_message($message, $error);
    } else {
	$test = (Time::HiRes::time() - $time) * 1000.0;
	debug(1, "message handled in %.3f ms", $test);
    }
    # attempt to ack the message in any case
    teval($Option{timeout}, sub {
	$broker->ack(frame => $message);
    });
    warnf("cannot ack message: %s", $@) if $@;
    # attempt to unsubscribe in case of too many (or to severe) errors
    debug(2, "handler %s has %d errors", $name, $Handler{$name}{errors});
    if ($Handler{$name}{errors} >= 100) {
	warnf("disabling %s handler because of errors (%d)",
		       $name, $Handler{$name}{errors});
	teval($Option{timeout}, sub {
	    $broker->unsubscribe(id => $name);
	});
	warnf("cannot unsubscribe %s: %s", $name, $@) if $@;
	$Handler{$name}{active} = 0;
    }
    # return success in any case...
    return($broker);
}

#
# ping the broker if needed
#

sub ping () {
    my($tid);

    return unless $Option{ping} and $Broker;
    return if $AliveTime and $AliveTime + $Option{ping} > time();
    $tid = "ping-" . $Broker->uuid();
    teval($Option{timeout}, sub {
	$Broker->begin(transaction => $tid, receipt =>  $Broker->uuid());
	$Broker->wait_for_receipts(timeout => 1);
	$Broker->abort(transaction => $tid);
    });
    if ($@) {
	warnf("cannot ping broker: %s", $@);
	disconnect_broker(BRUTALLY);
	return;
    }
    if ($Broker->receipts()) {
	if ($AliveTime and $AliveTime + $Option{ping} > time()) {
	    debug(1, "ping probably ok: no receipt but message received");
	    return;
	}
	warnf("cannot ping broker: receipt not received");
	disconnect_broker(NICELY);
	return;
    }
    debug(1, "ping ok");
    $AliveTime = time();
}

#
# do the work
#

sub work () {
    my($action);

    while (1) {
	# make sure we are connected to a broker
	unless ($Broker) {
	    unless (connect_broker()) {
		sleep(BROKER_SLEEP);
		next;
	    }
	}
	# receive and process the next frame
	teval($Option{timeout}, sub {
	    $Broker->wait_for_frames(timeout => 1);
	});
	if ($@) {
	    dief("%s", $@) if $Aborted;
	    warnf("cannot receive message: %s", $@);
	    disconnect_broker(BRUTALLY);
	    sleep(BROKER_SLEEP);
	    next;
	}
	dief("aborting: no more active handlers")
	    unless grep($Handler{$_}{active}, keys(%Handler));
    } continue {
	ping();
	if ($Option{daemon} and $Option{pid_file}) {
	    $action = pf_check($Option{pid_file});
	    if ($action and $action eq "quit") {
		debug(0, "quitting since asked to!");
		last;
	    }
	    pf_touch($Option{pid_file});
	}
    }
}

#
# cleanup everything
#

sub cleanup () {
    $Cleaned = 1;
    disconnect_broker(NICELY) if $Broker;
    debug(0, "exiting");
    if ($Option{daemon}) {
        pf_unset($Option{pid_file}) if $Option{pid_file};
	    $UseSyslog = 0;
	    syslog_close();
    }
}

END {
    cleanup() unless $Cleaned;
}

#
# just do it
#

init();
setup();
work();
cleanup();

__END__

=head1 NAME

msg-to-handler - receive messages from brokers and dispatch them to handlers

=head1 SYNOPSIS

B<msg-to-handler> [I<OPTIONS>]

=head1 DESCRIPTION

This program reads its configuration file to find out which handlers
are active. For them, it connects to a messaging broker and subscribes
to the relevant destinations. Then, all the messages received are
dispatched to the appropriate handlers.

=head1 OPTIONS

=over

=item B<--broker> I<uri>

use this messaging broker

=item B<--broker-cache> I<path>

use this file to find a list of messaging brokers to be used

=item B<--config> I<path>

use the configuration stored in this file

=item B<--daemon>

run in daemon mode:
put itself in the background and detach from any terminal

=item B<--debug>, B<-d>

report debugging information;
can be used multiple times for increased verbosity

=item B<--error-dir> I<path>

use this directory to store the messages that triggered an error in
the handler

=item B<--help>, B<-h>, B<-?>

show some help

=item B<--manual>, B<-m>

show the complete man page

=item B<--pid-file> I<path>

use this file to store the pid of the running process (in daemon mode only)

=item B<--ping> I<integer>

ping the broker if nothing is received within the given time window (in seconds)

=item B<--quit>

tell the running msg-to-handler daemon (if any) to quit, via its pid file

=item B<--timeout> I<integer>

use this timeout (in seconds) when interacting with the broker

=back

=head1 CONFIGURATION SYNTAX

The file given via the B<--config> option must be in a format
recognised by L<Config::General>. Here is an example:

  <option>
    broker = stomp://gridmsg001.cern.ch:6163
  </option>
  <handler test>
    class = "GridMon::MsgHandler::Test"
    <subscribe>
      destination = /topic/test
    </subscribe>
  </handler>

The command line options have precedence over the options set in the
configuration file. Only the following options can be specified:
B<broker>, B<broker-cache>, B<error-dir>, B<pid-file>, B<ping> and
B<timeout>.

=head1 WRITING HANDLERS

A handler is a piece of code that will be called each time an
interesting message is received.

Handlers must have unique names. The name is specified in the
configuration file, in the E<lt>handlerE<gt> tag.

The E<lt>subscribeE<gt> part of the handler configuration defines how
the program should subscribe for the given handler.

Handlers must implemented as Perl classes that should derive from the
C<GridMon::MsgHandler> class. They must implement the C<handle> method
that will be called for each received message. This method will be
called with two arguments: the headers (as a hash) and the body (as a
string).

Handlers can use the C<debug> method to report a debugging message.

Handlers should use the C<success>, C<warning> or C<error> methods to
return their status. C<success> takes no argument and means that the
message has been successfully handled. C<warning> and C<error> do take
a single argument: a reason string. They both indicate that the
message has not been successfully handled. C<warning> means that the
handler is working fine while C<error> indicates a problem at the
handler level, in this case C<msg-to-handler> will disable this handler.

Here is sample code:

  sub handle : method {
      my($self, $headers, $body) = @_;
  
      $self->debug(1, "received message %s", $headers->{"message-id"});
      # process the message...
      return($self->warning("... why it failed ...")) unless ...;
      return($self->success());
  }

=head1 SEE ALSO

L<Config::General>,
L<GridMon::MsgHandler>,
L<Net::STOMP::Client>

=head1 AUTHOR

Lionel Cons L<http://cern.ch/lionel.cons>
