From: Steinar H. Gunderson Date: Sat, 7 Feb 2015 23:55:00 +0000 (+0100) Subject: Add FEC. X-Git-Url: https://git.sesse.net/?p=greproxy;a=commitdiff_plain;h=9e8a28e92f8e092a2409ddad770b3dbe088a4fe9 Add FEC. --- diff --git a/Makefile b/Makefile index aeb7ba9..4f09820 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ CXXFLAGS=-std=gnu++11 -O2 -g +LDLIBS=-lfec -TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o -GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o +TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o +GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o all: tungre greproxy tungre: $(TUNGRE_OBJS) diff --git a/greproxy.cpp b/greproxy.cpp index 030ea1f..127e0a6 100644 --- a/greproxy.cpp +++ b/greproxy.cpp @@ -7,6 +7,7 @@ #include "greprotocol.h" #include "reorderer.h" +#include "rsencoder.h" using namespace std; @@ -26,8 +27,10 @@ int main(int argc, char **argv) in6_addr myaddr = get_addr(argv[3]); GREProtocol gre_a(myaddr, addr_a); GREProtocol gre_b(myaddr, addr_b); - Reorderer dst_a(&gre_a); - Reorderer dst_b(&gre_b); + RSEncoder rs_a(&gre_a); + RSEncoder rs_b(&gre_b); + Reorderer reorder_a(&rs_a); + Reorderer reorder_b(&rs_b); fd_set fds; FD_ZERO(&fds); @@ -41,10 +44,10 @@ int main(int argc, char **argv) } if (FD_ISSET(gre_a.fd(), &fds)) { - gre_a.read_packet(&dst_b); + gre_a.read_packet(&reorder_b); } if (FD_ISSET(gre_b.fd(), &fds)) { - gre_b.read_packet(&dst_a); + gre_b.read_packet(&reorder_a); } } } diff --git a/rsdecoder.cpp b/rsdecoder.cpp new file mode 100644 index 0000000..20297bf --- /dev/null +++ b/rsdecoder.cpp @@ -0,0 +1,158 @@ +#include +#include +#include + +#include +#include "rsdecoder.h" +#include "rs_parm.h" + +extern "C" { +#include +} + +#define RS_GROUP_HISTORY 3 + +using namespace std; + +RSDecoder::RSDecoder(Sender *sender) + : sender(sender) {} + +void RSDecoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq) +{ + int rs_group; + if (proto == 0xffff) { + // RS packet + rs_group = (incoming_seq + RS_PAYLOAD_SIZE - 1) / RS_PAYLOAD_SIZE; + } else { + // Regular packet + rs_group = incoming_seq / RS_PAYLOAD_SIZE; + } + + if (rs_groups.size() >= RS_GROUP_HISTORY && + rs_group < rs_groups.begin()->first) { + // Older than the oldest group. + return; + } + + auto group_it = rs_groups.find(rs_group); + if (group_it == rs_groups.end()) { + RSGroup group; + group.done = false; + group_it = rs_groups.insert(make_pair(rs_group, group)).first; + } + + RSGroup &group = group_it->second; + if (group.done) { + // This RS group was already sent. + return; + } + if (group.packets.count(incoming_seq)) { + // Already seen this packet. + return; + } + + if (proto != 0xffff) { + sender->send_packet(proto, data, incoming_seq); + } + + GREPacket packet; + packet.seq = incoming_seq; + packet.proto = proto; + packet.data = data; + // Don't care about ts. + + group.packets.insert(make_pair(incoming_seq, packet)); + if (group.packets.size() >= RS_PAYLOAD_SIZE) { + // Enough to reconstruct all missing packets. + + // Reconstruction always happens on the longest packet; + // we will truncate them later. + int max_length = 0; + int num_regular = 0; + for (const auto &it : group.packets) { + if (it.first >= rs_group * RS_PAYLOAD_SIZE) { + // Regular packet. + max_length = max(max_length, it.second.data.size() + 4); + ++num_regular; + } else { + // RS packet. + max_length = max(max_length, it.second.data.size()); + } + } + + if (num_regular < RS_PAYLOAD_SIZE) { + // Piece the data back into the different RS groups. + vector padded_packets; + vector missing_packets; + for (int i = 0; i < RS_GROUP_SIZE; ++i) { + int packet_num = (i < RS_PAYLOAD_SIZE) ? rs_group * RS_PAYLOAD_SIZE + i : + rs_group * RS_PAYLOAD_SIZE - 1 - (i - RS_PAYLOAD_SIZE); + string p; + p.resize(max_length); + const auto it = group.packets.find(packet_num); + if (it == group.packets.end()) { + missing_packets.push_back(i); + } else { + const GREPacket &packet = it->second; + uint16_t proto_be = htons(packet.proto); + memcpy(&p[0], &proto_be, sizeof(uint16_t)); + uint16_t len_be = htons(packet.data.size()); + memcpy(&p[2], &len_be, sizeof(uint16_t)); + memcpy(&p[4], packet.data.data(), packet.data.size()); + } + padded_packets.push_back(p); + } + + // Now reconstruct the missing pieces. + unsigned char ch[RS_GROUP_SIZE]; + for (int i = 0; i < max_length; ++i) { + for (int j = 0; j < RS_GROUP_SIZE; ++j) { + ch[j] = padded_packets[j][i]; + } + int ret = decode_rs_8(ch, &missing_packets[0], missing_packets.size(), + RS_PAD); + if (ret == -1) { + printf("Failed reconstruction!\n"); + // We might get more data later, so don't remove it. + return; + } + for (int j = 0; j < RS_GROUP_SIZE; ++j) { + padded_packets[j][i] = ch[j]; + } + } + + // Output all packets we didn't have before. They will come + // out-of-order, which will be the job of the Reorderer to fix. + for (int i = 0; i < RS_PAYLOAD_SIZE; ++i) { + int packet_num = rs_group * RS_PAYLOAD_SIZE + i; + if (group.packets.count(packet_num) != 0) { + // Already had this packet. + continue; + } + const string &p = padded_packets[i]; + uint16_t proto_be, len_be; + memcpy(&proto_be, &p[0], sizeof(uint16_t)); + memcpy(&len_be, &p[2], sizeof(uint16_t)); + string s(&p[4], &p[4 + ntohs(len_be)]); // TODO: security + sender->send_packet(ntohs(proto_be), s, packet_num); + printf("Reconstructed packet %d\n", packet_num); + } + } + + group.done = true; + } + + if (rs_groups.size() > RS_GROUP_HISTORY) { + const auto &it = rs_groups.begin(); + if (!it->second.done) { + printf("Giving up reconstruction within group [%d,%d> (only got %d/%d packets, needed %d)\n", + it->first * RS_PAYLOAD_SIZE, + (it->first + 1) * RS_PAYLOAD_SIZE, + int(it->second.packets.size()), + RS_GROUP_SIZE, + RS_PAYLOAD_SIZE); + } + rs_groups.erase(it); + } +} + diff --git a/rsdecoder.h b/rsdecoder.h new file mode 100644 index 0000000..5ee52d1 --- /dev/null +++ b/rsdecoder.h @@ -0,0 +1,26 @@ +#ifndef _RSDECODER_H +#define _RSDECODER_H 1 + +#include + +#include + +#include "reorderer.h" + +class Sender; + +class RSDecoder : public Sender { +public: + RSDecoder(Sender *sender); + virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq); + +private: + struct RSGroup { + std::map packets; + bool done; + }; + Sender *sender; + std::map rs_groups; +}; + +#endif /* !defined(_RSDECODER_H) */ diff --git a/rsencoder.cpp b/rsencoder.cpp new file mode 100644 index 0000000..3026dd4 --- /dev/null +++ b/rsencoder.cpp @@ -0,0 +1,92 @@ +#include +#include +#include +#include +#include +#include +#include +extern "C" { +#include +} + +#include "reorderer.h" +#include "rsencoder.h" +#include "rs_parm.h" + +#include + +using namespace std; + +void RSEncoder::send_packet(uint16_t proto, const std::string &data, int incoming_seq) +{ + if (!packet_history.empty() && + incoming_seq <= packet_history.back().seq) { + // Reorderer should have done this for us. + return; + } + if (!packet_history.empty() && + incoming_seq / RS_PAYLOAD_SIZE != + packet_history.back().seq / RS_PAYLOAD_SIZE) { + // Received an unfinished group. + packet_history.clear(); + } + sender->send_packet(proto, data, incoming_seq); + packet_history.emplace_back(GREPacket{incoming_seq, proto, data}); + if (packet_history.size() == RS_PAYLOAD_SIZE) { + finish_group(); + } +} + +void RSEncoder::finish_group() +{ + // Our RS packets need to have the same max length as the longest one. + int max_length = 0; + for (int i = 0; i < packet_history.size(); ++i) { + max_length = max(max_length, packet_history[i].data.size()); + } + + vector padded_packets; + for (int i = 0; i < packet_history.size(); ++i) { + string p; + p.resize(max_length + 4); + memset(&p[0], 0, max_length + 4); + uint16_t proto_be = htons(packet_history[i].proto); + memcpy(&p[0], &proto_be, sizeof(uint16_t)); + uint16_t len_be = htons(packet_history[i].data.size()); + memcpy(&p[2], &len_be, sizeof(uint16_t)); + memcpy(&p[4], packet_history[i].data.data(), packet_history[i].data.size()); + padded_packets.push_back(p); + } + + // Now construct RS packets. + vector rs_packets; + for (int i = 0; i < RS_PARITY_SIZE; ++i) { + string p; + p.resize(max_length + 4); + memset(&p[0], 0, max_length + 4); + rs_packets.push_back(p); + } + string data, parity; + data.resize(RS_PAYLOAD_SIZE); + parity.resize(RS_PARITY_SIZE); + for (int i = 0; i < max_length + 4; ++i) { + for (int j = 0; j < packet_history.size(); ++j) { + data[j] = packet_history[j].data[i]; + } + encode_rs_8(reinterpret_cast(&data[0]), + reinterpret_cast(&parity[0]), + RS_PAD); + for (int j = 0; j < RS_PARITY_SIZE; ++j) { + rs_packets[j][i] = parity[j]; + } + } + + // Actually send the RS packets. + int start_seq = packet_history[0].seq - 1; + for (int i = 0; i < RS_PARITY_SIZE; ++i) { + sender->send_packet(0xffff, rs_packets[i], start_seq - i); + } + + packet_history.clear(); +} + diff --git a/rsencoder.h b/rsencoder.h new file mode 100644 index 0000000..75f0f24 --- /dev/null +++ b/rsencoder.h @@ -0,0 +1,25 @@ +#ifndef _RSENCODER_H +#define _RSENCODER_H 1 + +#include + +#include +#include + +#include "protocol.h" + +class Sender; + +class RSEncoder : public Sender { +public: + RSEncoder(Sender* sender) : sender(sender) {} + virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq); + +private: + void finish_group(); + + Sender* sender; + std::vector packet_history; +}; + +#endif // !defined(_RSENCODER_H) diff --git a/tungre.cpp b/tungre.cpp index 89339bb..b9ad8f1 100644 --- a/tungre.cpp +++ b/tungre.cpp @@ -8,6 +8,7 @@ #include "greprotocol.h" #include "reorderer.h" #include "tunprotocol.h" +#include "rsdecoder.h" using namespace std; @@ -28,6 +29,7 @@ int main(int argc, char **argv) TUNProtocol tun("tungre"); Reorderer tun_reorderer(&tun); + RSDecoder tun_decoder(&tun_reorderer); fd_set fds; FD_ZERO(&fds); @@ -41,7 +43,7 @@ int main(int argc, char **argv) } if (FD_ISSET(gre.fd(), &fds)) { - gre.read_packet(&tun_reorderer); + gre.read_packet(&tun_decoder); } if (FD_ISSET(tun.fd(), &fds)) { tun.read_packet(&gre);