xref: /libuv/src/unix/udp.c (revision e8969bff)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "internal.h"
24 
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34 
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37 #endif
38 
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41 #endif
42 
/* Forward declarations of file-local (static) helpers, so that they can be
 * called ahead of the point where they are defined.
 */
static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags);
static int uv__udp_sendmsg1(int fd,
                            const uv_buf_t* bufs,
                            unsigned int nbufs,
                            const struct sockaddr* addr);
54 
55 
/* Begin closing a UDP handle: call uv__io_close() on its watcher, stop the
 * handle and, if it owns a descriptor, close it and reset the stored fd to -1.
 */
void uv__udp_close(uv_udp_t* handle) {
  int fd;

  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  fd = handle->io_watcher.fd;
  if (fd == -1)
    return;  /* No socket was ever attached; nothing to close. */

  uv__close(fd);
  handle->io_watcher.fd = -1;
}
65 
66 
/* Final stage of closing a UDP handle. The watcher must be inactive and the
 * descriptor must already be -1 (both asserted). Every request still sitting
 * on write_queue is moved to write_completed_queue with status UV_ECANCELED,
 * the completed queue is drained via uv__udp_run_completed(), and the
 * receive/alloc callbacks are cleared.
 */
void uv__udp_finish_close(uv_udp_t* handle) {
  struct uv__queue* node;
  uv_udp_send_t* pending;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  for (;;) {
    if (uv__queue_empty(&handle->write_queue))
      break;

    node = uv__queue_head(&handle->write_queue);
    uv__queue_remove(node);

    /* Mark the request as cancelled and hand it to the completion queue so
     * that its callback still runs.
     */
    pending = uv__queue_data(node, uv_udp_send_t, queue);
    pending->status = UV_ECANCELED;
    uv__queue_insert_tail(&handle->write_completed_queue, &pending->queue);
  }

  uv__udp_run_completed(handle);

  /* Draining the completed queue must have settled the send accounting. */
  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Drop the user callbacks; close_cb is deliberately left untouched. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
}
93 
94 
/* Drain write_completed_queue. For each finished request: unregister it from
 * the loop, subtract it from the handle's send_queue_size/send_queue_count,
 * free its buffer array when that is not the inline bufsml, and invoke its
 * send_cb (if set) with 0 on success or the negative status on failure.
 * Afterwards, if write_queue is empty, POLLOUT is stopped and the handle is
 * stopped unless POLLIN is still active. UV_HANDLE_UDP_PROCESSING is set for
 * the duration of the call and must not be set on entry (asserted).
 */
static void uv__udp_run_completed(uv_udp_t* handle) {
  struct uv__queue* node;
  uv_udp_send_t* done;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  for (;;) {
    if (uv__queue_empty(&handle->write_completed_queue))
      break;

    node = uv__queue_head(&handle->write_completed_queue);
    uv__queue_remove(node);

    done = uv__queue_data(node, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop);

    handle->send_queue_size -= uv__count_bufs(done->bufs, done->nbufs);
    handle->send_queue_count--;

    if (done->bufs != done->bufsml)
      uv__free(done->bufs);
    done->bufs = NULL;

    /* done->status >= 0 is a byte count and is reported as 0;
     * done->status <  0 is an error code and is passed through unchanged.
     * The request is not touched again after the callback returns.
     */
    if (done->send_cb != NULL)
      done->send_cb(done, done->status >= 0 ? 0 : done->status);
  }

  if (uv__queue_empty(&handle->write_queue)) {
    /* Neither pending nor completed requests remain: stop watching for
     * writability.
     */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}
137 
138 
/* I/O watcher callback for UDP handles. Recovers the owning uv_udp_t from the
 * watcher, receives when POLLIN is reported and, when POLLOUT is reported and
 * the handle is not closing, sends queued requests and then runs the
 * completion callbacks.
 */
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* udp;

  udp = container_of(w, uv_udp_t, io_watcher);
  assert(udp->type == UV_UDP);

  if (revents & POLLIN)
    uv__udp_recvmsg(udp);

  if (!(revents & POLLOUT))
    return;

  /* Checked after the receive step has run, as in the combined condition. */
  if (uv__is_closing(udp))
    return;

  uv__udp_sendmsg(udp);
  uv__udp_run_completed(udp);
}
153 
/* Receive several datagrams with one system call (recvmsg_x on Apple
 * platforms, recvmmsg otherwise). `buf` is carved into
 * UV__UDP_DGRAM_MAXSIZE-sized chunks, at most ARRAY_SIZE(peers) of them. Each
 * received datagram is reported to recv_cb with UV_UDP_MMSG_CHUNK (plus
 * UV_UDP_PARTIAL when MSG_TRUNC is set), followed by one last callback with
 * UV_UDP_MMSG_FREE for the whole buffer. Returns the result of the receive
 * call, or UV_ENOSYS on platforms without support.
 */
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
  struct sockaddr_in6 peers[20];
  struct iovec iov[ARRAY_SIZE(peers)];
  struct mmsghdr msgs[ARRAY_SIZE(peers)];
  uv_buf_t chunk_buf;
  ssize_t nread;
  ssize_t status;
  size_t chunks;
  size_t i;
  int flags;

  /* Use as many full-size chunks as fit in the buffer, capped at the number
   * of message slots.
   */
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);

  for (i = 0; i < chunks; i++) {
    iov[i].iov_base = buf->base + i * UV__UDP_DGRAM_MAXSIZE;
    iov[i].iov_len = UV__UDP_DGRAM_MAXSIZE;

    memset(&msgs[i].msg_hdr, 0, sizeof(msgs[i].msg_hdr));
    msgs[i].msg_hdr.msg_iov = &iov[i];
    msgs[i].msg_hdr.msg_iovlen = 1;
    msgs[i].msg_hdr.msg_name = &peers[i];
    msgs[i].msg_hdr.msg_namelen = sizeof(peers[0]);
    msgs[i].msg_hdr.msg_control = NULL;
    msgs[i].msg_hdr.msg_controllen = 0;
    msgs[i].msg_hdr.msg_flags = 0;
    msgs[i].msg_len = 0;
  }

  /* Retry the receive call for as long as it is interrupted by a signal. */
  do {
#if defined(__APPLE__)
    nread = recvmsg_x(handle->io_watcher.fd, msgs, chunks, MSG_DONTWAIT);
#else
    nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
#endif
  } while (nread == -1 && errno == EINTR);

  if (nread < 1) {
    /* Nothing was received: a zero count and "would block" are reported as
     * 0, any other failure as the translated errno.
     */
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      status = 0;
    else
      status = UV__ERR(errno);

    handle->recv_cb(handle, status, buf, NULL, 0);
    return nread;
  }

  /* Hand each chunk to the application; recv_cb is re-checked before every
   * call and the loop ends early once it is NULL.
   */
  i = 0;
  while (i < (size_t) nread && handle->recv_cb != NULL) {
    flags = UV_UDP_MMSG_CHUNK;
    if (msgs[i].msg_hdr.msg_flags & MSG_TRUNC)
      flags |= UV_UDP_PARTIAL;

    chunk_buf = uv_buf_init(iov[i].iov_base, iov[i].iov_len);
    handle->recv_cb(handle,
                    msgs[i].msg_len,
                    &chunk_buf,
                    msgs[i].msg_hdr.msg_name,
                    flags);
    i++;
  }

  /* One last callback so the original buffer is freed. */
  if (handle->recv_cb != NULL)
    handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);

  return nread;
#else  /* __linux__ || __FreeBSD__ || __APPLE__ */
  return UV_ENOSYS;
#endif  /* __linux__ || __FreeBSD__ || __APPLE__ */
}
222 
/* Read datagrams from the handle's socket and deliver them to recv_cb.
 * Each iteration asks alloc_cb for a buffer (reporting UV_ENOBUFS and
 * returning if none is supplied), then either delegates to
 * uv__udp_recvmmsg() or performs a single recvmsg(). The loop stops when a
 * read returns -1, when the budget of 32 is used up, or when the descriptor
 * or recv_cb has been reset.
 */
static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage src;
  struct msghdr msg;
  ssize_t nread;
  uv_buf_t buf;
  int budget;
  int flags;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  budget = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    if (buf.base == NULL || buf.len == 0) {
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

    if (uv_udp_using_recvmmsg(handle)) {
      /* Batched path: the budget is charged by the value returned, when
       * that value is positive.
       */
      nread = uv__udp_recvmmsg(handle, &buf);
      if (nread > 0)
        budget -= nread;
    } else {
      memset(&msg, 0, sizeof(msg));
      memset(&src, 0, sizeof(src));
      msg.msg_name = &src;
      msg.msg_namelen = sizeof(src);
      msg.msg_iov = (void*) &buf;
      msg.msg_iovlen = 1;

      do
        nread = recvmsg(handle->io_watcher.fd, &msg, 0);
      while (nread == -1 && errno == EINTR);

      if (nread != -1) {
        flags = 0;
        if (msg.msg_flags & MSG_TRUNC)
          flags |= UV_UDP_PARTIAL;

        handle->recv_cb(handle,
                        nread,
                        &buf,
                        (const struct sockaddr*) &src,
                        flags);
      } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Nothing to read right now: reported as a zero-length read. */
        handle->recv_cb(handle, 0, &buf, NULL, 0);
      } else {
        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
      }

      budget--;
    }
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && budget > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}
288 
289 
290 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
291  * refinements for programs that use multicast. Therefore we preferentially
292  * set SO_REUSEPORT over SO_REUSEADDR here, but we set SO_REUSEPORT only
293  * when that socket option doesn't have the capability of load balancing.
294  * Otherwise, we fall back to SO_REUSEADDR.
295  *
296  * Linux as of 3.9, DragonflyBSD 3.6, AIX 7.2.5 have the SO_REUSEPORT socket
297  * option but with semantics that are different from the BSDs: it _shares_
298  * the port rather than steals it from the current listener. While useful,
299  * it's not something we can emulate on other platforms so we don't enable it.
300  *
301  * zOS does not support getsockname with SO_REUSEPORT option when using
302  * AF_UNIX.
303  */
static int uv__sock_reuseaddr(int fd) {
  int optname;
  int on;
#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in sockfd;
  unsigned int sockfd_len;
#endif

  on = 1;

  /* Pick the socket option for this platform; a single setsockopt() call
   * below applies it. Returns 0 on success or a translated errno.
   */
#if defined(SO_REUSEPORT) && defined(__MVS__)
  sockfd_len = sizeof(sockfd);
  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
    return UV__ERR(errno);

  if (sockfd.sin_family == AF_UNIX)
    optname = SO_REUSEADDR;
  else
    optname = SO_REUSEPORT;
#elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__) && \
	!defined(__sun__) && !defined(__DragonFly__) && !defined(_AIX73)
  optname = SO_REUSEPORT;
#else
  optname = SO_REUSEADDR;
#endif

  if (setsockopt(fd, SOL_SOCKET, optname, &on, sizeof(on)))
    return UV__ERR(errno);

  return 0;
}
331 
332 /*
333  * The Linux kernel suppresses some ICMP error messages by default for UDP
334  * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
335  * error reporting, hopefully resulting in faster failover to working name
336  * servers.
337  */
static int uv__set_recverr(int fd, sa_family_t ss_family) {
#if defined(__linux__)
  int optname;
  int level;
  int on;

  /* Only AF_INET and AF_INET6 have a RECVERR option; every other family is
   * accepted without touching the socket.
   */
  switch (ss_family) {
  case AF_INET:
    level = IPPROTO_IP;
    optname = IP_RECVERR;
    break;
  case AF_INET6:
    level = IPPROTO_IPV6;
    optname = IPV6_RECVERR;
    break;
  default:
    return 0;
  }

  on = 1;
  if (setsockopt(fd, level, optname, &on, sizeof(on)))
    return UV__ERR(errno);
#endif
  /* On non-Linux builds this function does nothing and reports success. */
  return 0;
}
353 
354 
/* Bind a UDP handle to `addr`, creating the socket first when the handle has
 * no descriptor yet. `flags` may combine UV_UDP_IPV6ONLY, UV_UDP_REUSEADDR,
 * UV_UDP_REUSEPORT and UV_UDP_LINUX_RECVERR; any other bit, or
 * UV_UDP_IPV6ONLY together with a non-AF_INET6 address, yields UV_EINVAL.
 * Returns 0 on success or a negative error code. On success the handle is
 * flagged UV_HANDLE_BOUND, plus UV_HANDLE_IPV6 for AF_INET6 addresses.
 */
int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int sockfd;
  int rc;
  int on;

  /* Reject flag bits this function does not know about. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR |
                UV_UDP_REUSEPORT | UV_UDP_LINUX_RECVERR))
    return UV_EINVAL;

  /* IPv6-only mode makes no sense on a non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  sockfd = handle->io_watcher.fd;
  if (sockfd == -1) {
    sockfd = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (sockfd < 0)
      return sockfd;
    /* The new descriptor is stored on the handle right away, so it stays
     * there even if a later step fails.
     */
    handle->io_watcher.fd = sockfd;
  }

  if (flags & UV_UDP_LINUX_RECVERR) {
    rc = uv__set_recverr(sockfd, addr->sa_family);
    if (rc != 0)
      return rc;
  }

  if (flags & UV_UDP_REUSEADDR) {
    rc = uv__sock_reuseaddr(sockfd);
    if (rc != 0)
      return rc;
  }

  if (flags & UV_UDP_REUSEPORT) {
    rc = uv__sock_reuseport(sockfd);
    if (rc != 0)
      return rc;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    on = 1;
    if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof on) == -1)
      return UV__ERR(errno);
#else
    return UV_ENOTSUP;
#endif
  }

  if (bind(sockfd, addr, addrlen)) {
    /* OSX, other BSDs and SunOS fail with EAFNOSUPPORT when binding a
     * socket created with AF_INET to an AF_INET6 address or vice versa.
     */
    if (errno == EAFNOSUPPORT)
      return UV_EINVAL;
    return UV__ERR(errno);
  }

  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}
427 
428 
/* Bind the handle to the wildcard address of `domain` (port left at zero)
 * if it has no socket yet; a handle that already has a descriptor is left
 * alone and 0 is returned. `flags` is forwarded to uv__udp_bind(). Only
 * AF_INET and AF_INET6 are supported; any other domain aborts the process.
 */
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union uv__sockaddr taddr;
  socklen_t addrlen;

  if (handle->io_watcher.fd != -1)
    return 0;

  if (domain == AF_INET) {
    memset(&taddr.in, 0, sizeof(taddr.in));
    taddr.in.sin_family = AF_INET;
    taddr.in.sin_addr.s_addr = INADDR_ANY;
    addrlen = sizeof(taddr.in);
  } else if (domain == AF_INET6) {
    memset(&taddr.in6, 0, sizeof(taddr.in6));
    taddr.in6.sin6_family = AF_INET6;
    taddr.in6.sin6_addr = in6addr_any;
    addrlen = sizeof(taddr.in6);
  } else {
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
}
464 
465 
/* Connect the handle's socket to `addr`, binding it to a wildcard address
 * first when it has no socket yet. connect() is retried while it fails with
 * EINTR. Returns 0 and sets UV_HANDLE_UDP_CONNECTED on success, otherwise a
 * negative error code.
 */
int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (rc != 0)
    return rc;

  for (;;) {
    errno = 0;
    rc = connect(handle->io_watcher.fd, addr, addrlen);
    if (rc != -1 || errno != EINTR)
      break;
  }

  if (rc != 0)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;
  return 0;
}
487 
/* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
 * The kernels of all UNIX systems supported by uv should follow the standard,
 * but their implementation logic differs. The pseudocode below explains the
 * UDP disconnect behavior of each:
492  *
493  * Predefined stubs for pseudocode:
494  *   1. sodisconnect: The function to perform the real udp disconnect
495  *   2. pru_connect: The function to perform the real udp connect
496  *   3. so: The kernel object match with socket fd
497  *   4. addr: The sockaddr parameter from user space
498  *
499  * BSDs:
500  *   if(sodisconnect(so) == 0) { // udp disconnect succeed
501  *     if (addr->sa_len != so->addr->sa_len) return EINVAL;
502  *     if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
503  *     pru_connect(so);
504  *   }
505  *   else return EISCONN;
506  *
507  * z/OS (same with Windows):
508  *   if(addr->sa_len < so->addr->sa_len) return EINVAL;
509  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
510  *
511  * AIX:
512  *   if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
513  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
514  *
515  * Linux,Others:
516  *   if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
517  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
518  */
int uv__udp_disconnect(uv_udp_t* handle) {
#if defined(__MVS__)
  struct sockaddr_storage unspec;
#else
  struct sockaddr unspec;
#endif
  int rc;

  /* Build an all-zero address whose family is AF_UNSPEC. */
  memset(&unspec, 0, sizeof(unspec));
#if defined(__MVS__)
  unspec.ss_family = AF_UNSPEC;
#else
  unspec.sa_family = AF_UNSPEC;
#endif

  do {
    errno = 0;
#ifdef __PASE__
    /* On IBMi a connectionless transport socket can be disconnected by
     * either setting the addr parameter to NULL or setting the
     * addr_length parameter to zero, and issuing another connect().
     * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
     */
    rc = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
#else
    rc = connect(handle->io_watcher.fd,
                 (struct sockaddr*) &unspec,
                 sizeof(unspec));
#endif
  } while (rc == -1 && errno == EINTR);

#if defined(BSD)  /* The macro BSD is from sys/param.h */
  /* On BSD builds EAFNOSUPPORT and EINVAL are not treated as failures. */
  if (rc == -1 && errno != EAFNOSUPPORT && errno != EINVAL)
    return UV__ERR(errno);
#else
  if (rc == -1)
    return UV__ERR(errno);
#endif

  handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
  return 0;
}
561 
/* Queue a send request on `handle`. When `addr` is given, the handle is
 * bound to a wildcard address first if it has no socket yet, and the address
 * is copied into the request; otherwise the stored family is AF_UNSPEC. The
 * buffer descriptors are copied into the request (heap-allocated when they
 * do not fit the inline bufsml array). If nothing else was queued and the
 * handle is not already running completions, an immediate send is attempted;
 * otherwise, or if requests remain queued afterwards, POLLOUT is armed.
 * Returns 0, an error from the deferred bind, or UV_ENOMEM.
 */
int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int was_idle;
  int send_now;
  int rc;

  assert(nbufs > 0);

  if (addr != NULL) {
    rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (rc != 0)
      return rc;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  was_idle = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->u.storage));
  if (addr != NULL)
    memcpy(&req->u.storage, addr, addrlen);
  else
    req->u.storage.ss_family = AF_UNSPEC;
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  if (nbufs <= ARRAY_SIZE(req->bufsml)) {
    req->bufs = req->bufsml;
  } else {
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
    if (req->bufs == NULL) {
      /* Undo the registration done by uv__req_init() above. */
      uv__req_unregister(handle->loop);
      return UV_ENOMEM;
    }
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  uv__queue_insert_tail(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  send_now = was_idle && !(handle->flags & UV_HANDLE_UDP_PROCESSING);
  if (send_now)
    uv__udp_sendmsg(handle);

  /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
   * away. In such cases, and whenever no immediate send was attempted, the
   * `io_watcher` has to be queued for asynchronous write.
   */
  if (!send_now || !uv__queue_empty(&handle->write_queue))
    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);

  return 0;
}
626 
627 
/* Attempt to send right away via uv__udp_sendmsg1(), without queueing a
 * request. Returns UV_EINVAL when nbufs is zero and UV_EAGAIN when requests
 * are already queued on the handle. A positive result from the send is
 * reported as the total size of `bufs`; any other result is returned as is.
 * With a NULL `addr` the handle is expected to be connected (asserted).
 */
int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int rc;

  if (nbufs < 1)
    return UV_EINVAL;

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr == NULL) {
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  } else {
    rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (rc != 0)
      return rc;
  }

  rc = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr);
  if (rc <= 0)
    return rc;

  return uv__count_bufs(bufs, nbufs);
}
656 
657 
/* Join or leave the IPv4 multicast group `multicast_addr` on the handle's
 * socket. `interface_addr` is an optional textual IPv4 address selecting the
 * interface; NULL selects INADDR_ANY. Returns 0 on success, the error from
 * uv_inet_pton() for a bad interface address, UV_EINVAL for an unknown
 * `membership`, or a translated errno from setsockopt().
 */
static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq req;
  int optname;
  int rc;

  memset(&req, 0, sizeof req);

  if (interface_addr == NULL) {
    req.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &req.imr_interface.s_addr);
    if (rc != 0)
      return rc;
  }

  req.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &req,
                 sizeof(req)) == 0)
    return 0;

#if defined(__MVS__)
  /* On z/OS builds ENXIO is reported as UV_ENODEV. */
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
  return UV__ERR(errno);
}
703 
704 
/* Join or leave the IPv6 multicast group `multicast_addr` on the handle's
 * socket. When `interface_addr` is given it is parsed with uv_ip6_addr() and
 * the resulting scope id selects the interface; NULL leaves the interface
 * index at 0. Returns 0 on success, UV_EINVAL for an unparsable interface
 * address or unknown `membership`, or a translated errno from setsockopt().
 */
static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct sockaddr_in6 iface;
  struct ipv6_mreq req;
  int optname;

  memset(&req, 0, sizeof req);

  if (interface_addr == NULL) {
    req.ipv6mr_interface = 0;
  } else {
    if (uv_ip6_addr(interface_addr, 0, &iface))
      return UV_EINVAL;
    req.ipv6mr_interface = iface.sin6_scope_id;
  }

  req.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IPV6_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IPV6_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &req,
                 sizeof(req)) == 0)
    return 0;

#if defined(__MVS__)
  /* On z/OS builds ENXIO is reported as UV_ENODEV. */
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
  return UV__ERR(errno);
}
750 
751 
752 #if !defined(__OpenBSD__) &&                                        \
753     !defined(__NetBSD__) &&                                         \
754     !defined(__ANDROID__) &&                                        \
755     !defined(__DragonFly__) &&                                      \
756     !defined(__QNX__) &&                                            \
757     !defined(__GNU__)
/* Join or leave the source-specific IPv4 multicast group identified by
 * `multicast_addr` and `source_addr`. The handle is bound to the IPv4
 * wildcard address with UV_UDP_REUSEADDR first if it has no socket yet.
 * `interface_addr` is an optional textual IPv4 address; NULL selects
 * INADDR_ANY. Returns 0 on success, an error from the deferred bind or from
 * uv_inet_pton(), UV_EINVAL for an unknown `membership`, or a translated
 * errno from setsockopt().
 */
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source req;
  int optname;
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (rc != 0)
    return rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &req.imr_interface.s_addr);
    if (rc != 0)
      return rc;
  }

  req.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  req.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IP_ADD_SOURCE_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IP_DROP_SOURCE_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &req,
                 sizeof(req)) != 0)
    return UV__ERR(errno);

  return 0;
}
801 
802 
/* Join or leave the source-specific IPv6 multicast group identified by
 * `multicast_addr` and `source_addr`. The handle is bound to the IPv6
 * wildcard address with UV_UDP_REUSEADDR first if it has no socket yet.
 * When `interface_addr` is given it is parsed with uv_ip6_addr() and its
 * scope id selects the interface; NULL leaves the interface index at 0.
 * Returns 0 on success, an error from the deferred bind or from
 * uv_ip6_addr(), UV_EINVAL for an unknown `membership`, or a translated
 * errno from setsockopt().
 */
static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req req;
  struct sockaddr_in6 iface;
  int optname;
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (rc != 0)
    return rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.gsr_interface = 0;
  } else {
    rc = uv_ip6_addr(interface_addr, 0, &iface);
    if (rc != 0)
      return rc;
    req.gsr_interface = iface.sin6_scope_id;
  }

  /* The group and source fields must be large enough to hold a full
   * sockaddr_in6 before it is copied in.
   */
  STATIC_ASSERT(sizeof(req.gsr_group) >= sizeof(*multicast_addr));
  STATIC_ASSERT(sizeof(req.gsr_source) >= sizeof(*source_addr));
  memcpy(&req.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&req.gsr_source, source_addr, sizeof(*source_addr));

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = MCAST_JOIN_SOURCE_GROUP;
    break;
  case UV_LEAVE_GROUP:
    optname = MCAST_LEAVE_SOURCE_GROUP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &req,
                 sizeof(req)) != 0)
    return UV__ERR(errno);

  return 0;
}
850 #endif
851 
852 
/* Initialize a UDP handle on `loop`. When `domain` is not AF_UNSPEC a
 * datagram socket of that domain is created up front and its error, if any,
 * is returned before the handle is touched; otherwise the handle starts
 * without a descriptor (fd -1). `flags` is not used by this function.
 * Returns 0 on success.
 */
int uv__udp_init_ex(uv_loop_t* loop,
                    uv_udp_t* handle,
                    unsigned flags,
                    int domain) {
  int fd;

  if (domain == AF_UNSPEC) {
    fd = -1;
  } else {
    fd = uv__socket(domain, SOCK_DGRAM, 0);
    if (fd < 0)
      return fd;
  }

  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
  handle->send_queue_size = 0;
  handle->send_queue_count = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, fd);
  uv__queue_init(&handle->write_queue);
  uv__queue_init(&handle->write_completed_queue);

  return 0;
}
877 
878 
uv_udp_using_recvmmsg(const uv_udp_t * handle)879 int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
880 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
881   if (handle->flags & UV_HANDLE_UDP_RECVMMSG)
882     return 1;
883 #endif
884   return 0;
885 }
886 
887 
/* Attach an existing OS socket to `handle`. Returns UV_EBUSY when the handle
 * already has a descriptor and UV_EEXIST when uv__fd_exists() reports the
 * socket for this loop. Otherwise the socket is switched to non-blocking
 * mode, uv__sock_reuseaddr() is applied, the descriptor is stored on the
 * handle, and UV_HANDLE_UDP_CONNECTED is set if uv__udp_is_connected()
 * says so. Returns 0 on success or the first error encountered.
 */
int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int rc;

  /* Check for already active socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  rc = uv__nonblock(sock, 1);
  if (rc == 0)
    rc = uv__sock_reuseaddr(sock);
  if (rc != 0)
    return rc;

  handle->io_watcher.fd = sock;
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
912 
913 
/* Join or leave the multicast group given as text in `multicast_addr`. The
 * address is tried as IPv4 first and as IPv6 second; if neither parses,
 * UV_EINVAL is returned. Before the family-specific helper is called, the
 * handle is bound to the matching wildcard address with UV_UDP_REUSEADDR if
 * it has no socket yet. Returns 0 on success or a negative error code.
 */
int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  struct sockaddr_in6 group6;
  struct sockaddr_in group4;
  int rc;

  if (uv_ip4_addr(multicast_addr, 0, &group4) == 0) {
    rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (rc != 0)
      return rc;
    return uv__udp_set_membership4(handle, &group4, interface_addr, membership);
  }

  if (uv_ip6_addr(multicast_addr, 0, &group6) != 0)
    return UV_EINVAL;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (rc != 0)
    return rc;
  return uv__udp_set_membership6(handle, &group6, interface_addr, membership);
}
936 
937 
/* Join or leave a source-specific multicast group. `multicast_addr` is tried
 * as IPv4 first and as IPv6 second; `source_addr` must parse in the same
 * family. The matching family-specific helper does the actual work. Returns
 * 0 on success, the parse error of the address that failed, or whatever the
 * helper returns. On platforms excluded by the preprocessor condition below
 * the function returns UV_ENOSYS.
 */
int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) &&                                        \
    !defined(__NetBSD__) &&                                         \
    !defined(__ANDROID__) &&                                        \
    !defined(__DragonFly__) &&                                      \
    !defined(__QNX__) &&                                            \
    !defined(__GNU__)
  union uv__sockaddr group;
  union uv__sockaddr source;
  int rc;

  if (uv_ip4_addr(multicast_addr, 0, &group.in) == 0) {
    rc = uv_ip4_addr(source_addr, 0, &source.in);
    if (rc != 0)
      return rc;
    return uv__udp_set_source_membership4(handle,
                                          &group.in,
                                          interface_addr,
                                          &source.in,
                                          membership);
  }

  /* Not an IPv4 group address: both addresses must then be IPv6. */
  rc = uv_ip6_addr(multicast_addr, 0, &group.in6);
  if (rc != 0)
    return rc;

  rc = uv_ip6_addr(source_addr, 0, &source.in6);
  if (rc != 0)
    return rc;

  return uv__udp_set_source_membership6(handle,
                                        &group.in6,
                                        interface_addr,
                                        &source.in6,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}
980 
981 
/* Set a socket option on the handle's descriptor, picking the protocol level
 * and option name by address family: IPPROTO_IPV6/`option6` when the handle
 * has UV_HANDLE_IPV6 set, IPPROTO_IP/`option4` otherwise.
 *
 * Returns 0 on success or UV__ERR(errno) when setsockopt() fails.
 */
static int uv__setsockopt(uv_udp_t* handle,
                         int option4,
                         int option6,
                         const void* val,
                         socklen_t size) {
  int level;
  int option;

  if (handle->flags & UV_HANDLE_IPV6) {
    level = IPPROTO_IPV6;
    option = option6;
  } else {
    level = IPPROTO_IP;
    option = option4;
  }

  if (setsockopt(handle->io_watcher.fd, level, option, val, size) != 0)
    return UV__ERR(errno);

  return 0;
}
1006 
/* Like uv__setsockopt(), but passes `val` using the integer width selected
 * for the platform below: char, unsigned char, or int.
 *
 * Returns UV_EINVAL when `val` is outside 0..255, otherwise the result of
 * uv__setsockopt().
 */
static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char arg;
#elif defined(__OpenBSD__)
  unsigned char arg;
#else
  int arg;
#endif

  if (!(val >= 0 && val <= 255))
    return UV_EINVAL;

  arg = val;
  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
}
1024 
1025 
/* Set SO_BROADCAST on the handle's socket to `on`.
 *
 * Returns 0 on success or UV__ERR(errno) when setsockopt() fails.
 */
int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  int rc;

  rc = setsockopt(handle->io_watcher.fd,
                  SOL_SOCKET,
                  SO_BROADCAST,
                  &on,
                  sizeof(on));
  if (rc != 0)
    return UV__ERR(errno);

  return 0;
}
1037 
1038 
/* Set IP_TTL (IPv4 handles) or IPV6_UNICAST_HOPS (IPv6 handles) on the
 * handle's socket.
 *
 * Returns UV_EINVAL when `ttl` is outside 1..255, otherwise the result of the
 * underlying setsockopt wrapper.
 */
int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (!(ttl >= 1 && ttl <= 255))
    return UV_EINVAL;

#if defined(__MVS__)
  /* zOS does not support setting ttl for IPv4 */
  if ((handle->flags & UV_HANDLE_IPV6) == 0)
    return UV_ENOTSUP;
#endif

#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)

  /* On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, so the option size is
   * hardcoded on the platforms selected above.
   */
  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));

#else

  /* Every other platform uses the general uv__setsockopt_maybe_char call. */
  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);

#endif
}
1074 
1075 
/* Set IP_MULTICAST_TTL (IPv4 handles) or IPV6_MULTICAST_HOPS (IPv6 handles)
 * on the handle's socket and return the result of the setsockopt wrapper
 * that was used.
 */
int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  /* On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
   * IP_MULTICAST_TTL, so the option size is hardcoded in the IPv6 case and
   * the general uv__setsockopt_maybe_char call is used otherwise.
   */
  if ((handle->flags & UV_HANDLE_IPV6) != 0)
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
#endif

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}
1099 
1100 
/* Set IP_MULTICAST_LOOP (IPv4 handles) or IPV6_MULTICAST_LOOP (IPv6 handles)
 * on the handle's socket to `on` and return the result of the setsockopt
 * wrapper that was used.
 */
int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  /* On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
   * IP_MULTICAST_LOOP, so the option size is hardcoded in the IPv6 case and
   * the general uv__setsockopt_maybe_char call is used otherwise.
   */
  if ((handle->flags & UV_HANDLE_IPV6) != 0)
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
#endif

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}
1124 
/* Select the outgoing multicast interface for the handle's socket.
 *
 * With a NULL `interface_addr` a default is used that matches the handle's
 * family: INADDR_ANY for IPv4, scope id 0 for IPv6. Otherwise the string is
 * parsed as IPv4 first and IPv6 second; UV_EINVAL is returned when neither
 * parse succeeds. IPv4 passes the parsed address to IP_MULTICAST_IF, IPv6
 * passes the scope id to IPV6_MULTICAST_IF.
 *
 * Returns 0 on success or UV__ERR(errno) when setsockopt() fails.
 */
int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
  struct sockaddr_storage storage;
  struct sockaddr_in6* in6;
  struct sockaddr_in* in4;
  int rc;

  /* Two typed views of the same storage. */
  in4 = (struct sockaddr_in*) &storage;
  in6 = (struct sockaddr_in6*) &storage;

  if (interface_addr == NULL) {
    memset(&storage, 0, sizeof storage);
    if (handle->flags & UV_HANDLE_IPV6) {
      storage.ss_family = AF_INET6;
      in6->sin6_scope_id = 0;
    } else {
      storage.ss_family = AF_INET;
      in4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, in4) != 0 &&
             uv_ip6_addr(interface_addr, 0, in6) != 0) {
    /* Neither an IPv4 nor an IPv6 literal. */
    return UV_EINVAL;
  }

  switch (storage.ss_family) {
  case AF_INET:
    rc = setsockopt(handle->io_watcher.fd,
                    IPPROTO_IP,
                    IP_MULTICAST_IF,
                    (void*) &in4->sin_addr,
                    sizeof(in4->sin_addr));
    break;
  case AF_INET6:
    rc = setsockopt(handle->io_watcher.fd,
                    IPPROTO_IPV6,
                    IPV6_MULTICAST_IF,
                    &in6->sin6_scope_id,
                    sizeof(in6->sin6_scope_id));
    break;
  default:
    assert(0 && "unexpected address family");
    abort();
  }

  if (rc == -1)
    return UV__ERR(errno);

  return 0;
}
1173 
uv_udp_getpeername(const uv_udp_t * handle,struct sockaddr * name,int * namelen)1174 int uv_udp_getpeername(const uv_udp_t* handle,
1175                        struct sockaddr* name,
1176                        int* namelen) {
1177 
1178   return uv__getsockpeername((const uv_handle_t*) handle,
1179                              getpeername,
1180                              name,
1181                              namelen);
1182 }
1183 
uv_udp_getsockname(const uv_udp_t * handle,struct sockaddr * name,int * namelen)1184 int uv_udp_getsockname(const uv_udp_t* handle,
1185                        struct sockaddr* name,
1186                        int* namelen) {
1187 
1188   return uv__getsockpeername((const uv_handle_t*) handle,
1189                              getsockname,
1190                              name,
1191                              namelen);
1192 }
1193 
1194 
/* Start watching the handle's socket for POLLIN.
 *
 * Returns UV_EINVAL when either callback is NULL, UV_EALREADY when the
 * watcher is already active for POLLIN, or the error returned by
 * uv__udp_maybe_deferred_bind(). On success both callbacks are stored in the
 * handle, the watcher and the handle are started, and 0 is returned.
 */
int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int rc;

  if (!alloc_cb || !recv_cb)
    return UV_EINVAL;

  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (rc != 0)
    return rc;

  handle->recv_cb = recv_cb;
  handle->alloc_cb = alloc_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}
1218 
1219 
/* Stop watching the handle's socket for POLLIN and clear both receive
 * callbacks. The handle itself is stopped only when the watcher is not
 * active for POLLOUT either. Always returns 0.
 */
int uv__udp_recv_stop(uv_udp_t* handle) {
  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  /* Keep the handle started while the watcher is still active for POLLOUT. */
  if (uv__io_active(&handle->io_watcher, POLLOUT) == 0)
    uv__handle_stop(handle);

  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;

  return 0;
}
1231 
1232 
uv__udp_prep_pkt(struct msghdr * h,const uv_buf_t * bufs,const unsigned int nbufs,const struct sockaddr * addr)1233 static int uv__udp_prep_pkt(struct msghdr* h,
1234                             const uv_buf_t* bufs,
1235                             const unsigned int nbufs,
1236                             const struct sockaddr* addr) {
1237   memset(h, 0, sizeof(*h));
1238   h->msg_name = (void*) addr;
1239   h->msg_iov = (void*) bufs;
1240   h->msg_iovlen = nbufs;
1241   if (addr == NULL)
1242     return 0;
1243   switch (addr->sa_family) {
1244   case AF_INET:
1245     h->msg_namelen = sizeof(struct sockaddr_in);
1246     return 0;
1247   case AF_INET6:
1248     h->msg_namelen = sizeof(struct sockaddr_in6);
1249     return 0;
1250   case AF_UNIX:
1251     h->msg_namelen = sizeof(struct sockaddr_un);
1252     return 0;
1253   case AF_UNSPEC:
1254     h->msg_name = NULL;
1255     return 0;
1256   }
1257   return UV_EINVAL;
1258 }
1259 
1260 
uv__udp_sendmsg1(int fd,const uv_buf_t * bufs,unsigned int nbufs,const struct sockaddr * addr)1261 static int uv__udp_sendmsg1(int fd,
1262                             const uv_buf_t* bufs,
1263                             unsigned int nbufs,
1264                             const struct sockaddr* addr) {
1265   struct msghdr h;
1266   int r;
1267 
1268   if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr)))
1269     return r;
1270 
1271   do
1272     r = sendmsg(fd, &h, 0);
1273   while (r == -1 && errno == EINTR);
1274 
1275   if (r < 0) {
1276     r = UV__ERR(errno);
1277     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
1278       r = UV_EAGAIN;
1279     return r;
1280   }
1281 
1282   /* UDP sockets don't EOF so we don't have to handle r=0 specially,
1283    * that only happens when the input was a zero-sized buffer.
1284    */
1285   return 1;
1286 }
1287 
1288 
/* Send up to `count` datagrams on `fd`. Datagram i is made of the nbufs[i]
 * buffers at bufs[i] and is addressed to addrs[i].
 *
 * Datagrams are sent strictly in order and sending stops at the first one
 * that cannot be prepared or sent, so the return value always describes a
 * prefix of the input:
 *
 *   > 0  number of datagrams sent, starting at index 0
 *     0  nothing was sent and no error occurred (e.g. `count` is 0)
 *   < 0  libuv error code for datagram 0; nothing was sent. EAGAIN,
 *        EWOULDBLOCK and ENOBUFS are all reported as UV_EAGAIN.
 */
static int uv__udp_sendmsgv(int fd,
                            unsigned int count,
                            uv_buf_t* bufs[/*count*/],
                            unsigned int nbufs[/*count*/],
                            struct sockaddr* addrs[/*count*/]) {
  unsigned int i;
  int nsent;
  int err;
  int r;

  err = 0;
  nsent = 0;

#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
  if (count > 1) {
    /* `i` is the index of the first datagram that has not been sent yet; it
     * only advances by the number of datagrams the kernel accepted.
     */
    for (i = 0; i < count; i += r) {
      struct mmsghdr m[20];
      unsigned int n;

      /* Prepare the next batch, stopping early at a datagram that cannot
       * be prepared so that everything in front of it is still sent.
       */
      for (n = 0; n < ARRAY_SIZE(m) && n < count - i; n++) {
        err = uv__udp_prep_pkt(&m[n].msg_hdr,
                               bufs[i + n],
                               nbufs[i + n],
                               addrs[i + n]);
        if (err)
          break;
      }

      /* The first unsent datagram is the one that failed; `err` already
       * holds a libuv error code, errno is not involved.
       */
      if (n == 0)
        goto exit;

      err = 0;

      do
#if defined(__APPLE__)
        r = sendmsg_x(fd, m, n, MSG_DONTWAIT);
#else
        r = sendmmsg(fd, m, n, 0);
#endif
      while (r == -1 && errno == EINTR);

      if (r < 0) {
        err = UV__ERR(errno);
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
          err = UV_EAGAIN;
        goto exit;
      }

      if (r == 0)
        goto exit;

      nsent += r;
    }

    goto exit;
  }
#endif  /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) */

  /* One datagram at a time. uv__udp_sendmsg1() returns 1 on success and an
   * already translated libuv error code on failure.
   */
  for (i = 0; i < count; i++) {
    r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i]);
    if (r < 0) {
      err = r;
      goto exit;  /* goto to avoid unused label warning. */
    }
    nsent++;
  }

exit:

  /* A partial send takes precedence over the error that stopped it; the
   * caller sees that error when it retries with the remaining datagrams.
   */
  if (nsent > 0)
    return nsent;

  return err;
}
1347 
1348 
/* Send the requests on the handle's write_queue in batches of up to N
 * datagrams via uv__udp_sendmsgv().
 *
 * Every request that was sent is moved to write_completed_queue with its
 * status set to the byte count of its buffers. Sending stops without
 * touching the remaining requests when uv__udp_sendmsgv() returns UV_EAGAIN.
 * On any other error the request at the head of the queue is moved to
 * write_completed_queue with the error as its status.
 */
static void uv__udp_sendmsg(uv_udp_t* handle) {
  /* Batch size. An enum constant is an integer constant expression, so the
   * arrays below are ordinary fixed-size arrays; a `static const int` bound
   * would turn them into variable-length arrays.
   */
  enum { N = 20 };
  struct sockaddr* addrs[N];
  unsigned int nbufs[N];
  uv_buf_t* bufs[N];
  struct uv__queue* q;
  uv_udp_send_t* req;
  int n;

  if (uv__queue_empty(&handle->write_queue))
    return;

again:
  /* Collect up to N requests from the head of the queue. The queue is known
   * to be non-empty here, so at least one request is collected.
   */
  n = 0;
  q = uv__queue_head(&handle->write_queue);
  do {
    req = uv__queue_data(q, uv_udp_send_t, queue);
    addrs[n] = &req->u.addr;
    nbufs[n] = req->nbufs;
    bufs[n] = req->bufs;
    q = uv__queue_next(q);
    n++;
  } while (n < N && q != &handle->write_queue);

  /* A positive result is the number of requests sent, counted from the head
   * of the queue; move exactly that many to the completed queue.
   */
  n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs);
  while (n > 0) {
    q = uv__queue_head(&handle->write_queue);
    req = uv__queue_data(q, uv_udp_send_t, queue);
    req->status = uv__count_bufs(req->bufs, req->nbufs);
    uv__queue_remove(&req->queue);
    uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
    n--;
  }

  /* n is 0 after the loop above when requests were sent (or when
   * uv__udp_sendmsgv returned 0): keep going until the queue is drained.
   */
  if (n == 0) {
    if (uv__queue_empty(&handle->write_queue))
      goto feed;
    goto again;
  }

  if (n == UV_EAGAIN)
    return;

  /* Register the error against first request in queue because that
   * is the request that uv__udp_sendmsgv tried but failed to send,
   * because if it did send any requests, it won't return an error.
   */
  q = uv__queue_head(&handle->write_queue);
  req = uv__queue_data(q, uv_udp_send_t, queue);
  req->status = n;
  uv__queue_remove(&req->queue);
  uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
feed:
  /* NOTE(review): presumably uv__io_feed() gets the watcher processed so the
   * completed requests are reported - confirm against its definition.
   */
  uv__io_feed(handle->loop, &handle->io_watcher);
}
1404 
1405 
/* Hand `count` datagrams to uv__udp_sendmsgv() on the handle's socket and
 * return its result. Returns UV_EINVAL when the handle has no descriptor
 * (io_watcher.fd is -1).
 */
int uv__udp_try_send2(uv_udp_t* handle,
                      unsigned int count,
                      uv_buf_t* bufs[/*count*/],
                      unsigned int nbufs[/*count*/],
                      struct sockaddr* addrs[/*count*/]) {
  if (handle->io_watcher.fd == -1)
    return UV_EINVAL;

  return uv__udp_sendmsgv(handle->io_watcher.fd, count, bufs, nbufs, addrs);
}
1419