1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv.h"
23 #include "task.h"
24 #include <stdio.h>
25 #include <stdlib.h>
26
27 typedef struct {
28 uv_write_t req;
29 uv_buf_t buf;
30 } write_req_t;
31
32 static uv_loop_t* loop;
33
34 static int server_closed;
35 static stream_type serverType;
36 static uv_tcp_t tcpServer;
37 static uv_udp_t udpServer;
38 static uv_pipe_t pipeServer;
39 static uv_handle_t* server;
40 static uv_udp_send_t* send_freelist;
41
42 static void after_write(uv_write_t* req, int status);
43 static void after_read(uv_stream_t*, ssize_t nread, const uv_buf_t* buf);
44 static void on_close(uv_handle_t* peer);
45 static void on_server_close(uv_handle_t* handle);
46 static void on_connection(uv_stream_t*, int status);
47
48
/* Write-completion callback: releases the echoed buffer together with its
 * request, then reports a failed write on stderr.
 *
 * req     the uv_write_t embedded at the start of a write_req_t
 * status  0 on success, otherwise a libuv error code
 */
static void after_write(uv_write_t* req, int status) {
  write_req_t* done = (write_req_t*) req;

  /* The buffer and the request are both heap allocated; release both
   * regardless of how the write ended. */
  free(done->buf.base);
  free(done);

  if (status != 0) {
    fprintf(stderr,
            "uv_write error: %s - %s\n",
            uv_err_name(status),
            uv_strerror(status));
  }
}
65
66
/* Shutdown-completion callback used after EOF: closes the stream the
 * shutdown was issued on and frees the request.
 *
 * req     heap-allocated shutdown request; freed here
 * status  must be 0, anything else fails the assertion
 */
static void after_shutdown(uv_shutdown_t* req, int status) {
  uv_handle_t* peer;

  ASSERT_OK(status);

  /* Read the handle out of the request before the request is released. */
  peer = (uv_handle_t*) req->handle;
  uv_close(peer, on_close);
  free(req);
}
72
73
/* Shutdown-completion callback that only releases the request; unlike
 * after_shutdown() it does not close the stream.
 *
 * req     heap-allocated shutdown request; freed here
 * status  must be 0, anything else fails the assertion
 */
static void on_shutdown(uv_shutdown_t* req, int status) {
  ASSERT_OK(status);

  free(req);
}
78
79
/*
 * Read callback for accepted streams: echoes the received bytes back to the
 * peer and reacts to the in-band 'Q' control commands described below.
 *
 * handle  stream the data arrived on
 * nread   number of bytes read, 0 when nothing was read, negative on EOF
 *         (any negative value other than UV_EOF fails an assertion)
 * buf     buffer supplied by the allocation callback; it is either freed
 *         here or handed to uv_write() and freed later in after_write()
 */
static void after_read(uv_stream_t* handle,
                       ssize_t nread,
                       const uv_buf_t* buf) {
  ssize_t i;
  write_req_t* wr;
  uv_shutdown_t* sreq;
  int shutdown = 0;

  if (nread < 0) {
    /* Error or EOF */
    ASSERT_EQ(nread, UV_EOF);

    free(buf->base);

    /* Allocate the shutdown request only when it is going to be submitted.
     * Allocating it before the writability check leaked the request for
     * streams that are not writable. A non-writable stream is left
     * untouched, exactly as before. */
    if (uv_is_writable(handle)) {
      sreq = malloc(sizeof *sreq);
      ASSERT_NOT_NULL(sreq);
      ASSERT_OK(uv_shutdown(sreq, handle, after_shutdown));
    }
    return;
  }

  if (nread == 0) {
    /* Everything OK, but nothing read. */
    free(buf->base);
    return;
  }

  /*
   * Scan for the letter Q which signals that we should quit the server.
   * If we get QS it means close the stream.
   * If we get QSS it means shutdown the stream.
   * If we get QSH it means disable linger before closing the socket.
   */
  for (i = 0; i < nread; i++) {
    if (buf->base[i] != 'Q')
      continue;

    if (i + 1 < nread && buf->base[i + 1] == 'S') {
      int reset = 0;

      if (i + 2 < nread && buf->base[i + 2] == 'S')
        shutdown = 1;
      if (i + 2 < nread && buf->base[i + 2] == 'H')
        reset = 1;

      if (reset && handle->type == UV_TCP)
        ASSERT_OK(uv_tcp_close_reset((uv_tcp_t*) handle, on_close));
      else if (shutdown)
        break;  /* QSS: still echo the data, then shut down below. */
      else
        uv_close((uv_handle_t*) handle, on_close);

      /* The stream is being closed, so nothing is echoed. */
      free(buf->base);
      return;
    } else if (!server_closed) {
      /* A bare Q stops the listening server; scanning continues. */
      uv_close(server, on_server_close);
      server_closed = 1;
    }
  }

  /* Echo the data. Ownership of buf->base moves to the write request and is
   * released in after_write(). */
  wr = malloc(sizeof *wr);
  ASSERT_NOT_NULL(wr);
  wr->buf = uv_buf_init(buf->base, nread);

  if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
    FATAL("uv_write failed");
  }

  if (shutdown) {
    /* Check the allocation instead of passing an unchecked malloc() result
     * straight into uv_shutdown(). */
    sreq = malloc(sizeof *sreq);
    ASSERT_NOT_NULL(sreq);
    ASSERT_OK(uv_shutdown(sreq, handle, on_shutdown));
  }
}
146
147
/* Close callback for per-connection handles: releases the handle's memory.
 * The handles passed here are the ones malloc'ed in on_connection(). */
static void on_close(uv_handle_t* peer) {
  free(peer);
}
151
152
/* Allocation callback for stream reads: provides a freshly malloc'ed buffer
 * of the suggested size.
 *
 * handle          unused
 * suggested_size  number of bytes requested by libuv
 * buf             receives the buffer; base is NULL and len is 0 when the
 *                 allocation fails
 */
static void echo_alloc(uv_handle_t* handle,
                       size_t suggested_size,
                       uv_buf_t* buf) {
  (void) handle;

  buf->base = malloc(suggested_size);
  /* Never advertise capacity that is not backed by memory. libuv documents
   * that a NULL base or a zero length makes it report UV_ENOBUFS to the
   * read callback. */
  buf->len = buf->base != NULL ? suggested_size : 0;
}
159
/* Allocation callback for UDP receives: always hands out the same static
 * slab, ignoring both the handle and the suggested size. */
static void slab_alloc(uv_handle_t* handle,
                       size_t suggested_size,
                       uv_buf_t* buf) {
  /* up to 16 datagrams at once */
  static char slab[16 * 64 * 1024];

  (void) handle;
  (void) suggested_size;

  buf->len = sizeof(slab);
  buf->base = slab;
}
168
/* Connection callback for the TCP and pipe servers: allocates a handle of
 * the matching type, accepts the connection on it and starts echoing.
 *
 * server  listening stream that has a pending connection
 * status  must be 0; a failure is printed and then fails the assertion
 */
static void on_connection(uv_stream_t* server, int status) {
  uv_stream_t* client;

  if (status != 0) {
    fprintf(stderr, "Connect error %s\n", uv_err_name(status));
  }
  ASSERT_OK(status);

  /* The client handle is heap allocated and released again in on_close(). */
  if (serverType == TCP) {
    client = malloc(sizeof(uv_tcp_t));
    ASSERT_NOT_NULL(client);
    ASSERT_OK(uv_tcp_init(loop, (uv_tcp_t*) client));
  } else if (serverType == PIPE) {
    client = malloc(sizeof(uv_pipe_t));
    ASSERT_NOT_NULL(client);
    ASSERT_OK(uv_pipe_init(loop, (uv_pipe_t*) client, 0));
  } else {
    ASSERT(0 && "Bad serverType");
    abort();
  }

  /* associate server with stream */
  client->data = server;

  ASSERT_OK(uv_accept(server, client));
  ASSERT_OK(uv_read_start(client, echo_alloc, after_read));
}
207
208
/* Close callback for the listening handle; only verifies that the handle
 * being closed is the one recorded in the file-scope server pointer. */
static void on_server_close(uv_handle_t* handle) {
  ASSERT_PTR_EQ(handle, server);
}
212
send_alloc(void)213 static uv_udp_send_t* send_alloc(void) {
214 uv_udp_send_t* req = send_freelist;
215 if (req != NULL)
216 send_freelist = req->data;
217 else
218 req = malloc(sizeof(*req));
219 return req;
220 }
221
/* UDP send-completion callback: asserts that the send succeeded and returns
 * the request to the free list for reuse by send_alloc(). */
static void on_send(uv_udp_send_t* req, int status) {
  ASSERT_NOT_NULL(req);
  ASSERT_OK(status);

  /* Push onto the free list, using the data field as the link. */
  req->data = send_freelist;
  send_freelist = req;
}
228
/* UDP receive callback: sends every received datagram back to its sender.
 *
 * handle  UDP handle the datagram arrived on; also used for the reply
 * nread   datagram size; 0 is ignored, a negative value fails an assertion
 * rcvbuf  buffer holding the datagram
 * addr    sender address; asserted to be AF_INET
 * flags   unused
 */
static void on_recv(uv_udp_t* handle,
                    ssize_t nread,
                    const uv_buf_t* rcvbuf,
                    const struct sockaddr* addr,
                    unsigned flags) {
  uv_udp_send_t* reply;
  uv_buf_t echo;

  (void) flags;

  if (nread != 0) {
    ASSERT_GT(nread, 0);
    ASSERT_EQ(addr->sa_family, AF_INET);

    reply = send_alloc();
    ASSERT_NOT_NULL(reply);

    /* Echo straight out of the receive buffer. */
    echo = uv_buf_init(rcvbuf->base, nread);
    ASSERT_LE(0, uv_udp_send(reply, handle, &echo, 1, addr, on_send));
  }
  /* nread == 0: everything OK, but nothing read. */
}
250
tcp4_echo_start(int port)251 static int tcp4_echo_start(int port) {
252 struct sockaddr_in addr;
253 int r;
254
255 ASSERT_OK(uv_ip4_addr("127.0.0.1", port, &addr));
256
257 server = (uv_handle_t*)&tcpServer;
258 serverType = TCP;
259
260 r = uv_tcp_init(loop, &tcpServer);
261 if (r) {
262 /* TODO: Error codes */
263 fprintf(stderr, "Socket creation error\n");
264 return 1;
265 }
266
267 r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr, 0);
268 if (r) {
269 /* TODO: Error codes */
270 fprintf(stderr, "Bind error\n");
271 return 1;
272 }
273
274 r = uv_listen((uv_stream_t*)&tcpServer, SOMAXCONN, on_connection);
275 if (r) {
276 /* TODO: Error codes */
277 fprintf(stderr, "Listen error %s\n", uv_err_name(r));
278 return 1;
279 }
280
281 return 0;
282 }
283
284
tcp6_echo_start(int port)285 static int tcp6_echo_start(int port) {
286 struct sockaddr_in6 addr6;
287 int r;
288
289 ASSERT_OK(uv_ip6_addr("::1", port, &addr6));
290
291 server = (uv_handle_t*)&tcpServer;
292 serverType = TCP;
293
294 r = uv_tcp_init(loop, &tcpServer);
295 if (r) {
296 /* TODO: Error codes */
297 fprintf(stderr, "Socket creation error\n");
298 return 1;
299 }
300
301 /* IPv6 is optional as not all platforms support it */
302 r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr6, 0);
303 if (r) {
304 /* show message but return OK */
305 fprintf(stderr, "IPv6 not supported\n");
306 return 0;
307 }
308
309 r = uv_listen((uv_stream_t*)&tcpServer, SOMAXCONN, on_connection);
310 if (r) {
311 /* TODO: Error codes */
312 fprintf(stderr, "Listen error\n");
313 return 1;
314 }
315
316 return 0;
317 }
318
319
udp4_echo_start(int port)320 static int udp4_echo_start(int port) {
321 struct sockaddr_in addr;
322 int r;
323
324 ASSERT_OK(uv_ip4_addr("127.0.0.1", port, &addr));
325 server = (uv_handle_t*)&udpServer;
326 serverType = UDP;
327
328 r = uv_udp_init(loop, &udpServer);
329 if (r) {
330 fprintf(stderr, "uv_udp_init: %s\n", uv_strerror(r));
331 return 1;
332 }
333
334 r = uv_udp_bind(&udpServer, (const struct sockaddr*) &addr, 0);
335 if (r) {
336 fprintf(stderr, "uv_udp_bind: %s\n", uv_strerror(r));
337 return 1;
338 }
339
340 r = uv_udp_recv_start(&udpServer, slab_alloc, on_recv);
341 if (r) {
342 fprintf(stderr, "uv_udp_recv_start: %s\n", uv_strerror(r));
343 return 1;
344 }
345
346 return 0;
347 }
348
349
pipe_echo_start(char * pipeName)350 static int pipe_echo_start(char* pipeName) {
351 int r;
352
353 #ifndef _WIN32
354 {
355 uv_fs_t req;
356 uv_fs_unlink(NULL, &req, pipeName, NULL);
357 uv_fs_req_cleanup(&req);
358 }
359 #endif
360
361 server = (uv_handle_t*)&pipeServer;
362 serverType = PIPE;
363
364 r = uv_pipe_init(loop, &pipeServer, 0);
365 if (r) {
366 fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(r));
367 return 1;
368 }
369
370 r = uv_pipe_bind(&pipeServer, pipeName);
371 if (r) {
372 fprintf(stderr, "uv_pipe_bind: %s\n", uv_strerror(r));
373 return 1;
374 }
375
376 r = uv_listen((uv_stream_t*)&pipeServer, SOMAXCONN, on_connection);
377 if (r) {
378 fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(r));
379 return 1;
380 }
381
382 return 0;
383 }
384
385
HELPER_IMPL(tcp4_echo_server)386 HELPER_IMPL(tcp4_echo_server) {
387 loop = uv_default_loop();
388
389 if (tcp4_echo_start(TEST_PORT))
390 return 1;
391
392 notify_parent_process();
393 uv_run(loop, UV_RUN_DEFAULT);
394 return 0;
395 }
396
397
HELPER_IMPL(tcp6_echo_server)398 HELPER_IMPL(tcp6_echo_server) {
399 loop = uv_default_loop();
400
401 if (tcp6_echo_start(TEST_PORT))
402 return 1;
403
404 notify_parent_process();
405 uv_run(loop, UV_RUN_DEFAULT);
406 return 0;
407 }
408
409
HELPER_IMPL(pipe_echo_server)410 HELPER_IMPL(pipe_echo_server) {
411 loop = uv_default_loop();
412
413 if (pipe_echo_start(TEST_PIPENAME))
414 return 1;
415
416 notify_parent_process();
417 uv_run(loop, UV_RUN_DEFAULT);
418 return 0;
419 }
420
421
HELPER_IMPL(udp4_echo_server)422 HELPER_IMPL(udp4_echo_server) {
423 loop = uv_default_loop();
424
425 if (udp4_echo_start(TEST_PORT))
426 return 1;
427
428 notify_parent_process();
429 uv_run(loop, UV_RUN_DEFAULT);
430 return 0;
431 }
432