xref: /curl/docs/examples/ghiper.c (revision 37fb50a8)
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9  *
10  * This software is licensed as described in the file COPYING, which
11  * you should have received as part of this distribution. The terms
12  * are also available at https://curl.se/docs/copyright.html.
13  *
14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15  * copies of the Software, and permit persons to whom the Software is
16  * furnished to do so, under the terms of the COPYING file.
17  *
18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19  * KIND, either express or implied.
20  *
21  * SPDX-License-Identifier: curl
22  *
23  ***************************************************************************/
24 /* <DESC>
25  * multi socket API usage together with glib2
26  * </DESC>
27  */
28 /* Example application source code using the multi socket interface to
29  * download many files at once.
30  *
31  * Written by Jeff Pohlmeyer
32 
33  Requires glib-2.x and a (POSIX?) system that has mkfifo().
34 
35  This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
36  sample programs, adapted to use glib's g_io_channel in place of libevent.
37 
38  When running, the program creates the named pipe "hiper.fifo"
39 
40  Whenever there is input into the fifo, the program reads the input as a list
41  of URL's and creates some new easy handles to fetch each URL via the
42  curl_multi "hiper" API.
43 
44 
45  Thus, you can try a single URL:
46  % echo http://www.yahoo.com > hiper.fifo
47 
48  Or a whole bunch of them:
49  % cat my-url-list > hiper.fifo
50 
51  The fifo buffer is handled almost instantly, so you can even add more URL's
52  while the previous requests are still being downloaded.
53 
54  This is purely a demo app, all retrieved data is simply discarded by the write
55  callback.
56 
57 */
58 
59 #include <glib.h>
60 #include <sys/stat.h>
61 #include <unistd.h>
62 #include <fcntl.h>
63 #include <stdlib.h>
64 #include <stdio.h>
65 #include <errno.h>
66 #include <curl/curl.h>
67 
68 #define MSG_OUT g_print   /* Change to "g_error" to write to stderr */
69 #define SHOW_VERBOSE 0    /* Set to non-zero for libcurl messages */
70 #define SHOW_PROGRESS 0   /* Set to non-zero to enable progress callback */
71 
72 /* Global information, common to all connections */
73 typedef struct _GlobalInfo {
74   CURLM *multi;
75   guint timer_event;
76   int still_running;
77 } GlobalInfo;
78 
79 /* Information associated with a specific easy handle */
80 typedef struct _ConnInfo {
81   CURL *easy;
82   char *url;
83   GlobalInfo *global;
84   char error[CURL_ERROR_SIZE];
85 } ConnInfo;
86 
87 /* Information associated with a specific socket */
88 typedef struct _SockInfo {
89   curl_socket_t sockfd;
90   CURL *easy;
91   int action;
92   long timeout;
93   GIOChannel *ch;
94   guint ev;
95   GlobalInfo *global;
96 } SockInfo;
97 
98 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)99 static void mcode_or_die(const char *where, CURLMcode code)
100 {
101   if(CURLM_OK != code) {
102     const char *s;
103     switch(code) {
104     case     CURLM_BAD_HANDLE:         s = "CURLM_BAD_HANDLE";         break;
105     case     CURLM_BAD_EASY_HANDLE:    s = "CURLM_BAD_EASY_HANDLE";    break;
106     case     CURLM_OUT_OF_MEMORY:      s = "CURLM_OUT_OF_MEMORY";      break;
107     case     CURLM_INTERNAL_ERROR:     s = "CURLM_INTERNAL_ERROR";     break;
108     case     CURLM_BAD_SOCKET:         s = "CURLM_BAD_SOCKET";         break;
109     case     CURLM_UNKNOWN_OPTION:     s = "CURLM_UNKNOWN_OPTION";     break;
110     case     CURLM_LAST:               s = "CURLM_LAST";               break;
111     default: s = "CURLM_unknown";
112     }
113     MSG_OUT("ERROR: %s returns %s\n", where, s);
114     exit(code);
115   }
116 }
117 
118 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)119 static void check_multi_info(GlobalInfo *g)
120 {
121   CURLMsg *msg;
122   int msgs_left;
123 
124   MSG_OUT("REMAINING: %d\n", g->still_running);
125   while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
126     if(msg->msg == CURLMSG_DONE) {
127       CURL *easy = msg->easy_handle;
128       CURLcode res = msg->data.result;
129       char *eff_url;
130       ConnInfo *conn;
131       curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
132       curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
133       MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
134       curl_multi_remove_handle(g->multi, easy);
135       free(conn->url);
136       curl_easy_cleanup(easy);
137       free(conn);
138     }
139   }
140 }
141 
142 /* Called by glib when our timeout expires */
timer_cb(gpointer data)143 static gboolean timer_cb(gpointer data)
144 {
145   GlobalInfo *g = (GlobalInfo *)data;
146   CURLMcode rc;
147 
148   rc = curl_multi_socket_action(g->multi,
149                                 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
150   mcode_or_die("timer_cb: curl_multi_socket_action", rc);
151   check_multi_info(g);
152   return FALSE;
153 }
154 
155 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)156 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
157 {
158   struct timeval timeout;
159   GlobalInfo *g = (GlobalInfo *)userp;
160   timeout.tv_sec = timeout_ms/1000;
161   timeout.tv_usec = (timeout_ms%1000)*1000;
162 
163   MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
164           timeout_ms, timeout.tv_sec, timeout.tv_usec);
165 
166   /*
167    * if timeout_ms is -1, just delete the timer
168    *
169    * For other values of timeout_ms, this should set or *update* the timer to
170    * the new value
171    */
172   if(timeout_ms >= 0)
173     g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
174   return 0;
175 }
176 
177 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)178 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
179 {
180   GlobalInfo *g = (GlobalInfo*) data;
181   CURLMcode rc;
182   int fd = g_io_channel_unix_get_fd(ch);
183 
184   int action =
185     ((condition & G_IO_IN) ? CURL_CSELECT_IN : 0) |
186     ((condition & G_IO_OUT) ? CURL_CSELECT_OUT : 0);
187 
188   rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
189   mcode_or_die("event_cb: curl_multi_socket_action", rc);
190 
191   check_multi_info(g);
192   if(g->still_running) {
193     return TRUE;
194   }
195   else {
196     MSG_OUT("last transfer done, kill timeout\n");
197     if(g->timer_event) {
198       g_source_remove(g->timer_event);
199     }
200     return FALSE;
201   }
202 }
203 
204 /* Clean up the SockInfo structure */
remsock(SockInfo * f)205 static void remsock(SockInfo *f)
206 {
207   if(!f) {
208     return;
209   }
210   if(f->ev) {
211     g_source_remove(f->ev);
212   }
213   g_free(f);
214 }
215 
216 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)217 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
218                     GlobalInfo *g)
219 {
220   GIOCondition kind =
221     ((act & CURL_POLL_IN) ? G_IO_IN : 0) |
222     ((act & CURL_POLL_OUT) ? G_IO_OUT : 0);
223 
224   f->sockfd = s;
225   f->action = act;
226   f->easy = e;
227   if(f->ev) {
228     g_source_remove(f->ev);
229   }
230   f->ev = g_io_add_watch(f->ch, kind, event_cb, g);
231 }
232 
233 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)234 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
235 {
236   SockInfo *fdp = g_malloc0(sizeof(SockInfo));
237 
238   fdp->global = g;
239   fdp->ch = g_io_channel_unix_new(s);
240   setsock(fdp, s, easy, action, g);
241   curl_multi_assign(g->multi, s, fdp);
242 }
243 
244 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)245 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
246 {
247   GlobalInfo *g = (GlobalInfo*) cbp;
248   SockInfo *fdp = (SockInfo*) sockp;
249   static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
250 
251   MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
252   if(what == CURL_POLL_REMOVE) {
253     MSG_OUT("\n");
254     remsock(fdp);
255   }
256   else {
257     if(!fdp) {
258       MSG_OUT("Adding data: %s%s\n",
259               (what & CURL_POLL_IN) ? "READ" : "",
260               (what & CURL_POLL_OUT) ? "WRITE" : "");
261       addsock(s, e, what, g);
262     }
263     else {
264       MSG_OUT(
265         "Changing action from %d to %d\n", fdp->action, what);
266       setsock(fdp, s, e, what, g);
267     }
268   }
269   return 0;
270 }
271 
272 /* CURLOPT_WRITEFUNCTION */
write_cb(void * ptr,size_t size,size_t nmemb,void * data)273 static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
274 {
275   size_t realsize = size * nmemb;
276   ConnInfo *conn = (ConnInfo*) data;
277   (void)ptr;
278   (void)conn;
279   return realsize;
280 }
281 
282 /* CURLOPT_XFERINFOFUNCTION */
xferinfo_cb(void * p,curl_off_t dltotal,curl_off_t dlnow,curl_off_t ult,curl_off_t uln)283 static int xferinfo_cb(void *p, curl_off_t dltotal, curl_off_t dlnow,
284                        curl_off_t ult, curl_off_t uln)
285 {
286   ConnInfo *conn = (ConnInfo *)p;
287   (void)ult;
288   (void)uln;
289 
290   fprintf(MSG_OUT, "Progress: %s (%" CURL_FORMAT_CURL_OFF_T
291           "/%" CURL_FORMAT_CURL_OFF_T ")\n", conn->url, dlnow, dltotal);
292   return 0;
293 }
294 
295 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(const char * url,GlobalInfo * g)296 static void new_conn(const char *url, GlobalInfo *g)
297 {
298   ConnInfo *conn;
299   CURLMcode rc;
300 
301   conn = g_malloc0(sizeof(ConnInfo));
302   conn->error[0] = '\0';
303   conn->easy = curl_easy_init();
304   if(!conn->easy) {
305     MSG_OUT("curl_easy_init() failed, exiting!\n");
306     exit(2);
307   }
308   conn->global = g;
309   conn->url = g_strdup(url);
310   curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
311   curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
312   curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
313   curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
314   curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
315   curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
316   curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS ? 0L : 1L);
317   curl_easy_setopt(conn->easy, CURLOPT_XFERINFOFUNCTION, xferinfo_cb);
318   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
319   curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
320   curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
321   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
322   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
323 
324   MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
325   rc = curl_multi_add_handle(g->multi, conn->easy);
326   mcode_or_die("new_conn: curl_multi_add_handle", rc);
327 
328   /* note that add_handle() sets a timeout to trigger soon so that the
329      necessary socket_action() gets called */
330 }
331 
332 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)333 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
334 {
335 #define BUF_SIZE 1024
336   gsize len, tp;
337   gchar *buf, *tmp, *all = NULL;
338   GIOStatus rv;
339 
340   do {
341     GError *err = NULL;
342     rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
343     if(buf) {
344       if(tp) {
345         buf[tp]='\0';
346       }
347       new_conn(buf, (GlobalInfo*)data);
348       g_free(buf);
349     }
350     else {
351       buf = g_malloc(BUF_SIZE + 1);
352       while(TRUE) {
353         buf[BUF_SIZE]='\0';
354         g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
355         if(len) {
356           buf[len]='\0';
357           if(all) {
358             tmp = all;
359             all = g_strdup_printf("%s%s", tmp, buf);
360             g_free(tmp);
361           }
362           else {
363             all = g_strdup(buf);
364           }
365         }
366         else {
367           break;
368         }
369       }
370       if(all) {
371         new_conn(all, (GlobalInfo*)data);
372         g_free(all);
373       }
374       g_free(buf);
375     }
376     if(err) {
377       g_error("fifo_cb: %s", err->message);
378       g_free(err);
379       break;
380     }
381   } while((len) && (rv == G_IO_STATUS_NORMAL));
382   return TRUE;
383 }
384 
init_fifo(void)385 int init_fifo(void)
386 {
387   struct stat st;
388   const char *fifo = "hiper.fifo";
389   int socket;
390 
391   if(lstat(fifo, &st) == 0) {
392     if((st.st_mode & S_IFMT) == S_IFREG) {
393       errno = EEXIST;
394       perror("lstat");
395       exit(1);
396     }
397   }
398 
399   unlink(fifo);
400   if(mkfifo (fifo, 0600) == -1) {
401     perror("mkfifo");
402     exit(1);
403   }
404 
405   socket = open(fifo, O_RDWR | O_NONBLOCK, 0);
406 
407   if(socket == -1) {
408     perror("open");
409     exit(1);
410   }
411   MSG_OUT("Now, pipe some URL's into > %s\n", fifo);
412 
413   return socket;
414 }
415 
main(void)416 int main(void)
417 {
418   GlobalInfo *g = g_malloc0(sizeof(GlobalInfo));
419   GMainLoop*gmain;
420   int fd;
421   GIOChannel* ch;
422 
423   fd = init_fifo();
424   ch = g_io_channel_unix_new(fd);
425   g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
426   gmain = g_main_loop_new(NULL, FALSE);
427   g->multi = curl_multi_init();
428   curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
429   curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
430   curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
431   curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
432 
433   /* we do not call any curl_multi_socket*() function yet as we have no handles
434      added! */
435 
436   g_main_loop_run(gmain);
437   curl_multi_cleanup(g->multi);
438   return 0;
439 }
440