]> git.sesse.net Git - pistorm/blob - a314/a314.cc
Merge remote-tracking branch 'niklasekstrom/autoconfig_a314' into wip-crap
[pistorm] / a314 / a314.cc
1 /*
2  * Copyright 2020 Niklas Ekström
3  * Based on a314d daemon for A314.
4  */
5
6 #include <arpa/inet.h>
7
8 #include <linux/spi/spidev.h>
9 #include <linux/types.h>
10
11 #include <netinet/in.h>
12 #include <netinet/tcp.h>
13
14 #include <sys/epoll.h>
15 #include <sys/ioctl.h>
16 #include <sys/socket.h>
17 #include <sys/stat.h>
18 #include <sys/types.h>
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <time.h>
29 #include <unistd.h>
30 #include <pthread.h>
31
32 #include <algorithm>
33 #include <list>
34 #include <string>
35 #include <vector>
36
37 #include "a314.h"
38 #include "../m68k.h"
39
40 #define LOGGER_TRACE    1
41 #define LOGGER_DEBUG    2
42 #define LOGGER_INFO     3
43 #define LOGGER_WARN     4
44 #define LOGGER_ERROR    5
45
46 #define LOGGER_SHOW LOGGER_INFO
47
48 #define logger_trace(...) do { if (LOGGER_TRACE >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
49 #define logger_debug(...) do { if (LOGGER_DEBUG >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
50 #define logger_info(...) do { if (LOGGER_INFO >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
51 #define logger_warn(...) do { if (LOGGER_WARN >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
52 #define logger_error(...) do { if (LOGGER_ERROR >= LOGGER_SHOW) fprintf(stderr, __VA_ARGS__); } while (0)
53
54 // Events that are communicated via IRQ from Amiga to Raspberry.
55 #define R_EVENT_A2R_TAIL        1
56 #define R_EVENT_R2A_HEAD        2
57 #define R_EVENT_STARTED         4
58
59 // Events that are communicated from Raspberry to Amiga.
60 #define A_EVENT_R2A_TAIL        1
61 #define A_EVENT_A2R_HEAD        2
62
63 // Offset relative to communication area for queue pointers.
64 #define A2R_TAIL_OFFSET         0
65 #define R2A_HEAD_OFFSET         1
66 #define R2A_TAIL_OFFSET         2
67 #define A2R_HEAD_OFFSET         3
68
69 // Packets that are communicated across physical channels (A2R and R2A).
70 #define PKT_CONNECT             4
71 #define PKT_CONNECT_RESPONSE    5
72 #define PKT_DATA                6
73 #define PKT_EOS                 7
74 #define PKT_RESET               8
75
76 // Valid responses for PKT_CONNECT_RESPONSE.
77 #define CONNECT_OK              0
78 #define CONNECT_UNKNOWN_SERVICE 3
79
80 // Messages that are communicated between driver and client.
81 #define MSG_REGISTER_REQ        1
82 #define MSG_REGISTER_RES        2
83 #define MSG_DEREGISTER_REQ      3
84 #define MSG_DEREGISTER_RES      4
85 #define MSG_READ_MEM_REQ        5
86 #define MSG_READ_MEM_RES        6
87 #define MSG_WRITE_MEM_REQ       7
88 #define MSG_WRITE_MEM_RES       8
89 #define MSG_CONNECT             9
90 #define MSG_CONNECT_RESPONSE    10
91 #define MSG_DATA                11
92 #define MSG_EOS                 12
93 #define MSG_RESET               13
94
95 #define MSG_SUCCESS             1
96 #define MSG_FAIL                0
97
98 static sigset_t original_sigset;
99
100 static pthread_t thread_id;
101 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
102
103 static int server_socket = -1;
104
105 static int epfd = -1;
106 static int irq_fds[2];
107
108 extern "C" unsigned char fast_ram_array[];
109 extern "C" void write16(unsigned int address, unsigned int value);
110
111 unsigned int a314_base;
112 int a314_base_configured;
113
114 struct ComArea
115 {
116     uint8_t a_events;
117     uint8_t a_enable;
118     uint8_t r_events;
119     uint8_t r_enable; // Unused.
120
121     uint32_t mem_base;
122     uint32_t mem_size;
123
124     uint8_t a2r_tail;
125     uint8_t r2a_head;
126     uint8_t r2a_tail;
127     uint8_t a2r_head;
128
129     uint8_t a2r_buffer[256];
130     uint8_t r2a_buffer[256];
131 };
132
133 static ComArea ca;
134
135 static bool a314_device_started = false;
136
137 static uint8_t channel_status[4];
138 static uint8_t channel_status_updated = 0;
139
140 static uint8_t recv_buf[256];
141 static uint8_t send_buf[256];
142
143 struct LogicalChannel;
144 struct ClientConnection;
145
146 #pragma pack(push, 1)
147 struct MessageHeader
148 {
149     uint32_t length;
150     uint32_t stream_id;
151     uint8_t type;
152 }; //} __attribute__((packed));
153 #pragma pack(pop)
154
155 struct MessageBuffer
156 {
157     int pos;
158     std::vector<uint8_t> data;
159 };
160
161 struct RegisteredService
162 {
163     std::string name;
164     ClientConnection *cc;
165 };
166
167 struct PacketBuffer
168 {
169     int type;
170     std::vector<uint8_t> data;
171 };
172
173 struct ClientConnection
174 {
175     int fd;
176
177     int next_stream_id;
178
179     int bytes_read;
180     MessageHeader header;
181     std::vector<uint8_t> payload;
182
183     std::list<MessageBuffer> message_queue;
184
185     std::list<LogicalChannel*> associations;
186 };
187
188 struct LogicalChannel
189 {
190     int channel_id;
191
192     ClientConnection *association;
193     int stream_id;
194
195     bool got_eos_from_ami;
196     bool got_eos_from_client;
197
198     std::list<PacketBuffer> packet_queue;
199 };
200
201 static void remove_association(LogicalChannel *ch);
202 static void clear_packet_queue(LogicalChannel *ch);
203 static void create_and_enqueue_packet(LogicalChannel *ch, uint8_t type, uint8_t *data, uint8_t length);
204
205 static std::list<ClientConnection> connections;
206 static std::list<RegisteredService> services;
207 static std::list<LogicalChannel> channels;
208 static std::list<LogicalChannel*> send_queue;
209
210 struct OnDemandStart
211 {
212     std::string service_name;
213     std::string program;
214     std::vector<std::string> arguments;
215 };
216
217 std::vector<OnDemandStart> on_demand_services;
218
219 static void load_config_file(const char *filename)
220 {
221     FILE *f = fopen(filename, "rt");
222     if (f == nullptr)
223         return;
224
225     char line[256];
226     std::vector<char *> parts;
227
228     while (fgets(line, 256, f) != nullptr)
229     {
230         char org_line[256];
231         strcpy(org_line, line);
232
233         bool in_quotes = false;
234
235         int start = 0;
236         for (int i = 0; i < 256; i++)
237         {
238             if (line[i] == 0)
239             {
240                 if (start < i)
241                     parts.push_back(&line[start]);
242                 break;
243             }
244             else if (line[i] == '"')
245             {
246                 line[i] = 0;
247                 if (in_quotes)
248                     parts.push_back(&line[start]);
249                 in_quotes = !in_quotes;
250                 start = i + 1;
251             }
252             else if (isspace(line[i]) && !in_quotes)
253             {
254                 line[i] = 0;
255                 if (start < i)
256                     parts.push_back(&line[start]);
257                 start = i + 1;
258             }
259         }
260
261         if (parts.size() >= 2)
262         {
263             on_demand_services.emplace_back();
264             auto &e = on_demand_services.back();
265             e.service_name = parts[0];
266             e.program = parts[1];
267             for (int i = 1; i < parts.size(); i++)
268                 e.arguments.push_back(std::string(parts[i]));
269         }
270         else if (parts.size() != 0)
271             logger_warn("Invalid number of columns in configuration file line: %s\n", org_line);
272
273         parts.clear();
274     }
275
276     fclose(f);
277
278     if (on_demand_services.empty())
279         logger_warn("No registered services\n");
280 }
281
282 static int init_server_socket()
283 {
284     server_socket = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
285     if (server_socket == -1)
286     {
287         logger_error("Failed to create server socket\n");
288         return -1;
289     }
290
291     struct sockaddr_in address;
292     address.sin_family = AF_INET;
293     address.sin_addr.s_addr = INADDR_ANY;
294     address.sin_port = htons(7110);
295
296     int res = bind(server_socket, (struct sockaddr *)&address, sizeof(address));
297     if (res < 0)
298     {
299         logger_error("Bind to localhost:7110 failed\n");
300         return -1;
301     }
302
303     listen(server_socket, 16);
304
305     return 0;
306 }
307
308 static void shutdown_server_socket()
309 {
310     if (server_socket != -1)
311         close(server_socket);
312     server_socket = -1;
313 }
314
315 void create_and_send_msg(ClientConnection *cc, int type, int stream_id, uint8_t *data, int length)
316 {
317     MessageBuffer mb;
318     mb.pos = 0;
319     mb.data.resize(sizeof(MessageHeader) + length);
320
321     MessageHeader *mh = (MessageHeader *)&mb.data[0];
322     mh->length = length;
323     mh->stream_id = stream_id;
324     mh->type = type;
325     if (length && data)
326         memcpy(&mb.data[sizeof(MessageHeader)], data, length);
327
328     if (!cc->message_queue.empty())
329     {
330         cc->message_queue.push_back(std::move(mb));
331         return;
332     }
333
334     while (1)
335     {
336         int left = mb.data.size() - mb.pos;
337         uint8_t *src = &mb.data[mb.pos];
338         ssize_t r = write(cc->fd, src, left);
339         if (r == -1)
340         {
341             if (errno == EAGAIN || errno == EWOULDBLOCK)
342             {
343                 cc->message_queue.push_back(std::move(mb));
344                 return;
345             }
346             else if (errno == ECONNRESET)
347             {
348                 // Do not close connection here; it will get done at some other place.
349                 return;
350             }
351             else
352             {
353                 logger_error("Write failed unexpectedly with errno = %d\n", errno);
354                 exit(-1);
355             }
356         }
357
358         mb.pos += r;
359         if (r == left)
360         {
361             return;
362         }
363     }
364 }
365
366 static void handle_msg_register_req(ClientConnection *cc)
367 {
368     uint8_t result = MSG_FAIL;
369
370     std::string service_name((char *)&cc->payload[0], cc->payload.size());
371
372     auto it = services.begin();
373     for (; it != services.end(); it++)
374         if (it->name == service_name)
375             break;
376
377     if (it == services.end())
378     {
379         services.emplace_back();
380
381         RegisteredService &srv = services.back();
382         srv.cc = cc;
383         srv.name = std::move(service_name);
384
385         result = MSG_SUCCESS;
386     }
387
388     create_and_send_msg(cc, MSG_REGISTER_RES, 0, &result, 1);
389 }
390
391 static void handle_msg_deregister_req(ClientConnection *cc)
392 {
393     uint8_t result = MSG_FAIL;
394
395     std::string service_name((char *)&cc->payload[0], cc->payload.size());
396
397     for (auto it = services.begin(); it != services.end(); it++)
398     {
399         if (it->name == service_name && it->cc == cc)
400         {
401             services.erase(it);
402             result = MSG_SUCCESS;
403             break;
404         }
405     }
406
407     create_and_send_msg(cc, MSG_DEREGISTER_RES, 0, &result, 1);
408 }
409
410 static void handle_msg_read_mem_req(ClientConnection *cc)
411 {
412     uint32_t address = *(uint32_t *)&(cc->payload[0]);
413     uint32_t length = *(uint32_t *)&(cc->payload[4]);
414
415     create_and_send_msg(cc, MSG_READ_MEM_RES, 0, &fast_ram_array[address], length);
416 }
417
418 static void handle_msg_write_mem_req(ClientConnection *cc)
419 {
420     uint32_t address = *(uint32_t *)&(cc->payload[0]);
421     uint32_t length = cc->payload.size() - 4;
422
423     memcpy(&fast_ram_array[address], &(cc->payload[4]), length);
424
425     create_and_send_msg(cc, MSG_WRITE_MEM_RES, 0, nullptr, 0);
426 }
427
428 static LogicalChannel *get_associated_channel_by_stream_id(ClientConnection *cc, int stream_id)
429 {
430     for (auto ch : cc->associations)
431     {
432         if (ch->stream_id == stream_id)
433             return ch;
434     }
435     return nullptr;
436 }
437
438 static void handle_msg_connect(ClientConnection *cc)
439 {
440     // We currently don't handle that a client tries to connect to a service on the Amiga.
441 }
442
443 static void handle_msg_connect_response(ClientConnection *cc)
444 {
445     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
446     if (!ch)
447         return;
448
449     create_and_enqueue_packet(ch, PKT_CONNECT_RESPONSE, &cc->payload[0], cc->payload.size());
450
451     if (cc->payload[0] != CONNECT_OK)
452         remove_association(ch);
453 }
454
455 static void handle_msg_data(ClientConnection *cc)
456 {
457     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
458     if (!ch)
459         return;
460
461     create_and_enqueue_packet(ch, PKT_DATA, &cc->payload[0], cc->header.length);
462 }
463
464 static void handle_msg_eos(ClientConnection *cc)
465 {
466     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
467     if (!ch || ch->got_eos_from_client)
468         return;
469
470     ch->got_eos_from_client = true;
471
472     create_and_enqueue_packet(ch, PKT_EOS, nullptr, 0);
473
474     if (ch->got_eos_from_ami)
475         remove_association(ch);
476 }
477
478 static void handle_msg_reset(ClientConnection *cc)
479 {
480     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
481     if (!ch)
482         return;
483
484     remove_association(ch);
485
486     clear_packet_queue(ch);
487     create_and_enqueue_packet(ch, PKT_RESET, nullptr, 0);
488 }
489
490 static void handle_received_message(ClientConnection *cc)
491 {
492     switch (cc->header.type)
493     {
494     case MSG_REGISTER_REQ:
495         handle_msg_register_req(cc);
496         break;
497     case MSG_DEREGISTER_REQ:
498         handle_msg_deregister_req(cc);
499         break;
500     case MSG_READ_MEM_REQ:
501         handle_msg_read_mem_req(cc);
502         break;
503     case MSG_WRITE_MEM_REQ:
504         handle_msg_write_mem_req(cc);
505         break;
506     case MSG_CONNECT:
507         handle_msg_connect(cc);
508         break;
509     case MSG_CONNECT_RESPONSE:
510         handle_msg_connect_response(cc);
511         break;
512     case MSG_DATA:
513         handle_msg_data(cc);
514         break;
515     case MSG_EOS:
516         handle_msg_eos(cc);
517         break;
518     case MSG_RESET:
519         handle_msg_reset(cc);
520         break;
521     default:
522         // This is bad, probably should disconnect from client.
523         logger_warn("Received a message of unknown type from client\n");
524         break;
525     }
526 }
527
528 static void close_and_remove_connection(ClientConnection *cc)
529 {
530     shutdown(cc->fd, SHUT_WR);
531     close(cc->fd);
532
533     {
534         auto it = services.begin();
535         while (it != services.end())
536         {
537             if (it->cc == cc)
538                 it = services.erase(it);
539             else
540                 it++;
541         }
542     }
543
544     {
545         auto it = cc->associations.begin();
546         while (it != cc->associations.end())
547         {
548             auto ch = *it;
549
550             clear_packet_queue(ch);
551             create_and_enqueue_packet(ch, PKT_RESET, nullptr, 0);
552
553             ch->association = nullptr;
554             ch->stream_id = 0;
555
556             it = cc->associations.erase(it);
557         }
558     }
559
560     for (auto it = connections.begin(); it != connections.end(); it++)
561     {
562         if (&(*it) == cc)
563         {
564             connections.erase(it);
565             break;
566         }
567     }
568 }
569
570 static void remove_association(LogicalChannel *ch)
571 {
572     auto &ass = ch->association->associations;
573     ass.erase(std::find(ass.begin(), ass.end(), ch));
574
575     ch->association = nullptr;
576     ch->stream_id = 0;
577 }
578
579 static void clear_packet_queue(LogicalChannel *ch)
580 {
581     if (!ch->packet_queue.empty())
582     {
583         ch->packet_queue.clear();
584         send_queue.erase(std::find(send_queue.begin(), send_queue.end(), ch));
585     }
586 }
587
588 static void create_and_enqueue_packet(LogicalChannel *ch, uint8_t type, uint8_t *data, uint8_t length)
589 {
590     if (ch->packet_queue.empty())
591         send_queue.push_back(ch);
592
593     ch->packet_queue.emplace_back();
594
595     PacketBuffer &pb = ch->packet_queue.back();
596     pb.type = type;
597     pb.data.resize(length);
598     if (data && length)
599         memcpy(&pb.data[0], data, length);
600 }
601
602 static void handle_pkt_connect(int channel_id, uint8_t *data, int plen)
603 {
604     for (auto &ch : channels)
605     {
606         if (ch.channel_id == channel_id)
607         {
608             // We should handle this in some constructive way.
609             // This signals that should reset all logical channels.
610             logger_error("Received a CONNECT packet on a channel that was believed to be previously allocated\n");
611             exit(-1);
612         }
613     }
614
615     channels.emplace_back();
616
617     auto &ch = channels.back();
618
619     ch.channel_id = channel_id;
620     ch.association = nullptr;
621     ch.stream_id = 0;
622     ch.got_eos_from_ami = false;
623     ch.got_eos_from_client = false;
624
625     std::string service_name((char *)data, plen);
626
627     for (auto &srv : services)
628     {
629         if (srv.name == service_name)
630         {
631             ClientConnection *cc = srv.cc;
632
633             ch.association = cc;
634             ch.stream_id = cc->next_stream_id;
635
636             cc->next_stream_id += 2;
637             cc->associations.push_back(&ch);
638
639             create_and_send_msg(ch.association, MSG_CONNECT, ch.stream_id, data, plen);
640             return;
641         }
642     }
643
644     for (auto &on_demand : on_demand_services)
645     {
646         if (on_demand.service_name == service_name)
647         {
648             int fds[2];
649             int status = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
650             if (status != 0)
651             {
652                 logger_error("Unexpectedly not able to create socket pair.\n");
653                 exit(-1);
654             }
655
656             pid_t child = fork();
657             if (child == -1)
658             {
659                 logger_error("Unexpectedly was not able to fork.\n");
660                 exit(-1);
661             }
662             else if (child == 0)
663             {
664                 close(fds[0]);
665                 int fd = fds[1];
666
667                 // FIXE: The user should be configurable.
668                 setgid(1000);
669                 setuid(1000);
670                 putenv("HOME=/home/pi");
671
672                 std::vector<std::string> args(on_demand.arguments);
673                 args.push_back("-ondemand");
674                 args.push_back(std::to_string(fd));
675                 std::vector<const char *> args_arr;
676                 for (auto &arg : args)
677                     args_arr.push_back(arg.c_str());
678                 args_arr.push_back(nullptr);
679
680                 execvp(on_demand.program.c_str(), (char* const*) &args_arr[0]);
681             }
682             else
683             {
684                 close(fds[1]);
685                 int fd = fds[0];
686
687                 int status = fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
688                 if (status == -1)
689                 {
690                     logger_error("Unexpectedly unable to set close-on-exec flag on client socket descriptor; errno = %d\n", errno);
691                     exit(-1);
692                 }
693
694                 status = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
695                 if (status == -1)
696                 {
697                     logger_error("Unexpectedly unable to set client socket to non blocking; errno = %d\n", errno);
698                     exit(-1);
699                 }
700
701                 int flag = 1;
702                 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
703
704                 connections.emplace_back();
705
706                 ClientConnection &cc = connections.back();
707                 cc.fd = fd;
708                 cc.next_stream_id = 1;
709                 cc.bytes_read = 0;
710
711                 struct epoll_event ev;
712                 ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
713                 ev.data.fd = fd;
714                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
715                 {
716                     logger_error("epoll_ctl() failed unexpectedly with errno = %d\n", errno);
717                     exit(-1);
718                 }
719
720                 services.emplace_back();
721
722                 RegisteredService &srv = services.back();
723                 srv.cc = &cc;
724                 srv.name = std::move(service_name);
725
726                 ch.association = &cc;
727                 ch.stream_id = cc.next_stream_id;
728
729                 cc.next_stream_id += 2;
730                 cc.associations.push_back(&ch);
731
732                 create_and_send_msg(ch.association, MSG_CONNECT, ch.stream_id, data, plen);
733                 return;
734             }
735         }
736     }
737
738     uint8_t response = CONNECT_UNKNOWN_SERVICE;
739     create_and_enqueue_packet(&ch, PKT_CONNECT_RESPONSE, &response, 1);
740 }
741
742 static void handle_pkt_data(int channel_id, uint8_t *data, int plen)
743 {
744     for (auto &ch : channels)
745     {
746         if (ch.channel_id == channel_id)
747         {
748             if (ch.association != nullptr && !ch.got_eos_from_ami)
749                 create_and_send_msg(ch.association, MSG_DATA, ch.stream_id, data, plen);
750
751             break;
752         }
753     }
754 }
755
756 static void handle_pkt_eos(int channel_id)
757 {
758     for (auto &ch : channels)
759     {
760         if (ch.channel_id == channel_id)
761         {
762             if (ch.association != nullptr && !ch.got_eos_from_ami)
763             {
764                 ch.got_eos_from_ami = true;
765
766                 create_and_send_msg(ch.association, MSG_EOS, ch.stream_id, nullptr, 0);
767
768                 if (ch.got_eos_from_client)
769                     remove_association(&ch);
770             }
771             break;
772         }
773     }
774 }
775
776 static void handle_pkt_reset(int channel_id)
777 {
778     for (auto &ch : channels)
779     {
780         if (ch.channel_id == channel_id)
781         {
782             clear_packet_queue(&ch);
783
784             if (ch.association != nullptr)
785             {
786                 create_and_send_msg(ch.association, MSG_RESET, ch.stream_id, nullptr, 0);
787                 remove_association(&ch);
788             }
789
790             break;
791         }
792     }
793 }
794
795 static void remove_channel_if_not_associated_and_empty_pq(int channel_id)
796 {
797     for (auto it = channels.begin(); it != channels.end(); it++)
798     {
799         if (it->channel_id == channel_id)
800         {
801             if (it->association == nullptr && it->packet_queue.empty())
802                 channels.erase(it);
803
804             break;
805         }
806     }
807 }
808
809 static void handle_received_pkt(int ptype, int channel_id, uint8_t *data, int plen)
810 {
811     if (ptype == PKT_CONNECT)
812         handle_pkt_connect(channel_id, data, plen);
813     else if (ptype == PKT_DATA)
814         handle_pkt_data(channel_id, data, plen);
815     else if (ptype == PKT_EOS)
816         handle_pkt_eos(channel_id);
817     else if (ptype == PKT_RESET)
818         handle_pkt_reset(channel_id);
819
820     remove_channel_if_not_associated_and_empty_pq(channel_id);
821 }
822
823 static bool receive_from_a2r()
824 {
825     int head = channel_status[A2R_HEAD_OFFSET];
826     int tail = channel_status[A2R_TAIL_OFFSET];
827     int len = (tail - head) & 255;
828     if (len == 0)
829         return false;
830
831     if (head < tail)
832     {
833         memcpy(recv_buf, &ca.a2r_buffer[head], len);
834     }
835     else
836     {
837         memcpy(recv_buf, &ca.a2r_buffer[head], 256 - head);
838
839         if (tail != 0)
840         {
841             memcpy(&recv_buf[len - tail], &ca.a2r_buffer[0], tail);
842         }
843     }
844
845     uint8_t *p = recv_buf;
846     while (p < recv_buf + len)
847     {
848         uint8_t plen = *p++;
849         uint8_t ptype = *p++;
850         uint8_t channel_id = *p++;
851         handle_received_pkt(ptype, channel_id, p, plen);
852         p += plen;
853     }
854
855     channel_status[A2R_HEAD_OFFSET] = channel_status[A2R_TAIL_OFFSET];
856     channel_status_updated |= A_EVENT_A2R_HEAD;
857     return true;
858 }
859
860 static bool flush_send_queue()
861 {
862     int tail = channel_status[R2A_TAIL_OFFSET];
863     int head = channel_status[R2A_HEAD_OFFSET];
864     int len = (tail - head) & 255;
865     int left = 255 - len;
866
867     int pos = 0;
868
869     while (!send_queue.empty())
870     {
871         LogicalChannel *ch = send_queue.front();
872         PacketBuffer &pb = ch->packet_queue.front();
873
874         int ptype = pb.type;
875         int plen = 3 + pb.data.size();
876
877         if (left < plen)
878             break;
879
880         send_buf[pos++] = pb.data.size();
881         send_buf[pos++] = ptype;
882         send_buf[pos++] = ch->channel_id;
883         memcpy(&send_buf[pos], &pb.data[0], pb.data.size());
884         pos += pb.data.size();
885
886         ch->packet_queue.pop_front();
887
888         send_queue.pop_front();
889
890         if (!ch->packet_queue.empty())
891             send_queue.push_back(ch);
892         else
893             remove_channel_if_not_associated_and_empty_pq(ch->channel_id);
894
895         left -= plen;
896     }
897
898     int to_write = pos;
899     if (!to_write)
900         return false;
901
902     uint8_t *p = send_buf;
903     int at_end = 256 - tail;
904     if (at_end < to_write)
905     {
906         memcpy(&ca.r2a_buffer[tail], p, at_end);
907         p += at_end;
908         to_write -= at_end;
909         tail = 0;
910     }
911
912     memcpy(&ca.r2a_buffer[tail], p, to_write);
913     tail = (tail + to_write) & 255;
914
915     channel_status[R2A_TAIL_OFFSET] = tail;
916     channel_status_updated |= A_EVENT_R2A_TAIL;
917     return true;
918 }
919
920 static void read_channel_status()
921 {
922     channel_status[A2R_TAIL_OFFSET] = ca.a2r_tail;
923     channel_status[R2A_HEAD_OFFSET] = ca.r2a_head;
924     channel_status[R2A_TAIL_OFFSET] = ca.r2a_tail;
925     channel_status[A2R_HEAD_OFFSET] = ca.a2r_head;
926     channel_status_updated = 0;
927 }
928
929 static void write_channel_status()
930 {
931     if (channel_status_updated != 0)
932     {
933         ca.r2a_tail = channel_status[R2A_TAIL_OFFSET];
934         ca.a2r_head = channel_status[A2R_HEAD_OFFSET];
935
936         pthread_mutex_lock(&mutex);
937         ca.a_events |= channel_status_updated;
938         pthread_mutex_unlock(&mutex);
939
940         channel_status_updated = 0;
941     }
942 }
943
944 static void close_all_logical_channels()
945 {
946     send_queue.clear();
947
948     auto it = channels.begin();
949     while (it != channels.end())
950     {
951         LogicalChannel &ch = *it;
952
953         if (ch.association != nullptr)
954         {
955             create_and_send_msg(ch.association, MSG_RESET, ch.stream_id, nullptr, 0);
956             remove_association(&ch);
957         }
958
959         it = channels.erase(it);
960     }
961 }
962
963 static void handle_a314_irq(uint8_t events)
964 {
965     if (events == 0)
966         return;
967
968     if (events & R_EVENT_STARTED)
969     {
970         if (!channels.empty())
971             logger_info("Received STARTED event while logical channels are open -- closing channels\n");
972
973         close_all_logical_channels();
974         a314_device_started = true;
975     }
976
977     if (!a314_device_started)
978         return;
979
980     read_channel_status();
981
982     bool any_rcvd = receive_from_a2r();
983     bool any_sent = flush_send_queue();
984
985     if (any_rcvd || any_sent)
986         write_channel_status();
987 }
988
989 static void handle_client_connection_event(ClientConnection *cc, struct epoll_event *ev)
990 {
991     if (ev->events & EPOLLERR)
992     {
993         logger_warn("Received EPOLLERR for client connection\n");
994         close_and_remove_connection(cc);
995         return;
996     }
997
998     if (ev->events & EPOLLIN)
999     {
1000         while (1)
1001         {
1002             int left;
1003             uint8_t *dst;
1004
1005             if (cc->payload.empty())
1006             {
1007                 left = sizeof(MessageHeader) - cc->bytes_read;
1008                 dst = (uint8_t *)&(cc->header) + cc->bytes_read;
1009             }
1010             else
1011             {
1012                 left = cc->header.length - cc->bytes_read;
1013                 dst = &cc->payload[cc->bytes_read];
1014             }
1015
1016             ssize_t r = read(cc->fd, dst, left);
1017             if (r == -1)
1018             {
1019                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1020                     break;
1021
1022                 logger_error("Read failed unexpectedly with errno = %d\n", errno);
1023                 exit(-1);
1024             }
1025
1026             if (r == 0)
1027             {
1028                 logger_info("Received End-of-File on client connection\n");
1029                 close_and_remove_connection(cc);
1030                 return;
1031             }
1032             else
1033             {
1034                 cc->bytes_read += r;
1035                 left -= r;
1036                 if (!left)
1037                 {
1038                     if (cc->payload.empty())
1039                     {
1040                         if (cc->header.length == 0)
1041                         {
1042                             logger_trace("header: length=%d, stream_id=%d, type=%d\n", cc->header.length, cc->header.stream_id, cc->header.type);
1043                             handle_received_message(cc);
1044                         }
1045                         else
1046                         {
1047                             cc->payload.resize(cc->header.length);
1048                         }
1049                     }
1050                     else
1051                     {
1052                         logger_trace("header: length=%d, stream_id=%d, type=%d\n", cc->header.length, cc->header.stream_id, cc->header.type);
1053                         handle_received_message(cc);
1054                         cc->payload.clear();
1055                     }
1056                     cc->bytes_read = 0;
1057                 }
1058             }
1059         }
1060     }
1061
1062     if (ev->events & EPOLLOUT)
1063     {
1064         while (!cc->message_queue.empty())
1065         {
1066             MessageBuffer &mb = cc->message_queue.front();
1067
1068             int left = mb.data.size() - mb.pos;
1069             uint8_t *src = &mb.data[mb.pos];
1070             ssize_t r = write(cc->fd, src, left);
1071             if (r == -1)
1072             {
1073                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1074                     break;
1075                 else if (errno == ECONNRESET)
1076                 {
1077                     close_and_remove_connection(cc);
1078                     return;
1079                 }
1080                 else
1081                 {
1082                     logger_error("Write failed unexpectedly with errno = %d\n", errno);
1083                     exit(-1);
1084                 }
1085             }
1086
1087             mb.pos += r;
1088             if (r == left)
1089                 cc->message_queue.pop_front();
1090         }
1091     }
1092 }
1093
1094 static void handle_server_socket_ready()
1095 {
1096     struct sockaddr_in address;
1097     int alen = sizeof(struct sockaddr_in);
1098
1099     int fd = accept(server_socket, (struct sockaddr *)&address, (socklen_t *)&alen);
1100     if (fd < 0)
1101     {
1102         logger_error("Accept failed unexpectedly with errno = %d\n", errno);
1103         exit(-1);
1104     }
1105
1106     int status = fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
1107     if (status == -1)
1108     {
1109         logger_error("Unexpectedly unable to set close-on-exec flag on client socket descriptor; errno = %d\n", errno);
1110         exit(-1);
1111     }
1112
1113     status = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
1114     if (status == -1)
1115     {
1116         logger_error("Unexpectedly unable to set client socket to non blocking; errno = %d\n", errno);
1117         exit(-1);
1118     }
1119
1120     int flag = 1;
1121     setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
1122
1123     connections.emplace_back();
1124
1125     ClientConnection &cc = connections.back();
1126     cc.fd = fd;
1127     cc.next_stream_id = 1;
1128     cc.bytes_read = 0;
1129
1130     struct epoll_event ev;
1131     ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
1132     ev.data.fd = fd;
1133     if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
1134     {
1135         logger_error("epoll_ctl() failed unexpectedly with errno = %d\n", errno);
1136         exit(-1);
1137     }
1138 }
1139
1140 static void main_loop()
1141 {
1142     bool shutting_down = false;
1143     bool done = false;
1144
1145     while (!done)
1146     {
1147         struct epoll_event ev;
1148         int timeout = shutting_down ? 10000 : -1;
1149         int n = epoll_pwait(epfd, &ev, 1, timeout, &original_sigset);
1150         if (n == -1)
1151         {
1152             if (errno == EINTR)
1153             {
1154                 logger_info("Received SIGTERM\n");
1155
1156                 shutdown_server_socket();
1157
1158                 while (!connections.empty())
1159                     close_and_remove_connection(&connections.front());
1160
1161                 if (flush_send_queue())
1162                     write_channel_status();
1163
1164                 if (!channels.empty())
1165                     shutting_down = true;
1166                 else
1167                     done = true;
1168             }
1169             else
1170             {
1171                 logger_error("epoll_pwait failed with unexpected errno = %d\n", errno);
1172                 exit(-1);
1173             }
1174         }
1175         else if (n == 0)
1176         {
1177             if (shutting_down)
1178                 done = true;
1179             else
1180             {
1181                 logger_error("epoll_pwait returned 0 which is unexpected since no timeout was set\n");
1182                 exit(-1);
1183             }
1184         }
1185         else
1186         {
1187             if (ev.data.fd == irq_fds[1])
1188             {
1189                 uint8_t events;
1190                 if (read(irq_fds[1], &events, 1) != 1)
1191                 {
1192                     logger_error("Read from interrupt socket pair, and unexpectedly didn't return 1 byte\n");
1193                     exit(-1);
1194                 }
1195
1196                 handle_a314_irq(events);
1197             }
1198             else if (ev.data.fd == server_socket)
1199             {
1200                 logger_trace("Epoll event: server socket is ready, events = %d\n", ev.events);
1201                 handle_server_socket_ready();
1202             }
1203             else
1204             {
1205                 logger_trace("Epoll event: client socket is ready, events = %d\n", ev.events);
1206
1207                 auto it = connections.begin();
1208                 for (; it != connections.end(); it++)
1209                 {
1210                     if (it->fd == ev.data.fd)
1211                         break;
1212                 }
1213
1214                 if (it == connections.end())
1215                 {
1216                     logger_error("Got notified about an event on a client connection that supposedly isn't currently open\n");
1217                     exit(-1);
1218                 }
1219
1220                 ClientConnection *cc = &(*it);
1221                 handle_client_connection_event(cc, &ev);
1222
1223                 if (flush_send_queue())
1224                     write_channel_status();
1225             }
1226         }
1227     }
1228 }
1229
1230 static void sigterm_handler(int signo)
1231 {
1232 }
1233
1234 static void init_sigterm()
1235 {
1236     /*
1237     sigset_t ss;
1238     sigemptyset(&ss);
1239     sigaddset(&ss, SIGTERM);
1240     sigprocmask(SIG_BLOCK, &ss, &original_sigset);
1241
1242     struct sigaction sa;
1243     sa.sa_handler = sigterm_handler;
1244     sigemptyset(&sa.sa_mask);
1245     sa.sa_flags = 0;
1246     sigaction(SIGTERM, &sa, NULL);
1247     */
1248 }
1249
1250 static int init_driver()
1251 {
1252     init_sigterm();
1253
1254     if (init_server_socket() != 0)
1255         return -1;
1256
1257     int err = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, irq_fds);
1258     if (err != 0)
1259     {
1260         logger_error("Unable to create socket pair, errno = %d\n", errno);
1261         return -1;
1262     }
1263
1264     epfd = epoll_create1(EPOLL_CLOEXEC);
1265     if (epfd == -1)
1266         return -1;
1267
1268     struct epoll_event ev;
1269     ev.events = EPOLLIN;
1270     ev.data.fd = irq_fds[1];
1271     if (epoll_ctl(epfd, EPOLL_CTL_ADD, irq_fds[1], &ev) != 0)
1272         return -1;
1273
1274     ev.events = EPOLLIN;
1275     ev.data.fd = server_socket;
1276     if (epoll_ctl(epfd, EPOLL_CTL_ADD, server_socket, &ev) != 0)
1277         return -1;
1278
1279     return 0;
1280 }
1281
1282 static void shutdown_driver()
1283 {
1284     if (epfd != -1)
1285         close(epfd);
1286
1287     shutdown_server_socket();
1288 }
1289
1290 static void *thread_start(void *arg)
1291 {
1292     main_loop();
1293     shutdown_driver();
1294     return NULL;
1295 }
1296
1297 static void write_r_events(uint8_t events)
1298 {
1299     if (write(irq_fds[0], &events, 1) != 1)
1300         logger_error("Write to interrupt socket pair did not return 1\n");
1301 }
1302
1303 int a314_init()
1304 {
1305     std::string conf_filename("/etc/opt/a314/a314d.conf");
1306
1307     load_config_file(conf_filename.c_str());
1308
1309     int err = init_driver();
1310     if (err < 0)
1311     {
1312         shutdown_driver();
1313         return -1;
1314     }
1315
1316     err = pthread_create(&thread_id, NULL, thread_start, NULL);
1317     if (err < 0)
1318     {
1319         logger_error("pthread_create failed with err = %d\n", err);
1320         return -2;
1321     }
1322
1323     return 0;
1324 }
1325
1326 void a314_set_mem_base_size(unsigned int base, unsigned int size)
1327 {
1328     ca.mem_base = htobe32(base);
1329     ca.mem_size = htobe32(size);
1330 }
1331
1332 void a314_process_events()
1333 {
1334     if (ca.a_events & ca.a_enable)
1335     {
1336         write16(0xdff09c, 0x8008);
1337         m68k_set_irq(2);
1338     }
1339 }
1340
1341 unsigned int a314_read_memory_8(unsigned int address)
1342 {
1343     if (address >= sizeof(ca))
1344         return 0;
1345
1346     uint8_t val;
1347     if (address == offsetof(ComArea, a_events))
1348     {
1349         pthread_mutex_lock(&mutex);
1350         val = ca.a_events;
1351         ca.a_events = 0;
1352         pthread_mutex_unlock(&mutex);
1353     }
1354     else
1355     {
1356         uint8_t *p = (uint8_t *)&ca;
1357         val = p[address];
1358     }
1359
1360     return val;
1361 }
1362
1363 unsigned int a314_read_memory_16(unsigned int address)
1364 {
1365     if (address >= sizeof(ca))
1366         return 0;
1367
1368     uint16_t *p = (uint16_t *)&ca;
1369     return be16toh(p[address >> 1]);
1370 }
1371
1372 unsigned int a314_read_memory_32(unsigned int address)
1373 {
1374     if (address >= sizeof(ca))
1375         return 0;
1376
1377     uint32_t *p = (uint32_t *)&ca;
1378     return be32toh(p[address >> 2]);
1379 }
1380
1381 void a314_write_memory_8(unsigned int address, unsigned int value)
1382 {
1383     if (address >= sizeof(ca))
1384         return;
1385
1386     switch (address)
1387     {
1388         case offsetof(ComArea, a_events):
1389             // a_events is not writable.
1390             break;
1391
1392         case offsetof(ComArea, r_events):
1393             if (value != 0)
1394                 write_r_events((uint8_t)value);
1395             break;
1396
1397         default:
1398         {
1399             uint8_t *p = (uint8_t *)&ca;
1400             p[address] = (uint8_t)value;
1401             break;
1402         }
1403     }
1404 }
1405
1406 void a314_write_memory_16(unsigned int address, unsigned int value)
1407 {
1408     // Not implemented.
1409 }
1410
1411 void a314_write_memory_32(unsigned int address, unsigned int value)
1412 {
1413     // Not implemented.
1414 }