]> git.sesse.net Git - greproxy/blobdiff - pacer.cpp
Add some packet pacing, to reduce the burstiness many parts of the networking do...
[greproxy] / pacer.cpp
diff --git a/pacer.cpp b/pacer.cpp
new file mode 100644 (file)
index 0000000..b0683f4
--- /dev/null
+++ b/pacer.cpp
@@ -0,0 +1,82 @@
+#include "pacer.h"
+
+#include <algorithm>
+
+using namespace std;
+
+Pacer::Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets)
+       : sender(sender), burst_num_packets(burst_num_packets), next_send_packet{0, 0}
+{
+       const int max_rate_byte_per_sec = 1000 * max_rate_kbit_per_sec / 8;
+       seconds_per_byte = 1.0 / max_rate_byte_per_sec;
+}
+
+bool less_than(const timeval &a, const timeval &b)
+{
+       return make_pair(a.tv_sec, a.tv_usec) < make_pair(b.tv_sec, b.tv_usec);
+}
+       
+void Pacer::send_packet(uint16_t proto, const string &data, int incoming_seq)
+{
+       waiting_packets.push_back(GREPacket{incoming_seq, proto, data, {0, 0}});
+       possibly_flush_packets();
+}
+
+void Pacer::possibly_adjust_tv(timeval *tv)
+{
+       if (waiting_packets.empty()) {
+               return;
+       }
+
+       timeval now;
+       gettimeofday(&now, NULL);
+       if (less_than(next_send_packet, now)) {
+               // Should send packets immediately.
+               tv->tv_sec = tv->tv_usec = 0;
+               return;
+       }
+
+       timeval tdiff;
+       tdiff.tv_sec = next_send_packet.tv_sec - now.tv_sec;
+       tdiff.tv_usec = next_send_packet.tv_usec - now.tv_usec;
+       if (tdiff.tv_usec < 0) {
+               tdiff.tv_usec += 1000000;
+               --tdiff.tv_sec;
+       }
+
+       if (less_than(tdiff, *tv)) {
+               *tv = tdiff;
+       }
+}
+
+void Pacer::possibly_flush_packets()
+{
+       if (waiting_packets.empty()) {
+               return;
+       }
+
+       timeval now;
+       gettimeofday(&now, NULL);
+       if (less_than(now, next_send_packet)) {
+               return;
+       }
+
+       int bytes_sent = 0;
+       for (int i = 0; i < burst_num_packets && !waiting_packets.empty(); ++i) {
+               const GREPacket &packet = waiting_packets.front();
+               sender->send_packet(packet.proto, packet.data, packet.seq);
+               bytes_sent += packet.data.size();
+               waiting_packets.pop_front();
+       }
+
+       int usec_to_add = lrint(1e6 * seconds_per_byte * bytes_sent);
+       int sec_to_add = usec_to_add / 1000000;
+       usec_to_add %= 1000000;
+
+       next_send_packet = now;
+       next_send_packet.tv_usec += usec_to_add;
+       next_send_packet.tv_sec += sec_to_add;
+       next_send_packet.tv_sec += next_send_packet.tv_usec / 1000000;
+       next_send_packet.tv_usec %= 1000000;
+}
+