1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include <assert.h>
23 #include <io.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27
28 #include "handle-inl.h"
29 #include "internal.h"
30 #include "req-inl.h"
31 #include "stream-inl.h"
32 #include "uv-common.h"
33 #include "uv.h"
34
35 #include <aclapi.h>
36 #include <accctrl.h>
37
38 /* A zero-size buffer for use by uv_pipe_read */
39 static char uv_zero_[] = "";
40
41 /* Null uv_buf_t */
42 static const uv_buf_t uv_null_buf_ = { 0, NULL };
43
44 /* The timeout that the pipe will wait for the remote end to write data when
45 * the local ends wants to shut it down. */
46 static const int64_t eof_timeout = 50; /* ms */
47
48 static const int default_pending_pipe_instances = 4;
49
50 /* Pipe prefix */
51 static char pipe_prefix[] = "\\\\?\\pipe";
52 static const size_t pipe_prefix_len = sizeof(pipe_prefix) - 1;
53
54 /* IPC incoming xfer queue item. */
55 typedef struct {
56 uv__ipc_socket_xfer_type_t xfer_type;
57 uv__ipc_socket_xfer_info_t xfer_info;
58 struct uv__queue member;
59 } uv__ipc_xfer_queue_item_t;
60
61 /* IPC frame header flags. */
62 /* clang-format off */
63 enum {
64 UV__IPC_FRAME_HAS_DATA = 0x01,
65 UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
66 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
67 /* These are combinations of the flags above. */
68 UV__IPC_FRAME_XFER_FLAGS = 0x06,
69 UV__IPC_FRAME_VALID_FLAGS = 0x07
70 };
71 /* clang-format on */
72
73 /* IPC frame header. */
74 typedef struct {
75 uint32_t flags;
76 uint32_t reserved1; /* Ignored. */
77 uint32_t data_length; /* Must be zero if there is no data. */
78 uint32_t reserved2; /* Must be zero. */
79 } uv__ipc_frame_header_t;
80
81 /* To implement the IPC protocol correctly, these structures must have exactly
82 * the right size. */
83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85
86 /* Coalesced write request. */
87 typedef struct {
88 uv_write_t req; /* Internal heap-allocated write request. */
89 uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
90 } uv__coalesced_write_t;
91
92
93 static void eof_timer_init(uv_pipe_t* pipe);
94 static void eof_timer_start(uv_pipe_t* pipe);
95 static void eof_timer_stop(uv_pipe_t* pipe);
96 static void eof_timer_cb(uv_timer_t* timer);
97 static void eof_timer_destroy(uv_pipe_t* pipe);
98 static void eof_timer_close_cb(uv_handle_t* handle);
99
100
101 /* Does the file path contain embedded nul bytes? */
includes_nul(const char * s,size_t n)102 static int includes_nul(const char *s, size_t n) {
103 if (n == 0)
104 return 0;
105 return NULL != memchr(s, '\0', n);
106 }
107
108
uv__unique_pipe_name(char * ptr,char * name,size_t size)109 static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
110 snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
111 }
112
113
uv_pipe_init(uv_loop_t * loop,uv_pipe_t * handle,int ipc)114 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
115 uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
116
117 handle->reqs_pending = 0;
118 handle->handle = INVALID_HANDLE_VALUE;
119 handle->name = NULL;
120 handle->pipe.conn.ipc_remote_pid = 0;
121 handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
122 uv__queue_init(&handle->pipe.conn.ipc_xfer_queue);
123 handle->pipe.conn.ipc_xfer_queue_length = 0;
124 handle->ipc = ipc;
125 handle->pipe.conn.non_overlapped_writes_tail = NULL;
126
127 return 0;
128 }
129
130
uv__pipe_connection_init(uv_pipe_t * handle)131 static void uv__pipe_connection_init(uv_pipe_t* handle) {
132 assert(!(handle->flags & UV_HANDLE_PIPESERVER));
133 uv__connection_init((uv_stream_t*) handle);
134 handle->read_req.data = handle;
135 handle->pipe.conn.eof_timer = NULL;
136 }
137
138
open_named_pipe(const WCHAR * name,DWORD * duplex_flags)139 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
140 HANDLE pipeHandle;
141
142 /*
143 * Assume that we have a duplex pipe first, so attempt to
144 * connect with GENERIC_READ | GENERIC_WRITE.
145 */
146 pipeHandle = CreateFileW(name,
147 GENERIC_READ | GENERIC_WRITE,
148 0,
149 NULL,
150 OPEN_EXISTING,
151 FILE_FLAG_OVERLAPPED,
152 NULL);
153 if (pipeHandle != INVALID_HANDLE_VALUE) {
154 *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
155 return pipeHandle;
156 }
157
158 /*
159 * If the pipe is not duplex CreateFileW fails with
160 * ERROR_ACCESS_DENIED. In that case try to connect
161 * as a read-only or write-only.
162 */
163 if (GetLastError() == ERROR_ACCESS_DENIED) {
164 pipeHandle = CreateFileW(name,
165 GENERIC_READ | FILE_WRITE_ATTRIBUTES,
166 0,
167 NULL,
168 OPEN_EXISTING,
169 FILE_FLAG_OVERLAPPED,
170 NULL);
171
172 if (pipeHandle != INVALID_HANDLE_VALUE) {
173 *duplex_flags = UV_HANDLE_READABLE;
174 return pipeHandle;
175 }
176 }
177
178 if (GetLastError() == ERROR_ACCESS_DENIED) {
179 pipeHandle = CreateFileW(name,
180 GENERIC_WRITE | FILE_READ_ATTRIBUTES,
181 0,
182 NULL,
183 OPEN_EXISTING,
184 FILE_FLAG_OVERLAPPED,
185 NULL);
186
187 if (pipeHandle != INVALID_HANDLE_VALUE) {
188 *duplex_flags = UV_HANDLE_WRITABLE;
189 return pipeHandle;
190 }
191 }
192
193 return INVALID_HANDLE_VALUE;
194 }
195
196
close_pipe(uv_pipe_t * pipe)197 static void close_pipe(uv_pipe_t* pipe) {
198 assert(pipe->u.fd == -1 || pipe->u.fd > 2);
199 if (pipe->u.fd == -1)
200 CloseHandle(pipe->handle);
201 else
202 _close(pipe->u.fd);
203
204 pipe->u.fd = -1;
205 pipe->handle = INVALID_HANDLE_VALUE;
206 }
207
208
uv__pipe_server(HANDLE * pipeHandle_ptr,DWORD access,char * name,size_t nameSize,char * random)209 static int uv__pipe_server(
210 HANDLE* pipeHandle_ptr, DWORD access,
211 char* name, size_t nameSize, char* random) {
212 HANDLE pipeHandle;
213 int err;
214
215 for (;;) {
216 uv__unique_pipe_name(random, name, nameSize);
217
218 pipeHandle = CreateNamedPipeA(name,
219 access | FILE_FLAG_FIRST_PIPE_INSTANCE,
220 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
221 NULL);
222
223 if (pipeHandle != INVALID_HANDLE_VALUE) {
224 /* No name collisions. We're done. */
225 break;
226 }
227
228 err = GetLastError();
229 if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
230 goto error;
231 }
232
233 /* Pipe name collision. Increment the random number and try again. */
234 random++;
235 }
236
237 *pipeHandle_ptr = pipeHandle;
238
239 return 0;
240
241 error:
242 if (pipeHandle != INVALID_HANDLE_VALUE)
243 CloseHandle(pipeHandle);
244
245 return err;
246 }
247
248
uv__create_pipe_pair(HANDLE * server_pipe_ptr,HANDLE * client_pipe_ptr,unsigned int server_flags,unsigned int client_flags,int inherit_client,char * random)249 static int uv__create_pipe_pair(
250 HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
251 unsigned int server_flags, unsigned int client_flags,
252 int inherit_client, char* random) {
253 /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
254 char pipe_name[64];
255 SECURITY_ATTRIBUTES sa;
256 DWORD server_access;
257 DWORD client_access;
258 HANDLE server_pipe;
259 HANDLE client_pipe;
260 int err;
261
262 server_pipe = INVALID_HANDLE_VALUE;
263 client_pipe = INVALID_HANDLE_VALUE;
264
265 server_access = 0;
266 if (server_flags & UV_READABLE_PIPE)
267 server_access |= PIPE_ACCESS_INBOUND;
268 if (server_flags & UV_WRITABLE_PIPE)
269 server_access |= PIPE_ACCESS_OUTBOUND;
270 if (server_flags & UV_NONBLOCK_PIPE)
271 server_access |= FILE_FLAG_OVERLAPPED;
272 server_access |= WRITE_DAC;
273
274 client_access = 0;
275 if (client_flags & UV_READABLE_PIPE)
276 client_access |= GENERIC_READ;
277 else
278 client_access |= FILE_READ_ATTRIBUTES;
279 if (client_flags & UV_WRITABLE_PIPE)
280 client_access |= GENERIC_WRITE;
281 else
282 client_access |= FILE_WRITE_ATTRIBUTES;
283 client_access |= WRITE_DAC;
284
285 /* Create server pipe handle. */
286 err = uv__pipe_server(&server_pipe,
287 server_access,
288 pipe_name,
289 sizeof(pipe_name),
290 random);
291 if (err)
292 goto error;
293
294 /* Create client pipe handle. */
295 sa.nLength = sizeof sa;
296 sa.lpSecurityDescriptor = NULL;
297 sa.bInheritHandle = inherit_client;
298
299 client_pipe = CreateFileA(pipe_name,
300 client_access,
301 0,
302 &sa,
303 OPEN_EXISTING,
304 (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
305 NULL);
306 if (client_pipe == INVALID_HANDLE_VALUE) {
307 err = GetLastError();
308 goto error;
309 }
310
311 #ifndef NDEBUG
312 /* Validate that the pipe was opened in the right mode. */
313 {
314 DWORD mode;
315 BOOL r;
316 r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
317 if (r == TRUE) {
318 assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
319 } else {
320 fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
321 }
322 }
323 #endif
324
325 /* Do a blocking ConnectNamedPipe. This should not block because we have
326 * both ends of the pipe created. */
327 if (!ConnectNamedPipe(server_pipe, NULL)) {
328 if (GetLastError() != ERROR_PIPE_CONNECTED) {
329 err = GetLastError();
330 goto error;
331 }
332 }
333
334 *client_pipe_ptr = client_pipe;
335 *server_pipe_ptr = server_pipe;
336 return 0;
337
338 error:
339 if (server_pipe != INVALID_HANDLE_VALUE)
340 CloseHandle(server_pipe);
341
342 if (client_pipe != INVALID_HANDLE_VALUE)
343 CloseHandle(client_pipe);
344
345 return err;
346 }
347
348
uv_pipe(uv_file fds[2],int read_flags,int write_flags)349 int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
350 uv_file temp[2];
351 int err;
352 HANDLE readh;
353 HANDLE writeh;
354
355 /* Make the server side the inbound (read) end, */
356 /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
357 /* TODO: better source of local randomness than &fds? */
358 read_flags |= UV_READABLE_PIPE;
359 write_flags |= UV_WRITABLE_PIPE;
360 err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
361 if (err != 0)
362 return err;
363 temp[0] = _open_osfhandle((intptr_t) readh, 0);
364 if (temp[0] == -1) {
365 if (errno == UV_EMFILE)
366 err = UV_EMFILE;
367 else
368 err = UV_UNKNOWN;
369 CloseHandle(readh);
370 CloseHandle(writeh);
371 return err;
372 }
373 temp[1] = _open_osfhandle((intptr_t) writeh, 0);
374 if (temp[1] == -1) {
375 if (errno == UV_EMFILE)
376 err = UV_EMFILE;
377 else
378 err = UV_UNKNOWN;
379 _close(temp[0]);
380 CloseHandle(writeh);
381 return err;
382 }
383 fds[0] = temp[0];
384 fds[1] = temp[1];
385 return 0;
386 }
387
388
uv__create_stdio_pipe_pair(uv_loop_t * loop,uv_pipe_t * parent_pipe,HANDLE * child_pipe_ptr,unsigned int flags)389 int uv__create_stdio_pipe_pair(uv_loop_t* loop,
390 uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
391 /* The parent_pipe is always the server_pipe and kept by libuv.
392 * The child_pipe is always the client_pipe and is passed to the child.
393 * The flags are specified with respect to their usage in the child. */
394 HANDLE server_pipe;
395 HANDLE client_pipe;
396 unsigned int server_flags;
397 unsigned int client_flags;
398 int err;
399
400 uv__pipe_connection_init(parent_pipe);
401
402 server_pipe = INVALID_HANDLE_VALUE;
403 client_pipe = INVALID_HANDLE_VALUE;
404
405 server_flags = 0;
406 client_flags = 0;
407 if (flags & UV_READABLE_PIPE) {
408 /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
409 * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
410 * the state of the write buffer when we're trying to shutdown the pipe. */
411 server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
412 client_flags |= UV_READABLE_PIPE;
413 }
414 if (flags & UV_WRITABLE_PIPE) {
415 server_flags |= UV_READABLE_PIPE;
416 client_flags |= UV_WRITABLE_PIPE;
417 }
418 server_flags |= UV_NONBLOCK_PIPE;
419 if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
420 client_flags |= UV_NONBLOCK_PIPE;
421 }
422
423 err = uv__create_pipe_pair(&server_pipe, &client_pipe,
424 server_flags, client_flags, 1, (char*) server_pipe);
425 if (err)
426 goto error;
427
428 if (CreateIoCompletionPort(server_pipe,
429 loop->iocp,
430 (ULONG_PTR) parent_pipe,
431 0) == NULL) {
432 err = GetLastError();
433 goto error;
434 }
435
436 parent_pipe->handle = server_pipe;
437 *child_pipe_ptr = client_pipe;
438
439 /* The server end is now readable and/or writable. */
440 if (flags & UV_READABLE_PIPE)
441 parent_pipe->flags |= UV_HANDLE_WRITABLE;
442 if (flags & UV_WRITABLE_PIPE)
443 parent_pipe->flags |= UV_HANDLE_READABLE;
444
445 return 0;
446
447 error:
448 if (server_pipe != INVALID_HANDLE_VALUE)
449 CloseHandle(server_pipe);
450
451 if (client_pipe != INVALID_HANDLE_VALUE)
452 CloseHandle(client_pipe);
453
454 return err;
455 }
456
457
uv__set_pipe_handle(uv_loop_t * loop,uv_pipe_t * handle,HANDLE pipeHandle,int fd,DWORD duplex_flags)458 static int uv__set_pipe_handle(uv_loop_t* loop,
459 uv_pipe_t* handle,
460 HANDLE pipeHandle,
461 int fd,
462 DWORD duplex_flags) {
463 NTSTATUS nt_status;
464 IO_STATUS_BLOCK io_status;
465 FILE_MODE_INFORMATION mode_info;
466 DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
467 DWORD current_mode = 0;
468 DWORD err = 0;
469
470 assert(handle->flags & UV_HANDLE_CONNECTION);
471 assert(!(handle->flags & UV_HANDLE_PIPESERVER));
472 if (handle->flags & UV_HANDLE_CLOSING)
473 return UV_EINVAL;
474 if (handle->handle != INVALID_HANDLE_VALUE)
475 return UV_EBUSY;
476
477 if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
478 err = GetLastError();
479 if (err == ERROR_ACCESS_DENIED) {
480 /*
481 * SetNamedPipeHandleState can fail if the handle doesn't have either
482 * GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
483 * But if the handle already has the desired wait and blocking modes
484 * we can continue.
485 */
486 if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL,
487 NULL, NULL, 0)) {
488 return uv_translate_sys_error(GetLastError());
489 } else if (current_mode & PIPE_NOWAIT) {
490 return UV_EACCES;
491 }
492 } else {
493 /* If this returns ERROR_INVALID_PARAMETER we probably opened
494 * something that is not a pipe. */
495 if (err == ERROR_INVALID_PARAMETER) {
496 return UV_ENOTSOCK;
497 }
498 return uv_translate_sys_error(err);
499 }
500 }
501
502 /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
503 nt_status = pNtQueryInformationFile(pipeHandle,
504 &io_status,
505 &mode_info,
506 sizeof(mode_info),
507 FileModeInformation);
508 if (nt_status != STATUS_SUCCESS) {
509 return uv_translate_sys_error(err);
510 }
511
512 if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
513 mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
514 /* Non-overlapped pipe. */
515 handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
516 handle->pipe.conn.readfile_thread_handle = NULL;
517 InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
518 } else {
519 /* Overlapped pipe. Try to associate with IOCP. */
520 if (CreateIoCompletionPort(pipeHandle,
521 loop->iocp,
522 (ULONG_PTR) handle,
523 0) == NULL) {
524 handle->flags |= UV_HANDLE_EMULATE_IOCP;
525 }
526 }
527
528 handle->handle = pipeHandle;
529 handle->u.fd = fd;
530 handle->flags |= duplex_flags;
531
532 return 0;
533 }
534
535
pipe_alloc_accept(uv_loop_t * loop,uv_pipe_t * handle,uv_pipe_accept_t * req,BOOL firstInstance)536 static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
537 uv_pipe_accept_t* req, BOOL firstInstance) {
538 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
539
540 req->pipeHandle =
541 CreateNamedPipeW(handle->name,
542 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
543 (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
544 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
545 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
546
547 if (req->pipeHandle == INVALID_HANDLE_VALUE) {
548 return 0;
549 }
550
551 /* Associate it with IOCP so we can get events. */
552 if (CreateIoCompletionPort(req->pipeHandle,
553 loop->iocp,
554 (ULONG_PTR) handle,
555 0) == NULL) {
556 uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
557 }
558
559 /* Stash a handle in the server object for use from places such as
560 * getsockname and chmod. As we transfer ownership of these to client
561 * objects, we'll allocate new ones here. */
562 handle->handle = req->pipeHandle;
563
564 return 1;
565 }
566
567
pipe_shutdown_thread_proc(void * parameter)568 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
569 uv_loop_t* loop;
570 uv_pipe_t* handle;
571 uv_shutdown_t* req;
572
573 req = (uv_shutdown_t*) parameter;
574 assert(req);
575 handle = (uv_pipe_t*) req->handle;
576 assert(handle);
577 loop = handle->loop;
578 assert(loop);
579
580 FlushFileBuffers(handle->handle);
581
582 /* Post completed */
583 POST_COMPLETION_FOR_REQ(loop, req);
584
585 return 0;
586 }
587
588
uv__pipe_shutdown(uv_loop_t * loop,uv_pipe_t * handle,uv_shutdown_t * req)589 void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
590 DWORD result;
591 NTSTATUS nt_status;
592 IO_STATUS_BLOCK io_status;
593 FILE_PIPE_LOCAL_INFORMATION pipe_info;
594
595 assert(handle->flags & UV_HANDLE_CONNECTION);
596 assert(req != NULL);
597 assert(handle->stream.conn.write_reqs_pending == 0);
598 SET_REQ_SUCCESS(req);
599
600 if (handle->flags & UV_HANDLE_CLOSING) {
601 uv__insert_pending_req(loop, (uv_req_t*) req);
602 return;
603 }
604
605 /* Try to avoid flushing the pipe buffer in the thread pool. */
606 nt_status = pNtQueryInformationFile(handle->handle,
607 &io_status,
608 &pipe_info,
609 sizeof pipe_info,
610 FilePipeLocalInformation);
611
612 if (nt_status != STATUS_SUCCESS) {
613 SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
614 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
615 uv__insert_pending_req(loop, (uv_req_t*) req);
616 return;
617 }
618
619 if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
620 /* Short-circuit, no need to call FlushFileBuffers:
621 * all writes have been read. */
622 uv__insert_pending_req(loop, (uv_req_t*) req);
623 return;
624 }
625
626 /* Run FlushFileBuffers in the thread pool. */
627 result = QueueUserWorkItem(pipe_shutdown_thread_proc,
628 req,
629 WT_EXECUTELONGFUNCTION);
630 if (!result) {
631 SET_REQ_ERROR(req, GetLastError());
632 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
633 uv__insert_pending_req(loop, (uv_req_t*) req);
634 return;
635 }
636 }
637
638
uv__pipe_endgame(uv_loop_t * loop,uv_pipe_t * handle)639 void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
640 uv__ipc_xfer_queue_item_t* xfer_queue_item;
641
642 assert(handle->reqs_pending == 0);
643 assert(handle->flags & UV_HANDLE_CLOSING);
644 assert(!(handle->flags & UV_HANDLE_CLOSED));
645
646 if (handle->flags & UV_HANDLE_CONNECTION) {
647 /* Free pending sockets */
648 while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) {
649 struct uv__queue* q;
650 SOCKET socket;
651
652 q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue);
653 uv__queue_remove(q);
654 xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);
655
656 /* Materialize socket and close it */
657 socket = WSASocketW(FROM_PROTOCOL_INFO,
658 FROM_PROTOCOL_INFO,
659 FROM_PROTOCOL_INFO,
660 &xfer_queue_item->xfer_info.socket_info,
661 0,
662 WSA_FLAG_OVERLAPPED);
663 uv__free(xfer_queue_item);
664
665 if (socket != INVALID_SOCKET)
666 closesocket(socket);
667 }
668 handle->pipe.conn.ipc_xfer_queue_length = 0;
669
670 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
671 if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
672 UnregisterWait(handle->read_req.wait_handle);
673 handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
674 }
675 if (handle->read_req.event_handle != NULL) {
676 CloseHandle(handle->read_req.event_handle);
677 handle->read_req.event_handle = NULL;
678 }
679 }
680
681 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
682 DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
683 }
684
685 if (handle->flags & UV_HANDLE_PIPESERVER) {
686 assert(handle->pipe.serv.accept_reqs);
687 uv__free(handle->pipe.serv.accept_reqs);
688 handle->pipe.serv.accept_reqs = NULL;
689 }
690
691 uv__handle_close(handle);
692 }
693
694
uv_pipe_pending_instances(uv_pipe_t * handle,int count)695 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
696 if (handle->flags & UV_HANDLE_BOUND)
697 return;
698 handle->pipe.serv.pending_instances = count;
699 handle->flags |= UV_HANDLE_PIPESERVER;
700 }
701
702
703 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)704 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
705 return uv_pipe_bind2(handle, name, strlen(name), 0);
706 }
707
708
uv_pipe_bind2(uv_pipe_t * handle,const char * name,size_t namelen,unsigned int flags)709 int uv_pipe_bind2(uv_pipe_t* handle,
710 const char* name,
711 size_t namelen,
712 unsigned int flags) {
713 uv_loop_t* loop = handle->loop;
714 int i, err;
715 uv_pipe_accept_t* req;
716 char* name_copy;
717
718 if (flags & ~UV_PIPE_NO_TRUNCATE) {
719 return UV_EINVAL;
720 }
721
722 if (name == NULL) {
723 return UV_EINVAL;
724 }
725
726 if (namelen == 0) {
727 return UV_EINVAL;
728 }
729
730 if (includes_nul(name, namelen)) {
731 return UV_EINVAL;
732 }
733
734 if (handle->flags & UV_HANDLE_BOUND) {
735 return UV_EINVAL;
736 }
737
738 if (uv__is_closing(handle)) {
739 return UV_EINVAL;
740 }
741
742 name_copy = uv__malloc(namelen + 1);
743 if (name_copy == NULL) {
744 return UV_ENOMEM;
745 }
746
747 memcpy(name_copy, name, namelen);
748 name_copy[namelen] = '\0';
749
750 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
751 handle->pipe.serv.pending_instances = default_pending_pipe_instances;
752 }
753
754 err = UV_ENOMEM;
755 handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
756 uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
757 if (handle->pipe.serv.accept_reqs == NULL) {
758 goto error;
759 }
760
761 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
762 req = &handle->pipe.serv.accept_reqs[i];
763 UV_REQ_INIT(req, UV_ACCEPT);
764 req->data = handle;
765 req->pipeHandle = INVALID_HANDLE_VALUE;
766 req->next_pending = NULL;
767 }
768
769 /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
770 err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
771 uv__free(name_copy);
772 name_copy = NULL;
773
774 if (err) {
775 goto error;
776 }
777
778 /*
779 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
780 * If this fails then there's already a pipe server for the given pipe name.
781 */
782 if (!pipe_alloc_accept(loop,
783 handle,
784 &handle->pipe.serv.accept_reqs[0],
785 TRUE)) {
786 err = GetLastError();
787 if (err == ERROR_ACCESS_DENIED) {
788 err = UV_EADDRINUSE;
789 } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
790 err = UV_EACCES;
791 } else {
792 err = uv_translate_sys_error(err);
793 }
794 goto error;
795 }
796
797 handle->pipe.serv.pending_accepts = NULL;
798 handle->flags |= UV_HANDLE_PIPESERVER;
799 handle->flags |= UV_HANDLE_BOUND;
800
801 return 0;
802
803 error:
804 uv__free(handle->pipe.serv.accept_reqs);
805 uv__free(handle->name);
806 uv__free(name_copy);
807 handle->pipe.serv.accept_reqs = NULL;
808 handle->name = NULL;
809
810 return err;
811 }
812
813
pipe_connect_thread_proc(void * parameter)814 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
815 uv_loop_t* loop;
816 uv_pipe_t* handle;
817 uv_connect_t* req;
818 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
819 DWORD duplex_flags;
820
821 req = (uv_connect_t*) parameter;
822 assert(req);
823 handle = (uv_pipe_t*) req->handle;
824 assert(handle);
825 loop = handle->loop;
826 assert(loop);
827
828 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
829 * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
830 while (WaitNamedPipeW(req->u.connect.name, 30000)) {
831 /* The pipe is now available, try to connect. */
832 pipeHandle = open_named_pipe(req->u.connect.name, &duplex_flags);
833 if (pipeHandle != INVALID_HANDLE_VALUE)
834 break;
835
836 SwitchToThread();
837 }
838
839 uv__free(req->u.connect.name);
840 req->u.connect.name = NULL;
841 if (pipeHandle != INVALID_HANDLE_VALUE) {
842 SET_REQ_SUCCESS(req);
843 req->u.connect.pipeHandle = pipeHandle;
844 req->u.connect.duplex_flags = duplex_flags;
845 } else {
846 SET_REQ_ERROR(req, GetLastError());
847 }
848
849 /* Post completed */
850 POST_COMPLETION_FOR_REQ(loop, req);
851
852 return 0;
853 }
854
855
uv_pipe_connect(uv_connect_t * req,uv_pipe_t * handle,const char * name,uv_connect_cb cb)856 void uv_pipe_connect(uv_connect_t* req,
857 uv_pipe_t* handle,
858 const char* name,
859 uv_connect_cb cb) {
860 uv_loop_t* loop;
861 int err;
862
863 err = uv_pipe_connect2(req, handle, name, strlen(name), 0, cb);
864
865 if (err) {
866 loop = handle->loop;
867 /* Make this req pending reporting an error. */
868 SET_REQ_ERROR(req, err);
869 uv__insert_pending_req(loop, (uv_req_t*) req);
870 handle->reqs_pending++;
871 REGISTER_HANDLE_REQ(loop, handle, req);
872 }
873 }
874
875
uv_pipe_connect2(uv_connect_t * req,uv_pipe_t * handle,const char * name,size_t namelen,unsigned int flags,uv_connect_cb cb)876 int uv_pipe_connect2(uv_connect_t* req,
877 uv_pipe_t* handle,
878 const char* name,
879 size_t namelen,
880 unsigned int flags,
881 uv_connect_cb cb) {
882 uv_loop_t* loop;
883 int err;
884 size_t nameSize;
885 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
886 DWORD duplex_flags;
887 char* name_copy;
888
889 loop = handle->loop;
890 UV_REQ_INIT(req, UV_CONNECT);
891 req->handle = (uv_stream_t*) handle;
892 req->cb = cb;
893 req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
894 req->u.connect.duplex_flags = 0;
895 req->u.connect.name = NULL;
896
897 if (flags & ~UV_PIPE_NO_TRUNCATE) {
898 return UV_EINVAL;
899 }
900
901 if (name == NULL) {
902 return UV_EINVAL;
903 }
904
905 if (namelen == 0) {
906 return UV_EINVAL;
907 }
908
909 if (includes_nul(name, namelen)) {
910 return UV_EINVAL;
911 }
912
913 name_copy = uv__malloc(namelen + 1);
914 if (name_copy == NULL) {
915 return UV_ENOMEM;
916 }
917
918 memcpy(name_copy, name, namelen);
919 name_copy[namelen] = '\0';
920
921 if (handle->flags & UV_HANDLE_PIPESERVER) {
922 err = ERROR_INVALID_PARAMETER;
923 goto error;
924 }
925 if (handle->flags & UV_HANDLE_CONNECTION) {
926 err = ERROR_PIPE_BUSY;
927 goto error;
928 }
929 uv__pipe_connection_init(handle);
930
931 /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
932 err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
933 uv__free(name_copy);
934 name_copy = NULL;
935
936 if (err) {
937 err = ERROR_NO_UNICODE_TRANSLATION;
938 goto error;
939 }
940
941 pipeHandle = open_named_pipe(handle->name, &duplex_flags);
942 if (pipeHandle == INVALID_HANDLE_VALUE) {
943 if (GetLastError() == ERROR_PIPE_BUSY) {
944 nameSize = (wcslen(handle->name) + 1) * sizeof(WCHAR);
945 req->u.connect.name = uv__malloc(nameSize);
946 if (!req->u.connect.name) {
947 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
948 }
949
950 memcpy(req->u.connect.name, handle->name, nameSize);
951
952 /* Wait for the server to make a pipe instance available. */
953 if (!QueueUserWorkItem(&pipe_connect_thread_proc,
954 req,
955 WT_EXECUTELONGFUNCTION)) {
956 uv__free(req->u.connect.name);
957 req->u.connect.name = NULL;
958 err = GetLastError();
959 goto error;
960 }
961
962 REGISTER_HANDLE_REQ(loop, handle, req);
963 handle->reqs_pending++;
964
965 return 0;
966 }
967
968 err = GetLastError();
969 goto error;
970 }
971
972 req->u.connect.pipeHandle = pipeHandle;
973 req->u.connect.duplex_flags = duplex_flags;
974 SET_REQ_SUCCESS(req);
975 uv__insert_pending_req(loop, (uv_req_t*) req);
976 handle->reqs_pending++;
977 REGISTER_HANDLE_REQ(loop, handle, req);
978 return 0;
979
980 error:
981 uv__free(name_copy);
982
983 if (handle->name) {
984 uv__free(handle->name);
985 handle->name = NULL;
986 }
987
988 if (pipeHandle != INVALID_HANDLE_VALUE)
989 CloseHandle(pipeHandle);
990
991 /* Make this req pending reporting an error. */
992 SET_REQ_ERROR(req, err);
993 uv__insert_pending_req(loop, (uv_req_t*) req);
994 handle->reqs_pending++;
995 REGISTER_HANDLE_REQ(loop, handle, req);
996 return 0;
997 }
998
999
uv__pipe_interrupt_read(uv_pipe_t * handle)1000 void uv__pipe_interrupt_read(uv_pipe_t* handle) {
1001 BOOL r;
1002
1003 if (!(handle->flags & UV_HANDLE_READ_PENDING))
1004 return; /* No pending reads. */
1005 if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
1006 return; /* Already cancelled. */
1007 if (handle->handle == INVALID_HANDLE_VALUE)
1008 return; /* Pipe handle closed. */
1009
1010 if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1011 /* Cancel asynchronous read. */
1012 r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
1013 assert(r || GetLastError() == ERROR_NOT_FOUND);
1014 (void) r;
1015 } else {
1016 /* Cancel synchronous read (which is happening in the thread pool). */
1017 HANDLE thread;
1018 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1019
1020 EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
1021
1022 thread = *thread_ptr;
1023 if (thread == NULL) {
1024 /* The thread pool thread has not yet reached the point of blocking, we
1025 * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
1026 *thread_ptr = INVALID_HANDLE_VALUE;
1027
1028 } else {
1029 /* Spin until the thread has acknowledged (by setting the thread to
1030 * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
1031 while (thread != INVALID_HANDLE_VALUE) {
1032 r = CancelSynchronousIo(thread);
1033 assert(r || GetLastError() == ERROR_NOT_FOUND);
1034 SwitchToThread(); /* Yield thread. */
1035 thread = *thread_ptr;
1036 }
1037 }
1038
1039 LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
1040 }
1041
1042 /* Set flag to indicate that read has been cancelled. */
1043 handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
1044 }
1045
1046
uv__pipe_read_stop(uv_pipe_t * handle)1047 void uv__pipe_read_stop(uv_pipe_t* handle) {
1048 handle->flags &= ~UV_HANDLE_READING;
1049 DECREASE_ACTIVE_COUNT(handle->loop, handle);
1050 uv__pipe_interrupt_read(handle);
1051 }
1052
1053
1054 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
1055 * it. */
uv__pipe_close(uv_loop_t * loop,uv_pipe_t * handle)1056 void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
1057 int i;
1058 HANDLE pipeHandle;
1059
1060 if (handle->flags & UV_HANDLE_READING) {
1061 handle->flags &= ~UV_HANDLE_READING;
1062 DECREASE_ACTIVE_COUNT(loop, handle);
1063 }
1064
1065 if (handle->flags & UV_HANDLE_LISTENING) {
1066 handle->flags &= ~UV_HANDLE_LISTENING;
1067 DECREASE_ACTIVE_COUNT(loop, handle);
1068 }
1069
1070 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1071
1072 uv__handle_closing(handle);
1073
1074 uv__pipe_interrupt_read(handle);
1075
1076 if (handle->name) {
1077 uv__free(handle->name);
1078 handle->name = NULL;
1079 }
1080
1081 if (handle->flags & UV_HANDLE_PIPESERVER) {
1082 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1083 pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
1084 if (pipeHandle != INVALID_HANDLE_VALUE) {
1085 CloseHandle(pipeHandle);
1086 handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
1087 }
1088 }
1089 handle->handle = INVALID_HANDLE_VALUE;
1090 }
1091
1092 if (handle->flags & UV_HANDLE_CONNECTION) {
1093 eof_timer_destroy(handle);
1094 }
1095
1096 if ((handle->flags & UV_HANDLE_CONNECTION)
1097 && handle->handle != INVALID_HANDLE_VALUE) {
1098 /* This will eventually destroy the write queue for us too. */
1099 close_pipe(handle);
1100 }
1101
1102 if (handle->reqs_pending == 0)
1103 uv__want_endgame(loop, (uv_handle_t*) handle);
1104 }
1105
1106
uv__pipe_queue_accept(uv_loop_t * loop,uv_pipe_t * handle,uv_pipe_accept_t * req,BOOL firstInstance)1107 static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
1108 uv_pipe_accept_t* req, BOOL firstInstance) {
1109 assert(handle->flags & UV_HANDLE_LISTENING);
1110
1111 if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
1112 SET_REQ_ERROR(req, GetLastError());
1113 uv__insert_pending_req(loop, (uv_req_t*) req);
1114 handle->reqs_pending++;
1115 return;
1116 }
1117
1118 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1119
1120 /* Prepare the overlapped structure. */
1121 memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
1122
1123 if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
1124 GetLastError() != ERROR_IO_PENDING) {
1125 if (GetLastError() == ERROR_PIPE_CONNECTED) {
1126 SET_REQ_SUCCESS(req);
1127 } else {
1128 CloseHandle(req->pipeHandle);
1129 req->pipeHandle = INVALID_HANDLE_VALUE;
1130 /* Make this req pending reporting an error. */
1131 SET_REQ_ERROR(req, GetLastError());
1132 }
1133 uv__insert_pending_req(loop, (uv_req_t*) req);
1134 handle->reqs_pending++;
1135 return;
1136 }
1137
1138 /* Wait for completion via IOCP */
1139 handle->reqs_pending++;
1140 }
1141
1142
uv__pipe_accept(uv_pipe_t * server,uv_stream_t * client)1143 int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
1144 uv_loop_t* loop = server->loop;
1145 uv_pipe_t* pipe_client;
1146 uv_pipe_accept_t* req;
1147 struct uv__queue* q;
1148 uv__ipc_xfer_queue_item_t* item;
1149 int err;
1150
1151 if (server->ipc) {
1152 if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) {
1153 /* No valid pending sockets. */
1154 return WSAEWOULDBLOCK;
1155 }
1156
1157 q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue);
1158 uv__queue_remove(q);
1159 server->pipe.conn.ipc_xfer_queue_length--;
1160 item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);
1161
1162 err = uv__tcp_xfer_import(
1163 (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
1164
1165 uv__free(item);
1166
1167 if (err != 0)
1168 return err;
1169
1170 } else {
1171 pipe_client = (uv_pipe_t*) client;
1172 uv__pipe_connection_init(pipe_client);
1173
1174 /* Find a connection instance that has been connected, but not yet
1175 * accepted. */
1176 req = server->pipe.serv.pending_accepts;
1177
1178 if (!req) {
1179 /* No valid connections found, so we error out. */
1180 return WSAEWOULDBLOCK;
1181 }
1182
1183 /* Initialize the client handle and copy the pipeHandle to the client */
1184 pipe_client->handle = req->pipeHandle;
1185 pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1186
1187 /* Prepare the req to pick up a new connection */
1188 server->pipe.serv.pending_accepts = req->next_pending;
1189 req->next_pending = NULL;
1190 req->pipeHandle = INVALID_HANDLE_VALUE;
1191
1192 server->handle = INVALID_HANDLE_VALUE;
1193 if (!(server->flags & UV_HANDLE_CLOSING)) {
1194 uv__pipe_queue_accept(loop, server, req, FALSE);
1195 }
1196 }
1197
1198 return 0;
1199 }
1200
1201
1202 /* Starts listening for connections for the given pipe. */
uv__pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)1203 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1204 uv_loop_t* loop = handle->loop;
1205 int i;
1206
1207 if (handle->flags & UV_HANDLE_LISTENING) {
1208 handle->stream.serv.connection_cb = cb;
1209 }
1210
1211 if (!(handle->flags & UV_HANDLE_BOUND)) {
1212 return WSAEINVAL;
1213 }
1214
1215 if (handle->flags & UV_HANDLE_READING) {
1216 return WSAEISCONN;
1217 }
1218
1219 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1220 return ERROR_NOT_SUPPORTED;
1221 }
1222
1223 if (handle->ipc) {
1224 return WSAEINVAL;
1225 }
1226
1227 handle->flags |= UV_HANDLE_LISTENING;
1228 INCREASE_ACTIVE_COUNT(loop, handle);
1229 handle->stream.serv.connection_cb = cb;
1230
1231 /* First pipe handle should have already been created in uv_pipe_bind */
1232 assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1233
1234 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1235 uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1236 }
1237
1238 return 0;
1239 }
1240
1241
uv_pipe_zero_readfile_thread_proc(void * arg)1242 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
1243 uv_read_t* req = (uv_read_t*) arg;
1244 uv_pipe_t* handle = (uv_pipe_t*) req->data;
1245 uv_loop_t* loop = handle->loop;
1246 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1247 CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
1248 HANDLE thread;
1249 DWORD bytes;
1250 DWORD err;
1251
1252 assert(req->type == UV_READ);
1253 assert(handle->type == UV_NAMED_PIPE);
1254
1255 err = 0;
1256
1257 /* Create a handle to the current thread. */
1258 if (!DuplicateHandle(GetCurrentProcess(),
1259 GetCurrentThread(),
1260 GetCurrentProcess(),
1261 &thread,
1262 0,
1263 FALSE,
1264 DUPLICATE_SAME_ACCESS)) {
1265 err = GetLastError();
1266 goto out1;
1267 }
1268
1269 /* The lock needs to be held when thread handle is modified. */
1270 EnterCriticalSection(lock);
1271 if (*thread_ptr == INVALID_HANDLE_VALUE) {
1272 /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1273 err = ERROR_OPERATION_ABORTED;
1274 } else {
1275 /* Let main thread know which worker thread is doing the blocking read. */
1276 assert(*thread_ptr == NULL);
1277 *thread_ptr = thread;
1278 }
1279 LeaveCriticalSection(lock);
1280
1281 if (err)
1282 goto out2;
1283
1284 /* Block the thread until data is available on the pipe, or the read is
1285 * cancelled. */
1286 if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1287 err = GetLastError();
1288
1289 /* Let the main thread know the worker is past the point of blocking. */
1290 assert(thread == *thread_ptr);
1291 *thread_ptr = INVALID_HANDLE_VALUE;
1292
1293 /* Briefly acquire the mutex. Since the main thread holds the lock while it
1294 * is spinning trying to cancel this thread's I/O, we will block here until
1295 * it stops doing that. */
1296 EnterCriticalSection(lock);
1297 LeaveCriticalSection(lock);
1298
1299 out2:
1300 /* Close the handle to the current thread. */
1301 CloseHandle(thread);
1302
1303 out1:
1304 /* Set request status and post a completion record to the IOCP. */
1305 if (err)
1306 SET_REQ_ERROR(req, err);
1307 else
1308 SET_REQ_SUCCESS(req);
1309 POST_COMPLETION_FOR_REQ(loop, req);
1310
1311 return 0;
1312 }
1313
1314
uv_pipe_writefile_thread_proc(void * parameter)1315 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1316 int result;
1317 DWORD bytes;
1318 uv_write_t* req = (uv_write_t*) parameter;
1319 uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1320 uv_loop_t* loop = handle->loop;
1321
1322 assert(req != NULL);
1323 assert(req->type == UV_WRITE);
1324 assert(handle->type == UV_NAMED_PIPE);
1325
1326 result = WriteFile(handle->handle,
1327 req->write_buffer.base,
1328 req->write_buffer.len,
1329 &bytes,
1330 NULL);
1331
1332 if (!result) {
1333 SET_REQ_ERROR(req, GetLastError());
1334 }
1335
1336 POST_COMPLETION_FOR_REQ(loop, req);
1337 return 0;
1338 }
1339
1340
post_completion_read_wait(void * context,BOOLEAN timed_out)1341 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1342 uv_read_t* req;
1343 uv_tcp_t* handle;
1344
1345 req = (uv_read_t*) context;
1346 assert(req != NULL);
1347 handle = (uv_tcp_t*)req->data;
1348 assert(handle != NULL);
1349 assert(!timed_out);
1350
1351 if (!PostQueuedCompletionStatus(handle->loop->iocp,
1352 req->u.io.overlapped.InternalHigh,
1353 0,
1354 &req->u.io.overlapped)) {
1355 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1356 }
1357 }
1358
1359
post_completion_write_wait(void * context,BOOLEAN timed_out)1360 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1361 uv_write_t* req;
1362 uv_tcp_t* handle;
1363
1364 req = (uv_write_t*) context;
1365 assert(req != NULL);
1366 handle = (uv_tcp_t*)req->handle;
1367 assert(handle != NULL);
1368 assert(!timed_out);
1369
1370 if (!PostQueuedCompletionStatus(handle->loop->iocp,
1371 req->u.io.overlapped.InternalHigh,
1372 0,
1373 &req->u.io.overlapped)) {
1374 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1375 }
1376 }
1377
1378
uv__pipe_queue_read(uv_loop_t * loop,uv_pipe_t * handle)1379 static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
1380 uv_read_t* req;
1381 int result;
1382
1383 assert(handle->flags & UV_HANDLE_READING);
1384 assert(!(handle->flags & UV_HANDLE_READ_PENDING));
1385
1386 assert(handle->handle != INVALID_HANDLE_VALUE);
1387
1388 req = &handle->read_req;
1389
1390 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1391 handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
1392 if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
1393 req,
1394 WT_EXECUTELONGFUNCTION)) {
1395 /* Make this req pending reporting an error. */
1396 SET_REQ_ERROR(req, GetLastError());
1397 goto error;
1398 }
1399 } else {
1400 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1401 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1402 assert(req->event_handle != NULL);
1403 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1404 }
1405
1406 /* Do 0-read */
1407 result = ReadFile(handle->handle,
1408 &uv_zero_,
1409 0,
1410 NULL,
1411 &req->u.io.overlapped);
1412
1413 if (!result && GetLastError() != ERROR_IO_PENDING) {
1414 /* Make this req pending reporting an error. */
1415 SET_REQ_ERROR(req, GetLastError());
1416 goto error;
1417 }
1418
1419 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1420 if (req->wait_handle == INVALID_HANDLE_VALUE) {
1421 if (!RegisterWaitForSingleObject(&req->wait_handle,
1422 req->event_handle, post_completion_read_wait, (void*) req,
1423 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1424 SET_REQ_ERROR(req, GetLastError());
1425 goto error;
1426 }
1427 }
1428 }
1429 }
1430
1431 /* Start the eof timer if there is one */
1432 eof_timer_start(handle);
1433 handle->flags |= UV_HANDLE_READ_PENDING;
1434 handle->reqs_pending++;
1435 return;
1436
1437 error:
1438 uv__insert_pending_req(loop, (uv_req_t*)req);
1439 handle->flags |= UV_HANDLE_READ_PENDING;
1440 handle->reqs_pending++;
1441 }
1442
1443
uv__pipe_read_start(uv_pipe_t * handle,uv_alloc_cb alloc_cb,uv_read_cb read_cb)1444 int uv__pipe_read_start(uv_pipe_t* handle,
1445 uv_alloc_cb alloc_cb,
1446 uv_read_cb read_cb) {
1447 uv_loop_t* loop = handle->loop;
1448
1449 handle->flags |= UV_HANDLE_READING;
1450 INCREASE_ACTIVE_COUNT(loop, handle);
1451 handle->read_cb = read_cb;
1452 handle->alloc_cb = alloc_cb;
1453
1454 /* If reading was stopped and then started again, there could still be a read
1455 * request pending. */
1456 if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
1457 if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
1458 handle->read_req.event_handle == NULL) {
1459 handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
1460 if (handle->read_req.event_handle == NULL) {
1461 uv_fatal_error(GetLastError(), "CreateEvent");
1462 }
1463 }
1464 uv__pipe_queue_read(loop, handle);
1465 }
1466
1467 return 0;
1468 }
1469
1470
uv__insert_non_overlapped_write_req(uv_pipe_t * handle,uv_write_t * req)1471 static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
1472 uv_write_t* req) {
1473 req->next_req = NULL;
1474 if (handle->pipe.conn.non_overlapped_writes_tail) {
1475 req->next_req =
1476 handle->pipe.conn.non_overlapped_writes_tail->next_req;
1477 handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1478 handle->pipe.conn.non_overlapped_writes_tail = req;
1479 } else {
1480 req->next_req = (uv_req_t*)req;
1481 handle->pipe.conn.non_overlapped_writes_tail = req;
1482 }
1483 }
1484
1485
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1486 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1487 uv_write_t* req;
1488
1489 if (handle->pipe.conn.non_overlapped_writes_tail) {
1490 req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1491
1492 if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1493 handle->pipe.conn.non_overlapped_writes_tail = NULL;
1494 } else {
1495 handle->pipe.conn.non_overlapped_writes_tail->next_req =
1496 req->next_req;
1497 }
1498
1499 return req;
1500 } else {
1501 /* queue empty */
1502 return NULL;
1503 }
1504 }
1505
1506
uv__queue_non_overlapped_write(uv_pipe_t * handle)1507 static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
1508 uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1509 if (req) {
1510 if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1511 req,
1512 WT_EXECUTELONGFUNCTION)) {
1513 uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1514 }
1515 }
1516 }
1517
1518
uv__build_coalesced_write_req(uv_write_t * user_req,const uv_buf_t bufs[],size_t nbufs,uv_write_t ** req_out,uv_buf_t * write_buf_out)1519 static int uv__build_coalesced_write_req(uv_write_t* user_req,
1520 const uv_buf_t bufs[],
1521 size_t nbufs,
1522 uv_write_t** req_out,
1523 uv_buf_t* write_buf_out) {
1524 /* Pack into a single heap-allocated buffer:
1525 * (a) a uv_write_t structure where libuv stores the actual state.
1526 * (b) a pointer to the original uv_write_t.
1527 * (c) data from all `bufs` entries.
1528 */
1529 char* heap_buffer;
1530 size_t heap_buffer_length, heap_buffer_offset;
1531 uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
1532 char* data_start; /* (c) */
1533 size_t data_length;
1534 unsigned int i;
1535
1536 /* Compute combined size of all combined buffers from `bufs`. */
1537 data_length = 0;
1538 for (i = 0; i < nbufs; i++)
1539 data_length += bufs[i].len;
1540
1541 /* The total combined size of data buffers should not exceed UINT32_MAX,
1542 * because WriteFile() won't accept buffers larger than that. */
1543 if (data_length > UINT32_MAX)
1544 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1545
1546 /* Compute heap buffer size. */
1547 heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
1548 data_length; /* (c) */
1549
1550 /* Allocate buffer. */
1551 heap_buffer = uv__malloc(heap_buffer_length);
1552 if (heap_buffer == NULL)
1553 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1554
1555 /* Copy uv_write_t information to the buffer. */
1556 coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
1557 coalesced_write_req->req = *user_req; /* copy (a) */
1558 coalesced_write_req->req.coalesced = 1;
1559 coalesced_write_req->user_req = user_req; /* copy (b) */
1560 heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
1561
1562 /* Copy data buffers to the heap buffer. */
1563 data_start = &heap_buffer[heap_buffer_offset];
1564 for (i = 0; i < nbufs; i++) {
1565 memcpy(&heap_buffer[heap_buffer_offset],
1566 bufs[i].base,
1567 bufs[i].len); /* copy (c) */
1568 heap_buffer_offset += bufs[i].len; /* offset (c) */
1569 }
1570 assert(heap_buffer_offset == heap_buffer_length);
1571
1572 /* Set out arguments and return. */
1573 *req_out = &coalesced_write_req->req;
1574 *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
1575 return 0;
1576 }
1577
1578
uv__pipe_write_data(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],size_t nbufs,uv_write_cb cb,int copy_always)1579 static int uv__pipe_write_data(uv_loop_t* loop,
1580 uv_write_t* req,
1581 uv_pipe_t* handle,
1582 const uv_buf_t bufs[],
1583 size_t nbufs,
1584 uv_write_cb cb,
1585 int copy_always) {
1586 int err;
1587 int result;
1588 uv_buf_t write_buf;
1589
1590 assert(handle->handle != INVALID_HANDLE_VALUE);
1591
1592 UV_REQ_INIT(req, UV_WRITE);
1593 req->handle = (uv_stream_t*) handle;
1594 req->send_handle = NULL;
1595 req->cb = cb;
1596 /* Private fields. */
1597 req->coalesced = 0;
1598 req->event_handle = NULL;
1599 req->wait_handle = INVALID_HANDLE_VALUE;
1600
1601 /* Prepare the overlapped structure. */
1602 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1603 if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
1604 req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1605 if (req->event_handle == NULL) {
1606 uv_fatal_error(GetLastError(), "CreateEvent");
1607 }
1608 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1609 }
1610 req->write_buffer = uv_null_buf_;
1611
1612 if (nbufs == 0) {
1613 /* Write empty buffer. */
1614 write_buf = uv_null_buf_;
1615 } else if (nbufs == 1 && !copy_always) {
1616 /* Write directly from bufs[0]. */
1617 write_buf = bufs[0];
1618 } else {
1619 /* Coalesce all `bufs` into one big buffer. This also creates a new
1620 * write-request structure that replaces the old one. */
1621 err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
1622 if (err != 0)
1623 return err;
1624 }
1625
1626 if ((handle->flags &
1627 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
1628 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1629 DWORD bytes;
1630 result =
1631 WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
1632
1633 if (!result) {
1634 err = GetLastError();
1635 return err;
1636 } else {
1637 /* Request completed immediately. */
1638 req->u.io.queued_bytes = 0;
1639 }
1640
1641 REGISTER_HANDLE_REQ(loop, handle, req);
1642 handle->reqs_pending++;
1643 handle->stream.conn.write_reqs_pending++;
1644 POST_COMPLETION_FOR_REQ(loop, req);
1645 return 0;
1646 } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1647 req->write_buffer = write_buf;
1648 uv__insert_non_overlapped_write_req(handle, req);
1649 if (handle->stream.conn.write_reqs_pending == 0) {
1650 uv__queue_non_overlapped_write(handle);
1651 }
1652
1653 /* Request queued by the kernel. */
1654 req->u.io.queued_bytes = write_buf.len;
1655 handle->write_queue_size += req->u.io.queued_bytes;
1656 } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
1657 /* Using overlapped IO, but wait for completion before returning */
1658 result = WriteFile(handle->handle,
1659 write_buf.base,
1660 write_buf.len,
1661 NULL,
1662 &req->u.io.overlapped);
1663
1664 if (!result && GetLastError() != ERROR_IO_PENDING) {
1665 err = GetLastError();
1666 CloseHandle(req->event_handle);
1667 req->event_handle = NULL;
1668 return err;
1669 }
1670
1671 if (result) {
1672 /* Request completed immediately. */
1673 req->u.io.queued_bytes = 0;
1674 } else {
1675 /* Request queued by the kernel. */
1676 req->u.io.queued_bytes = write_buf.len;
1677 handle->write_queue_size += req->u.io.queued_bytes;
1678 if (WaitForSingleObject(req->event_handle, INFINITE) !=
1679 WAIT_OBJECT_0) {
1680 err = GetLastError();
1681 CloseHandle(req->event_handle);
1682 req->event_handle = NULL;
1683 return err;
1684 }
1685 }
1686 CloseHandle(req->event_handle);
1687 req->event_handle = NULL;
1688
1689 REGISTER_HANDLE_REQ(loop, handle, req);
1690 handle->reqs_pending++;
1691 handle->stream.conn.write_reqs_pending++;
1692 return 0;
1693 } else {
1694 result = WriteFile(handle->handle,
1695 write_buf.base,
1696 write_buf.len,
1697 NULL,
1698 &req->u.io.overlapped);
1699
1700 if (!result && GetLastError() != ERROR_IO_PENDING) {
1701 return GetLastError();
1702 }
1703
1704 if (result) {
1705 /* Request completed immediately. */
1706 req->u.io.queued_bytes = 0;
1707 } else {
1708 /* Request queued by the kernel. */
1709 req->u.io.queued_bytes = write_buf.len;
1710 handle->write_queue_size += req->u.io.queued_bytes;
1711 }
1712
1713 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1714 if (!RegisterWaitForSingleObject(&req->wait_handle,
1715 req->event_handle, post_completion_write_wait, (void*) req,
1716 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1717 return GetLastError();
1718 }
1719 }
1720 }
1721
1722 REGISTER_HANDLE_REQ(loop, handle, req);
1723 handle->reqs_pending++;
1724 handle->stream.conn.write_reqs_pending++;
1725
1726 return 0;
1727 }
1728
1729
uv__pipe_get_ipc_remote_pid(uv_pipe_t * handle)1730 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1731 DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1732
1733 /* If the both ends of the IPC pipe are owned by the same process,
1734 * the remote end pid may not yet be set. If so, do it here.
1735 * TODO: this is weird; it'd probably better to use a handshake. */
1736 if (*pid == 0) {
1737 GetNamedPipeClientProcessId(handle->handle, pid);
1738 if (*pid == GetCurrentProcessId()) {
1739 GetNamedPipeServerProcessId(handle->handle, pid);
1740 }
1741 }
1742
1743 return *pid;
1744 }
1745
1746
uv__pipe_write_ipc(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t data_bufs[],size_t data_buf_count,uv_stream_t * send_handle,uv_write_cb cb)1747 int uv__pipe_write_ipc(uv_loop_t* loop,
1748 uv_write_t* req,
1749 uv_pipe_t* handle,
1750 const uv_buf_t data_bufs[],
1751 size_t data_buf_count,
1752 uv_stream_t* send_handle,
1753 uv_write_cb cb) {
1754 uv_buf_t stack_bufs[6];
1755 uv_buf_t* bufs;
1756 size_t buf_count, buf_index;
1757 uv__ipc_frame_header_t frame_header;
1758 uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
1759 uv__ipc_socket_xfer_info_t xfer_info;
1760 uint64_t data_length;
1761 size_t i;
1762 int err;
1763
1764 /* Compute the combined size of data buffers. */
1765 data_length = 0;
1766 for (i = 0; i < data_buf_count; i++)
1767 data_length += data_bufs[i].len;
1768 if (data_length > UINT32_MAX)
1769 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1770
1771 /* Prepare the frame's socket xfer payload. */
1772 if (send_handle != NULL) {
1773 uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
1774
1775 /* Verify that `send_handle` it is indeed a tcp handle. */
1776 if (send_tcp_handle->type != UV_TCP)
1777 return ERROR_NOT_SUPPORTED;
1778
1779 /* Export the tcp handle. */
1780 err = uv__tcp_xfer_export(send_tcp_handle,
1781 uv__pipe_get_ipc_remote_pid(handle),
1782 &xfer_type,
1783 &xfer_info);
1784 if (err != 0)
1785 return err;
1786 }
1787
1788 /* Compute the number of uv_buf_t's required. */
1789 buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
1790 if (send_handle != NULL)
1791 buf_count += 1; /* One extra for the socket xfer information. */
1792
1793 /* Use the on-stack buffer array if it is big enough; otherwise allocate
1794 * space for it on the heap. */
1795 if (buf_count < ARRAY_SIZE(stack_bufs)) {
1796 /* Use on-stack buffer array. */
1797 bufs = stack_bufs;
1798 } else {
1799 /* Use heap-allocated buffer array. */
1800 bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
1801 if (bufs == NULL)
1802 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1803 }
1804 buf_index = 0;
1805
1806 /* Initialize frame header and add it to the buffers list. */
1807 memset(&frame_header, 0, sizeof frame_header);
1808 bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
1809
1810 if (send_handle != NULL) {
1811 /* Add frame header flags. */
1812 switch (xfer_type) {
1813 case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
1814 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
1815 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
1816 break;
1817 case UV__IPC_SOCKET_XFER_TCP_SERVER:
1818 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
1819 break;
1820 default:
1821 assert(0); /* Unreachable. */
1822 }
1823 /* Add xfer info buffer. */
1824 bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
1825 }
1826
1827 if (data_length > 0) {
1828 /* Update frame header. */
1829 frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
1830 frame_header.data_length = (uint32_t) data_length;
1831 /* Add data buffers to buffers list. */
1832 for (i = 0; i < data_buf_count; i++)
1833 bufs[buf_index++] = data_bufs[i];
1834 }
1835
1836 /* Write buffers. We set the `always_copy` flag, so it is not a problem that
1837 * some of the written data lives on the stack. */
1838 err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
1839
1840 /* If we had to heap-allocate the bufs array, free it now. */
1841 if (bufs != stack_bufs) {
1842 uv__free(bufs);
1843 }
1844
1845 return err;
1846 }
1847
1848
uv__pipe_write(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],size_t nbufs,uv_stream_t * send_handle,uv_write_cb cb)1849 int uv__pipe_write(uv_loop_t* loop,
1850 uv_write_t* req,
1851 uv_pipe_t* handle,
1852 const uv_buf_t bufs[],
1853 size_t nbufs,
1854 uv_stream_t* send_handle,
1855 uv_write_cb cb) {
1856 if (handle->ipc) {
1857 /* IPC pipe write: use framing protocol. */
1858 return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
1859 } else {
1860 /* Non-IPC pipe write: put data on the wire directly. */
1861 assert(send_handle == NULL);
1862 return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
1863 }
1864 }
1865
1866
uv__pipe_read_eof(uv_loop_t * loop,uv_pipe_t * handle,uv_buf_t buf)1867 static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1868 uv_buf_t buf) {
1869 /* If there is an eof timer running, we don't need it any more, so discard
1870 * it. */
1871 eof_timer_destroy(handle);
1872
1873 uv_read_stop((uv_stream_t*) handle);
1874
1875 handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
1876 }
1877
1878
uv__pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1879 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1880 uv_buf_t buf) {
1881 /* If there is an eof timer running, we don't need it any more, so discard
1882 * it. */
1883 eof_timer_destroy(handle);
1884
1885 uv_read_stop((uv_stream_t*) handle);
1886
1887 handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1888 }
1889
1890
uv__pipe_read_error_or_eof(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1891 static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1892 int error, uv_buf_t buf) {
1893 if (error == ERROR_BROKEN_PIPE) {
1894 uv__pipe_read_eof(loop, handle, buf);
1895 } else {
1896 uv__pipe_read_error(loop, handle, error, buf);
1897 }
1898 }
1899
1900
uv__pipe_queue_ipc_xfer_info(uv_pipe_t * handle,uv__ipc_socket_xfer_type_t xfer_type,uv__ipc_socket_xfer_info_t * xfer_info)1901 static void uv__pipe_queue_ipc_xfer_info(
1902 uv_pipe_t* handle,
1903 uv__ipc_socket_xfer_type_t xfer_type,
1904 uv__ipc_socket_xfer_info_t* xfer_info) {
1905 uv__ipc_xfer_queue_item_t* item;
1906
1907 item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
1908 if (item == NULL)
1909 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1910
1911 item->xfer_type = xfer_type;
1912 item->xfer_info = *xfer_info;
1913
1914 uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member);
1915 handle->pipe.conn.ipc_xfer_queue_length++;
1916 }
1917
1918
1919 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
1920 * encountered before the requested number of bytes are read, an error is
1921 * returned. */
uv__pipe_read_exactly(HANDLE h,void * buffer,DWORD count)1922 static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1923 DWORD bytes_read, bytes_read_now;
1924
1925 bytes_read = 0;
1926 while (bytes_read < count) {
1927 if (!ReadFile(h,
1928 (char*) buffer + bytes_read,
1929 count - bytes_read,
1930 &bytes_read_now,
1931 NULL)) {
1932 return GetLastError();
1933 }
1934
1935 bytes_read += bytes_read_now;
1936 }
1937
1938 assert(bytes_read == count);
1939 return 0;
1940 }
1941
1942
uv__pipe_read_data(uv_loop_t * loop,uv_pipe_t * handle,DWORD suggested_bytes,DWORD max_bytes)1943 static DWORD uv__pipe_read_data(uv_loop_t* loop,
1944 uv_pipe_t* handle,
1945 DWORD suggested_bytes,
1946 DWORD max_bytes) {
1947 DWORD bytes_read;
1948 uv_buf_t buf;
1949
1950 /* Ask the user for a buffer to read data into. */
1951 buf = uv_buf_init(NULL, 0);
1952 handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
1953 if (buf.base == NULL || buf.len == 0) {
1954 handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1955 return 0; /* Break out of read loop. */
1956 }
1957
1958 /* Ensure we read at most the smaller of:
1959 * (a) the length of the user-allocated buffer.
1960 * (b) the maximum data length as specified by the `max_bytes` argument.
1961 */
1962 if (max_bytes > buf.len)
1963 max_bytes = buf.len;
1964
1965 /* Read into the user buffer. */
1966 if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
1967 uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
1968 return 0; /* Break out of read loop. */
1969 }
1970
1971 /* Call the read callback. */
1972 handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
1973
1974 return bytes_read;
1975 }
1976
1977
uv__pipe_read_ipc(uv_loop_t * loop,uv_pipe_t * handle)1978 static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1979 uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1980 int err;
1981
1982 if (*data_remaining > 0) {
1983 /* Read frame data payload. */
1984 DWORD bytes_read =
1985 uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1986 *data_remaining -= bytes_read;
1987 return bytes_read;
1988
1989 } else {
1990 /* Start of a new IPC frame. */
1991 uv__ipc_frame_header_t frame_header;
1992 uint32_t xfer_flags;
1993 uv__ipc_socket_xfer_type_t xfer_type;
1994 uv__ipc_socket_xfer_info_t xfer_info;
1995
1996 /* Read the IPC frame header. */
1997 err = uv__pipe_read_exactly(
1998 handle->handle, &frame_header, sizeof frame_header);
1999 if (err)
2000 goto error;
2001
2002 /* Validate that flags are valid. */
2003 if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
2004 goto invalid;
2005 /* Validate that reserved2 is zero. */
2006 if (frame_header.reserved2 != 0)
2007 goto invalid;
2008
2009 /* Parse xfer flags. */
2010 xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
2011 if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
2012 /* Socket coming -- determine the type. */
2013 xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
2014 ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
2015 : UV__IPC_SOCKET_XFER_TCP_SERVER;
2016 } else if (xfer_flags == 0) {
2017 /* No socket. */
2018 xfer_type = UV__IPC_SOCKET_XFER_NONE;
2019 } else {
2020 /* Invalid flags. */
2021 goto invalid;
2022 }
2023
2024 /* Parse data frame information. */
2025 if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
2026 *data_remaining = frame_header.data_length;
2027 } else if (frame_header.data_length != 0) {
2028 /* Data length greater than zero but data flag not set -- invalid. */
2029 goto invalid;
2030 }
2031
2032 /* If no socket xfer info follows, return here. Data will be read in a
2033 * subsequent invocation of uv__pipe_read_ipc(). */
2034 if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
2035 return sizeof frame_header; /* Number of bytes read. */
2036
2037 /* Read transferred socket information. */
2038 err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
2039 if (err)
2040 goto error;
2041
2042 /* Store the pending socket info. */
2043 uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
2044
2045 /* Return number of bytes read. */
2046 return sizeof frame_header + sizeof xfer_info;
2047 }
2048
2049 invalid:
2050 /* Invalid frame. */
2051 err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
2052
2053 error:
2054 uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
2055 return 0; /* Break out of read loop. */
2056 }
2057
2058
uv__process_pipe_read_req(uv_loop_t * loop,uv_pipe_t * handle,uv_req_t * req)2059 void uv__process_pipe_read_req(uv_loop_t* loop,
2060 uv_pipe_t* handle,
2061 uv_req_t* req) {
2062 assert(handle->type == UV_NAMED_PIPE);
2063
2064 handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
2065 DECREASE_PENDING_REQ_COUNT(handle);
2066 eof_timer_stop(handle);
2067
2068 /* At this point, we're done with bookkeeping. If the user has stopped
2069 * reading the pipe in the meantime, there is nothing left to do, since there
2070 * is no callback that we can call. */
2071 if (!(handle->flags & UV_HANDLE_READING))
2072 return;
2073
2074 if (!REQ_SUCCESS(req)) {
2075 /* An error occurred doing the zero-read. */
2076 DWORD err = GET_REQ_ERROR(req);
2077
2078 /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
2079 * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
2080 * the user; we'll start a new zero-read at the end of this function. */
2081 if (err != ERROR_OPERATION_ABORTED)
2082 uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
2083
2084 } else {
2085 /* The zero-read completed without error, indicating there is data
2086 * available in the kernel buffer. */
2087 DWORD avail;
2088
2089 /* Get the number of bytes available. */
2090 avail = 0;
2091 if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
2092 uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
2093
2094 /* Read until we've either read all the bytes available, or the 'reading'
2095 * flag is cleared. */
2096 while (avail > 0 && handle->flags & UV_HANDLE_READING) {
2097 /* Depending on the type of pipe, read either IPC frames or raw data. */
2098 DWORD bytes_read =
2099 handle->ipc ? uv__pipe_read_ipc(loop, handle)
2100 : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
2101
2102 /* If no bytes were read, treat this as an indication that an error
2103 * occurred, and break out of the read loop. */
2104 if (bytes_read == 0)
2105 break;
2106
2107 /* It is possible that more bytes were read than we thought were
2108 * available. To prevent `avail` from underflowing, break out of the loop
2109 * if this is the case. */
2110 if (bytes_read > avail)
2111 break;
2112
2113 /* Recompute the number of bytes available. */
2114 avail -= bytes_read;
2115 }
2116 }
2117
2118 /* Start another zero-read request if necessary. */
2119 if ((handle->flags & UV_HANDLE_READING) &&
2120 !(handle->flags & UV_HANDLE_READ_PENDING)) {
2121 uv__pipe_queue_read(loop, handle);
2122 }
2123 }
2124
2125
uv__process_pipe_write_req(uv_loop_t * loop,uv_pipe_t * handle,uv_write_t * req)2126 void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
2127 uv_write_t* req) {
2128 int err;
2129
2130 assert(handle->type == UV_NAMED_PIPE);
2131
2132 assert(handle->write_queue_size >= req->u.io.queued_bytes);
2133 handle->write_queue_size -= req->u.io.queued_bytes;
2134
2135 UNREGISTER_HANDLE_REQ(loop, handle, req);
2136
2137 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
2138 if (req->wait_handle != INVALID_HANDLE_VALUE) {
2139 UnregisterWait(req->wait_handle);
2140 req->wait_handle = INVALID_HANDLE_VALUE;
2141 }
2142 if (req->event_handle) {
2143 CloseHandle(req->event_handle);
2144 req->event_handle = NULL;
2145 }
2146 }
2147
2148 err = GET_REQ_ERROR(req);
2149
2150 /* If this was a coalesced write, extract pointer to the user_provided
2151 * uv_write_t structure so we can pass the expected pointer to the callback,
2152 * then free the heap-allocated write req. */
2153 if (req->coalesced) {
2154 uv__coalesced_write_t* coalesced_write =
2155 container_of(req, uv__coalesced_write_t, req);
2156 req = coalesced_write->user_req;
2157 uv__free(coalesced_write);
2158 }
2159 if (req->cb) {
2160 req->cb(req, uv_translate_sys_error(err));
2161 }
2162
2163 handle->stream.conn.write_reqs_pending--;
2164
2165 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
2166 handle->pipe.conn.non_overlapped_writes_tail) {
2167 assert(handle->stream.conn.write_reqs_pending > 0);
2168 uv__queue_non_overlapped_write(handle);
2169 }
2170
2171 if (handle->stream.conn.write_reqs_pending == 0 &&
2172 uv__is_stream_shutting(handle))
2173 uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
2174
2175 DECREASE_PENDING_REQ_COUNT(handle);
2176 }
2177
2178
uv__process_pipe_accept_req(uv_loop_t * loop,uv_pipe_t * handle,uv_req_t * raw_req)2179 void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
2180 uv_req_t* raw_req) {
2181 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
2182
2183 assert(handle->type == UV_NAMED_PIPE);
2184
2185 if (handle->flags & UV_HANDLE_CLOSING) {
2186 /* The req->pipeHandle should be freed already in uv__pipe_close(). */
2187 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
2188 DECREASE_PENDING_REQ_COUNT(handle);
2189 return;
2190 }
2191
2192 if (REQ_SUCCESS(req)) {
2193 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
2194 req->next_pending = handle->pipe.serv.pending_accepts;
2195 handle->pipe.serv.pending_accepts = req;
2196
2197 if (handle->stream.serv.connection_cb) {
2198 handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
2199 }
2200 } else {
2201 if (req->pipeHandle != INVALID_HANDLE_VALUE) {
2202 CloseHandle(req->pipeHandle);
2203 req->pipeHandle = INVALID_HANDLE_VALUE;
2204 }
2205 if (!(handle->flags & UV_HANDLE_CLOSING)) {
2206 uv__pipe_queue_accept(loop, handle, req, FALSE);
2207 }
2208 }
2209
2210 DECREASE_PENDING_REQ_COUNT(handle);
2211 }
2212
2213
uv__process_pipe_connect_req(uv_loop_t * loop,uv_pipe_t * handle,uv_connect_t * req)2214 void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
2215 uv_connect_t* req) {
2216 HANDLE pipeHandle;
2217 DWORD duplex_flags;
2218 int err;
2219
2220 assert(handle->type == UV_NAMED_PIPE);
2221
2222 UNREGISTER_HANDLE_REQ(loop, handle, req);
2223
2224 err = 0;
2225 if (REQ_SUCCESS(req)) {
2226 pipeHandle = req->u.connect.pipeHandle;
2227 duplex_flags = req->u.connect.duplex_flags;
2228 if (handle->flags & UV_HANDLE_CLOSING)
2229 err = UV_ECANCELED;
2230 else
2231 err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
2232 if (err)
2233 CloseHandle(pipeHandle);
2234 } else {
2235 err = uv_translate_sys_error(GET_REQ_ERROR(req));
2236 }
2237
2238 if (req->cb)
2239 req->cb(req, err);
2240
2241 DECREASE_PENDING_REQ_COUNT(handle);
2242 }
2243
2244
2245
uv__process_pipe_shutdown_req(uv_loop_t * loop,uv_pipe_t * handle,uv_shutdown_t * req)2246 void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
2247 uv_shutdown_t* req) {
2248 int err;
2249
2250 assert(handle->type == UV_NAMED_PIPE);
2251
2252 /* Clear the shutdown_req field so we don't go here again. */
2253 handle->stream.conn.shutdown_req = NULL;
2254 UNREGISTER_HANDLE_REQ(loop, handle, req);
2255
2256 if (handle->flags & UV_HANDLE_CLOSING) {
2257 /* Already closing. Cancel the shutdown. */
2258 err = UV_ECANCELED;
2259 } else if (!REQ_SUCCESS(req)) {
2260 /* An error occurred in trying to shutdown gracefully. */
2261 err = uv_translate_sys_error(GET_REQ_ERROR(req));
2262 } else {
2263 if (handle->flags & UV_HANDLE_READABLE) {
2264 /* Initialize and optionally start the eof timer. Only do this if the pipe
2265 * is readable and we haven't seen EOF come in ourselves. */
2266 eof_timer_init(handle);
2267
2268 /* If reading start the timer right now. Otherwise uv__pipe_queue_read will
2269 * start it. */
2270 if (handle->flags & UV_HANDLE_READ_PENDING) {
2271 eof_timer_start(handle);
2272 }
2273
2274 } else {
2275 /* This pipe is not readable. We can just close it to let the other end
2276 * know that we're done writing. */
2277 close_pipe(handle);
2278 }
2279 err = 0;
2280 }
2281
2282 if (req->cb)
2283 req->cb(req, err);
2284
2285 DECREASE_PENDING_REQ_COUNT(handle);
2286 }
2287
2288
eof_timer_init(uv_pipe_t * pipe)2289 static void eof_timer_init(uv_pipe_t* pipe) {
2290 int r;
2291
2292 assert(pipe->pipe.conn.eof_timer == NULL);
2293 assert(pipe->flags & UV_HANDLE_CONNECTION);
2294
2295 pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
2296
2297 r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
2298 assert(r == 0); /* timers can't fail */
2299 (void) r;
2300 pipe->pipe.conn.eof_timer->data = pipe;
2301 uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
2302 }
2303
2304
eof_timer_start(uv_pipe_t * pipe)2305 static void eof_timer_start(uv_pipe_t* pipe) {
2306 assert(pipe->flags & UV_HANDLE_CONNECTION);
2307
2308 if (pipe->pipe.conn.eof_timer != NULL) {
2309 uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
2310 }
2311 }
2312
2313
eof_timer_stop(uv_pipe_t * pipe)2314 static void eof_timer_stop(uv_pipe_t* pipe) {
2315 assert(pipe->flags & UV_HANDLE_CONNECTION);
2316
2317 if (pipe->pipe.conn.eof_timer != NULL) {
2318 uv_timer_stop(pipe->pipe.conn.eof_timer);
2319 }
2320 }
2321
2322
eof_timer_cb(uv_timer_t * timer)2323 static void eof_timer_cb(uv_timer_t* timer) {
2324 uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
2325 uv_loop_t* loop = timer->loop;
2326
2327 assert(pipe->type == UV_NAMED_PIPE);
2328
2329 /* This should always be true, since we start the timer only in
2330 * uv__pipe_queue_read after successfully calling ReadFile, or in
2331 * uv__process_pipe_shutdown_req if a read is pending, and we always
2332 * immediately stop the timer in uv__process_pipe_read_req. */
2333 assert(pipe->flags & UV_HANDLE_READ_PENDING);
2334
2335 /* If there are many packets coming off the iocp then the timer callback may
2336 * be called before the read request is coming off the queue. Therefore we
2337 * check here if the read request has completed but will be processed later.
2338 */
2339 if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
2340 HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
2341 return;
2342 }
2343
2344 /* Force both ends off the pipe. */
2345 close_pipe(pipe);
2346
2347 /* Stop reading, so the pending read that is going to fail will not be
2348 * reported to the user. */
2349 uv_read_stop((uv_stream_t*) pipe);
2350
2351 /* Report the eof and update flags. This will get reported even if the user
2352 * stopped reading in the meantime. TODO: is that okay? */
2353 uv__pipe_read_eof(loop, pipe, uv_null_buf_);
2354 }
2355
2356
eof_timer_destroy(uv_pipe_t * pipe)2357 static void eof_timer_destroy(uv_pipe_t* pipe) {
2358 assert(pipe->flags & UV_HANDLE_CONNECTION);
2359
2360 if (pipe->pipe.conn.eof_timer) {
2361 uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
2362 pipe->pipe.conn.eof_timer = NULL;
2363 }
2364 }
2365
2366
eof_timer_close_cb(uv_handle_t * handle)2367 static void eof_timer_close_cb(uv_handle_t* handle) {
2368 assert(handle->type == UV_TIMER);
2369 uv__free(handle);
2370 }
2371
2372
uv_pipe_open(uv_pipe_t * pipe,uv_file file)2373 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
2374 HANDLE os_handle = uv__get_osfhandle(file);
2375 NTSTATUS nt_status;
2376 IO_STATUS_BLOCK io_status;
2377 FILE_ACCESS_INFORMATION access;
2378 DWORD duplex_flags = 0;
2379 int err;
2380
2381 if (os_handle == INVALID_HANDLE_VALUE)
2382 return UV_EBADF;
2383 if (pipe->flags & UV_HANDLE_PIPESERVER)
2384 return UV_EINVAL;
2385 if (pipe->flags & UV_HANDLE_CONNECTION)
2386 return UV_EBUSY;
2387
2388 uv__pipe_connection_init(pipe);
2389 uv__once_init();
2390 /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
2391 * underlying OS handle and forget about the original fd.
2392 * We could also opt to use the original OS handle and just never close it,
2393 * but then there would be no reliable way to cancel pending read operations
2394 * upon close.
2395 */
2396 if (file <= 2) {
2397 if (!DuplicateHandle(INVALID_HANDLE_VALUE,
2398 os_handle,
2399 INVALID_HANDLE_VALUE,
2400 &os_handle,
2401 0,
2402 FALSE,
2403 DUPLICATE_SAME_ACCESS))
2404 return uv_translate_sys_error(GetLastError());
2405 assert(os_handle != INVALID_HANDLE_VALUE);
2406 file = -1;
2407 }
2408
2409 /* Determine what kind of permissions we have on this handle.
2410 * Cygwin opens the pipe in message mode, but we can support it,
2411 * just query the access flags and set the stream flags accordingly.
2412 */
2413 nt_status = pNtQueryInformationFile(os_handle,
2414 &io_status,
2415 &access,
2416 sizeof(access),
2417 FileAccessInformation);
2418 if (nt_status != STATUS_SUCCESS)
2419 return UV_EINVAL;
2420
2421 if (pipe->ipc) {
2422 if (!(access.AccessFlags & FILE_WRITE_DATA) ||
2423 !(access.AccessFlags & FILE_READ_DATA)) {
2424 return UV_EINVAL;
2425 }
2426 }
2427
2428 if (access.AccessFlags & FILE_WRITE_DATA)
2429 duplex_flags |= UV_HANDLE_WRITABLE;
2430 if (access.AccessFlags & FILE_READ_DATA)
2431 duplex_flags |= UV_HANDLE_READABLE;
2432
2433 err = uv__set_pipe_handle(pipe->loop,
2434 pipe,
2435 os_handle,
2436 file,
2437 duplex_flags);
2438 if (err) {
2439 if (file == -1)
2440 CloseHandle(os_handle);
2441 return err;
2442 }
2443
2444 if (pipe->ipc) {
2445 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
2446 GetNamedPipeClientProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
2447 if (pipe->pipe.conn.ipc_remote_pid == GetCurrentProcessId()) {
2448 GetNamedPipeServerProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
2449 }
2450 assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
2451 }
2452 return 0;
2453 }
2454
2455
uv__pipe_getname(const uv_pipe_t * handle,char * buffer,size_t * size)2456 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2457 NTSTATUS nt_status;
2458 IO_STATUS_BLOCK io_status;
2459 FILE_NAME_INFORMATION tmp_name_info;
2460 FILE_NAME_INFORMATION* name_info;
2461 WCHAR* name_buf;
2462 unsigned int name_size;
2463 unsigned int name_len;
2464 int err;
2465
2466 uv__once_init();
2467 name_info = NULL;
2468
2469 if (handle->name != NULL) {
2470 /* The user might try to query the name before we are connected,
2471 * and this is just easier to return the cached value if we have it. */
2472 return uv__copy_utf16_to_utf8(handle->name, -1, buffer, size);
2473 }
2474
2475 if (handle->handle == INVALID_HANDLE_VALUE) {
2476 *size = 0;
2477 return UV_EINVAL;
2478 }
2479
2480 /* NtQueryInformationFile will block if another thread is performing a
2481 * blocking operation on the queried handle. If the pipe handle is
2482 * synchronous, there may be a worker thread currently calling ReadFile() on
2483 * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2484 * the read. */
2485 if (handle->flags & UV_HANDLE_CONNECTION &&
2486 handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2487 uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2488 }
2489
2490 nt_status = pNtQueryInformationFile(handle->handle,
2491 &io_status,
2492 &tmp_name_info,
2493 sizeof tmp_name_info,
2494 FileNameInformation);
2495 if (nt_status == STATUS_BUFFER_OVERFLOW) {
2496 name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2497 name_info = uv__malloc(name_size);
2498 if (!name_info) {
2499 *size = 0;
2500 return UV_ENOMEM;
2501 }
2502
2503 nt_status = pNtQueryInformationFile(handle->handle,
2504 &io_status,
2505 name_info,
2506 name_size,
2507 FileNameInformation);
2508 }
2509
2510 if (nt_status != STATUS_SUCCESS) {
2511 *size = 0;
2512 err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2513 goto error;
2514 }
2515
2516 if (!name_info) {
2517 /* the struct on stack was used */
2518 name_buf = tmp_name_info.FileName;
2519 name_len = tmp_name_info.FileNameLength;
2520 } else {
2521 name_buf = name_info->FileName;
2522 name_len = name_info->FileNameLength;
2523 }
2524
2525 if (name_len == 0) {
2526 *size = 0;
2527 err = 0;
2528 goto error;
2529 }
2530
2531 name_len /= sizeof(WCHAR);
2532
2533 /* "\\\\.\\pipe" + name */
2534 if (*size < pipe_prefix_len) {
2535 *size = 0;
2536 }
2537 else {
2538 memcpy(buffer, pipe_prefix, pipe_prefix_len);
2539 *size -= pipe_prefix_len;
2540 }
2541 err = uv__copy_utf16_to_utf8(name_buf, name_len, buffer+pipe_prefix_len, size);
2542 *size += pipe_prefix_len;
2543
2544 error:
2545 uv__free(name_info);
2546 return err;
2547 }
2548
2549
uv_pipe_pending_count(uv_pipe_t * handle)2550 int uv_pipe_pending_count(uv_pipe_t* handle) {
2551 if (!handle->ipc)
2552 return 0;
2553 return handle->pipe.conn.ipc_xfer_queue_length;
2554 }
2555
2556
uv_pipe_getsockname(const uv_pipe_t * handle,char * buffer,size_t * size)2557 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2558 if (handle->flags & UV_HANDLE_BOUND)
2559 return uv__pipe_getname(handle, buffer, size);
2560
2561 if (handle->flags & UV_HANDLE_CONNECTION ||
2562 handle->handle != INVALID_HANDLE_VALUE) {
2563 *size = 0;
2564 return 0;
2565 }
2566
2567 return UV_EBADF;
2568 }
2569
2570
uv_pipe_getpeername(const uv_pipe_t * handle,char * buffer,size_t * size)2571 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2572 /* emulate unix behaviour */
2573 if (handle->flags & UV_HANDLE_BOUND)
2574 return UV_ENOTCONN;
2575
2576 if (handle->handle != INVALID_HANDLE_VALUE)
2577 return uv__pipe_getname(handle, buffer, size);
2578
2579 if (handle->flags & UV_HANDLE_CONNECTION) {
2580 if (handle->name != NULL)
2581 return uv__pipe_getname(handle, buffer, size);
2582 }
2583
2584 return UV_EBADF;
2585 }
2586
2587
uv_pipe_pending_type(uv_pipe_t * handle)2588 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2589 if (!handle->ipc)
2590 return UV_UNKNOWN_HANDLE;
2591 if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2592 return UV_UNKNOWN_HANDLE;
2593 else
2594 return UV_TCP;
2595 }
2596
uv_pipe_chmod(uv_pipe_t * handle,int mode)2597 int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
2598 SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
2599 PACL old_dacl, new_dacl;
2600 PSECURITY_DESCRIPTOR sd;
2601 EXPLICIT_ACCESS ea;
2602 PSID everyone;
2603 int error;
2604
2605 if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
2606 return UV_EBADF;
2607
2608 if (mode != UV_READABLE &&
2609 mode != UV_WRITABLE &&
2610 mode != (UV_WRITABLE | UV_READABLE))
2611 return UV_EINVAL;
2612
2613 if (!AllocateAndInitializeSid(&sid_world,
2614 1,
2615 SECURITY_WORLD_RID,
2616 0, 0, 0, 0, 0, 0, 0,
2617 &everyone)) {
2618 error = GetLastError();
2619 goto done;
2620 }
2621
2622 if (GetSecurityInfo(handle->handle,
2623 SE_KERNEL_OBJECT,
2624 DACL_SECURITY_INFORMATION,
2625 NULL,
2626 NULL,
2627 &old_dacl,
2628 NULL,
2629 &sd)) {
2630 error = GetLastError();
2631 goto clean_sid;
2632 }
2633
2634 memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
2635 if (mode & UV_READABLE)
2636 ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
2637 if (mode & UV_WRITABLE)
2638 ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
2639 ea.grfAccessPermissions |= SYNCHRONIZE;
2640 ea.grfAccessMode = SET_ACCESS;
2641 ea.grfInheritance = NO_INHERITANCE;
2642 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
2643 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
2644 ea.Trustee.ptstrName = (LPTSTR)everyone;
2645
2646 if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
2647 error = GetLastError();
2648 goto clean_sd;
2649 }
2650
2651 if (SetSecurityInfo(handle->handle,
2652 SE_KERNEL_OBJECT,
2653 DACL_SECURITY_INFORMATION,
2654 NULL,
2655 NULL,
2656 new_dacl,
2657 NULL)) {
2658 error = GetLastError();
2659 goto clean_dacl;
2660 }
2661
2662 error = 0;
2663
2664 clean_dacl:
2665 LocalFree((HLOCAL) new_dacl);
2666 clean_sd:
2667 LocalFree((HLOCAL) sd);
2668 clean_sid:
2669 FreeSid(everyone);
2670 done:
2671 return uv_translate_sys_error(error);
2672 }
2673