my $ws = AnyEvent::WebSocket::Client->new;
$ws->connect($ws_url)->cb(\&ws_cb);
$ws_disconnected->recv;
- print "Disconnected; trying to reconnect.\n\n";
+ print STDERR "Disconnected; trying to reconnect.\n\n";
};
sub ws_cb {
return;
}
- print "Connected to the Slack WebSocket.\n";
+ print STDERR "Connected to the Slack WebSocket.\n";
+
+ bless $connection, 'PingableConnection';
+ $connection->{'_pending_ping'} = undef;
+ $connection->{'_ping_sent'} = undef;
+ $connection->{'_stuck_timer'} = AnyEvent->timer(
+ after => 5.0,
+ interval => 5.0,
+ cb => sub {
+ if (defined($connection->{'_pending_ping'})) {
+ print STDERR "Timed out while waiting for pong on '$connection->{'_pending_ping'}'; disconnecting.\n";
+ $connection->close;
+ return;
+ }
+ chomp ($connection->{'_pending_ping'} = ctime(time));
+ $connection->{'_ping_sent'} = [Time::HiRes::gettimeofday];
+ $connection->ping($connection->{'_pending_ping'});
+ });
+ $connection->on_pong(sub {
+ my ($conn, $msg) = @_;
+ if (defined($connection->{'_pending_ping'}) && $connection->{'_pending_ping'} eq $msg->{'body'}) {
+ my $t0 = Time::HiRes::tv_interval($connection->{'_ping_sent'});
+ print STDERR "Received expected pong: $msg->{'body'} ($t0 seconds RTT)\n";
+ undef $connection->{'_pending_ping'};
+ undef $connection->{'_ping_sent'};
+ } else {
+ print STDERR "Received unexpected pong '$msg->{'body'}' (expected '$connection->{'_pending_ping'}'); disconnecting.\n";
+ $connection->close;
+ }
+ });
+
$connection->on(each_message => sub {
my ($conn, $message) = @_;
- print "Message: $message->{'body'}\n";
+ my $now = [Time::HiRes::gettimeofday];
+ print STDERR "Message: $message->{'body'}\n";
my $json = JSON::XS::decode_json($message->{'body'});
eval {
if (exists($json->{'payload'}{'event'})) {
+ if (exists($json->{'payload'}{'event'}{'event_ts'}) &&
+ $json->{'payload'}{'event'}{'event_ts'} =~ /(\d+)\.(\d+)/) {
+ my $elapsed = Time::HiRes::tv_interval([$1, $2], $now);
+ printf STDERR "Message used %.1f ms to reach us.\n", 1e3 * $elapsed;
+ }
handle_event($json->{'payload'}{'event'});
}
};
if ($@) {
- print "Error during handling: $@";
+ print STDERR "Error during handling: $@";
die;
} elsif (exists($json->{'envelope_id'})) {
my $ack = { envelope_id => $json->{'envelope_id'} };
- print "Ack: " . JSON::XS::encode_json($ack) . "\n";
+ print STDERR "Ack: " . JSON::XS::encode_json($ack) . "\n";
$conn->send(JSON::XS::encode_json($ack))
or die "Error sending ack: $!";
}
$connection->on(finish => sub {
my ($conn, $msg) = @_;
$msg //= '(none)';
- print "Finished with message: $msg\n";
+ print STDERR "Finished with message: $msg\n";
$ws_disconnected->send;
});
}
sub mark {
print STDERR "Marking that a sync is needed.\n";
- open my $fh, ">", "/srv/skvidar-slack.sesse.net/marker";
- close $fh;
+ $dbh->do('NOTIFY skvupdate');
}
sub handle_event {
my $user = $ev->{'user'};
if ($type eq 'message') {
- if ($ev->{'message'}{'text'} =~ /(20\d{2}-\d{2}-\d{2})/) {
+ my $text = $ev->{'text'} // $ev->{'message'}{'text'};
+ $text =~ s/<[^>]+>//g; # Don't match dates in channel names or URLs.
+ if ($text =~ /(20\d{2}-\d{2}-\d{2})/) {
# TODO: What if edits happen out-of-order?
my $date = $1;
my $channel = $ev->{'channel'};
- my $ts = $ev->{'message'}{'ts'};
- print "Matching message {$channel, $ts} to date $date\n";
- $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?)', undef,
+ my $ts = $ev->{'message'}{'ts'} // $ev->{'ts'};
+ print STDERR "Matching message {$channel, $ts} to date $date\n";
+ $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?) ON CONFLICT (channel,ts) DO UPDATE SET sheet_title=EXCLUDED.sheet_title', undef,
$channel, $ts, $date);
+ # Blow the cache.
+ $dbh->do('UPDATE message_sheet_link SET tab_name=NULL, tab_id=NULL WHERE channel=?', undef, $channel);
} else {
print STDERR "No date found in message, ignoring\n";
}
if ($type eq 'reaction_added' || $type eq 'reaction_removed') {
$dbh->do('INSERT INTO reaction_log (userid, channel, ts, reaction, event_type, event_ts) VALUES (?,?,?,?,?,?)', undef,
$user, $channel, $ts, $reaction, $type, $event_ts);
- mark();
+ mark($dbh);
} else {
print STDERR "Type is $type (not a reaction added or removed); ignoring.\n";
}
}
+
+# Override a little packet processing so that we get to see pong messages.
+
+package PingableConnection;
+use parent 'AnyEvent::WebSocket::Connection';
+
+sub on_pong {
+ my ($self, $cb) = @_;
+ push @{ $self->{'_pong_cb'} }, $cb;
+}
+
+sub ping {
+ my ($self, $message) = @_;
+
+ $self->send(AnyEvent::WebSocket::Message->new({
+ opcode => 9, # ping
+ body => $message
+ }));
+}
+
+sub _process_message {
+ my ($self, $message) = @_;
+ if ($message->is_pong) {
+ $_->($self, $message) for @{$self->{'_pong_cb'}};
+ }
+ return $self->SUPER::_process_message($message);
+}