1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 * SPDX-License-Identifier: curl
22 *
23 ***************************************************************************/
24 /* <DESC>
25 * multi socket API usage together with glib2
26 * </DESC>
27 */
28 /* Example application source code using the multi socket interface to
29 * download many files at once.
30 *
31 * Written by Jeff Pohlmeyer
32
33 Requires glib-2.x and a (POSIX?) system that has mkfifo().
34
35 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
36 sample programs, adapted to use glib's g_io_channel in place of libevent.
37
When running, the program creates the named pipe "hiper.fifo".

Whenever there is input into the fifo, the program reads the input as a list
of URLs and creates new easy handles to fetch each URL via the
curl_multi "hiper" API.
43
44
45 Thus, you can try a single URL:
46 % echo http://www.yahoo.com > hiper.fifo
47
48 Or a whole bunch of them:
49 % cat my-url-list > hiper.fifo
50
The fifo buffer is handled almost instantly, so you can even add more URLs
while the previous requests are still being downloaded.
53
54 This is purely a demo app, all retrieved data is simply discarded by the write
55 callback.
56
57 */
58
59 #include <glib.h>
60 #include <sys/stat.h>
61 #include <unistd.h>
62 #include <fcntl.h>
63 #include <stdlib.h>
64 #include <stdio.h>
65 #include <errno.h>
66 #include <curl/curl.h>
67
/* Output function used for all diagnostics. Change to "g_printerr" to write
   to stderr; note that g_error() is not suitable because it aborts. */
#define MSG_OUT g_print
#define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */
#define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */
71
/* Global information, common to all connections */
typedef struct _GlobalInfo {
  CURLM *multi;      /* multi handle that every easy handle is added to */
  guint timer_event; /* id returned by g_timeout_add() in update_timeout_cb */
  int still_running; /* running-handles counter filled in by
                        curl_multi_socket_action() */
} GlobalInfo;
78
/* Information associated with a specific easy handle */
typedef struct _ConnInfo {
  CURL *easy;                  /* handle created by curl_easy_init() */
  char *url;                   /* g_strdup() copy of the requested URL */
  GlobalInfo *global;          /* back pointer to the shared state */
  char error[CURL_ERROR_SIZE]; /* buffer registered as CURLOPT_ERRORBUFFER */
} ConnInfo;
86
/* Information associated with a specific socket */
typedef struct _SockInfo {
  curl_socket_t sockfd; /* socket handed to the socket callback */
  CURL *easy;           /* easy handle handed to the socket callback */
  int action;           /* most recent CURL_POLL_* value stored by setsock */
  long timeout;         /* not referenced anywhere in this file */
  GIOChannel *ch;       /* glib channel created from the socket in addsock */
  guint ev;             /* id of the glib watch returned by g_io_add_watch() */
  GlobalInfo *global;   /* back pointer to the shared state */
} SockInfo;
97
98 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)99 static void mcode_or_die(const char *where, CURLMcode code)
100 {
101 if(CURLM_OK != code) {
102 const char *s;
103 switch(code) {
104 case CURLM_BAD_HANDLE: s = "CURLM_BAD_HANDLE"; break;
105 case CURLM_BAD_EASY_HANDLE: s = "CURLM_BAD_EASY_HANDLE"; break;
106 case CURLM_OUT_OF_MEMORY: s = "CURLM_OUT_OF_MEMORY"; break;
107 case CURLM_INTERNAL_ERROR: s = "CURLM_INTERNAL_ERROR"; break;
108 case CURLM_BAD_SOCKET: s = "CURLM_BAD_SOCKET"; break;
109 case CURLM_UNKNOWN_OPTION: s = "CURLM_UNKNOWN_OPTION"; break;
110 case CURLM_LAST: s = "CURLM_LAST"; break;
111 default: s = "CURLM_unknown";
112 }
113 MSG_OUT("ERROR: %s returns %s\n", where, s);
114 exit(code);
115 }
116 }
117
118 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)119 static void check_multi_info(GlobalInfo *g)
120 {
121 CURLMsg *msg;
122 int msgs_left;
123
124 MSG_OUT("REMAINING: %d\n", g->still_running);
125 while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
126 if(msg->msg == CURLMSG_DONE) {
127 CURL *easy = msg->easy_handle;
128 CURLcode res = msg->data.result;
129 char *eff_url;
130 ConnInfo *conn;
131 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
132 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
133 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
134 curl_multi_remove_handle(g->multi, easy);
135 free(conn->url);
136 curl_easy_cleanup(easy);
137 free(conn);
138 }
139 }
140 }
141
142 /* Called by glib when our timeout expires */
timer_cb(gpointer data)143 static gboolean timer_cb(gpointer data)
144 {
145 GlobalInfo *g = (GlobalInfo *)data;
146 CURLMcode rc;
147
148 rc = curl_multi_socket_action(g->multi,
149 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
150 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
151 check_multi_info(g);
152 return FALSE;
153 }
154
155 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)156 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
157 {
158 struct timeval timeout;
159 GlobalInfo *g = (GlobalInfo *)userp;
160 timeout.tv_sec = timeout_ms/1000;
161 timeout.tv_usec = (timeout_ms%1000)*1000;
162
163 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
164 timeout_ms, timeout.tv_sec, timeout.tv_usec);
165
166 /*
167 * if timeout_ms is -1, just delete the timer
168 *
169 * For other values of timeout_ms, this should set or *update* the timer to
170 * the new value
171 */
172 if(timeout_ms >= 0)
173 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
174 return 0;
175 }
176
177 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)178 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
179 {
180 GlobalInfo *g = (GlobalInfo*) data;
181 CURLMcode rc;
182 int fd = g_io_channel_unix_get_fd(ch);
183
184 int action =
185 ((condition & G_IO_IN) ? CURL_CSELECT_IN : 0) |
186 ((condition & G_IO_OUT) ? CURL_CSELECT_OUT : 0);
187
188 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
189 mcode_or_die("event_cb: curl_multi_socket_action", rc);
190
191 check_multi_info(g);
192 if(g->still_running) {
193 return TRUE;
194 }
195 else {
196 MSG_OUT("last transfer done, kill timeout\n");
197 if(g->timer_event) {
198 g_source_remove(g->timer_event);
199 }
200 return FALSE;
201 }
202 }
203
204 /* Clean up the SockInfo structure */
remsock(SockInfo * f)205 static void remsock(SockInfo *f)
206 {
207 if(!f) {
208 return;
209 }
210 if(f->ev) {
211 g_source_remove(f->ev);
212 }
213 g_free(f);
214 }
215
216 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)217 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
218 GlobalInfo *g)
219 {
220 GIOCondition kind =
221 ((act & CURL_POLL_IN) ? G_IO_IN : 0) |
222 ((act & CURL_POLL_OUT) ? G_IO_OUT : 0);
223
224 f->sockfd = s;
225 f->action = act;
226 f->easy = e;
227 if(f->ev) {
228 g_source_remove(f->ev);
229 }
230 f->ev = g_io_add_watch(f->ch, kind, event_cb, g);
231 }
232
233 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)234 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
235 {
236 SockInfo *fdp = g_malloc0(sizeof(SockInfo));
237
238 fdp->global = g;
239 fdp->ch = g_io_channel_unix_new(s);
240 setsock(fdp, s, easy, action, g);
241 curl_multi_assign(g->multi, s, fdp);
242 }
243
244 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)245 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
246 {
247 GlobalInfo *g = (GlobalInfo*) cbp;
248 SockInfo *fdp = (SockInfo*) sockp;
249 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
250
251 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
252 if(what == CURL_POLL_REMOVE) {
253 MSG_OUT("\n");
254 remsock(fdp);
255 }
256 else {
257 if(!fdp) {
258 MSG_OUT("Adding data: %s%s\n",
259 (what & CURL_POLL_IN) ? "READ" : "",
260 (what & CURL_POLL_OUT) ? "WRITE" : "");
261 addsock(s, e, what, g);
262 }
263 else {
264 MSG_OUT(
265 "Changing action from %d to %d\n", fdp->action, what);
266 setsock(fdp, s, e, what, g);
267 }
268 }
269 return 0;
270 }
271
/* CURLOPT_WRITEFUNCTION
 *
 * Discards the received data and reports all of it as consumed.
 *
 * ptr:   received data (ignored)
 * size:  size of one element
 * nmemb: number of elements
 * data:  pointer registered as CURLOPT_WRITEDATA (ignored)
 * Returns size * nmemb.
 */
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
{
  (void)ptr;
  (void)data;
  return size * nmemb;
}
281
282 /* CURLOPT_XFERINFOFUNCTION */
xferinfo_cb(void * p,curl_off_t dltotal,curl_off_t dlnow,curl_off_t ult,curl_off_t uln)283 static int xferinfo_cb(void *p, curl_off_t dltotal, curl_off_t dlnow,
284 curl_off_t ult, curl_off_t uln)
285 {
286 ConnInfo *conn = (ConnInfo *)p;
287 (void)ult;
288 (void)uln;
289
290 fprintf(MSG_OUT, "Progress: %s (%" CURL_FORMAT_CURL_OFF_T
291 "/%" CURL_FORMAT_CURL_OFF_T ")\n", conn->url, dlnow, dltotal);
292 return 0;
293 }
294
295 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(const char * url,GlobalInfo * g)296 static void new_conn(const char *url, GlobalInfo *g)
297 {
298 ConnInfo *conn;
299 CURLMcode rc;
300
301 conn = g_malloc0(sizeof(ConnInfo));
302 conn->error[0] = '\0';
303 conn->easy = curl_easy_init();
304 if(!conn->easy) {
305 MSG_OUT("curl_easy_init() failed, exiting!\n");
306 exit(2);
307 }
308 conn->global = g;
309 conn->url = g_strdup(url);
310 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
311 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
312 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
313 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
314 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
315 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
316 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS ? 0L : 1L);
317 curl_easy_setopt(conn->easy, CURLOPT_XFERINFOFUNCTION, xferinfo_cb);
318 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
319 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
320 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
321 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
322 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
323
324 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
325 rc = curl_multi_add_handle(g->multi, conn->easy);
326 mcode_or_die("new_conn: curl_multi_add_handle", rc);
327
328 /* note that add_handle() sets a timeout to trigger soon so that the
329 necessary socket_action() gets called */
330 }
331
332 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)333 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
334 {
335 #define BUF_SIZE 1024
336 gsize len, tp;
337 gchar *buf, *tmp, *all = NULL;
338 GIOStatus rv;
339
340 do {
341 GError *err = NULL;
342 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
343 if(buf) {
344 if(tp) {
345 buf[tp]='\0';
346 }
347 new_conn(buf, (GlobalInfo*)data);
348 g_free(buf);
349 }
350 else {
351 buf = g_malloc(BUF_SIZE + 1);
352 while(TRUE) {
353 buf[BUF_SIZE]='\0';
354 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
355 if(len) {
356 buf[len]='\0';
357 if(all) {
358 tmp = all;
359 all = g_strdup_printf("%s%s", tmp, buf);
360 g_free(tmp);
361 }
362 else {
363 all = g_strdup(buf);
364 }
365 }
366 else {
367 break;
368 }
369 }
370 if(all) {
371 new_conn(all, (GlobalInfo*)data);
372 g_free(all);
373 }
374 g_free(buf);
375 }
376 if(err) {
377 g_error("fifo_cb: %s", err->message);
378 g_free(err);
379 break;
380 }
381 } while((len) && (rv == G_IO_STATUS_NORMAL));
382 return TRUE;
383 }
384
/* Create the named pipe "hiper.fifo" and open it for non-blocking access.
 *
 * Refuses to continue if the name already belongs to a regular file; any
 * other existing entry is unlinked first. Exits the program with status 1
 * on failure.
 *
 * Returns the open file descriptor of the fifo.
 */
int init_fifo(void)
{
  const char *path = "hiper.fifo";
  struct stat info;
  int fd;

  /* never clobber a regular file that happens to carry our name */
  if(lstat(path, &info) == 0 && S_ISREG(info.st_mode)) {
    errno = EEXIST;
    perror("lstat");
    exit(1);
  }

  unlink(path);
  if(mkfifo(path, 0600) == -1) {
    perror("mkfifo");
    exit(1);
  }

  fd = open(path, O_RDWR | O_NONBLOCK, 0);
  if(fd == -1) {
    perror("open");
    exit(1);
  }

  MSG_OUT("Now, pipe some URL's into > %s\n", path);
  return fd;
}
415
main(void)416 int main(void)
417 {
418 GlobalInfo *g = g_malloc0(sizeof(GlobalInfo));
419 GMainLoop*gmain;
420 int fd;
421 GIOChannel* ch;
422
423 fd = init_fifo();
424 ch = g_io_channel_unix_new(fd);
425 g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
426 gmain = g_main_loop_new(NULL, FALSE);
427 g->multi = curl_multi_init();
428 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
429 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
430 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
431 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
432
433 /* we do not call any curl_multi_socket*() function yet as we have no handles
434 added! */
435
436 g_main_loop_run(gmain);
437 curl_multi_cleanup(g->multi);
438 return 0;
439 }
440