From 746a402a7662a59b2d1761e8629864a8d1a18703 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Mon, 6 Nov 2023 22:03:10 +0100 Subject: [PATCH] Add a WebSocket daemon, hopefully with lower latency than HTTP events. --- bin/ws.pl | 146 ++++++++++++++++++++++++++++++++++++++++++++++ include/config.pm | 3 +- 2 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 bin/ws.pl diff --git a/bin/ws.pl b/bin/ws.pl new file mode 100644 index 0000000..1e4da98 --- /dev/null +++ b/bin/ws.pl @@ -0,0 +1,146 @@ +#! /usr/bin/perl +use strict; +use warnings; +no warnings qw(once); +use JSON::XS; +use LWP::UserAgent; +use DBI; +use POSIX; +use AnyEvent::WebSocket::Client; +use AnyEvent::Loop; +binmode STDOUT, ':utf8'; +binmode STDERR, ':utf8'; +use utf8; + +require '../include/config.pm'; + +my $ua = LWP::UserAgent->new('SKVidarLang/1.0'); +our $ws_disconnected; + +sub db_connect { + my $dbh = DBI->connect("dbi:Pg:dbname=$config::dbname;host=127.0.0.1", $config::dbuser, $config::dbpass, {RaiseError => 1}) + or warn "Could not connect to Postgres: " . DBI->errstr; + return $dbh; +} + +my $dbh; +while (1) { + if (!defined($dbh) || !$dbh->ping) { + $dbh = db_connect(); + if (!defined($dbh)) { + sleep 1; + next; + } + } + $ws_disconnected = AnyEvent->condvar; + my $response = $ua->post('https://slack.com/api/apps.connections.open', + Authorization => 'Bearer ' . $config::slack_app_token, + Content_type => 'application/x-www-form-urlencoded' + ); + if (!$response->is_success) { + warn "apps.connections.open: " . $response->status_line; + sleep 1; + next; + } + my $msg_json = JSON::XS::decode_json($response->decoded_content); + if (!defined($msg_json) || !$msg_json->{'ok'}) { + warn "Something went wrong: " . $response->decoded_content; + sleep 1; + next; + } + + my $ws_url = $msg_json->{'url'}; + my $ws = AnyEvent::WebSocket::Client->new; + $ws->connect($ws_url)->cb(\&ws_cb); + $ws_disconnected->recv; + print "Disconnected; trying to reconnect.\n\n"; +}; + +sub ws_cb { + our $connection = eval { shift->recv }; + if ($@) { + warn $@; + sleep 1; + $ws_disconnected->send; + return; + } + + print "Connected to the Slack WebSocket.\n"; + + $connection->on(each_message => sub { + my ($conn, $message) = @_; + print "Message: $message->{'body'}\n"; + my $json = JSON::XS::decode_json($message->{'body'}); + eval { + if (exists($json->{'payload'}{'event'})) { + handle_event($json->{'payload'}{'event'}); + } + }; + if ($@) { + print "Error during handling: $@"; + die; + } elsif (exists($json->{'envelope_id'})) { + my $ack = { envelope_id => $json->{'envelope_id'} }; + print "Ack: " . JSON::XS::encode_json($ack) . "\n"; + $conn->send(JSON::XS::encode_json($ack)) + or die "Error sending ack: $!"; + } + }); + + $connection->on(finish => sub { + my ($conn, $msg) = @_; + $msg //= '(none)'; + print "Finished with message: $msg\n"; + $ws_disconnected->send; + }); +} + +sub mark { + print STDERR "Marking that a sync is needed.\n"; + open my $fh, ">", "/srv/skvidar-slack.sesse.net/marker"; + close $fh; +} + +sub handle_event { + my $ev = shift; + if (!exists($ev->{'type'})) { + print STDERR "Has no type; ignoring.\n"; + return; + } + + my $type = $ev->{'type'}; + my $user = $ev->{'user'}; + + if ($type eq 'message') { + if ($ev->{'message'}{'text'} =~ /(20\d{2}-\d{2}-\d{2})/) { + # TODO: What if edits happen out-of-order? + my $date = $1; + my $channel = $ev->{'channel'}; + my $ts = $ev->{'message'}{'ts'}; + print "Matching message {$channel, $ts} to date $date\n"; + $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?)', undef, + $channel, $ts, $date); + } else { + print STDERR "No date found in message, ignoring\n"; + } + return; + } + + my $reaction = $ev->{'reaction'}; + my $channel = $ev->{'item'}{'channel'}; + my $ts = $ev->{'item'}{'ts'}; + my $event_ts = $ev->{'event_ts'}; + + if (!defined($channel) || !defined($ts)) { + print STDERR "Not reacting to a message; ignoring.\n"; + return; + } + + if ($type eq 'reaction_added' || $type eq 'reaction_removed') { + $dbh->do('INSERT INTO reaction_log (userid, channel, ts, reaction, event_type, event_ts) VALUES (?,?,?,?,?,?)', undef, + $user, $channel, $ts, $reaction, $type, $event_ts); + mark(); + } else { + print STDERR "Type is $type (not a reaction added or removed); ignoring.\n"; + } +} diff --git a/include/config.pm b/include/config.pm index e55cae3..2d95d41 100644 --- a/include/config.pm +++ b/include/config.pm @@ -2,7 +2,8 @@ package config; our $client_secret; our $signing_secret; -our $slack_oauth_token; +our $slack_oauth_token; # xoxb-* +our $slack_app_token; # xapp-* our $jwt_key = { "type" => "service_account", "project_id" => "solskogen-cubemap", -- 2.39.2