xref: /libuv/test/test-threadpool-cancel.c (revision 8a499e13)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
/* On Windows, route putenv() calls in this file to _putenv(). */
#if defined(_WIN32)
# define putenv _putenv
#endif
28 
/* Point the cancel_info `ci` at the request array `what`: stores the array's
 * address, its element count and the size of a single element. `what` is
 * expected to be an array (not a pointer) because its length is taken with
 * ARRAY_SIZE. Both arguments are evaluated more than once.
 */
#define INIT_CANCEL_INFO(ci, what)                                            \
  do {                                                                        \
    (ci)->stride = sizeof((what)[0]);                                         \
    (ci)->nreqs = ARRAY_SIZE(what);                                           \
    (ci)->reqs = (what);                                                      \
  } while (0)
36 
37 struct cancel_info {
38   void* reqs;
39   unsigned nreqs;
40   unsigned stride;
41   uv_timer_t timer_handle;
42 };
43 
44 struct random_info {
45   uv_random_t random_req;
46   char buf[1];
47 };
48 
49 static unsigned fs_cb_called;
50 static unsigned done_cb_called;
51 static unsigned done2_cb_called;
52 static unsigned timer_cb_called;
53 static uv_work_t pause_reqs[4];
54 static uv_sem_t pause_sems[ARRAY_SIZE(pause_reqs)];
55 
56 
/* Work callback for the pause requests: waits on the semaphore that shares
 * its index with `req` in pause_reqs.
 */
static void work_cb(uv_work_t* req) {
  uv_sem_t* sem;

  sem = &pause_sems[req - pause_reqs];
  uv_sem_wait(sem);
}
60 
61 
/* Completion callback for the pause requests: destroys the semaphore that
 * shares its index with `req` in pause_reqs. `status` is not inspected.
 */
static void done_cb(uv_work_t* req, int status) {
  uv_sem_t* sem;

  sem = &pause_sems[req - pause_reqs];
  uv_sem_destroy(sem);
}
65 
66 
saturate_threadpool(void)67 static void saturate_threadpool(void) {
68   uv_loop_t* loop;
69   char buf[64];
70   size_t i;
71 
72   snprintf(buf,
73            sizeof(buf),
74            "UV_THREADPOOL_SIZE=%lu",
75            (unsigned long)ARRAY_SIZE(pause_reqs));
76   putenv(buf);
77 
78   loop = uv_default_loop();
79   for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1) {
80     ASSERT_OK(uv_sem_init(pause_sems + i, 0));
81     ASSERT_OK(uv_queue_work(loop, pause_reqs + i, work_cb, done_cb));
82   }
83 }
84 
85 
unblock_threadpool(void)86 static void unblock_threadpool(void) {
87   size_t i;
88 
89   for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1)
90     uv_sem_post(pause_sems + i);
91 }
92 
93 
/* Report whether `req` is a request whose cancellation the tests tolerate
 * failing. Returns 1 only on Linux, and only for file system requests of the
 * types listed below; returns 0 for everything else.
 */
static int known_broken(uv_req_t* req) {
  int broken;

  broken = 0;
  if (req->type != UV_FS)
    return broken;

#ifdef __linux__
  /* TODO(bnoordhuis) make cancellation work with io_uring */
  switch (((uv_fs_t*) req)->fs_type) {
    case UV_FS_CLOSE:
    case UV_FS_FDATASYNC:
    case UV_FS_FSTAT:
    case UV_FS_FSYNC:
    case UV_FS_LINK:
    case UV_FS_LSTAT:
    case UV_FS_MKDIR:
    case UV_FS_OPEN:
    case UV_FS_READ:
    case UV_FS_RENAME:
    case UV_FS_STAT:
    case UV_FS_SYMLINK:
    case UV_FS_WRITE:
    case UV_FS_UNLINK:
      broken = 1;
      break;
    default:  /* Squelch -Wswitch warnings. */
      break;
  }
#endif

  return broken;
}
123 
124 
/* File system request callback: the request must either have completed with
 * UV_ECANCELED or be of a kind known_broken() exempts. Releases the request
 * and bumps fs_cb_called.
 */
static void fs_cb(uv_fs_t* req) {
  int acceptable;

  acceptable = known_broken((uv_req_t*) req) || req->result == UV_ECANCELED;
  ASSERT_NE(acceptable, 0);
  uv_fs_req_cleanup(req);
  fs_cb_called += 1;
}
131 
132 
/* getaddrinfo callback: expects the request to have been cancelled, i.e.
 * status UV_EAI_CANCELED and no result list.
 */
static void getaddrinfo_cb(uv_getaddrinfo_t* req,
                           int status,
                           struct addrinfo* res) {
  (void) req;

  ASSERT_EQ(status, UV_EAI_CANCELED);
  ASSERT_NULL(res);

  /* Should not crash: `res` was just asserted to be NULL. */
  uv_freeaddrinfo(res);
}
140 
141 
/* getnameinfo callback: expects the request to have been cancelled, i.e.
 * status UV_EAI_CANCELED with neither a hostname nor a service string.
 */
static void getnameinfo_cb(uv_getnameinfo_t* handle,
                           int status,
                           const char* hostname,
                           const char* service) {
  (void) handle;

  ASSERT_EQ(status, UV_EAI_CANCELED);
  ASSERT_NULL(hostname);
  ASSERT_NULL(service);
}
150 
151 
/* Work callback that must never run: fails the test unconditionally. */
static void work2_cb(uv_work_t* req) {
  (void) req;
  ASSERT(0 && "work2_cb called");
}
155 
156 
/* Completion callback for work that is expected to be cancelled: checks for
 * UV_ECANCELED and counts the invocation in done2_cb_called.
 */
static void done2_cb(uv_work_t* req, int status) {
  (void) req;
  ASSERT_EQ(status, UV_ECANCELED);
  done2_cb_called += 1;
}
161 
162 
/* Timer callback: cancels every request described by the cancel_info that
 * embeds `handle`, then closes the timer, releases the blocked threadpool
 * work items and counts the invocation in timer_cb_called.
 */
static void timer_cb(uv_timer_t* handle) {
  struct cancel_info* info;
  uv_req_t* req;
  char* cursor;
  unsigned remaining;

  info = container_of(handle, struct cancel_info, timer_handle);

  /* Walk the array bytewise; the element type is only known via its stride. */
  cursor = (char*) info->reqs;
  for (remaining = info->nreqs; remaining > 0; remaining--) {
    req = (uv_req_t*) cursor;
    /* uv_cancel() is not attempted for requests known_broken() exempts. */
    ASSERT(known_broken(req) || 0 == uv_cancel(req));
    cursor += info->stride;
  }

  uv_close((uv_handle_t*) &info->timer_handle, NULL);
  unblock_threadpool();
  timer_cb_called += 1;
}
179 
180 
/* Completion callback for a cancelled work request: checks for UV_ECANCELED
 * and counts the invocation in done_cb_called.
 */
static void nop_done_cb(uv_work_t* req, int status) {
  (void) req;
  ASSERT_EQ(status, UV_ECANCELED);
  done_cb_called += 1;
}
185 
186 
/* uv_random() callback for a cancelled request: checks for UV_ECANCELED and
 * that `buf`/`len` are the buffer and size of the enclosing random_info,
 * then counts the invocation in done_cb_called.
 */
static void nop_random_cb(uv_random_t* req, int status, void* buf, size_t len) {
  struct random_info* info;

  info = container_of(req, struct random_info, random_req);

  ASSERT_EQ(status, UV_ECANCELED);
  ASSERT_PTR_EQ(buf, (void*) info->buf);
  ASSERT_EQ(len, sizeof(info->buf));

  done_cb_called += 1;
}
198 
199 
/* Queue four getaddrinfo requests behind a saturated threadpool, cancel them
 * from timer_cb() and let getaddrinfo_cb() verify each one reports
 * UV_EAI_CANCELED.
 */
TEST_IMPL(threadpool_cancel_getaddrinfo) {
  uv_getaddrinfo_t reqs[4];
  struct cancel_info ci;
  /* Zero-initialised so uv_getaddrinfo() is never handed a hints structure
   * with indeterminate field values.
   */
  struct addrinfo hints = { 0 };
  uv_loop_t* loop;
  int r;

  INIT_CANCEL_INFO(&ci, reqs);
  loop = uv_default_loop();
  saturate_threadpool();

  /* Cover the node-only, service-only, node+service and node+hints forms. */
  r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL);
  ASSERT_OK(r);

  r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL);
  ASSERT_OK(r);

  r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL);
  ASSERT_OK(r);

  r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints);
  ASSERT_OK(r);

  /* One-shot 10 ms timer; its callback cancels everything in `reqs`. */
  ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
  ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
  ASSERT_EQ(1, timer_cb_called);

  MAKE_VALGRIND_HAPPY(loop);
  return 0;
}
231 
232 
TEST_IMPL(threadpool_cancel_getnameinfo)233 TEST_IMPL(threadpool_cancel_getnameinfo) {
234   uv_getnameinfo_t reqs[4];
235   struct sockaddr_in addr4;
236   struct cancel_info ci;
237   uv_loop_t* loop;
238   int r;
239 
240   r = uv_ip4_addr("127.0.0.1", 80, &addr4);
241   ASSERT_OK(r);
242 
243   INIT_CANCEL_INFO(&ci, reqs);
244   loop = uv_default_loop();
245   saturate_threadpool();
246 
247   r = uv_getnameinfo(loop, reqs + 0, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
248   ASSERT_OK(r);
249 
250   r = uv_getnameinfo(loop, reqs + 1, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
251   ASSERT_OK(r);
252 
253   r = uv_getnameinfo(loop, reqs + 2, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
254   ASSERT_OK(r);
255 
256   r = uv_getnameinfo(loop, reqs + 3, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
257   ASSERT_OK(r);
258 
259   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
260   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
261   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
262   ASSERT_EQ(1, timer_cb_called);
263 
264   MAKE_VALGRIND_HAPPY(loop);
265   return 0;
266 }
267 
268 
TEST_IMPL(threadpool_cancel_random)269 TEST_IMPL(threadpool_cancel_random) {
270   struct random_info req;
271   uv_loop_t* loop;
272 
273   saturate_threadpool();
274   loop = uv_default_loop();
275   ASSERT_OK(uv_random(loop,
276                       &req.random_req,
277                       &req.buf,
278                       sizeof(req.buf),
279                       0,
280                       nop_random_cb));
281   ASSERT_OK(uv_cancel((uv_req_t*) &req));
282   ASSERT_OK(done_cb_called);
283   unblock_threadpool();
284   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
285   ASSERT_EQ(1, done_cb_called);
286 
287   MAKE_VALGRIND_HAPPY(loop);
288   return 0;
289 }
290 
291 
TEST_IMPL(threadpool_cancel_work)292 TEST_IMPL(threadpool_cancel_work) {
293   struct cancel_info ci;
294   uv_work_t reqs[16];
295   uv_loop_t* loop;
296   unsigned i;
297 
298   INIT_CANCEL_INFO(&ci, reqs);
299   loop = uv_default_loop();
300   saturate_threadpool();
301 
302   for (i = 0; i < ARRAY_SIZE(reqs); i++)
303     ASSERT_OK(uv_queue_work(loop, reqs + i, work2_cb, done2_cb));
304 
305   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
306   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
307   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
308   ASSERT_EQ(1, timer_cb_called);
309   ASSERT_EQ(ARRAY_SIZE(reqs), done2_cb_called);
310 
311   MAKE_VALGRIND_HAPPY(loop);
312   return 0;
313 }
314 
315 
TEST_IMPL(threadpool_cancel_fs)316 TEST_IMPL(threadpool_cancel_fs) {
317   struct cancel_info ci;
318   uv_fs_t reqs[26];
319   uv_loop_t* loop;
320   unsigned n;
321   uv_buf_t iov;
322 
323   INIT_CANCEL_INFO(&ci, reqs);
324   loop = uv_default_loop();
325   saturate_threadpool();
326   iov = uv_buf_init(NULL, 0);
327 
328   /* Needs to match ARRAY_SIZE(fs_reqs). */
329   n = 0;
330   ASSERT_OK(uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb));
331   ASSERT_OK(uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb));
332   ASSERT_OK(uv_fs_close(loop, reqs + n++, 0, fs_cb));
333   ASSERT_OK(uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb));
334   ASSERT_OK(uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb));
335   ASSERT_OK(uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb));
336   ASSERT_OK(uv_fs_fstat(loop, reqs + n++, 0, fs_cb));
337   ASSERT_OK(uv_fs_fsync(loop, reqs + n++, 0, fs_cb));
338   ASSERT_OK(uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb));
339   ASSERT_OK(uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb));
340   ASSERT_OK(uv_fs_link(loop, reqs + n++, "/", "/", fs_cb));
341   ASSERT_OK(uv_fs_lstat(loop, reqs + n++, "/", fs_cb));
342   ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
343   ASSERT_OK(uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb));
344   ASSERT_OK(uv_fs_read(loop, reqs + n++, -1, &iov, 1, 0, fs_cb));
345   ASSERT_OK(uv_fs_scandir(loop, reqs + n++, "/", 0, fs_cb));
346   ASSERT_OK(uv_fs_readlink(loop, reqs + n++, "/", fs_cb));
347   ASSERT_OK(uv_fs_realpath(loop, reqs + n++, "/", fs_cb));
348   ASSERT_OK(uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb));
349   ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
350   ASSERT_OK(uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb));
351   ASSERT_OK(uv_fs_stat(loop, reqs + n++, "/", fs_cb));
352   ASSERT_OK(uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb));
353   ASSERT_OK(uv_fs_unlink(loop, reqs + n++, "/", fs_cb));
354   ASSERT_OK(uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb));
355   ASSERT_OK(uv_fs_write(loop, reqs + n++, -1, &iov, 1, 0, fs_cb));
356   ASSERT_EQ(n, ARRAY_SIZE(reqs));
357 
358   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
359   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
360   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
361   ASSERT_EQ(n, fs_cb_called);
362   ASSERT_EQ(1, timer_cb_called);
363 
364 
365   MAKE_VALGRIND_HAPPY(loop);
366   return 0;
367 }
368 
369 
TEST_IMPL(threadpool_cancel_single)370 TEST_IMPL(threadpool_cancel_single) {
371   uv_loop_t* loop;
372   uv_work_t req;
373 
374   saturate_threadpool();
375   loop = uv_default_loop();
376   ASSERT_OK(uv_queue_work(loop, &req, (uv_work_cb) abort, nop_done_cb));
377   ASSERT_OK(uv_cancel((uv_req_t*) &req));
378   ASSERT_OK(done_cb_called);
379   unblock_threadpool();
380   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
381   ASSERT_EQ(1, done_cb_called);
382 
383   MAKE_VALGRIND_HAPPY(loop);
384   return 0;
385 }
386 
387 
/* Completion callback for a request that was NOT cancelled: expects status 0
 * and counts the invocation in done_cb_called.
 */
static void after_busy_cb(uv_work_t* req, int status) {
  (void) req;
  ASSERT_OK(status);
  done_cb_called += 1;
}
392 
/* Work callback: posts the semaphore stored in req->data to signal that the
 * work item has started, then keeps running for another 10 ms.
 */
static void busy_cb(uv_work_t* req) {
  uv_sem_t* started;

  started = (uv_sem_t*) req->data;
  uv_sem_post(started);

  /* Assume that calling uv_cancel() takes less than 10ms. */
  uv_sleep(10);
}
398 
TEST_IMPL(threadpool_cancel_when_busy)399 TEST_IMPL(threadpool_cancel_when_busy) {
400   uv_sem_t sem_lock;
401   uv_work_t req;
402 
403   req.data = &sem_lock;
404 
405   ASSERT_OK(uv_sem_init(&sem_lock, 0));
406   ASSERT_OK(uv_queue_work(uv_default_loop(), &req, busy_cb, after_busy_cb));
407 
408   uv_sem_wait(&sem_lock);
409 
410   ASSERT_EQ(uv_cancel((uv_req_t*) &req), UV_EBUSY);
411   ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT));
412   ASSERT_EQ(1, done_cb_called);
413 
414   uv_sem_destroy(&sem_lock);
415 
416   MAKE_VALGRIND_HAPPY(uv_default_loop());
417   return 0;
418 }
419