xref: /libuv/src/win/pipe.c (revision c6d43bea)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <assert.h>
23 #include <io.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 
28 #include "handle-inl.h"
29 #include "internal.h"
30 #include "req-inl.h"
31 #include "stream-inl.h"
32 #include "uv-common.h"
33 #include "uv.h"
34 
35 #include <aclapi.h>
36 #include <accctrl.h>
37 
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data when
 * the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Number of accept requests uv_pipe_bind2() allocates when no count was set
 * through uv_pipe_pending_instances(). */
static const int default_pending_pipe_instances = 4;

/* Pipe prefix */
static char pipe_prefix[] = "\\\\?\\pipe";
/* Length of pipe_prefix, not counting the terminating nul. */
static const size_t pipe_prefix_len = sizeof(pipe_prefix) - 1;
53 
/* IPC incoming xfer queue item. */
typedef struct {
  uv__ipc_socket_xfer_type_t xfer_type;
  /* xfer_info.socket_info is what uv__pipe_endgame() passes to WSASocketW()
   * to materialize (and then close) a socket that was never accepted. */
  uv__ipc_socket_xfer_info_t xfer_info;
  /* Queue link; items are recovered from it with uv__queue_data(). */
  struct uv__queue member;
} uv__ipc_xfer_queue_item_t;
60 
/* IPC frame header flags. */
/* clang-format off */
enum {
  UV__IPC_FRAME_HAS_DATA                = 0x01,
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
  /* These are combinations of the flags above. */
  /* 0x02 | 0x04: the two socket-transfer bits. */
  UV__IPC_FRAME_XFER_FLAGS              = 0x06,
  /* 0x01 | 0x02 | 0x04: every bit defined above. */
  UV__IPC_FRAME_VALID_FLAGS             = 0x07
};
/* clang-format on */
72 
/* IPC frame header. Four 32-bit fields; the assertion below pins the total
 * size at 16 bytes. */
typedef struct {
  uint32_t flags;
  uint32_t reserved1;   /* Ignored. */
  uint32_t data_length; /* Must be zero if there is no data. */
  uint32_t reserved2;   /* Must be zero. */
} uv__ipc_frame_header_t;

/* To implement the IPC protocol correctly, these structures must have exactly
 * the right size. */
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85 
/* Coalesced write request: pairs an internal write request with the
 * caller-supplied one it stands in for. */
typedef struct {
  uv_write_t req;       /* Internal heap-allocated write request. */
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;
91 
92 
/* Forward declarations of the file-local eof timer helpers. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
99 
100 
/* Returns 1 when any of the first n bytes of s is a nul byte, 0 otherwise.
 * An empty range (n == 0) never contains one; s is not examined then. */
static int includes_nul(const char *s, size_t n) {
  return n != 0 && memchr(s, '\0', n) != NULL;
}
107 
108 
uv__unique_pipe_name(unsigned long long ptr,char * name,size_t size)109 static void uv__unique_pipe_name(unsigned long long ptr, char* name, size_t size) {
110   snprintf(name, size, "\\\\?\\pipe\\uv\\%llu-%lu", ptr, GetCurrentProcessId());
111 }
112 
113 
/* Initializes handle as a named-pipe stream on loop. The handle starts with
 * no OS handle and no name; ipc is stored verbatim in handle->ipc.
 * Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*) handle, UV_NAMED_PIPE);

  /* Generic pipe state. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->ipc = ipc;

  /* Connection-side state: empty IPC transfer queue, no partial frame. */
  handle->pipe.conn.ipc_remote_pid = 0;
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
  handle->pipe.conn.ipc_xfer_queue_length = 0;
  uv__queue_init(&handle->pipe.conn.ipc_xfer_queue);
  handle->pipe.conn.non_overlapped_writes_tail = NULL;

  return 0;
}
129 
130 
/* Puts handle into connection mode. The handle must not be a pipe server.
 * After the generic connection setup, the eof timer is cleared and the
 * embedded read request is pointed back at the handle. */
static void uv__pipe_connection_init(uv_pipe_t* handle) {
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));

  uv__connection_init((uv_stream_t*) handle);

  handle->pipe.conn.eof_timer = NULL;
  handle->read_req.data = handle;
}
137 
138 
open_named_pipe(const WCHAR * name,DWORD * duplex_flags)139 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
140   HANDLE pipeHandle;
141 
142   /*
143    * Assume that we have a duplex pipe first, so attempt to
144    * connect with GENERIC_READ | GENERIC_WRITE.
145    */
146   pipeHandle = CreateFileW(name,
147                            GENERIC_READ | GENERIC_WRITE,
148                            0,
149                            NULL,
150                            OPEN_EXISTING,
151                            FILE_FLAG_OVERLAPPED,
152                            NULL);
153   if (pipeHandle != INVALID_HANDLE_VALUE) {
154     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
155     return pipeHandle;
156   }
157 
158   /*
159    * If the pipe is not duplex CreateFileW fails with
160    * ERROR_ACCESS_DENIED.  In that case try to connect
161    * as a read-only or write-only.
162    */
163   if (GetLastError() == ERROR_ACCESS_DENIED) {
164     pipeHandle = CreateFileW(name,
165                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
166                              0,
167                              NULL,
168                              OPEN_EXISTING,
169                              FILE_FLAG_OVERLAPPED,
170                              NULL);
171 
172     if (pipeHandle != INVALID_HANDLE_VALUE) {
173       *duplex_flags = UV_HANDLE_READABLE;
174       return pipeHandle;
175     }
176   }
177 
178   if (GetLastError() == ERROR_ACCESS_DENIED) {
179     pipeHandle = CreateFileW(name,
180                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
181                              0,
182                              NULL,
183                              OPEN_EXISTING,
184                              FILE_FLAG_OVERLAPPED,
185                              NULL);
186 
187     if (pipeHandle != INVALID_HANDLE_VALUE) {
188       *duplex_flags = UV_HANDLE_WRITABLE;
189       return pipeHandle;
190     }
191   }
192 
193   return INVALID_HANDLE_VALUE;
194 }
195 
196 
/* Releases the OS resource behind pipe. When a CRT file descriptor is
 * attached (u.fd != -1) it is closed with _close(); otherwise the raw handle
 * is closed with CloseHandle(). Both fields are reset afterwards.
 * Descriptors 0..2 are never expected here. */
static void close_pipe(uv_pipe_t* pipe) {
  const int fd = pipe->u.fd;

  assert(fd == -1 || fd > 2);

  if (fd != -1)
    _close(fd);
  else
    CloseHandle(pipe->handle);

  pipe->handle = INVALID_HANDLE_VALUE;
  pipe->u.fd = -1;
}
207 
208 
/* Creates the first (and only) instance of a uniquely named byte-mode pipe.
 *
 * The name is generated from `random` by uv__unique_pipe_name() and written
 * to name (nameSize bytes). When creation fails with ERROR_PIPE_BUSY or
 * ERROR_ACCESS_DENIED the name is assumed to be taken: `random` is
 * incremented and the attempt repeated.
 *
 * Returns 0 and stores the handle in *pipeHandle_ptr on success; otherwise
 * returns the Windows error code and leaves *pipeHandle_ptr untouched. */
static int uv__pipe_server(
    HANDLE* pipeHandle_ptr, DWORD access,
    char* name, size_t nameSize, unsigned long long random) {
  HANDLE created;
  int last_error;

  for (;; random++) {
    uv__unique_pipe_name(random, name, nameSize);

    created = CreateNamedPipeA(name,
                               access | FILE_FLAG_FIRST_PIPE_INSTANCE,
                               PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
                               1,
                               65536,
                               65536,
                               0,
                               NULL);
    if (created != INVALID_HANDLE_VALUE) {
      /* No name collision. */
      *pipeHandle_ptr = created;
      return 0;
    }

    last_error = GetLastError();
    if (last_error != ERROR_PIPE_BUSY && last_error != ERROR_ACCESS_DENIED)
      return last_error;

    /* Name collision: the loop increment picks the next candidate. */
  }
}
247 
248 
/* Creates a connected pair of pipe handles.
 *
 * server_flags / client_flags accept UV_READABLE_PIPE, UV_WRITABLE_PIPE and
 * UV_NONBLOCK_PIPE. inherit_client is stored in the client handle's
 * SECURITY_ATTRIBUTES.bInheritHandle. random seeds the generated pipe name.
 *
 * Returns 0 and fills both output pointers on success. On failure returns a
 * Windows error code, closes whatever was opened and leaves the outputs
 * untouched. */
static int uv__create_pipe_pair(
    HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
    unsigned int server_flags, unsigned int client_flags,
    int inherit_client, unsigned long long random) {
  /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
  char pipe_name[64];
  SECURITY_ATTRIBUTES sa;
  DWORD server_access;
  DWORD client_access;
  DWORD client_attrs;
  HANDLE server_pipe = INVALID_HANDLE_VALUE;
  HANDLE client_pipe = INVALID_HANDLE_VALUE;
  int err;

  /* Translate the uv flags into the access masks each end is opened with. */
  server_access = WRITE_DAC;
  if (server_flags & UV_READABLE_PIPE)
    server_access |= PIPE_ACCESS_INBOUND;
  if (server_flags & UV_WRITABLE_PIPE)
    server_access |= PIPE_ACCESS_OUTBOUND;
  if (server_flags & UV_NONBLOCK_PIPE)
    server_access |= FILE_FLAG_OVERLAPPED;

  /* A direction the client does not use still gets attribute access. */
  client_access = WRITE_DAC;
  client_access |= (client_flags & UV_READABLE_PIPE) ? GENERIC_READ
                                                     : FILE_READ_ATTRIBUTES;
  client_access |= (client_flags & UV_WRITABLE_PIPE) ? GENERIC_WRITE
                                                     : FILE_WRITE_ATTRIBUTES;
  client_attrs = (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0;

  /* Server end first; this also fills in pipe_name. */
  err = uv__pipe_server(&server_pipe,
                        server_access,
                        pipe_name,
                        sizeof(pipe_name),
                        random);
  if (err)
    goto fail;

  /* Client end, opened by name. */
  sa.bInheritHandle = inherit_client;
  sa.lpSecurityDescriptor = NULL;
  sa.nLength = sizeof sa;

  client_pipe = CreateFileA(pipe_name,
                            client_access,
                            0,
                            &sa,
                            OPEN_EXISTING,
                            client_attrs,
                            NULL);
  if (client_pipe == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    goto fail;
  }

#ifndef NDEBUG
  /* Validate that the pipe was opened in the right mode. */
  {
    DWORD mode;
    BOOL r;
    r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
    if (r == TRUE) {
      assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
    } else {
      fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
    }
  }
#endif

  /* Do a blocking ConnectNamedPipe.  This should not block because we have
   * both ends of the pipe created. ERROR_PIPE_CONNECTED means the client is
   * already attached and is not a failure. */
  if (!ConnectNamedPipe(server_pipe, NULL)) {
    err = GetLastError();
    if (err != ERROR_PIPE_CONNECTED)
      goto fail;
  }

  *server_pipe_ptr = server_pipe;
  *client_pipe_ptr = client_pipe;
  return 0;

 fail:
  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  return err;
}
347 
348 
/* Creates a pipe and returns its two ends as CRT file descriptors:
 * fds[0] is the read end, fds[1] the write end. read_flags / write_flags are
 * forwarded to uv__create_pipe_pair() with the readable / writable bit forced
 * on for the respective end.
 *
 * Returns 0 on success or a libuv error code. fds is only written on
 * success, and no handle or descriptor is leaked on failure. */
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
  uv_file temp[2];
  int err;
  HANDLE readh;
  HANDLE writeh;

  /* Make the server side the inbound (read) end, */
  /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
  /* TODO: better source of local randomness than &fds? */
  read_flags |= UV_READABLE_PIPE;
  write_flags |= UV_WRITABLE_PIPE;
  err = uv__create_pipe_pair(&readh,
                             &writeh,
                             read_flags,
                             write_flags,
                             0,
                             (uintptr_t) &fds[0]);
  /* uv__create_pipe_pair() reports a Windows error code; translate it so
   * callers of this public function get a libuv error. */
  if (err != 0)
    return uv_translate_sys_error(err);

  temp[0] = _open_osfhandle((intptr_t) readh, 0);
  if (temp[0] == -1) {
    /* errno holds the positive CRT value, so compare against EMFILE rather
     * than the negative UV_EMFILE. Read it before any other call. */
    err = (errno == EMFILE) ? UV_EMFILE : UV_UNKNOWN;
    CloseHandle(readh);
    CloseHandle(writeh);
    return err;
  }

  temp[1] = _open_osfhandle((intptr_t) writeh, 0);
  if (temp[1] == -1) {
    err = (errno == EMFILE) ? UV_EMFILE : UV_UNKNOWN;
    /* readh is now owned by temp[0]; closing the descriptor closes it. */
    _close(temp[0]);
    CloseHandle(writeh);
    return err;
  }

  fds[0] = temp[0];
  fds[1] = temp[1];
  return 0;
}
392 
393 
/* Creates the pipe pair used for a child's stdio.
 *
 * parent_pipe is put into connection mode and receives the server end, which
 * is associated with loop's I/O completion port. *child_pipe_ptr receives the
 * client end, created inheritable. flags describe the pipe as seen by the
 * child: UV_READABLE_PIPE makes the parent end writable, UV_WRITABLE_PIPE
 * makes it readable.
 *
 * Returns 0 on success or a Windows error code. On failure both handles are
 * closed and the outputs are left untouched. */
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
    uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
  /* The parent_pipe is always the server_pipe and kept by libuv.
   * The child_pipe is always the client_pipe and is passed to the child.
   * The flags are specified with respect to their usage in the child. */
  HANDLE server_pipe;
  HANDLE client_pipe;
  unsigned int server_flags;
  unsigned int client_flags;
  int err;

  uv__pipe_connection_init(parent_pipe);

  server_pipe = INVALID_HANDLE_VALUE;
  client_pipe = INVALID_HANDLE_VALUE;

  server_flags = 0;
  client_flags = 0;
  if (flags & UV_READABLE_PIPE) {
    /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
     * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
     * the state of the write buffer when we're trying to shutdown the pipe. */
    server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    client_flags |= UV_READABLE_PIPE;
  }
  if (flags & UV_WRITABLE_PIPE) {
    server_flags |= UV_READABLE_PIPE;
    client_flags |= UV_WRITABLE_PIPE;
  }
  server_flags |= UV_NONBLOCK_PIPE;
  if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
    client_flags |= UV_NONBLOCK_PIPE;
  }

  /* Seed the pipe name with the address of parent_pipe. server_pipe is still
   * INVALID_HANDLE_VALUE here, so using it as the seed would start every
   * stdio pipe from the same name and rely on the collision retry loop. */
  err = uv__create_pipe_pair(&server_pipe, &client_pipe,
          server_flags, client_flags, 1, (uintptr_t) parent_pipe);
  if (err)
    goto error;

  if (CreateIoCompletionPort(server_pipe,
                             loop->iocp,
                             (ULONG_PTR) parent_pipe,
                             0) == NULL) {
    err = GetLastError();
    goto error;
  }

  parent_pipe->handle = server_pipe;
  *child_pipe_ptr = client_pipe;

  /* The server end is now readable and/or writable. */
  if (flags & UV_READABLE_PIPE)
    parent_pipe->flags |= UV_HANDLE_WRITABLE;
  if (flags & UV_WRITABLE_PIPE)
    parent_pipe->flags |= UV_HANDLE_READABLE;

  return 0;

 error:
  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  return err;
}
461 
462 
/* Attaches an already opened pipe handle to a connection-mode uv_pipe_t.
 *
 * The pipe is switched to byte read mode with blocking waits. It is then
 * classified as overlapped or non-overlapped by querying its file mode.
 * Overlapped pipes are associated with loop's completion port; if that fails
 * the handle falls back to UV_HANDLE_EMULATE_IOCP. duplex_flags is OR-ed into
 * handle->flags and fd is stored in handle->u.fd.
 *
 * Returns 0 on success, or:
 *   UV_EINVAL   - the handle is closing,
 *   UV_EBUSY    - the handle already has an OS handle,
 *   UV_EACCES   - the mode cannot be changed and the pipe is in PIPE_NOWAIT,
 *   UV_ENOTSOCK - pipeHandle is probably not a pipe,
 *   or the translation of the underlying system error.
 * On failure the uv_pipe_t does not take ownership of pipeHandle. */
static int uv__set_pipe_handle(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               HANDLE pipeHandle,
                               int fd,
                               DWORD duplex_flags) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
  DWORD current_mode = 0;
  DWORD err = 0;

  assert(handle->flags & UV_HANDLE_CONNECTION);
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
  if (handle->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;
  if (handle->handle != INVALID_HANDLE_VALUE)
    return UV_EBUSY;

  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      /*
       * SetNamedPipeHandleState can fail if the handle doesn't have either
       * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
       * But if the handle already has the desired wait and blocking modes
       * we can continue.
       */
      if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
                                   NULL, NULL, 0)) {
        return uv_translate_sys_error(GetLastError());
      } else if (current_mode & PIPE_NOWAIT) {
        return UV_EACCES;
      }
    } else {
      /* If this returns ERROR_INVALID_PARAMETER we probably opened
       * something that is not a pipe. */
      if (err == ERROR_INVALID_PARAMETER) {
        return UV_ENOTSOCK;
      }
      return uv_translate_sys_error(err);
    }
  }

  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      &io_status,
                                      &mode_info,
                                      sizeof(mode_info),
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
    /* Report the failure of this query. `err` is 0 whenever
     * SetNamedPipeHandleState succeeded above, and translating it would
     * signal success without the handle having been attached. */
    return uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
  }

  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    /* Non-overlapped pipe. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
    handle->pipe.conn.readfile_thread_handle = NULL;
    InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  } else {
    /* Overlapped pipe.  Try to associate with IOCP. */
    if (CreateIoCompletionPort(pipeHandle,
                               loop->iocp,
                               (ULONG_PTR) handle,
                               0) == NULL) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    }
  }

  handle->handle = pipeHandle;
  handle->u.fd = fd;
  handle->flags |= duplex_flags;

  return 0;
}
539 
540 
/* Creates a new instance of the server pipe handle->name for the accept
 * request req, which must not hold a pipe handle yet.
 *
 * firstInstance adds FILE_FLAG_FIRST_PIPE_INSTANCE to the open mode.
 *
 * Returns 1 on success; the instance is then associated with loop's
 * completion port and also stored in handle->handle. Returns 0 when
 * CreateNamedPipeW fails, leaving its last-error value for the caller.
 * Failure to associate with the completion port is fatal. */
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
                             uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD open_mode;

  assert(req->pipeHandle == INVALID_HANDLE_VALUE);

  open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC;
  if (firstInstance)
    open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;

  req->pipeHandle = CreateNamedPipeW(handle->name,
                                     open_mode,
                                     PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
                                     PIPE_UNLIMITED_INSTANCES,
                                     65536,
                                     65536,
                                     0,
                                     NULL);
  if (req->pipeHandle == INVALID_HANDLE_VALUE)
    return 0;

  /* Associate it with IOCP so we can get events. */
  if (NULL == CreateIoCompletionPort(req->pipeHandle,
                                     loop->iocp,
                                     (ULONG_PTR) handle,
                                     0)) {
    uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
  }

  /* Stash a handle in the server object for use from places such as
   * getsockname and chmod. As we transfer ownership of these to client
   * objects, we'll allocate new ones here. */
  handle->handle = req->pipeHandle;

  return 1;
}
571 
572 
pipe_shutdown_thread_proc(void * parameter)573 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
574   uv_loop_t* loop;
575   uv_pipe_t* handle;
576   uv_shutdown_t* req;
577 
578   req = (uv_shutdown_t*) parameter;
579   assert(req);
580   handle = (uv_pipe_t*) req->handle;
581   assert(handle);
582   loop = handle->loop;
583   assert(loop);
584 
585   FlushFileBuffers(handle->handle);
586 
587   /* Post completed */
588   POST_COMPLETION_FOR_REQ(loop, req);
589 
590   return 0;
591 }
592 
593 
/* Starts shutting down the write side of a connected pipe. No writes may be
 * pending.
 *
 * The request completes immediately (inserted as a pending req) when:
 *   - the handle is already closing,
 *   - the pipe's buffer state cannot be queried (completes with that error),
 *   - the outbound quota is fully available, i.e. all writes have been read,
 *   - the flush work item cannot be queued (completes with that error).
 * Otherwise FlushFileBuffers runs on a worker thread, which posts the
 * completion when it is done. */
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
  NTSTATUS status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;

  assert(handle->flags & UV_HANDLE_CONNECTION);
  assert(req != NULL);
  assert(handle->stream.conn.write_reqs_pending == 0);
  SET_REQ_SUCCESS(req);

  if (!(handle->flags & UV_HANDLE_CLOSING)) {
    /* Try to avoid flushing the pipe buffer in the thread pool. */
    status = pNtQueryInformationFile(handle->handle,
                                     &io_status,
                                     &pipe_info,
                                     sizeof pipe_info,
                                     FilePipeLocalInformation);

    if (status != STATUS_SUCCESS) {
      SET_REQ_ERROR(req, pRtlNtStatusToDosError(status));
      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
    } else if (pipe_info.OutboundQuota != pipe_info.WriteQuotaAvailable) {
      /* Unread data remains: run FlushFileBuffers in the thread pool. */
      if (QueueUserWorkItem(pipe_shutdown_thread_proc,
                            req,
                            WT_EXECUTELONGFUNCTION)) {
        /* The worker thread posts the completion. */
        return;
      }

      SET_REQ_ERROR(req, GetLastError());
      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
    }
    /* else: short-circuit, all writes have been read. */
  }

  uv__insert_pending_req(loop, (uv_req_t*) req);
}
642 
643 
/* Final teardown of a closing pipe handle once no requests are pending.
 *
 * Connections: every socket still waiting in the IPC transfer queue is
 * materialized and closed, the read request's event handle is closed, and
 * the read-thread lock of a non-overlapped pipe is deleted.
 * Servers: the accept request array is freed.
 * Finally the handle is closed via uv__handle_close(). */
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  struct uv__queue* pending;
  struct uv__queue* node;
  uv__ipc_xfer_queue_item_t* item;
  SOCKET sock;

  assert(handle->reqs_pending == 0);
  assert(handle->flags & UV_HANDLE_CLOSING);
  assert(!(handle->flags & UV_HANDLE_CLOSED));

  if (handle->flags & UV_HANDLE_CONNECTION) {
    /* Drain sockets that were transferred to us but never accepted. */
    pending = &handle->pipe.conn.ipc_xfer_queue;
    while (!uv__queue_empty(pending)) {
      node = uv__queue_head(pending);
      uv__queue_remove(node);
      item = uv__queue_data(node, uv__ipc_xfer_queue_item_t, member);

      /* Materialize socket and close it */
      sock = WSASocketW(FROM_PROTOCOL_INFO,
                        FROM_PROTOCOL_INFO,
                        FROM_PROTOCOL_INFO,
                        &item->xfer_info.socket_info,
                        0,
                        WSA_FLAG_OVERLAPPED);
      uv__free(item);

      if (sock != INVALID_SOCKET)
        closesocket(sock);
    }
    handle->pipe.conn.ipc_xfer_queue_length = 0;

    assert(handle->read_req.wait_handle == INVALID_HANDLE_VALUE);
    if (handle->read_req.event_handle != NULL) {
      CloseHandle(handle->read_req.event_handle);
      handle->read_req.event_handle = NULL;
    }

    if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
      DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  }

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    assert(handle->pipe.serv.accept_reqs);
    uv__free(handle->pipe.serv.accept_reqs);
    handle->pipe.serv.accept_reqs = NULL;
  }

  uv__handle_close(handle);
}
693 
694 
/* Records how many accept requests a later bind should allocate and marks
 * the handle as a pipe server. Has no effect once the handle is bound. */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (!(handle->flags & UV_HANDLE_BOUND)) {
    handle->flags |= UV_HANDLE_PIPESERVER;
    handle->pipe.serv.pending_instances = count;
  }
}
701 
702 
703 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)704 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
705   return uv_pipe_bind2(handle, name, strlen(name), 0);
706 }
707 
708 
/* Creates a pipe server for the name given by name/namelen (namelen excludes
 * any terminating nul; embedded nul bytes are rejected).
 *
 * Allocates pending_instances accept requests (default_pending_pipe_instances
 * unless uv_pipe_pending_instances() was called) and creates the first pipe
 * instance with FILE_FLAG_FIRST_PIPE_INSTANCE.
 *
 * Returns 0 on success, or:
 *   UV_EINVAL     - unknown flags, NULL/empty name, embedded nul, handle
 *                   already bound or closing, or a pending instance count
 *                   below 1,
 *   UV_ENOMEM     - allocation failure,
 *   UV_EADDRINUSE - creating the first instance failed with
 *                   ERROR_ACCESS_DENIED (a server for this name exists),
 *   UV_EACCES     - the path was not found or the name is invalid,
 *   or the translation of another system error.
 * On failure handle->name and the accept request array are released. */
int uv_pipe_bind2(uv_pipe_t* handle,
                  const char* name,
                  size_t namelen,
                  unsigned int flags) {
  uv_loop_t* loop = handle->loop;
  int i, err;
  uv_pipe_accept_t* req;
  char* name_copy;

  if (flags & ~UV_PIPE_NO_TRUNCATE) {
    return UV_EINVAL;
  }

  if (name == NULL) {
    return UV_EINVAL;
  }

  if (namelen == 0) {
    return UV_EINVAL;
  }

  if (includes_nul(name, namelen)) {
    return UV_EINVAL;
  }

  if (handle->flags & UV_HANDLE_BOUND) {
    return UV_EINVAL;
  }

  if (uv__is_closing(handle)) {
    return UV_EINVAL;
  }

  name_copy = uv__malloc(namelen + 1);
  if (name_copy == NULL) {
    return UV_ENOMEM;
  }

  memcpy(name_copy, name, namelen);
  name_copy[namelen] = '\0';

  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    handle->pipe.serv.pending_instances = default_pending_pipe_instances;
  }

  /* accept_reqs[0] is used unconditionally below, so at least one instance
   * is required. Return directly: accept_reqs has not been assigned yet and
   * must not be handed to the shared cleanup code. */
  if (handle->pipe.serv.pending_instances < 1) {
    uv__free(name_copy);
    return UV_EINVAL;
  }

  err = UV_ENOMEM;
  handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
    uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
  if (handle->pipe.serv.accept_reqs == NULL) {
    goto error;
  }

  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
    req = &handle->pipe.serv.accept_reqs[i];
    UV_REQ_INIT(req, UV_ACCEPT);
    req->data = handle;
    req->pipeHandle = INVALID_HANDLE_VALUE;
    req->next_pending = NULL;
  }

  /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
  err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
  uv__free(name_copy);
  name_copy = NULL;

  if (err) {
    goto error;
  }

  /*
   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
   * If this fails then there's already a pipe server for the given pipe name.
   */
  if (!pipe_alloc_accept(loop,
                         handle,
                         &handle->pipe.serv.accept_reqs[0],
                         TRUE)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      err = UV_EADDRINUSE;
    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
      err = UV_EACCES;
    } else {
      err = uv_translate_sys_error(err);
    }
    goto error;
  }

  handle->pipe.serv.pending_accepts = NULL;
  handle->flags |= UV_HANDLE_PIPESERVER;
  handle->flags |= UV_HANDLE_BOUND;

  return 0;

error:
  uv__free(handle->pipe.serv.accept_reqs);
  uv__free(handle->name);
  uv__free(name_copy);
  handle->pipe.serv.accept_reqs = NULL;
  handle->name = NULL;

  return err;
}
812 
813 
pipe_connect_thread_proc(void * parameter)814 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
815   uv_loop_t* loop;
816   uv_pipe_t* handle;
817   uv_connect_t* req;
818   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
819   DWORD duplex_flags;
820 
821   req = (uv_connect_t*) parameter;
822   assert(req);
823   handle = (uv_pipe_t*) req->handle;
824   assert(handle);
825   loop = handle->loop;
826   assert(loop);
827 
828   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
829    * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
830   while (WaitNamedPipeW(req->u.connect.name, 30000)) {
831     /* The pipe is now available, try to connect. */
832     pipeHandle = open_named_pipe(req->u.connect.name, &duplex_flags);
833     if (pipeHandle != INVALID_HANDLE_VALUE)
834       break;
835 
836     SwitchToThread();
837   }
838 
839   uv__free(req->u.connect.name);
840   req->u.connect.name = NULL;
841   if (pipeHandle != INVALID_HANDLE_VALUE) {
842     SET_REQ_SUCCESS(req);
843     req->u.connect.pipeHandle = pipeHandle;
844     req->u.connect.duplex_flags = duplex_flags;
845   } else {
846     SET_REQ_ERROR(req, GetLastError());
847   }
848 
849   /* Post completed */
850   POST_COMPLETION_FOR_REQ(loop, req);
851 
852   return 0;
853 }
854 
855 
/* Connects handle to the pipe with the nul-terminated name.
 * Wrapper around uv_pipe_connect2() with no flags. Because this function
 * returns void, an immediate error from uv_pipe_connect2() is stored in the
 * request, which is then queued as a pending request on the loop. */
void uv_pipe_connect(uv_connect_t* req,
                    uv_pipe_t* handle,
                    const char* name,
                    uv_connect_cb cb) {
  uv_loop_t* loop;
  size_t namelen;
  int err;

  /* strlen(NULL) is undefined. Pass a zero length instead and let
   * uv_pipe_connect2() reject the NULL name after initializing req. */
  namelen = (name != NULL) ? strlen(name) : 0;

  err = uv_pipe_connect2(req, handle, name, namelen, 0, cb);

  if (err) {
    loop = handle->loop;
    /* Make this req pending reporting an error. */
    SET_REQ_ERROR(req, err);
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle);
  }
}
874 
875 
/* Starts connecting handle to the pipe named by name/namelen (namelen
 * excludes any terminating nul; embedded nul bytes are rejected).
 *
 * req is always initialized. Argument errors (unknown flags, NULL or empty
 * name, embedded nul) return UV_EINVAL and allocation failure returns
 * UV_ENOMEM, without queuing the request. Every other outcome returns 0 and
 * is reported through the request:
 *   - the pipe opened immediately: req is queued as a successful pending req,
 *   - the pipe is busy: a worker thread waits for an instance and posts the
 *     completion,
 *   - anything else (handle is a server or already a connection, name
 *     conversion failed, open failed): req is queued with the error.
 *
 * handle->name is released on the error path only when this call set it; a
 * name owned by an existing server or connection is left alone. */
int uv_pipe_connect2(uv_connect_t* req,
                     uv_pipe_t* handle,
                     const char* name,
                     size_t namelen,
                     unsigned int flags,
                     uv_connect_cb cb) {
  uv_loop_t* loop;
  int err;
  int owns_name = 0; /* Set once handle->name belongs to this attempt. */
  size_t nameSize;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;
  char* name_copy;

  loop = handle->loop;
  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
  req->u.connect.duplex_flags = 0;
  req->u.connect.name = NULL;

  if (flags & ~UV_PIPE_NO_TRUNCATE) {
    return UV_EINVAL;
  }

  if (name == NULL) {
    return UV_EINVAL;
  }

  if (namelen == 0) {
    return UV_EINVAL;
  }

  if (includes_nul(name, namelen)) {
    return UV_EINVAL;
  }

  name_copy = uv__malloc(namelen + 1);
  if (name_copy == NULL) {
    return UV_ENOMEM;
  }

  memcpy(name_copy, name, namelen);
  name_copy[namelen] = '\0';

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    err = ERROR_INVALID_PARAMETER;
    goto error;
  }
  if (handle->flags & UV_HANDLE_CONNECTION) {
    err = ERROR_PIPE_BUSY;
    goto error;
  }
  uv__pipe_connection_init(handle);

  /* From here on handle->name is written by this call. */
  owns_name = 1;

  /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
  err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
  uv__free(name_copy);
  name_copy = NULL;

  if (err) {
    err = ERROR_NO_UNICODE_TRANSLATION;
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    if (GetLastError() == ERROR_PIPE_BUSY) {
      /* The worker thread gets its own copy of the name. */
      nameSize = (wcslen(handle->name) + 1) * sizeof(WCHAR);
      req->u.connect.name = uv__malloc(nameSize);
      if (!req->u.connect.name) {
        uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
      }

      memcpy(req->u.connect.name, handle->name, nameSize);

      /* Wait for the server to make a pipe instance available. */
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                             req,
                             WT_EXECUTELONGFUNCTION)) {
        uv__free(req->u.connect.name);
        req->u.connect.name = NULL;
        err = GetLastError();
        goto error;
      }

      REGISTER_HANDLE_REQ(loop, handle);
      handle->reqs_pending++;

      return 0;
    }

    err = GetLastError();
    goto error;
  }

  req->u.connect.pipeHandle = pipeHandle;
  req->u.connect.duplex_flags = duplex_flags;
  SET_REQ_SUCCESS(req);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle);
  return 0;

error:
  uv__free(name_copy);

  /* Do not free a name that belongs to an existing server or connection. */
  if (owns_name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  if (pipeHandle != INVALID_HANDLE_VALUE)
    CloseHandle(pipeHandle);

  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle);
  return 0;
}
998 
999 
/* Cancels the read that is outstanding on `handle`, if there is one, and marks
 * the handle with UV_HANDLE_CANCELLATION_PENDING. Does nothing when no read is
 * pending, when a cancellation was already requested, or when the pipe handle
 * is closed. */
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
  BOOL cancelled;
  HANDLE worker;
  volatile HANDLE* worker_slot;

  if (!(handle->flags & UV_HANDLE_READ_PENDING))
    return; /* No pending reads. */
  if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
    return; /* Already cancelled. */
  if (handle->handle == INVALID_HANDLE_VALUE)
    return; /* Pipe handle closed. */

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* The read is a synchronous ReadFile() running on a thread pool thread. */
    worker_slot = &handle->pipe.conn.readfile_thread_handle;

    EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);

    worker = *worker_slot;
    if (worker == NULL) {
      /* The worker has not published its thread handle yet, so it is not
       * blocking. Storing INVALID_HANDLE_VALUE tells it to give up before it
       * starts reading. */
      *worker_slot = INVALID_HANDLE_VALUE;
    } else {
      /* Keep cancelling until the worker acknowledges, by storing
       * INVALID_HANDLE_VALUE, that it is past the blocking call. */
      for (; worker != INVALID_HANDLE_VALUE; worker = *worker_slot) {
        cancelled = CancelSynchronousIo(worker);
        assert(cancelled || GetLastError() == ERROR_NOT_FOUND);
        (void) cancelled;
        SwitchToThread(); /* Give the worker a chance to run. */
      }
    }

    LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  } else {
    /* The read is an overlapped request; cancel it directly. */
    cancelled = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
    assert(cancelled || GetLastError() == ERROR_NOT_FOUND);
    (void) cancelled;
  }

  /* Remember that the pending read has been cancelled. */
  handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
}
1045 
1046 
/* Stops reading on `handle`: clears UV_HANDLE_READING, drops one active
 * reference and interrupts the read that may still be outstanding. */
void uv__pipe_read_stop(uv_pipe_t* handle) {
  handle->flags = handle->flags & ~UV_HANDLE_READING;
  DECREASE_ACTIVE_COUNT(handle->loop, handle);

  /* A read may already have been submitted; cancel it. */
  uv__pipe_interrupt_read(handle);
}
1052 
1053 
1054 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
1055  * it. */
uv__pipe_close(uv_loop_t * loop,uv_pipe_t * handle)1056 void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
1057   int i;
1058   HANDLE pipeHandle;
1059 
1060   if (handle->flags & UV_HANDLE_READING) {
1061     handle->flags &= ~UV_HANDLE_READING;
1062     DECREASE_ACTIVE_COUNT(loop, handle);
1063   }
1064 
1065   if (handle->flags & UV_HANDLE_LISTENING) {
1066     handle->flags &= ~UV_HANDLE_LISTENING;
1067     DECREASE_ACTIVE_COUNT(loop, handle);
1068   }
1069 
1070   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1071 
1072   uv__handle_closing(handle);
1073 
1074   uv__pipe_interrupt_read(handle);
1075 
1076   if (handle->name) {
1077     uv__free(handle->name);
1078     handle->name = NULL;
1079   }
1080 
1081   if (handle->flags & UV_HANDLE_PIPESERVER) {
1082     for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1083       pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
1084       if (pipeHandle != INVALID_HANDLE_VALUE) {
1085         CloseHandle(pipeHandle);
1086         handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
1087       }
1088     }
1089     handle->handle = INVALID_HANDLE_VALUE;
1090   }
1091 
1092   if (handle->flags & UV_HANDLE_CONNECTION) {
1093     eof_timer_destroy(handle);
1094   }
1095 
1096   if ((handle->flags & UV_HANDLE_CONNECTION)
1097       && handle->handle != INVALID_HANDLE_VALUE) {
1098     /* This will eventually destroy the write queue for us too. */
1099     close_pipe(handle);
1100   }
1101 
1102   if (handle->reqs_pending == 0)
1103     uv__want_endgame(loop, (uv_handle_t*) handle);
1104 }
1105 
1106 
/* Arms the accept request `req` of the listening pipe server `handle`.
 *
 * Unless `firstInstance` is set, a new pipe instance is allocated for the
 * request first. ConnectNamedPipe() is then issued on the instance. If the
 * operation is pending (or completes through the overlapped structure) the
 * result arrives via the IOCP; otherwise the request is put on the loop's
 * pending queue with either success (a client was already connected) or the
 * Win32 error. In every case handle->reqs_pending is incremented once. */
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD err;

  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped)) {
    /* Read the error once, right away: CloseHandle() below may overwrite the
     * thread's last-error value. */
    err = GetLastError();

    if (err != ERROR_IO_PENDING) {
      if (err == ERROR_PIPE_CONNECTED) {
        /* A client connected before ConnectNamedPipe() was called. */
        SET_REQ_SUCCESS(req);
      } else {
        CloseHandle(req->pipeHandle);
        req->pipeHandle = INVALID_HANDLE_VALUE;
        /* Make this req pending reporting an error. */
        SET_REQ_ERROR(req, err);
      }
      uv__insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  /* Wait for completion via IOCP */
  handle->reqs_pending++;
}
1141 
1142 
/* Accepts a pending connection from `server` into `client`.
 *
 * For an IPC pipe the next queued socket transfer is imported into `client`
 * (treated as a TCP handle). Otherwise `client` is initialised as a pipe
 * connection and takes over the pipe handle of the first pending accept
 * request, which is then re-armed unless the server is closing.
 *
 * Returns 0 on success, WSAEWOULDBLOCK when nothing is pending, or the error
 * returned by uv__tcp_xfer_import(). */
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* accepted;
  struct uv__queue* node;
  uv__ipc_xfer_queue_item_t* xfer_item;
  int err;

  if (!server->ipc) {
    pipe_client = (uv_pipe_t*) client;
    uv__pipe_connection_init(pipe_client);

    /* Take the first instance that is connected but not yet accepted. */
    accepted = server->pipe.serv.pending_accepts;
    if (accepted == NULL) {
      /* Nothing is waiting to be accepted. */
      return WSAEWOULDBLOCK;
    }

    /* Hand the pipe handle over to the client. */
    pipe_client->handle = accepted->pipeHandle;
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

    /* Unlink the request so that it can pick up a new connection. */
    server->pipe.serv.pending_accepts = accepted->next_pending;
    accepted->next_pending = NULL;
    accepted->pipeHandle = INVALID_HANDLE_VALUE;

    server->handle = INVALID_HANDLE_VALUE;
    if (!(server->flags & UV_HANDLE_CLOSING))
      uv__pipe_queue_accept(loop, server, accepted, FALSE);

    return 0;
  }

  /* IPC pipe: hand out the oldest transferred socket. */
  if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) {
    /* No valid pending sockets. */
    return WSAEWOULDBLOCK;
  }

  node = uv__queue_head(&server->pipe.conn.ipc_xfer_queue);
  uv__queue_remove(node);
  server->pipe.conn.ipc_xfer_queue_length--;
  xfer_item = uv__queue_data(node, uv__ipc_xfer_queue_item_t, member);

  err = uv__tcp_xfer_import((uv_tcp_t*) client,
                            xfer_item->xfer_type,
                            &xfer_item->xfer_info);

  /* The queue item is released whether or not the import worked. */
  uv__free(xfer_item);

  return err;
}
1200 
1201 
1202 /* Starts listening for connections for the given pipe. */
uv__pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)1203 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1204   uv_loop_t* loop = handle->loop;
1205   int i;
1206 
1207   if (handle->flags & UV_HANDLE_LISTENING) {
1208     handle->stream.serv.connection_cb = cb;
1209   }
1210 
1211   if (!(handle->flags & UV_HANDLE_BOUND)) {
1212     return WSAEINVAL;
1213   }
1214 
1215   if (handle->flags & UV_HANDLE_READING) {
1216     return WSAEISCONN;
1217   }
1218 
1219   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1220     return ERROR_NOT_SUPPORTED;
1221   }
1222 
1223   if (handle->ipc) {
1224     return WSAEINVAL;
1225   }
1226 
1227   handle->flags |= UV_HANDLE_LISTENING;
1228   INCREASE_ACTIVE_COUNT(loop, handle);
1229   handle->stream.serv.connection_cb = cb;
1230 
1231   /* First pipe handle should have already been created in uv_pipe_bind */
1232   assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1233 
1234   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1235     uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1236   }
1237 
1238   return 0;
1239 }
1240 
1241 
uv_pipe_zero_readfile_thread_proc(void * arg)1242 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
1243   uv_read_t* req = (uv_read_t*) arg;
1244   uv_pipe_t* handle = (uv_pipe_t*) req->data;
1245   uv_loop_t* loop = handle->loop;
1246   volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1247   CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
1248   HANDLE thread;
1249   DWORD bytes;
1250   DWORD err;
1251 
1252   assert(req->type == UV_READ);
1253   assert(handle->type == UV_NAMED_PIPE);
1254 
1255   err = 0;
1256 
1257   /* Create a handle to the current thread. */
1258   if (!DuplicateHandle(GetCurrentProcess(),
1259                        GetCurrentThread(),
1260                        GetCurrentProcess(),
1261                        &thread,
1262                        0,
1263                        FALSE,
1264                        DUPLICATE_SAME_ACCESS)) {
1265     err = GetLastError();
1266     goto out1;
1267   }
1268 
1269   /* The lock needs to be held when thread handle is modified. */
1270   EnterCriticalSection(lock);
1271   if (*thread_ptr == INVALID_HANDLE_VALUE) {
1272     /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1273     err = ERROR_OPERATION_ABORTED;
1274   } else {
1275     /* Let main thread know which worker thread is doing the blocking read. */
1276     assert(*thread_ptr == NULL);
1277     *thread_ptr = thread;
1278   }
1279   LeaveCriticalSection(lock);
1280 
1281   if (err)
1282     goto out2;
1283 
1284   /* Block the thread until data is available on the pipe, or the read is
1285    * cancelled. */
1286   if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1287     err = GetLastError();
1288 
1289   /* Let the main thread know the worker is past the point of blocking. */
1290   assert(thread == *thread_ptr);
1291   *thread_ptr = INVALID_HANDLE_VALUE;
1292 
1293   /* Briefly acquire the mutex. Since the main thread holds the lock while it
1294    * is spinning trying to cancel this thread's I/O, we will block here until
1295    * it stops doing that. */
1296   EnterCriticalSection(lock);
1297   LeaveCriticalSection(lock);
1298 
1299 out2:
1300   /* Close the handle to the current thread. */
1301   CloseHandle(thread);
1302 
1303 out1:
1304   /* Set request status and post a completion record to the IOCP. */
1305   if (err)
1306     SET_REQ_ERROR(req, err);
1307   else
1308     SET_REQ_SUCCESS(req);
1309   POST_COMPLETION_FOR_REQ(loop, req);
1310 
1311   return 0;
1312 }
1313 
1314 
uv_pipe_writefile_thread_proc(void * parameter)1315 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1316   int result;
1317   DWORD bytes;
1318   uv_write_t* req = (uv_write_t*) parameter;
1319   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1320   uv_loop_t* loop = handle->loop;
1321 
1322   assert(req != NULL);
1323   assert(req->type == UV_WRITE);
1324   assert(handle->type == UV_NAMED_PIPE);
1325 
1326   result = WriteFile(handle->handle,
1327                      req->write_buffer.base,
1328                      req->write_buffer.len,
1329                      &bytes,
1330                      NULL);
1331 
1332   if (!result) {
1333     SET_REQ_ERROR(req, GetLastError());
1334   }
1335 
1336   POST_COMPLETION_FOR_REQ(loop, req);
1337   return 0;
1338 }
1339 
1340 
post_completion_read_wait(void * context,BOOLEAN timed_out)1341 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1342   uv_read_t* req;
1343   uv_tcp_t* handle;
1344 
1345   req = (uv_read_t*) context;
1346   assert(req != NULL);
1347   handle = (uv_tcp_t*)req->data;
1348   assert(handle != NULL);
1349   assert(!timed_out);
1350 
1351   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1352                                   req->u.io.overlapped.InternalHigh,
1353                                   0,
1354                                   &req->u.io.overlapped)) {
1355     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1356   }
1357 }
1358 
1359 
post_completion_write_wait(void * context,BOOLEAN timed_out)1360 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1361   uv_write_t* req;
1362   uv_tcp_t* handle;
1363 
1364   req = (uv_write_t*) context;
1365   assert(req != NULL);
1366   handle = (uv_tcp_t*)req->handle;
1367   assert(handle != NULL);
1368   assert(!timed_out);
1369 
1370   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1371                                   req->u.io.overlapped.InternalHigh,
1372                                   0,
1373                                   &req->u.io.overlapped)) {
1374     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1375   }
1376 }
1377 
1378 
/* Submits a zero-byte read on `handle` so that the loop is notified when data
 * becomes available. Non-overlapped pipes use a thread pool work item;
 * overlapped pipes issue ReadFile() directly and, when IOCP is emulated,
 * register a wait on the request's event. If submission fails the request is
 * put on the pending queue carrying the error. Either way the handle is
 * marked UV_HANDLE_READ_PENDING and reqs_pending is incremented. */
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* read_req = &handle->read_req;
  int submitted;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  assert(handle->handle != INVALID_HANDLE_VALUE);

  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    memset(&read_req->u.io.overlapped, 0, sizeof(read_req->u.io.overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      assert(read_req->event_handle != NULL);
      read_req->u.io.overlapped.hEvent =
          (HANDLE) ((uintptr_t) read_req->event_handle | 1);
    }

    /* Zero-byte overlapped read. */
    submitted = ReadFile(handle->handle,
                         &uv_zero_,
                         0,
                         NULL,
                         &read_req->u.io.overlapped);

    if (!submitted && GetLastError() != ERROR_IO_PENDING) {
      /* Report the failure through the pending queue. */
      SET_REQ_ERROR(read_req, GetLastError());
      goto report_failure;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      assert(read_req->wait_handle == INVALID_HANDLE_VALUE);
      if (!RegisterWaitForSingleObject(&read_req->wait_handle,
                                       read_req->event_handle,
                                       post_completion_read_wait,
                                       (void*) read_req,
                                       INFINITE,
                                       WT_EXECUTEINWAITTHREAD |
                                           WT_EXECUTEONLYONCE)) {
        SET_REQ_ERROR(read_req, GetLastError());
        goto report_failure;
      }
    }
  } else {
    /* Clear a cancellation left over from a previous read. */
    handle->pipe.conn.readfile_thread_handle = NULL;
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           read_req,
                           WT_EXECUTELONGFUNCTION)) {
      /* Report the failure through the pending queue. */
      SET_REQ_ERROR(read_req, GetLastError());
      goto report_failure;
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

report_failure:
  uv__insert_pending_req(loop, (uv_req_t*) read_req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
1441 
1442 
/* Starts reading from `handle`. Stores the allocation and read callbacks,
 * marks the handle as reading and active, makes sure the read request has an
 * event object, and submits a read unless one is still outstanding.
 * Always returns 0; failure to create the event is fatal. */
int uv__pipe_read_start(uv_pipe_t* handle,
                        uv_alloc_cb alloc_cb,
                        uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;
  uv_read_t* read_req = &handle->read_req;

  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;

  /* The event object is created once and reused by later reads. */
  if (read_req->event_handle == NULL) {
    read_req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (read_req->event_handle == NULL)
      uv_fatal_error(GetLastError(), "CreateEvent");
  }

  /* When reading was stopped and restarted, the earlier read request may
   * still be pending; in that case no new one is submitted. */
  if (handle->flags & UV_HANDLE_READ_PENDING)
    return 0;

  uv__pipe_queue_read(loop, handle);
  return 0;
}
1468 
1469 
/* Appends `req` to the handle's queue of non-overlapped writes. The queue is
 * a circular singly linked list addressed through its tail, so tail->next_req
 * is the head. */
static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
    uv_write_t* req) {
  uv_write_t* tail = handle->pipe.conn.non_overlapped_writes_tail;

  if (tail == NULL) {
    /* Empty queue: the single element links to itself. */
    req->next_req = (uv_req_t*) req;
  } else {
    /* Splice the request in between the current tail and the head. */
    req->next_req = tail->next_req;
    tail->next_req = (uv_req_t*) req;
  }

  handle->pipe.conn.non_overlapped_writes_tail = req;
}
1483 
1484 
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1485 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1486   uv_write_t* req;
1487 
1488   if (handle->pipe.conn.non_overlapped_writes_tail) {
1489     req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1490 
1491     if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1492       handle->pipe.conn.non_overlapped_writes_tail = NULL;
1493     } else {
1494       handle->pipe.conn.non_overlapped_writes_tail->next_req =
1495         req->next_req;
1496     }
1497 
1498     return req;
1499   } else {
1500     /* queue empty */
1501     return NULL;
1502   }
1503 }
1504 
1505 
/* Takes the next queued non-overlapped write off `handle`, if any, and hands
 * it to a thread pool work item. Failing to queue the work item is fatal. */
static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* next_write = uv_remove_non_overlapped_write_req(handle);

  if (next_write == NULL)
    return;

  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         next_write,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
  }
}
1516 
1517 
/* Builds a heap-allocated write request that carries a private copy of the
 * data in `bufs`.
 *
 * user_req      - the caller's request; its contents are copied.
 * bufs, nbufs   - the buffers to coalesce.
 * req_out       - receives the request embedded in the heap allocation.
 * write_buf_out - receives a buffer describing the coalesced data.
 *
 * Returns 0 on success, WSAENOBUFS when the combined data exceeds UINT32_MAX
 * bytes, or ERROR_NOT_ENOUGH_MEMORY when the allocation cannot be made. On
 * failure the out arguments are left untouched. */
static int uv__build_coalesced_write_req(uv_write_t* user_req,
                                         const uv_buf_t bufs[],
                                         size_t nbufs,
                                         uv_write_t** req_out,
                                         uv_buf_t* write_buf_out) {
  /* Pack into a single heap-allocated buffer:
   *   (a) a uv_write_t structure where libuv stores the actual state.
   *   (b) a pointer to the original uv_write_t.
   *   (c) data from all `bufs` entries.
   */
  char* heap_buffer;
  size_t heap_buffer_length, heap_buffer_offset;
  uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
  char* data_start;                           /* (c) */
  uint64_t data_length;
  size_t i;

  /* Compute combined size of all buffers from `bufs`. The sum is kept in 64
   * bits and checked after every addition so that it cannot wrap around, not
   * even where size_t is 32 bits wide. The total must not exceed UINT32_MAX
   * because WriteFile() won't accept buffers larger than that. */
  data_length = 0;
  for (i = 0; i < nbufs; i++) {
    data_length += bufs[i].len;
    if (data_length > UINT32_MAX)
      return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
  }

  /* Adding the header must not overflow size_t either; otherwise the buffer
   * would come out smaller than the data copied into it. */
  if (data_length > SIZE_MAX - sizeof *coalesced_write_req)
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */

  /* Compute heap buffer size. */
  heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
                       (size_t) data_length;         /* (c) */

  /* Allocate buffer. */
  heap_buffer = uv__malloc(heap_buffer_length);
  if (heap_buffer == NULL)
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */

  /* Copy uv_write_t information to the buffer. */
  coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
  coalesced_write_req->req = *user_req; /* copy (a) */
  coalesced_write_req->req.coalesced = 1;
  coalesced_write_req->user_req = user_req;         /* copy (b) */
  heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */

  /* Copy data buffers to the heap buffer. */
  data_start = &heap_buffer[heap_buffer_offset];
  for (i = 0; i < nbufs; i++) {
    /* Empty buffers may carry a NULL base, which memcpy() must not see. */
    if (bufs[i].len == 0)
      continue;
    memcpy(&heap_buffer[heap_buffer_offset],
           bufs[i].base,
           bufs[i].len);               /* copy (c) */
    heap_buffer_offset += bufs[i].len; /* offset (c) */
  }
  assert(heap_buffer_offset == heap_buffer_length);

  /* Set out arguments and return. */
  *req_out = &coalesced_write_req->req;
  *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
  return 0;
}
1576 
1577 
/* Releases what uv__pipe_write_data() acquired for `req` when a write could
 * not be submitted at all: the event object, if one was created, and the
 * heap allocation behind a coalesced request. Must only be used while no I/O
 * that refers to `req` is outstanding. Callers that need the Win32 last-error
 * value have to read it before calling this function. */
static void uv__pipe_write_data_discard(uv_write_t* req) {
  if (req->event_handle != NULL) {
    CloseHandle(req->event_handle);
    req->event_handle = NULL;
  }

  /* A coalesced request lives inside its own heap allocation. */
  if (req->coalesced)
    uv__free(container_of(req, uv__coalesced_write_t, req));
}


/* Writes the data in `bufs` to the pipe `handle`.
 *
 * req         - the caller's write request; initialised here.
 * bufs, nbufs - data to write. More than one buffer, or any buffer when
 *               `copy_always` is set, is first copied into a single
 *               heap-allocated request.
 * cb          - stored in the request.
 * copy_always - nonzero to copy the data even for a single buffer.
 *
 * Depending on the handle's flags the write is performed synchronously, on a
 * thread pool thread, as overlapped I/O that is waited for, or as plain
 * overlapped I/O. Returns 0 when the request was accepted, otherwise a Win32
 * error code. */
static int uv__pipe_write_data(uv_loop_t* loop,
                               uv_write_t* req,
                               uv_pipe_t* handle,
                               const uv_buf_t bufs[],
                               size_t nbufs,
                               uv_write_cb cb,
                               int copy_always) {
  int err;
  int result;
  uv_buf_t write_buf;

  assert(handle->handle != INVALID_HANDLE_VALUE);

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->send_handle = NULL;
  req->cb = cb;
  /* Private fields. */
  req->coalesced = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;

  /* Prepare the overlapped structure. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
  }
  req->write_buffer = uv_null_buf_;

  if (nbufs == 0) {
    /* Write empty buffer. */
    write_buf = uv_null_buf_;
  } else if (nbufs == 1 && !copy_always) {
    /* Write directly from bufs[0]. */
    write_buf = bufs[0];
  } else {
    /* Coalesce all `bufs` into one big buffer. This also creates a new
     * write-request structure that replaces the old one. */
    err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
    if (err != 0)
      return err;
  }

  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Synchronous write on a non-overlapped pipe. */
    DWORD bytes;
    result =
        WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);

    if (!result) {
      /* Nothing was submitted; don't leak the event or the coalesced copy. */
      err = GetLastError();
      uv__pipe_write_data_discard(req);
      return err;
    } else {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    }

    REGISTER_HANDLE_REQ(loop, handle);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* Non-overlapped pipe: writes are done one at a time on a thread pool
     * thread. Start one now only if no write is pending. */
    req->write_buffer = write_buf;
    uv__insert_non_overlapped_write_req(handle, req);
    if (handle->stream.conn.write_reqs_pending == 0) {
      uv__queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->u.io.queued_bytes = write_buf.len;
    handle->write_queue_size += req->u.io.queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Nothing was submitted; release the event and the coalesced copy. */
      err = GetLastError();
      uv__pipe_write_data_discard(req);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
      if (WaitForSingleObject(req->event_handle, INFINITE) !=
          WAIT_OBJECT_0) {
        /* NOTE(review): the write is still in flight at this point, so the
         * request itself is deliberately not freed here. */
        err = GetLastError();
        CloseHandle(req->event_handle);
        req->event_handle = NULL;
        return err;
      }
    }
    CloseHandle(req->event_handle);
    req->event_handle = NULL;

    REGISTER_HANDLE_REQ(loop, handle);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    return 0;
  } else {
    /* Plain overlapped write. */
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Nothing was submitted; release the event and the coalesced copy. */
      err = GetLastError();
      uv__pipe_write_data_discard(req);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
        /* NOTE(review): the write has already been handed to the kernel, so
         * the request is deliberately not freed here. */
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle);
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;

  return 0;
}
1727 
1728 
uv__pipe_get_ipc_remote_pid(uv_pipe_t * handle)1729 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1730   DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1731 
1732   /* If the both ends of the IPC pipe are owned by the same process,
1733    * the remote end pid may not yet be set. If so, do it here.
1734    * TODO: this is weird; it'd probably better to use a handshake. */
1735   if (*pid == 0) {
1736     GetNamedPipeClientProcessId(handle->handle, pid);
1737     if (*pid == GetCurrentProcessId()) {
1738       GetNamedPipeServerProcessId(handle->handle, pid);
1739     }
1740   }
1741 
1742   return *pid;
1743 }
1744 
1745 
/* Writes one IPC frame to the pipe `handle`.
 *
 * The frame consists of a header, optionally the transfer information for
 * `send_handle` (which must be a TCP handle), and optionally the data from
 * `data_bufs`. All parts are handed to uv__pipe_write_data() with copying
 * forced, so nothing needs to outlive this call.
 *
 * Returns 0 on success, WSAENOBUFS when the data exceeds UINT32_MAX bytes,
 * ERROR_NOT_SUPPORTED when `send_handle` is not a TCP handle,
 * ERROR_NOT_ENOUGH_MEMORY on allocation failure, or the error reported by
 * uv__tcp_xfer_export() / uv__pipe_write_data(). */
int uv__pipe_write_ipc(uv_loop_t* loop,
                       uv_write_t* req,
                       uv_pipe_t* handle,
                       const uv_buf_t data_bufs[],
                       size_t data_buf_count,
                       uv_stream_t* send_handle,
                       uv_write_cb cb) {
  uv_buf_t stack_bufs[6];
  uv_buf_t* bufs;
  size_t buf_count, buf_index;
  uv__ipc_frame_header_t frame_header;
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
  uv__ipc_socket_xfer_info_t xfer_info;
  uint64_t data_length;
  size_t i;
  int err;

  /* Compute the combined size of data buffers. */
  data_length = 0;
  for (i = 0; i < data_buf_count; i++)
    data_length += data_bufs[i].len;
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Prepare the frame's socket xfer payload. */
  if (send_handle != NULL) {
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

    /* Verify that `send_handle` it is indeed a tcp handle. */
    if (send_tcp_handle->type != UV_TCP)
      return ERROR_NOT_SUPPORTED;

    /* Export the tcp handle. */
    err = uv__tcp_xfer_export(send_tcp_handle,
                              uv__pipe_get_ipc_remote_pid(handle),
                              &xfer_type,
                              &xfer_info);
    if (err != 0)
      return err;
  }

  /* Compute the maximum number of uv_buf_t's required. */
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
  if (send_handle != NULL)
    buf_count += 1; /* One extra for the socket xfer information. */

  /* Use the on-stack buffer array if it is big enough; otherwise allocate
   * space for it on the heap. */
  if (buf_count <= ARRAY_SIZE(stack_bufs)) {
    /* Use on-stack buffer array. */
    bufs = stack_bufs;
  } else {
    /* Use heap-allocated buffer array. */
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
    if (bufs == NULL)
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
  }
  buf_index = 0;

  /* Initialize frame header and add it to the buffers list. */
  memset(&frame_header, 0, sizeof frame_header);
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

  if (send_handle != NULL) {
    /* Add frame header flags. */
    switch (xfer_type) {
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
        break;
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
        break;
      default:
        assert(0);  /* Unreachable. */
    }
    /* Add xfer info buffer. */
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
  }

  if (data_length > 0) {
    /* Update frame header. */
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
    frame_header.data_length = (uint32_t) data_length;
    /* Add data buffers to buffers list. */
    for (i = 0; i < data_buf_count; i++)
      bufs[buf_index++] = data_bufs[i];
  }

  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   * some of the written data lives on the stack. Only the entries that were
   * actually filled in are passed on: when every data buffer is empty they
   * are not added above, and the remaining slots of the on-stack array are
   * uninitialized. */
  assert(buf_index <= buf_count);
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_index, cb, 1);

  /* If we had to heap-allocate the bufs array, free it now. */
  if (bufs != stack_bufs) {
    uv__free(bufs);
  }

  return err;
}
1846 
1847 
/* Entry point for writing to a pipe handle. An IPC pipe is written through
 * the framing routine, which is the only path that accepts a `send_handle`;
 * any other pipe gets the caller's buffers written as-is. The result of the
 * selected write routine is returned unchanged. */
int uv__pipe_write(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   size_t nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  if (!handle->ipc) {
    /* Plain pipe: there is no way to pass a handle, so none may be given. */
    assert(send_handle == NULL);
    return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
  }

  /* IPC pipe: wrap the payload (and optional handle) in a frame. */
  return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
}
1864 
1865 
/* Deliver end-of-file to the user: discard the eof timer (if one exists),
 * stop reading, then invoke the read callback with UV_EOF and `buf`.
 * `loop` is not used. */
static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  uv_stream_t* stream = (uv_stream_t*) handle;

  /* Once EOF is known a running eof timer serves no purpose. */
  eof_timer_destroy(handle);
  uv_read_stop(stream);
  handle->read_cb(stream, UV_EOF, &buf);
}
1876 
1877 
uv__pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1878 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1879     uv_buf_t buf) {
1880   /* If there is an eof timer running, we don't need it any more, so discard
1881    * it. */
1882   eof_timer_destroy(handle);
1883 
1884   uv_read_stop((uv_stream_t*) handle);
1885 
1886   handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1887 }
1888 
1889 
/* Dispatch a failed read: ERROR_BROKEN_PIPE is reported to the user as
 * end-of-file, every other system error as a read error. */
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
    DWORD error, uv_buf_t buf) {
  if (error != ERROR_BROKEN_PIPE) {
    uv__pipe_read_error(loop, handle, error, buf);
    return;
  }

  uv__pipe_read_eof(loop, handle, buf);
}
1898 
1899 
/* Append a copy of a received socket-transfer record to the tail of the
 * handle's IPC transfer queue and increment the queue length. An allocation
 * failure is reported through uv_fatal_error(). */
static void uv__pipe_queue_ipc_xfer_info(
    uv_pipe_t* handle,
    uv__ipc_socket_xfer_type_t xfer_type,
    uv__ipc_socket_xfer_info_t* xfer_info) {
  uv__ipc_xfer_queue_item_t* entry;

  entry = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*entry));
  if (entry == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  /* The queue item holds its own copy of the caller's record. */
  entry->xfer_info = *xfer_info;
  entry->xfer_type = xfer_type;

  uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &entry->member);
  handle->pipe.conn.ipc_xfer_queue_length++;
}
1916 
1917 
1918 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
1919  * encountered before the requested number of bytes are read, an error is
1920  * returned. */
uv__pipe_read_exactly(uv_pipe_t * handle,void * buffer,DWORD count)1921 static DWORD uv__pipe_read_exactly(uv_pipe_t* handle, void* buffer, DWORD count) {
1922   uv_read_t* req;
1923   DWORD bytes_read;
1924   DWORD bytes_read_now;
1925 
1926   bytes_read = 0;
1927   while (bytes_read < count) {
1928     req = &handle->read_req;
1929     memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1930     req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1931     if (!ReadFile(handle->handle,
1932                   (char*) buffer + bytes_read,
1933                   count - bytes_read,
1934                   &bytes_read_now,
1935                   &req->u.io.overlapped)) {
1936       if (GetLastError() != ERROR_IO_PENDING)
1937         return GetLastError();
1938       if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, &bytes_read_now, TRUE))
1939         return GetLastError();
1940     }
1941 
1942     bytes_read += bytes_read_now;
1943   }
1944 
1945   assert(bytes_read == count);
1946   return 0;
1947 }
1948 
1949 
/* Read one chunk of raw data from the pipe into a user-supplied buffer and
 * hand it to the read callback.
 *
 * `bytes_read` is an in/out argument: on entry it is the size suggested to
 * the allocation callback, on return it is the number of bytes actually
 * stored in the user's buffer. It is 0 whenever nothing was read, including
 * when the user supplied no buffer. `max_bytes` caps the size of the read.
 *
 * Returns non-zero when the caller may immediately try another read, and 0
 * when it should break out of its read loop. */
static int uv__pipe_read_data(uv_loop_t* loop,
                              uv_pipe_t* handle,
                              DWORD* bytes_read, /* inout argument */
                              DWORD max_bytes) {
  uv_buf_t buf;
  uv_read_t* req;
  DWORD r;
  DWORD bytes_available;
  int more;

  /* Ask the user for a buffer to read data into. */
  buf = uv_buf_init(NULL, 0);
  handle->alloc_cb((uv_handle_t*) handle, *bytes_read, &buf);
  if (buf.base == NULL || buf.len == 0) {
    /* Nothing was taken off the pipe. Report that through `bytes_read` so a
     * caller that tracks how much payload is left (the IPC reader) does not
     * count the suggested size as consumed. */
    *bytes_read = 0;
    handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
    return 0; /* Break out of read loop. */
  }

  /* Ensure we read at most the smaller of:
   *   (a) the length of the user-allocated buffer.
   *   (b) the maximum data length as specified by the `max_bytes` argument.
   *   (c) the amount of data that can be read non-blocking
   */
  if (max_bytes > buf.len)
    max_bytes = buf.len;

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* The user failed to supply a pipe that can be used non-blocking or with
     * threads. Try to estimate the amount of data that is safe to read without
     * blocking, in a race-y way however. */
    bytes_available = 0;
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &bytes_available, NULL)) {
      r = GetLastError();
      *bytes_read = 0; /* Nothing was read. */
    } else {
      if (max_bytes > bytes_available)
        max_bytes = bytes_available;
      *bytes_read = 0;
      /* With nothing available, skip ReadFile() and report a zero-length
       * read. */
      if (max_bytes == 0 ||
          ReadFile(handle->handle, buf.base, max_bytes, bytes_read, NULL))
        r = ERROR_SUCCESS;
      else
        r = GetLastError();
    }
    more = max_bytes < bytes_available;
  } else {
    /* Read into the user buffer.
     * Prepare an Event so that we can cancel if it doesn't complete
     * immediately.
     */
    req = &handle->read_req;
    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    if (ReadFile(handle->handle,
                 buf.base,
                 max_bytes,
                 bytes_read,
                 &req->u.io.overlapped)) {
      r = ERROR_SUCCESS;
    } else {
      r = GetLastError();
      *bytes_read = 0;
      if (r == ERROR_IO_PENDING) {
        /* The data is not there yet. Cancel the read and then wait for it to
         * settle: it may still have completed (possibly with data) before
         * the cancellation took effect. */
        r = CancelIoEx(handle->handle, &req->u.io.overlapped);
        assert(r || GetLastError() == ERROR_NOT_FOUND);
        if (GetOverlappedResult(handle->handle,
                                &req->u.io.overlapped,
                                bytes_read,
                                TRUE)) {
          r = ERROR_SUCCESS;
        } else {
          r = GetLastError();
          *bytes_read = 0;
        }
      }
    }
    /* A completely filled request suggests there may be more to read. */
    more = *bytes_read == max_bytes;
  }

  /* Call the read callback. A cancelled read is not an error for the user; it
   * is reported as a successful read of `*bytes_read` (zero) bytes. */
  if (r == ERROR_SUCCESS || r == ERROR_OPERATION_ABORTED)
    handle->read_cb((uv_stream_t*) handle, *bytes_read, &buf);
  else
    uv__pipe_read_error_or_eof(loop, handle, r, buf);

  return more;
}


/* Perform one step of reading from an IPC pipe.
 *
 * While the current frame still has payload outstanding, the step reads (part
 * of) that payload and delivers it to the user. Otherwise it reads and
 * validates the next frame header and, when the frame announces a socket
 * transfer, reads the transfer record and queues it on the handle.
 *
 * Returns non-zero when the caller should immediately call again, and 0 when
 * it should break out of its read loop. An invalid frame is reported to the
 * user as WSAECONNABORTED. */
static int uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
  uint32_t* data_remaining;
  DWORD err;
  DWORD more;
  DWORD bytes_read;

  data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;

  if (*data_remaining > 0) {
    /* Read frame data payload. */
    bytes_read = *data_remaining;
    more = uv__pipe_read_data(loop, handle, &bytes_read, bytes_read);
    *data_remaining -= bytes_read;

    /* Nothing was consumed (no user buffer, an error, or no data ready yet).
     * The outstanding payload count is left intact; stop looping and let the
     * caller wait for the next read notification instead of retrying
     * immediately. */
    if (bytes_read == 0)
      return 0;

  } else {
    /* Start of a new IPC frame. */
    uv__ipc_frame_header_t frame_header;
    uint32_t xfer_flags;
    uv__ipc_socket_xfer_type_t xfer_type;
    uv__ipc_socket_xfer_info_t xfer_info;

    /* Read the IPC frame header. */
    err = uv__pipe_read_exactly(
        handle, &frame_header, sizeof frame_header);
    if (err)
      goto error;

    /* Validate that flags are valid. */
    if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
      goto invalid;
    /* Validate that reserved2 is zero. */
    if (frame_header.reserved2 != 0)
      goto invalid;

    /* Parse xfer flags. */
    xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
    if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
      /* Socket coming -- determine the type. */
      xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
                      ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
                      : UV__IPC_SOCKET_XFER_TCP_SERVER;
    } else if (xfer_flags == 0) {
      /* No socket. */
      xfer_type = UV__IPC_SOCKET_XFER_NONE;
    } else {
      /* Invalid flags. */
      goto invalid;
    }

    /* Parse data frame information. */
    if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
      *data_remaining = frame_header.data_length;
    } else if (frame_header.data_length != 0) {
      /* Data length greater than zero but data flag not set -- invalid. */
      goto invalid;
    }

    /* When socket xfer info follows the header, read and queue it now. Any
     * payload data is read in a subsequent invocation of
     * uv__pipe_read_ipc(). */
    if (xfer_type != UV__IPC_SOCKET_XFER_NONE) {
      /* Read transferred socket information. */
      err = uv__pipe_read_exactly(handle, &xfer_info, sizeof xfer_info);
      if (err)
        goto error;

      /* Store the pending socket info. */
      uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
    }
  }

  /* Return whether the caller should immediately try another read call to get
   * more data. Calling uv__pipe_read_exactly will hang if there isn't data
   * available, so we cannot do this unless we are guaranteed not to reach that.
   */
  more = *data_remaining > 0;
  return more;

invalid:
  /* Invalid frame. */
  err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */

error:
  uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
  return 0; /* Break out of read loop. */
}
2114 
2115 
/* Completion handler for a pipe's zero-read request. After the bookkeeping
 * is done it either reports the request's error, or drains the data that is
 * now available by reading in a loop, and finally queues a new zero-read if
 * the handle is still reading and no read is pending. */
void uv__process_pipe_read_req(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               uv_req_t* req) {
  DWORD sys_err;
  DWORD keep_going;
  DWORD suggested_size;

  assert(handle->type == UV_NAMED_PIPE);

  /* Bookkeeping: the request is no longer pending. */
  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
  DECREASE_PENDING_REQ_COUNT(handle);
  eof_timer_stop(handle);

  if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
    UnregisterWait(handle->read_req.wait_handle);
    handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
  }

  /* If the user stopped reading in the meantime there is no callback to
   * invoke, so nothing is left to do. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;

  if (REQ_SUCCESS(req)) {
    /* The zero-read succeeded, so the kernel buffer holds data. Keep reading
     * for as long as the user wants data and the reader reports more. */
    for (;;) {
      if (!(handle->flags & UV_HANDLE_READING))
        break;

      suggested_size = 65536;

      /* IPC pipes are read frame by frame, everything else as raw data. */
      if (handle->ipc) {
        keep_going = uv__pipe_read_ipc(loop, handle);
      } else {
        keep_going = uv__pipe_read_data(loop,
                                        handle,
                                        &suggested_size,
                                        INT32_MAX);
      }

      /* A zero result means the reader asked us to leave the loop. */
      if (keep_going == 0)
        break;
    }
  } else {
    /* The zero-read itself failed. */
    sys_err = GET_REQ_ERROR(req);

    /* ERROR_OPERATION_ABORTED may be the result of the read having been
     * cancelled by uv__pipe_interrupt_read(). That is of no interest to the
     * user; a fresh zero-read is simply started below. */
    if (sys_err != ERROR_OPERATION_ABORTED)
      uv__pipe_read_error_or_eof(loop, handle, sys_err, uv_null_buf_);
  }

  /* Re-arm the zero-read when the handle is still reading and none is
   * already pending. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;
  if (handle->flags & UV_HANDLE_READ_PENDING)
    return;
  uv__pipe_queue_read(loop, handle);
}
2173 
2174 
/* Completion handler for a pipe write request: releases the request's
 * resources and accounting, invokes the user's write callback with the
 * translated result, then continues with any queued non-overlapped write or
 * a pending shutdown. */
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  int sys_err;

  assert(handle->type == UV_NAMED_PIPE);

  /* Give back this request's share of the queued-bytes count. */
  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle);

  /* Drop the wait registration and the event owned by the request. */
  if (req->wait_handle != INVALID_HANDLE_VALUE) {
    UnregisterWait(req->wait_handle);
    req->wait_handle = INVALID_HANDLE_VALUE;
  }
  if (req->event_handle != NULL) {
    CloseHandle(req->event_handle);
    req->event_handle = NULL;
  }

  /* Fetch the result before `req` can be swapped out below. */
  sys_err = GET_REQ_ERROR(req);

  /* A coalesced write used a heap-allocated request wrapping the user's
   * uv_write_t. The callback must see the user's pointer, so switch to it
   * and release the wrapper. */
  if (req->coalesced) {
    uv__coalesced_write_t* wrapper =
        container_of(req, uv__coalesced_write_t, req);
    req = wrapper->user_req;
    uv__free(wrapper);
  }

  if (req->cb != NULL)
    req->cb(req, uv_translate_sys_error(sys_err));

  handle->stream.conn.write_reqs_pending--;

  /* Non-overlapped pipes process their writes one at a time; start the next
   * queued one, if any. */
  if ((handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) &&
      handle->pipe.conn.non_overlapped_writes_tail != NULL) {
    assert(handle->stream.conn.write_reqs_pending > 0);
    uv__queue_non_overlapped_write(handle);
  }

  /* With the last write done, a shutdown that was waiting can proceed. */
  if (handle->stream.conn.write_reqs_pending == 0 &&
      uv__is_stream_shutting(handle)) {
    uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2224 
2225 
/* Completion handler for a pipe server's accept request. A successful accept
 * is pushed onto the server's list of pending accepts and announced through
 * the connection callback; a failed one has its pipe handle closed and is
 * queued again. */
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  uv_pipe_accept_t* req;

  req = (uv_pipe_accept_t*) raw_req;
  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv__pipe_close(). */
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (!REQ_SUCCESS(req)) {
    /* The accept failed: release its pipe instance and try again. */
    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    if (!(handle->flags & UV_HANDLE_CLOSING))
      uv__pipe_queue_accept(loop, handle, req, FALSE);
  } else {
    /* A client connected: prepend the request to the pending list and let
     * the user know. */
    assert(req->pipeHandle != INVALID_HANDLE_VALUE);
    req->next_pending = handle->pipe.serv.pending_accepts;
    handle->pipe.serv.pending_accepts = req;

    if (handle->stream.serv.connection_cb != NULL)
      handle->stream.serv.connection_cb((uv_stream_t*) handle, 0);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2259 
2260 
/* Completion handler for a pipe connect request. On success the connected
 * OS handle is attached to the uv handle (or closed again when that fails or
 * the uv handle is already closing); the outcome is then passed to the
 * connect callback. */
void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_connect_t* req) {
  HANDLE pipeHandle;
  DWORD duplex_flags;
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle);

  if (!REQ_SUCCESS(req)) {
    /* The connection attempt itself failed. */
    err = uv_translate_sys_error(GET_REQ_ERROR(req));
  } else {
    pipeHandle = req->u.connect.pipeHandle;
    duplex_flags = req->u.connect.duplex_flags;

    if (handle->flags & UV_HANDLE_CLOSING) {
      err = UV_ECANCELED;
    } else {
      err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
    }

    /* The uv handle did not take the connected handle, so close it here. */
    if (err != 0)
      CloseHandle(pipeHandle);
  }

  if (req->cb != NULL)
    req->cb(req, err);

  DECREASE_PENDING_REQ_COUNT(handle);
}
2290 
2291 
2292 
/* Completion handler for a pipe shutdown request. Works out the result
 * (cancelled when the handle is closing, the translated request error, or
 * success), performs the success-path work and invokes the shutdown
 * callback. */
void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_shutdown_t* req) {
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  /* Clear the shutdown_req field so we don't go here again. */
  handle->stream.conn.shutdown_req = NULL;
  UNREGISTER_HANDLE_REQ(loop, handle);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* Already closing. Cancel the shutdown. */
    err = UV_ECANCELED;
  } else if (!REQ_SUCCESS(req)) {
    /* An error occurred in trying to shutdown gracefully. */
    err = uv_translate_sys_error(GET_REQ_ERROR(req));
  } else {
    err = 0;

    if (!(handle->flags & UV_HANDLE_READABLE)) {
      /* The pipe cannot be read from, so simply closing it tells the other
       * end that we are done writing. */
      close_pipe(handle);
    } else {
      /* The pipe is readable: set up the eof timer. */
      eof_timer_init(handle);

      /* Start it right away if a read is pending; otherwise
       * uv__pipe_queue_read will start it. */
      if (handle->flags & UV_HANDLE_READ_PENDING)
        eof_timer_start(handle);
    }
  }

  if (req->cb != NULL)
    req->cb(req, err);

  DECREASE_PENDING_REQ_COUNT(handle);
}
2334 
2335 
/* Allocate and initialize the eof timer of a connected pipe. The timer is
 * created stopped and unreferenced, with its `data` field pointing back at
 * the pipe. The pipe must not already have an eof timer. Running out of
 * memory is reported through uv_fatal_error(), like the other allocation in
 * this file that has no way to return an error. */
static void eof_timer_init(uv_pipe_t* pipe) {
  uv_timer_t* timer;
  int r;

  assert(pipe->pipe.conn.eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = (uv_timer_t*) uv__malloc(sizeof *timer);
  if (timer == NULL) {
    /* Without this check uv_timer_init() would be handed a NULL handle. */
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }
  pipe->pipe.conn.eof_timer = timer;

  r = uv_timer_init(pipe->loop, timer);
  assert(r == 0);  /* timers can't fail */
  (void) r;
  timer->data = pipe;
  /* Unreferenced, so the timer does not count as a live handle of the
   * loop. */
  uv_unref((uv_handle_t*) timer);
}
2350 
2351 
/* Arm the pipe's eof timer as a one-shot of `eof_timeout` milliseconds.
 * Does nothing when the pipe has no eof timer. */
static void eof_timer_start(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_start(timer, eof_timer_cb, eof_timeout, 0);
}
2359 
2360 
/* Disarm the pipe's eof timer. Does nothing when the pipe has no eof
 * timer. */
static void eof_timer_stop(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_stop(timer);
}
2368 
2369 
/* Fires when the eof timer expires. Unless the pending read turns out to
 * have completed already, the pipe is closed and end-of-file is reported to
 * the user. */
static void eof_timer_cb(uv_timer_t* timer) {
  uv_loop_t* loop;
  uv_pipe_t* handle;

  handle = (uv_pipe_t*) timer->data;
  loop = timer->loop;

  assert(handle->type == UV_NAMED_PIPE);

  /* A read must be pending here: the timer is only started by
   * uv__pipe_queue_read after ReadFile was called successfully, or by
   * uv__process_pipe_shutdown_req while a read is pending, and
   * uv__process_pipe_read_req stops it right away. */
  assert(handle->flags & UV_HANDLE_READ_PENDING);

  /* When many packets come off the iocp this callback can run before the
   * read request has been taken off the queue. If the read has in fact
   * completed it will be processed later, so leave everything alone. */
  if ((handle->flags & UV_HANDLE_READ_PENDING) &&
      HasOverlappedIoCompleted(&handle->read_req.u.io.overlapped)) {
    return;
  }

  /* Force both ends off the pipe. */
  close_pipe(handle);

  /* Stop reading so that the pending read, which is now going to fail, is
   * not reported to the user. */
  uv_read_stop((uv_stream_t*) handle);

  /* Report the eof and update flags. This will get reported even if the user
   * stopped reading in the meantime. TODO: is that okay? */
  uv__pipe_read_eof(loop, handle, uv_null_buf_);
}
2402 
2403 
/* Detach the pipe's eof timer, if it has one, and close it. The timer's
 * memory is released by eof_timer_close_cb once the close completes. */
static void eof_timer_destroy(uv_pipe_t* pipe) {
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (!pipe->pipe.conn.eof_timer)
    return;

  uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
  pipe->pipe.conn.eof_timer = NULL;
}
2412 
2413 
/* Close callback of an eof timer: releases the memory of the timer handle,
 * which was obtained from uv__malloc() in eof_timer_init(). */
static void eof_timer_close_cb(uv_handle_t* handle) {
  assert(handle->type == UV_TIMER);

  uv__free(handle);
}
2418 
2419 
/* Attach the OS handle behind the file descriptor `file` to `pipe`.
 *
 * Returns 0 on success, or:
 *   UV_EBADF   when `file` has no valid OS handle;
 *   UV_EINVAL  when `pipe` is a pipe server, when the handle's access rights
 *              cannot be queried, or when `pipe` is an IPC pipe and the
 *              handle is not both readable and writable;
 *   UV_EBUSY   when `pipe` is already a connection;
 *   otherwise the error of DuplicateHandle() or uv__set_pipe_handle().
 *
 * For descriptors 0-2 a duplicate of the OS handle is used. That duplicate
 * is owned by this function until it has been attached to `pipe`, and it is
 * closed on every failure path. */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle = uv__get_osfhandle(file);
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_ACCESS_INFORMATION access;
  DWORD duplex_flags = 0;
  int err;

  if (os_handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;
  if (pipe->flags & UV_HANDLE_PIPESERVER)
    return UV_EINVAL;
  if (pipe->flags & UV_HANDLE_CONNECTION)
    return UV_EBUSY;

  uv__pipe_connection_init(pipe);
  uv__once_init();
  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   * underlying OS handle and forget about the original fd.
   * We could also opt to use the original OS handle and just never close it,
   * but then there would be no reliable way to cancel pending read operations
   * upon close.
   */
  if (file <= 2) {
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
                         os_handle,
                         INVALID_HANDLE_VALUE,
                         &os_handle,
                         0,
                         FALSE,
                         DUPLICATE_SAME_ACCESS))
      return uv_translate_sys_error(GetLastError());
    assert(os_handle != INVALID_HANDLE_VALUE);
    /* From here on `file == -1` marks `os_handle` as our own duplicate. */
    file = -1;
  }

  /* Determine what kind of permissions we have on this handle.
   * Cygwin opens the pipe in message mode, but we can support it,
   * just query the access flags and set the stream flags accordingly.
   */
  nt_status = pNtQueryInformationFile(os_handle,
                                      &io_status,
                                      &access,
                                      sizeof(access),
                                      FileAccessInformation);
  if (nt_status != STATUS_SUCCESS) {
    err = UV_EINVAL;
    goto error;
  }

  /* An IPC pipe has to be usable in both directions. */
  if (pipe->ipc) {
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
        !(access.AccessFlags & FILE_READ_DATA)) {
      err = UV_EINVAL;
      goto error;
    }
  }

  if (access.AccessFlags & FILE_WRITE_DATA)
    duplex_flags |= UV_HANDLE_WRITABLE;
  if (access.AccessFlags & FILE_READ_DATA)
    duplex_flags |= UV_HANDLE_READABLE;

  err = uv__set_pipe_handle(pipe->loop,
                            pipe,
                            os_handle,
                            file,
                            duplex_flags);
  if (err)
    goto error;

  if (pipe->ipc) {
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    /* Record the pid of the process at the other end: try the pipe's client
     * first, and if that turns out to be this process, use the server. */
    GetNamedPipeClientProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
    if (pipe->pipe.conn.ipc_remote_pid == GetCurrentProcessId()) {
      GetNamedPipeServerProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
    }
    assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
  }
  return 0;

error:
  /* Close the handle only when it is the duplicate made above; a handle that
   * still belongs to the caller's file descriptor is left alone. */
  if (file == -1)
    CloseHandle(os_handle);
  return err;
}
2501 
2502 
uv__pipe_getname(const uv_pipe_t * handle,char * buffer,size_t * size)2503 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2504   NTSTATUS nt_status;
2505   IO_STATUS_BLOCK io_status;
2506   FILE_NAME_INFORMATION tmp_name_info;
2507   FILE_NAME_INFORMATION* name_info;
2508   WCHAR* name_buf;
2509   unsigned int name_size;
2510   unsigned int name_len;
2511   int err;
2512 
2513   uv__once_init();
2514   name_info = NULL;
2515 
2516   if (handle->name != NULL) {
2517     /* The user might try to query the name before we are connected,
2518      * and this is just easier to return the cached value if we have it. */
2519     return uv__copy_utf16_to_utf8(handle->name, -1, buffer, size);
2520   }
2521 
2522   if (handle->handle == INVALID_HANDLE_VALUE) {
2523     *size = 0;
2524     return UV_EINVAL;
2525   }
2526 
2527   /* NtQueryInformationFile will block if another thread is performing a
2528    * blocking operation on the queried handle. If the pipe handle is
2529    * synchronous, there may be a worker thread currently calling ReadFile() on
2530    * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2531    * the read. */
2532   if (handle->flags & UV_HANDLE_CONNECTION &&
2533       handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2534     uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2535   }
2536 
2537   nt_status = pNtQueryInformationFile(handle->handle,
2538                                       &io_status,
2539                                       &tmp_name_info,
2540                                       sizeof tmp_name_info,
2541                                       FileNameInformation);
2542   if (nt_status == STATUS_BUFFER_OVERFLOW) {
2543     name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2544     name_info = uv__malloc(name_size);
2545     if (!name_info) {
2546       *size = 0;
2547       return UV_ENOMEM;
2548     }
2549 
2550     nt_status = pNtQueryInformationFile(handle->handle,
2551                                         &io_status,
2552                                         name_info,
2553                                         name_size,
2554                                         FileNameInformation);
2555   }
2556 
2557   if (nt_status != STATUS_SUCCESS) {
2558     *size = 0;
2559     err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2560     goto error;
2561   }
2562 
2563   if (!name_info) {
2564     /* the struct on stack was used */
2565     name_buf = tmp_name_info.FileName;
2566     name_len = tmp_name_info.FileNameLength;
2567   } else {
2568     name_buf = name_info->FileName;
2569     name_len = name_info->FileNameLength;
2570   }
2571 
2572   if (name_len == 0) {
2573     *size = 0;
2574     err = 0;
2575     goto error;
2576   }
2577 
2578   name_len /= sizeof(WCHAR);
2579 
2580   /* "\\\\.\\pipe" + name */
2581   if (*size < pipe_prefix_len) {
2582     *size = 0;
2583   }
2584   else {
2585     memcpy(buffer, pipe_prefix, pipe_prefix_len);
2586     *size -= pipe_prefix_len;
2587   }
2588   err = uv__copy_utf16_to_utf8(name_buf, name_len, buffer+pipe_prefix_len, size);
2589   *size += pipe_prefix_len;
2590 
2591 error:
2592   uv__free(name_info);
2593   return err;
2594 }
2595 
2596 
/* Return the number of queued IPC socket transfers on `handle`, or 0 when
 * the pipe is not an IPC pipe. */
int uv_pipe_pending_count(uv_pipe_t* handle) {
  if (handle->ipc)
    return handle->pipe.conn.ipc_xfer_queue_length;

  return 0;
}
2602 
2603 
uv_pipe_getsockname(const uv_pipe_t * handle,char * buffer,size_t * size)2604 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2605   if (buffer == NULL || size == NULL || *size == 0)
2606     return UV_EINVAL;
2607 
2608   if (handle->flags & UV_HANDLE_BOUND)
2609     return uv__pipe_getname(handle, buffer, size);
2610 
2611   if (handle->flags & UV_HANDLE_CONNECTION ||
2612       handle->handle != INVALID_HANDLE_VALUE) {
2613     *size = 0;
2614     return 0;
2615   }
2616 
2617   return UV_EBADF;
2618 }
2619 
2620 
uv_pipe_getpeername(const uv_pipe_t * handle,char * buffer,size_t * size)2621 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2622   if (buffer == NULL || size == NULL || *size == 0)
2623     return UV_EINVAL;
2624 
2625   /* emulate unix behaviour */
2626   if (handle->flags & UV_HANDLE_BOUND)
2627     return UV_ENOTCONN;
2628 
2629   if (handle->handle != INVALID_HANDLE_VALUE)
2630     return uv__pipe_getname(handle, buffer, size);
2631 
2632   if (handle->flags & UV_HANDLE_CONNECTION) {
2633     if (handle->name != NULL)
2634       return uv__pipe_getname(handle, buffer, size);
2635   }
2636 
2637   return UV_EBADF;
2638 }
2639 
2640 
uv_pipe_pending_type(uv_pipe_t * handle)2641 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2642   if (!handle->ipc)
2643     return UV_UNKNOWN_HANDLE;
2644   if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2645     return UV_UNKNOWN_HANDLE;
2646   else
2647     return UV_TCP;
2648 }
2649 
/* Grant the "Everyone" group read and/or write access to the pipe by adding
 * an entry to the DACL of its OS handle.
 *
 * `mode` must be UV_READABLE, UV_WRITABLE or both. Returns 0 on success,
 * UV_EBADF for a NULL handle or one without an OS handle, UV_EINVAL for any
 * other `mode`, or the translated Windows error of the call that failed. */
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
  SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
  PACL old_dacl, new_dacl;
  PSECURITY_DESCRIPTOR sd;
  EXPLICIT_ACCESS ea;
  PSID everyone;
  int error;

  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  if (mode != UV_READABLE &&
      mode != UV_WRITABLE &&
      mode != (UV_WRITABLE | UV_READABLE))
    return UV_EINVAL;

  /* AllocateAndInitializeSid() returns a BOOL and reports its error through
   * GetLastError(). */
  if (!AllocateAndInitializeSid(&sid_world,
                                1,
                                SECURITY_WORLD_RID,
                                0, 0, 0, 0, 0, 0, 0,
                                &everyone)) {
    error = GetLastError();
    goto done;
  }

  /* The three security functions below return the Win32 error code directly
   * (ERROR_SUCCESS on success) and are not documented to set the thread's
   * last-error value, so their return value is what gets reported. */
  error = GetSecurityInfo(handle->handle,
                          SE_KERNEL_OBJECT,
                          DACL_SECURITY_INFORMATION,
                          NULL,
                          NULL,
                          &old_dacl,
                          NULL,
                          &sd);
  if (error != ERROR_SUCCESS)
    goto clean_sid;

  /* Describe the access to grant to "Everyone". */
  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
  if (mode & UV_READABLE)
    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
  if (mode & UV_WRITABLE)
    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
  ea.grfAccessPermissions |= SYNCHRONIZE;
  ea.grfAccessMode = SET_ACCESS;
  ea.grfInheritance = NO_INHERITANCE;
  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
  ea.Trustee.ptstrName = (LPTSTR)everyone;

  /* Merge the new entry with the existing DACL into a newly allocated one. */
  error = SetEntriesInAcl(1, &ea, old_dacl, &new_dacl);
  if (error != ERROR_SUCCESS)
    goto clean_sd;

  /* Install the merged DACL on the pipe. On success `error` is
   * ERROR_SUCCESS, which is 0; either way fall through the cleanup chain. */
  error = SetSecurityInfo(handle->handle,
                          SE_KERNEL_OBJECT,
                          DACL_SECURITY_INFORMATION,
                          NULL,
                          NULL,
                          new_dacl,
                          NULL);

  LocalFree((HLOCAL) new_dacl);
clean_sd:
  LocalFree((HLOCAL) sd);
clean_sid:
  FreeSid(everyone);
done:
  return uv_translate_sys_error(error);
}
2726