]> git.sesse.net Git - greproxy/blob - rsencoder.cpp
Merge branch 'master' of /srv/git.sesse.net/www/greproxy
[greproxy] / rsencoder.cpp
1 #include <string.h>
2 #include <arpa/inet.h>
3 #include <netinet/in.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <sys/select.h>
7 #include <sys/socket.h>
8 #include <fecpp.h>
9
10 #include "reorderer.h"
11 #include "rsencoder.h"
12 #include "rs_parm.h"
13
14 #include <algorithm>
15
16 using namespace std;
17         
18 RSEncoder::RSEncoder(Sender *sender) 
19         : sender(sender),
20           rs(RS_PAYLOAD_SIZE, RS_GROUP_SIZE)
21 {
22 }
23
24 void RSEncoder::send_packet(uint16_t proto, const std::string &data, uint32_t incoming_seq)
25 {
26         if (!packet_history.empty() &&
27             incoming_seq <= packet_history.back().seq) {
28                 // Reorderer should have done this for us.
29                 return;
30         }
31         if (!packet_history.empty() &&
32             incoming_seq / RS_PAYLOAD_SIZE !=
33                 packet_history.back().seq / RS_PAYLOAD_SIZE) {
34                 // Received an unfinished group.
35                 packet_history.clear();
36         }
37         bool debug_drop_packet = false;  // For testing only.
38         if (incoming_seq % RS_PAYLOAD_SIZE == 3) {
39                 //debug_drop_packet = true;
40         }
41         if (debug_drop_packet) {
42                 const unsigned char *ptr = reinterpret_cast<const unsigned char *>(data.data());
43                 printf("DEBUG: Dropping packet seq=%u proto=0x%04x len=%d data=%02x %02x %02x %02x %02x %02x %02x %02x ...\n",
44                         incoming_seq, proto, data.size(), ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], ptr[7]);
45         } else {
46                 sender->send_packet(proto, data, incoming_seq);
47         }
48         packet_history.emplace_back(GREPacket{incoming_seq, proto, data});
49         if (packet_history.size() == RS_PAYLOAD_SIZE) {
50                 finish_group();
51         }
52 }
53
54 void RSEncoder::finish_group()
55 {
56         // Our RS packets need to have the same max length as the longest one.
57         int max_length = 0;
58         for (int i = 0; i < packet_history.size(); ++i) {
59                 max_length = max<int>(max_length, packet_history[i].data.size());
60         }
61
62         string padded_packets;
63         padded_packets.reserve((max_length + 4) * packet_history.size());
64         // TODO: RS_PAD
65         for (int i = 0; i < packet_history.size(); ++i) {
66                 string p;
67                 p.resize(max_length + 4);
68                 memset(&p[0], 0, max_length + 4);
69                 uint16_t proto_be = htons(packet_history[i].proto);
70                 memcpy(&p[0], &proto_be, sizeof(uint16_t));
71                 uint16_t len_be = htons(packet_history[i].data.size());
72                 memcpy(&p[2], &len_be, sizeof(uint16_t));
73                 memcpy(&p[4], packet_history[i].data.data(), packet_history[i].data.size());
74                 padded_packets += p;
75         }
76
77         // Now construct and send RS packets.
78         rs.encode(reinterpret_cast<const fecpp::byte*>(padded_packets.data()),
79                   padded_packets.size(),
80                   [&](size_t packet_num, size_t num_packets, const fecpp::byte data[], size_t size) {
81                         // The first N packets are just the original ones; ignore them.
82                         if (packet_num < RS_PAYLOAD_SIZE) {
83                                 return;
84                         }
85
86                         const char *sdata = reinterpret_cast<const char *>(data);
87                         uint32_t start_seq = packet_history[0].seq - 1;
88                         sender->send_packet(0xffff, string(sdata, size), start_seq - (packet_num - RS_PAYLOAD_SIZE));
89                 });
90         
91         packet_history.clear();
92 }
93