1 /* Copyright libuv project contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv.h"
23 #include "task.h"
24
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28
29 #if !defined(__linux__) && !defined(__FreeBSD__) && \
30 !defined(__DragonFly__) && !defined(__sun) && !defined(_AIX73)
31
TEST_IMPL(udp_reuseport)32 TEST_IMPL(udp_reuseport) {
33 struct sockaddr_in addr1, addr2, addr3;
34 uv_loop_t* loop;
35 uv_udp_t handle1, handle2, handle3;
36 int r;
37
38 ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr1));
39 ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT_2, &addr2));
40 ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT_3, &addr3));
41
42 loop = uv_default_loop();
43 ASSERT_NOT_NULL(loop);
44
45 r = uv_udp_init(loop, &handle1);
46 ASSERT_OK(r);
47
48 r = uv_udp_bind(&handle1, (const struct sockaddr*) &addr1, UV_UDP_REUSEADDR);
49 ASSERT_OK(r);
50
51 r = uv_udp_init(loop, &handle2);
52 ASSERT_OK(r);
53
54 r = uv_udp_bind(&handle2, (const struct sockaddr*) &addr2, UV_UDP_REUSEPORT);
55 ASSERT_EQ(r, UV_ENOTSUP);
56
57 r = uv_udp_init(loop, &handle3);
58 ASSERT_OK(r);
59
60 /* For platforms where SO_REUSEPORTs don't have the capability of
61 * load balancing, specifying both UV_UDP_REUSEADDR and UV_UDP_REUSEPORT
62 * in flags will fail, returning an UV_ENOTSUP error. */
63 r = uv_udp_bind(&handle3, (const struct sockaddr*) &addr3,
64 UV_UDP_REUSEADDR | UV_UDP_REUSEPORT);
65 ASSERT_EQ(r, UV_ENOTSUP);
66
67 MAKE_VALGRIND_HAPPY(loop);
68
69 return 0;
70 }
71
72 #else
73
74 #define NUM_RECEIVING_THREADS 2
75 #define MAX_UDP_DATAGRAMS 10
76
77 static uv_udp_t udp_send_handles[MAX_UDP_DATAGRAMS];
78 static uv_udp_send_t udp_send_requests[MAX_UDP_DATAGRAMS];
79
80 static uv_sem_t semaphore;
81
82 static uv_mutex_t mutex;
83 static unsigned int received;
84
85 static unsigned int thread_loop1_recv;
86 static unsigned int thread_loop2_recv;
87 static unsigned int sent;
88
89 static uv_loop_t* main_loop;
90 static uv_loop_t thread_loop1;
91 static uv_loop_t thread_loop2;
92 static uv_udp_t thread_handle1;
93 static uv_udp_t thread_handle2;
94 static uv_timer_t thread_timer_handle1;
95 static uv_timer_t thread_timer_handle2;
96
/* Allocation callback passed to uv_udp_recv_start(): hands libuv a heap
 * buffer of the suggested size. The receive callback frees the buffer.
 *
 * handle         - handle the buffer is requested for (unused).
 * suggested_size - size hint supplied by libuv.
 * buf            - out: receives the buffer pointer and its capacity.
 */
static void alloc_cb(uv_handle_t* handle,
                     size_t suggested_size,
                     uv_buf_t* buf) {
  (void) handle;

  buf->base = malloc(suggested_size);
  /* Never advertise capacity that was not allocated: with a zero length
   * libuv reports UV_ENOBUFS to the receive callback instead of being
   * handed a NULL buffer that claims to have room. */
  buf->len = buf->base != NULL ? suggested_size : 0;
}
103
/* Timer callback of the receiver loops. Once the shared `received` counter
 * has reached MAX_UDP_DATAGRAMS it closes the timer itself together with the
 * UDP handle that lives on the same loop; until then it does nothing. */
static void ticktack(uv_timer_t* timer) {
  int all_received;

  ASSERT(timer == &thread_timer_handle1 || timer == &thread_timer_handle2);

  /* `received` is written by both receiver threads, so read it locked. */
  uv_mutex_lock(&mutex);
  all_received = (received == MAX_UDP_DATAGRAMS);
  uv_mutex_unlock(&mutex);

  if (!all_received)
    return;

  uv_close((uv_handle_t*) timer, NULL);

  if (timer->loop == &thread_loop1)
    uv_close((uv_handle_t*) &thread_handle1, NULL);

  if (timer->loop == &thread_loop2)
    uv_close((uv_handle_t*) &thread_handle2, NULL);
}
123
/* Receive callback shared by both receiver handles.
 *
 * A call with nr == 0 only releases the buffer. Otherwise the payload must
 * be exactly the 5 bytes "Hello"; the datagram is then counted both for the
 * loop it arrived on and in the shared `received` total.
 *
 * handle - receiving handle; must be thread_handle1 or thread_handle2.
 * nr     - number of bytes received; negative values fail the test.
 * buf    - buffer obtained from alloc_cb(); always freed here.
 * addr   - sender address; expected to be NULL exactly when nr == 0.
 * flags  - receive flags; expected to be 0.
 */
static void on_recv(uv_udp_t* handle,
                    ssize_t nr,
                    const uv_buf_t* buf,
                    const struct sockaddr* addr,
                    unsigned flags) {
  unsigned int* loop_counter;

  ASSERT_OK(flags);
  ASSERT(handle == &thread_handle1 || handle == &thread_handle2);

  ASSERT_GE(nr, 0);

  if (nr == 0) {
    ASSERT_NULL(addr);
    free(buf->base);
    return;
  }

  ASSERT_NOT_NULL(addr);
  ASSERT_EQ(5, nr);
  ASSERT(!memcmp("Hello", buf->base, nr));
  free(buf->base);

  /* Pick the counter that belongs to the loop this handle runs on. */
  loop_counter = NULL;
  if (handle->loop == &thread_loop1)
    loop_counter = &thread_loop1_recv;
  else if (handle->loop == &thread_loop2)
    loop_counter = &thread_loop2_recv;

  if (loop_counter != NULL)
    (*loop_counter)++;

  /* The total is shared between both receiver threads. */
  uv_mutex_lock(&mutex);
  received++;
  uv_mutex_unlock(&mutex);
}
155
/* Send-completion callback for the datagrams queued by the main thread.
 *
 * req    - completed send request; its handle must belong to main_loop.
 * status - result of the send; anything but 0 fails the test.
 */
static void on_send(uv_udp_send_t* req, int status) {
  ASSERT_OK(status);
  ASSERT_PTR_EQ(req->handle->loop, main_loop);

  sent++;

  /* Every datagram goes out through its own handle, so a handle has no
   * further work once its single request completes. Close each one here
   * rather than only the handle whose completion happens to come last. */
  uv_close((uv_handle_t*) req->handle, NULL);
}
163
/* Initializes `handle` on `loop`, binds it to 127.0.0.1:TEST_PORT with both
 * reuse flags set and starts receiving with alloc_cb()/on_recv().
 * Any failing step aborts the test through the assertion macros. */
static void bind_socket_and_prepare_recv(uv_loop_t* loop, uv_udp_t* handle) {
  struct sockaddr_in listen_addr;

  ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr));
  ASSERT_OK(uv_udp_init(loop, handle));

  /* For platforms where SO_REUSEPORT provides load balancing, passing both
   * UV_UDP_REUSEADDR and UV_UDP_REUSEPORT is allowed and SO_REUSEPORT
   * always overrides the behavior of SO_REUSEADDR. */
  ASSERT_OK(uv_udp_bind(handle,
                        (const struct sockaddr*) &listen_addr,
                        UV_UDP_REUSEADDR | UV_UDP_REUSEPORT));

  ASSERT_OK(uv_udp_recv_start(handle, alloc_cb, on_recv));
}
184
run_event_loop(void * arg)185 static void run_event_loop(void* arg) {
186 int r;
187 uv_udp_t* handle;
188 uv_timer_t* timer;
189 uv_loop_t* loop = (uv_loop_t*) arg;
190 ASSERT(loop == &thread_loop1 || loop == &thread_loop2);
191
192 if (loop == &thread_loop1) {
193 handle = &thread_handle1;
194 timer = &thread_timer_handle1;
195 } else {
196 handle = &thread_handle2;
197 timer = &thread_timer_handle2;
198 }
199
200 bind_socket_and_prepare_recv(loop, handle);
201 r = uv_timer_init(loop, timer);
202 ASSERT_OK(r);
203 r = uv_timer_start(timer, ticktack, 0, 10);
204 ASSERT_OK(r);
205
206 /* Notify the main thread to start sending data. */
207 uv_sem_post(&semaphore);
208 r = uv_run(loop, UV_RUN_DEFAULT);
209 ASSERT_OK(r);
210 }
211
TEST_IMPL(udp_reuseport)212 TEST_IMPL(udp_reuseport) {
213 struct sockaddr_in addr;
214 uv_buf_t buf;
215 int r;
216 int i;
217
218 r = uv_mutex_init(&mutex);
219 ASSERT_OK(r);
220
221 r = uv_sem_init(&semaphore, 0);
222 ASSERT_OK(r);
223
224 main_loop = uv_default_loop();
225 ASSERT_NOT_NULL(main_loop);
226
227 /* Run event loops of receiving sockets in separate threads. */
228 uv_loop_init(&thread_loop1);
229 uv_loop_init(&thread_loop2);
230 uv_thread_t thread_loop_id1;
231 uv_thread_t thread_loop_id2;
232 uv_thread_create(&thread_loop_id1, run_event_loop, &thread_loop1);
233 uv_thread_create(&thread_loop_id2, run_event_loop, &thread_loop2);
234
235 /* Wait until all threads to poll for receiving datagrams
236 * before we start to sending. Otherwise the incoming datagrams
237 * might not be distributed across all receiving threads. */
238 for (i = 0; i < NUM_RECEIVING_THREADS; i++)
239 uv_sem_wait(&semaphore);
240 /* Now we know all threads are up and entering the uv_run(),
241 * but we still sleep a little bit just for dual fail-safe. */
242 uv_sleep(100);
243
244 /* Start sending datagrams to the peers. */
245 buf = uv_buf_init("Hello", 5);
246 ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
247 for (i = 0; i < MAX_UDP_DATAGRAMS; i++) {
248 r = uv_udp_init(main_loop, &udp_send_handles[i]);
249 ASSERT_OK(r);
250 r = uv_udp_send(&udp_send_requests[i],
251 &udp_send_handles[i],
252 &buf,
253 1,
254 (const struct sockaddr*) &addr,
255 on_send);
256 ASSERT_OK(r);
257 }
258
259 r = uv_run(main_loop, UV_RUN_DEFAULT);
260 ASSERT_OK(r);
261
262 /* Wait for all threads to exit. */
263 uv_thread_join(&thread_loop_id1);
264 uv_thread_join(&thread_loop_id2);
265
266 /* Verify if each receiving socket per event loop received datagrams
267 * and the amount of received datagrams matches the one of sent datagrams.
268 */
269 ASSERT_EQ(received, MAX_UDP_DATAGRAMS);
270 ASSERT_EQ(sent, MAX_UDP_DATAGRAMS);
271 ASSERT_GT(thread_loop1_recv, 0);
272 ASSERT_GT(thread_loop2_recv, 0);
273 ASSERT_EQ(thread_loop1_recv + thread_loop2_recv, sent);
274
275 /* Clean up. */
276 uv_mutex_destroy(&mutex);
277
278 uv_sem_destroy(&semaphore);
279
280 uv_loop_close(&thread_loop1);
281 uv_loop_close(&thread_loop2);
282 MAKE_VALGRIND_HAPPY(main_loop);
283
284 return 0;
285 }
286
287 #endif
288