xref: /curl/lib/bufq.c (revision b1005d12)
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9  *
10  * This software is licensed as described in the file COPYING, which
11  * you should have received as part of this distribution. The terms
12  * are also available at https://curl.se/docs/copyright.html.
13  *
14  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15  * copies of the Software, and permit persons to whom the Software is
16  * furnished to do so, under the terms of the COPYING file.
17  *
18  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19  * KIND, either express or implied.
20  *
21  * SPDX-License-Identifier: curl
22  *
23  ***************************************************************************/
24 
25 #include "curl_setup.h"
26 #include "bufq.h"
27 
28 /* The last 3 #include files should be in this order */
29 #include "curl_printf.h"
30 #include "curl_memory.h"
31 #include "memdebug.h"
32 
chunk_is_empty(const struct buf_chunk * chunk)33 static bool chunk_is_empty(const struct buf_chunk *chunk)
34 {
35   return chunk->r_offset >= chunk->w_offset;
36 }
37 
chunk_is_full(const struct buf_chunk * chunk)38 static bool chunk_is_full(const struct buf_chunk *chunk)
39 {
40   return chunk->w_offset >= chunk->dlen;
41 }
42 
chunk_len(const struct buf_chunk * chunk)43 static size_t chunk_len(const struct buf_chunk *chunk)
44 {
45   return chunk->w_offset - chunk->r_offset;
46 }
47 
chunk_space(const struct buf_chunk * chunk)48 static size_t chunk_space(const struct buf_chunk *chunk)
49 {
50   return chunk->dlen - chunk->w_offset;
51 }
52 
chunk_reset(struct buf_chunk * chunk)53 static void chunk_reset(struct buf_chunk *chunk)
54 {
55   chunk->next = NULL;
56   chunk->r_offset = chunk->w_offset = 0;
57 }
58 
chunk_append(struct buf_chunk * chunk,const unsigned char * buf,size_t len)59 static size_t chunk_append(struct buf_chunk *chunk,
60                            const unsigned char *buf, size_t len)
61 {
62   unsigned char *p = &chunk->x.data[chunk->w_offset];
63   size_t n = chunk->dlen - chunk->w_offset;
64   DEBUGASSERT(chunk->dlen >= chunk->w_offset);
65   if(n) {
66     n = CURLMIN(n, len);
67     memcpy(p, buf, n);
68     chunk->w_offset += n;
69   }
70   return n;
71 }
72 
chunk_read(struct buf_chunk * chunk,unsigned char * buf,size_t len)73 static size_t chunk_read(struct buf_chunk *chunk,
74                          unsigned char *buf, size_t len)
75 {
76   unsigned char *p = &chunk->x.data[chunk->r_offset];
77   size_t n = chunk->w_offset - chunk->r_offset;
78   DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
79   if(!n) {
80     return 0;
81   }
82   else if(n <= len) {
83     memcpy(buf, p, n);
84     chunk->r_offset = chunk->w_offset = 0;
85     return n;
86   }
87   else {
88     memcpy(buf, p, len);
89     chunk->r_offset += len;
90     return len;
91   }
92 }
93 
/* Let `reader` fill the free space of the chunk, limited to `max_len`
 * bytes when `max_len` is non-zero.
 * Returns what `reader` returned; a positive count advances the write
 * offset. A chunk without free space yields -1 with CURLE_AGAIN and the
 * reader is not invoked. */
static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
                            Curl_bufq_reader *reader,
                            void *reader_ctx, CURLcode *err)
{
  size_t room;
  ssize_t nread;

  DEBUGASSERT(chunk->dlen >= chunk->w_offset);
  room = chunk->dlen - chunk->w_offset;
  if(!room) {
    *err = CURLE_AGAIN;
    return -1;
  }

  if(max_len && (max_len < room))
    room = max_len;

  nread = reader(reader_ctx, &chunk->x.data[chunk->w_offset], room, err);
  if(nread > 0) {
    DEBUGASSERT((size_t)nread <= room);
    chunk->w_offset += (size_t)nread;
  }
  return nread;
}
116 
chunk_peek(const struct buf_chunk * chunk,const unsigned char ** pbuf,size_t * plen)117 static void chunk_peek(const struct buf_chunk *chunk,
118                        const unsigned char **pbuf, size_t *plen)
119 {
120   DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
121   *pbuf = &chunk->x.data[chunk->r_offset];
122   *plen = chunk->w_offset - chunk->r_offset;
123 }
124 
chunk_peek_at(const struct buf_chunk * chunk,size_t offset,const unsigned char ** pbuf,size_t * plen)125 static void chunk_peek_at(const struct buf_chunk *chunk, size_t offset,
126                           const unsigned char **pbuf, size_t *plen)
127 {
128   offset += chunk->r_offset;
129   DEBUGASSERT(chunk->w_offset >= offset);
130   *pbuf = &chunk->x.data[offset];
131   *plen = chunk->w_offset - offset;
132 }
133 
chunk_skip(struct buf_chunk * chunk,size_t amount)134 static size_t chunk_skip(struct buf_chunk *chunk, size_t amount)
135 {
136   size_t n = chunk->w_offset - chunk->r_offset;
137   DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
138   if(n) {
139     n = CURLMIN(n, amount);
140     chunk->r_offset += n;
141     if(chunk->r_offset == chunk->w_offset)
142       chunk->r_offset = chunk->w_offset = 0;
143   }
144   return n;
145 }
146 
chunk_list_free(struct buf_chunk ** anchor)147 static void chunk_list_free(struct buf_chunk **anchor)
148 {
149   struct buf_chunk *chunk;
150   while(*anchor) {
151     chunk = *anchor;
152     *anchor = chunk->next;
153     free(chunk);
154   }
155 }
156 
157 
158 
Curl_bufcp_init(struct bufc_pool * pool,size_t chunk_size,size_t spare_max)159 void Curl_bufcp_init(struct bufc_pool *pool,
160                      size_t chunk_size, size_t spare_max)
161 {
162   DEBUGASSERT(chunk_size > 0);
163   DEBUGASSERT(spare_max > 0);
164   memset(pool, 0, sizeof(*pool));
165   pool->chunk_size = chunk_size;
166   pool->spare_max = spare_max;
167 }
168 
bufcp_take(struct bufc_pool * pool,struct buf_chunk ** pchunk)169 static CURLcode bufcp_take(struct bufc_pool *pool,
170                            struct buf_chunk **pchunk)
171 {
172   struct buf_chunk *chunk = NULL;
173 
174   if(pool->spare) {
175     chunk = pool->spare;
176     pool->spare = chunk->next;
177     --pool->spare_count;
178     chunk_reset(chunk);
179     *pchunk = chunk;
180     return CURLE_OK;
181   }
182 
183   chunk = calloc(1, sizeof(*chunk) + pool->chunk_size);
184   if(!chunk) {
185     *pchunk = NULL;
186     return CURLE_OUT_OF_MEMORY;
187   }
188   chunk->dlen = pool->chunk_size;
189   *pchunk = chunk;
190   return CURLE_OK;
191 }
192 
bufcp_put(struct bufc_pool * pool,struct buf_chunk * chunk)193 static void bufcp_put(struct bufc_pool *pool,
194                       struct buf_chunk *chunk)
195 {
196   if(pool->spare_count >= pool->spare_max) {
197     free(chunk);
198   }
199   else {
200     chunk_reset(chunk);
201     chunk->next = pool->spare;
202     pool->spare = chunk;
203     ++pool->spare_count;
204   }
205 }
206 
Curl_bufcp_free(struct bufc_pool * pool)207 void Curl_bufcp_free(struct bufc_pool *pool)
208 {
209   chunk_list_free(&pool->spare);
210   pool->spare_count = 0;
211 }
212 
bufq_init(struct bufq * q,struct bufc_pool * pool,size_t chunk_size,size_t max_chunks,int opts)213 static void bufq_init(struct bufq *q, struct bufc_pool *pool,
214                       size_t chunk_size, size_t max_chunks, int opts)
215 {
216   DEBUGASSERT(chunk_size > 0);
217   DEBUGASSERT(max_chunks > 0);
218   memset(q, 0, sizeof(*q));
219   q->chunk_size = chunk_size;
220   q->max_chunks = max_chunks;
221   q->pool = pool;
222   q->opts = opts;
223 }
224 
/* Initialize a queue that allocates its own chunks (no pool), using the
 * given chunk size, chunk limit and BUFQ_OPT_* flags. */
void Curl_bufq_init2(struct bufq *q, size_t chunk_size, size_t max_chunks,
                     int opts)
{
  struct bufc_pool *no_pool = NULL;

  bufq_init(q, no_pool, chunk_size, max_chunks, opts);
}
230 
Curl_bufq_init(struct bufq * q,size_t chunk_size,size_t max_chunks)231 void Curl_bufq_init(struct bufq *q, size_t chunk_size, size_t max_chunks)
232 {
233   bufq_init(q, NULL, chunk_size, max_chunks, BUFQ_OPT_NONE);
234 }
235 
Curl_bufq_initp(struct bufq * q,struct bufc_pool * pool,size_t max_chunks,int opts)236 void Curl_bufq_initp(struct bufq *q, struct bufc_pool *pool,
237                      size_t max_chunks, int opts)
238 {
239   bufq_init(q, pool, pool->chunk_size, max_chunks, opts);
240 }
241 
Curl_bufq_free(struct bufq * q)242 void Curl_bufq_free(struct bufq *q)
243 {
244   chunk_list_free(&q->head);
245   chunk_list_free(&q->spare);
246   q->tail = NULL;
247   q->chunk_count = 0;
248 }
249 
Curl_bufq_reset(struct bufq * q)250 void Curl_bufq_reset(struct bufq *q)
251 {
252   struct buf_chunk *chunk;
253   while(q->head) {
254     chunk = q->head;
255     q->head = chunk->next;
256     chunk->next = q->spare;
257     q->spare = chunk;
258   }
259   q->tail = NULL;
260 }
261 
Curl_bufq_len(const struct bufq * q)262 size_t Curl_bufq_len(const struct bufq *q)
263 {
264   const struct buf_chunk *chunk = q->head;
265   size_t len = 0;
266   while(chunk) {
267     len += chunk_len(chunk);
268     chunk = chunk->next;
269   }
270   return len;
271 }
272 
Curl_bufq_space(const struct bufq * q)273 size_t Curl_bufq_space(const struct bufq *q)
274 {
275   size_t space = 0;
276   if(q->tail)
277     space += chunk_space(q->tail);
278   if(q->spare) {
279     struct buf_chunk *chunk = q->spare;
280     while(chunk) {
281       space += chunk->dlen;
282       chunk = chunk->next;
283     }
284   }
285   if(q->chunk_count < q->max_chunks) {
286     space += (q->max_chunks - q->chunk_count) * q->chunk_size;
287   }
288   return space;
289 }
290 
Curl_bufq_is_empty(const struct bufq * q)291 bool Curl_bufq_is_empty(const struct bufq *q)
292 {
293   return !q->head || chunk_is_empty(q->head);
294 }
295 
Curl_bufq_is_full(const struct bufq * q)296 bool Curl_bufq_is_full(const struct bufq *q)
297 {
298   if(!q->tail || q->spare)
299     return FALSE;
300   if(q->chunk_count < q->max_chunks)
301     return FALSE;
302   if(q->chunk_count > q->max_chunks)
303     return TRUE;
304   /* we have no spares and cannot make more, is the tail full? */
305   return chunk_is_full(q->tail);
306 }
307 
get_spare(struct bufq * q)308 static struct buf_chunk *get_spare(struct bufq *q)
309 {
310   struct buf_chunk *chunk = NULL;
311 
312   if(q->spare) {
313     chunk = q->spare;
314     q->spare = chunk->next;
315     chunk_reset(chunk);
316     return chunk;
317   }
318 
319   if(q->chunk_count >= q->max_chunks && (!(q->opts & BUFQ_OPT_SOFT_LIMIT)))
320     return NULL;
321 
322   if(q->pool) {
323     if(bufcp_take(q->pool, &chunk))
324       return NULL;
325     ++q->chunk_count;
326     return chunk;
327   }
328   else {
329     chunk = calloc(1, sizeof(*chunk) + q->chunk_size);
330     if(!chunk)
331       return NULL;
332     chunk->dlen = q->chunk_size;
333     ++q->chunk_count;
334     return chunk;
335   }
336 }
337 
prune_head(struct bufq * q)338 static void prune_head(struct bufq *q)
339 {
340   struct buf_chunk *chunk;
341 
342   while(q->head && chunk_is_empty(q->head)) {
343     chunk = q->head;
344     q->head = chunk->next;
345     if(q->tail == chunk)
346       q->tail = q->head;
347     if(q->pool) {
348       bufcp_put(q->pool, chunk);
349       --q->chunk_count;
350     }
351     else if((q->chunk_count > q->max_chunks) ||
352        (q->opts & BUFQ_OPT_NO_SPARES)) {
353       /* SOFT_LIMIT allowed us more than max. free spares until
354        * we are at max again. Or free them if we are configured
355        * to not use spares. */
356       free(chunk);
357       --q->chunk_count;
358     }
359     else {
360       chunk->next = q->spare;
361       q->spare = chunk;
362     }
363   }
364 }
365 
get_non_full_tail(struct bufq * q)366 static struct buf_chunk *get_non_full_tail(struct bufq *q)
367 {
368   struct buf_chunk *chunk;
369 
370   if(q->tail && !chunk_is_full(q->tail))
371     return q->tail;
372   chunk = get_spare(q);
373   if(chunk) {
374     /* new tail, and possibly new head */
375     if(q->tail) {
376       q->tail->next = chunk;
377       q->tail = chunk;
378     }
379     else {
380       DEBUGASSERT(!q->head);
381       q->head = q->tail = chunk;
382     }
383   }
384   return chunk;
385 }
386 
/* Append up to `len` bytes from `buf` to the queue.
 * Returns the number of bytes added with `*err` set to CURLE_OK. This may
 * be fewer than `len` when the queue fills up.
 * Returns -1 with CURLE_AGAIN when `len` is non-zero and nothing could be
 * added, and -1 with CURLE_OUT_OF_MEMORY when a chunk was permitted by
 * the limits but could not be obtained. */
ssize_t Curl_bufq_write(struct bufq *q,
                        const unsigned char *buf, size_t len,
                        CURLcode *err)
{
  ssize_t total = 0;

  DEBUGASSERT(q->max_chunks > 0);
  while(len) {
    struct buf_chunk *tail = get_non_full_tail(q);
    size_t n;

    if(!tail) {
      if(!(q->opts & BUFQ_OPT_SOFT_LIMIT) &&
         (q->chunk_count >= q->max_chunks))
        break; /* hard limit reached, the queue is simply full */
      *err = CURLE_OUT_OF_MEMORY;
      return -1;
    }

    n = chunk_append(tail, buf, len);
    if(!n)
      break;
    total += n;
    buf += n;
    len -= n;
  }

  if(len && !total) {
    *err = CURLE_AGAIN;
    return -1;
  }
  *err = CURLE_OK;
  return total;
}
419 
Curl_bufq_cwrite(struct bufq * q,const char * buf,size_t len,size_t * pnwritten)420 CURLcode Curl_bufq_cwrite(struct bufq *q,
421                           const char *buf, size_t len,
422                           size_t *pnwritten)
423 {
424   ssize_t n;
425   CURLcode result;
426   n = Curl_bufq_write(q, (const unsigned char *)buf, len, &result);
427   *pnwritten = (n < 0)? 0 : (size_t)n;
428   return result;
429 }
430 
/* Move up to `len` bytes from the front of the queue into `buf`. Chunks
 * that become empty are pruned on the way.
 * Returns the number of bytes copied with CURLE_OK, or -1 with
 * CURLE_AGAIN when nothing was copied. */
ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
                       CURLcode *err)
{
  ssize_t total = 0;

  *err = CURLE_OK;
  while(len && q->head) {
    size_t n = chunk_read(q->head, buf, len);

    if(n) {
      total += n;
      buf += n;
      len -= n;
    }
    prune_head(q);
  }

  if(!total) {
    *err = CURLE_AGAIN;
    return -1;
  }
  return total;
}
453 
Curl_bufq_cread(struct bufq * q,char * buf,size_t len,size_t * pnread)454 CURLcode Curl_bufq_cread(struct bufq *q, char *buf, size_t len,
455                          size_t *pnread)
456 {
457   ssize_t n;
458   CURLcode result;
459   n = Curl_bufq_read(q, (unsigned char *)buf, len, &result);
460   *pnread = (n < 0)? 0 : (size_t)n;
461   return result;
462 }
463 
Curl_bufq_peek(struct bufq * q,const unsigned char ** pbuf,size_t * plen)464 bool Curl_bufq_peek(struct bufq *q,
465                     const unsigned char **pbuf, size_t *plen)
466 {
467   if(q->head && chunk_is_empty(q->head)) {
468     prune_head(q);
469   }
470   if(q->head && !chunk_is_empty(q->head)) {
471     chunk_peek(q->head, pbuf, plen);
472     return TRUE;
473   }
474   *pbuf = NULL;
475   *plen = 0;
476   return FALSE;
477 }
478 
Curl_bufq_peek_at(struct bufq * q,size_t offset,const unsigned char ** pbuf,size_t * plen)479 bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
480                        const unsigned char **pbuf, size_t *plen)
481 {
482   struct buf_chunk *c = q->head;
483   size_t clen;
484 
485   while(c) {
486     clen = chunk_len(c);
487     if(!clen)
488       break;
489     if(offset >= clen) {
490       offset -= clen;
491       c = c->next;
492       continue;
493     }
494     chunk_peek_at(c, offset, pbuf, plen);
495     return TRUE;
496   }
497   *pbuf = NULL;
498   *plen = 0;
499   return FALSE;
500 }
501 
Curl_bufq_skip(struct bufq * q,size_t amount)502 void Curl_bufq_skip(struct bufq *q, size_t amount)
503 {
504   size_t n;
505 
506   while(amount && q->head) {
507     n = chunk_skip(q->head, amount);
508     amount -= n;
509     prune_head(q);
510   }
511 }
512 
/* Hand the buffered data to `writer`, chunk by chunk, removing from the
 * queue whatever the writer accepted.
 * Returns the total number of bytes passed on (0 for an empty queue) with
 * `*err` set to CURLE_OK. That includes the case where the writer blocked
 * (returned CURLE_AGAIN or wrote 0 bytes) after some data had already
 * been passed.
 * Returns -1 with CURLE_AGAIN when the writer blocked before anything was
 * passed, or -1 with the writer's error code for any other writer
 * failure. Bytes accepted before such a failure stay removed from the
 * queue. */
ssize_t Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer,
                       void *writer_ctx, CURLcode *err)
{
  const unsigned char *buf;
  size_t blen;
  ssize_t nwritten = 0;

  while(Curl_bufq_peek(q, &buf, &blen)) {
    ssize_t chunk_written;

    chunk_written = writer(writer_ctx, buf, blen, err);
    if(chunk_written < 0) {
      if(!nwritten || *err != CURLE_AGAIN) {
        /* blocked on first write or real error, fail */
        return -1;
      }
      /* blocked after partial progress, report the progress */
      break;
    }
    if(!chunk_written) {
      if(!nwritten) {
        /* treat as blocked */
        *err = CURLE_AGAIN;
        return -1;
      }
      break;
    }
    Curl_bufq_skip(q, (size_t)chunk_written);
    nwritten += chunk_written;
  }
  /* a non-negative result never carries a stale or unset error code */
  *err = CURLE_OK;
  return nwritten;
}
544 
/* Append `len` bytes from `buf` to the queue, flushing the queue through
 * `writer` whenever it is full to make room for more.
 * Returns the number of bytes taken from `buf` with CURLE_OK, which may
 * be fewer than `len` when the writer blocks while the queue is full.
 * Returns -1 with CURLE_AGAIN when `len` is non-zero and nothing could be
 * added, or -1 with the error code of a failing flush or append. */
ssize_t Curl_bufq_write_pass(struct bufq *q,
                             const unsigned char *buf, size_t len,
                             Curl_bufq_writer *writer, void *writer_ctx,
                             CURLcode *err)
{
  ssize_t total = 0;

  *err = CURLE_OK;
  while(len) {
    ssize_t n;

    if(Curl_bufq_is_full(q)) {
      /* no room left, try to drain the queue through the writer */
      n = Curl_bufq_pass(q, writer, writer_ctx, err);
      if(n < 0) {
        if(*err == CURLE_AGAIN)
          break; /* writer would block and the queue is full, give up */
        return -1; /* real error */
      }
    }

    /* add as much of the remaining input as fits */
    n = Curl_bufq_write(q, buf, len, err);
    if(n < 0) {
      if(*err == CURLE_AGAIN)
        break; /* no room in the queue */
      return -1; /* real error */
    }
    /* nothing was added although input remains: stop here rather than
     * risk looping forever */
    if(!n)
      break;

    /* possibly only part of the input was added, go around again */
    buf += (size_t)n;
    len -= (size_t)n;
    total += n;
  }

  if(len && !total) {
    *err = CURLE_AGAIN;
    return -1;
  }
  *err = CURLE_OK;
  return total;
}
595 
/* Perform a single `reader` call that fills free space at the end of the
 * queue, limited to `max_len` bytes when `max_len` is non-zero.
 * Returns the number of bytes the reader delivered, or 0 with CURLE_OK
 * when the reader reported end of input.
 * Returns -1 with CURLE_AGAIN when the queue is full, -1 with
 * CURLE_OUT_OF_MEMORY when a chunk was permitted by the limits but could
 * not be obtained, and -1 with the reader's error code when the reader
 * failed. */
ssize_t Curl_bufq_sipn(struct bufq *q, size_t max_len,
                       Curl_bufq_reader *reader, void *reader_ctx,
                       CURLcode *err)
{
  struct buf_chunk *tail = NULL;
  ssize_t nread;

  *err = CURLE_AGAIN;
  tail = get_non_full_tail(q);
  if(!tail) {
    /* Same distinction as in Curl_bufq_write(): if another chunk was
     * allowed (below the limit, or the limit is only a soft one), not
     * getting one means we ran out of memory. */
    if((q->chunk_count < q->max_chunks) ||
       (q->opts & BUFQ_OPT_SOFT_LIMIT)) {
      *err = CURLE_OUT_OF_MEMORY;
      return -1;
    }
    /* full, blocked */
    *err = CURLE_AGAIN;
    return -1;
  }

  nread = chunk_slurpn(tail, max_len, reader, reader_ctx, err);
  if(nread < 0) {
    return -1;
  }
  else if(nread == 0) {
    /* eof */
    *err = CURLE_OK;
  }
  return nread;
}
625 
626 /**
627  * Read up to `max_len` bytes and append it to the end of the buffer queue.
628  * if `max_len` is 0, no limit is imposed and the call behaves exactly
629  * the same as `Curl_bufq_slurp()`.
630  * Returns the total amount of buf read (may be 0) or -1 on other
631  * reader errors.
632  * Note that even in case of a -1 chunks may have been read and
633  * the buffer queue will have different length than before.
634  */
bufq_slurpn(struct bufq * q,size_t max_len,Curl_bufq_reader * reader,void * reader_ctx,CURLcode * err)635 static ssize_t bufq_slurpn(struct bufq *q, size_t max_len,
636                            Curl_bufq_reader *reader, void *reader_ctx,
637                            CURLcode *err)
638 {
639   ssize_t nread = 0, n;
640 
641   *err = CURLE_AGAIN;
642   while(1) {
643 
644     n = Curl_bufq_sipn(q, max_len, reader, reader_ctx, err);
645     if(n < 0) {
646       if(!nread || *err != CURLE_AGAIN) {
647         /* blocked on first read or real error, fail */
648         nread = -1;
649       }
650       else
651         *err = CURLE_OK;
652       break;
653     }
654     else if(n == 0) {
655       /* eof */
656       *err = CURLE_OK;
657       break;
658     }
659     nread += (size_t)n;
660     if(max_len) {
661       DEBUGASSERT((size_t)n <= max_len);
662       max_len -= (size_t)n;
663       if(!max_len)
664         break;
665     }
666     /* give up slurping when we get less bytes than we asked for */
667     if(q->tail && !chunk_is_full(q->tail))
668       break;
669   }
670   return nread;
671 }
672 
/* Read from `reader` into the queue without a byte limit. Return value
 * and `*err` are those of bufq_slurpn() called with `max_len` of 0. */
ssize_t Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
                        void *reader_ctx, CURLcode *err)
{
  const size_t no_limit = 0;

  return bufq_slurpn(q, no_limit, reader, reader_ctx, err);
}
678