10 #define RS_GROUP_HISTORY 3
14 RSDecoder::RSDecoder(Sender *sender)
16 rs(RS_PAYLOAD_SIZE, RS_GROUP_SIZE)
20 void RSDecoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq)
23 if (proto == 0xffff) {
25 rs_group = (incoming_seq + RS_PAYLOAD_SIZE - 1) / RS_PAYLOAD_SIZE;
28 rs_group = incoming_seq / RS_PAYLOAD_SIZE;
31 if (rs_groups.size() >= RS_GROUP_HISTORY &&
32 rs_group < rs_groups.begin()->first) {
33 // Older than the oldest group.
37 auto group_it = rs_groups.find(rs_group);
38 if (group_it == rs_groups.end()) {
41 group_it = rs_groups.insert(make_pair(rs_group, group)).first;
44 RSGroup &group = group_it->second;
46 // This RS group was already sent.
49 if (group.packets.count(incoming_seq)) {
50 // Already seen this packet.
54 if (proto != 0xffff) {
55 sender->send_packet(proto, data, incoming_seq);
59 packet.seq = incoming_seq;
62 // Don't care about ts.
64 group.packets.insert(make_pair(incoming_seq, packet));
65 if (group.packets.size() >= RS_PAYLOAD_SIZE) {
66 // Enough to reconstruct all missing packets.
68 // Reconstruction always happens on the longest packet;
69 // we will truncate them later.
72 for (const auto &it : group.packets) {
73 if (it.first >= rs_group * RS_PAYLOAD_SIZE) {
75 max_length = max<int>(max_length, it.second.data.size() + 4);
79 max_length = max<int>(max_length, it.second.data.size());
83 if (num_regular < RS_PAYLOAD_SIZE) {
84 // Piece the data back into the different RS groups.
85 vector<string> padded_packets;
86 std::map<size_t, const fecpp::byte *> shares;
87 for (const auto &packet_pair : group.packets) {
88 int packet_seq = packet_pair.first;
92 const GREPacket &packet = packet_pair.second;
93 if (packet_seq >= rs_group * RS_PAYLOAD_SIZE) {
95 share_num = packet_seq - rs_group * RS_PAYLOAD_SIZE;
96 uint16_t proto_be = htons(packet.proto);
97 memcpy(&p[0], &proto_be, sizeof(uint16_t));
98 uint16_t len_be = htons(packet.data.size());
99 memcpy(&p[2], &len_be, sizeof(uint16_t));
100 memcpy(&p[4], packet.data.data(), packet.data.size());
103 share_num = RS_PAYLOAD_SIZE + (rs_group * RS_PAYLOAD_SIZE - packet_seq - 1);
104 memcpy(&p[0], packet.data.data(), packet.data.size());
106 padded_packets.push_back(p);
107 shares[share_num] = reinterpret_cast<const fecpp::byte *>(padded_packets.back().data());
110 // Output all packets we didn't have before. They will come
111 // out-of-order, which will be the job of the Reorderer to fix.
112 rs.decode(shares, max_length, [&](size_t share_num, size_t num_shares, const fecpp::byte data[], size_t len){
113 if (shares.count(share_num)) {
114 // Already had this packet.
117 int packet_num = rs_group * RS_PAYLOAD_SIZE + share_num;
118 uint16_t proto_be, len_be;
119 memcpy(&proto_be, &data[0], sizeof(uint16_t));
120 memcpy(&len_be, &data[2], sizeof(uint16_t));
121 string s(&data[4], &data[4 + ntohs(len_be)]); // TODO: security
122 sender->send_packet(ntohs(proto_be), s, packet_num);
123 const unsigned char *ptr = &data[4];
124 printf("Reconstructed packet %d (proto=0x%04x len=%d, data=%02x %02x %02x %02x %02x %02x %02x %02x)\n", packet_num,
125 ntohs(proto_be), ntohs(len_be),
126 ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], ptr[7]);
134 if (rs_groups.size() > RS_GROUP_HISTORY) {
135 const auto &it = rs_groups.begin();
136 if (!it->second.done) {
137 printf("Giving up reconstruction within group [%d,%d> (only got %d/%d packets, needed %d)\n",
138 it->first * RS_PAYLOAD_SIZE,
139 (it->first + 1) * RS_PAYLOAD_SIZE,
140 int(it->second.packets.size()),