xref: /curl/docs/examples/ghiper.c (revision fbf5d507)
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9  *
10  * This software is licensed as described in the file COPYING, which
11  * you should have received as part of this distribution. The terms
12  * are also available at https://curl.se/docs/copyright.html.
13  *
14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15  * copies of the Software, and permit persons to whom the Software is
16  * furnished to do so, under the terms of the COPYING file.
17  *
18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19  * KIND, either express or implied.
20  *
21  * SPDX-License-Identifier: curl
22  *
23  ***************************************************************************/
24 /* <DESC>
25  * multi socket API usage together with glib2
26  * </DESC>
27  */
28 /* Example application source code using the multi socket interface to
29  * download many files at once.
30  *
31  * Written by Jeff Pohlmeyer
32 
33  Requires glib-2.x and a (POSIX?) system that has mkfifo().
34 
35  This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
36  sample programs, adapted to use glib's g_io_channel in place of libevent.
37 
38  When running, the program creates the named pipe "hiper.fifo"
39 
40  Whenever there is input into the fifo, the program reads the input as a list
41  of URLs and creates a new easy handle to fetch each URL via the
42  curl_multi "hiper" API.
43 
44 
45  Thus, you can try a single URL:
46  % echo http://www.yahoo.com > hiper.fifo
47 
48  Or a whole bunch of them:
49  % cat my-url-list > hiper.fifo
50 
51  The fifo buffer is handled almost instantly, so you can even add more URLs
52  while the previous requests are still being downloaded.
53 
54  This is purely a demo app, all retrieved data is simply discarded by the write
55  callback.
56 
57 */
58 
59 #include <glib.h>
60 #include <sys/stat.h>
61 #include <unistd.h>
62 #include <fcntl.h>
63 #include <stdlib.h>
64 #include <stdio.h>
65 #include <errno.h>
66 #include <curl/curl.h>
67 
68 #define MSG_OUT g_print   /* Change to "g_printerr" to write to stderr (note: "g_error" logs and then aborts) */
69 #define SHOW_VERBOSE 0    /* Set to non-zero for libcurl messages */
70 #define SHOW_PROGRESS 0   /* Set to non-zero to enable progress callback */
71 
72 /* Global information, common to all connections */
73 typedef struct _GlobalInfo {
74   CURLM *multi;
75   guint timer_event;
76   int still_running;
77 } GlobalInfo;
78 
79 /* Information associated with a specific easy handle */
80 typedef struct _ConnInfo {
81   CURL *easy;
82   char *url;
83   GlobalInfo *global;
84   char error[CURL_ERROR_SIZE];
85 } ConnInfo;
86 
87 /* Information associated with a specific socket */
88 typedef struct _SockInfo {
89   curl_socket_t sockfd;
90   CURL *easy;
91   int action;
92   long timeout;
93   GIOChannel *ch;
94   guint ev;
95   GlobalInfo *global;
96 } SockInfo;
97 
98 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)99 static void mcode_or_die(const char *where, CURLMcode code)
100 {
101   if(CURLM_OK != code) {
102     const char *s;
103     switch(code) {
104     case     CURLM_BAD_HANDLE:         s = "CURLM_BAD_HANDLE";         break;
105     case     CURLM_BAD_EASY_HANDLE:    s = "CURLM_BAD_EASY_HANDLE";    break;
106     case     CURLM_OUT_OF_MEMORY:      s = "CURLM_OUT_OF_MEMORY";      break;
107     case     CURLM_INTERNAL_ERROR:     s = "CURLM_INTERNAL_ERROR";     break;
108     case     CURLM_BAD_SOCKET:         s = "CURLM_BAD_SOCKET";         break;
109     case     CURLM_UNKNOWN_OPTION:     s = "CURLM_UNKNOWN_OPTION";     break;
110     case     CURLM_LAST:               s = "CURLM_LAST";               break;
111     default: s = "CURLM_unknown";
112     }
113     MSG_OUT("ERROR: %s returns %s\n", where, s);
114     exit(code);
115   }
116 }
117 
118 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)119 static void check_multi_info(GlobalInfo *g)
120 {
121   char *eff_url;
122   CURLMsg *msg;
123   int msgs_left;
124   ConnInfo *conn;
125   CURL *easy;
126   CURLcode res;
127 
128   MSG_OUT("REMAINING: %d\n", g->still_running);
129   while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
130     if(msg->msg == CURLMSG_DONE) {
131       easy = msg->easy_handle;
132       res = msg->data.result;
133       curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
134       curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
135       MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
136       curl_multi_remove_handle(g->multi, easy);
137       free(conn->url);
138       curl_easy_cleanup(easy);
139       free(conn);
140     }
141   }
142 }
143 
144 /* Called by glib when our timeout expires */
timer_cb(gpointer data)145 static gboolean timer_cb(gpointer data)
146 {
147   GlobalInfo *g = (GlobalInfo *)data;
148   CURLMcode rc;
149 
150   rc = curl_multi_socket_action(g->multi,
151                                 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
152   mcode_or_die("timer_cb: curl_multi_socket_action", rc);
153   check_multi_info(g);
154   return FALSE;
155 }
156 
157 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)158 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
159 {
160   struct timeval timeout;
161   GlobalInfo *g = (GlobalInfo *)userp;
162   timeout.tv_sec = timeout_ms/1000;
163   timeout.tv_usec = (timeout_ms%1000)*1000;
164 
165   MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
166           timeout_ms, timeout.tv_sec, timeout.tv_usec);
167 
168   /*
169    * if timeout_ms is -1, just delete the timer
170    *
171    * For other values of timeout_ms, this should set or *update* the timer to
172    * the new value
173    */
174   if(timeout_ms >= 0)
175     g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
176   return 0;
177 }
178 
179 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)180 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
181 {
182   GlobalInfo *g = (GlobalInfo*) data;
183   CURLMcode rc;
184   int fd = g_io_channel_unix_get_fd(ch);
185 
186   int action =
187     ((condition & G_IO_IN) ? CURL_CSELECT_IN : 0) |
188     ((condition & G_IO_OUT) ? CURL_CSELECT_OUT : 0);
189 
190   rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
191   mcode_or_die("event_cb: curl_multi_socket_action", rc);
192 
193   check_multi_info(g);
194   if(g->still_running) {
195     return TRUE;
196   }
197   else {
198     MSG_OUT("last transfer done, kill timeout\n");
199     if(g->timer_event) {
200       g_source_remove(g->timer_event);
201     }
202     return FALSE;
203   }
204 }
205 
206 /* Clean up the SockInfo structure */
remsock(SockInfo * f)207 static void remsock(SockInfo *f)
208 {
209   if(!f) {
210     return;
211   }
212   if(f->ev) {
213     g_source_remove(f->ev);
214   }
215   g_free(f);
216 }
217 
218 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)219 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
220                     GlobalInfo *g)
221 {
222   GIOCondition kind =
223     ((act & CURL_POLL_IN) ? G_IO_IN : 0) |
224     ((act & CURL_POLL_OUT) ? G_IO_OUT : 0);
225 
226   f->sockfd = s;
227   f->action = act;
228   f->easy = e;
229   if(f->ev) {
230     g_source_remove(f->ev);
231   }
232   f->ev = g_io_add_watch(f->ch, kind, event_cb, g);
233 }
234 
235 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)236 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
237 {
238   SockInfo *fdp = g_malloc0(sizeof(SockInfo));
239 
240   fdp->global = g;
241   fdp->ch = g_io_channel_unix_new(s);
242   setsock(fdp, s, easy, action, g);
243   curl_multi_assign(g->multi, s, fdp);
244 }
245 
246 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)247 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
248 {
249   GlobalInfo *g = (GlobalInfo*) cbp;
250   SockInfo *fdp = (SockInfo*) sockp;
251   static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
252 
253   MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
254   if(what == CURL_POLL_REMOVE) {
255     MSG_OUT("\n");
256     remsock(fdp);
257   }
258   else {
259     if(!fdp) {
260       MSG_OUT("Adding data: %s%s\n",
261               (what & CURL_POLL_IN) ? "READ" : "",
262               (what & CURL_POLL_OUT) ? "WRITE" : "");
263       addsock(s, e, what, g);
264     }
265     else {
266       MSG_OUT(
267         "Changing action from %d to %d\n", fdp->action, what);
268       setsock(fdp, s, e, what, g);
269     }
270   }
271   return 0;
272 }
273 
/* CURLOPT_WRITEFUNCTION: discard the received data.
 *
 * ptr    received bytes (ignored)
 * size   size of one element
 * nmemb  number of elements
 * data   CURLOPT_WRITEDATA pointer (ignored)
 *
 * Returns size * nmemb, i.e. reports every byte as consumed.
 */
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
{
  /* nothing is stored, so the user pointer is deliberately left untouched */
  (void)ptr;
  (void)data;
  return size * nmemb;
}
283 
284 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)285 static int prog_cb(void *p, double dltotal, double dlnow, double ult,
286                    double uln)
287 {
288   ConnInfo *conn = (ConnInfo *)p;
289   MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
290   return 0;
291 }
292 
293 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)294 static void new_conn(char *url, GlobalInfo *g)
295 {
296   ConnInfo *conn;
297   CURLMcode rc;
298 
299   conn = g_malloc0(sizeof(ConnInfo));
300   conn->error[0]='\0';
301   conn->easy = curl_easy_init();
302   if(!conn->easy) {
303     MSG_OUT("curl_easy_init() failed, exiting!\n");
304     exit(2);
305   }
306   conn->global = g;
307   conn->url = g_strdup(url);
308   curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
309   curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
310   curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
311   curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
312   curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
313   curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
314   curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS ? 0L : 1L);
315   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
316   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
317   curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
318   curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
319   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
320   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
321 
322   MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
323   rc = curl_multi_add_handle(g->multi, conn->easy);
324   mcode_or_die("new_conn: curl_multi_add_handle", rc);
325 
326   /* note that add_handle() sets a timeout to trigger soon so that the
327      necessary socket_action() gets called */
328 }
329 
330 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)331 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
332 {
333 #define BUF_SIZE 1024
334   gsize len, tp;
335   gchar *buf, *tmp, *all = NULL;
336   GIOStatus rv;
337 
338   do {
339     GError *err = NULL;
340     rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
341     if(buf) {
342       if(tp) {
343         buf[tp]='\0';
344       }
345       new_conn(buf, (GlobalInfo*)data);
346       g_free(buf);
347     }
348     else {
349       buf = g_malloc(BUF_SIZE + 1);
350       while(TRUE) {
351         buf[BUF_SIZE]='\0';
352         g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
353         if(len) {
354           buf[len]='\0';
355           if(all) {
356             tmp = all;
357             all = g_strdup_printf("%s%s", tmp, buf);
358             g_free(tmp);
359           }
360           else {
361             all = g_strdup(buf);
362           }
363         }
364         else {
365           break;
366         }
367       }
368       if(all) {
369         new_conn(all, (GlobalInfo*)data);
370         g_free(all);
371       }
372       g_free(buf);
373     }
374     if(err) {
375       g_error("fifo_cb: %s", err->message);
376       g_free(err);
377       break;
378     }
379   } while((len) && (rv == G_IO_STATUS_NORMAL));
380   return TRUE;
381 }
382 
init_fifo(void)383 int init_fifo(void)
384 {
385   struct stat st;
386   const char *fifo = "hiper.fifo";
387   int socket;
388 
389   if(lstat (fifo, &st) == 0) {
390     if((st.st_mode & S_IFMT) == S_IFREG) {
391       errno = EEXIST;
392       perror("lstat");
393       exit(1);
394     }
395   }
396 
397   unlink(fifo);
398   if(mkfifo (fifo, 0600) == -1) {
399     perror("mkfifo");
400     exit(1);
401   }
402 
403   socket = open(fifo, O_RDWR | O_NONBLOCK, 0);
404 
405   if(socket == -1) {
406     perror("open");
407     exit(1);
408   }
409   MSG_OUT("Now, pipe some URL's into > %s\n", fifo);
410 
411   return socket;
412 }
413 
main(int argc,char ** argv)414 int main(int argc, char **argv)
415 {
416   GlobalInfo *g;
417   GMainLoop*gmain;
418   int fd;
419   GIOChannel* ch;
420   g = g_malloc0(sizeof(GlobalInfo));
421 
422   fd = init_fifo();
423   ch = g_io_channel_unix_new(fd);
424   g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
425   gmain = g_main_loop_new(NULL, FALSE);
426   g->multi = curl_multi_init();
427   curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
428   curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
429   curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
430   curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
431 
432   /* we do not call any curl_multi_socket*() function yet as we have no handles
433      added! */
434 
435   g_main_loop_run(gmain);
436   curl_multi_cleanup(g->multi);
437   return 0;
438 }
439