9 use AnyEvent::WebSocket::Client;
11 binmode STDOUT, ':utf8';
12 binmode STDERR, ':utf8';
15 require '../include/config.pm';
17 my $ua = LWP::UserAgent->new('SKVidarLang/1.0');
21 my $dbh = DBI->connect("dbi:Pg:dbname=$config::dbname;host=127.0.0.1", $config::dbuser, $config::dbpass, {RaiseError => 1})
22 or warn "Could not connect to Postgres: " . DBI->errstr;
28 if (!defined($dbh) || !$dbh->ping) {
35 $ws_disconnected = AnyEvent->condvar;
36 my $response = $ua->post('https://slack.com/api/apps.connections.open',
37 Authorization => 'Bearer ' . $config::slack_app_token,
38 Content_type => 'application/x-www-form-urlencoded'
40 if (!$response->is_success) {
41 warn "apps.connections.open: " . $response->status_line;
45 my $msg_json = JSON::XS::decode_json($response->decoded_content);
46 if (!defined($msg_json) || !$msg_json->{'ok'}) {
47 warn "Something went wrong: " . $response->decoded_content;
52 my $ws_url = $msg_json->{'url'};
53 my $ws = AnyEvent::WebSocket::Client->new;
54 $ws->connect($ws_url)->cb(\&ws_cb);
55 $ws_disconnected->recv;
56 print STDERR "Disconnected; trying to reconnect.\n\n";
60 our $connection = eval { shift->recv };
64 $ws_disconnected->send;
68 print STDERR "Connected to the Slack WebSocket.\n";
70 bless $connection, 'PingableConnection';
71 $connection->{'_pending_ping'} = undef;
72 $connection->{'_ping_sent'} = undef;
73 $connection->{'_stuck_timer'} = AnyEvent->timer(
77 if (defined($connection->{'_pending_ping'})) {
78 print STDERR "Timed out while waiting for pong on '$connection->{'_pending_ping'}'; disconnecting.\n";
82 chomp ($connection->{'_pending_ping'} = ctime(time));
83 $connection->{'_ping_sent'} = [Time::HiRes::gettimeofday];
84 $connection->ping($connection->{'_pending_ping'});
86 $connection->on_pong(sub {
87 my ($conn, $msg) = @_;
88 if (defined($connection->{'_pending_ping'}) && $connection->{'_pending_ping'} eq $msg->{'body'}) {
89 my $t0 = Time::HiRes::tv_interval($connection->{'_ping_sent'});
90 print STDERR "Received expected pong: $msg->{'body'} ($t0 seconds RTT)\n";
91 undef $connection->{'_pending_ping'};
92 undef $connection->{'_ping_sent'};
94 print STDERR "Received unexpected pong '$msg->{'body'}' (expected '$connection->{'_pending_ping'}'); disconnecting.\n";
100 $connection->on(each_message => sub {
101 my ($conn, $message) = @_;
102 my $now = [Time::HiRes::gettimeofday];
103 print STDERR "Message: $message->{'body'}\n";
104 my $json = JSON::XS::decode_json($message->{'body'});
106 if (exists($json->{'payload'}{'event'})) {
107 if (exists($json->{'payload'}{'event'}{'event_ts'}) &&
108 $json->{'payload'}{'event'}{'event_ts'} =~ /(\d+)\.(\d+)/) {
109 my $elapsed = Time::HiRes::tv_interval([$1, $2], $now);
110 printf STDERR "Message used %.1f ms to reach us.\n", 1e3 * $elapsed;
112 handle_event($json->{'payload'}{'event'});
116 print STDERR "Error during handling: $@";
118 } elsif (exists($json->{'envelope_id'})) {
119 my $ack = { envelope_id => $json->{'envelope_id'} };
120 print STDERR "Ack: " . JSON::XS::encode_json($ack) . "\n";
121 $conn->send(JSON::XS::encode_json($ack))
122 or die "Error sending ack: $!";
126 $connection->on(finish => sub {
127 my ($conn, $msg) = @_;
129 print STDERR "Finished with message: $msg\n";
130 $ws_disconnected->send;
135 print STDERR "Marking that a sync is needed.\n";
136 $dbh->do('NOTIFY skvupdate');
141 if (!exists($ev->{'type'})) {
142 print STDERR "Has no type; ignoring.\n";
146 my $type = $ev->{'type'};
147 my $user = $ev->{'user'};
149 if ($type eq 'message') {
150 my $text = $ev->{'text'} // $ev->{'message'}{'text'};
151 $text =~ s/<[^>]+>//g; # Don't match dates in channel names or URLs.
152 if ($text =~ /(20\d{2}-\d{2}-\d{2})/) {
153 # TODO: What if edits happen out-of-order?
155 my $channel = $ev->{'channel'};
156 my $ts = $ev->{'message'}{'ts'} // $ev->{'ts'};
157 print STDERR "Matching message {$channel, $ts} to date $date\n";
158 $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?) ON CONFLICT (channel,ts) DO UPDATE SET sheet_title=EXCLUDED.sheet_title', undef,
159 $channel, $ts, $date);
161 $dbh->do('UPDATE message_sheet_link SET tab_name=NULL, tab_id=NULL WHERE channel=?', undef, $channel);
163 print STDERR "No date found in message, ignoring\n";
168 my $reaction = $ev->{'reaction'};
169 my $channel = $ev->{'item'}{'channel'};
170 my $ts = $ev->{'item'}{'ts'};
171 my $event_ts = $ev->{'event_ts'};
173 if (!defined($channel) || !defined($ts)) {
174 print STDERR "Not reacting to a message; ignoring.\n";
178 if ($type eq 'reaction_added' || $type eq 'reaction_removed') {
179 $dbh->do('INSERT INTO reaction_log (userid, channel, ts, reaction, event_type, event_ts) VALUES (?,?,?,?,?,?)', undef,
180 $user, $channel, $ts, $reaction, $type, $event_ts);
183 print STDERR "Type is $type (not a reaction added or removed); ignoring.\n";
187 # Override a little packet processing so that we get to see pong messages.
189 package PingableConnection;
190 use parent 'AnyEvent::WebSocket::Connection';
193 my ($self, $cb) = @_;
194 push @{ $self->{'_pong_cb'} }, $cb;
198 my ($self, $message) = @_;
200 $self->send(AnyEvent::WebSocket::Message->new({
206 sub _process_message {
207 my ($self, $message) = @_;
208 if ($message->is_pong) {
209 $_->($self, $message) for @{$self->{'_pong_cb'}};
211 return $self->SUPER::_process_message($message);