xref: /libuv/test/test-udp-reuseport.c (revision ba24986f)
1 /* Copyright libuv project contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 
29 #if !defined(__linux__) && !defined(__FreeBSD__) && \
30     !defined(__DragonFly__) && !defined(__sun) && !defined(_AIX73)
31 
TEST_IMPL(udp_reuseport)32 TEST_IMPL(udp_reuseport) {
33   struct sockaddr_in addr1, addr2, addr3;
34   uv_loop_t* loop;
35   uv_udp_t handle1, handle2, handle3;
36   int r;
37 
38   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr1));
39   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT_2, &addr2));
40   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT_3, &addr3));
41 
42   loop = uv_default_loop();
43   ASSERT_NOT_NULL(loop);
44 
45   r = uv_udp_init(loop, &handle1);
46   ASSERT_OK(r);
47 
48   r = uv_udp_bind(&handle1, (const struct sockaddr*) &addr1, UV_UDP_REUSEADDR);
49   ASSERT_OK(r);
50 
51   r = uv_udp_init(loop, &handle2);
52   ASSERT_OK(r);
53 
54   r = uv_udp_bind(&handle2, (const struct sockaddr*) &addr2, UV_UDP_REUSEPORT);
55   ASSERT_EQ(r, UV_ENOTSUP);
56 
57   r = uv_udp_init(loop, &handle3);
58   ASSERT_OK(r);
59 
60   /* For platforms where SO_REUSEPORTs don't have the capability of
61    * load balancing, specifying both UV_UDP_REUSEADDR and UV_UDP_REUSEPORT
62    * in flags will fail, returning an UV_ENOTSUP error. */
63   r = uv_udp_bind(&handle3, (const struct sockaddr*) &addr3,
64                   UV_UDP_REUSEADDR | UV_UDP_REUSEPORT);
65   ASSERT_EQ(r, UV_ENOTSUP);
66 
67   MAKE_VALGRIND_HAPPY(loop);
68 
69   return 0;
70 }
71 
72 #else
73 
74 #define NUM_RECEIVING_THREADS 2
75 #define MAX_UDP_DATAGRAMS 10
76 
77 static uv_udp_t udp_send_handles[MAX_UDP_DATAGRAMS];
78 static uv_udp_send_t udp_send_requests[MAX_UDP_DATAGRAMS];
79 
80 static uv_sem_t semaphore;
81 
82 static uv_mutex_t mutex;
83 static unsigned int received;
84 
85 static unsigned int thread_loop1_recv;
86 static unsigned int thread_loop2_recv;
87 static unsigned int sent;
88 
89 static uv_loop_t* main_loop;
90 static uv_loop_t thread_loop1;
91 static uv_loop_t thread_loop2;
92 static uv_udp_t thread_handle1;
93 static uv_udp_t thread_handle2;
94 static uv_timer_t thread_timer_handle1;
95 static uv_timer_t thread_timer_handle2;
96 
/* Allocation callback passed to uv_udp_recv_start(): provides a freshly
 * malloc()ed buffer of the suggested size. The buffer is released with
 * free() in on_recv().
 *
 * handle         - the handle the buffer is requested for (unused).
 * suggested_size - number of bytes to allocate.
 * buf            - out: receives the buffer pointer and its length. */
static void alloc_cb(uv_handle_t* handle,
                     size_t suggested_size,
                     uv_buf_t* buf) {
  buf->base = malloc(suggested_size);
  /* Fail right here on allocation failure instead of handing back a NULL
   * buffer. */
  ASSERT_NOT_NULL(buf->base);
  /* Store the size as-is; squeezing it through an int is unnecessary. */
  buf->len = suggested_size;
}
103 
/* Timer callback of the receiver loops. Once the shared `received` counter
 * has reached MAX_UDP_DATAGRAMS it closes the timer and the UDP handle that
 * belongs to the timer's loop; otherwise it does nothing. */
static void ticktack(uv_timer_t* timer) {
  int all_received;

  ASSERT(timer == &thread_timer_handle1 || timer == &thread_timer_handle2);

  /* `received` is updated by both receiver threads, so read it locked. */
  uv_mutex_lock(&mutex);
  all_received = (received == MAX_UDP_DATAGRAMS);
  uv_mutex_unlock(&mutex);

  if (!all_received)
    return;

  uv_close((uv_handle_t*) timer, NULL);

  if (timer->loop == &thread_loop1)
    uv_close((uv_handle_t*) &thread_handle1, NULL);

  if (timer->loop == &thread_loop2)
    uv_close((uv_handle_t*) &thread_handle2, NULL);
}
123 
/* Receive callback of the receiver handles. Validates every datagram (it
 * must be the 5-byte payload "Hello"), frees the buffer obtained from
 * alloc_cb(), and updates the per-loop counter as well as the shared
 * `received` total. */
static void on_recv(uv_udp_t* handle,
                    ssize_t nr,
                    const uv_buf_t* buf,
                    const struct sockaddr* addr,
                    unsigned flags) {
  ASSERT_OK(flags);
  ASSERT(handle == &thread_handle1 || handle == &thread_handle2);

  ASSERT_GE(nr, 0);

  /* A zero-length callback carries no peer address and is not counted;
   * only the buffer has to be released. */
  if (nr == 0) {
    ASSERT_NULL(addr);
    free(buf->base);
    return;
  }

  ASSERT_NOT_NULL(addr);
  ASSERT_EQ(5, nr);
  ASSERT(!memcmp("Hello", buf->base, nr));
  free(buf->base);

  /* Per-loop counters are only bumped for their own loop. */
  if (handle->loop == &thread_loop1)
    thread_loop1_recv += 1;

  if (handle->loop == &thread_loop2)
    thread_loop2_recv += 1;

  /* The total is shared between both receiver threads. */
  uv_mutex_lock(&mutex);
  received += 1;
  uv_mutex_unlock(&mutex);
}
155 
/* Send-completion callback for the datagrams queued by the main thread.
 *
 * req    - the finished send request.
 * status - completion status; must be 0.
 *
 * Counts the completed send in `sent` and closes the handle the request was
 * issued on. */
static void on_send(uv_udp_send_t* req, int status) {
  ASSERT_OK(status);
  ASSERT_PTR_EQ(req->handle->loop, main_loop);

  sent++;

  /* Each datagram is sent on its own handle, so each handle is finished as
   * soon as its single send completes. Close it here rather than closing
   * only the handle of the last completion and leaving the others open
   * until test teardown. */
  uv_close((uv_handle_t*) req->handle, NULL);
}
163 
/* Initializes `handle` on `loop`, binds it to 127.0.0.1:TEST_PORT with both
 * UV_UDP_REUSEADDR and UV_UDP_REUSEPORT set, and starts receiving with
 * alloc_cb()/on_recv(). Every step is asserted to succeed. */
static void bind_socket_and_prepare_recv(uv_loop_t* loop, uv_udp_t* handle) {
  struct sockaddr_in bind_addr;

  ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &bind_addr));

  ASSERT_OK(uv_udp_init(loop, handle));

  /* On platforms whose SO_REUSEPORT is capable of load balancing, passing
   * UV_UDP_REUSEADDR together with UV_UDP_REUSEPORT is allowed, and
   * SO_REUSEPORT always overrides the behavior of SO_REUSEADDR. */
  ASSERT_OK(uv_udp_bind(handle,
                        (const struct sockaddr*) &bind_addr,
                        UV_UDP_REUSEADDR | UV_UDP_REUSEPORT));

  ASSERT_OK(uv_udp_recv_start(handle, alloc_cb, on_recv));
}
184 
run_event_loop(void * arg)185 static void run_event_loop(void* arg) {
186   int r;
187   uv_udp_t* handle;
188   uv_timer_t* timer;
189   uv_loop_t* loop = (uv_loop_t*) arg;
190   ASSERT(loop == &thread_loop1 || loop == &thread_loop2);
191 
192   if (loop == &thread_loop1) {
193     handle = &thread_handle1;
194     timer = &thread_timer_handle1;
195   } else {
196     handle = &thread_handle2;
197     timer = &thread_timer_handle2;
198   }
199 
200   bind_socket_and_prepare_recv(loop, handle);
201   r = uv_timer_init(loop, timer);
202   ASSERT_OK(r);
203   r = uv_timer_start(timer, ticktack, 0, 10);
204   ASSERT_OK(r);
205 
206   /* Notify the main thread to start sending data. */
207   uv_sem_post(&semaphore);
208   r = uv_run(loop, UV_RUN_DEFAULT);
209   ASSERT_OK(r);
210 }
211 
TEST_IMPL(udp_reuseport)212 TEST_IMPL(udp_reuseport) {
213   struct sockaddr_in addr;
214   uv_buf_t buf;
215   int r;
216   int i;
217 
218   r = uv_mutex_init(&mutex);
219   ASSERT_OK(r);
220 
221   r = uv_sem_init(&semaphore, 0);
222   ASSERT_OK(r);
223 
224   main_loop = uv_default_loop();
225   ASSERT_NOT_NULL(main_loop);
226 
227   /* Run event loops of receiving sockets in separate threads. */
228   uv_loop_init(&thread_loop1);
229   uv_loop_init(&thread_loop2);
230   uv_thread_t thread_loop_id1;
231   uv_thread_t thread_loop_id2;
232   uv_thread_create(&thread_loop_id1, run_event_loop, &thread_loop1);
233   uv_thread_create(&thread_loop_id2, run_event_loop, &thread_loop2);
234 
235   /* Wait until all threads to poll for receiving datagrams
236    * before we start to sending. Otherwise the incoming datagrams
237    * might not be distributed across all receiving threads. */
238   for (i = 0; i < NUM_RECEIVING_THREADS; i++)
239     uv_sem_wait(&semaphore);
240   /* Now we know all threads are up and entering the uv_run(),
241    * but we still sleep a little bit just for dual fail-safe. */
242   uv_sleep(100);
243 
244   /* Start sending datagrams to the peers. */
245   buf = uv_buf_init("Hello", 5);
246   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
247   for (i = 0; i < MAX_UDP_DATAGRAMS; i++) {
248     r = uv_udp_init(main_loop, &udp_send_handles[i]);
249     ASSERT_OK(r);
250     r = uv_udp_send(&udp_send_requests[i],
251                     &udp_send_handles[i],
252                     &buf,
253                     1,
254                     (const struct sockaddr*) &addr,
255                     on_send);
256     ASSERT_OK(r);
257   }
258 
259   r = uv_run(main_loop, UV_RUN_DEFAULT);
260   ASSERT_OK(r);
261 
262   /* Wait for all threads to exit. */
263   uv_thread_join(&thread_loop_id1);
264   uv_thread_join(&thread_loop_id2);
265 
266   /* Verify if each receiving socket per event loop received datagrams
267    * and the amount of received datagrams matches the one of sent datagrams.
268    */
269   ASSERT_EQ(received, MAX_UDP_DATAGRAMS);
270   ASSERT_EQ(sent, MAX_UDP_DATAGRAMS);
271   ASSERT_GT(thread_loop1_recv, 0);
272   ASSERT_GT(thread_loop2_recv, 0);
273   ASSERT_EQ(thread_loop1_recv + thread_loop2_recv, sent);
274 
275   /* Clean up. */
276   uv_mutex_destroy(&mutex);
277 
278   uv_sem_destroy(&semaphore);
279 
280   uv_loop_close(&thread_loop1);
281   uv_loop_close(&thread_loop2);
282   MAKE_VALGRIND_HAPPY(main_loop);
283 
284   return 0;
285 }
286 
287 #endif
288