]> git.sesse.net Git - greproxy/commitdiff
Add some packet pacing, to reduce the burstiness many parts of the networking do...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 8 Feb 2015 20:50:40 +0000 (21:50 +0100)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Sun, 8 Feb 2015 20:50:40 +0000 (21:50 +0100)
Makefile
greproxy.cpp
pacer.cpp [new file with mode: 0644]
pacer.h [new file with mode: 0644]
tungre.cpp

index fe742a68849838046f00b2ec1e5fedf7e75700b7..b4634d8e25831135980de38927dda0482ccb0c26 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,8 +1,8 @@
 CXXFLAGS=-std=gnu++11 -O2 -g
 LDLIBS=-lfecpp
 
-TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o
-GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o
+TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o pacer.o
+GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o pacer.o
 
 all: tungre greproxy
 tungre: $(TUNGRE_OBJS)
index 127e0a6c72f51f717cc02cc3eb84a84877877f27..bf28295d02dd79aa9fc3bdbd6b91a4b55c1d913e 100644 (file)
@@ -8,6 +8,7 @@
 #include "greprotocol.h"
 #include "reorderer.h"
 #include "rsencoder.h"
+#include "pacer.h"
 
 using namespace std;
 
@@ -27,17 +28,22 @@ int main(int argc, char **argv)
        in6_addr myaddr = get_addr(argv[3]);
        GREProtocol gre_a(myaddr, addr_a);
        GREProtocol gre_b(myaddr, addr_b);
-       RSEncoder rs_a(&gre_a);
-       RSEncoder rs_b(&gre_b);
+       Pacer pacer_a(&gre_a, 40000, 6);
+       Pacer pacer_b(&gre_b, 40000, 6);
+       RSEncoder rs_a(&pacer_a);
+       RSEncoder rs_b(&pacer_b);
        Reorderer reorder_a(&rs_a);
        Reorderer reorder_b(&rs_b);
 
        fd_set fds;
        FD_ZERO(&fds);
        for ( ;; ) {
+               timeval tv = { 1, 0 };
                FD_SET(gre_a.fd(), &fds);
                FD_SET(gre_b.fd(), &fds);
-               int ret = select(1024, &fds, NULL, NULL, NULL);
+               pacer_a.possibly_adjust_tv(&tv);
+               pacer_b.possibly_adjust_tv(&tv);
+               int ret = select(1024, &fds, NULL, NULL, &tv);
                if (ret == -1) {
                        perror("select");
                        continue;
@@ -49,5 +55,7 @@ int main(int argc, char **argv)
                if (FD_ISSET(gre_b.fd(), &fds)) {
                        gre_b.read_packet(&reorder_a);
                }
+               pacer_a.possibly_flush_packets();
+               pacer_b.possibly_flush_packets();
        }
 }
diff --git a/pacer.cpp b/pacer.cpp
new file mode 100644 (file)
index 0000000..b0683f4
--- /dev/null
+++ b/pacer.cpp
@@ -0,0 +1,82 @@
+#include "pacer.h"
+
+#include <algorithm>
+
+using namespace std;
+
+Pacer::Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets)
+       : sender(sender), burst_num_packets(burst_num_packets), next_send_packet{0, 0}
+{
+       const int max_rate_byte_per_sec = 1000 * max_rate_kbit_per_sec / 8;
+       seconds_per_byte = 1.0 / max_rate_byte_per_sec;
+}
+
+bool less_than(const timeval &a, const timeval &b)
+{
+       return make_pair(a.tv_sec, a.tv_usec) < make_pair(b.tv_sec, b.tv_usec);
+}
+       
+void Pacer::send_packet(uint16_t proto, const string &data, int incoming_seq)
+{
+       waiting_packets.push_back(GREPacket{incoming_seq, proto, data, {0, 0}});
+       possibly_flush_packets();
+}
+
+void Pacer::possibly_adjust_tv(timeval *tv)
+{
+       if (waiting_packets.empty()) {
+               return;
+       }
+
+       timeval now;
+       gettimeofday(&now, NULL);
+       if (less_than(next_send_packet, now)) {
+               // Should send packets immediately.
+               tv->tv_sec = tv->tv_usec = 0;
+               return;
+       }
+
+       timeval tdiff;
+       tdiff.tv_sec = next_send_packet.tv_sec - now.tv_sec;
+       tdiff.tv_usec = next_send_packet.tv_usec - now.tv_usec;
+       if (tdiff.tv_usec < 0) {
+               tdiff.tv_usec += 1000000;
+               --tdiff.tv_sec;
+       }
+
+       if (less_than(tdiff, *tv)) {
+               *tv = tdiff;
+       }
+}
+
+void Pacer::possibly_flush_packets()
+{
+       if (waiting_packets.empty()) {
+               return;
+       }
+
+       timeval now;
+       gettimeofday(&now, NULL);
+       if (less_than(now, next_send_packet)) {
+               return;
+       }
+
+       int bytes_sent = 0;
+       for (int i = 0; i < burst_num_packets && !waiting_packets.empty(); ++i) {
+               const GREPacket &packet = waiting_packets.front();
+               sender->send_packet(packet.proto, packet.data, packet.seq);
+               bytes_sent += packet.data.size();
+               waiting_packets.pop_front();
+       }
+
+       int usec_to_add = lrint(1e6 * seconds_per_byte * bytes_sent);
+       int sec_to_add = usec_to_add / 1000000;
+       usec_to_add %= 1000000;
+
+       next_send_packet = now;
+       next_send_packet.tv_usec += usec_to_add;
+       next_send_packet.tv_sec += sec_to_add;
+       next_send_packet.tv_sec += next_send_packet.tv_usec / 1000000;
+       next_send_packet.tv_usec %= 1000000;
+}
+
diff --git a/pacer.h b/pacer.h
new file mode 100644 (file)
index 0000000..90d0c4f
--- /dev/null
+++ b/pacer.h
@@ -0,0 +1,28 @@
+#ifndef _PACER_H
+#define _PACER_H 1
+
+#include <stdint.h>
+#include <sys/time.h>
+#include <deque>
+#include <string>
+
+#include "protocol.h"
+#include "reorderer.h"
+
+struct Pacer : public Sender {
+public:
+       Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets);
+       virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq);
+
+       void possibly_adjust_tv(timeval *tv);
+       void possibly_flush_packets();
+
+private:
+       Sender *sender;
+       int burst_num_packets;
+       double seconds_per_byte;
+       timeval next_send_packet;
+       std::deque<GREPacket> waiting_packets;
+};
+
+#endif  // !defined(_PACER_H)
index b9ad8f143b743b9e021422fbadf3efdf00001003..ea736fbc2fa600d78738da8c750e0ebd6a1fb77c 100644 (file)
@@ -9,6 +9,7 @@
 #include "reorderer.h"
 #include "tunprotocol.h"
 #include "rsdecoder.h"
+#include "pacer.h"
 
 using namespace std;
 
@@ -26,9 +27,11 @@ int main(int argc, char **argv)
        in6_addr myaddr = get_addr(argv[1]);
        in6_addr remoteaddr = get_addr(argv[2]);
        GREProtocol gre(myaddr, remoteaddr);
+       Pacer gre_pacer(&gre, 40000, 6);
        TUNProtocol tun("tungre");
 
-       Reorderer tun_reorderer(&tun);
+       Pacer tun_pacer(&tun, 40000, 6);
+       Reorderer tun_reorderer(&tun_pacer);
        RSDecoder tun_decoder(&tun_reorderer);
 
        fd_set fds;
@@ -46,7 +49,7 @@ int main(int argc, char **argv)
                        gre.read_packet(&tun_decoder);
                }
                if (FD_ISSET(tun.fd(), &fds)) {
-                       tun.read_packet(&gre);
+                       tun.read_packet(&gre_pacer);
                }
        }
 }