Lines Matching refs:stream
47 uv_stream_t* stream; member
74 static void uv__write(uv_stream_t* stream);
75 static void uv__read(uv_stream_t* stream);
77 static void uv__write_callbacks(uv_stream_t* stream);
79 static void uv__drain(uv_stream_t* stream);
83 uv_stream_t* stream, in uv__stream_init() argument
87 uv__handle_init(loop, (uv_handle_t*)stream, type); in uv__stream_init()
88 stream->read_cb = NULL; in uv__stream_init()
89 stream->alloc_cb = NULL; in uv__stream_init()
90 stream->close_cb = NULL; in uv__stream_init()
91 stream->connection_cb = NULL; in uv__stream_init()
92 stream->connect_req = NULL; in uv__stream_init()
93 stream->shutdown_req = NULL; in uv__stream_init()
94 stream->accepted_fd = -1; in uv__stream_init()
95 stream->queued_fds = NULL; in uv__stream_init()
96 stream->delayed_error = 0; in uv__stream_init()
97 uv__queue_init(&stream->write_queue); in uv__stream_init()
98 uv__queue_init(&stream->write_completed_queue); in uv__stream_init()
99 stream->write_queue_size = 0; in uv__stream_init()
113 stream->select = NULL; in uv__stream_init()
116 uv__io_init(&stream->io_watcher, uv__stream_io, -1); in uv__stream_init()
120 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) { in uv__stream_osx_interrupt_select() argument
126 s = stream->select; in uv__stream_osx_interrupt_select()
147 uv_stream_t* stream; in uv__stream_osx_select() local
155 stream = arg; in uv__stream_osx_select()
156 s = stream->select; in uv__stream_osx_select()
173 if (uv__io_active(&stream->io_watcher, POLLIN)) in uv__stream_osx_select()
175 if (uv__io_active(&stream->io_watcher, POLLOUT)) in uv__stream_osx_select()
228 assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING)); in uv__stream_osx_select()
236 uv_stream_t* stream; in uv__stream_osx_select_cb() local
240 stream = s->stream; in uv__stream_osx_select_cb()
250 if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN)) in uv__stream_osx_select_cb()
251 uv__stream_io(stream->loop, &stream->io_watcher, POLLIN); in uv__stream_osx_select_cb()
253 if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT)) in uv__stream_osx_select_cb()
254 uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT); in uv__stream_osx_select_cb()
256 if (stream->flags & UV_HANDLE_CLOSING) in uv__stream_osx_select_cb()
274 int uv__stream_try_select(uv_stream_t* stream, int* fd) { in uv__stream_try_select() argument
346 err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb); in uv__stream_try_select()
365 s->stream = stream; in uv__stream_try_select()
366 stream->select = s; in uv__stream_try_select()
369 err = uv_thread_create(&s->thread, uv__stream_osx_select, stream); in uv__stream_try_select()
376 s->stream = NULL; in uv__stream_try_select()
377 stream->select = NULL; in uv__stream_try_select()
403 int uv__stream_open(uv_stream_t* stream, int fd, int flags) { in uv__stream_open() argument
408 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) in uv__stream_open()
412 stream->flags |= flags; in uv__stream_open()
414 if (stream->type == UV_TCP) { in uv__stream_open()
415 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) in uv__stream_open()
419 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && in uv__stream_open()
434 stream->io_watcher.fd = fd; in uv__stream_open()
440 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { in uv__stream_flush_write_queue() argument
443 while (!uv__queue_empty(&stream->write_queue)) { in uv__stream_flush_write_queue()
444 q = uv__queue_head(&stream->write_queue); in uv__stream_flush_write_queue()
450 uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); in uv__stream_flush_write_queue()
455 void uv__stream_destroy(uv_stream_t* stream) { in uv__stream_destroy() argument
456 assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT)); in uv__stream_destroy()
457 assert(stream->flags & UV_HANDLE_CLOSED); in uv__stream_destroy()
459 if (stream->connect_req) { in uv__stream_destroy()
460 uv__req_unregister(stream->loop); in uv__stream_destroy()
461 stream->connect_req->cb(stream->connect_req, UV_ECANCELED); in uv__stream_destroy()
462 stream->connect_req = NULL; in uv__stream_destroy()
465 uv__stream_flush_write_queue(stream, UV_ECANCELED); in uv__stream_destroy()
466 uv__write_callbacks(stream); in uv__stream_destroy()
467 uv__drain(stream); in uv__stream_destroy()
469 assert(stream->write_queue_size == 0); in uv__stream_destroy()
509 uv_stream_t* stream; in uv__server_io() local
513 stream = container_of(w, uv_stream_t, io_watcher); in uv__server_io()
515 assert(stream->accepted_fd == -1); in uv__server_io()
516 assert(!(stream->flags & UV_HANDLE_CLOSING)); in uv__server_io()
518 fd = uv__stream_fd(stream); in uv__server_io()
527 stream->accepted_fd = err; in uv__server_io()
528 stream->connection_cb(stream, 0); in uv__server_io()
530 if (stream->accepted_fd != -1) in uv__server_io()
532 uv__io_stop(loop, &stream->io_watcher, POLLIN); in uv__server_io()
601 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { in uv_listen() argument
603 if (uv__is_closing(stream)) { in uv_listen()
606 switch (stream->type) { in uv_listen()
608 err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb); in uv_listen()
612 err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb); in uv_listen()
620 uv__handle_start(stream); in uv_listen()
626 static void uv__drain(uv_stream_t* stream) { in uv__drain() argument
630 assert(uv__queue_empty(&stream->write_queue)); in uv__drain()
631 if (!(stream->flags & UV_HANDLE_CLOSING)) { in uv__drain()
632 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); in uv__drain()
633 uv__stream_osx_interrupt_select(stream); in uv__drain()
636 if (!uv__is_stream_shutting(stream)) in uv__drain()
639 req = stream->shutdown_req; in uv__drain()
642 if ((stream->flags & UV_HANDLE_CLOSING) || in uv__drain()
643 !(stream->flags & UV_HANDLE_SHUT)) { in uv__drain()
644 stream->shutdown_req = NULL; in uv__drain()
645 uv__req_unregister(stream->loop); in uv__drain()
648 if (stream->flags & UV_HANDLE_CLOSING) in uv__drain()
651 else if (shutdown(uv__stream_fd(stream), SHUT_WR)) in uv__drain()
654 stream->flags |= UV_HANDLE_SHUT; in uv__drain()
688 static int uv__write_req_update(uv_stream_t* stream, in uv__write_req_update() argument
694 assert(n <= stream->write_queue_size); in uv__write_req_update()
695 stream->write_queue_size -= n; in uv__write_req_update()
715 uv_stream_t* stream = req->handle; in uv__write_req_finish() local
735 uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); in uv__write_req_finish()
736 uv__io_feed(stream->loop, &stream->io_watcher); in uv__write_req_finish()
754 static int uv__try_write(uv_stream_t* stream, in uv__try_write() argument
809 n = sendmsg(uv__stream_fd(stream), &msg, 0); in uv__try_write()
813 n = uv__writev(uv__stream_fd(stream), iov, iovcnt); in uv__try_write()
840 static void uv__write(uv_stream_t* stream) { in uv__write() argument
846 assert(uv__stream_fd(stream) >= 0); in uv__write()
855 if (uv__queue_empty(&stream->write_queue)) in uv__write()
858 q = uv__queue_head(&stream->write_queue); in uv__write()
860 assert(req->handle == stream); in uv__write()
862 n = uv__try_write(stream, in uv__write()
870 if (uv__write_req_update(stream, req, n)) { in uv__write()
881 if (stream->flags & UV_HANDLE_BLOCKING_WRITES) in uv__write()
885 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); in uv__write()
888 uv__stream_osx_interrupt_select(stream); in uv__write()
896 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); in uv__write()
897 uv__stream_osx_interrupt_select(stream); in uv__write()
901 static void uv__write_callbacks(uv_stream_t* stream) { in uv__write_callbacks() argument
906 if (uv__queue_empty(&stream->write_completed_queue)) in uv__write_callbacks()
909 uv__queue_move(&stream->write_completed_queue, &pq); in uv__write_callbacks()
916 uv__req_unregister(stream->loop); in uv__write_callbacks()
919 stream->write_queue_size -= uv__write_req_size(req); in uv__write_callbacks()
932 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { in uv__stream_eof() argument
933 stream->flags |= UV_HANDLE_READ_EOF; in uv__stream_eof()
934 stream->flags &= ~UV_HANDLE_READING; in uv__stream_eof()
935 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); in uv__stream_eof()
936 uv__handle_stop(stream); in uv__stream_eof()
937 uv__stream_osx_interrupt_select(stream); in uv__stream_eof()
938 stream->read_cb(stream, UV_EOF, buf); in uv__stream_eof()
942 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { in uv__stream_queue_fd() argument
946 queued_fds = stream->queued_fds; in uv__stream_queue_fd()
955 stream->queued_fds = queued_fds; in uv__stream_queue_fd()
971 stream->queued_fds = queued_fds; in uv__stream_queue_fd()
981 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { in uv__stream_recv_cmsg() argument
1010 if (stream->accepted_fd == -1) in uv__stream_recv_cmsg()
1011 stream->accepted_fd = fd; in uv__stream_recv_cmsg()
1013 err = uv__stream_queue_fd(stream, fd); in uv__stream_recv_cmsg()
1025 static void uv__read(uv_stream_t* stream) { in uv__read() argument
1034 stream->flags &= ~UV_HANDLE_READ_PARTIAL; in uv__read()
1041 is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; in uv__read()
1046 while (stream->read_cb in uv__read()
1047 && (stream->flags & UV_HANDLE_READING) in uv__read()
1049 assert(stream->alloc_cb != NULL); in uv__read()
1052 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); in uv__read()
1055 stream->read_cb(stream, UV_ENOBUFS, &buf); in uv__read()
1060 assert(uv__stream_fd(stream) >= 0); in uv__read()
1064 nread = read(uv__stream_fd(stream), buf.base, buf.len); in uv__read()
1079 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); in uv__read()
1088 if (stream->flags & UV_HANDLE_READING) { in uv__read()
1089 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); in uv__read()
1090 uv__stream_osx_interrupt_select(stream); in uv__read()
1092 stream->read_cb(stream, 0, &buf); in uv__read()
1094 } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) { in uv__read()
1095 uv__stream_eof(stream, &buf); in uv__read()
1100 stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); in uv__read()
1101 stream->read_cb(stream, UV__ERR(errno), &buf); in uv__read()
1102 if (stream->flags & UV_HANDLE_READING) { in uv__read()
1103 stream->flags &= ~UV_HANDLE_READING; in uv__read()
1104 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); in uv__read()
1105 uv__handle_stop(stream); in uv__read()
1106 uv__stream_osx_interrupt_select(stream); in uv__read()
1111 uv__stream_eof(stream, &buf); in uv__read()
1118 err = uv__stream_recv_cmsg(stream, &msg); in uv__read()
1120 stream->read_cb(stream, err, &buf); in uv__read()
1137 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); in uv__read()
1138 err = uv__stream_recv_cmsg(stream, &msg); in uv__read()
1140 stream->read_cb(stream, err, &buf); in uv__read()
1148 stream->read_cb(stream, nread, &buf); in uv__read()
1152 stream->flags |= UV_HANDLE_READ_PARTIAL; in uv__read()
1160 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { in uv_shutdown() argument
1161 assert(stream->type == UV_TCP || in uv_shutdown()
1162 stream->type == UV_TTY || in uv_shutdown()
1163 stream->type == UV_NAMED_PIPE); in uv_shutdown()
1165 if (!(stream->flags & UV_HANDLE_WRITABLE) || in uv_shutdown()
1166 stream->flags & UV_HANDLE_SHUT || in uv_shutdown()
1167 uv__is_stream_shutting(stream) || in uv_shutdown()
1168 uv__is_closing(stream)) { in uv_shutdown()
1172 assert(uv__stream_fd(stream) >= 0); in uv_shutdown()
1176 uv__req_init(stream->loop, req, UV_SHUTDOWN); in uv_shutdown()
1177 req->handle = stream; in uv_shutdown()
1179 stream->shutdown_req = req; in uv_shutdown()
1180 stream->flags &= ~UV_HANDLE_WRITABLE; in uv_shutdown()
1182 if (uv__queue_empty(&stream->write_queue)) in uv_shutdown()
1183 uv__io_feed(stream->loop, &stream->io_watcher); in uv_shutdown()
1190 uv_stream_t* stream; in uv__stream_io() local
1192 stream = container_of(w, uv_stream_t, io_watcher); in uv__stream_io()
1194 assert(stream->type == UV_TCP || in uv__stream_io()
1195 stream->type == UV_NAMED_PIPE || in uv__stream_io()
1196 stream->type == UV_TTY); in uv__stream_io()
1197 assert(!(stream->flags & UV_HANDLE_CLOSING)); in uv__stream_io()
1199 if (stream->connect_req) { in uv__stream_io()
1200 uv__stream_connect(stream); in uv__stream_io()
1204 assert(uv__stream_fd(stream) >= 0); in uv__stream_io()
1208 uv__read(stream); in uv__stream_io()
1210 if (uv__stream_fd(stream) == -1) in uv__stream_io()
1220 (stream->flags & UV_HANDLE_READING) && in uv__stream_io()
1221 (stream->flags & UV_HANDLE_READ_PARTIAL) && in uv__stream_io()
1222 !(stream->flags & UV_HANDLE_READ_EOF)) { in uv__stream_io()
1224 uv__stream_eof(stream, &buf); in uv__stream_io()
1227 if (uv__stream_fd(stream) == -1) in uv__stream_io()
1231 uv__write(stream); in uv__stream_io()
1232 uv__write_callbacks(stream); in uv__stream_io()
1235 if (uv__queue_empty(&stream->write_queue)) in uv__stream_io()
1236 uv__drain(stream); in uv__stream_io()
1246 static void uv__stream_connect(uv_stream_t* stream) { in uv__stream_connect() argument
1248 uv_connect_t* req = stream->connect_req; in uv__stream_connect()
1251 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); in uv__stream_connect()
1254 if (stream->delayed_error) { in uv__stream_connect()
1259 error = stream->delayed_error; in uv__stream_connect()
1260 stream->delayed_error = 0; in uv__stream_connect()
1263 assert(uv__stream_fd(stream) >= 0); in uv__stream_connect()
1264 getsockopt(uv__stream_fd(stream), in uv__stream_connect()
1275 stream->connect_req = NULL; in uv__stream_connect()
1276 uv__req_unregister(stream->loop); in uv__stream_connect()
1278 if (error < 0 || uv__queue_empty(&stream->write_queue)) { in uv__stream_connect()
1279 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); in uv__stream_connect()
1285 if (uv__stream_fd(stream) == -1) in uv__stream_connect()
1289 uv__stream_flush_write_queue(stream, UV_ECANCELED); in uv__stream_connect()
1290 uv__write_callbacks(stream); in uv__stream_connect()
1295 static int uv__check_before_write(uv_stream_t* stream, in uv__check_before_write() argument
1299 assert((stream->type == UV_TCP || in uv__check_before_write()
1300 stream->type == UV_NAMED_PIPE || in uv__check_before_write()
1301 stream->type == UV_TTY) && in uv__check_before_write()
1304 if (uv__stream_fd(stream) < 0) in uv__check_before_write()
1307 if (!(stream->flags & UV_HANDLE_WRITABLE)) in uv__check_before_write()
1311 if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) in uv__check_before_write()
1334 uv_stream_t* stream, in uv_write2() argument
1342 err = uv__check_before_write(stream, nbufs, send_handle); in uv_write2()
1352 empty_queue = (stream->write_queue_size == 0); in uv_write2()
1355 uv__req_init(stream->loop, req, UV_WRITE); in uv_write2()
1357 req->handle = stream; in uv_write2()
1372 stream->write_queue_size += uv__count_bufs(bufs, nbufs); in uv_write2()
1375 uv__queue_insert_tail(&stream->write_queue, &req->queue); in uv_write2()
1381 if (stream->connect_req) { in uv_write2()
1385 uv__write(stream); in uv_write2()
1393 assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); in uv_write2()
1394 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); in uv_write2()
1395 uv__stream_osx_interrupt_select(stream); in uv_write2()
1414 int uv_try_write(uv_stream_t* stream, in uv_try_write() argument
1417 return uv_try_write2(stream, bufs, nbufs, NULL); in uv_try_write()
1421 int uv_try_write2(uv_stream_t* stream, in uv_try_write2() argument
1428 if (stream->connect_req != NULL || stream->write_queue_size != 0) in uv_try_write2()
1431 err = uv__check_before_write(stream, nbufs, NULL); in uv_try_write2()
1435 return uv__try_write(stream, bufs, nbufs, send_handle); in uv_try_write2()
1439 int uv__read_start(uv_stream_t* stream, in uv__read_start() argument
1442 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || in uv__read_start()
1443 stream->type == UV_TTY); in uv__read_start()
1447 stream->flags |= UV_HANDLE_READING; in uv__read_start()
1448 stream->flags &= ~UV_HANDLE_READ_EOF; in uv__read_start()
1451 assert(uv__stream_fd(stream) >= 0); in uv__read_start()
1454 stream->read_cb = read_cb; in uv__read_start()
1455 stream->alloc_cb = alloc_cb; in uv__read_start()
1457 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); in uv__read_start()
1458 uv__handle_start(stream); in uv__read_start()
1459 uv__stream_osx_interrupt_select(stream); in uv__read_start()
1465 int uv_read_stop(uv_stream_t* stream) { in uv_read_stop() argument
1466 if (!(stream->flags & UV_HANDLE_READING)) in uv_read_stop()
1469 stream->flags &= ~UV_HANDLE_READING; in uv_read_stop()
1470 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); in uv_read_stop()
1471 uv__handle_stop(stream); in uv_read_stop()
1472 uv__stream_osx_interrupt_select(stream); in uv_read_stop()
1474 stream->read_cb = NULL; in uv_read_stop()
1475 stream->alloc_cb = NULL; in uv_read_stop()
1480 int uv_is_readable(const uv_stream_t* stream) { in uv_is_readable() argument
1481 return !!(stream->flags & UV_HANDLE_READABLE); in uv_is_readable()
1485 int uv_is_writable(const uv_stream_t* stream) { in uv_is_writable() argument
1486 return !!(stream->flags & UV_HANDLE_WRITABLE); in uv_is_writable()