]> git.sesse.net Git - greproxy/blob - rsdecoder.cpp
Make Reorderer handle multi-timeouts.
[greproxy] / rsdecoder.cpp
1 #include <stdio.h>
2 #include <string.h>
3 #include <arpa/inet.h>
4 #include <fecpp.h>
5
6 #include <map>
7 #include "rsdecoder.h"
8 #include "rs_parm.h"
9
10 #define RS_GROUP_HISTORY 3
11
12 using namespace std;
13
14 RSDecoder::RSDecoder(Sender *sender)
15         : sender(sender),
16           rs(RS_PAYLOAD_SIZE, RS_GROUP_SIZE)
17 {
18 }
19
20 void RSDecoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq)
21 {
22         int rs_group;
23         if (proto == 0xffff) {
24                 // RS packet
25                 rs_group = (incoming_seq + RS_PAYLOAD_SIZE - 1) / RS_PAYLOAD_SIZE;
26         } else {
27                 // Regular packet
28                 rs_group = incoming_seq / RS_PAYLOAD_SIZE;
29         }
30
31         if (rs_groups.size() >= RS_GROUP_HISTORY &&
32             rs_group < rs_groups.begin()->first) {
33                 // Older than the oldest group.
34                 return;
35         }
36
37         auto group_it = rs_groups.find(rs_group);
38         if (group_it == rs_groups.end()) {
39                 RSGroup group;
40                 group.done = false;
41                 group_it = rs_groups.insert(make_pair(rs_group, group)).first;
42         }
43
44         RSGroup &group = group_it->second;
45         if (group.done) {
46                 // This RS group was already sent.
47                 return;
48         }
49         if (group.packets.count(incoming_seq)) {
50                 // Already seen this packet.
51                 return;
52         }
53
54         if (proto != 0xffff) {
55                 sender->send_packet(proto, data, incoming_seq);
56         }
57
58         GREPacket packet;
59         packet.seq = incoming_seq;
60         packet.proto = proto;
61         packet.data = data;
62         // Don't care about ts.
63
64         group.packets.insert(make_pair(incoming_seq, packet));
65         if (group.packets.size() >= RS_PAYLOAD_SIZE) {
66                 // Enough to reconstruct all missing packets.
67
68                 // Reconstruction always happens on the longest packet;
69                 // we will truncate them later.
70                 int max_length = 0;
71                 int num_regular = 0;
72                 for (const auto &it : group.packets) {
73                         if (it.first >= rs_group * RS_PAYLOAD_SIZE) {
74                                 // Regular packet.
75                                 max_length = max<int>(max_length, it.second.data.size() + 4);
76                                 ++num_regular;
77                         } else {
78                                 // RS packet.
79                                 max_length = max<int>(max_length, it.second.data.size());
80                         }
81                 }
82
83                 if (num_regular < RS_PAYLOAD_SIZE) {
84                         // Piece the data back into the different RS groups.
85                         vector<string> padded_packets;
86                         std::map<size_t, const fecpp::byte *> shares;
87                         for (const auto &packet_pair : group.packets) {
88                                 int packet_seq = packet_pair.first;
89                                 int share_num;
90                                 string p;
91                                 p.resize(max_length);
92                                 const GREPacket &packet = packet_pair.second;
93                                 if (packet_seq >= rs_group * RS_PAYLOAD_SIZE) {
94                                         // Regular packet.
95                                         share_num = packet_seq - rs_group * RS_PAYLOAD_SIZE;
96                                         uint16_t proto_be = htons(packet.proto);
97                                         memcpy(&p[0], &proto_be, sizeof(uint16_t));
98                                         uint16_t len_be = htons(packet.data.size());
99                                         memcpy(&p[2], &len_be, sizeof(uint16_t));
100                                         memcpy(&p[4], packet.data.data(), packet.data.size());
101                                 } else {
102                                         // RS packet.
103                                         share_num = RS_PAYLOAD_SIZE + (rs_group * RS_PAYLOAD_SIZE - packet_seq - 1);
104                                         memcpy(&p[0], packet.data.data(), packet.data.size());
105                                 }
106                                 padded_packets.push_back(p);
107                                 shares[share_num] = reinterpret_cast<const fecpp::byte *>(padded_packets.back().data());
108                         }
109
110                         // Output all packets we didn't have before. They will come
111                         // out-of-order, which will be the job of the Reorderer to fix.
112                         rs.decode(shares, max_length, [&](size_t share_num, size_t num_shares, const fecpp::byte data[], size_t len){
113                                 if (shares.count(share_num)) {
114                                         // Already had this packet.
115                                         return;
116                                 }
117                                 int packet_num = rs_group * RS_PAYLOAD_SIZE + share_num;
118                                 uint16_t proto_be, len_be;
119                                 memcpy(&proto_be, &data[0], sizeof(uint16_t));
120                                 memcpy(&len_be, &data[2], sizeof(uint16_t));
121                                 string s(&data[4], &data[4 + ntohs(len_be)]);  // TODO: security
122                                 sender->send_packet(ntohs(proto_be), s, packet_num);
123                                 const unsigned char *ptr = &data[4];
124                                 printf("Reconstructed packet %d (proto=0x%04x len=%d, data=%02x %02x %02x %02x %02x %02x %02x %02x)\n", packet_num,
125                                         ntohs(proto_be), ntohs(len_be),
126                                         ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], ptr[7]);
127
128                         });
129                 }
130                 
131                 group.done = true;
132         }
133
134         if (rs_groups.size() > RS_GROUP_HISTORY) {
135                 const auto &it = rs_groups.begin();
136                 if (!it->second.done) {
137                         printf("Giving up reconstruction within group [%d,%d> (only got %d/%d packets, needed %d)\n",
138                                it->first * RS_PAYLOAD_SIZE,
139                                (it->first + 1) * RS_PAYLOAD_SIZE,
140                                int(it->second.packets.size()),
141                                RS_GROUP_SIZE,
142                                RS_PAYLOAD_SIZE);
143                 }
144                 rs_groups.erase(it);
145         }
146 }
147