]> git.sesse.net Git - jam/blob - jam.c
Add a Pareto random number generator.
[jam] / jam.c
1 #include <stdio.h>
2 #include <math.h>
3 #include <string.h>
4 #include <stdlib.h>
5 #include <getopt.h>
6 #include <netdb.h>
7 #include <unistd.h>
8 #include <pthread.h>
9 #include <sys/socket.h>
10 #include <netinet/in.h>
11
12 unsigned short port = 2007;
13
14 struct in_addr *destinations = NULL;
15 unsigned num_destinations = 0;
16 unsigned room_destinations = 0;
17
18 struct in_addr *sources = NULL;
19 unsigned num_sources = 0;
20 unsigned room_sources = 0;
21
22 unsigned num_senders = 128;
23
24 const static struct option longopts[] = {
25         { "source-list", required_argument, NULL, 's' },
26         { "destination-list", required_argument, NULL, 'd' },
27         { "num-senders", required_argument, NULL, 'n' },
28         { "port", required_argument, NULL, 'p' },
29         { NULL, 0, NULL, 0 }
30 };
31
32 double gen_pareto_random(double min, double k)
33 {
34         double u = rand() / (RAND_MAX+1.0);
35         return min * pow(u, -1.0 / k);
36 }
37
38 void read_ip_list(char *filename, struct in_addr **addr_list, unsigned *num, unsigned *room)
39 {
40         char buf[256];
41         FILE *in = fopen(filename, "r");
42         if (in == NULL) {
43                 perror(filename);
44                 exit(1);
45         }
46
47         for ( ;; ) {
48                 char *ptr;
49                 struct in_addr addr;
50                 struct hostent *he;
51
52                 if (fgets(buf, 256, in) == NULL)
53                         break;
54
55                 ptr = strchr(buf, '\n');
56                 if (ptr != NULL)
57                         *ptr = 0;
58
59                 ptr = strchr(buf, '\r');
60                 if (ptr != NULL)
61                         *ptr = 0;
62                 
63                 ptr = buf + strspn(buf, " \t");
64
65                 if (ptr[0] == '#' || ptr[0] == 0)
66                         continue;
67
68                 he = gethostbyname(ptr);
69                 if (he == NULL) {
70                         perror(ptr);
71                         exit(1);
72                 }
73
74                 // just pick the first for now
75                 memcpy(&addr.s_addr, he->h_addr_list[0], sizeof(addr.s_addr));
76
77                 if (*num >= *room) {
78                         if (*room == 0) {
79                                 *room = 16;
80                         } else {
81                                 *room <<= 1;
82                         }
83                         *addr_list = (struct in_addr *)realloc(*addr_list, *room * sizeof(struct in_addr));
84                 }
85
86                 (*addr_list)[*num] = addr;
87                 ++*num;
88         }
89
90         fclose(in);
91 }
92
93 void parse_options(int argc, char **argv)
94 {
95         int option_index = 0;
96
97         for ( ;; ) {
98                 int c = getopt_long(argc, argv, "s:d:n:p:", longopts, &option_index); 
99                 switch (c) {
100                 case 's':
101                         read_ip_list(optarg, &sources, &num_sources, &room_sources);
102                         break;
103                 case 'd':
104                         read_ip_list(optarg, &destinations, &num_destinations, &room_destinations);
105                         break;
106                 case 'n':
107                         num_senders = atoi(optarg);
108                         break;
109                 case 'p':
110                         port = atoi(optarg);
111                         break;
112                 case -1:
113                         return;       // end of argument list
114                 default:
115                         fprintf(stderr, "Invalid option\n");
116                         exit(1);
117                 }
118         }
119 }
120
121 void *sender_worker(void *arg)
122 {
123         unsigned i;
124
125         printf("Dummy sender worker\n");
126
127         for (i = 0; i < 1000; ++i) {
128                 unsigned bytes = (unsigned)gen_pareto_random(1048576.0, 1.0);
129                 printf("Sending %u bytes\n", bytes);
130         }
131
132         pthread_exit(0);
133 }
134
135 void *receiver_worker(void *arg)
136 {
137         int sock = (int)arg;
138         char buf[65536];
139
140         printf("Received worker for socket %u\n", sock);
141
142         for ( ;; ) {
143                 int ret = read(sock, buf, 65536);
144                 if (ret == 0)
145                         break;
146
147                 // FIXME: update stats here
148         }
149
150         printf("Socket %u done\n", sock);
151         
152         if (close(sock) == -1) {
153                 perror("close()");
154                 exit(1);
155         }
156
157         pthread_exit(0);
158 }
159
160 int get_server_socket(unsigned short port)
161 {
162         int server_sock;
163         struct sockaddr_in sin;
164         unsigned one = 1;
165
166         server_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
167         if (server_sock == -1) {
168                 perror("socket()");
169                 exit(1);
170         }
171
172         if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) == -1) {
173                 perror("setsocket(SO_REUSEADDR)");
174                 exit(1);
175         }
176
177         sin.sin_family = AF_INET;
178         sin.sin_port = htons(port);
179         sin.sin_addr.s_addr = INADDR_ANY;
180
181         if (bind(server_sock, (struct sockaddr *)&sin, sizeof(struct sockaddr)) == -1) {
182                 perror("bind()");
183                 exit(1);
184         }
185
186         if (listen(server_sock, 255) == -1) {
187                 perror("listen()");
188                 exit(1);
189         }
190
191         return server_sock;
192 }
193
194 int main(int argc, char **argv)
195 {
196         int server_sock;
197         unsigned i;
198         pthread_attr_t attr;
199
200         parse_options(argc, argv);
201
202         if (destinations == NULL || sources == NULL) {
203                 fprintf(stderr, "Missing or empty source or destination host list. Aborting.\n");
204         }
205
206         server_sock = get_server_socket(port);
207         
208         printf("Sending data on port %u from %u sources to %u destinations.\n\n",
209                 port, num_sources, num_destinations);
210                 
211         // FIXME: these do not really set errno
212         if (pthread_attr_init(&attr) != 0) {
213                 perror("pthread_attr_init()");
214                 exit(1);
215         }
216
217         if (pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN + 65536 + 0x4000) != 0) {
218                 perror("pthread_attr_setstacksize");
219                 exit(1);
220         }
221
222         // Fire off sender workers.
223         for (i = 0; i < num_senders; ++i) {
224                 pthread_t thread;
225                 
226                 if (pthread_create(&thread, &attr, sender_worker, NULL) != 0) {
227                         perror("pthread_create()");
228                         exit(1);
229                 }
230         }
231
232         /*
233          * Listen for incoming connections, spawning off one receiver
234          * thread for each (which will just gobble up the data until
235          * we're done).
236          */
237         for ( ;; ) {
238                 struct sockaddr_in addr;
239                 socklen_t addr_len = sizeof(addr);
240                 pthread_t thread;
241
242                 int sock = accept(server_sock, (struct sockaddr *)&addr, &addr_len);
243                 if (sock == -1) {
244                         perror("accept()");
245                         exit(1);
246                 }
247
248                 if (pthread_create(&thread, &attr, receiver_worker, (void *)sock) != 0) {
249                         perror("pthread_create()");
250                         exit(1);
251                 }
252         }
253
254         exit(0);
255 }