]> git.sesse.net Git - greproxy/commitdiff
Add FEC.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 7 Feb 2015 23:55:00 +0000 (00:55 +0100)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sat, 7 Feb 2015 23:55:00 +0000 (00:55 +0100)
Makefile
greproxy.cpp
rsdecoder.cpp [new file with mode: 0644]
rsdecoder.h [new file with mode: 0644]
rsencoder.cpp [new file with mode: 0644]
rsencoder.h [new file with mode: 0644]
tungre.cpp

index aeb7ba9d4f3a80d07ccaa5be0178e03204b0a62f..4f098204b173c0ba475518b2f3a7d3d69077f292 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,8 @@
 CXXFLAGS=-std=gnu++11 -O2 -g
 CXXFLAGS=-std=gnu++11 -O2 -g
+LDLIBS=-lfec
 
 
-TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o
-GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o
+TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o
+GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o
 
 all: tungre greproxy
 tungre: $(TUNGRE_OBJS)
 
 all: tungre greproxy
 tungre: $(TUNGRE_OBJS)
index 030ea1f335da90ea4a9b3704db8d9cd8e7e151f5..127e0a6c72f51f717cc02cc3eb84a84877877f27 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "greprotocol.h"
 #include "reorderer.h"
 
 #include "greprotocol.h"
 #include "reorderer.h"
+#include "rsencoder.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -26,8 +27,10 @@ int main(int argc, char **argv)
        in6_addr myaddr = get_addr(argv[3]);
        GREProtocol gre_a(myaddr, addr_a);
        GREProtocol gre_b(myaddr, addr_b);
        in6_addr myaddr = get_addr(argv[3]);
        GREProtocol gre_a(myaddr, addr_a);
        GREProtocol gre_b(myaddr, addr_b);
-       Reorderer dst_a(&gre_a);
-       Reorderer dst_b(&gre_b);
+       RSEncoder rs_a(&gre_a);
+       RSEncoder rs_b(&gre_b);
+       Reorderer reorder_a(&rs_a);
+       Reorderer reorder_b(&rs_b);
 
        fd_set fds;
        FD_ZERO(&fds);
 
        fd_set fds;
        FD_ZERO(&fds);
@@ -41,10 +44,10 @@ int main(int argc, char **argv)
                }
 
                if (FD_ISSET(gre_a.fd(), &fds)) {
                }
 
                if (FD_ISSET(gre_a.fd(), &fds)) {
-                       gre_a.read_packet(&dst_b);
+                       gre_a.read_packet(&reorder_b);
                }
                if (FD_ISSET(gre_b.fd(), &fds)) {
                }
                if (FD_ISSET(gre_b.fd(), &fds)) {
-                       gre_b.read_packet(&dst_a);
+                       gre_b.read_packet(&reorder_a);
                }
        }
 }
                }
        }
 }
diff --git a/rsdecoder.cpp b/rsdecoder.cpp
new file mode 100644 (file)
index 0000000..20297bf
--- /dev/null
@@ -0,0 +1,158 @@
+#include <stdio.h>
+#include <string.h>
+#include <arpa/inet.h>
+
+#include <map>
+#include "rsdecoder.h"
+#include "rs_parm.h"
+
+extern "C" {
+#include <fec.h>
+}
+
+#define RS_GROUP_HISTORY 3
+
+using namespace std;
+
+RSDecoder::RSDecoder(Sender *sender)
+       : sender(sender) {}
+
+void RSDecoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq)
+{
+       int rs_group;
+       if (proto == 0xffff) {
+               // RS packet
+               rs_group = (incoming_seq + RS_PAYLOAD_SIZE - 1) / RS_PAYLOAD_SIZE;
+       } else {
+               // Regular packet
+               rs_group = incoming_seq / RS_PAYLOAD_SIZE;
+       }
+
+       if (rs_groups.size() >= RS_GROUP_HISTORY &&
+           rs_group < rs_groups.begin()->first) {
+               // Older than the oldest group.
+               return;
+       }
+
+       auto group_it = rs_groups.find(rs_group);
+       if (group_it == rs_groups.end()) {
+               RSGroup group;
+               group.done = false;
+               group_it = rs_groups.insert(make_pair(rs_group, group)).first;
+       }
+
+       RSGroup &group = group_it->second;
+       if (group.done) {
+               // This RS group was already sent.
+               return;
+       }
+       if (group.packets.count(incoming_seq)) {
+               // Already seen this packet.
+               return;
+       }
+
+       if (proto != 0xffff) {
+               sender->send_packet(proto, data, incoming_seq);
+       }
+
+       GREPacket packet;
+       packet.seq = incoming_seq;
+       packet.proto = proto;
+       packet.data = data;
+       // Don't care about ts.
+
+       group.packets.insert(make_pair(incoming_seq, packet));
+       if (group.packets.size() >= RS_PAYLOAD_SIZE) {
+               // Enough to reconstruct all missing packets.
+
+               // Reconstruction always happens on the longest packet;
+               // we will truncate them later.
+               int max_length = 0;
+               int num_regular = 0;
+               for (const auto &it : group.packets) {
+                       if (it.first >= rs_group * RS_PAYLOAD_SIZE) {
+                               // Regular packet.
+                               max_length = max<int>(max_length, it.second.data.size() + 4);
+                               ++num_regular;
+                       } else {
+                               // RS packet.
+                               max_length = max<int>(max_length, it.second.data.size());
+                       }
+               }
+
+               if (num_regular < RS_PAYLOAD_SIZE) {
+                       // Piece the data back into the different RS groups.
+                       vector<string> padded_packets;
+                       vector<int> missing_packets;
+                       for (int i = 0; i < RS_GROUP_SIZE; ++i) {
+                               int packet_num = (i < RS_PAYLOAD_SIZE) ? rs_group * RS_PAYLOAD_SIZE + i :
+                                       rs_group * RS_PAYLOAD_SIZE - 1 - (i - RS_PAYLOAD_SIZE);
+                               string p;
+                               p.resize(max_length);
+                               const auto it = group.packets.find(packet_num);
+                               if (it == group.packets.end()) {
+                                       missing_packets.push_back(i);
+                               } else {
+                                       const GREPacket &packet = it->second;
+                                       uint16_t proto_be = htons(packet.proto);
+                                       memcpy(&p[0], &proto_be, sizeof(uint16_t));
+                                       uint16_t len_be = htons(packet.data.size());
+                                       memcpy(&p[2], &len_be, sizeof(uint16_t));
+                                       memcpy(&p[4], packet.data.data(), packet.data.size());
+                               }
+                               padded_packets.push_back(p);
+                       }
+
+                       // Now reconstruct the missing pieces.
+                       unsigned char ch[RS_GROUP_SIZE];
+                       for (int i = 0; i < max_length; ++i) {
+                               for (int j = 0; j < RS_GROUP_SIZE; ++j) {
+                                       ch[j] = padded_packets[j][i];
+                               }
+                               int ret = decode_rs_8(ch, &missing_packets[0], missing_packets.size(),
+                                       RS_PAD);
+                               if (ret == -1) {
+                                       printf("Failed reconstruction!\n");
+                                       // We might get more data later, so don't remove it.
+                                       return;
+                               }
+                               for (int j = 0; j < RS_GROUP_SIZE; ++j) {
+                                       padded_packets[j][i] = ch[j];
+                               }
+                       }
+
+                       // Output all packets we didn't have before. They will come
+                       // out-of-order, which will be the job of the Reorderer to fix.
+                       for (int i = 0; i < RS_PAYLOAD_SIZE; ++i) {
+                               int packet_num = rs_group * RS_PAYLOAD_SIZE + i;
+                               if (group.packets.count(packet_num) != 0) {
+                                       // Already had this packet.
+                                       continue;
+                               }
+                               const string &p = padded_packets[i];
+                               uint16_t proto_be, len_be;
+                               memcpy(&proto_be, &p[0], sizeof(uint16_t));
+                               memcpy(&len_be, &p[2], sizeof(uint16_t));
+                               string s(&p[4], &p[4 + ntohs(len_be)]);  // TODO: security
+                               sender->send_packet(ntohs(proto_be), s, packet_num);
+                               printf("Reconstructed packet %d\n", packet_num);
+                       }
+               }
+               
+               group.done = true;
+       }
+
+       if (rs_groups.size() > RS_GROUP_HISTORY) {
+               const auto &it = rs_groups.begin();
+               if (!it->second.done) {
+                       printf("Giving up reconstruction within group [%d,%d> (only got %d/%d packets, needed %d)\n",
+                              it->first * RS_PAYLOAD_SIZE,
+                              (it->first + 1) * RS_PAYLOAD_SIZE,
+                              int(it->second.packets.size()),
+                              RS_GROUP_SIZE,
+                              RS_PAYLOAD_SIZE);
+               }
+               rs_groups.erase(it);
+       }
+}
+
diff --git a/rsdecoder.h b/rsdecoder.h
new file mode 100644 (file)
index 0000000..5ee52d1
--- /dev/null
@@ -0,0 +1,26 @@
+#ifndef _RSDECODER_H
+#define _RSDECODER_H 1
+
+#include <stdint.h>
+
+#include <map>
+
+#include "reorderer.h"
+
+class Sender;
+
+class RSDecoder : public Sender {
+public:
+       RSDecoder(Sender *sender);
+       virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq);
+
+private:
+       struct RSGroup {
+               std::map<int, GREPacket> packets;
+               bool done;
+       };
+       Sender *sender;
+       std::map<int, RSGroup> rs_groups;
+};
+
+#endif  /* !defined(_RSDECODER_H) */
diff --git a/rsencoder.cpp b/rsencoder.cpp
new file mode 100644 (file)
index 0000000..3026dd4
--- /dev/null
@@ -0,0 +1,92 @@
+#include <string.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+extern "C" {
+#include <fec.h>
+}
+
+#include "reorderer.h"
+#include "rsencoder.h"
+#include "rs_parm.h"
+
+#include <algorithm>
+
+using namespace std;
+
+void RSEncoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq)
+{
+       if (!packet_history.empty() &&
+           incoming_seq <= packet_history.back().seq) {
+               // Reorderer should have done this for us.
+               return;
+       }
+       if (!packet_history.empty() &&
+           incoming_seq / RS_PAYLOAD_SIZE !=
+               packet_history.back().seq / RS_PAYLOAD_SIZE) {
+               // Received an unfinished group.
+               packet_history.clear();
+       }
+       sender->send_packet(proto, data, incoming_seq);
+       packet_history.emplace_back(GREPacket{incoming_seq, proto, data});
+       if (packet_history.size() == RS_PAYLOAD_SIZE) {
+               finish_group();
+       }
+}
+
+void RSEncoder::finish_group()
+{
+       // Our RS packets need to have the same max length as the longest one.
+       int max_length = 0;
+       for (int i = 0; i < packet_history.size(); ++i) {
+               max_length = max<int>(max_length, packet_history[i].data.size());
+       }
+
+       vector<string> padded_packets;
+       for (int i = 0; i < packet_history.size(); ++i) {
+               string p;
+               p.resize(max_length + 4);
+               memset(&p[0], 0, max_length + 4);
+               uint16_t proto_be = htons(packet_history[i].proto);
+               memcpy(&p[0], &proto_be, sizeof(uint16_t));
+               uint16_t len_be = htons(packet_history[i].data.size());
+               memcpy(&p[2], &len_be, sizeof(uint16_t));
+               memcpy(&p[4], packet_history[i].data.data(), packet_history[i].data.size());
+               padded_packets.push_back(p);
+       }
+
+       // Now construct RS packets.
+       vector<string> rs_packets;
+       for (int i = 0; i < RS_PARITY_SIZE; ++i) {
+               string p;
+               p.resize(max_length + 4);
+               memset(&p[0], 0, max_length + 4);
+               rs_packets.push_back(p);
+       }
+       string data, parity;
+       data.resize(RS_PAYLOAD_SIZE);
+       parity.resize(RS_PARITY_SIZE);
+       for (int i = 0; i < max_length + 4; ++i) {
+               for (int j = 0; j < packet_history.size(); ++j) {
+                       data[j] = packet_history[j].data[i];
+               }
+               encode_rs_8(reinterpret_cast<unsigned char *>(&data[0]),
+                           reinterpret_cast<unsigned char *>(&parity[0]),
+                           RS_PAD);
+               for (int j = 0; j < RS_PARITY_SIZE; ++j) {
+                       rs_packets[j][i] = parity[j];
+               }
+       }
+
+       // Actually send the RS packets.
+       int start_seq = packet_history[0].seq - 1;
+       for (int i = 0; i < RS_PARITY_SIZE; ++i) {
+               sender->send_packet(0xffff, rs_packets[i], start_seq - i);
+       }
+       
+       packet_history.clear();
+}
+
diff --git a/rsencoder.h b/rsencoder.h
new file mode 100644 (file)
index 0000000..75f0f24
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef _RSENCODER_H
+#define _RSENCODER_H 1
+
+#include <stdint.h>
+
+#include <string>
+#include <vector>
+
+#include "protocol.h"
+
+class Sender;
+
+class RSEncoder : public Sender {
+public:
+       RSEncoder(Sender* sender) : sender(sender) {}
+       virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq);
+
+private:
+       void finish_group();
+
+       Sender* sender;
+       std::vector<GREPacket> packet_history;
+};
+
+#endif  // !defined(_RSENCODER_H)
index 89339bba2455aa9ab4616f99f18ce0736e3f6ea0..b9ad8f143b743b9e021422fbadf3efdf00001003 100644 (file)
@@ -8,6 +8,7 @@
 #include "greprotocol.h"
 #include "reorderer.h"
 #include "tunprotocol.h"
 #include "greprotocol.h"
 #include "reorderer.h"
 #include "tunprotocol.h"
+#include "rsdecoder.h"
 
 using namespace std;
 
 
 using namespace std;
 
@@ -28,6 +29,7 @@ int main(int argc, char **argv)
        TUNProtocol tun("tungre");
 
        Reorderer tun_reorderer(&tun);
        TUNProtocol tun("tungre");
 
        Reorderer tun_reorderer(&tun);
+       RSDecoder tun_decoder(&tun_reorderer);
 
        fd_set fds;
        FD_ZERO(&fds);
 
        fd_set fds;
        FD_ZERO(&fds);
@@ -41,7 +43,7 @@ int main(int argc, char **argv)
                }
 
                if (FD_ISSET(gre.fd(), &fds)) {
                }
 
                if (FD_ISSET(gre.fd(), &fds)) {
-                       gre.read_packet(&tun_reorderer);
+                       gre.read_packet(&tun_decoder);
                }
                if (FD_ISSET(tun.fd(), &fds)) {
                        tun.read_packet(&gre);
                }
                if (FD_ISSET(tun.fd(), &fds)) {
                        tun.read_packet(&gre);