+#include <arpa/inet.h>
+#include <netinet/in.h>
#include <stdio.h>
-#include <string.h>
#include <stdlib.h>
+#include <sys/select.h>
#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <netinet/in.h>
-#include <map>
-#include <string>
-#include <queue>
+#include "greprotocol.h"
+#include "reorderer.h"
+#include "rsencoder.h"
+#include "pacer.h"
using namespace std;
-struct gre_header {
- uint8_t reserved0_hi : 4;
- uint8_t has_seq : 1;
- uint8_t has_key : 1;
- uint8_t unused : 1;
- uint8_t has_checksum : 1;
-
- uint8_t version : 3;
- uint8_t reserved0_lo: 5;
-
- uint16_t protocol_type;
-};
-
in6_addr get_addr(const char *str) {
in6_addr ret;
if (inet_pton(AF_INET6, str, &ret) != 1) {
return ret;
}
-struct GREPacket {
- int seq;
- string data;
-
- bool operator> (const GREPacket &other) const {
- return seq > other.seq;
- }
-};
-
-class Reorderer {
-public:
- Reorderer(int sock, const in6_addr &dst);
- void handle_packet(const char* buf, size_t size, int seq);
-
-private:
- void send_packet(const string &data);
-
- int sock;
- sockaddr_in6 dstaddr;
- int last_seq;
-
- priority_queue<GREPacket, vector<GREPacket>, greater<GREPacket>> packet_buffer;
- map<int, int> ccs;
-};
-
-Reorderer::Reorderer(int sock, const in6_addr &dst)
- : sock(sock), last_seq(-1)
-{
- memset(&dstaddr, 0, sizeof(dstaddr));
- dstaddr.sin6_family = AF_INET6;
- dstaddr.sin6_addr = dst;
-}
-
-#define PACKET_BUFFER_SIZE 100
-
-void Reorderer::handle_packet(const char* buf, size_t size, int seq)
-{
- if (packet_buffer.size() >= PACKET_BUFFER_SIZE) {
- printf("Gave up waiting for packets [%d,%d>\n",
- last_seq + 1, packet_buffer.top().seq);
- last_seq = packet_buffer.top().seq - 1;
- }
-
- GREPacket packet;
- packet.seq = seq;
- packet.data = string(buf, buf + size);
- packet_buffer.push(packet);
-
- bool silence = false;
- while (!packet_buffer.empty() &&
- (last_seq == -1 || packet_buffer.top().seq <= last_seq + 1)) {
- int front_seq = packet_buffer.top().seq;
- if (front_seq < last_seq + 1) {
- printf("Duplicate packet or way out-of-order: seq=%d front_seq=%d\n",
- seq, front_seq);
- packet_buffer.pop();
- continue;
- }
- //if (packet_buffer.size() > 1) {
- // printf("seq=%d (REORDER %d)\n", front_seq, int(packet_buffer.size()));
- //} else {
- // printf("seq=%d\n", front_seq);
- //}
- const string &data = packet_buffer.top().data;
- send_packet(data);
- packet_buffer.pop();
- last_seq = front_seq;
- if (!silence && !packet_buffer.empty()) {
- printf("Reordering with packet buffer size %d: seq=%d new_front_seq=%d\n", int(packet_buffer.size()), front_seq, packet_buffer.top().seq);
- silence = true;
- }
- }
-}
-
-void Reorderer::send_packet(const string &data)
-{
- if (data.size() == 1352) {
- for (int i = 0; i < 7; ++i) {
- const char *pkt = &data[i * 188 + 36];
- int pid = (ntohl(*(uint32_t *)(pkt)) & 0x1fff00) >> 8;
- int has_payload = pkt[3] & 0x10;
- int cc = pkt[3] & 0xf;
- if (has_payload) {
- int last_cc = ccs[pid];
- if (cc != ((last_cc + 1) & 0xf)) {
- printf("Pid %d discontinuity (expected %d, got %d)\n", pid, (last_cc + 1) & 0xf, cc);
- }
- ccs[pid] = cc;
- }
- }
- }
- if (sendto(sock, data.data(), data.size(), 0, (sockaddr *)&dstaddr, sizeof(dstaddr)) == -1) {
- perror("sendto");
- return;
- }
-}
-
int main(int argc, char **argv)
{
- int sock = socket(AF_INET6, SOCK_RAW, IPPROTO_GRE);
- if (sock == -1) {
- perror("socket");
- exit(1);
- }
-
- sockaddr_in6 my_addr;
- memset(&my_addr, 0, sizeof(my_addr));
- my_addr.sin6_family = AF_INET6;
- my_addr.sin6_addr = get_addr(argv[3]);
- if (bind(sock, (sockaddr *)&my_addr, sizeof(my_addr)) == -1) {
- perror("bind");
- exit(1);
- }
-
in6_addr addr_a = get_addr(argv[1]);
in6_addr addr_b = get_addr(argv[2]);
- Reorderer dst_a(sock, addr_a);
- Reorderer dst_b(sock, addr_b);
-
+ in6_addr myaddr = get_addr(argv[3]);
+ GREProtocol gre_a(myaddr, addr_a);
+ GREProtocol gre_b(myaddr, addr_b);
+ Pacer pacer_a(&gre_a, 40000, 6);
+ Pacer pacer_b(&gre_b, 40000, 6);
+ RSEncoder rs_a(&pacer_a);
+ RSEncoder rs_b(&pacer_b);
+ Reorderer reorder_a(&rs_a);
+ Reorderer reorder_b(&rs_b);
+
+ fd_set fds;
+ FD_ZERO(&fds);
for ( ;; ) {
- char addrstr[256];
- struct sockaddr_storage addr;
- socklen_t addrlen = sizeof(addr);
- char buf[4096];
- int ret = recvfrom(sock, buf, sizeof(buf), 0, (struct sockaddr *)&addr, &addrlen);
+ timeval tv = { 1, 0 };
+ FD_SET(gre_a.fd(), &fds);
+ FD_SET(gre_b.fd(), &fds);
+ pacer_a.possibly_adjust_tv(&tv);
+ pacer_b.possibly_adjust_tv(&tv);
+ reorder_a.possibly_adjust_tv(&tv);
+ reorder_b.possibly_adjust_tv(&tv);
+ int ret = select(1024, &fds, NULL, NULL, &tv);
if (ret == -1) {
- perror("recvfrom");
- exit(1);
- }
- if (addr.ss_family != AF_INET6) {
- // ignore
- //inet_ntop(AF_INET, &((struct sockaddr_in *)&addr)->sin_addr, addrstr, sizeof(addrstr));
- continue;
- }
- struct in6_addr *addr6 = &((struct sockaddr_in6 *)&addr)->sin6_addr;
- if (memcmp(addr6, &addr_a, sizeof(*addr6)) != 0 &&
- memcmp(addr6, &addr_b, sizeof(*addr6)) != 0) {
- // ignore
+ perror("select");
continue;
}
- inet_ntop(AF_INET6, addr6, addrstr, sizeof(addrstr));
- gre_header* gre = (gre_header *)buf;
- //printf("GREv%d of %d bytes from %s: %02x %02x %02x %02x\n", gre->version, ret, addrstr,
- // buf[0], buf[1], buf[2], buf[3]);
- //printf(" has_checksum=%d, has_key=%d, has_seq=%d\n", gre->has_checksum, gre->has_key, gre->has_seq);
- char* ptr = buf + sizeof(gre_header);
- if (gre->has_checksum) {
- ptr += 4;
- }
- if (gre->has_key) {
- ptr += 4;
- }
- uint32_t seq;
- if (gre->has_seq) {
- seq = ntohl(*(uint32_t *)ptr);
- // printf(" seq=%d\n", seq);
+ if (FD_ISSET(gre_a.fd(), &fds)) {
+ gre_a.read_packet(&reorder_b);
}
- if (memcmp(addr6, &addr_a, sizeof(*addr6)) == 0) {
- // comes from A, send to B
- dst_b.handle_packet(buf, ret, seq);
- } else {
- // comes from B, send to A
- dst_a.handle_packet(buf, ret, seq);
+ if (FD_ISSET(gre_b.fd(), &fds)) {
+ gre_b.read_packet(&reorder_a);
}
+ pacer_a.possibly_flush_packets();
+ pacer_b.possibly_flush_packets();
}
}