]> git.sesse.net Git - skvidarsync/blob - bin/ws.pl
Remove HKS 2024 code.
[skvidarsync] / bin / ws.pl
1 #! /usr/bin/perl
2 use strict;
3 use warnings;
4 no warnings qw(once);
5 use JSON::XS;
6 use LWP::UserAgent;
7 use DBI;
8 use POSIX;
9 use AnyEvent::WebSocket::Client;
10 use AnyEvent::Loop;
11 binmode STDOUT, ':utf8';
12 binmode STDERR, ':utf8';
13 use utf8;
14
15 require '../include/config.pm';
16
# Shared HTTP client used for Slack Web API calls.  LWP::UserAgent->new takes
# key/value options, so the User-Agent string must be passed as "agent"; a
# lone positional string is not picked up as the agent name.
my $ua = LWP::UserAgent->new(agent => 'SKVidarLang/1.0');

# Condition variable signalled when the WebSocket connection goes away (or
# could not be established).  The main loop installs a fresh one before every
# connection attempt and then blocks on it.
our $ws_disconnected;
19
# Open a new connection to the Postgres database named in the configuration.
#
# Returns the database handle (created with RaiseError enabled) on success.
# On failure a warning is printed and undef is returned, so that the caller
# can sleep and retry.
sub db_connect {
        # With RaiseError set, DBI->connect dies on failure instead of
        # returning undef, so the exception has to be trapped here for the
        # "warn and return undef" contract to actually hold.
        my $handle = eval {
                DBI->connect("dbi:Pg:dbname=$config::dbname;host=127.0.0.1", $config::dbuser, $config::dbpass, {RaiseError => 1});
        };
        if (!defined($handle)) {
                my $reason = $@ || $DBI::errstr || 'unknown error';
                chomp $reason;
                warn "Could not connect to Postgres: " . $reason;
                return;
        }
        return $handle;
}
25
# Database handle shared with mark() and handle_event() further down.
my $dbh;

# Main loop: make sure the database is reachable, ask Slack for a WebSocket
# URL via apps.connections.open, connect to it and then block until the
# connection is reported as gone.  Every failure is followed by a one-second
# pause and a fresh attempt.
while (1) {
        if (!defined($dbh) || !$dbh->ping) {
                $dbh = db_connect();
                if (!defined($dbh)) {
                        sleep 1;
                        next;
                }
        }

        # Fresh condvar for this connection attempt; ws_cb() and the 'finish'
        # handler signal it.
        $ws_disconnected = AnyEvent->condvar;

        my $response = $ua->post('https://slack.com/api/apps.connections.open',
                Authorization => 'Bearer ' . $config::slack_app_token,
                Content_type => 'application/x-www-form-urlencoded'
        );
        if (!$response->is_success) {
                warn "apps.connections.open: " . $response->status_line;
                sleep 1;
                next;
        }

        # decode_json croaks on malformed input; trap that so an unexpected
        # (non-JSON) response body leads to a retry instead of killing the
        # process.
        my $msg_json = eval { JSON::XS::decode_json($response->decoded_content) };
        my $parse_error = $@;
        if ($parse_error) {
                warn "apps.connections.open: could not parse response: " . $parse_error;
        }
        if (ref($msg_json) ne 'HASH' || !$msg_json->{'ok'}) {
                warn "Something went wrong: " . ($response->decoded_content // '');
                sleep 1;
                next;
        }

        my $ws_url = $msg_json->{'url'};
        my $ws = AnyEvent::WebSocket::Client->new;
        $ws->connect($ws_url)->cb(\&ws_cb);

        # Runs the event loop until somebody signals the condvar.
        $ws_disconnected->recv;
        print STDERR "Disconnected; trying to reconnect.\n\n";
}
58
# Callback for the condvar returned by AnyEvent::WebSocket::Client->connect.
#
# Receives the connect condvar as its only argument.  If the connection could
# not be established, warns, pauses for a second and signals $ws_disconnected
# so that the main loop retries.  Otherwise it installs:
#
#   - a 5-second keepalive timer that sends a ping and closes the connection
#     if the previous ping is still unanswered,
#   - a pong handler that matches pongs against the outstanding ping,
#   - a message handler that hands events to handle_event() and acks the
#     envelope,
#   - a 'finish' handler that signals $ws_disconnected.
sub ws_cb {
        # NOTE(review): $connection is a package variable rather than a
        # lexical; presumably so that the closures stored inside the
        # connection object do not hold a strong reference to it -- confirm
        # before changing.
        our $connection = eval { shift->recv };
        if ($@) {
                warn $@;
                sleep 1;
                $ws_disconnected->send;
                return;
        }

        print STDERR "Connected to the Slack WebSocket.\n";

        # Rebless so that ping(), on_pong() and the pong-aware
        # _process_message() from PingableConnection become available.
        bless $connection, 'PingableConnection';
        $connection->{'_pending_ping'} = undef;
        $connection->{'_ping_sent'} = undef;
        $connection->{'_stuck_timer'} = AnyEvent->timer(
                after => 5.0,
                interval => 5.0,
                cb => sub {
                        # A ping from the previous tick is still unanswered.
                        if (defined($connection->{'_pending_ping'})) {
                                print STDERR "Timed out while waiting for pong on '$connection->{'_pending_ping'}'; disconnecting.\n";
                                $connection->close;
                                return;
                        }
                        # The ping payload is the current time as text; the
                        # pong handler compares the echoed body against it.
                        chomp ($connection->{'_pending_ping'} = ctime(time));
                        $connection->{'_ping_sent'} = [Time::HiRes::gettimeofday];
                        $connection->ping($connection->{'_pending_ping'});
                });
        $connection->on_pong(sub {
                my ($conn, $msg) = @_;
                if (defined($connection->{'_pending_ping'}) && $connection->{'_pending_ping'} eq $msg->{'body'}) {
                        my $t0 = Time::HiRes::tv_interval($connection->{'_ping_sent'});
                        print STDERR "Received expected pong: $msg->{'body'} ($t0 seconds RTT)\n";
                        undef $connection->{'_pending_ping'};
                        undef $connection->{'_ping_sent'};
                } else {
                        # No ping may be outstanding at all; avoid
                        # interpolating undef into the log line.
                        my $expected = $connection->{'_pending_ping'} // '(none)';
                        print STDERR "Received unexpected pong '$msg->{'body'}' (expected '$expected'); disconnecting.\n";
                        $connection->close;
                }
        });

        $connection->on(each_message => sub {
                my ($conn, $message) = @_;
                my $now = [Time::HiRes::gettimeofday];
                print STDERR "Message: $message->{'body'}\n";
                my $json;
                eval {
                        # Decoded inside the eval so that a malformed frame is
                        # logged through the same error path as a handler
                        # failure.
                        $json = JSON::XS::decode_json($message->{'body'});
                        if (exists($json->{'payload'}{'event'})) {
                                if (exists($json->{'payload'}{'event'}{'event_ts'}) &&
                                    $json->{'payload'}{'event'}{'event_ts'} =~ /(\d+)\.(\d+)/) {
                                        my $elapsed = Time::HiRes::tv_interval([$1, $2], $now);
                                        printf STDERR "Message used %.1f ms to reach us.\n", 1e3 * $elapsed;
                                }
                                handle_event($json->{'payload'}{'event'});
                        }
                };
                if ($@) {
                        # Log and re-throw; the envelope is deliberately not
                        # acked when handling failed.
                        print STDERR "Error during handling: $@";
                        die;
                } elsif (exists($json->{'envelope_id'})) {
                        my $ack = { envelope_id => $json->{'envelope_id'} };
                        print STDERR "Ack: " . JSON::XS::encode_json($ack) . "\n";
                        $conn->send(JSON::XS::encode_json($ack))
                                or die "Error sending ack: $!";
                }
        });

        $connection->on(finish => sub {
                my ($conn, $msg) = @_;
                $msg //= '(none)';
                print STDERR "Finished with message: $msg\n";
                # Wake up the main loop so that it reconnects.
                $ws_disconnected->send;
        });
}
133
# Tell listeners that a sync is needed by issuing NOTIFY on the 'skvupdate'
# channel.
#
# Takes an optional database handle; when none is given, the file-scoped
# $dbh is used.  Returns whatever the handle's do() returns.
sub mark {
        my ($handle) = @_;
        $handle //= $dbh;
        print STDERR "Marking that a sync is needed.\n";
        $handle->do('NOTIFY skvupdate');
}
138
# Process a single Slack event (the 'event' member of an envelope payload).
#
#   - 'message' events: if the text contains a date of the form 20YY-MM-DD,
#     the message {channel, ts} is linked to that date in message_sheet_link
#     and the cached tab information for the channel is cleared.
#   - 'reaction_added' / 'reaction_removed' events on a message: the reaction
#     is appended to reaction_log and a sync is requested via mark().
#   - Anything else is logged and ignored.
#
# Takes the event as a hash reference.  Database errors propagate to the
# caller as exceptions (the handle is opened with RaiseError).
sub handle_event {
        my $ev = shift;
        if (!exists($ev->{'type'})) {
                print STDERR "Has no type; ignoring.\n";
                return;
        }

        my $type = $ev->{'type'};
        my $user = $ev->{'user'};

        if ($type eq 'message') {
                # Edited messages carry the text under 'message'; some message
                # subtypes have no text at all, so fall back to the empty
                # string.
                my $text = $ev->{'text'} // $ev->{'message'}{'text'} // '';

                # Strip channel links of the form <#C12345|channel-name> so
                # that dates in channel names are not matched.  The pipe must
                # be escaped: unescaped it is an alternation whose second
                # branch eats everything up to any '>' in the text.
                $text =~ s/<#[A-Z0-9]+\|[^>]*>//g;
                if ($text =~ /(20\d{2}-\d{2}-\d{2})/) {
                        # TODO: What if edits happen out-of-order?
                        my $date = $1;
                        my $channel = $ev->{'channel'};
                        my $ts = $ev->{'message'}{'ts'} // $ev->{'ts'};
                        print STDERR "Matching message {$channel, $ts} to date $date\n";
                        $dbh->do('INSERT INTO message_sheet_link (channel, ts, sheet_title) VALUES (?,?,?) ON CONFLICT (channel,ts) DO UPDATE SET sheet_title=EXCLUDED.sheet_title', undef,
                                $channel, $ts, $date);
                        # Blow the cache.
                        $dbh->do('UPDATE message_sheet_link SET tab_name=NULL, tab_id=NULL WHERE channel=?', undef, $channel);
                } else {
                        print STDERR "No date found in message, ignoring\n";
                }
                return;
        }

        my $reaction = $ev->{'reaction'};
        my $channel = $ev->{'item'}{'channel'};
        my $ts = $ev->{'item'}{'ts'};
        my $event_ts = $ev->{'event_ts'};

        # Events without an item channel and timestamp do not refer to a
        # message.
        if (!defined($channel) || !defined($ts)) {
                print STDERR "Not reacting to a message; ignoring.\n";
                return;
        }

        if ($type eq 'reaction_added' || $type eq 'reaction_removed') {
                $dbh->do('INSERT INTO reaction_log (userid, channel, ts, reaction, event_type, event_ts) VALUES (?,?,?,?,?,?)', undef,
                        $user, $channel, $ts, $reaction, $type, $event_ts);
                mark($dbh);
        } else {
                print STDERR "Type is $type (not a reaction added or removed); ignoring.\n";
        }
}
186
# A WebSocket connection that can send pings and lets callers observe pongs.
# It hooks into the parent's message processing so that pong frames are
# handed to registered callbacks before normal processing continues.

package PingableConnection;
use parent 'AnyEvent::WebSocket::Connection';

# Register a callback to be invoked as $cb->($connection, $message) for every
# pong frame that is received.
sub on_pong {
        my $self = shift;
        my $handler = shift;
        push @{ $self->{'_pong_cb'} }, $handler;
}

# Send a ping frame whose body is the given message.
sub ping {
        my $self = shift;
        my $payload = shift;

        my $frame = AnyEvent::WebSocket::Message->new({
                opcode => 9,  # ping
                body => $payload
        });
        $self->send($frame);
}

# Run the registered pong callbacks for pong frames, then defer to the
# parent class for the regular handling of the frame.
sub _process_message {
        my ($self, $frame) = @_;
        if ($frame->is_pong) {
                for my $handler (@{ $self->{'_pong_cb'} }) {
                        $handler->($self, $frame);
                }
        }
        return $self->SUPER::_process_message($frame);
}