1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37 #endif
38
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41 #endif
42
43 static void uv__udp_run_completed(uv_udp_t* handle);
44 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
45 static void uv__udp_recvmsg(uv_udp_t* handle);
46 static void uv__udp_sendmsg(uv_udp_t* handle);
47 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
48 int domain,
49 unsigned int flags);
50 static int uv__udp_sendmsg1(int fd,
51 const uv_buf_t* bufs,
52 unsigned int nbufs,
53 const struct sockaddr* addr);
54
55
/* Close the handle's I/O watcher, stop the handle and, if a socket is still
 * attached, close it and mark the watcher as having no descriptor (-1).
 */
void uv__udp_close(uv_udp_t* handle) {
  int fd;

  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  fd = handle->io_watcher.fd;
  if (fd == -1)
    return;

  uv__close(fd);
  handle->io_watcher.fd = -1;
}
65
66
/* Last step of closing a UDP handle. Every send request still waiting in the
 * write queue is marked UV_ECANCELED and moved to the completed queue, the
 * completed queue is run, and the receive-side callbacks are cleared.
 *
 * Expects the watcher to be inactive and its descriptor to already be -1.
 */
void uv__udp_finish_close(uv_udp_t* handle) {
  struct uv__queue* node;
  uv_udp_send_t* pending;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  for (;;) {
    if (uv__queue_empty(&handle->write_queue))
      break;

    node = uv__queue_head(&handle->write_queue);
    uv__queue_remove(node);

    pending = uv__queue_data(node, uv_udp_send_t, queue);
    pending->status = UV_ECANCELED;
    uv__queue_insert_tail(&handle->write_completed_queue, &pending->queue);
  }

  uv__udp_run_completed(handle);

  /* Running the completed queue must have settled the accounting. */
  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Clear the receive callbacks; close_cb is deliberately left untouched. */
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
}
93
94
/* Drain the completed-write queue. For each request: unregister it from the
 * loop, subtract it from the send queue counters, free a separately allocated
 * buffer list and invoke its send callback, if it has one.
 *
 * UV_HANDLE_UDP_PROCESSING is set for the duration of the call. Once the
 * pending write queue is empty as well, POLLOUT is no longer watched and the
 * handle is stopped unless it is still watching POLLIN.
 */
static void uv__udp_run_completed(uv_udp_t* handle) {
  struct uv__queue* node;
  uv_udp_send_t* done;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  for (;;) {
    if (uv__queue_empty(&handle->write_completed_queue))
      break;

    node = uv__queue_head(&handle->write_completed_queue);
    uv__queue_remove(node);

    done = uv__queue_data(node, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop);

    handle->send_queue_size -= uv__count_bufs(done->bufs, done->nbufs);
    handle->send_queue_count--;

    /* bufs points at bufsml unless a larger array had to be allocated. */
    if (done->bufs != done->bufsml)
      uv__free(done->bufs);
    done->bufs = NULL;

    if (done->send_cb != NULL) {
      /* A negative status is an error code and is passed through; anything
       * else is a byte count and is reported to the callback as 0.
       */
      if (done->status < 0)
        done->send_cb(done, done->status);
      else
        done->send_cb(done, 0);
    }
  }

  if (uv__queue_empty(&handle->write_queue)) {
    /* Nothing pending and nothing completed: stop watching for writability. */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}
137
138
/* I/O watcher callback for UDP handles. Readable events are handed to
 * uv__udp_recvmsg(); writable events flush the send queue and run the
 * completion callbacks, unless the handle is closing.
 */
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* udp;

  udp = container_of(w, uv_udp_t, io_watcher);
  assert(udp->type == UV_UDP);

  if (revents & POLLIN)
    uv__udp_recvmsg(udp);

  if (!(revents & POLLOUT))
    return;

  /* Checked after the receive step, which may have run user callbacks. */
  if (uv__is_closing(udp))
    return;

  uv__udp_sendmsg(udp);
  uv__udp_run_completed(udp);
}
153
/* Receive several datagrams into `buf` with a single recvmmsg() call
 * (recvmsg_x() on Apple platforms) and pass each one to handle->recv_cb.
 *
 * `buf` is divided into chunks of UV__UDP_DGRAM_MAXSIZE bytes, at most
 * ARRAY_SIZE(peers) of them. Every received datagram is reported with the
 * UV_UDP_MMSG_CHUNK flag (plus UV_UDP_PARTIAL when MSG_TRUNC was set for it),
 * followed by one last callback carrying UV_UDP_MMSG_FREE for the whole
 * buffer. Delivery stops early if a callback clears handle->recv_cb.
 *
 * Returns the result of the receive call (datagram count, 0 or -1), -1 when
 * `buf` is too small to hold a single chunk, or UV_ENOSYS on platforms
 * without a batched receive call.
 */
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
  struct sockaddr_in6 peers[20];
  struct iovec iov[ARRAY_SIZE(peers)];
  struct mmsghdr msgs[ARRAY_SIZE(peers)];
  ssize_t nread;
  uv_buf_t chunk_buf;
  size_t chunks;
  int flags;
  size_t k;

  /* prepare structures for recvmmsg */
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks == 0) {
    /* The buffer cannot hold even one chunk, so the receive call would be
     * asked for zero messages. uv__udp_recvmsg() keeps looping while the
     * result is not -1 and its budget is positive, and the budget only
     * shrinks on a positive result, so reporting 0 here would make it spin.
     * Report the undersized buffer like an empty allocation instead.
     */
    handle->recv_cb(handle, UV_ENOBUFS, buf, NULL, 0);
    return -1;
  }
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
    msgs[k].msg_hdr.msg_iov = iov + k;
    msgs[k].msg_hdr.msg_iovlen = 1;
    msgs[k].msg_hdr.msg_name = peers + k;
    msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
    msgs[k].msg_hdr.msg_control = NULL;
    msgs[k].msg_hdr.msg_controllen = 0;
    msgs[k].msg_hdr.msg_flags = 0;
    msgs[k].msg_len = 0;
  }

#if defined(__APPLE__)
  do
    nread = recvmsg_x(handle->io_watcher.fd, msgs, chunks, MSG_DONTWAIT);
  while (nread == -1 && errno == EINTR);
#else
  do
    nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
  while (nread == -1 && errno == EINTR);
#endif

  if (nread < 1) {
    /* Nothing received: "no data right now" is reported as 0, anything else
     * as the translated errno.
     */
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }

    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
  }
  return nread;
#else  /* __linux__ || __FreeBSD__ || __APPLE__ */
  return UV_ENOSYS;
#endif  /* __linux__ || __FreeBSD__ || __APPLE__ */
}
222
/* Read pending datagrams from the handle's socket and deliver them through
 * handle->recv_cb, using buffers obtained from handle->alloc_cb.
 *
 * When uv_udp_using_recvmmsg() reports true the work is delegated to
 * uv__udp_recvmmsg(); otherwise one datagram is read per iteration with
 * recvmsg(). An unusable buffer from alloc_cb is reported as UV_ENOBUFS and
 * ends the call.
 */
static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage from;
  struct msghdr msg;
  ssize_t nread;
  uv_buf_t buf;
  int flags;
  int budget;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  budget = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    if (buf.base == NULL || buf.len == 0) {
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

    if (uv_udp_using_recvmmsg(handle)) {
      nread = uv__udp_recvmmsg(handle, &buf);
      if (nread > 0)
        budget -= nread;
      /* Jumps to the loop condition below. */
      continue;
    }

    memset(&from, 0, sizeof(from));
    memset(&msg, 0, sizeof(msg));
    msg.msg_name = &from;
    msg.msg_namelen = sizeof(from);
    msg.msg_iov = (void*) &buf;
    msg.msg_iovlen = 1;

    do
      nread = recvmsg(handle->io_watcher.fd, &msg, 0);
    while (nread == -1 && errno == EINTR);

    if (nread != -1) {
      flags = 0;
      if (msg.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      handle->recv_cb(handle,
                      nread,
                      &buf,
                      (const struct sockaddr*) &from,
                      flags);
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
      /* Nothing to read right now: reported as a zero-length, address-less
       * read rather than as an error.
       */
      handle->recv_cb(handle, 0, &buf, NULL, 0);
    } else {
      handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }

    budget--;
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && budget > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}
288
289
290 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
291 * refinements for programs that use multicast. Therefore we preferentially
292 * set SO_REUSEPORT over SO_REUSEADDR here, but we set SO_REUSEPORT only
293 * when that socket option doesn't have the capability of load balancing.
294 * Otherwise, we fall back to SO_REUSEADDR.
295 *
296 * Linux as of 3.9, DragonflyBSD 3.6, AIX 7.2.5 have the SO_REUSEPORT socket
297 * option but with semantics that are different from the BSDs: it _shares_
298 * the port rather than steals it from the current listener. While useful,
299 * it's not something we can emulate on other platforms so we don't enable it.
300 *
301 * zOS does not support getsockname with SO_REUSEPORT option when using
302 * AF_UNIX.
303 */
/* Enable address reuse on `fd`. The socket option is picked at compile time
 * (and, on z/OS, from the socket's address family at run time); the value 1
 * is then set with a single setsockopt() call.
 *
 * Returns 0 on success or a negative error code.
 */
static int uv__sock_reuseaddr(int fd) {
  int optname;
  int enable;
#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in bound;
  unsigned int bound_len;
#endif

  enable = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  bound_len = sizeof(bound);
  if (getsockname(fd, (struct sockaddr*) &bound, &bound_len) == -1)
    return UV__ERR(errno);

  if (bound.sin_family == AF_UNIX)
    optname = SO_REUSEADDR;
  else
    optname = SO_REUSEPORT;
#elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__) && \
      !defined(__sun__) && !defined(__DragonFly__) && !defined(_AIX73)
  optname = SO_REUSEPORT;
#else
  optname = SO_REUSEADDR;
#endif

  if (setsockopt(fd, SOL_SOCKET, optname, &enable, sizeof(enable)))
    return UV__ERR(errno);

  return 0;
}
331
332 /*
333 * The Linux kernel suppresses some ICMP error messages by default for UDP
334 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
335 * error reporting, hopefully resulting in faster failover to working name
336 * servers.
337 */
/* On Linux, turn on IP_RECVERR (AF_INET) or IPV6_RECVERR (AF_INET6) for `fd`.
 * Other address families, and all other platforms, are a no-op.
 *
 * Returns 0 on success or a negative error code if setsockopt() fails.
 */
static int uv__set_recverr(int fd, sa_family_t ss_family) {
#if defined(__linux__)
  int level;
  int optname;
  int on;

  switch (ss_family) {
    case AF_INET:
      level = IPPROTO_IP;
      optname = IP_RECVERR;
      break;
    case AF_INET6:
      level = IPPROTO_IPV6;
      optname = IPV6_RECVERR;
      break;
    default:
      return 0;
  }

  on = 1;
  if (setsockopt(fd, level, optname, &on, sizeof(on)))
    return UV__ERR(errno);
#endif
  return 0;
}
353
354
/* Bind the handle's socket to `addr`, creating the socket first if the handle
 * does not have one yet.
 *
 * `flags` may combine UV_UDP_IPV6ONLY, UV_UDP_REUSEADDR, UV_UDP_REUSEPORT and
 * UV_UDP_LINUX_RECVERR; each requested option is applied before bind().
 *
 * Returns 0 on success. Returns UV_EINVAL for unknown flags, for
 * UV_UDP_IPV6ONLY on a non-AF_INET6 address and when bind() fails with
 * EAFNOSUPPORT; UV_ENOTSUP when UV_UDP_IPV6ONLY is requested but IPV6_V6ONLY
 * is not defined; otherwise the error of the step that failed.
 */
int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int sock;
  int rc;
  int on;

  /* Reject any flag outside the supported set. */
  if ((flags & (UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR |
                UV_UDP_REUSEPORT | UV_UDP_LINUX_RECVERR)) != flags)
    return UV_EINVAL;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if (flags & UV_UDP_IPV6ONLY)
    if (addr->sa_family != AF_INET6)
      return UV_EINVAL;

  sock = handle->io_watcher.fd;
  if (sock == -1) {
    sock = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (sock < 0)
      return sock;
    handle->io_watcher.fd = sock;
  }

  if (flags & UV_UDP_LINUX_RECVERR) {
    rc = uv__set_recverr(sock, addr->sa_family);
    if (rc)
      return rc;
  }

  if (flags & UV_UDP_REUSEADDR) {
    rc = uv__sock_reuseaddr(sock);
    if (rc)
      return rc;
  }

  if (flags & UV_UDP_REUSEPORT) {
    rc = uv__sock_reuseport(sock);
    if (rc)
      return rc;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    on = 1;
    if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) == -1)
      return UV__ERR(errno);
#else
    return UV_ENOTSUP;
#endif
  }

  if (bind(sock, addr, addrlen)) {
    /* OSX, other BSDs and SunOS fail with EAFNOSUPPORT when binding a
     * socket created with AF_INET to an AF_INET6 address or vice versa.
     */
    if (errno == EAFNOSUPPORT)
      return UV_EINVAL;
    return UV__ERR(errno);
  }

  handle->flags |= UV_HANDLE_BOUND;
  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  return 0;
}
427
428
/* If the handle has no socket yet, bind it to the wildcard address of
 * `domain` (AF_INET or AF_INET6) with port 0, passing `flags` on to
 * uv__udp_bind(). A handle that already has a socket is left alone.
 *
 * Returns 0 when nothing had to be done, otherwise uv__udp_bind()'s result.
 * Any other domain aborts the process.
 */
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union uv__sockaddr any;
  socklen_t len;

  if (handle->io_watcher.fd != -1)
    return 0;

  if (domain == AF_INET) {
    memset(&any.in, 0, sizeof(any.in));
    any.in.sin_family = AF_INET;
    any.in.sin_addr.s_addr = INADDR_ANY;
    len = sizeof(any.in);
  } else if (domain == AF_INET6) {
    memset(&any.in6, 0, sizeof(any.in6));
    any.in6.sin6_family = AF_INET6;
    any.in6.sin6_addr = in6addr_any;
    len = sizeof(any.in6);
  } else {
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &any.addr, len, flags);
}
464
465
/* Connect the handle's socket to `addr`, binding it to the wildcard address
 * first if it has no socket yet. connect() is retried while it is interrupted
 * (EINTR).
 *
 * Returns 0 and sets UV_HANDLE_UDP_CONNECTED on success, otherwise a negative
 * error code.
 */
int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (rc != 0)
    return rc;

  for (;;) {
    errno = 0;
    rc = connect(handle->io_watcher.fd, addr, addrlen);
    if (rc != -1 || errno != EINTR)
      break;
  }

  if (rc != 0)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;
  return 0;
}
487
/* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
 * The kernels of all UNIXes supported by uv should follow this standard, but
 * their implementation logic differs. The pseudocode below explains the UDP
 * disconnect behavior on each platform:
492 *
493 * Predefined stubs for pseudocode:
494 * 1. sodisconnect: The function to perform the real udp disconnect
495 * 2. pru_connect: The function to perform the real udp connect
496 * 3. so: The kernel object match with socket fd
497 * 4. addr: The sockaddr parameter from user space
498 *
499 * BSDs:
500 * if(sodisconnect(so) == 0) { // udp disconnect succeed
501 * if (addr->sa_len != so->addr->sa_len) return EINVAL;
502 * if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
503 * pru_connect(so);
504 * }
505 * else return EISCONN;
506 *
507 * z/OS (same with Windows):
508 * if(addr->sa_len < so->addr->sa_len) return EINVAL;
509 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
510 *
511 * AIX:
512 * if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
513 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
514 *
515 * Linux,Others:
516 * if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
517 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
518 */
/* Dissolve the socket's association with its peer by calling connect() with
 * an AF_UNSPEC address (or with a NULL address on IBM i). The call is retried
 * while it is interrupted (EINTR).
 *
 * Returns 0 and clears UV_HANDLE_UDP_CONNECTED on success, otherwise a
 * negative error code. When the BSD macro is defined, EAFNOSUPPORT and EINVAL
 * from connect() are treated as success.
 */
int uv__udp_disconnect(uv_udp_t* handle) {
  int rc;
#if defined(__MVS__)
  struct sockaddr_storage unspec;
#else
  struct sockaddr unspec;
#endif

  memset(&unspec, 0, sizeof(unspec));
#if defined(__MVS__)
  unspec.ss_family = AF_UNSPEC;
#else
  unspec.sa_family = AF_UNSPEC;
#endif

  for (;;) {
    errno = 0;
#ifdef __PASE__
    /* On IBMi a connectionless transport socket can be disconnected by
     * either setting the addr parameter to NULL or setting the
     * addr_length parameter to zero, and issuing another connect().
     * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
     */
    rc = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
#else
    rc = connect(handle->io_watcher.fd,
                 (struct sockaddr*) &unspec,
                 sizeof(unspec));
#endif
    if (rc != -1 || errno != EINTR)
      break;
  }

#if defined(BSD) /* The macro BSD is from sys/param.h */
  if (rc == -1 && errno != EAFNOSUPPORT && errno != EINVAL)
    return UV__ERR(errno);
#else
  if (rc == -1)
    return UV__ERR(errno);
#endif

  handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
  return 0;
}
561
/* Queue a send request on `handle`.
 *
 * The buffer descriptors in `bufs` are copied into the request (into its
 * embedded array when they fit, otherwise into a freshly allocated one) and
 * the destination, if any, is copied into the request's address storage. If
 * nothing else was queued and the handle is not currently running completion
 * callbacks, a send is attempted right away; whatever remains queued is
 * flushed once the socket is reported writable.
 *
 * Returns 0 once the request is queued, UV_ENOMEM if the buffer array cannot
 * be allocated, or the error from the deferred bind.
 */
int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int was_idle;
  int rc;

  assert(nbufs > 0);

  if (addr != NULL) {
    rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (rc)
      return rc;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  was_idle = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->u.storage));
  if (addr != NULL)
    memcpy(&req->u.storage, addr, addrlen);
  else
    req->u.storage.ss_family = AF_UNSPEC;
  req->handle = handle;
  req->send_cb = send_cb;
  req->nbufs = nbufs;

  if (nbufs <= ARRAY_SIZE(req->bufsml)) {
    req->bufs = req->bufsml;
  } else {
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
    if (req->bufs == NULL) {
      uv__req_unregister(handle->loop);
      return UV_ENOMEM;
    }
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  uv__queue_insert_tail(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  if (was_idle && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* If the immediate attempt emptied the write queue there is nothing
     * left to wait for.
     */
    if (uv__queue_empty(&handle->write_queue))
      return 0;
  }

  /* Something is still queued: have the io_watcher report writability so the
   * rest is sent asynchronously.
   */
  uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  return 0;
}
626
627
/* Attempt to send one datagram immediately, without queueing a request.
 *
 * Returns UV_EINVAL when `nbufs` is 0 and UV_EAGAIN when the handle already
 * has sends outstanding. When uv__udp_sendmsg1() returns a positive value the
 * result is the total size of `bufs`; otherwise its return value (or the
 * error from the deferred bind) is passed through.
 */
int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int rc;

  if (nbufs < 1)
    return UV_EINVAL;

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr == NULL) {
    /* Without a destination the socket has to be connected. */
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  } else {
    rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (rc)
      return rc;
  }

  rc = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr);
  if (rc <= 0)
    return rc;

  return uv__count_bufs(bufs, nbufs);
}
656
657
/* Join or leave the IPv4 multicast group `multicast_addr` on the handle's
 * socket. `interface_addr` is a textual IPv4 address selecting the local
 * interface; NULL selects INADDR_ANY.
 *
 * Returns 0 on success, the uv_inet_pton() error for an unparsable interface
 * address, UV_EINVAL for an unknown `membership`, or the setsockopt() error
 * (ENXIO is reported as UV_ENODEV on z/OS).
 */
static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq req;
  int opt;
  int rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &req.imr_interface.s_addr);
    if (rc)
      return rc;
  }

  req.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    opt = IP_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    opt = IP_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 opt,
                 &req,
                 sizeof(req)) == 0)
    return 0;

#if defined(__MVS__)
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
  return UV__ERR(errno);
}
703
704
/* Join or leave the IPv6 multicast group `multicast_addr` on the handle's
 * socket. `interface_addr` is a textual IPv6 address whose scope id selects
 * the interface; NULL selects interface 0.
 *
 * Returns 0 on success, UV_EINVAL for an unparsable interface address or an
 * unknown `membership`, or the setsockopt() error (ENXIO is reported as
 * UV_ENODEV on z/OS).
 */
static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct sockaddr_in6 iface;
  struct ipv6_mreq req;
  int opt;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.ipv6mr_interface = 0;
  } else {
    if (uv_ip6_addr(interface_addr, 0, &iface) != 0)
      return UV_EINVAL;
    req.ipv6mr_interface = iface.sin6_scope_id;
  }

  req.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  if (membership == UV_JOIN_GROUP)
    opt = IPV6_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    opt = IPV6_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 opt,
                 &req,
                 sizeof(req)) == 0)
    return 0;

#if defined(__MVS__)
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
  return UV__ERR(errno);
}
750
751
752 #if !defined(__OpenBSD__) && \
753 !defined(__NetBSD__) && \
754 !defined(__ANDROID__) && \
755 !defined(__DragonFly__) && \
756 !defined(__QNX__) && \
757 !defined(__GNU__)
/* Join or leave the source-specific IPv4 multicast group identified by
 * `multicast_addr` and `source_addr`. A handle without a socket is first
 * bound to the IPv4 wildcard address with UV_UDP_REUSEADDR. `interface_addr`
 * is a textual IPv4 address; NULL selects INADDR_ANY.
 *
 * Returns 0 on success, UV_EINVAL for an unknown `membership`, or the error
 * of the step that failed.
 */
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source req;
  int opt;
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (rc)
    return rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &req.imr_interface.s_addr);
    if (rc)
      return rc;
  }

  req.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  req.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  switch (membership) {
    case UV_JOIN_GROUP:
      opt = IP_ADD_SOURCE_MEMBERSHIP;
      break;
    case UV_LEAVE_GROUP:
      opt = IP_DROP_SOURCE_MEMBERSHIP;
      break;
    default:
      return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 opt,
                 &req,
                 sizeof(req)))
    return UV__ERR(errno);

  return 0;
}
801
802
/* Join or leave the source-specific IPv6 multicast group identified by
 * `multicast_addr` and `source_addr`. A handle without a socket is first
 * bound to the IPv6 wildcard address with UV_UDP_REUSEADDR. `interface_addr`
 * is a textual IPv6 address whose scope id selects the interface; NULL
 * selects interface 0.
 *
 * Returns 0 on success, UV_EINVAL for an unknown `membership`, or the error
 * of the step that failed.
 */
static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req req;
  struct sockaddr_in6 iface;
  int opt;
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (rc)
    return rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.gsr_interface = 0;
  } else {
    rc = uv_ip6_addr(interface_addr, 0, &iface);
    if (rc)
      return rc;
    req.gsr_interface = iface.sin6_scope_id;
  }

  /* The request's address fields must be able to hold a sockaddr_in6. */
  STATIC_ASSERT(sizeof(req.gsr_group) >= sizeof(*multicast_addr));
  STATIC_ASSERT(sizeof(req.gsr_source) >= sizeof(*source_addr));
  memcpy(&req.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&req.gsr_source, source_addr, sizeof(*source_addr));

  switch (membership) {
    case UV_JOIN_GROUP:
      opt = MCAST_JOIN_SOURCE_GROUP;
      break;
    case UV_LEAVE_GROUP:
      opt = MCAST_LEAVE_SOURCE_GROUP;
      break;
    default:
      return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 opt,
                 &req,
                 sizeof(req)))
    return UV__ERR(errno);

  return 0;
}
850 #endif
851
852
/* Initialise `handle` as a UDP handle on `loop`. Unless `domain` is
 * AF_UNSPEC, a datagram socket of that domain is created up front; with
 * AF_UNSPEC the watcher starts without a descriptor (-1).
 *
 * Returns 0 on success or the negative error from uv__socket(), in which case
 * the handle is left untouched.
 */
int uv__udp_init_ex(uv_loop_t* loop,
                    uv_udp_t* handle,
                    unsigned flags,
                    int domain) {
  int sock;

  if (domain == AF_UNSPEC) {
    sock = -1;
  } else {
    sock = uv__socket(domain, SOCK_DGRAM, 0);
    if (sock < 0)
      return sock;
  }

  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
  handle->send_queue_count = 0;
  handle->send_queue_size = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, sock);
  uv__queue_init(&handle->write_queue);
  uv__queue_init(&handle->write_completed_queue);

  return 0;
}
877
878
uv_udp_using_recvmmsg(const uv_udp_t * handle)879 int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
880 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
881 if (handle->flags & UV_HANDLE_UDP_RECVMMSG)
882 return 1;
883 #endif
884 return 0;
885 }
886
887
/* Attach the existing socket `sock` to `handle`. The socket is switched to
 * non-blocking mode and gets address reuse enabled before it is stored in the
 * handle; if it turns out to be connected already, UV_HANDLE_UDP_CONNECTED is
 * set.
 *
 * Returns 0 on success, UV_EBUSY if the handle already has a socket,
 * UV_EEXIST if uv__fd_exists() reports the descriptor as already present in
 * the loop, or the error of the setup step that failed.
 */
int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int rc;

  /* Check for already active socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  rc = uv__nonblock(sock, 1);
  if (rc == 0)
    rc = uv__sock_reuseaddr(sock);
  if (rc != 0)
    return rc;

  handle->io_watcher.fd = sock;
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
912
913
/* Join or leave the multicast group given as text in `multicast_addr`. The
 * address is tried as IPv4 first, then as IPv6; a handle without a socket is
 * bound to the matching wildcard address with UV_UDP_REUSEADDR before the
 * membership is changed.
 *
 * Returns UV_EINVAL if `multicast_addr` parses as neither family, otherwise
 * the result of the bind or of the family-specific membership helper.
 */
int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  struct sockaddr_in6 group6;
  struct sockaddr_in group4;
  int rc;

  if (uv_ip4_addr(multicast_addr, 0, &group4) == 0) {
    rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (rc)
      return rc;
    return uv__udp_set_membership4(handle, &group4, interface_addr, membership);
  }

  if (uv_ip6_addr(multicast_addr, 0, &group6) != 0)
    return UV_EINVAL;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (rc)
    return rc;
  return uv__udp_set_membership6(handle, &group6, interface_addr, membership);
}
936
937
/* Join or leave a source-specific multicast group. `multicast_addr` is tried
 * as IPv4 first and then as IPv6; `source_addr` must parse as the same
 * family.
 *
 * Returns the parse error if either address cannot be parsed, otherwise the
 * result of the family-specific helper. Platforms excluded by the
 * preprocessor guard below always get UV_ENOSYS.
 */
int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) &&                                        \
    !defined(__NetBSD__) &&                                         \
    !defined(__ANDROID__) &&                                        \
    !defined(__DragonFly__) &&                                      \
    !defined(__QNX__) &&                                            \
    !defined(__GNU__)
  union uv__sockaddr group;
  union uv__sockaddr source;
  int rc;

  if (uv_ip4_addr(multicast_addr, 0, &group.in) == 0) {
    rc = uv_ip4_addr(source_addr, 0, &source.in);
    if (rc)
      return rc;
    return uv__udp_set_source_membership4(handle,
                                          &group.in,
                                          interface_addr,
                                          &source.in,
                                          membership);
  }

  /* Not an IPv4 group address: both addresses have to be IPv6. */
  rc = uv_ip6_addr(multicast_addr, 0, &group.in6);
  if (rc)
    return rc;

  rc = uv_ip6_addr(source_addr, 0, &source.in6);
  if (rc)
    return rc;

  return uv__udp_set_source_membership6(handle,
                                        &group.in6,
                                        interface_addr,
                                        &source.in6,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}
980
981
/* Set a socket option on the handle's socket, choosing the protocol level and
 * option name by address family: IPPROTO_IPV6/`option6` when the handle has
 * UV_HANDLE_IPV6 set, IPPROTO_IP/`option4` otherwise.
 *
 * Returns 0 on success or a negative error code.
 */
static int uv__setsockopt(uv_udp_t* handle,
                          int option4,
                          int option6,
                          const void* val,
                          socklen_t size) {
  int level;
  int optname;

  if (handle->flags & UV_HANDLE_IPV6) {
    level = IPPROTO_IPV6;
    optname = option6;
  } else {
    level = IPPROTO_IP;
    optname = option4;
  }

  if (setsockopt(handle->io_watcher.fd, level, optname, val, size))
    return UV__ERR(errno);

  return 0;
}
1006
/* Like uv__setsockopt(), for options whose argument type differs between
 * platforms: char on Solaris, AIX and z/OS, unsigned char on OpenBSD, int
 * everywhere else.
 *
 * Returns UV_EINVAL when `val` is outside 0..255, otherwise the result of
 * uv__setsockopt().
 */
static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char arg;
#elif defined(__OpenBSD__)
  unsigned char arg;
#else
  int arg;
#endif

  if (val < 0 || val > 255)
    return UV_EINVAL;

  arg = val;
  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
}
1024
1025
/* Set SO_BROADCAST on the handle's socket to `on`.
 *
 * Returns 0 on success or a negative error code.
 */
int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  int rc;

  rc = setsockopt(handle->io_watcher.fd,
                  SOL_SOCKET,
                  SO_BROADCAST,
                  &on,
                  sizeof(on));
  if (rc != 0)
    return UV__ERR(errno);

  return 0;
}
1037
1038
/* Set the unicast TTL (IP_TTL, or IPV6_UNICAST_HOPS for IPv6 handles).
 *
 * Returns UV_EINVAL when `ttl` is outside 1..255, UV_ENOTSUP for IPv4 handles
 * on z/OS, otherwise the result of setting the option.
 */
int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (!(ttl >= 1 && ttl <= 255))
    return UV_EINVAL;

#if defined(__MVS__)
  /* zOS does not support setting ttl for IPv4 */
  if ((handle->flags & UV_HANDLE_IPV6) == 0)
    return UV_ENOTSUP;
#endif

#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  /* On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, so on these platforms
   * the option is always passed as an int.
   */
  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));
#else
  /* Everywhere else use the general uv__setsockopt_maybe_char call. */
  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);
#endif
}
1074
1075
/* Set the multicast TTL (IP_MULTICAST_TTL, or IPV6_MULTICAST_HOPS for IPv6
 * handles).
 *
 * Returns 0 on success or a negative error code; on the general path a `ttl`
 * outside 0..255 yields UV_EINVAL.
 */
int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  /* On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
   * IP_MULTICAST_TTL, so the IPv6 option is set directly with an int here.
   */
  if (handle->flags & UV_HANDLE_IPV6) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   IPV6_MULTICAST_HOPS,
                   &ttl,
                   sizeof(ttl)))
      return UV__ERR(errno);
    return 0;
  }
#endif

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}
1099
1100
/* Set the multicast loopback option (IP_MULTICAST_LOOP or
 * IPV6_MULTICAST_LOOP) from `on`. Returns the result of the setsockopt
 * helper.
 */
int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  /* On Solaris and derivatives such as SmartOS, the option length is
   * sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
   * IP_MULTICAST_LOOP. Hardcode the int size for IPv6 handles; everything
   * else falls through to the general helper below.
   */
  if ((handle->flags & UV_HANDLE_IPV6) != 0) {
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
  }
#endif

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}
1124
/* Select the outgoing multicast interface for the handle's socket.
 *
 * interface_addr == NULL: use the default, i.e. INADDR_ANY for IPv4 handles
 * or scope id 0 for handles flagged UV_HANDLE_IPV6.
 * Otherwise the string is parsed as an IPv4 address first, then as IPv6.
 *
 * Returns 0 on success, UV_EINVAL when the string parses as neither family,
 * or errno translated by UV__ERR() when setsockopt() fails.
 */
int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
  struct sockaddr_storage ss;
  struct sockaddr_in* in4;
  struct sockaddr_in6* in6;
  int rc;

  /* Two typed views onto the same storage. */
  in4 = (struct sockaddr_in*) &ss;
  in6 = (struct sockaddr_in6*) &ss;

  if (interface_addr == NULL) {
    memset(&ss, 0, sizeof(ss));
    if (handle->flags & UV_HANDLE_IPV6) {
      ss.ss_family = AF_INET6;
      in6->sin6_scope_id = 0;
    } else {
      ss.ss_family = AF_INET;
      in4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, in4) != 0 &&
             uv_ip6_addr(interface_addr, 0, in6) != 0) {
    /* The IPv6 parse is only attempted after the IPv4 parse has failed. */
    return UV_EINVAL;
  }

  switch (ss.ss_family) {
  case AF_INET:
    rc = setsockopt(handle->io_watcher.fd,
                    IPPROTO_IP,
                    IP_MULTICAST_IF,
                    (void*) &in4->sin_addr,
                    sizeof(in4->sin_addr));
    break;
  case AF_INET6:
    rc = setsockopt(handle->io_watcher.fd,
                    IPPROTO_IPV6,
                    IPV6_MULTICAST_IF,
                    &in6->sin6_scope_id,
                    sizeof(in6->sin6_scope_id));
    break;
  default:
    assert(0 && "unexpected address family");
    abort();
  }

  if (rc == -1)
    return UV__ERR(errno);

  return 0;
}
1173
uv_udp_getpeername(const uv_udp_t * handle,struct sockaddr * name,int * namelen)1174 int uv_udp_getpeername(const uv_udp_t* handle,
1175 struct sockaddr* name,
1176 int* namelen) {
1177
1178 return uv__getsockpeername((const uv_handle_t*) handle,
1179 getpeername,
1180 name,
1181 namelen);
1182 }
1183
uv_udp_getsockname(const uv_udp_t * handle,struct sockaddr * name,int * namelen)1184 int uv_udp_getsockname(const uv_udp_t* handle,
1185 struct sockaddr* name,
1186 int* namelen) {
1187
1188 return uv__getsockpeername((const uv_handle_t*) handle,
1189 getsockname,
1190 name,
1191 namelen);
1192 }
1193
1194
/* Begin receiving on the handle.
 *
 * Returns UV_EINVAL when either callback is NULL, UV_EALREADY when the
 * watcher is already active for POLLIN, or the error from the deferred
 * bind. On success the callbacks are stored, the watcher is started for
 * POLLIN, the handle is marked started, and 0 is returned.
 */
int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int rc;

  if (!alloc_cb || !recv_cb)
    return UV_EINVAL;

  /* FIXME(bnoordhuis) Should be UV_EBUSY. */
  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (rc != 0)
    return rc;

  handle->alloc_cb = alloc_cb;
  handle->recv_cb = recv_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}
1218
1219
/* Stop receiving on the handle: stop the POLLIN watcher and clear both
 * receive callbacks. The handle itself is only marked stopped when the
 * watcher is not still active for POLLOUT. Always returns 0.
 */
int uv__udp_recv_stop(uv_udp_t* handle) {
  int still_writing;

  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  still_writing = uv__io_active(&handle->io_watcher, POLLOUT);
  if (still_writing == 0)
    uv__handle_stop(handle);

  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;

  return 0;
}
1231
1232
uv__udp_prep_pkt(struct msghdr * h,const uv_buf_t * bufs,const unsigned int nbufs,const struct sockaddr * addr)1233 static int uv__udp_prep_pkt(struct msghdr* h,
1234 const uv_buf_t* bufs,
1235 const unsigned int nbufs,
1236 const struct sockaddr* addr) {
1237 memset(h, 0, sizeof(*h));
1238 h->msg_name = (void*) addr;
1239 h->msg_iov = (void*) bufs;
1240 h->msg_iovlen = nbufs;
1241 if (addr == NULL)
1242 return 0;
1243 switch (addr->sa_family) {
1244 case AF_INET:
1245 h->msg_namelen = sizeof(struct sockaddr_in);
1246 return 0;
1247 case AF_INET6:
1248 h->msg_namelen = sizeof(struct sockaddr_in6);
1249 return 0;
1250 case AF_UNIX:
1251 h->msg_namelen = sizeof(struct sockaddr_un);
1252 return 0;
1253 case AF_UNSPEC:
1254 h->msg_name = NULL;
1255 return 0;
1256 }
1257 return UV_EINVAL;
1258 }
1259
1260
uv__udp_sendmsg1(int fd,const uv_buf_t * bufs,unsigned int nbufs,const struct sockaddr * addr)1261 static int uv__udp_sendmsg1(int fd,
1262 const uv_buf_t* bufs,
1263 unsigned int nbufs,
1264 const struct sockaddr* addr) {
1265 struct msghdr h;
1266 int r;
1267
1268 if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr)))
1269 return r;
1270
1271 do
1272 r = sendmsg(fd, &h, 0);
1273 while (r == -1 && errno == EINTR);
1274
1275 if (r < 0) {
1276 r = UV__ERR(errno);
1277 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
1278 r = UV_EAGAIN;
1279 return r;
1280 }
1281
1282 /* UDP sockets don't EOF so we don't have to handle r=0 specially,
1283 * that only happens when the input was a zero-sized buffer.
1284 */
1285 return 1;
1286 }
1287
1288
/* Send up to `count` datagrams; datagram i consists of nbufs[i] buffers
 * from bufs[i] and is addressed to addrs[i].
 *
 * Returns the number of datagrams sent when at least one went out. These
 * are always the first `nsent` entries of the arrays, in order. When
 * nothing was sent, returns the error for the first datagram: UV_EAGAIN for
 * EAGAIN/EWOULDBLOCK/ENOBUFS, another translated errno, or the error from
 * uv__udp_prep_pkt(). Returns 0 when `count` is 0.
 *
 * On Linux, FreeBSD and macOS, batches of up to 20 datagrams are submitted
 * with sendmmsg()/sendmsg_x() when count > 1. Everything else goes through
 * uv__udp_sendmsg1() one datagram at a time.
 */
static int uv__udp_sendmsgv(int fd,
                            unsigned int count,
                            uv_buf_t* bufs[/*count*/],
                            unsigned int nbufs[/*count*/],
                            struct sockaddr* addrs[/*count*/]) {
  unsigned int i;
  int nsent;
  int r;

  r = 0;
  nsent = 0;

#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
  if (count > 1) {
    /* `i` is the index of the first datagram not yet sent. It only moves
     * forward by the number of datagrams the kernel actually accepted, so
     * a partial batch is resumed at the right place.
     */
    for (i = 0; i < count; i += (unsigned int) r) {
      struct mmsghdr m[20];
      unsigned int n;
      int prep_err;

      prep_err = 0;
      for (n = 0; i + n < count && n < ARRAY_SIZE(m); n++) {
        prep_err = uv__udp_prep_pkt(&m[n].msg_hdr,
                                    bufs[i + n],
                                    nbufs[i + n],
                                    addrs[i + n]);
        if (prep_err)
          break;
      }

      /* The very next datagram is invalid: report its error as-is. It is
       * already a UV_* code and must not be re-derived from errno.
       */
      if (n == 0) {
        r = prep_err;
        goto exit;
      }

      do
#if defined(__APPLE__)
        r = sendmsg_x(fd, m, n, MSG_DONTWAIT);
#else
        r = sendmmsg(fd, m, n, 0);
#endif
      while (r == -1 && errno == EINTR);

      if (r < 0) {
        /* Translate errno right here, while it is known to be fresh. */
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
          r = UV_EAGAIN;
        else
          r = UV__ERR(errno);
        goto exit;
      }

      if (r == 0)
        goto exit;

      nsent += r;

      /* A later datagram in this batch could not be prepared. Stop after
       * sending the valid ones; the error surfaces on the next call, when
       * the bad datagram is first in line.
       */
      if (prep_err)
        goto exit;
    }

    goto exit;
  }
#endif  /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) */

  /* uv__udp_sendmsg1() returns 1 on success and a negative error code on
   * failure, so only a negative result stops the loop.
   */
  for (i = 0; i < count; i++, nsent++) {
    r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i]);
    if (r < 0)
      goto exit;  /* goto to avoid unused label warning. */
  }

exit:

  if (nsent > 0)
    return nsent;

  /* `r` is 0 or an already-translated error code at this point. */
  return r;
}
1347
1348
/* Flush the handle's write queue.
 *
 * Repeatedly gathers up to N requests from the head of write_queue, hands
 * them to uv__udp_sendmsgv(), and moves every request that was sent to
 * write_completed_queue with its status set to the result of
 * uv__count_bufs() on its buffers.
 *
 * Stops without feeding the watcher when the queue is empty on entry or
 * when the send reports UV_EAGAIN. On any other error the request at the
 * head of the queue is completed with that error as its status. When the
 * queue drains, or after such an error, uv__io_feed() is called on the
 * watcher.
 */
static void uv__udp_sendmsg(uv_udp_t* handle) {
  /* An enum constant, unlike a `static const int`, is an integer constant
   * expression in C, so the arrays below are ordinary fixed-size arrays
   * rather than variable-length arrays.
   */
  enum { N = 20 };
  struct sockaddr* addrs[N];
  unsigned int nbufs[N];
  uv_buf_t* bufs[N];
  struct uv__queue* q;
  uv_udp_send_t* req;
  int n;

  if (uv__queue_empty(&handle->write_queue))
    return;

again:
  /* Collect the next batch of pending requests, without dequeuing them. */
  n = 0;
  q = uv__queue_head(&handle->write_queue);
  do {
    req = uv__queue_data(q, uv_udp_send_t, queue);
    addrs[n] = &req->u.addr;
    nbufs[n] = req->nbufs;
    bufs[n] = req->bufs;
    q = uv__queue_next(q);
    n++;
  } while (n < N && q != &handle->write_queue);

  n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs);

  /* A positive result is the number of requests sent, counted from the
   * head of the queue; move exactly that many to the completed queue.
   */
  while (n > 0) {
    q = uv__queue_head(&handle->write_queue);
    req = uv__queue_data(q, uv_udp_send_t, queue);
    req->status = uv__count_bufs(req->bufs, req->nbufs);
    uv__queue_remove(&req->queue);
    uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
    n--;
  }

  if (n == 0) {
    if (uv__queue_empty(&handle->write_queue))
      goto feed;
    goto again;
  }

  if (n == UV_EAGAIN)
    return;

  /* Register the error against first request in queue because that
   * is the request that uv__udp_sendmsgv tried but failed to send,
   * because if it did send any requests, it won't return an error.
   */
  q = uv__queue_head(&handle->write_queue);
  req = uv__queue_data(q, uv_udp_send_t, queue);
  req->status = n;
  uv__queue_remove(&req->queue);
  uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
feed:
  uv__io_feed(handle->loop, &handle->io_watcher);
}
1404
1405
/* Try to send `count` datagrams on the handle's socket right away.
 *
 * Returns UV_EINVAL when the handle has no file descriptor (fd == -1);
 * otherwise returns whatever uv__udp_sendmsgv() returns for the batch.
 */
int uv__udp_try_send2(uv_udp_t* handle,
                      unsigned int count,
                      uv_buf_t* bufs[/*count*/],
                      unsigned int nbufs[/*count*/],
                      struct sockaddr* addrs[/*count*/]) {
  if (handle->io_watcher.fd == -1)
    return UV_EINVAL;

  return uv__udp_sendmsgv(handle->io_watcher.fd, count, bufs, nbufs, addrs);
}
1419