1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 * SPDX-License-Identifier: curl
22 *
23 ***************************************************************************/
24 /* <DESC>
25 * multi socket API usage together with glib2
26 * </DESC>
27 */
28 /* Example application source code using the multi socket interface to
29 * download many files at once.
30 *
31 * Written by Jeff Pohlmeyer
32
33 Requires glib-2.x and a (POSIX?) system that has mkfifo().
34
35 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
36 sample programs, adapted to use glib's g_io_channel in place of libevent.
37
38 When running, the program creates the named pipe "hiper.fifo"
39
Whenever there is input into the fifo, the program reads the input as a list
of URLs and creates a new easy handle to fetch each URL via the
curl_multi "hiper" API.
43
44
45 Thus, you can try a single URL:
46 % echo http://www.yahoo.com > hiper.fifo
47
48 Or a whole bunch of them:
49 % cat my-url-list > hiper.fifo
50
The fifo buffer is handled almost instantly, so you can even add more URLs
while the previous requests are still being downloaded.
53
54 This is purely a demo app, all retrieved data is simply discarded by the write
55 callback.
56
57 */
58
59 #include <glib.h>
60 #include <sys/stat.h>
61 #include <unistd.h>
62 #include <fcntl.h>
63 #include <stdlib.h>
64 #include <stdio.h>
65 #include <errno.h>
66 #include <curl/curl.h>
67
/* Output function used for all diagnostics. Change to "g_printerr" to write
   to stderr. Do NOT use "g_error" here: it logs at the fatal level and aborts
   the program after the first message. */
#define MSG_OUT g_print
#define SHOW_VERBOSE 0    /* Set to non-zero for libcurl messages */
#define SHOW_PROGRESS 0   /* Set to non-zero to enable progress callback */
71
72 /* Global information, common to all connections */
73 typedef struct _GlobalInfo {
74 CURLM *multi;
75 guint timer_event;
76 int still_running;
77 } GlobalInfo;
78
79 /* Information associated with a specific easy handle */
80 typedef struct _ConnInfo {
81 CURL *easy;
82 char *url;
83 GlobalInfo *global;
84 char error[CURL_ERROR_SIZE];
85 } ConnInfo;
86
87 /* Information associated with a specific socket */
88 typedef struct _SockInfo {
89 curl_socket_t sockfd;
90 CURL *easy;
91 int action;
92 long timeout;
93 GIOChannel *ch;
94 guint ev;
95 GlobalInfo *global;
96 } SockInfo;
97
98 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)99 static void mcode_or_die(const char *where, CURLMcode code)
100 {
101 if(CURLM_OK != code) {
102 const char *s;
103 switch(code) {
104 case CURLM_BAD_HANDLE: s = "CURLM_BAD_HANDLE"; break;
105 case CURLM_BAD_EASY_HANDLE: s = "CURLM_BAD_EASY_HANDLE"; break;
106 case CURLM_OUT_OF_MEMORY: s = "CURLM_OUT_OF_MEMORY"; break;
107 case CURLM_INTERNAL_ERROR: s = "CURLM_INTERNAL_ERROR"; break;
108 case CURLM_BAD_SOCKET: s = "CURLM_BAD_SOCKET"; break;
109 case CURLM_UNKNOWN_OPTION: s = "CURLM_UNKNOWN_OPTION"; break;
110 case CURLM_LAST: s = "CURLM_LAST"; break;
111 default: s = "CURLM_unknown";
112 }
113 MSG_OUT("ERROR: %s returns %s\n", where, s);
114 exit(code);
115 }
116 }
117
118 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)119 static void check_multi_info(GlobalInfo *g)
120 {
121 char *eff_url;
122 CURLMsg *msg;
123 int msgs_left;
124 ConnInfo *conn;
125 CURL *easy;
126 CURLcode res;
127
128 MSG_OUT("REMAINING: %d\n", g->still_running);
129 while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
130 if(msg->msg == CURLMSG_DONE) {
131 easy = msg->easy_handle;
132 res = msg->data.result;
133 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
134 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
135 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
136 curl_multi_remove_handle(g->multi, easy);
137 free(conn->url);
138 curl_easy_cleanup(easy);
139 free(conn);
140 }
141 }
142 }
143
144 /* Called by glib when our timeout expires */
timer_cb(gpointer data)145 static gboolean timer_cb(gpointer data)
146 {
147 GlobalInfo *g = (GlobalInfo *)data;
148 CURLMcode rc;
149
150 rc = curl_multi_socket_action(g->multi,
151 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
152 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
153 check_multi_info(g);
154 return FALSE;
155 }
156
157 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)158 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
159 {
160 struct timeval timeout;
161 GlobalInfo *g = (GlobalInfo *)userp;
162 timeout.tv_sec = timeout_ms/1000;
163 timeout.tv_usec = (timeout_ms%1000)*1000;
164
165 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
166 timeout_ms, timeout.tv_sec, timeout.tv_usec);
167
168 /*
169 * if timeout_ms is -1, just delete the timer
170 *
171 * For other values of timeout_ms, this should set or *update* the timer to
172 * the new value
173 */
174 if(timeout_ms >= 0)
175 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
176 return 0;
177 }
178
179 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)180 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
181 {
182 GlobalInfo *g = (GlobalInfo*) data;
183 CURLMcode rc;
184 int fd = g_io_channel_unix_get_fd(ch);
185
186 int action =
187 ((condition & G_IO_IN) ? CURL_CSELECT_IN : 0) |
188 ((condition & G_IO_OUT) ? CURL_CSELECT_OUT : 0);
189
190 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
191 mcode_or_die("event_cb: curl_multi_socket_action", rc);
192
193 check_multi_info(g);
194 if(g->still_running) {
195 return TRUE;
196 }
197 else {
198 MSG_OUT("last transfer done, kill timeout\n");
199 if(g->timer_event) {
200 g_source_remove(g->timer_event);
201 }
202 return FALSE;
203 }
204 }
205
206 /* Clean up the SockInfo structure */
remsock(SockInfo * f)207 static void remsock(SockInfo *f)
208 {
209 if(!f) {
210 return;
211 }
212 if(f->ev) {
213 g_source_remove(f->ev);
214 }
215 g_free(f);
216 }
217
218 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)219 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
220 GlobalInfo *g)
221 {
222 GIOCondition kind =
223 ((act & CURL_POLL_IN) ? G_IO_IN : 0) |
224 ((act & CURL_POLL_OUT) ? G_IO_OUT : 0);
225
226 f->sockfd = s;
227 f->action = act;
228 f->easy = e;
229 if(f->ev) {
230 g_source_remove(f->ev);
231 }
232 f->ev = g_io_add_watch(f->ch, kind, event_cb, g);
233 }
234
235 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)236 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
237 {
238 SockInfo *fdp = g_malloc0(sizeof(SockInfo));
239
240 fdp->global = g;
241 fdp->ch = g_io_channel_unix_new(s);
242 setsock(fdp, s, easy, action, g);
243 curl_multi_assign(g->multi, s, fdp);
244 }
245
246 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)247 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
248 {
249 GlobalInfo *g = (GlobalInfo*) cbp;
250 SockInfo *fdp = (SockInfo*) sockp;
251 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
252
253 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
254 if(what == CURL_POLL_REMOVE) {
255 MSG_OUT("\n");
256 remsock(fdp);
257 }
258 else {
259 if(!fdp) {
260 MSG_OUT("Adding data: %s%s\n",
261 (what & CURL_POLL_IN) ? "READ" : "",
262 (what & CURL_POLL_OUT) ? "WRITE" : "");
263 addsock(s, e, what, g);
264 }
265 else {
266 MSG_OUT(
267 "Changing action from %d to %d\n", fdp->action, what);
268 setsock(fdp, s, e, what, g);
269 }
270 }
271 return 0;
272 }
273
/*
 * CURLOPT_WRITEFUNCTION
 *
 * Discards the received data. Returns size * nmemb, i.e. reports every byte
 * as consumed.
 */
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
{
  (void)ptr;
  (void)data;
  return size * nmemb;
}
283
284 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)285 static int prog_cb(void *p, double dltotal, double dlnow, double ult,
286 double uln)
287 {
288 ConnInfo *conn = (ConnInfo *)p;
289 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
290 return 0;
291 }
292
293 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)294 static void new_conn(char *url, GlobalInfo *g)
295 {
296 ConnInfo *conn;
297 CURLMcode rc;
298
299 conn = g_malloc0(sizeof(ConnInfo));
300 conn->error[0]='\0';
301 conn->easy = curl_easy_init();
302 if(!conn->easy) {
303 MSG_OUT("curl_easy_init() failed, exiting!\n");
304 exit(2);
305 }
306 conn->global = g;
307 conn->url = g_strdup(url);
308 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
309 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
310 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
311 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
312 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
313 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
314 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS ? 0L : 1L);
315 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
316 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
317 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
318 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
319 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
320 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
321
322 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
323 rc = curl_multi_add_handle(g->multi, conn->easy);
324 mcode_or_die("new_conn: curl_multi_add_handle", rc);
325
326 /* note that add_handle() sets a timeout to trigger soon so that the
327 necessary socket_action() gets called */
328 }
329
330 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)331 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
332 {
333 #define BUF_SIZE 1024
334 gsize len, tp;
335 gchar *buf, *tmp, *all = NULL;
336 GIOStatus rv;
337
338 do {
339 GError *err = NULL;
340 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
341 if(buf) {
342 if(tp) {
343 buf[tp]='\0';
344 }
345 new_conn(buf, (GlobalInfo*)data);
346 g_free(buf);
347 }
348 else {
349 buf = g_malloc(BUF_SIZE + 1);
350 while(TRUE) {
351 buf[BUF_SIZE]='\0';
352 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
353 if(len) {
354 buf[len]='\0';
355 if(all) {
356 tmp = all;
357 all = g_strdup_printf("%s%s", tmp, buf);
358 g_free(tmp);
359 }
360 else {
361 all = g_strdup(buf);
362 }
363 }
364 else {
365 break;
366 }
367 }
368 if(all) {
369 new_conn(all, (GlobalInfo*)data);
370 g_free(all);
371 }
372 g_free(buf);
373 }
374 if(err) {
375 g_error("fifo_cb: %s", err->message);
376 g_free(err);
377 break;
378 }
379 } while((len) && (rv == G_IO_STATUS_NORMAL));
380 return TRUE;
381 }
382
/*
 * Create the named pipe "hiper.fifo" in the current directory and open it.
 *
 * An existing regular file of that name is left alone and the program
 * exits; anything else of that name is unlinked first. The program also
 * exits if the fifo cannot be created or opened.
 *
 * Returns the descriptor of the fifo, opened read/write and non-blocking.
 */
int init_fifo(void)
{
  static const char fifo[] = "hiper.fifo";
  struct stat info;
  int fd;

  if(lstat(fifo, &info) == 0 && S_ISREG(info.st_mode)) {
    /* refuse to delete a regular file the user may care about */
    errno = EEXIST;
    perror("lstat");
    exit(1);
  }

  unlink(fifo);
  if(mkfifo(fifo, 0600) == -1) {
    perror("mkfifo");
    exit(1);
  }

  fd = open(fifo, O_RDWR | O_NONBLOCK, 0);
  if(fd == -1) {
    perror("open");
    exit(1);
  }

  MSG_OUT("Now, pipe some URL's into > %s\n", fifo);
  return fd;
}
413
main(int argc,char ** argv)414 int main(int argc, char **argv)
415 {
416 GlobalInfo *g;
417 GMainLoop*gmain;
418 int fd;
419 GIOChannel* ch;
420 g = g_malloc0(sizeof(GlobalInfo));
421
422 fd = init_fifo();
423 ch = g_io_channel_unix_new(fd);
424 g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
425 gmain = g_main_loop_new(NULL, FALSE);
426 g->multi = curl_multi_init();
427 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
428 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
429 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
430 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
431
432 /* we do not call any curl_multi_socket*() function yet as we have no handles
433 added! */
434
435 g_main_loop_run(gmain);
436 curl_multi_cleanup(g->multi);
437 return 0;
438 }
439