]> git.sesse.net Git - greproxy/blob - rsdecoder.cpp
Add FEC.
[greproxy] / rsdecoder.cpp
1 #include <stdio.h>
2 #include <string.h>
3 #include <arpa/inet.h>
4
5 #include <map>
6 #include "rsdecoder.h"
7 #include "rs_parm.h"
8
9 extern "C" {
10 #include <fec.h>
11 }
12
13 #define RS_GROUP_HISTORY 3
14
15 using namespace std;
16
17 RSDecoder::RSDecoder(Sender *sender)
18         : sender(sender) {}
19
20 void RSDecoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq)
21 {
22         int rs_group;
23         if (proto == 0xffff) {
24                 // RS packet
25                 rs_group = (incoming_seq + RS_PAYLOAD_SIZE - 1) / RS_PAYLOAD_SIZE;
26         } else {
27                 // Regular packet
28                 rs_group = incoming_seq / RS_PAYLOAD_SIZE;
29         }
30
31         if (rs_groups.size() >= RS_GROUP_HISTORY &&
32             rs_group < rs_groups.begin()->first) {
33                 // Older than the oldest group.
34                 return;
35         }
36
37         auto group_it = rs_groups.find(rs_group);
38         if (group_it == rs_groups.end()) {
39                 RSGroup group;
40                 group.done = false;
41                 group_it = rs_groups.insert(make_pair(rs_group, group)).first;
42         }
43
44         RSGroup &group = group_it->second;
45         if (group.done) {
46                 // This RS group was already sent.
47                 return;
48         }
49         if (group.packets.count(incoming_seq)) {
50                 // Already seen this packet.
51                 return;
52         }
53
54         if (proto != 0xffff) {
55                 sender->send_packet(proto, data, incoming_seq);
56         }
57
58         GREPacket packet;
59         packet.seq = incoming_seq;
60         packet.proto = proto;
61         packet.data = data;
62         // Don't care about ts.
63
64         group.packets.insert(make_pair(incoming_seq, packet));
65         if (group.packets.size() >= RS_PAYLOAD_SIZE) {
66                 // Enough to reconstruct all missing packets.
67
68                 // Reconstruction always happens on the longest packet;
69                 // we will truncate them later.
70                 int max_length = 0;
71                 int num_regular = 0;
72                 for (const auto &it : group.packets) {
73                         if (it.first >= rs_group * RS_PAYLOAD_SIZE) {
74                                 // Regular packet.
75                                 max_length = max<int>(max_length, it.second.data.size() + 4);
76                                 ++num_regular;
77                         } else {
78                                 // RS packet.
79                                 max_length = max<int>(max_length, it.second.data.size());
80                         }
81                 }
82
83                 if (num_regular < RS_PAYLOAD_SIZE) {
84                         // Piece the data back into the different RS groups.
85                         vector<string> padded_packets;
86                         vector<int> missing_packets;
87                         for (int i = 0; i < RS_GROUP_SIZE; ++i) {
88                                 int packet_num = (i < RS_PAYLOAD_SIZE) ? rs_group * RS_PAYLOAD_SIZE + i :
89                                         rs_group * RS_PAYLOAD_SIZE - 1 - (i - RS_PAYLOAD_SIZE);
90                                 string p;
91                                 p.resize(max_length);
92                                 const auto it = group.packets.find(packet_num);
93                                 if (it == group.packets.end()) {
94                                         missing_packets.push_back(i);
95                                 } else {
96                                         const GREPacket &packet = it->second;
97                                         uint16_t proto_be = htons(packet.proto);
98                                         memcpy(&p[0], &proto_be, sizeof(uint16_t));
99                                         uint16_t len_be = htons(packet.data.size());
100                                         memcpy(&p[2], &len_be, sizeof(uint16_t));
101                                         memcpy(&p[4], packet.data.data(), packet.data.size());
102                                 }
103                                 padded_packets.push_back(p);
104                         }
105
106                         // Now reconstruct the missing pieces.
107                         unsigned char ch[RS_GROUP_SIZE];
108                         for (int i = 0; i < max_length; ++i) {
109                                 for (int j = 0; j < RS_GROUP_SIZE; ++j) {
110                                         ch[j] = padded_packets[j][i];
111                                 }
112                                 int ret = decode_rs_8(ch, &missing_packets[0], missing_packets.size(),
113                                         RS_PAD);
114                                 if (ret == -1) {
115                                         printf("Failed reconstruction!\n");
116                                         // We might get more data later, so don't remove it.
117                                         return;
118                                 }
119                                 for (int j = 0; j < RS_GROUP_SIZE; ++j) {
120                                         padded_packets[j][i] = ch[j];
121                                 }
122                         }
123
124                         // Output all packets we didn't have before. They will come
125                         // out-of-order, which will be the job of the Reorderer to fix.
126                         for (int i = 0; i < RS_PAYLOAD_SIZE; ++i) {
127                                 int packet_num = rs_group * RS_PAYLOAD_SIZE + i;
128                                 if (group.packets.count(packet_num) != 0) {
129                                         // Already had this packet.
130                                         continue;
131                                 }
132                                 const string &p = padded_packets[i];
133                                 uint16_t proto_be, len_be;
134                                 memcpy(&proto_be, &p[0], sizeof(uint16_t));
135                                 memcpy(&len_be, &p[2], sizeof(uint16_t));
136                                 string s(&p[4], &p[4 + ntohs(len_be)]);  // TODO: security
137                                 sender->send_packet(ntohs(proto_be), s, packet_num);
138                                 printf("Reconstructed packet %d\n", packet_num);
139                         }
140                 }
141                 
142                 group.done = true;
143         }
144
145         if (rs_groups.size() > RS_GROUP_HISTORY) {
146                 const auto &it = rs_groups.begin();
147                 if (!it->second.done) {
148                         printf("Giving up reconstruction within group [%d,%d> (only got %d/%d packets, needed %d)\n",
149                                it->first * RS_PAYLOAD_SIZE,
150                                (it->first + 1) * RS_PAYLOAD_SIZE,
151                                int(it->second.packets.size()),
152                                RS_GROUP_SIZE,
153                                RS_PAYLOAD_SIZE);
154                 }
155                 rs_groups.erase(it);
156         }
157 }
158