From 45fc511092347489c5cb6ed76cd5fba593b93fa6 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Sun, 8 Feb 2015 21:50:40 +0100 Subject: [PATCH] Add some packet pacing, to reduce the burstiness many parts of the networking do not like. --- Makefile | 4 +-- greproxy.cpp | 14 +++++++-- pacer.cpp | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++ pacer.h | 28 ++++++++++++++++++ tungre.cpp | 7 +++-- 5 files changed, 128 insertions(+), 7 deletions(-) create mode 100644 pacer.cpp create mode 100644 pacer.h diff --git a/Makefile b/Makefile index fe742a6..b4634d8 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ CXXFLAGS=-std=gnu++11 -O2 -g LDLIBS=-lfecpp -TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o -GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o +TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o pacer.o +GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o pacer.o all: tungre greproxy tungre: $(TUNGRE_OBJS) diff --git a/greproxy.cpp b/greproxy.cpp index 127e0a6..bf28295 100644 --- a/greproxy.cpp +++ b/greproxy.cpp @@ -8,6 +8,7 @@ #include "greprotocol.h" #include "reorderer.h" #include "rsencoder.h" +#include "pacer.h" using namespace std; @@ -27,17 +28,22 @@ int main(int argc, char **argv) in6_addr myaddr = get_addr(argv[3]); GREProtocol gre_a(myaddr, addr_a); GREProtocol gre_b(myaddr, addr_b); - RSEncoder rs_a(&gre_a); - RSEncoder rs_b(&gre_b); + Pacer pacer_a(&gre_a, 40000, 6); + Pacer pacer_b(&gre_b, 40000, 6); + RSEncoder rs_a(&pacer_a); + RSEncoder rs_b(&pacer_b); Reorderer reorder_a(&rs_a); Reorderer reorder_b(&rs_b); fd_set fds; FD_ZERO(&fds); for ( ;; ) { + timeval tv = { 1, 0 }; FD_SET(gre_a.fd(), &fds); FD_SET(gre_b.fd(), &fds); - int ret = select(1024, &fds, NULL, NULL, NULL); + pacer_a.possibly_adjust_tv(&tv); + pacer_b.possibly_adjust_tv(&tv); + int ret = select(1024, &fds, NULL, NULL, &tv); if (ret == -1) { perror("select"); continue; @@ -49,5 +55,7 @@ int main(int argc, char **argv) if (FD_ISSET(gre_b.fd(), &fds)) { gre_b.read_packet(&reorder_a); } + pacer_a.possibly_flush_packets(); + pacer_b.possibly_flush_packets(); } } diff --git a/pacer.cpp b/pacer.cpp new file mode 100644 index 0000000..b0683f4 --- /dev/null +++ b/pacer.cpp @@ -0,0 +1,82 @@ +#include "pacer.h" + +#include + +using namespace std; + +Pacer::Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets) + : sender(sender), burst_num_packets(burst_num_packets), next_send_packet{0, 0} +{ + const int max_rate_byte_per_sec = 1000 * max_rate_kbit_per_sec / 8; + seconds_per_byte = 1.0 / max_rate_byte_per_sec; +} + +bool less_than(const timeval &a, const timeval &b) +{ + return make_pair(a.tv_sec, a.tv_usec) < make_pair(b.tv_sec, b.tv_usec); +} + +void Pacer::send_packet(uint16_t proto, const string &data, int incoming_seq) +{ + waiting_packets.push_back(GREPacket{incoming_seq, proto, data, {0, 0}}); + possibly_flush_packets(); +} + +void Pacer::possibly_adjust_tv(timeval *tv) +{ + if (waiting_packets.empty()) { + return; + } + + timeval now; + gettimeofday(&now, NULL); + if (less_than(next_send_packet, now)) { + // Should send packets immediately. + tv->tv_sec = tv->tv_usec = 0; + return; + } + + timeval tdiff; + tdiff.tv_sec = next_send_packet.tv_sec - now.tv_sec; + tdiff.tv_usec = next_send_packet.tv_usec - now.tv_usec; + if (tdiff.tv_usec < 0) { + tdiff.tv_usec += 1000000; + --tdiff.tv_sec; + } + + if (less_than(tdiff, *tv)) { + *tv = tdiff; + } +} + +void Pacer::possibly_flush_packets() +{ + if (waiting_packets.empty()) { + return; + } + + timeval now; + gettimeofday(&now, NULL); + if (less_than(now, next_send_packet)) { + return; + } + + int bytes_sent = 0; + for (int i = 0; i < burst_num_packets && !waiting_packets.empty(); ++i) { + const GREPacket &packet = waiting_packets.front(); + sender->send_packet(packet.proto, packet.data, packet.seq); + bytes_sent += packet.data.size(); + waiting_packets.pop_front(); + } + + int usec_to_add = lrint(1e6 * seconds_per_byte * bytes_sent); + int sec_to_add = usec_to_add / 1000000; + usec_to_add %= 1000000; + + next_send_packet = now; + next_send_packet.tv_usec += usec_to_add; + next_send_packet.tv_sec += sec_to_add; + next_send_packet.tv_sec += next_send_packet.tv_usec / 1000000; + next_send_packet.tv_usec %= 1000000; +} + diff --git a/pacer.h b/pacer.h new file mode 100644 index 0000000..90d0c4f --- /dev/null +++ b/pacer.h @@ -0,0 +1,28 @@ +#ifndef _PACER_H +#define _PACER_H 1 + +#include +#include +#include +#include + +#include "protocol.h" +#include "reorderer.h" + +struct Pacer : public Sender { +public: + Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets); + virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq); + + void possibly_adjust_tv(timeval *tv); + void possibly_flush_packets(); + +private: + Sender *sender; + int burst_num_packets; + double seconds_per_byte; + timeval next_send_packet; + std::deque waiting_packets; +}; + +#endif // !defined(_PACER_H) diff --git a/tungre.cpp b/tungre.cpp index b9ad8f1..ea736fb 100644 --- a/tungre.cpp +++ b/tungre.cpp @@ -9,6 +9,7 @@ #include "reorderer.h" #include "tunprotocol.h" #include "rsdecoder.h" +#include "pacer.h" using namespace std; @@ -26,9 +27,11 @@ int main(int argc, char **argv) in6_addr myaddr = get_addr(argv[1]); in6_addr remoteaddr = get_addr(argv[2]); GREProtocol gre(myaddr, remoteaddr); + Pacer gre_pacer(&gre, 40000, 6); TUNProtocol tun("tungre"); - Reorderer tun_reorderer(&tun); + Pacer tun_pacer(&tun, 40000, 6); + Reorderer tun_reorderer(&tun_pacer); RSDecoder tun_decoder(&tun_reorderer); fd_set fds; @@ -46,7 +49,7 @@ int main(int argc, char **argv) gre.read_packet(&tun_decoder); } if (FD_ISSET(tun.fd(), &fds)) { - tun.read_packet(&gre); + tun.read_packet(&gre_pacer); } } } -- 2.39.2