print STDERR "Connected to the Slack WebSocket.\n";
+ bless $connection, 'PingableConnection';
+ $connection->{'_pending_ping'} = undef;
+ $connection->{'_ping_sent'} = undef;
+ $connection->{'_stuck_timer'} = AnyEvent->timer(
+ after => 5.0,
+ interval => 5.0,
+ cb => sub {
+ if (defined($connection->{'_pending_ping'})) {
+ print STDERR "Timed out while waiting for pong on '$connection->{'_pending_ping'}'; disconnecting.\n";
+ $connection->close;
+ return;
+ }
+ chomp ($connection->{'_pending_ping'} = ctime(time));
+ $connection->{'_ping_sent'} = [Time::HiRes::gettimeofday];
+ $connection->ping($connection->{'_pending_ping'});
+ });
+ $connection->on_pong(sub {
+ my ($conn, $msg) = @_;
+ if (defined($connection->{'_pending_ping'}) && $connection->{'_pending_ping'} eq $msg->{'body'}) {
+ my $t0 = Time::HiRes::tv_interval($connection->{'_ping_sent'});
+ print STDERR "Received expected pong: $msg->{'body'} ($t0 seconds RTT)\n";
+ undef $connection->{'_pending_ping'};
+ undef $connection->{'_ping_sent'};
+ } else {
+ print STDERR "Received unexpected pong '$msg->{'body'}' (expected '$connection->{'_pending_ping'}'); disconnecting.\n";
+ $connection->close;
+ }
+ });
+
+
$connection->on(each_message => sub {
my ($conn, $message) = @_;
+ my $now = [Time::HiRes::gettimeofday];
print STDERR "Message: $message->{'body'}\n";
my $json = JSON::XS::decode_json($message->{'body'});
eval {
- if (exists($json->{'payload'}{'event'})) {
- handle_event($json->{'payload'}{'event'});
+ my $payload = $json->{'payload'};
+ if (exists($payload->{'event'})) {
+ if (exists($payload->{'event'}{'event_ts'}) &&
+ $payload->{'event'}{'event_ts'} =~ /(\d+)\.(\d+)/) {
+ my $elapsed = Time::HiRes::tv_interval([$1, $2], $now);
+ printf STDERR "Message used %.1f ms to reach us.\n", 1e3 * $elapsed;
+ }
+ handle_event($payload->{'event'});
+ }
+ if (defined($payload->{'type'}) && $payload->{'type'} eq 'block_actions') {
+ if (exists($payload->{'actions'}) &&
+ scalar @{$payload->{'actions'}} == 1 &&
+ exists($payload->{'actions'}[0]{'action_ts'}) &&
+ $payload->{'actions'}[0]{'action_ts'} =~ /(\d+)\.(\d+)/) {
+ my $elapsed = Time::HiRes::tv_interval([$1, $2], $now);
+ printf STDERR "Action used %.1f ms to reach us.\n", 1e3 * $elapsed;
+ }
+ handle_action($payload->{'user'}{'id'}, $payload->{'state'});
}
};
if ($@) {
my $user = $ev->{'user'};
if ($type eq 'message') {
- if ($ev->{'message'}{'text'} =~ /(20\d{2}-\d{2}-\d{2})/) {
+ my $text = $ev->{'text'} // $ev->{'message'}{'text'};
+ $text =~ s/<[^>]+>//g; # Don't match dates in channel names or URLs.
+ if ($text =~ /(20\d{2}-\d{2}-\d{2})/) {
# TODO: What if edits happen out-of-order?
my $date = $1;
my $channel = $ev->{'channel'};
- my $ts = $ev->{'message'}{'ts'};
+ my $ts = $ev->{'message'}{'ts'} // $ev->{'ts'};
print STDERR "Matching message {$channel, $ts} to date $date\n";
- $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?)', undef,
+ $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?) ON CONFLICT (channel,ts) DO UPDATE SET sheet_title=EXCLUDED.sheet_title', undef,
$channel, $ts, $date);
+ $dbh->do('INSERT INTO message (channel, ts, contents) VALUES (?,?,?) ON CONFLICT (channel,ts) DO UPDATE SET contents=EXCLUDED.contents', undef,
+ $channel, $ts, $text);
# Blow the cache.
$dbh->do('UPDATE message_sheet_link SET tab_name=NULL, tab_id=NULL WHERE channel=?', undef, $channel);
} else {
print STDERR "Type is $type (not a reaction added or removed); ignoring.\n";
}
}
+
+sub handle_action {
+ my ($userid, $state) = @_;
+
+ # We hope the messages are not coming in out-of-order if the user clicked several times.
+ # Using state instead of actions is a feeble protection on top of that...
+ for my $val (values %{$state->{'values'}}) {
+ if (exists($val->{'change_group'}) &&
+ exists($val->{'change_group'}{'selected_option'})) {
+ my $new_group = $val->{'change_group'}{'selected_option'}{'value'};
+ if (defined($new_group) && $new_group =~ /^G[1-4]\.[1-5]$/) {
+ print STDERR "User $userid moved to $new_group.\n";
+ $dbh->do('INSERT INTO users_to_move (userid, new_group) VALUES (?,?) ON CONFLICT (userid) DO UPDATE SET new_group=EXCLUDED.new_group', undef,
+ $userid, $new_group);
+ mark($dbh);
+ }
+ }
+ }
+}
+
+# Override a little packet processing so that we get to see pong messages.
+
+package PingableConnection;
+use parent 'AnyEvent::WebSocket::Connection';
+
+sub on_pong {
+ my ($self, $cb) = @_;
+ push @{ $self->{'_pong_cb'} }, $cb;
+}
+
+sub ping {
+ my ($self, $message) = @_;
+
+ $self->send(AnyEvent::WebSocket::Message->new({
+ opcode => 9, # ping
+ body => $message
+ }));
+}
+
+sub _process_message {
+ my ($self, $message) = @_;
+ if ($message->is_pong) {
+ $_->($self, $message) for @{$self->{'_pong_cb'}};
+ }
+ return $self->SUPER::_process_message($message);
+}