1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 * SPDX-License-Identifier: curl
22 *
23 ***************************************************************************/
24 /* <DESC>
25 * multi socket interface together with libev
26 * </DESC>
27 */
28 /* Example application source code using the multi socket interface to
29 * download many files at once.
30 *
31 * This example features the same basic functionality as hiperfifo.c does,
32 * but this uses libev instead of libevent.
33 *
34 * Written by Jeff Pohlmeyer, converted to use libev by Markus Koetter
35
36 Requires libev and a (POSIX?) system that has mkfifo().
37
38 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
39 sample programs.
40
41 When running, the program creates the named pipe "hiper.fifo"
42
43 Whenever there is input into the fifo, the program reads the input as a list
44 of URL's and creates some new easy handles to fetch each URL via the
45 curl_multi "hiper" API.
46
47
48 Thus, you can try a single URL:
49 % echo http://www.yahoo.com > hiper.fifo
50
51 Or a whole bunch of them:
52 % cat my-url-list > hiper.fifo
53
54 The fifo buffer is handled almost instantly, so you can even add more URL's
55 while the previous requests are still being downloaded.
56
57 Note:
58 For the sake of simplicity, URL length is limited to 1023 char's !
59
60 This is purely a demo app, all retrieved data is simply discarded by the write
61 callback.
62
63 */
64
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <sys/time.h>
69 #include <time.h>
70 #include <unistd.h>
71 #include <sys/poll.h>
72 #include <curl/curl.h>
73 #include <ev.h>
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <errno.h>
77
78 #define DPRINT(x...) printf(x)
79
80 #define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */
81
82
83 /* Global information, common to all connections */
84 typedef struct _GlobalInfo
85 {
86 struct ev_loop *loop;
87 struct ev_io fifo_event;
88 struct ev_timer timer_event;
89 CURLM *multi;
90 int still_running;
91 FILE *input;
92 } GlobalInfo;
93
94
95 /* Information associated with a specific easy handle */
96 typedef struct _ConnInfo
97 {
98 CURL *easy;
99 char *url;
100 GlobalInfo *global;
101 char error[CURL_ERROR_SIZE];
102 } ConnInfo;
103
104
105 /* Information associated with a specific socket */
106 typedef struct _SockInfo
107 {
108 curl_socket_t sockfd;
109 CURL *easy;
110 int action;
111 long timeout;
112 struct ev_io ev;
113 int evset;
114 GlobalInfo *global;
115 } SockInfo;
116
117 static void timer_cb(EV_P_ struct ev_timer *w, int revents);
118
119 /* Update the event timer after curl_multi library calls */
multi_timer_cb(CURLM * multi,long timeout_ms,GlobalInfo * g)120 static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo *g)
121 {
122 DPRINT("%s %li\n", __PRETTY_FUNCTION__, timeout_ms);
123 ev_timer_stop(g->loop, &g->timer_event);
124 if(timeout_ms >= 0) {
125 /* -1 means delete, other values are timeout times in milliseconds */
126 double t = timeout_ms / 1000;
127 ev_timer_init(&g->timer_event, timer_cb, t, 0.);
128 ev_timer_start(g->loop, &g->timer_event);
129 }
130 return 0;
131 }
132
133 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)134 static void mcode_or_die(const char *where, CURLMcode code)
135 {
136 if(CURLM_OK != code) {
137 const char *s;
138 switch(code) {
139 case CURLM_BAD_HANDLE:
140 s = "CURLM_BAD_HANDLE";
141 break;
142 case CURLM_BAD_EASY_HANDLE:
143 s = "CURLM_BAD_EASY_HANDLE";
144 break;
145 case CURLM_OUT_OF_MEMORY:
146 s = "CURLM_OUT_OF_MEMORY";
147 break;
148 case CURLM_INTERNAL_ERROR:
149 s = "CURLM_INTERNAL_ERROR";
150 break;
151 case CURLM_UNKNOWN_OPTION:
152 s = "CURLM_UNKNOWN_OPTION";
153 break;
154 case CURLM_LAST:
155 s = "CURLM_LAST";
156 break;
157 default:
158 s = "CURLM_unknown";
159 break;
160 case CURLM_BAD_SOCKET:
161 s = "CURLM_BAD_SOCKET";
162 fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
163 /* ignore this error */
164 return;
165 }
166 fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
167 exit(code);
168 }
169 }
170
171
172
173 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)174 static void check_multi_info(GlobalInfo *g)
175 {
176 char *eff_url;
177 CURLMsg *msg;
178 int msgs_left;
179 ConnInfo *conn;
180 CURL *easy;
181 CURLcode res;
182
183 fprintf(MSG_OUT, "REMAINING: %d\n", g->still_running);
184 while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
185 if(msg->msg == CURLMSG_DONE) {
186 easy = msg->easy_handle;
187 res = msg->data.result;
188 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
189 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
190 fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);
191 curl_multi_remove_handle(g->multi, easy);
192 free(conn->url);
193 curl_easy_cleanup(easy);
194 free(conn);
195 }
196 }
197 }
198
199
200
201 /* Called by libevent when we get action on a multi socket */
event_cb(EV_P_ struct ev_io * w,int revents)202 static void event_cb(EV_P_ struct ev_io *w, int revents)
203 {
204 DPRINT("%s w %p revents %i\n", __PRETTY_FUNCTION__, w, revents);
205 GlobalInfo *g = (GlobalInfo*) w->data;
206 CURLMcode rc;
207
208 int action = ((revents & EV_READ) ? CURL_POLL_IN : 0) |
209 ((revents & EV_WRITE) ? CURL_POLL_OUT : 0);
210 rc = curl_multi_socket_action(g->multi, w->fd, action, &g->still_running);
211 mcode_or_die("event_cb: curl_multi_socket_action", rc);
212 check_multi_info(g);
213 if(g->still_running <= 0) {
214 fprintf(MSG_OUT, "last transfer done, kill timeout\n");
215 ev_timer_stop(g->loop, &g->timer_event);
216 }
217 }
218
219 /* Called by libevent when our timeout expires */
timer_cb(EV_P_ struct ev_timer * w,int revents)220 static void timer_cb(EV_P_ struct ev_timer *w, int revents)
221 {
222 DPRINT("%s w %p revents %i\n", __PRETTY_FUNCTION__, w, revents);
223
224 GlobalInfo *g = (GlobalInfo *)w->data;
225 CURLMcode rc;
226
227 rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0,
228 &g->still_running);
229 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
230 check_multi_info(g);
231 }
232
233 /* Clean up the SockInfo structure */
remsock(SockInfo * f,GlobalInfo * g)234 static void remsock(SockInfo *f, GlobalInfo *g)
235 {
236 printf("%s \n", __PRETTY_FUNCTION__);
237 if(f) {
238 if(f->evset)
239 ev_io_stop(g->loop, &f->ev);
240 free(f);
241 }
242 }
243
244
245
246 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)247 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
248 GlobalInfo *g)
249 {
250 printf("%s \n", __PRETTY_FUNCTION__);
251
252 int kind = ((act & CURL_POLL_IN) ? EV_READ : 0) |
253 ((act & CURL_POLL_OUT) ? EV_WRITE : 0);
254
255 f->sockfd = s;
256 f->action = act;
257 f->easy = e;
258 if(f->evset)
259 ev_io_stop(g->loop, &f->ev);
260 ev_io_init(&f->ev, event_cb, f->sockfd, kind);
261 f->ev.data = g;
262 f->evset = 1;
263 ev_io_start(g->loop, &f->ev);
264 }
265
266
267
268 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)269 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
270 {
271 SockInfo *fdp = calloc(1, sizeof(SockInfo));
272
273 fdp->global = g;
274 setsock(fdp, s, easy, action, g);
275 curl_multi_assign(g->multi, s, fdp);
276 }
277
278 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)279 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
280 {
281 DPRINT("%s e %p s %i what %i cbp %p sockp %p\n",
282 __PRETTY_FUNCTION__, e, s, what, cbp, sockp);
283
284 GlobalInfo *g = (GlobalInfo*) cbp;
285 SockInfo *fdp = (SockInfo*) sockp;
286 const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE"};
287
288 fprintf(MSG_OUT,
289 "socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
290 if(what == CURL_POLL_REMOVE) {
291 fprintf(MSG_OUT, "\n");
292 remsock(fdp, g);
293 }
294 else {
295 if(!fdp) {
296 fprintf(MSG_OUT, "Adding data: %s\n", whatstr[what]);
297 addsock(s, e, what, g);
298 }
299 else {
300 fprintf(MSG_OUT,
301 "Changing action from %s to %s\n",
302 whatstr[fdp->action], whatstr[what]);
303 setsock(fdp, s, e, what, g);
304 }
305 }
306 return 0;
307 }
308
309
310 /* CURLOPT_WRITEFUNCTION */
write_cb(void * ptr,size_t size,size_t nmemb,void * data)311 static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
312 {
313 size_t realsize = size * nmemb;
314 ConnInfo *conn = (ConnInfo*) data;
315 (void)ptr;
316 (void)conn;
317 return realsize;
318 }
319
320
321 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)322 static int prog_cb(void *p, double dltotal, double dlnow, double ult,
323 double uln)
324 {
325 ConnInfo *conn = (ConnInfo *)p;
326 (void)ult;
327 (void)uln;
328
329 fprintf(MSG_OUT, "Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
330 return 0;
331 }
332
333
334 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)335 static void new_conn(char *url, GlobalInfo *g)
336 {
337 ConnInfo *conn;
338 CURLMcode rc;
339
340 conn = calloc(1, sizeof(ConnInfo));
341 conn->error[0]='\0';
342
343 conn->easy = curl_easy_init();
344 if(!conn->easy) {
345 fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
346 exit(2);
347 }
348 conn->global = g;
349 conn->url = strdup(url);
350 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
351 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
352 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, conn);
353 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, 1L);
354 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
355 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
356 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, 0L);
357 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
358 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
359 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 3L);
360 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 10L);
361
362 fprintf(MSG_OUT,
363 "Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
364 rc = curl_multi_add_handle(g->multi, conn->easy);
365 mcode_or_die("new_conn: curl_multi_add_handle", rc);
366
367 /* note that add_handle() sets a timeout to trigger soon so that the
368 necessary socket_action() gets called */
369 }
370
371 /* This gets called whenever data is received from the fifo */
fifo_cb(EV_P_ struct ev_io * w,int revents)372 static void fifo_cb(EV_P_ struct ev_io *w, int revents)
373 {
374 char s[1024];
375 long int rv = 0;
376 int n = 0;
377 GlobalInfo *g = (GlobalInfo *)w->data;
378
379 do {
380 s[0]='\0';
381 rv = fscanf(g->input, "%1023s%n", s, &n);
382 s[n]='\0';
383 if(n && s[0]) {
384 new_conn(s, g); /* if we read a URL, go get it! */
385 }
386 else
387 break;
388 } while(rv != EOF);
389 }
390
391 /* Create a named pipe and tell libevent to monitor it */
init_fifo(GlobalInfo * g)392 static int init_fifo(GlobalInfo *g)
393 {
394 struct stat st;
395 static const char *fifo = "hiper.fifo";
396 curl_socket_t sockfd;
397
398 fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);
399 if(lstat (fifo, &st) == 0) {
400 if((st.st_mode & S_IFMT) == S_IFREG) {
401 errno = EEXIST;
402 perror("lstat");
403 exit(1);
404 }
405 }
406 unlink(fifo);
407 if(mkfifo (fifo, 0600) == -1) {
408 perror("mkfifo");
409 exit(1);
410 }
411 sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0);
412 if(sockfd == -1) {
413 perror("open");
414 exit(1);
415 }
416 g->input = fdopen(sockfd, "r");
417
418 fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
419 ev_io_init(&g->fifo_event, fifo_cb, sockfd, EV_READ);
420 ev_io_start(g->loop, &g->fifo_event);
421 return (0);
422 }
423
main(int argc,char ** argv)424 int main(int argc, char **argv)
425 {
426 GlobalInfo g;
427 (void)argc;
428 (void)argv;
429
430 memset(&g, 0, sizeof(GlobalInfo));
431 g.loop = ev_default_loop(0);
432
433 init_fifo(&g);
434 g.multi = curl_multi_init();
435
436 ev_timer_init(&g.timer_event, timer_cb, 0., 0.);
437 g.timer_event.data = &g;
438 g.fifo_event.data = &g;
439 curl_multi_setopt(g.multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
440 curl_multi_setopt(g.multi, CURLMOPT_SOCKETDATA, &g);
441 curl_multi_setopt(g.multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
442 curl_multi_setopt(g.multi, CURLMOPT_TIMERDATA, &g);
443
444 /* we do not call any curl_multi_socket*() function yet as we have no handles
445 added! */
446
447 ev_loop(g.loop, 0);
448 curl_multi_cleanup(g.multi);
449 return 0;
450 }
451