CXXFLAGS=-std=gnu++11 -O2 -g
LDLIBS=-lfecpp
-TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o
-GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o
+TUNGRE_OBJS=tungre.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o pacer.o
+GREPROXY_OBJS=greproxy.o greprotocol.o reorderer.o tunprotocol.o rsdecoder.o rsencoder.o pacer.o
all: tungre greproxy
tungre: $(TUNGRE_OBJS)
#include "greprotocol.h"
#include "reorderer.h"
#include "rsencoder.h"
+#include "pacer.h"
using namespace std;
in6_addr myaddr = get_addr(argv[3]);
GREProtocol gre_a(myaddr, addr_a);
GREProtocol gre_b(myaddr, addr_b);
- RSEncoder rs_a(&gre_a);
- RSEncoder rs_b(&gre_b);
+ Pacer pacer_a(&gre_a, 40000, 6);
+ Pacer pacer_b(&gre_b, 40000, 6);
+ RSEncoder rs_a(&pacer_a);
+ RSEncoder rs_b(&pacer_b);
Reorderer reorder_a(&rs_a);
Reorderer reorder_b(&rs_b);
fd_set fds;
FD_ZERO(&fds);
for ( ;; ) {
+ timeval tv = { 1, 0 };
FD_SET(gre_a.fd(), &fds);
FD_SET(gre_b.fd(), &fds);
- int ret = select(1024, &fds, NULL, NULL, NULL);
+ pacer_a.possibly_adjust_tv(&tv);
+ pacer_b.possibly_adjust_tv(&tv);
+ int ret = select(1024, &fds, NULL, NULL, &tv);
if (ret == -1) {
perror("select");
continue;
if (FD_ISSET(gre_b.fd(), &fds)) {
gre_b.read_packet(&reorder_a);
}
+ pacer_a.possibly_flush_packets();
+ pacer_b.possibly_flush_packets();
}
}
--- /dev/null
+#include "pacer.h"
+
+#include <algorithm>
+
+using namespace std;
+
+Pacer::Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets)
+ : sender(sender), burst_num_packets(burst_num_packets), next_send_packet{0, 0}
+{
+ const int max_rate_byte_per_sec = 1000 * max_rate_kbit_per_sec / 8;
+ seconds_per_byte = 1.0 / max_rate_byte_per_sec;
+}
+
+bool less_than(const timeval &a, const timeval &b)
+{
+ return make_pair(a.tv_sec, a.tv_usec) < make_pair(b.tv_sec, b.tv_usec);
+}
+
+void Pacer::send_packet(uint16_t proto, const string &data, int incoming_seq)
+{
+ waiting_packets.push_back(GREPacket{incoming_seq, proto, data, {0, 0}});
+ possibly_flush_packets();
+}
+
+void Pacer::possibly_adjust_tv(timeval *tv)
+{
+ if (waiting_packets.empty()) {
+ return;
+ }
+
+ timeval now;
+ gettimeofday(&now, NULL);
+ if (less_than(next_send_packet, now)) {
+ // Should send packets immediately.
+ tv->tv_sec = tv->tv_usec = 0;
+ return;
+ }
+
+ timeval tdiff;
+ tdiff.tv_sec = next_send_packet.tv_sec - now.tv_sec;
+ tdiff.tv_usec = next_send_packet.tv_usec - now.tv_usec;
+ if (tdiff.tv_usec < 0) {
+ tdiff.tv_usec += 1000000;
+ --tdiff.tv_sec;
+ }
+
+ if (less_than(tdiff, *tv)) {
+ *tv = tdiff;
+ }
+}
+
+void Pacer::possibly_flush_packets()
+{
+ if (waiting_packets.empty()) {
+ return;
+ }
+
+ timeval now;
+ gettimeofday(&now, NULL);
+ if (less_than(now, next_send_packet)) {
+ return;
+ }
+
+ int bytes_sent = 0;
+ for (int i = 0; i < burst_num_packets && !waiting_packets.empty(); ++i) {
+ const GREPacket &packet = waiting_packets.front();
+ sender->send_packet(packet.proto, packet.data, packet.seq);
+ bytes_sent += packet.data.size();
+ waiting_packets.pop_front();
+ }
+
+ int usec_to_add = lrint(1e6 * seconds_per_byte * bytes_sent);
+ int sec_to_add = usec_to_add / 1000000;
+ usec_to_add %= 1000000;
+
+ next_send_packet = now;
+ next_send_packet.tv_usec += usec_to_add;
+ next_send_packet.tv_sec += sec_to_add;
+ next_send_packet.tv_sec += next_send_packet.tv_usec / 1000000;
+ next_send_packet.tv_usec %= 1000000;
+}
+
--- /dev/null
+#ifndef _PACER_H
+#define _PACER_H 1
+
+#include <stdint.h>
+#include <sys/time.h>
+#include <deque>
+#include <string>
+
+#include "protocol.h"
+#include "reorderer.h"
+
+struct Pacer : public Sender {
+public:
+ Pacer(Sender *sender, int max_rate_kbit_per_sec, int burst_num_packets);
+ virtual void send_packet(uint16_t proto, const std::string &data, int incoming_seq);
+
+ void possibly_adjust_tv(timeval *tv);
+ void possibly_flush_packets();
+
+private:
+ Sender *sender;
+ int burst_num_packets;
+ double seconds_per_byte;
+ timeval next_send_packet;
+ std::deque<GREPacket> waiting_packets;
+};
+
+#endif // !defined(_PACER_H)
#include "reorderer.h"
#include "tunprotocol.h"
#include "rsdecoder.h"
+#include "pacer.h"
using namespace std;
in6_addr myaddr = get_addr(argv[1]);
in6_addr remoteaddr = get_addr(argv[2]);
GREProtocol gre(myaddr, remoteaddr);
+ Pacer gre_pacer(&gre, 40000, 6);
TUNProtocol tun("tungre");
- Reorderer tun_reorderer(&tun);
+ Pacer tun_pacer(&tun, 40000, 6);
+ Reorderer tun_reorderer(&tun_pacer);
RSDecoder tun_decoder(&tun_reorderer);
fd_set fds;
gre.read_packet(&tun_decoder);
}
if (FD_ISSET(tun.fd(), &fds)) {
- tun.read_packet(&gre);
+ tun.read_packet(&gre_pacer);
}
}
}