]> git.sesse.net Git - pistorm/blob - a314/a314.cc
Add A314 emulation
[pistorm] / a314 / a314.cc
1 /*
2  * Copyright 2020 Niklas Ekström
3  * Based on a314d daemon for A314.
4  */
5
6 #include <arpa/inet.h>
7
8 #include <linux/spi/spidev.h>
9 #include <linux/types.h>
10
11 #include <netinet/in.h>
12 #include <netinet/tcp.h>
13
14 #include <sys/epoll.h>
15 #include <sys/ioctl.h>
16 #include <sys/socket.h>
17 #include <sys/stat.h>
18 #include <sys/types.h>
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <signal.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <time.h>
29 #include <unistd.h>
30 #include <pthread.h>
31
32 #include <algorithm>
33 #include <list>
34 #include <string>
35 #include <vector>
36
37 #include "a314.h"
38 #include "../m68k.h"
39
40 #define LOGGER_TRACE    1
41 #define LOGGER_DEBUG    2
42 #define LOGGER_INFO     3
43 #define LOGGER_WARN     4
44 #define LOGGER_ERROR    5
45
46 #define LOGGER_SHOW LOGGER_INFO
47
48 #define logger_trace(...) do { if (LOGGER_TRACE >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
49 #define logger_debug(...) do { if (LOGGER_DEBUG >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
50 #define logger_info(...) do { if (LOGGER_INFO >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
51 #define logger_warn(...) do { if (LOGGER_WARN >= LOGGER_SHOW) fprintf(stdout, __VA_ARGS__); } while (0)
52 #define logger_error(...) do { if (LOGGER_ERROR >= LOGGER_SHOW) fprintf(stderr, __VA_ARGS__); } while (0)
53
54 // Events that are communicated via IRQ from Amiga to Raspberry.
55 #define R_EVENT_A2R_TAIL        1
56 #define R_EVENT_R2A_HEAD        2
57 #define R_EVENT_STARTED         4
58
59 // Events that are communicated from Raspberry to Amiga.
60 #define A_EVENT_R2A_TAIL        1
61 #define A_EVENT_A2R_HEAD        2
62
63 // Offset relative to communication area for queue pointers.
64 #define A2R_TAIL_OFFSET         0
65 #define R2A_HEAD_OFFSET         1
66 #define R2A_TAIL_OFFSET         2
67 #define A2R_HEAD_OFFSET         3
68
69 // Packets that are communicated across physical channels (A2R and R2A).
70 #define PKT_CONNECT             4
71 #define PKT_CONNECT_RESPONSE    5
72 #define PKT_DATA                6
73 #define PKT_EOS                 7
74 #define PKT_RESET               8
75
76 // Valid responses for PKT_CONNECT_RESPONSE.
77 #define CONNECT_OK              0
78 #define CONNECT_UNKNOWN_SERVICE 3
79
80 // Messages that are communicated between driver and client.
81 #define MSG_REGISTER_REQ        1
82 #define MSG_REGISTER_RES        2
83 #define MSG_DEREGISTER_REQ      3
84 #define MSG_DEREGISTER_RES      4
85 #define MSG_READ_MEM_REQ        5
86 #define MSG_READ_MEM_RES        6
87 #define MSG_WRITE_MEM_REQ       7
88 #define MSG_WRITE_MEM_RES       8
89 #define MSG_CONNECT             9
90 #define MSG_CONNECT_RESPONSE    10
91 #define MSG_DATA                11
92 #define MSG_EOS                 12
93 #define MSG_RESET               13
94
95 #define MSG_SUCCESS             1
96 #define MSG_FAIL                0
97
98 static sigset_t original_sigset;
99
100 static pthread_t thread_id;
101 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
102
103 static int server_socket = -1;
104
105 static int epfd = -1;
106 static int irq_fds[2];
107
108 extern "C" unsigned char fast_ram_array[];
109 extern "C" void write16(unsigned int address, unsigned int value);
110
111 // Register bank in 0xE90000 memory.
112 struct ComArea
113 {
114     uint8_t a_events;
115     uint8_t a_enable;
116     uint8_t r_events;
117     uint8_t r_enable; // Unused.
118
119     uint8_t a2r_tail;
120     uint8_t r2a_head;
121     uint8_t r2a_tail;
122     uint8_t a2r_head;
123
124     uint8_t a2r_buffer[256];
125     uint8_t r2a_buffer[256];
126 };
127
128 static ComArea ca;
129
130 static bool a314_device_started = false;
131
132 static uint8_t channel_status[4];
133 static uint8_t channel_status_updated = 0;
134
135 static uint8_t recv_buf[256];
136 static uint8_t send_buf[256];
137
138 struct LogicalChannel;
139 struct ClientConnection;
140
141 #pragma pack(push, 1)
142 struct MessageHeader
143 {
144     uint32_t length;
145     uint32_t stream_id;
146     uint8_t type;
147 }; //} __attribute__((packed));
148 #pragma pack(pop)
149
150 struct MessageBuffer
151 {
152     int pos;
153     std::vector<uint8_t> data;
154 };
155
156 struct RegisteredService
157 {
158     std::string name;
159     ClientConnection *cc;
160 };
161
162 struct PacketBuffer
163 {
164     int type;
165     std::vector<uint8_t> data;
166 };
167
168 struct ClientConnection
169 {
170     int fd;
171
172     int next_stream_id;
173
174     int bytes_read;
175     MessageHeader header;
176     std::vector<uint8_t> payload;
177
178     std::list<MessageBuffer> message_queue;
179
180     std::list<LogicalChannel*> associations;
181 };
182
183 struct LogicalChannel
184 {
185     int channel_id;
186
187     ClientConnection *association;
188     int stream_id;
189
190     bool got_eos_from_ami;
191     bool got_eos_from_client;
192
193     std::list<PacketBuffer> packet_queue;
194 };
195
196 static void remove_association(LogicalChannel *ch);
197 static void clear_packet_queue(LogicalChannel *ch);
198 static void create_and_enqueue_packet(LogicalChannel *ch, uint8_t type, uint8_t *data, uint8_t length);
199
200 static std::list<ClientConnection> connections;
201 static std::list<RegisteredService> services;
202 static std::list<LogicalChannel> channels;
203 static std::list<LogicalChannel*> send_queue;
204
205 struct OnDemandStart
206 {
207     std::string service_name;
208     std::string program;
209     std::vector<std::string> arguments;
210 };
211
212 std::vector<OnDemandStart> on_demand_services;
213
214 static void load_config_file(const char *filename)
215 {
216     FILE *f = fopen(filename, "rt");
217     if (f == nullptr)
218         return;
219
220     char line[256];
221     std::vector<char *> parts;
222
223     while (fgets(line, 256, f) != nullptr)
224     {
225         char org_line[256];
226         strcpy(org_line, line);
227
228         bool in_quotes = false;
229
230         int start = 0;
231         for (int i = 0; i < 256; i++)
232         {
233             if (line[i] == 0)
234             {
235                 if (start < i)
236                     parts.push_back(&line[start]);
237                 break;
238             }
239             else if (line[i] == '"')
240             {
241                 line[i] = 0;
242                 if (in_quotes)
243                     parts.push_back(&line[start]);
244                 in_quotes = !in_quotes;
245                 start = i + 1;
246             }
247             else if (isspace(line[i]) && !in_quotes)
248             {
249                 line[i] = 0;
250                 if (start < i)
251                     parts.push_back(&line[start]);
252                 start = i + 1;
253             }
254         }
255
256         if (parts.size() >= 2)
257         {
258             on_demand_services.emplace_back();
259             auto &e = on_demand_services.back();
260             e.service_name = parts[0];
261             e.program = parts[1];
262             for (int i = 1; i < parts.size(); i++)
263                 e.arguments.push_back(std::string(parts[i]));
264         }
265         else if (parts.size() != 0)
266             logger_warn("Invalid number of columns in configuration file line: %s\n", org_line);
267
268         parts.clear();
269     }
270
271     fclose(f);
272
273     if (on_demand_services.empty())
274         logger_warn("No registered services\n");
275 }
276
277 static int init_server_socket()
278 {
279     server_socket = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
280     if (server_socket == -1)
281     {
282         logger_error("Failed to create server socket\n");
283         return -1;
284     }
285
286     struct sockaddr_in address;
287     address.sin_family = AF_INET;
288     address.sin_addr.s_addr = INADDR_ANY;
289     address.sin_port = htons(7110);
290
291     int res = bind(server_socket, (struct sockaddr *)&address, sizeof(address));
292     if (res < 0)
293     {
294         logger_error("Bind to localhost:7110 failed\n");
295         return -1;
296     }
297
298     listen(server_socket, 16);
299
300     return 0;
301 }
302
303 static void shutdown_server_socket()
304 {
305     if (server_socket != -1)
306         close(server_socket);
307     server_socket = -1;
308 }
309
310 void create_and_send_msg(ClientConnection *cc, int type, int stream_id, uint8_t *data, int length)
311 {
312     MessageBuffer mb;
313     mb.pos = 0;
314     mb.data.resize(sizeof(MessageHeader) + length);
315
316     MessageHeader *mh = (MessageHeader *)&mb.data[0];
317     mh->length = length;
318     mh->stream_id = stream_id;
319     mh->type = type;
320     if (length && data)
321         memcpy(&mb.data[sizeof(MessageHeader)], data, length);
322
323     if (!cc->message_queue.empty())
324     {
325         cc->message_queue.push_back(std::move(mb));
326         return;
327     }
328
329     while (1)
330     {
331         int left = mb.data.size() - mb.pos;
332         uint8_t *src = &mb.data[mb.pos];
333         ssize_t r = write(cc->fd, src, left);
334         if (r == -1)
335         {
336             if (errno == EAGAIN || errno == EWOULDBLOCK)
337             {
338                 cc->message_queue.push_back(std::move(mb));
339                 return;
340             }
341             else if (errno == ECONNRESET)
342             {
343                 // Do not close connection here; it will get done at some other place.
344                 return;
345             }
346             else
347             {
348                 logger_error("Write failed unexpectedly with errno = %d\n", errno);
349                 exit(-1);
350             }
351         }
352
353         mb.pos += r;
354         if (r == left)
355         {
356             return;
357         }
358     }
359 }
360
361 static void handle_msg_register_req(ClientConnection *cc)
362 {
363     uint8_t result = MSG_FAIL;
364
365     std::string service_name((char *)&cc->payload[0], cc->payload.size());
366
367     auto it = services.begin();
368     for (; it != services.end(); it++)
369         if (it->name == service_name)
370             break;
371
372     if (it == services.end())
373     {
374         services.emplace_back();
375
376         RegisteredService &srv = services.back();
377         srv.cc = cc;
378         srv.name = std::move(service_name);
379
380         result = MSG_SUCCESS;
381     }
382
383     create_and_send_msg(cc, MSG_REGISTER_RES, 0, &result, 1);
384 }
385
386 static void handle_msg_deregister_req(ClientConnection *cc)
387 {
388     uint8_t result = MSG_FAIL;
389
390     std::string service_name((char *)&cc->payload[0], cc->payload.size());
391
392     for (auto it = services.begin(); it != services.end(); it++)
393     {
394         if (it->name == service_name && it->cc == cc)
395         {
396             services.erase(it);
397             result = MSG_SUCCESS;
398             break;
399         }
400     }
401
402     create_and_send_msg(cc, MSG_DEREGISTER_RES, 0, &result, 1);
403 }
404
405 static void handle_msg_read_mem_req(ClientConnection *cc)
406 {
407     uint32_t address = *(uint32_t *)&(cc->payload[0]);
408     uint32_t length = *(uint32_t *)&(cc->payload[4]);
409
410     create_and_send_msg(cc, MSG_READ_MEM_RES, 0, &fast_ram_array[address], length);
411 }
412
413 static void handle_msg_write_mem_req(ClientConnection *cc)
414 {
415     uint32_t address = *(uint32_t *)&(cc->payload[0]);
416     uint32_t length = cc->payload.size() - 4;
417
418     memcpy(&fast_ram_array[address], &(cc->payload[4]), length);
419
420     create_and_send_msg(cc, MSG_WRITE_MEM_RES, 0, nullptr, 0);
421 }
422
423 static LogicalChannel *get_associated_channel_by_stream_id(ClientConnection *cc, int stream_id)
424 {
425     for (auto ch : cc->associations)
426     {
427         if (ch->stream_id == stream_id)
428             return ch;
429     }
430     return nullptr;
431 }
432
433 static void handle_msg_connect(ClientConnection *cc)
434 {
435     // We currently don't handle that a client tries to connect to a service on the Amiga.
436 }
437
438 static void handle_msg_connect_response(ClientConnection *cc)
439 {
440     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
441     if (!ch)
442         return;
443
444     create_and_enqueue_packet(ch, PKT_CONNECT_RESPONSE, &cc->payload[0], cc->payload.size());
445
446     if (cc->payload[0] != CONNECT_OK)
447         remove_association(ch);
448 }
449
450 static void handle_msg_data(ClientConnection *cc)
451 {
452     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
453     if (!ch)
454         return;
455
456     create_and_enqueue_packet(ch, PKT_DATA, &cc->payload[0], cc->header.length);
457 }
458
459 static void handle_msg_eos(ClientConnection *cc)
460 {
461     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
462     if (!ch || ch->got_eos_from_client)
463         return;
464
465     ch->got_eos_from_client = true;
466
467     create_and_enqueue_packet(ch, PKT_EOS, nullptr, 0);
468
469     if (ch->got_eos_from_ami)
470         remove_association(ch);
471 }
472
473 static void handle_msg_reset(ClientConnection *cc)
474 {
475     LogicalChannel *ch = get_associated_channel_by_stream_id(cc, cc->header.stream_id);
476     if (!ch)
477         return;
478
479     remove_association(ch);
480
481     clear_packet_queue(ch);
482     create_and_enqueue_packet(ch, PKT_RESET, nullptr, 0);
483 }
484
485 static void handle_received_message(ClientConnection *cc)
486 {
487     switch (cc->header.type)
488     {
489     case MSG_REGISTER_REQ:
490         handle_msg_register_req(cc);
491         break;
492     case MSG_DEREGISTER_REQ:
493         handle_msg_deregister_req(cc);
494         break;
495     case MSG_READ_MEM_REQ:
496         handle_msg_read_mem_req(cc);
497         break;
498     case MSG_WRITE_MEM_REQ:
499         handle_msg_write_mem_req(cc);
500         break;
501     case MSG_CONNECT:
502         handle_msg_connect(cc);
503         break;
504     case MSG_CONNECT_RESPONSE:
505         handle_msg_connect_response(cc);
506         break;
507     case MSG_DATA:
508         handle_msg_data(cc);
509         break;
510     case MSG_EOS:
511         handle_msg_eos(cc);
512         break;
513     case MSG_RESET:
514         handle_msg_reset(cc);
515         break;
516     default:
517         // This is bad, probably should disconnect from client.
518         logger_warn("Received a message of unknown type from client\n");
519         break;
520     }
521 }
522
523 static void close_and_remove_connection(ClientConnection *cc)
524 {
525     shutdown(cc->fd, SHUT_WR);
526     close(cc->fd);
527
528     {
529         auto it = services.begin();
530         while (it != services.end())
531         {
532             if (it->cc == cc)
533                 it = services.erase(it);
534             else
535                 it++;
536         }
537     }
538
539     {
540         auto it = cc->associations.begin();
541         while (it != cc->associations.end())
542         {
543             auto ch = *it;
544
545             clear_packet_queue(ch);
546             create_and_enqueue_packet(ch, PKT_RESET, nullptr, 0);
547
548             ch->association = nullptr;
549             ch->stream_id = 0;
550
551             it = cc->associations.erase(it);
552         }
553     }
554
555     for (auto it = connections.begin(); it != connections.end(); it++)
556     {
557         if (&(*it) == cc)
558         {
559             connections.erase(it);
560             break;
561         }
562     }
563 }
564
565 static void remove_association(LogicalChannel *ch)
566 {
567     auto &ass = ch->association->associations;
568     ass.erase(std::find(ass.begin(), ass.end(), ch));
569
570     ch->association = nullptr;
571     ch->stream_id = 0;
572 }
573
574 static void clear_packet_queue(LogicalChannel *ch)
575 {
576     if (!ch->packet_queue.empty())
577     {
578         ch->packet_queue.clear();
579         send_queue.erase(std::find(send_queue.begin(), send_queue.end(), ch));
580     }
581 }
582
583 static void create_and_enqueue_packet(LogicalChannel *ch, uint8_t type, uint8_t *data, uint8_t length)
584 {
585     if (ch->packet_queue.empty())
586         send_queue.push_back(ch);
587
588     ch->packet_queue.emplace_back();
589
590     PacketBuffer &pb = ch->packet_queue.back();
591     pb.type = type;
592     pb.data.resize(length);
593     if (data && length)
594         memcpy(&pb.data[0], data, length);
595 }
596
597 static void handle_pkt_connect(int channel_id, uint8_t *data, int plen)
598 {
599     for (auto &ch : channels)
600     {
601         if (ch.channel_id == channel_id)
602         {
603             // We should handle this in some constructive way.
604             // This signals that should reset all logical channels.
605             logger_error("Received a CONNECT packet on a channel that was believed to be previously allocated\n");
606             exit(-1);
607         }
608     }
609
610     channels.emplace_back();
611
612     auto &ch = channels.back();
613
614     ch.channel_id = channel_id;
615     ch.association = nullptr;
616     ch.stream_id = 0;
617     ch.got_eos_from_ami = false;
618     ch.got_eos_from_client = false;
619
620     std::string service_name((char *)data, plen);
621
622     for (auto &srv : services)
623     {
624         if (srv.name == service_name)
625         {
626             ClientConnection *cc = srv.cc;
627
628             ch.association = cc;
629             ch.stream_id = cc->next_stream_id;
630
631             cc->next_stream_id += 2;
632             cc->associations.push_back(&ch);
633
634             create_and_send_msg(ch.association, MSG_CONNECT, ch.stream_id, data, plen);
635             return;
636         }
637     }
638
639     for (auto &on_demand : on_demand_services)
640     {
641         if (on_demand.service_name == service_name)
642         {
643             int fds[2];
644             int status = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
645             if (status != 0)
646             {
647                 logger_error("Unexpectedly not able to create socket pair.\n");
648                 exit(-1);
649             }
650
651             pid_t child = fork();
652             if (child == -1)
653             {
654                 logger_error("Unexpectedly was not able to fork.\n");
655                 exit(-1);
656             }
657             else if (child == 0)
658             {
659                 close(fds[0]);
660                 int fd = fds[1];
661
662                 // FIXE: The user should be configurable.
663                 setgid(1000);
664                 setuid(1000);
665                 putenv("HOME=/home/pi");
666
667                 std::vector<std::string> args(on_demand.arguments);
668                 args.push_back("-ondemand");
669                 args.push_back(std::to_string(fd));
670                 std::vector<const char *> args_arr;
671                 for (auto &arg : args)
672                     args_arr.push_back(arg.c_str());
673                 args_arr.push_back(nullptr);
674
675                 execvp(on_demand.program.c_str(), (char* const*) &args_arr[0]);
676             }
677             else
678             {
679                 close(fds[1]);
680                 int fd = fds[0];
681
682                 int status = fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
683                 if (status == -1)
684                 {
685                     logger_error("Unexpectedly unable to set close-on-exec flag on client socket descriptor; errno = %d\n", errno);
686                     exit(-1);
687                 }
688
689                 status = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
690                 if (status == -1)
691                 {
692                     logger_error("Unexpectedly unable to set client socket to non blocking; errno = %d\n", errno);
693                     exit(-1);
694                 }
695
696                 int flag = 1;
697                 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
698
699                 connections.emplace_back();
700
701                 ClientConnection &cc = connections.back();
702                 cc.fd = fd;
703                 cc.next_stream_id = 1;
704                 cc.bytes_read = 0;
705
706                 struct epoll_event ev;
707                 ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
708                 ev.data.fd = fd;
709                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
710                 {
711                     logger_error("epoll_ctl() failed unexpectedly with errno = %d\n", errno);
712                     exit(-1);
713                 }
714
715                 services.emplace_back();
716
717                 RegisteredService &srv = services.back();
718                 srv.cc = &cc;
719                 srv.name = std::move(service_name);
720
721                 ch.association = &cc;
722                 ch.stream_id = cc.next_stream_id;
723
724                 cc.next_stream_id += 2;
725                 cc.associations.push_back(&ch);
726
727                 create_and_send_msg(ch.association, MSG_CONNECT, ch.stream_id, data, plen);
728                 return;
729             }
730         }
731     }
732
733     uint8_t response = CONNECT_UNKNOWN_SERVICE;
734     create_and_enqueue_packet(&ch, PKT_CONNECT_RESPONSE, &response, 1);
735 }
736
737 static void handle_pkt_data(int channel_id, uint8_t *data, int plen)
738 {
739     for (auto &ch : channels)
740     {
741         if (ch.channel_id == channel_id)
742         {
743             if (ch.association != nullptr && !ch.got_eos_from_ami)
744                 create_and_send_msg(ch.association, MSG_DATA, ch.stream_id, data, plen);
745
746             break;
747         }
748     }
749 }
750
751 static void handle_pkt_eos(int channel_id)
752 {
753     for (auto &ch : channels)
754     {
755         if (ch.channel_id == channel_id)
756         {
757             if (ch.association != nullptr && !ch.got_eos_from_ami)
758             {
759                 ch.got_eos_from_ami = true;
760
761                 create_and_send_msg(ch.association, MSG_EOS, ch.stream_id, nullptr, 0);
762
763                 if (ch.got_eos_from_client)
764                     remove_association(&ch);
765             }
766             break;
767         }
768     }
769 }
770
771 static void handle_pkt_reset(int channel_id)
772 {
773     for (auto &ch : channels)
774     {
775         if (ch.channel_id == channel_id)
776         {
777             clear_packet_queue(&ch);
778
779             if (ch.association != nullptr)
780             {
781                 create_and_send_msg(ch.association, MSG_RESET, ch.stream_id, nullptr, 0);
782                 remove_association(&ch);
783             }
784
785             break;
786         }
787     }
788 }
789
790 static void remove_channel_if_not_associated_and_empty_pq(int channel_id)
791 {
792     for (auto it = channels.begin(); it != channels.end(); it++)
793     {
794         if (it->channel_id == channel_id)
795         {
796             if (it->association == nullptr && it->packet_queue.empty())
797                 channels.erase(it);
798
799             break;
800         }
801     }
802 }
803
804 static void handle_received_pkt(int ptype, int channel_id, uint8_t *data, int plen)
805 {
806     if (ptype == PKT_CONNECT)
807         handle_pkt_connect(channel_id, data, plen);
808     else if (ptype == PKT_DATA)
809         handle_pkt_data(channel_id, data, plen);
810     else if (ptype == PKT_EOS)
811         handle_pkt_eos(channel_id);
812     else if (ptype == PKT_RESET)
813         handle_pkt_reset(channel_id);
814
815     remove_channel_if_not_associated_and_empty_pq(channel_id);
816 }
817
818 static bool receive_from_a2r()
819 {
820     int head = channel_status[A2R_HEAD_OFFSET];
821     int tail = channel_status[A2R_TAIL_OFFSET];
822     int len = (tail - head) & 255;
823     if (len == 0)
824         return false;
825
826     if (head < tail)
827     {
828         memcpy(recv_buf, &ca.a2r_buffer[head], len);
829     }
830     else
831     {
832         memcpy(recv_buf, &ca.a2r_buffer[head], 256 - head);
833
834         if (tail != 0)
835         {
836             memcpy(&recv_buf[len - tail], &ca.a2r_buffer[0], tail);
837         }
838     }
839
840     uint8_t *p = recv_buf;
841     while (p < recv_buf + len)
842     {
843         uint8_t plen = *p++;
844         uint8_t ptype = *p++;
845         uint8_t channel_id = *p++;
846         handle_received_pkt(ptype, channel_id, p, plen);
847         p += plen;
848     }
849
850     channel_status[A2R_HEAD_OFFSET] = channel_status[A2R_TAIL_OFFSET];
851     channel_status_updated |= A_EVENT_A2R_HEAD;
852     return true;
853 }
854
855 static bool flush_send_queue()
856 {
857     int tail = channel_status[R2A_TAIL_OFFSET];
858     int head = channel_status[R2A_HEAD_OFFSET];
859     int len = (tail - head) & 255;
860     int left = 255 - len;
861
862     int pos = 0;
863
864     while (!send_queue.empty())
865     {
866         LogicalChannel *ch = send_queue.front();
867         PacketBuffer &pb = ch->packet_queue.front();
868
869         int ptype = pb.type;
870         int plen = 3 + pb.data.size();
871
872         if (left < plen)
873             break;
874
875         send_buf[pos++] = pb.data.size();
876         send_buf[pos++] = ptype;
877         send_buf[pos++] = ch->channel_id;
878         memcpy(&send_buf[pos], &pb.data[0], pb.data.size());
879         pos += pb.data.size();
880
881         ch->packet_queue.pop_front();
882
883         send_queue.pop_front();
884
885         if (!ch->packet_queue.empty())
886             send_queue.push_back(ch);
887         else
888             remove_channel_if_not_associated_and_empty_pq(ch->channel_id);
889
890         left -= plen;
891     }
892
893     int to_write = pos;
894     if (!to_write)
895         return false;
896
897     uint8_t *p = send_buf;
898     int at_end = 256 - tail;
899     if (at_end < to_write)
900     {
901         memcpy(&ca.r2a_buffer[tail], p, at_end);
902         p += at_end;
903         to_write -= at_end;
904         tail = 0;
905     }
906
907     memcpy(&ca.r2a_buffer[tail], p, to_write);
908     tail = (tail + to_write) & 255;
909
910     channel_status[R2A_TAIL_OFFSET] = tail;
911     channel_status_updated |= A_EVENT_R2A_TAIL;
912     return true;
913 }
914
915 static void read_channel_status()
916 {
917     channel_status[A2R_TAIL_OFFSET] = ca.a2r_tail;
918     channel_status[R2A_HEAD_OFFSET] = ca.r2a_head;
919     channel_status[R2A_TAIL_OFFSET] = ca.r2a_tail;
920     channel_status[A2R_HEAD_OFFSET] = ca.a2r_head;
921     channel_status_updated = 0;
922 }
923
924 static void write_channel_status()
925 {
926     if (channel_status_updated != 0)
927     {
928         ca.r2a_tail = channel_status[R2A_TAIL_OFFSET];
929         ca.a2r_head = channel_status[A2R_HEAD_OFFSET];
930
931         pthread_mutex_lock(&mutex);
932         ca.a_events |= channel_status_updated;
933         pthread_mutex_unlock(&mutex);
934
935         channel_status_updated = 0;
936     }
937 }
938
939 static void close_all_logical_channels()
940 {
941     send_queue.clear();
942
943     auto it = channels.begin();
944     while (it != channels.end())
945     {
946         LogicalChannel &ch = *it;
947
948         if (ch.association != nullptr)
949         {
950             create_and_send_msg(ch.association, MSG_RESET, ch.stream_id, nullptr, 0);
951             remove_association(&ch);
952         }
953
954         it = channels.erase(it);
955     }
956 }
957
958 static void handle_a314_irq(uint8_t events)
959 {
960     if (events == 0)
961         return;
962
963     if (events & R_EVENT_STARTED)
964     {
965         if (!channels.empty())
966             logger_info("Received STARTED event while logical channels are open -- closing channels\n");
967
968         close_all_logical_channels();
969         a314_device_started = true;
970     }
971
972     if (!a314_device_started)
973         return;
974
975     read_channel_status();
976
977     bool any_rcvd = receive_from_a2r();
978     bool any_sent = flush_send_queue();
979
980     if (any_rcvd || any_sent)
981         write_channel_status();
982 }
983
984 static void handle_client_connection_event(ClientConnection *cc, struct epoll_event *ev)
985 {
986     if (ev->events & EPOLLERR)
987     {
988         logger_warn("Received EPOLLERR for client connection\n");
989         close_and_remove_connection(cc);
990         return;
991     }
992
993     if (ev->events & EPOLLIN)
994     {
995         while (1)
996         {
997             int left;
998             uint8_t *dst;
999
1000             if (cc->payload.empty())
1001             {
1002                 left = sizeof(MessageHeader) - cc->bytes_read;
1003                 dst = (uint8_t *)&(cc->header) + cc->bytes_read;
1004             }
1005             else
1006             {
1007                 left = cc->header.length - cc->bytes_read;
1008                 dst = &cc->payload[cc->bytes_read];
1009             }
1010
1011             ssize_t r = read(cc->fd, dst, left);
1012             if (r == -1)
1013             {
1014                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1015                     break;
1016
1017                 logger_error("Read failed unexpectedly with errno = %d\n", errno);
1018                 exit(-1);
1019             }
1020
1021             if (r == 0)
1022             {
1023                 logger_info("Received End-of-File on client connection\n");
1024                 close_and_remove_connection(cc);
1025                 return;
1026             }
1027             else
1028             {
1029                 cc->bytes_read += r;
1030                 left -= r;
1031                 if (!left)
1032                 {
1033                     if (cc->payload.empty())
1034                     {
1035                         if (cc->header.length == 0)
1036                         {
1037                             logger_trace("header: length=%d, stream_id=%d, type=%d\n", cc->header.length, cc->header.stream_id, cc->header.type);
1038                             handle_received_message(cc);
1039                         }
1040                         else
1041                         {
1042                             cc->payload.resize(cc->header.length);
1043                         }
1044                     }
1045                     else
1046                     {
1047                         logger_trace("header: length=%d, stream_id=%d, type=%d\n", cc->header.length, cc->header.stream_id, cc->header.type);
1048                         handle_received_message(cc);
1049                         cc->payload.clear();
1050                     }
1051                     cc->bytes_read = 0;
1052                 }
1053             }
1054         }
1055     }
1056
1057     if (ev->events & EPOLLOUT)
1058     {
1059         while (!cc->message_queue.empty())
1060         {
1061             MessageBuffer &mb = cc->message_queue.front();
1062
1063             int left = mb.data.size() - mb.pos;
1064             uint8_t *src = &mb.data[mb.pos];
1065             ssize_t r = write(cc->fd, src, left);
1066             if (r == -1)
1067             {
1068                 if (errno == EAGAIN || errno == EWOULDBLOCK)
1069                     break;
1070                 else if (errno == ECONNRESET)
1071                 {
1072                     close_and_remove_connection(cc);
1073                     return;
1074                 }
1075                 else
1076                 {
1077                     logger_error("Write failed unexpectedly with errno = %d\n", errno);
1078                     exit(-1);
1079                 }
1080             }
1081
1082             mb.pos += r;
1083             if (r == left)
1084                 cc->message_queue.pop_front();
1085         }
1086     }
1087 }
1088
1089 static void handle_server_socket_ready()
1090 {
1091     struct sockaddr_in address;
1092     int alen = sizeof(struct sockaddr_in);
1093
1094     int fd = accept(server_socket, (struct sockaddr *)&address, (socklen_t *)&alen);
1095     if (fd < 0)
1096     {
1097         logger_error("Accept failed unexpectedly with errno = %d\n", errno);
1098         exit(-1);
1099     }
1100
1101     int status = fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
1102     if (status == -1)
1103     {
1104         logger_error("Unexpectedly unable to set close-on-exec flag on client socket descriptor; errno = %d\n", errno);
1105         exit(-1);
1106     }
1107
1108     status = fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
1109     if (status == -1)
1110     {
1111         logger_error("Unexpectedly unable to set client socket to non blocking; errno = %d\n", errno);
1112         exit(-1);
1113     }
1114
1115     int flag = 1;
1116     setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
1117
1118     connections.emplace_back();
1119
1120     ClientConnection &cc = connections.back();
1121     cc.fd = fd;
1122     cc.next_stream_id = 1;
1123     cc.bytes_read = 0;
1124
1125     struct epoll_event ev;
1126     ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
1127     ev.data.fd = fd;
1128     if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
1129     {
1130         logger_error("epoll_ctl() failed unexpectedly with errno = %d\n", errno);
1131         exit(-1);
1132     }
1133 }
1134
1135 static void main_loop()
1136 {
1137     bool shutting_down = false;
1138     bool done = false;
1139
1140     while (!done)
1141     {
1142         struct epoll_event ev;
1143         int timeout = shutting_down ? 10000 : -1;
1144         int n = epoll_pwait(epfd, &ev, 1, timeout, &original_sigset);
1145         if (n == -1)
1146         {
1147             if (errno == EINTR)
1148             {
1149                 logger_info("Received SIGTERM\n");
1150
1151                 shutdown_server_socket();
1152
1153                 while (!connections.empty())
1154                     close_and_remove_connection(&connections.front());
1155
1156                 if (flush_send_queue())
1157                     write_channel_status();
1158
1159                 if (!channels.empty())
1160                     shutting_down = true;
1161                 else
1162                     done = true;
1163             }
1164             else
1165             {
1166                 logger_error("epoll_pwait failed with unexpected errno = %d\n", errno);
1167                 exit(-1);
1168             }
1169         }
1170         else if (n == 0)
1171         {
1172             if (shutting_down)
1173                 done = true;
1174             else
1175             {
1176                 logger_error("epoll_pwait returned 0 which is unexpected since no timeout was set\n");
1177                 exit(-1);
1178             }
1179         }
1180         else
1181         {
1182             if (ev.data.fd == irq_fds[1])
1183             {
1184                 uint8_t events;
1185                 if (read(irq_fds[1], &events, 1) != 1)
1186                 {
1187                     logger_error("Read from interrupt socket pair, and unexpectedly didn't return 1 byte\n");
1188                     exit(-1);
1189                 }
1190
1191                 handle_a314_irq(events);
1192             }
1193             else if (ev.data.fd == server_socket)
1194             {
1195                 logger_trace("Epoll event: server socket is ready, events = %d\n", ev.events);
1196                 handle_server_socket_ready();
1197             }
1198             else
1199             {
1200                 logger_trace("Epoll event: client socket is ready, events = %d\n", ev.events);
1201
1202                 auto it = connections.begin();
1203                 for (; it != connections.end(); it++)
1204                 {
1205                     if (it->fd == ev.data.fd)
1206                         break;
1207                 }
1208
1209                 if (it == connections.end())
1210                 {
1211                     logger_error("Got notified about an event on a client connection that supposedly isn't currently open\n");
1212                     exit(-1);
1213                 }
1214
1215                 ClientConnection *cc = &(*it);
1216                 handle_client_connection_event(cc, &ev);
1217
1218                 if (flush_send_queue())
1219                     write_channel_status();
1220             }
1221         }
1222     }
1223 }
1224
1225 static void sigterm_handler(int signo)
1226 {
1227 }
1228
1229 static void init_sigterm()
1230 {
1231     /*
1232     sigset_t ss;
1233     sigemptyset(&ss);
1234     sigaddset(&ss, SIGTERM);
1235     sigprocmask(SIG_BLOCK, &ss, &original_sigset);
1236
1237     struct sigaction sa;
1238     sa.sa_handler = sigterm_handler;
1239     sigemptyset(&sa.sa_mask);
1240     sa.sa_flags = 0;
1241     sigaction(SIGTERM, &sa, NULL);
1242     */
1243 }
1244
1245 static int init_driver()
1246 {
1247     init_sigterm();
1248
1249     if (init_server_socket() != 0)
1250         return -1;
1251
1252     int err = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, irq_fds);
1253     if (err != 0)
1254     {
1255         logger_error("Unable to create socket pair, errno = %d\n", errno);
1256         return -1;
1257     }
1258
1259     epfd = epoll_create1(EPOLL_CLOEXEC);
1260     if (epfd == -1)
1261         return -1;
1262
1263     struct epoll_event ev;
1264     ev.events = EPOLLIN;
1265     ev.data.fd = irq_fds[1];
1266     if (epoll_ctl(epfd, EPOLL_CTL_ADD, irq_fds[1], &ev) != 0)
1267         return -1;
1268
1269     ev.events = EPOLLIN;
1270     ev.data.fd = server_socket;
1271     if (epoll_ctl(epfd, EPOLL_CTL_ADD, server_socket, &ev) != 0)
1272         return -1;
1273
1274     return 0;
1275 }
1276
1277 static void shutdown_driver()
1278 {
1279     if (epfd != -1)
1280         close(epfd);
1281
1282     shutdown_server_socket();
1283 }
1284
1285 static void *thread_start(void *arg)
1286 {
1287     main_loop();
1288     shutdown_driver();
1289     return NULL;
1290 }
1291
1292 static void write_r_events(uint8_t events)
1293 {
1294     if (write(irq_fds[0], &events, 1) != 1)
1295         logger_error("Write to interrupt socket pair did not return 1\n");
1296 }
1297
1298 #ifdef __cplusplus
1299 extern "C" {
1300 #endif
1301
1302 int a314_init()
1303 {
1304     std::string conf_filename("/etc/opt/a314/a314d.conf");
1305
1306     load_config_file(conf_filename.c_str());
1307
1308     int err = init_driver();
1309     if (err < 0)
1310     {
1311         shutdown_driver();
1312         return -1;
1313     }
1314
1315     err = pthread_create(&thread_id, NULL, thread_start, NULL);
1316     if (err < 0)
1317     {
1318         logger_error("pthread_create failed with err = %d\n", err);
1319         return -2;
1320     }
1321
1322     return 0;
1323 }
1324
1325 void a314_process_events()
1326 {
1327     if (ca.a_events & ca.a_enable)
1328     {
1329         write16(0xdff09c, 0x8008);
1330         m68k_set_irq(2);
1331     }
1332 }
1333
1334 unsigned int a314_read_memory_8(unsigned int address)
1335 {
1336     if (address >= sizeof(ca))
1337         return 0;
1338
1339     uint8_t val;
1340     if (address == offsetof(ComArea, a_events))
1341     {
1342         pthread_mutex_lock(&mutex);
1343         val = ca.a_events;
1344         ca.a_events = 0;
1345         pthread_mutex_unlock(&mutex);
1346     }
1347     else
1348     {
1349         uint8_t *p = (uint8_t *)&ca;
1350         val = p[address];
1351     }
1352
1353     return val;
1354 }
1355
1356 unsigned int a314_read_memory_16(unsigned int address)
1357 {
1358     // Not implemented.
1359     return 0;
1360 }
1361
1362 unsigned int a314_read_memory_32(unsigned int address)
1363 {
1364     // Not implemented.
1365     return 0;
1366 }
1367
1368 void a314_write_memory_8(unsigned int address, unsigned int value)
1369 {
1370     if (address >= sizeof(ca))
1371         return;
1372
1373     switch (address)
1374     {
1375         case offsetof(ComArea, a_events):
1376             // a_events is not writable.
1377             break;
1378
1379         case offsetof(ComArea, r_events):
1380             if (value != 0)
1381                 write_r_events((uint8_t)value);
1382             break;
1383
1384         default:
1385         {
1386             uint8_t *p = (uint8_t *)&ca;
1387             p[address] = (uint8_t)value;
1388             break;
1389         }
1390     }
1391 }
1392
1393 void a314_write_memory_16(unsigned int address, unsigned int value)
1394 {
1395     // Not implemented.
1396 }
1397
1398 void a314_write_memory_32(unsigned int address, unsigned int value)
1399 {
1400     // Not implemented.
1401 }
1402
1403 #ifdef __cplusplus
1404 }
1405 #endif