]> git.sesse.net Git - pistorm/blob - a314/a314.cc
Adapt piaudio to work on PiStorm + A314 emulation
[pistorm] / a314 / a314.cc
1 /*
2  * Copyright 2020-2021 Niklas Ekström
3  * Based on a314d daemon for A314.
4  */
5
6 #include <arpa/inet.h>
7
8 #include <linux/spi/spidev.h>
9 #include <linux/types.h>
10
11 #include <netinet/in.h>
12 #include <netinet/tcp.h>
13
14 #include <sys/epoll.h>
15 #include <sys/ioctl.h>
16 #include <sys/socket.h>
17 #include <sys/stat.h>
18 #include <sys/types.h>
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <time.h>
29 #include <unistd.h>
30 #include <pthread.h>
31
32 #include <algorithm>
33 #include <list>
34 #include <string>
35 #include <vector>
36
37 #include "a314.h"
38 // Silence stupid warning
39 #undef _GNU_SOURCE
40 #include "config_file/config_file.h"
41
42 extern "C" emulator_config *cfg;
43
44 #define LOGGER_TRACE    1
45 #define LOGGER_DEBUG    2
46 #define LOGGER_INFO     3
47 #define LOGGER_WARN     4
48 #define LOGGER_ERROR    5
49
50 #define LOGGER_SHOW LOGGER_INFO
51
52 #define logger_trace(...) do { if (LOGGER_TRACE >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
53 #define logger_debug(...) do { if (LOGGER_DEBUG >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
54 #define logger_info(...) do { if (LOGGER_INFO >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
55 #define logger_warn(...) do { if (LOGGER_WARN >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
56 #define logger_error(...) do { if (LOGGER_ERROR >= LOGGER_SHOW) fprintf(stderr, __VA_ARGS__); } while (0)
57
58 // Events that are communicated via IRQ from Amiga to Raspberry.
59 #define R_EVENT_A2R_TAIL        1
60 #define R_EVENT_R2A_HEAD        2
61 #define R_EVENT_STARTED         4
62
63 // Events that are communicated from Raspberry to Amiga.
64 #define A_EVENT_R2A_TAIL        1
65 #define A_EVENT_A2R_HEAD        2
66
67 // Offset relative to communication area for queue pointers.
68 #define A2R_TAIL_OFFSET         0
69 #define R2A_HEAD_OFFSET         1
70 #define R2A_TAIL_OFFSET         2
71 #define A2R_HEAD_OFFSET         3
72
73 // Packets that are communicated across physical channels (A2R and R2A).
74 #define PKT_CONNECT             4
75 #define PKT_CONNECT_RESPONSE    5
76 #define PKT_DATA                6
77 #define PKT_EOS                 7
78 #define PKT_RESET               8
79
80 // Valid responses for PKT_CONNECT_RESPONSE.
81 #define CONNECT_OK              0
82 #define CONNECT_UNKNOWN_SERVICE 3
83
84 // Messages that are communicated between driver and client.
85 #define MSG_REGISTER_REQ        1
86 #define MSG_REGISTER_RES        2
87 #define MSG_DEREGISTER_REQ      3
88 #define MSG_DEREGISTER_RES      4
89 #define MSG_READ_MEM_REQ        5
90 #define MSG_READ_MEM_RES        6
91 #define MSG_WRITE_MEM_REQ       7
92 #define MSG_WRITE_MEM_RES       8
93 #define MSG_CONNECT             9
94 #define MSG_CONNECT_RESPONSE    10
95 #define MSG_DATA                11
96 #define MSG_EOS                 12
97 #define MSG_RESET               13
98
99 #define MSG_SUCCESS             1
100 #define MSG_FAIL                0
101
102 static sigset_t original_sigset;
103
104 static pthread_t thread_id;
105 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
106
107 static int server_socket = -1;
108
109 static int epfd = -1;
110 static int irq_fds[2];
111
112 extern "C" void ps_write_16(unsigned int address, unsigned int value);
113
114 unsigned int a314_base;
115 int a314_base_configured;
116
117 struct ComArea
118 {
119     uint8_t a_events;
120     uint8_t a_enable;
121     uint8_t r_events;
122     uint8_t r_enable; // Unused.
123
124     uint32_t mem_base;
125     uint32_t mem_size;
126
127     uint8_t a2r_tail;
128     uint8_t r2a_head;
129     uint8_t r2a_tail;
130     uint8_t a2r_head;
131
132     uint8_t a2r_buffer[256];
133     uint8_t r2a_buffer[256];
134 };
135
136 static ComArea ca;
137
138 static bool a314_device_started = false;
139
140 static uint8_t channel_status[4];
141 static uint8_t channel_status_updated = 0;
142
143 static uint8_t recv_buf[256];
144 static uint8_t send_buf[256];
145
146 struct LogicalChannel;
147 struct ClientConnection;
148
149 #pragma pack(push, 1)
150 struct MessageHeader
151 {
152     uint32_t length;
153     uint32_t stream_id;
154     uint8_t type;
155 }; //} __attribute__((packed));
156 #pragma pack(pop)
157
158 struct MessageBuffer
159 {
160     int pos;
161     std::vector<uint8_t> data;
162 };
163
164 struct RegisteredService
165 {
166     std::string name;
167     ClientConnection *cc;
168 };
169
170 struct PacketBuffer
171 {
172     int type;
173     std::vector<uint8_t> data;
174 };
175
176 struct ClientConnection
177 {
178     int fd;
179
180     int next_stream_id;
181
182     int bytes_read;
183     MessageHeader header;
184     std::vector<uint8_t> payload;
185
186     std::list<MessageBuffer> message_queue;
187
188     std::list<LogicalChannel*> associations;
189 };
190
191 struct LogicalChannel
192 {
193     int channel_id;
194
195     ClientConnection *association;
196     int stream_id;
197
198     bool got_eos_from_ami;
199     bool got_eos_from_client;
200
201     std::list<PacketBuffer> packet_queue;
202 };
203
204 static void remove_association(LogicalChannel *ch);
205 static void clear_packet_queue(LogicalChannel *ch);
206 static void create_and_enqueue_packet(LogicalChannel *ch, uint8_t type, uint8_t *data, uint8_t length);
207
208 static std::list<ClientConnection> connections;
209 static std::list<RegisteredService> services;
210 static std::list<LogicalChannel> channels;
211 static std::list<LogicalChannel*> send_queue;
212
213 struct OnDemandStart
214 {
215     std::string service_name;
216     std::string program;
217     std::vector<std::string> arguments;
218 };
219
220 std::vector<OnDemandStart> on_demand_services;
221
222 std::string a314_config_file = "/etc/opt/a314/a314d.conf";
223 std::string home_env = "HOME=/home/pi";
224
225 static void load_config_file(const char *filename)
226 {
227     FILE *f = fopen(filename, "rt");
228     if (f == nullptr)
229         return;
230
231     char line[256];
232     std::vector<char *> parts;
233
234     while (fgets(line, 256, f) != nullptr)
235     {
236         char org_line[256];
237         strcpy(org_line, line);
238
239         bool in_quotes = false;
240
241         int start = 0;
242         for (int i = 0; i < 256; i++)
243         {
244             if (line[i] == 0)
245             {
246                 if (start < i)
247                     parts.push_back(&line[start]);
248                 break;
249             }
250             else if (line[i] == '"')
251             {
252                 line[i] = 0;
253                 if (in_quotes)
254                     parts.push_back(&line[start]);
255                 in_quotes = !in_quotes;
256                 start = i + 1;
257             }
258             else if (isspace(line[i]) && !in_quotes)
259             {
260                 line[i] = 0;
261                 if (start < i)
262                     parts.push_back(&line[start]);
263                 start = i + 1;
264             }
265         }
266
267         if (parts.size() >= 2)
268         {
269             on_demand_services.emplace_back();
270             auto &e = on_demand_services.back();
271             e.service_name = parts[0];
272             e.program = parts[1];
273             for (int i = 1; i < parts.size(); i++)
274                 e.arguments.push_back(std::string(parts[i]));
275         }
276         else if (parts.size() != 0)
277             logger_warn("Invalid number of columns in configuration file line: %s\n", org_line);
278
279         parts.clear();
280     }
281
282     fclose(f);
283
284     if (on_demand_services.empty())
285         logger_warn("No registered services\n");
286 }
287
288 static int init_server_socket()
289 {
290     server_socket = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
291     if (server_socket == -1)
292     {
293         logger_error("Failed to create server socket\n");
294         return -1;
295     }
296
297     struct sockaddr_in address;
298     address.sin_family = AF_INET;
299     address.sin_addr.s_addr = INADDR_ANY;
300     address.sin_port = htons(7110);
301
302     int res = bind(server_socket, (struct sockaddr *)&address, sizeof(address));
303     if (res < 0)
304     {
305         logger_error("Bind to localhost:7110 failed\n");
306         return -1;
307     }
308
309     listen(server_socket, 16);
310
311     return 0;
312 }
313
314 static void shutdown_server_socket()
315 {
316     if (server_socket != -1)
317         close(server_socket);
318     server_socket = -1;
319 }
320
321 void create_and_send_msg(ClientConnection *cc, int type, int stream_id, uint8_t *data, int length)
322 {
323     MessageBuffer mb;
324     mb.pos = 0;
325     mb.data.resize(sizeof(MessageHeader) + length);
326
327     MessageHeader *mh = (MessageHeader *)&mb.data[0];
328     mh->length = length;
329     mh->stream_id = stream_id;
330     mh->type = type;
331     if (length && data)
332         memcpy(&mb.data[sizeof(MessageHeader)], data, length);
333
334     if (!cc->message_queue.empty())
335     {
336         cc->message_queue.push_back(std::move(mb));
337         return;
338     }
339
340     while (1)
341     {
342         int left = mb.data.size() - mb.pos;
343         uint8_t *src = &mb.data[mb.pos];
344         ssize_t r = write(cc->fd, src, left);
345         if (r == -1)
346         {
347             if (errno == EAGAIN || errno == EWOULDBLOCK)
348             {
349                 cc->message_queue.push_back(std::move(mb));
350                 return;
351             }
352             else if (errno == ECONNRESET)
353             {
354                 // Do not close connection here; it will get done at some other place.
355                 return;
356             }
357             else
358             {
359                 logger_error("Write failed unexpectedly with errno = %d\n", errno);
360                 exit(-1);
361             }
362         }
363
364         mb.pos += r;
365         if (r == left)
366         {
367             return;
368         }
369     }
370 }
371
372 static void handle_msg_register_req(ClientConnection *cc)
373 {
374     uint8_t result = MSG_FAIL;
375
376     std::string service_name((char *)&cc->payload[0], cc->payload.size());
377
378     auto it = services.begin();
379     for (; it != services.end(); it++)
380         if (it->name == service_name)
381             break;
382
383     if (it == services.end())
384     {
385         services.emplace_back();
386
387         RegisteredService &srv = services.back();
388         srv.cc = cc;
389         srv.name = std::move(service_name);
390
391         result = MSG_SUCCESS;
392     }
393
394     create_and_send_msg(cc, MSG_REGISTER_RES, 0, &result, 1);
395 }
396
397 static void handle_msg_deregister_req(ClientConnection *cc)
398 {
399     uint8_t result = MSG_FAIL;
400
401     std::string service_name((char *)&cc->payload[0], cc->payload.size());
402
403     for (auto it = services.begin(); it != services.end(); it++)
404     {
405         if (it->name == service_name && it->cc == cc)
406         {
407             services.erase(it);
408             result = MSG_SUCCESS;
409             break;
410         }
411     }
412
413     create_and_send_msg(cc, MSG_DEREGISTER_RES, 0, &result, 1);
414 }
415
416 static void handle_msg_read_mem_req(ClientConnection *cc)
417 {
418     uint32_t address = *(uint32_t *)&(cc->payload[0]);
419     uint32_t length = *(uint32_t *)&(cc->payload[4]);
420
421     if (get_mapped_item_by_address(cfg, address) != -1) {
422         int32_t index = get_mapped_item_by_address(cfg, address);
423         uint8_t *map = &cfg->map_data[index][address - cfg->map_offset[index]];
424         create_and_send_msg(cc, MSG_READ_MEM_RES, 0, map, length);
425     }
426     else // FIXME
427         printf("help.\n");
428 }
429
430 static void handle_msg_write_mem_req(ClientConnection *cc)
431 {
432     uint32_t address = *(uint32_t *)&(cc->payload[0]);
433     uint32_t length = cc->payload.size() - 4;
434
435
436     if (get_mapped_item_by_address(cfg, address) != -1) {
437         int32_t index = get_mapped_item_by_address(cfg, address);
438         uint8_t *map = &cfg->map_data[index][address - cfg->map_offset[index]];
439         memcpy(map, &(cc->payload[4]), length);
440     }
441     else // FIXME
442         printf("help 2.\n");
443
444     create_and_send_msg(cc, MSG_WRITE_MEM_RES, 0, nullptr, 0);
445 }
446
447 static LogicalChannel *get_associated_channel_by_stream_id(ClientConnection *cc, int stream_id)
448 {
449     for (auto ch : cc->associations)
450     {
451         if (ch->stream_id == stream_id)
452             return ch;
453     }
454     return nullptr;
455 }
456
457 static void handle_msg_connect(ClientConnection *cc)
458 {
459     // We currently don't handle that a client tries to connect to a service on the Amiga.
460 }
461
462 static void handle_msg_connect_response(ClientConnection *cc)
463 {
464     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
465     if (!ch)
466         return;
467
468     create_and_enqueue_packet(ch, PKT_CONNECT_RESPONSE, &cc->payload[0], cc->payload.size());
469
470     if (cc->payload[0] != CONNECT_OK)
471         remove_association(ch);
472 }
473
474 static void handle_msg_data(ClientConnection *cc)
475 {
476     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
477     if (!ch)
478         return;
479
480     create_and_enqueue_packet(ch, PKT_DATA, &cc->payload[0], cc->header.length);
481 }
482
483 static void handle_msg_eos(ClientConnection *cc)
484 {
485     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
486     if (!ch || ch->got_eos_from_client)
487         return;
488
489     ch->got_eos_from_client = true;
490
491     create_and_enqueue_packet(ch, PKT_EOS, nullptr, 0);
492
493     if (ch->got_eos_from_ami)
494         remove_association(ch);
495 }
496
497 static void handle_msg_reset(ClientConnection *cc)
498 {
499     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
500     if (!ch)
501         return;
502
503     remove_association(ch);
504
505     clear_packet_queue(ch);
506     create_and_enqueue_packet(ch, PKT_RESET, nullptr, 0);
507 }
508
509 static void handle_received_message(ClientConnection *cc)
510 {
511     switch (cc->header.type)
512     {
513     case MSG_REGISTER_REQ:
514         handle_msg_register_req(cc);
515         break;
516     case MSG_DEREGISTER_REQ:
517         handle_msg_deregister_req(cc);
518         break;
519     case MSG_READ_MEM_REQ:
520         handle_msg_read_mem_req(cc);
521         break;
522     case MSG_WRITE_MEM_REQ:
523         handle_msg_write_mem_req(cc);
524         break;
525     case MSG_CONNECT:
526         handle_msg_connect(cc);
527         break;
528     case MSG_CONNECT_RESPONSE:
529         handle_msg_connect_response(cc);
530         break;
531     case MSG_DATA:
532         handle_msg_data(cc);
533         break;
534     case MSG_EOS:
535         handle_msg_eos(cc);
536         break;
537     case MSG_RESET:
538         handle_msg_reset(cc);
539         break;
540     default:
541         // This is bad, probably should disconnect from client.
542         logger_warn("Received a message of unknown type from client\n");
543         break;
544     }
545 }
546
547 static void close_and_remove_connection(ClientConnection *cc)
548 {
549     shutdown(cc->fd, SHUT_WR);
550     close(cc->fd);
551
552     {
553         auto it = services.begin();
554         while (it != services.end())
555         {
556             if (it->cc == cc)
557                 it = services.erase(it);
558             else
559                 it++;
560         }
561     }
562
563     {
564         auto it = cc->associations.begin();
565         while (it != cc->associations.end())
566         {
567             auto ch = *it;
568
569             clear_packet_queue(ch);
570             create_and_enqueue_packet(ch, PKT_RESET, nullptr, 0);
571
572             ch->association = nullptr;
573             ch->stream_id = 0;
574
575             it = cc->associations.erase(it);
576         }
577     }
578
579     for (auto it = connections.begin(); it != connections.end(); it++)
580     {
581         if (&(*it) == cc)
582         {
583             connections.erase(it);
584             break;
585         }
586     }
587 }
588
589 static void remove_association(LogicalChannel *ch)
590 {
591     auto &ass = ch->association->associations;
592     ass.erase(std::find(ass.begin(), ass.end(), ch));
593
594     ch->association = nullptr;
595     ch->stream_id = 0;
596 }
597
598 static void clear_packet_queue(LogicalChannel *ch)
599 {
600     if (!ch->packet_queue.empty())
601     {
602         ch->packet_queue.clear();
603         send_queue.erase(std::find(send_queue.begin(), send_queue.end(), ch));
604     }
605 }
606
607 static void create_and_enqueue_packet(LogicalChannel *ch, uint8_t type, uint8_t *data, uint8_t length)
608 {
609     if (ch->packet_queue.empty())
610         send_queue.push_back(ch);
611
612     ch->packet_queue.emplace_back();
613
614     PacketBuffer &pb = ch->packet_queue.back();
615     pb.type = type;
616     pb.data.resize(length);
617     if (data && length)
618         memcpy(&pb.data[0], data, length);
619 }
620
621 static void handle_pkt_connect(int channel_id, uint8_t *data, int plen)
622 {
623     for (auto &ch : channels)
624     {
625         if (ch.channel_id == channel_id)
626         {
627             // We should handle this in some constructive way.
628             // This signals that should reset all logical channels.
629             logger_error("Received a CONNECT packet on a channel that was believed to be previously allocated\n");
630             exit(-1);
631         }
632     }
633
634     channels.emplace_back();
635
636     auto &ch = channels.back();
637
638     ch.channel_id = channel_id;
639     ch.association = nullptr;
640     ch.stream_id = 0;
641     ch.got_eos_from_ami = false;
642     ch.got_eos_from_client = false;
643
644     std::string service_name((char *)data, plen);
645
646     for (auto &srv : services)
647     {
648         if (srv.name == service_name)
649         {
650             ClientConnection *cc = srv.cc;
651
652             ch.association = cc;
653             ch.stream_id = cc->next_stream_id;
654
655             cc->next_stream_id += 2;
656             cc->associations.push_back(&ch);
657
658             create_and_send_msg(ch.association, MSG_CONNECT, ch.stream_id, data, plen);
659             return;
660         }
661     }
662
663     for (auto &on_demand : on_demand_services)
664     {
665         if (on_demand.service_name == service_name)
666         {
667             int fds[2];
668             int status = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
669             if (status != 0)
670             {
671                 logger_error("Unexpectedly not able to create socket pair.\n");
672                 exit(-1);
673             }
674
675             pid_t child = fork();
676             if (child == -1)
677             {
678                 logger_error("Unexpectedly was not able to fork.\n");
679                 exit(-1);
680             }
681             else if (child == 0)
682             {
683                 close(fds[0]);
684                 int fd = fds[1];
685
686                 // FIXE: The user should be configurable.
687                 setgid(1000);
688                 setuid(1000);
689                 putenv((char *)home_env.c_str());
690
691                 std::vector<std::string> args(on_demand.arguments);
692                 args.push_back("-ondemand");
693                 args.push_back(std::to_string(fd));
694                 std::vector<const char *> args_arr;
695                 for (auto &arg : args)
696                     args_arr.push_back(arg.c_str());
697                 args_arr.push_back(nullptr);
698
699                 execvp(on_demand.program.c_str(), (char* const*) &args_arr[0]);
700             }
701             else
702             {
703                 close(fds[1]);
704                 int fd = fds[0];
705
706                 int status = fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
707                 if (status == -1)
708                 {
709                     logger_error("Unexpectedly unable to set close-on-exec flag on client socket descriptor; errno = %d\n", errno);
710                     exit(-1);
711                 }
712
713                 status = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
714                 if (status == -1)
715                 {
716                     logger_error("Unexpectedly unable to set client socket to non blocking; errno = %d\n", errno);
717                     exit(-1);
718                 }
719
720                 int flag = 1;
721                 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
722
723                 connections.emplace_back();
724
725                 ClientConnection &cc = connections.back();
726                 cc.fd = fd;
727                 cc.next_stream_id = 1;
728                 cc.bytes_read = 0;
729
730                 struct epoll_event ev;
731                 ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
732                 ev.data.fd = fd;
733                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
734                 {
735                     logger_error("epoll_ctl() failed unexpectedly with errno = %d\n", errno);
736                     exit(-1);
737                 }
738
739                 services.emplace_back();
740
741                 RegisteredService &srv = services.back();
742                 srv.cc = &cc;
743                 srv.name = std::move(service_name);
744
745                 ch.association = &cc;
746                 ch.stream_id = cc.next_stream_id;
747
748                 cc.next_stream_id += 2;
749                 cc.associations.push_back(&ch);
750
751                 create_and_send_msg(ch.association, MSG_CONNECT, ch.stream_id, data, plen);
752                 return;
753             }
754         }
755     }
756
757     uint8_t response = CONNECT_UNKNOWN_SERVICE;
758     create_and_enqueue_packet(&ch, PKT_CONNECT_RESPONSE, &response, 1);
759 }
760
761 static void handle_pkt_data(int channel_id, uint8_t *data, int plen)
762 {
763     for (auto &ch : channels)
764     {
765         if (ch.channel_id == channel_id)
766         {
767             if (ch.association != nullptr && !ch.got_eos_from_ami)
768                 create_and_send_msg(ch.association, MSG_DATA, ch.stream_id, data, plen);
769
770             break;
771         }
772     }
773 }
774
775 static void handle_pkt_eos(int channel_id)
776 {
777     for (auto &ch : channels)
778     {
779         if (ch.channel_id == channel_id)
780         {
781             if (ch.association != nullptr && !ch.got_eos_from_ami)
782             {
783                 ch.got_eos_from_ami = true;
784
785                 create_and_send_msg(ch.association, MSG_EOS, ch.stream_id, nullptr, 0);
786
787                 if (ch.got_eos_from_client)
788                     remove_association(&ch);
789             }
790             break;
791         }
792     }
793 }
794
795 static void handle_pkt_reset(int channel_id)
796 {
797     for (auto &ch : channels)
798     {
799         if (ch.channel_id == channel_id)
800         {
801             clear_packet_queue(&ch);
802
803             if (ch.association != nullptr)
804             {
805                 create_and_send_msg(ch.association, MSG_RESET, ch.stream_id, nullptr, 0);
806                 remove_association(&ch);
807             }
808
809             break;
810         }
811     }
812 }
813
814 static void remove_channel_if_not_associated_and_empty_pq(int channel_id)
815 {
816     for (auto it = channels.begin(); it != channels.end(); it++)
817     {
818         if (it->channel_id == channel_id)
819         {
820             if (it->association == nullptr && it->packet_queue.empty())
821                 channels.erase(it);
822
823             break;
824         }
825     }
826 }
827
828 static void handle_received_pkt(int ptype, int channel_id, uint8_t *data, int plen)
829 {
830     if (ptype == PKT_CONNECT)
831         handle_pkt_connect(channel_id, data, plen);
832     else if (ptype == PKT_DATA)
833         handle_pkt_data(channel_id, data, plen);
834     else if (ptype == PKT_EOS)
835         handle_pkt_eos(channel_id);
836     else if (ptype == PKT_RESET)
837         handle_pkt_reset(channel_id);
838
839     remove_channel_if_not_associated_and_empty_pq(channel_id);
840 }
841
842 static bool receive_from_a2r()
843 {
844     int head = channel_status[A2R_HEAD_OFFSET];
845     int tail = channel_status[A2R_TAIL_OFFSET];
846     int len = (tail - head) & 255;
847     if (len == 0)
848         return false;
849
850     if (head < tail)
851     {
852         memcpy(recv_buf, &ca.a2r_buffer[head], len);
853     }
854     else
855     {
856         memcpy(recv_buf, &ca.a2r_buffer[head], 256 - head);
857
858         if (tail != 0)
859         {
860             memcpy(&recv_buf[len - tail], &ca.a2r_buffer[0], tail);
861         }
862     }
863
864     uint8_t *p = recv_buf;
865     while (p < recv_buf + len)
866     {
867         uint8_t plen = *p++;
868         uint8_t ptype = *p++;
869         uint8_t channel_id = *p++;
870         handle_received_pkt(ptype, channel_id, p, plen);
871         p += plen;
872     }
873
874     channel_status[A2R_HEAD_OFFSET] = channel_status[A2R_TAIL_OFFSET];
875     channel_status_updated |= A_EVENT_A2R_HEAD;
876     return true;
877 }
878
879 static bool flush_send_queue()
880 {
881     int tail = channel_status[R2A_TAIL_OFFSET];
882     int head = channel_status[R2A_HEAD_OFFSET];
883     int len = (tail - head) & 255;
884     int left = 255 - len;
885
886     int pos = 0;
887
888     while (!send_queue.empty())
889     {
890         LogicalChannel *ch = send_queue.front();
891         PacketBuffer &pb = ch->packet_queue.front();
892
893         int ptype = pb.type;
894         int plen = 3 + pb.data.size();
895
896         if (left < plen)
897             break;
898
899         send_buf[pos++] = pb.data.size();
900         send_buf[pos++] = ptype;
901         send_buf[pos++] = ch->channel_id;
902         memcpy(&send_buf[pos], &pb.data[0], pb.data.size());
903         pos += pb.data.size();
904
905         ch->packet_queue.pop_front();
906
907         send_queue.pop_front();
908
909         if (!ch->packet_queue.empty())
910             send_queue.push_back(ch);
911         else
912             remove_channel_if_not_associated_and_empty_pq(ch->channel_id);
913
914         left -= plen;
915     }
916
917     int to_write = pos;
918     if (!to_write)
919         return false;
920
921     uint8_t *p = send_buf;
922     int at_end = 256 - tail;
923     if (at_end < to_write)
924     {
925         memcpy(&ca.r2a_buffer[tail], p, at_end);
926         p += at_end;
927         to_write -= at_end;
928         tail = 0;
929     }
930
931     memcpy(&ca.r2a_buffer[tail], p, to_write);
932     tail = (tail + to_write) & 255;
933
934     channel_status[R2A_TAIL_OFFSET] = tail;
935     channel_status_updated |= A_EVENT_R2A_TAIL;
936     return true;
937 }
938
939 static void read_channel_status()
940 {
941     channel_status[A2R_TAIL_OFFSET] = ca.a2r_tail;
942     channel_status[R2A_HEAD_OFFSET] = ca.r2a_head;
943     channel_status[R2A_TAIL_OFFSET] = ca.r2a_tail;
944     channel_status[A2R_HEAD_OFFSET] = ca.a2r_head;
945     channel_status_updated = 0;
946 }
947
948 static void write_channel_status()
949 {
950     if (channel_status_updated != 0)
951     {
952         ca.r2a_tail = channel_status[R2A_TAIL_OFFSET];
953         ca.a2r_head = channel_status[A2R_HEAD_OFFSET];
954
955         pthread_mutex_lock(&mutex);
956         ca.a_events |= channel_status_updated;
957         pthread_mutex_unlock(&mutex);
958
959         channel_status_updated = 0;
960     }
961 }
962
963 static void close_all_logical_channels()
964 {
965     send_queue.clear();
966
967     auto it = channels.begin();
968     while (it != channels.end())
969     {
970         LogicalChannel &ch = *it;
971
972         if (ch.association != nullptr)
973         {
974             create_and_send_msg(ch.association, MSG_RESET, ch.stream_id, nullptr, 0);
975             remove_association(&ch);
976         }
977
978         it = channels.erase(it);
979     }
980 }
981
982 static void handle_a314_irq(uint8_t events)
983 {
984     if (events == 0)
985         return;
986
987     if (events & R_EVENT_STARTED)
988     {
989         if (!channels.empty())
990             logger_info("Received STARTED event while logical channels are open -- closing channels\n");
991
992         close_all_logical_channels();
993         a314_device_started = true;
994     }
995
996     if (!a314_device_started)
997         return;
998
999     read_channel_status();
1000
1001     bool any_rcvd = receive_from_a2r();
1002     bool any_sent = flush_send_queue();
1003
1004     if (any_rcvd || any_sent)
1005         write_channel_status();
1006 }
1007
1008 static void handle_client_connection_event(ClientConnection *cc, struct epoll_event *ev)
1009 {
1010     if (ev->events & EPOLLERR)
1011     {
1012         logger_warn("Received EPOLLERR for client connection\n");
1013         close_and_remove_connection(cc);
1014         return;
1015     }
1016
1017     if (ev->events & EPOLLIN)
1018     {
1019         while (1)
1020         {
1021             int left;
1022             uint8_t *dst;
1023
1024             if (cc->payload.empty())
1025             {
1026                 left = sizeof(MessageHeader) - cc->bytes_read;
1027                 dst = (uint8_t *)&(cc->header) + cc->bytes_read;
1028             }
1029             else
1030             {
1031                 left = cc->header.length - cc->bytes_read;
1032                 dst = &cc->payload[cc->bytes_read];
1033             }
1034
1035             ssize_t r = read(cc->fd, dst, left);
1036             if (r == -1)
1037             {
1038                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1039                     break;
1040
1041                 logger_error("Read failed unexpectedly with errno = %d\n", errno);
1042                 exit(-1);
1043             }
1044
1045             if (r == 0)
1046             {
1047                 logger_info("Received End-of-File on client connection\n");
1048                 close_and_remove_connection(cc);
1049                 return;
1050             }
1051             else
1052             {
1053                 cc->bytes_read += r;
1054                 left -= r;
1055                 if (!left)
1056                 {
1057                     if (cc->payload.empty())
1058                     {
1059                         if (cc->header.length == 0)
1060                         {
1061                             logger_trace("header: length=%d, stream_id=%d, type=%d\n", cc->header.length, cc->header.stream_id, cc->header.type);
1062                             handle_received_message(cc);
1063                         }
1064                         else
1065                         {
1066                             cc->payload.resize(cc->header.length);
1067                         }
1068                     }
1069                     else
1070                     {
1071                         logger_trace("header: length=%d, stream_id=%d, type=%d\n", cc->header.length, cc->header.stream_id, cc->header.type);
1072                         handle_received_message(cc);
1073                         cc->payload.clear();
1074                     }
1075                     cc->bytes_read = 0;
1076                 }
1077             }
1078         }
1079     }
1080
1081     if (ev->events & EPOLLOUT)
1082     {
1083         while (!cc->message_queue.empty())
1084         {
1085             MessageBuffer &mb = cc->message_queue.front();
1086
1087             int left = mb.data.size() - mb.pos;
1088             uint8_t *src = &mb.data[mb.pos];
1089             ssize_t r = write(cc->fd, src, left);
1090             if (r == -1)
1091             {
1092                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1093                     break;
1094                 else if (errno == ECONNRESET)
1095                 {
1096                     close_and_remove_connection(cc);
1097                     return;
1098                 }
1099                 else
1100                 {
1101                     logger_error("Write failed unexpectedly with errno = %d\n", errno);
1102                     exit(-1);
1103                 }
1104             }
1105
1106             mb.pos += r;
1107             if (r == left)
1108                 cc->message_queue.pop_front();
1109         }
1110     }
1111 }
1112
1113 static void handle_server_socket_ready()
1114 {
1115     struct sockaddr_in address;
1116     int alen = sizeof(struct sockaddr_in);
1117
1118     int fd = accept(server_socket, (struct sockaddr *)&address, (socklen_t *)&alen);
1119     if (fd < 0)
1120     {
1121         logger_error("Accept failed unexpectedly with errno = %d\n", errno);
1122         exit(-1);
1123     }
1124
1125     int status = fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
1126     if (status == -1)
1127     {
1128         logger_error("Unexpectedly unable to set close-on-exec flag on client socket descriptor; errno = %d\n", errno);
1129         exit(-1);
1130     }
1131
1132     status = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
1133     if (status == -1)
1134     {
1135         logger_error("Unexpectedly unable to set client socket to non blocking; errno = %d\n", errno);
1136         exit(-1);
1137     }
1138
1139     int flag = 1;
1140     setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
1141
1142     connections.emplace_back();
1143
1144     ClientConnection &cc = connections.back();
1145     cc.fd = fd;
1146     cc.next_stream_id = 1;
1147     cc.bytes_read = 0;
1148
1149     struct epoll_event ev;
1150     ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
1151     ev.data.fd = fd;
1152     if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
1153     {
1154         logger_error("epoll_ctl() failed unexpectedly with errno = %d\n", errno);
1155         exit(-1);
1156     }
1157 }
1158
1159 static void main_loop()
1160 {
1161     bool shutting_down = false;
1162     bool done = false;
1163
1164     while (!done)
1165     {
1166         struct epoll_event ev;
1167         int timeout = shutting_down ? 10000 : -1;
1168         int n = epoll_pwait(epfd, &ev, 1, timeout, &original_sigset);
1169         if (n == -1)
1170         {
1171             if (errno == EINTR)
1172             {
1173                 logger_info("Received SIGTERM\n");
1174
1175                 shutdown_server_socket();
1176
1177                 while (!connections.empty())
1178                     close_and_remove_connection(&connections.front());
1179
1180                 if (flush_send_queue())
1181                     write_channel_status();
1182
1183                 if (!channels.empty())
1184                     shutting_down = true;
1185                 else
1186                     done = true;
1187             }
1188             else
1189             {
1190                 logger_error("epoll_pwait failed with unexpected errno = %d\n", errno);
1191                 exit(-1);
1192             }
1193         }
1194         else if (n == 0)
1195         {
1196             if (shutting_down)
1197                 done = true;
1198             else
1199             {
1200                 logger_error("epoll_pwait returned 0 which is unexpected since no timeout was set\n");
1201                 exit(-1);
1202             }
1203         }
1204         else
1205         {
1206             if (ev.data.fd == irq_fds[1])
1207             {
1208                 uint8_t events;
1209                 if (read(irq_fds[1], &events, 1) != 1)
1210                 {
1211                     logger_error("Read from interrupt socket pair, and unexpectedly didn't return 1 byte\n");
1212                     exit(-1);
1213                 }
1214
1215                 handle_a314_irq(events);
1216             }
1217             else if (ev.data.fd == server_socket)
1218             {
1219                 logger_trace("Epoll event: server socket is ready, events = %d\n", ev.events);
1220                 handle_server_socket_ready();
1221             }
1222             else
1223             {
1224                 logger_trace("Epoll event: client socket is ready, events = %d\n", ev.events);
1225
1226                 auto it = connections.begin();
1227                 for (; it != connections.end(); it++)
1228                 {
1229                     if (it->fd == ev.data.fd)
1230                         break;
1231                 }
1232
1233                 if (it == connections.end())
1234                 {
1235                     logger_error("Got notified about an event on a client connection that supposedly isn't currently open\n");
1236                     exit(-1);
1237                 }
1238
1239                 ClientConnection *cc = &(*it);
1240                 handle_client_connection_event(cc, &ev);
1241
1242                 if (flush_send_queue())
1243                     write_channel_status();
1244             }
1245         }
1246     }
1247 }
1248
1249 static int init_driver()
1250 {
1251     if (init_server_socket() != 0)
1252         return -1;
1253
1254     int err = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, irq_fds);
1255     if (err != 0)
1256     {
1257         logger_error("Unable to create socket pair, errno = %d\n", errno);
1258         return -1;
1259     }
1260
1261     epfd = epoll_create1(EPOLL_CLOEXEC);
1262     if (epfd == -1)
1263         return -1;
1264
1265     struct epoll_event ev;
1266     ev.events = EPOLLIN;
1267     ev.data.fd = irq_fds[1];
1268     if (epoll_ctl(epfd, EPOLL_CTL_ADD, irq_fds[1], &ev) != 0)
1269         return -1;
1270
1271     ev.events = EPOLLIN;
1272     ev.data.fd = server_socket;
1273     if (epoll_ctl(epfd, EPOLL_CTL_ADD, server_socket, &ev) != 0)
1274         return -1;
1275
1276     return 0;
1277 }
1278
1279 static void shutdown_driver()
1280 {
1281     if (epfd != -1)
1282         close(epfd);
1283
1284     shutdown_server_socket();
1285 }
1286
1287 static void *thread_start(void *arg)
1288 {
1289     main_loop();
1290     shutdown_driver();
1291     return NULL;
1292 }
1293
1294 static void write_r_events(uint8_t events)
1295 {
1296     if (write(irq_fds[0], &events, 1) != 1)
1297         logger_error("Write to interrupt socket pair did not return 1\n");
1298 }
1299
1300 int a314_init()
1301 {
1302     load_config_file(a314_config_file.c_str());
1303
1304     int err = init_driver();
1305     if (err < 0)
1306     {
1307         shutdown_driver();
1308         return -1;
1309     }
1310
1311     err = pthread_create(&thread_id, NULL, thread_start, NULL);
1312     if (err < 0)
1313     {
1314         logger_error("pthread_create failed with err = %d\n", err);
1315         return -2;
1316     }
1317
1318     return 0;
1319 }
1320
1321 void a314_set_mem_base_size(unsigned int base, unsigned int size)
1322 {
1323     ca.mem_base = htobe32(base);
1324     ca.mem_size = htobe32(size);
1325 }
1326
1327 void a314_process_events()
1328 {
1329     if (ca.a_events & ca.a_enable)
1330     {
1331         ps_write_16(0xdff09c, 0x8008);
1332         m68k_set_irq(2);
1333     }
1334 }
1335
1336 unsigned int a314_read_memory_8(unsigned int address)
1337 {
1338     if (address >= sizeof(ca))
1339         return 0;
1340
1341     uint8_t val;
1342     if (address == offsetof(ComArea, a_events))
1343     {
1344         pthread_mutex_lock(&mutex);
1345         val = ca.a_events;
1346         ca.a_events = 0;
1347         pthread_mutex_unlock(&mutex);
1348     }
1349     else
1350     {
1351         uint8_t *p = (uint8_t *)&ca;
1352         val = p[address];
1353     }
1354
1355     return val;
1356 }
1357
1358 unsigned int a314_read_memory_16(unsigned int address)
1359 {
1360     if (address >= sizeof(ca))
1361         return 0;
1362
1363     uint16_t *p = (uint16_t *)&ca;
1364     return be16toh(p[address >> 1]);
1365 }
1366
1367 unsigned int a314_read_memory_32(unsigned int address)
1368 {
1369     if (address >= sizeof(ca))
1370         return 0;
1371
1372     uint32_t *p = (uint32_t *)&ca;
1373     return be32toh(p[address >> 2]);
1374 }
1375
1376 void a314_write_memory_8(unsigned int address, unsigned int value)
1377 {
1378     if (address >= sizeof(ca))
1379         return;
1380
1381     switch (address)
1382     {
1383         case offsetof(ComArea, a_events):
1384             // a_events is not writable.
1385             break;
1386
1387         case offsetof(ComArea, r_events):
1388             if (value != 0)
1389                 write_r_events((uint8_t)value);
1390             break;
1391
1392         default:
1393         {
1394             uint8_t *p = (uint8_t *)&ca;
1395             p[address] = (uint8_t)value;
1396             break;
1397         }
1398     }
1399 }
1400
1401 void a314_write_memory_16(unsigned int address, unsigned int value)
1402 {
1403     // Not implemented.
1404 }
1405
1406 void a314_write_memory_32(unsigned int address, unsigned int value)
1407 {
1408     // Not implemented.
1409 }
1410
1411 void a314_set_config_file(char *filename)
1412 {
1413     printf ("[A314] Set A314 config filename to %s.\n", filename);
1414     a314_config_file = std::string(filename);
1415 }