xref: /libuv/test/benchmark-pump.c (revision d8669609)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "task.h"
23 #include "uv.h"
24 
25 #include <math.h>
26 #include <stdio.h>
27 
28 
/* Size in bytes of the payload pushed by every write request. */
#define WRITE_BUFFER_SIZE           8192

/* Upper bound on connect requests that may be in flight at any one time. */
#define MAX_SIMULTANEOUS_CONNECTS   100

/* Periodic statistics: per-interval printing switch, timer period (msec) and
 * the number of intervals to run before the client reports and exits.
 */
#define PRINT_STATS                 0
#define STATS_INTERVAL              1000
#define STATS_COUNT                 5

/* Number of client connections to establish; set by tcp_pump()/pipe_pump(). */
static int TARGET_CONNECTIONS;
36 
37 
38 static void do_write(uv_stream_t*);
39 static void maybe_connect_some(void);
40 
41 static uv_req_t* req_alloc(void);
42 static void req_free(uv_req_t* uv_req);
43 
44 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
45 static void buf_free(const uv_buf_t* buf);
46 
47 static uv_loop_t* loop;
48 
49 static uv_tcp_t tcpServer;
50 static uv_pipe_t pipeServer;
51 static uv_stream_t* server;
52 static struct sockaddr_in listen_addr;
53 static struct sockaddr_in connect_addr;
54 
55 static int64_t start_time;
56 
57 static int max_connect_socket = 0;
58 static int max_read_sockets = 0;
59 static int read_sockets = 0;
60 static int write_sockets = 0;
61 
62 static int64_t nrecv = 0;
63 static int64_t nrecv_total = 0;
64 static int64_t nsent = 0;
65 static int64_t nsent_total = 0;
66 
67 static int stats_left = 0;
68 
69 static char write_buffer[WRITE_BUFFER_SIZE];
70 
71 /* Make this as large as you need. */
72 #define MAX_WRITE_HANDLES 1000
73 
74 static stream_type type;
75 
76 static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
77 static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
78 
79 static uv_timer_t timer_handle;
80 
81 
/*
 * Convert a byte count moved over `passed_ms` milliseconds into a rate in
 * gigabits per second, using 1024^3 bytes per gigabyte.
 */
static double gbit(int64_t bytes, int64_t passed_ms) {
  const double bytes_per_gb = 1024 * 1024 * 1024;
  double seconds;
  double gbits;

  seconds = (double) passed_ms / 1000;
  gbits = ((double) bytes / bytes_per_gb) * 8;

  return gbits / seconds;
}
86 
87 
/*
 * Client-side timer callback. Optionally prints the write rate for the
 * interval that just ended, then either resets the per-interval counters or,
 * once the last interval has elapsed, prints the overall write rate, closes
 * every write handle and terminates the process.
 */
static void show_stats(uv_timer_t* handle) {
  int64_t elapsed_ms;
  int idx;

#if PRINT_STATS
  fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
          write_sockets,
          gbit(nsent, STATS_INTERVAL));
  fflush(stderr);
#endif

  stats_left--;
  if (stats_left != 0) {
    /* More intervals to come: start the next one with clean counters. */
    nrecv = 0;
    nsent = 0;
    return;
  }

  /* The show is over: report the total and shut down. */
  uv_update_time(loop);
  elapsed_ms = uv_now(loop) - start_time;

  fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
          type == TCP ? "tcp" : "pipe",
          write_sockets,
          gbit(nsent_total, elapsed_ms));
  fflush(stderr);

  for (idx = 0; idx < write_sockets; idx++) {
    uv_handle_t* h;

    if (type == TCP)
      h = (uv_handle_t*) &tcp_write_handles[idx];
    else
      h = (uv_handle_t*) &pipe_write_handles[idx];

    uv_close(h, NULL);
  }

  exit(0);
}
125 
126 
read_show_stats(void)127 static void read_show_stats(void) {
128   int64_t diff;
129 
130   uv_update_time(loop);
131   diff = uv_now(loop) - start_time;
132 
133   fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
134           type == TCP ? "tcp" : "pipe",
135           max_read_sockets,
136           gbit(nrecv_total, diff));
137   fflush(stderr);
138 }
139 
140 
141 
/*
 * Close callback for accepted connections. Releases the heap-allocated
 * handle and, when the last open connection has gone away more than one
 * second after start_time, prints the server stats and closes the listener.
 */
static void read_sockets_close_cb(uv_handle_t* handle) {
  free(handle);
  read_sockets--;

  /* Somebody is still connected: nothing to report yet. */
  if (read_sockets != 0)
    return;

  /* Ignore connections that all closed within the first second. */
  if (uv_now(loop) - start_time <= 1000)
    return;

  read_show_stats();
  uv_close((uv_handle_t*) server, NULL);
}
154 
155 
start_stats_collection(void)156 static void start_stats_collection(void) {
157   int r;
158 
159   /* Show-stats timer */
160   stats_left = STATS_COUNT;
161   r = uv_timer_init(loop, &timer_handle);
162   ASSERT_OK(r);
163   r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
164   ASSERT_OK(r);
165 
166   uv_update_time(loop);
167   start_time = uv_now(loop);
168 }
169 
170 
/*
 * Read callback for accepted connections.
 *
 * stream: connection the data arrived on.
 * bytes:  number of bytes read, or a negative value on EOF/error.
 * buf:    buffer previously handed out by buf_alloc(); its base may be NULL
 *         when no buffer could be provided.
 *
 * The buffer is returned to the free list on every path, including the
 * EOF/error path, so that it is not leaked when the connection is torn down.
 */
static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
  if (nrecv_total == 0) {
    /* Nothing received yet: the clock must not have been started. */
    ASSERT_OK(start_time);
    uv_update_time(loop);
    start_time = uv_now(loop);
  }

  /* Recycle the buffer first; an error may come with or without one. */
  if (buf->base != NULL)
    buf_free(buf);

  if (bytes < 0) {
    uv_close((uv_handle_t*) stream, read_sockets_close_cb);
    return;
  }

  nrecv += bytes;
  nrecv_total += bytes;
}
188 
189 
/*
 * Write-completion callback: accounts for the bytes just sent, recycles the
 * request and immediately queues the next write on the same stream.
 *
 * req:    the completed write request (obtained from req_alloc()).
 * status: result of the write; anything but success aborts the benchmark.
 */
static void write_cb(uv_write_t* req, int status) {
  uv_stream_t* stream;

  ASSERT_OK(status);

  /* Read the handle before the request is handed back to the free list, so
   * this function never touches a request it no longer owns.
   */
  stream = (uv_stream_t*) req->handle;
  req_free((uv_req_t*) req);

  nsent += sizeof write_buffer;
  nsent_total += sizeof write_buffer;

  do_write(stream);
}
200 
201 
/*
 * Queue one write of the whole shared write_buffer on `stream`. The request
 * comes from req_alloc() and is recycled by write_cb() on completion.
 */
static void do_write(uv_stream_t* stream) {
  uv_buf_t chunk;
  uv_write_t* write_req;

  chunk.base = write_buffer;
  chunk.len = sizeof write_buffer;

  write_req = (uv_write_t*) req_alloc();
  ASSERT_OK(uv_write(write_req, stream, &chunk, 1, write_cb));
}
214 
215 
/*
 * Connect callback for client handles. Counts the new connection, recycles
 * the request, tops up the pending connects and, once TARGET_CONNECTIONS are
 * established, starts the stats timer and the first write on each handle.
 */
static void connect_cb(uv_connect_t* req, int status) {
  uv_stream_t* out;
  int idx;

  if (status != 0) {
    /* Print the reason before the assertion below aborts the run. */
    fprintf(stderr, "%s", uv_strerror(status));
    fflush(stderr);
  }
  ASSERT_OK(status);

  write_sockets++;
  req_free((uv_req_t*) req);

  maybe_connect_some();

  /* Still waiting for the remaining connections. */
  if (write_sockets != TARGET_CONNECTIONS)
    return;

  start_stats_collection();

  /* Yay! start writing */
  for (idx = 0; idx < write_sockets; idx++) {
    if (type == TCP)
      out = (uv_stream_t*) &tcp_write_handles[idx];
    else
      out = (uv_stream_t*) &pipe_write_handles[idx];

    do_write(out);
  }
}
242 
243 
maybe_connect_some(void)244 static void maybe_connect_some(void) {
245   uv_connect_t* req;
246   uv_tcp_t* tcp;
247   uv_pipe_t* pipe;
248   int r;
249 
250   while (max_connect_socket < TARGET_CONNECTIONS &&
251          max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
252     if (type == TCP) {
253       tcp = &tcp_write_handles[max_connect_socket++];
254 
255       r = uv_tcp_init(loop, tcp);
256       ASSERT_OK(r);
257 
258       req = (uv_connect_t*) req_alloc();
259       r = uv_tcp_connect(req,
260                          tcp,
261                          (const struct sockaddr*) &connect_addr,
262                          connect_cb);
263       ASSERT_OK(r);
264     } else {
265       pipe = &pipe_write_handles[max_connect_socket++];
266 
267       r = uv_pipe_init(loop, pipe, 0);
268       ASSERT_OK(r);
269 
270       req = (uv_connect_t*) req_alloc();
271       uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
272     }
273   }
274 }
275 
276 
/*
 * Listener callback: allocates a handle matching the transport under test,
 * accepts the pending connection into it, starts reading and updates the
 * connection counters. The handle is freed in read_sockets_close_cb().
 */
static void connection_cb(uv_stream_t* s, int status) {
  uv_stream_t* client;

  ASSERT_PTR_EQ(server, s);
  ASSERT_OK(status);

  if (type != TCP) {
    client = (uv_stream_t*) malloc(sizeof(uv_pipe_t));
    ASSERT_OK(uv_pipe_init(loop, (uv_pipe_t*) client, 0));
  } else {
    client = (uv_stream_t*) malloc(sizeof(uv_tcp_t));
    ASSERT_OK(uv_tcp_init(loop, (uv_tcp_t*) client));
  }

  ASSERT_OK(uv_accept(s, client));
  ASSERT_OK(uv_read_start(client, buf_alloc, read_cb));

  read_sockets++;
  max_read_sockets++;
}
303 
304 
305 /*
306  * Request allocator
307  */
308 
309 typedef struct req_list_s {
310   union uv_any_req uv_req;
311   struct req_list_s* next;
312 } req_list_t;
313 
314 
315 static req_list_t* req_freelist = NULL;
316 
317 
req_alloc(void)318 static uv_req_t* req_alloc(void) {
319   req_list_t* req;
320 
321   req = req_freelist;
322   if (req != NULL) {
323     req_freelist = req->next;
324     return (uv_req_t*) req;
325   }
326 
327   req = (req_list_t*) malloc(sizeof *req);
328   return (uv_req_t*) req;
329 }
330 
331 
/*
 * Return a request obtained from req_alloc() to the free list. The memory
 * is kept for reuse rather than released to the heap.
 */
static void req_free(uv_req_t* uv_req) {
  req_list_t* node;

  node = (req_list_t*) uv_req;

  /* Push onto the head of the free list. */
  node->next = req_freelist;
  req_freelist = node;
}
338 
339 
340 /*
341  * Buffer allocator
342  */
343 
344 typedef struct buf_list_s {
345   uv_buf_t uv_buf_t;
346   struct buf_list_s* next;
347 } buf_list_t;
348 
349 
350 static buf_list_t* buf_freelist = NULL;
351 
352 
/*
 * Allocation callback passed to uv_read_start().
 *
 * handle: handle the buffer is requested for (unused).
 * size:   suggested buffer size; only honoured for fresh allocations, a
 *         recycled buffer keeps the size it was created with.
 * buf:    receives the buffer descriptor.
 *
 * A recycled buffer is taken from the free list when available; otherwise a
 * header plus `size` bytes of data are allocated in one block. If that
 * allocation fails, an empty buffer (NULL base, zero length) is returned
 * instead of dereferencing NULL, leaving it to libuv to report the failure
 * to the read callback.
 */
static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
  buf_list_t* ab;

  ab = buf_freelist;
  if (ab != NULL) {
    buf_freelist = ab->next;
  } else {
    ab = malloc(size + sizeof(*ab));
    if (ab == NULL) {
      /* Out of memory: signal "no buffer" to the caller. */
      buf->base = NULL;
      buf->len = 0;
      return;
    }
    ab->uv_buf_t.len = size;
    /* The data area starts right behind the header. */
    ab->uv_buf_t.base = (char*) (ab + 1);
  }

  *buf = ab->uv_buf_t;
}
367 
368 
buf_free(const uv_buf_t * buf)369 static void buf_free(const uv_buf_t* buf) {
370   buf_list_t* ab = (buf_list_t*) buf->base - 1;
371   ab->next = buf_freelist;
372   buf_freelist = ab;
373 }
374 
375 
HELPER_IMPL(tcp_pump_server)376 HELPER_IMPL(tcp_pump_server) {
377   int r;
378 
379   type = TCP;
380   loop = uv_default_loop();
381 
382   ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
383 
384   /* Server */
385   server = (uv_stream_t*)&tcpServer;
386   r = uv_tcp_init(loop, &tcpServer);
387   ASSERT_OK(r);
388   r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
389   ASSERT_OK(r);
390   r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
391   ASSERT_OK(r);
392 
393   notify_parent_process();
394   uv_run(loop, UV_RUN_DEFAULT);
395 
396   return 0;
397 }
398 
399 
HELPER_IMPL(pipe_pump_server)400 HELPER_IMPL(pipe_pump_server) {
401   int r;
402   type = PIPE;
403 
404   loop = uv_default_loop();
405 
406   /* Server */
407   server = (uv_stream_t*)&pipeServer;
408   r = uv_pipe_init(loop, &pipeServer, 0);
409   ASSERT_OK(r);
410   r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
411   ASSERT_OK(r);
412   r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
413   ASSERT_OK(r);
414 
415   notify_parent_process();
416   uv_run(loop, UV_RUN_DEFAULT);
417 
418   MAKE_VALGRIND_HAPPY(loop);
419   return 0;
420 }
421 
422 
tcp_pump(int n)423 static void tcp_pump(int n) {
424   ASSERT_LE(n, MAX_WRITE_HANDLES);
425   TARGET_CONNECTIONS = n;
426   type = TCP;
427 
428   loop = uv_default_loop();
429 
430   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
431 
432   /* Start making connections */
433   maybe_connect_some();
434 
435   uv_run(loop, UV_RUN_DEFAULT);
436 
437   MAKE_VALGRIND_HAPPY(loop);
438 }
439 
440 
pipe_pump(int n)441 static void pipe_pump(int n) {
442   ASSERT_LE(n, MAX_WRITE_HANDLES);
443   TARGET_CONNECTIONS = n;
444   type = PIPE;
445 
446   loop = uv_default_loop();
447 
448   /* Start making connections */
449   maybe_connect_some();
450 
451   uv_run(loop, UV_RUN_DEFAULT);
452 
453   MAKE_VALGRIND_HAPPY(loop);
454 }
455 
456 
/* Benchmark entry point: TCP pump client using 100 connections. */
BENCHMARK_IMPL(tcp_pump100_client) {
  const int connection_count = 100;

  tcp_pump(connection_count);
  return 0;
}
461 
462 
/* Benchmark entry point: TCP pump client using a single connection. */
BENCHMARK_IMPL(tcp_pump1_client) {
  const int connection_count = 1;

  tcp_pump(connection_count);
  return 0;
}
467 
468 
/* Benchmark entry point: pipe pump client using 100 connections. */
BENCHMARK_IMPL(pipe_pump100_client) {
  const int connection_count = 100;

  pipe_pump(connection_count);
  return 0;
}
473 
474 
/* Benchmark entry point: pipe pump client using a single connection. */
BENCHMARK_IMPL(pipe_pump1_client) {
  const int connection_count = 1;

  pipe_pump(connection_count);
  return 0;
}
479