xref: /libuv/test/test-poll.c (revision 8a499e13)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <errno.h>
23 
24 #ifdef _WIN32
25 # include <fcntl.h>
26 # define close _close
27 #else
28 # include <sys/socket.h>
29 # include <unistd.h>
30 #endif
31 
32 #include "uv.h"
33 #include "task.h"
34 
35 #ifdef __linux__
36 # include <sys/epoll.h>
37 #endif
38 
39 #ifdef UV_HAVE_KQUEUE
40 # include <sys/types.h>
41 # include <sys/event.h>
42 # include <sys/time.h>
43 #endif
44 
45 
46 #define NUM_CLIENTS 5
47 #define TRANSFER_BYTES (1 << 16)
48 
49 #undef MIN
50 #define MIN(a, b) (((a) < (b)) ? (a) : (b));
51 
52 
53 typedef enum {
54   UNIDIRECTIONAL,
55   DUPLEX
56 } test_mode_t;
57 
58 typedef struct connection_context_s {
59   uv_poll_t poll_handle;
60   uv_timer_t timer_handle;
61   uv_os_sock_t sock;
62   size_t read, sent;
63   int is_server_connection;
64   int open_handles;
65   int got_fin, sent_fin, got_disconnect;
66   unsigned int events, delayed_events;
67 } connection_context_t;
68 
69 typedef struct server_context_s {
70   uv_poll_t poll_handle;
71   uv_os_sock_t sock;
72   int connections;
73 } server_context_t;
74 
75 
76 static void delay_timer_cb(uv_timer_t* timer);
77 
78 
79 static test_mode_t test_mode = DUPLEX;
80 
81 static int closed_connections = 0;
82 
83 static int valid_writable_wakeups = 0;
84 static int spurious_writable_wakeups = 0;
85 
86 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
87 static int disconnects = 0;
88 #endif /* !__sun && !_AIX  && !__MVS__ */
89 
got_eagain(void)90 static int got_eagain(void) {
91 #ifdef _WIN32
92   return WSAGetLastError() == WSAEWOULDBLOCK;
93 #else
94   return errno == EAGAIN
95       || errno == EINPROGRESS
96 #ifdef EWOULDBLOCK
97       || errno == EWOULDBLOCK;
98 #endif
99       ;
100 #endif
101 }
102 
103 
create_bound_socket(struct sockaddr_in bind_addr)104 static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) {
105   uv_os_sock_t sock;
106   int r;
107 
108   sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
109 #ifdef _WIN32
110   ASSERT_NE(sock, INVALID_SOCKET);
111 #else
112   ASSERT_GE(sock, 0);
113 #endif
114 
115 #ifndef _WIN32
116   {
117     /* Allow reuse of the port. */
118     int yes = 1;
119     r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
120     ASSERT_OK(r);
121   }
122 #endif
123 
124   r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
125   ASSERT_OK(r);
126 
127   return sock;
128 }
129 
130 
close_socket(uv_os_sock_t sock)131 static void close_socket(uv_os_sock_t sock) {
132   int r;
133 #ifdef _WIN32
134   r = closesocket(sock);
135 #else
136   r = close(sock);
137 #endif
138   /* On FreeBSD close() can fail with ECONNRESET if the socket was shutdown by
139    * the peer before all pending data was delivered.
140    */
141   ASSERT(r == 0 || errno == ECONNRESET);
142 }
143 
144 
create_connection_context(uv_os_sock_t sock,int is_server_connection)145 static connection_context_t* create_connection_context(
146     uv_os_sock_t sock, int is_server_connection) {
147   int r;
148   connection_context_t* context;
149 
150   context = (connection_context_t*) malloc(sizeof *context);
151   ASSERT_NOT_NULL(context);
152 
153   context->sock = sock;
154   context->is_server_connection = is_server_connection;
155   context->read = 0;
156   context->sent = 0;
157   context->open_handles = 0;
158   context->events = 0;
159   context->delayed_events = 0;
160   context->got_fin = 0;
161   context->sent_fin = 0;
162   context->got_disconnect = 0;
163 
164   r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
165   context->open_handles++;
166   context->poll_handle.data = context;
167   ASSERT_OK(r);
168 
169   r = uv_timer_init(uv_default_loop(), &context->timer_handle);
170   context->open_handles++;
171   context->timer_handle.data = context;
172   ASSERT_OK(r);
173 
174   return context;
175 }
176 
177 
connection_close_cb(uv_handle_t * handle)178 static void connection_close_cb(uv_handle_t* handle) {
179   connection_context_t* context = (connection_context_t*) handle->data;
180 
181   if (--context->open_handles == 0) {
182     if (test_mode == DUPLEX || context->is_server_connection) {
183       ASSERT_EQ(context->read, TRANSFER_BYTES);
184     } else {
185       ASSERT_OK(context->read);
186     }
187 
188     if (test_mode == DUPLEX || !context->is_server_connection) {
189       ASSERT_EQ(context->sent, TRANSFER_BYTES);
190     } else {
191       ASSERT_OK(context->sent);
192     }
193 
194     closed_connections++;
195 
196     free(context);
197   }
198 }
199 
200 
destroy_connection_context(connection_context_t * context)201 static void destroy_connection_context(connection_context_t* context) {
202   uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb);
203   uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb);
204 }
205 
206 
connection_poll_cb(uv_poll_t * handle,int status,int events)207 static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
208   connection_context_t* context = (connection_context_t*) handle->data;
209   unsigned int new_events;
210   int r;
211 
212   ASSERT_OK(status);
213   ASSERT(events & context->events);
214   ASSERT(!(events & ~context->events));
215 
216   new_events = context->events;
217 
218   if (events & UV_READABLE) {
219     int action = rand() % 7;
220 
221     switch (action) {
222       case 0:
223       case 1: {
224         /* Read a couple of bytes. */
225         static char buffer[74];
226 
227         do
228           r = recv(context->sock, buffer, sizeof buffer, 0);
229         while (r == -1 && errno == EINTR);
230         ASSERT_GE(r, 0);
231 
232         if (r > 0) {
233           context->read += r;
234         } else {
235           /* Got FIN. */
236           context->got_fin = 1;
237           new_events &= ~UV_READABLE;
238         }
239 
240         break;
241       }
242 
243       case 2:
244       case 3: {
245         /* Read until EAGAIN. */
246         static char buffer[931];
247 
248         for (;;) {
249           do
250             r = recv(context->sock, buffer, sizeof buffer, 0);
251           while (r == -1 && errno == EINTR);
252 
253           if (r <= 0)
254             break;
255 
256           context->read += r;
257         }
258 
259         if (r == 0) {
260           /* Got FIN. */
261           context->got_fin = 1;
262           new_events &= ~UV_READABLE;
263         } else {
264           ASSERT(got_eagain());
265         }
266 
267         break;
268       }
269 
270       case 4:
271         /* Ignore. */
272         break;
273 
274       case 5:
275         /* Stop reading for a while. Restart in timer callback. */
276         new_events &= ~UV_READABLE;
277         if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
278           context->delayed_events = UV_READABLE;
279           uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
280         } else {
281           context->delayed_events |= UV_READABLE;
282         }
283         break;
284 
285       case 6:
286         /* Fudge with the event mask. */
287         uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
288         uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
289         context->events = UV_READABLE;
290         break;
291 
292       default:
293         ASSERT(0);
294     }
295   }
296 
297   if (events & UV_WRITABLE) {
298     if (context->sent < TRANSFER_BYTES &&
299         !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
300       /* We have to send more bytes. */
301       int action = rand() % 7;
302 
303       switch (action) {
304         case 0:
305         case 1: {
306           /* Send a couple of bytes. */
307           static char buffer[103];
308 
309           int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
310           ASSERT_GT(send_bytes, 0);
311 
312           do
313             r = send(context->sock, buffer, send_bytes, 0);
314           while (r == -1 && errno == EINTR);
315 
316           if (r < 0) {
317             ASSERT(got_eagain());
318             spurious_writable_wakeups++;
319             break;
320           }
321 
322           ASSERT_GT(r, 0);
323           context->sent += r;
324           valid_writable_wakeups++;
325           break;
326         }
327 
328         case 2:
329         case 3: {
330           /* Send until EAGAIN. */
331           static char buffer[1234];
332 
333           int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
334           ASSERT_GT(send_bytes, 0);
335 
336           do
337             r = send(context->sock, buffer, send_bytes, 0);
338           while (r == -1 && errno == EINTR);
339 
340           if (r < 0) {
341             ASSERT(got_eagain());
342             spurious_writable_wakeups++;
343             break;
344           }
345 
346           ASSERT_GT(r, 0);
347           valid_writable_wakeups++;
348           context->sent += r;
349 
350           while (context->sent < TRANSFER_BYTES) {
351             send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
352             ASSERT_GT(send_bytes, 0);
353 
354             do
355               r = send(context->sock, buffer, send_bytes, 0);
356             while (r == -1 && errno == EINTR);
357             ASSERT(r);
358 
359             if (r < 0) {
360               ASSERT(got_eagain());
361               break;
362             }
363 
364             context->sent += r;
365           }
366           break;
367         }
368 
369         case 4:
370           /* Ignore. */
371          break;
372 
373         case 5:
374           /* Stop sending for a while. Restart in timer callback. */
375           new_events &= ~UV_WRITABLE;
376           if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
377             context->delayed_events = UV_WRITABLE;
378             uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
379           } else {
380             context->delayed_events |= UV_WRITABLE;
381           }
382           break;
383 
384         case 6:
385           /* Fudge with the event mask. */
386           uv_poll_start(&context->poll_handle,
387                         UV_READABLE,
388                         connection_poll_cb);
389           uv_poll_start(&context->poll_handle,
390                         UV_WRITABLE,
391                         connection_poll_cb);
392           context->events = UV_WRITABLE;
393           break;
394 
395         default:
396           ASSERT(0);
397       }
398 
399     } else {
400       /* Nothing more to write. Send FIN. */
401       int r;
402 #ifdef _WIN32
403       r = shutdown(context->sock, SD_SEND);
404 #else
405       r = shutdown(context->sock, SHUT_WR);
406 #endif
407       ASSERT_OK(r);
408       context->sent_fin = 1;
409       new_events &= ~UV_WRITABLE;
410     }
411   }
412 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
413   if (events & UV_DISCONNECT) {
414     context->got_disconnect = 1;
415     ++disconnects;
416     new_events &= ~UV_DISCONNECT;
417   }
418 
419   if (context->got_fin && context->sent_fin && context->got_disconnect) {
420 #else /* __sun && _AIX  && __MVS__ */
421   if (context->got_fin && context->sent_fin) {
422 #endif /* !__sun && !_AIX && !__MVS__  */
423     /* Sent and received FIN. Close and destroy context. */
424     close_socket(context->sock);
425     destroy_connection_context(context);
426     context->events = 0;
427 
428   } else if (new_events != context->events) {
429     /* Poll mask changed. Call uv_poll_start again. */
430     context->events = new_events;
431     uv_poll_start(handle, new_events, connection_poll_cb);
432   }
433 
434   /* Assert that uv_is_active works correctly for poll handles. */
435   if (context->events != 0) {
436     ASSERT_EQ(1, uv_is_active((uv_handle_t*) handle));
437   } else {
438     ASSERT_OK(uv_is_active((uv_handle_t*) handle));
439   }
440 }
441 
442 
443 static void delay_timer_cb(uv_timer_t* timer) {
444   connection_context_t* context = (connection_context_t*) timer->data;
445   int r;
446 
447   /* Timer should auto stop. */
448   ASSERT_OK(uv_is_active((uv_handle_t*) timer));
449 
450   /* Add the requested events to the poll mask. */
451   ASSERT(context->delayed_events != 0);
452   context->events |= context->delayed_events;
453   context->delayed_events = 0;
454 
455   r = uv_poll_start(&context->poll_handle,
456                     context->events,
457                     connection_poll_cb);
458   ASSERT_OK(r);
459 }
460 
461 
462 static server_context_t* create_server_context(
463     uv_os_sock_t sock) {
464   int r;
465   server_context_t* context;
466 
467   context = (server_context_t*) malloc(sizeof *context);
468   ASSERT_NOT_NULL(context);
469 
470   context->sock = sock;
471   context->connections = 0;
472 
473   r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
474   context->poll_handle.data = context;
475   ASSERT_OK(r);
476 
477   return context;
478 }
479 
480 
481 static void server_close_cb(uv_handle_t* handle) {
482   server_context_t* context = (server_context_t*) handle->data;
483   free(context);
484 }
485 
486 
487 static void destroy_server_context(server_context_t* context) {
488   uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
489 }
490 
491 
492 static void server_poll_cb(uv_poll_t* handle, int status, int events) {
493   server_context_t* server_context = (server_context_t*)
494                                           handle->data;
495   connection_context_t* connection_context;
496   struct sockaddr_in addr;
497   socklen_t addr_len;
498   uv_os_sock_t sock;
499   int r;
500 
501   addr_len = sizeof addr;
502   sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
503 #ifdef _WIN32
504   ASSERT_NE(sock, INVALID_SOCKET);
505 #else
506   ASSERT_GE(sock, 0);
507 #endif
508 
509   connection_context = create_connection_context(sock, 1);
510   connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
511   r = uv_poll_start(&connection_context->poll_handle,
512                     UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
513                     connection_poll_cb);
514   ASSERT_OK(r);
515 
516   if (++server_context->connections == NUM_CLIENTS) {
517     close_socket(server_context->sock);
518     destroy_server_context(server_context);
519   }
520 }
521 
522 
523 static void start_server(void) {
524   server_context_t* context;
525   struct sockaddr_in addr;
526   uv_os_sock_t sock;
527   int r;
528 
529   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
530   sock = create_bound_socket(addr);
531   context = create_server_context(sock);
532 
533   r = listen(sock, 100);
534   ASSERT_OK(r);
535 
536   r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
537   ASSERT_OK(r);
538 }
539 
540 
541 static void start_client(void) {
542   uv_os_sock_t sock;
543   connection_context_t* context;
544   struct sockaddr_in server_addr;
545   struct sockaddr_in addr;
546   int r;
547 
548   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
549   ASSERT_OK(uv_ip4_addr("0.0.0.0", 0, &addr));
550 
551   sock = create_bound_socket(addr);
552   context = create_connection_context(sock, 0);
553 
554   context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
555   r = uv_poll_start(&context->poll_handle,
556                     UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
557                     connection_poll_cb);
558   ASSERT_OK(r);
559 
560   r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
561   ASSERT(r == 0 || got_eagain());
562 }
563 
564 
565 static void start_poll_test(void) {
566   int i, r;
567 
568 #ifdef _WIN32
569   {
570     struct WSAData wsa_data;
571     int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
572     ASSERT_OK(r);
573   }
574 #endif
575 
576   start_server();
577 
578   for (i = 0; i < NUM_CLIENTS; i++)
579     start_client();
580 
581   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
582   ASSERT_OK(r);
583 
584   /* Assert that at most five percent of the writable wakeups was spurious. */
585   ASSERT_NE(spurious_writable_wakeups == 0 ||
586             (valid_writable_wakeups + spurious_writable_wakeups) /
587             spurious_writable_wakeups > 20, 0);
588 
589   ASSERT_EQ(closed_connections, NUM_CLIENTS * 2);
590 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
591   ASSERT_EQ(disconnects, NUM_CLIENTS * 2);
592 #endif
593   MAKE_VALGRIND_HAPPY(uv_default_loop());
594 }
595 
596 
597 /* Issuing a shutdown() on IBM i PASE with parameter SHUT_WR
598  * also sends a normal close sequence to the partner program.
599  * This leads to timing issues and ECONNRESET failures in the
600  * test 'poll_duplex' and 'poll_unidirectional'.
601  *
602  * https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/shutdn.htm
603  */
604 TEST_IMPL(poll_duplex) {
605 #if defined(NO_SELF_CONNECT)
606   RETURN_SKIP(NO_SELF_CONNECT);
607 #elif defined(__PASE__)
608   RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
609 #endif
610   test_mode = DUPLEX;
611   start_poll_test();
612   return 0;
613 }
614 
615 
616 TEST_IMPL(poll_unidirectional) {
617 #if defined(NO_SELF_CONNECT)
618   RETURN_SKIP(NO_SELF_CONNECT);
619 #elif defined(__PASE__)
620   RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
621 #endif
622   test_mode = UNIDIRECTIONAL;
623   start_poll_test();
624   return 0;
625 }
626 
627 
628 /* Windows won't let you open a directory so we open a file instead.
629  * OS X lets you poll a file so open the $PWD instead.  Both fail
630  * on Linux so it doesn't matter which one we pick.  Both succeed
631  * on FreeBSD, Solaris and AIX so skip the test on those platforms.
632  */
633 TEST_IMPL(poll_bad_fdtype) {
634 #if !defined(__DragonFly__) && !defined(__FreeBSD__) && !defined(__sun) && \
635     !defined(_AIX) && !defined(__MVS__) && \
636     !defined(__OpenBSD__) && !defined(__CYGWIN__) && !defined(__MSYS__) && \
637     !defined(__NetBSD__)
638   uv_poll_t poll_handle;
639   int fd;
640 
641 #if defined(_WIN32)
642   fd = _open("test/fixtures/empty_file", UV_FS_O_RDONLY);
643 #else
644   fd = open(".", UV_FS_O_RDONLY);
645 #endif
646   ASSERT_NE(fd, -1);
647   ASSERT_NE(0, uv_poll_init(uv_default_loop(), &poll_handle, fd));
648   ASSERT_OK(close(fd));
649 #endif
650 
651   MAKE_VALGRIND_HAPPY(uv_default_loop());
652   return 0;
653 }
654 
655 
656 #ifdef __linux__
657 TEST_IMPL(poll_nested_epoll) {
658   uv_poll_t poll_handle;
659   int fd;
660 
661   fd = epoll_create(1);
662   ASSERT_NE(fd, -1);
663 
664   ASSERT_OK(uv_poll_init(uv_default_loop(), &poll_handle, fd));
665   ASSERT_OK(uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
666   ASSERT_NE(0, uv_run(uv_default_loop(), UV_RUN_NOWAIT));
667 
668   uv_close((uv_handle_t*) &poll_handle, NULL);
669   ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT));
670   ASSERT_OK(close(fd));
671 
672   MAKE_VALGRIND_HAPPY(uv_default_loop());
673   return 0;
674 }
675 #endif  /* __linux__ */
676 
677 
678 #ifdef UV_HAVE_KQUEUE
679 TEST_IMPL(poll_nested_kqueue) {
680   uv_poll_t poll_handle;
681   int fd;
682 
683   fd = kqueue();
684   ASSERT_NE(fd, -1);
685 
686   ASSERT_OK(uv_poll_init(uv_default_loop(), &poll_handle, fd));
687   ASSERT_OK(uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
688   ASSERT_NE(0, uv_run(uv_default_loop(), UV_RUN_NOWAIT));
689 
690   uv_close((uv_handle_t*) &poll_handle, NULL);
691   ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT));
692   ASSERT_OK(close(fd));
693 
694   MAKE_VALGRIND_HAPPY(uv_default_loop());
695   return 0;
696 }
697 #endif  /* UV_HAVE_KQUEUE */
698