source: trunk/debian/packages/libubox/trunk/ustream.c @ 857

Last change on this file since 857 was 857, checked in by amain, 4 years ago

libubox: initial import / part 4

File size: 10.4 KB
Line 
1/*
2 * ustream - library for stream buffer management
3 *
4 * Copyright (C) 2012 Felix Fietkau <nbd@openwrt.org>
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19#include <stdlib.h>
20#include <string.h>
21#include <unistd.h>
22#include <stdio.h>
23#include <stdarg.h>
24
25#include "ustream.h"
26
27static void ustream_init_buf(struct ustream_buf *buf, int len)
28{
29        if (!len)
30                abort();
31
32        memset(buf, 0, sizeof(*buf));
33        buf->data = buf->tail = buf->head;
34        buf->end = buf->head + len;
35        *buf->head = 0;
36}
37
38static void ustream_add_buf(struct ustream_buf_list *l, struct ustream_buf *buf)
39{
40        l->buffers++;
41        if (!l->tail)
42                l->head = buf;
43        else
44                l->tail->next = buf;
45
46        buf->next = NULL;
47        l->tail = buf;
48        if (!l->data_tail)
49                l->data_tail = l->head;
50}
51
52static bool ustream_can_alloc(struct ustream_buf_list *l)
53{
54        if (l->max_buffers <= 0)
55                return true;
56
57        return (l->buffers < l->max_buffers);
58}
59
60static int ustream_alloc_default(struct ustream *s, struct ustream_buf_list *l)
61{
62        struct ustream_buf *buf;
63
64        if (!ustream_can_alloc(l))
65                return -1;
66
67        buf = malloc(sizeof(*buf) + l->buffer_len + s->string_data);
68        if (!buf)
69                return -1;
70
71        ustream_init_buf(buf, l->buffer_len);
72        ustream_add_buf(l, buf);
73
74        return 0;
75}
76
77static void ustream_free_buffers(struct ustream_buf_list *l)
78{
79        struct ustream_buf *buf = l->head;
80
81        while (buf) {
82                struct ustream_buf *next = buf->next;
83
84                free(buf);
85                buf = next;
86        }
87        l->head = NULL;
88        l->tail = NULL;
89        l->data_tail = NULL;
90}
91
92void ustream_free(struct ustream *s)
93{
94        if (s->free)
95                s->free(s);
96
97        uloop_timeout_cancel(&s->state_change);
98        ustream_free_buffers(&s->r);
99        ustream_free_buffers(&s->w);
100}
101
102static void ustream_state_change_cb(struct uloop_timeout *t)
103{
104        struct ustream *s = container_of(t, struct ustream, state_change);
105
106        if (s->write_error)
107                ustream_free_buffers(&s->w);
108        if (s->notify_state)
109                s->notify_state(s);
110}
111
112void ustream_init_defaults(struct ustream *s)
113{
114#define DEFAULT_SET(_f, _default)       \
115        do {                            \
116                if (!_f)                \
117                        _f = _default;  \
118        } while(0)
119
120        DEFAULT_SET(s->r.alloc, ustream_alloc_default);
121        DEFAULT_SET(s->w.alloc, ustream_alloc_default);
122
123        DEFAULT_SET(s->r.min_buffers, 1);
124        DEFAULT_SET(s->r.max_buffers, 1);
125        DEFAULT_SET(s->r.buffer_len, 4096);
126
127        DEFAULT_SET(s->w.min_buffers, 2);
128        DEFAULT_SET(s->w.max_buffers, -1);
129        DEFAULT_SET(s->w.buffer_len, 256);
130
131#undef DEFAULT_SET
132
133        s->state_change.cb = ustream_state_change_cb;
134        s->write_error = false;
135        s->eof = false;
136        s->eof_write_done = false;
137        s->read_blocked = 0;
138
139        s->r.buffers = 0;
140        s->r.data_bytes = 0;
141
142        s->w.buffers = 0;
143        s->w.data_bytes = 0;
144}
145
146static bool ustream_should_move(struct ustream_buf_list *l, struct ustream_buf *buf, int len)
147{
148        int maxlen;
149        int offset;
150
151        /* nothing to squeeze */
152        if (buf->data == buf->head)
153                return false;
154
155        maxlen = buf->end - buf->head;
156        offset = buf->data - buf->head;
157
158        /* less than half is available */
159        if (offset > maxlen / 2)
160                return true;
161
162        /* less than 32 bytes data but takes more than 1/4 space */
163        if (buf->tail - buf->data < 32 && offset > maxlen / 4)
164                return true;
165
166        /* more buf is already in list or can be allocated */
167        if (buf != l->tail || ustream_can_alloc(l))
168                return false;
169
170        /* no need to move if len is available at the tail */
171        return (buf->end - buf->tail < len);
172}
173
174static void ustream_free_buf(struct ustream_buf_list *l, struct ustream_buf *buf)
175{
176        if (buf == l->head)
177                l->head = buf->next;
178
179        if (buf == l->data_tail)
180                l->data_tail = buf->next;
181
182        if (buf == l->tail)
183                l->tail = NULL;
184
185        if (--l->buffers >= l->min_buffers) {
186                free(buf);
187                return;
188        }
189
190        /* recycle */
191        ustream_init_buf(buf, buf->end - buf->head);
192        ustream_add_buf(l, buf);
193}
194
195static void __ustream_set_read_blocked(struct ustream *s, unsigned char val)
196{
197        bool changed = !!s->read_blocked != !!val;
198
199        s->read_blocked = val;
200        if (changed)
201                s->set_read_blocked(s);
202}
203
204void ustream_set_read_blocked(struct ustream *s, bool set)
205{
206        unsigned char val = s->read_blocked & ~READ_BLOCKED_USER;
207
208        if (set)
209                val |= READ_BLOCKED_USER;
210
211        __ustream_set_read_blocked(s, val);
212}
213
214void ustream_consume(struct ustream *s, int len)
215{
216        struct ustream_buf *buf = s->r.head;
217
218        if (!len)
219                return;
220
221        s->r.data_bytes -= len;
222        if (s->r.data_bytes < 0)
223                abort();
224
225        do {
226                struct ustream_buf *next = buf->next;
227                int buf_len = buf->tail - buf->data;
228
229                if (len < buf_len) {
230                        buf->data += len;
231                        break;
232                }
233
234                len -= buf_len;
235                ustream_free_buf(&s->r, buf);
236                buf = next;
237        } while(len);
238
239        __ustream_set_read_blocked(s, s->read_blocked & ~READ_BLOCKED_FULL);
240}
241
242static void ustream_fixup_string(struct ustream *s, struct ustream_buf *buf)
243{
244        if (!s->string_data)
245                return;
246
247        *buf->tail = 0;
248}
249
250static bool ustream_prepare_buf(struct ustream *s, struct ustream_buf_list *l, int len)
251{
252        struct ustream_buf *buf;
253
254        buf = l->data_tail;
255        if (buf) {
256                if (ustream_should_move(l, buf, len)) {
257                        int len = buf->tail - buf->data;
258
259                        memmove(buf->head, buf->data, len);
260                        buf->data = buf->head;
261                        buf->tail = buf->data + len;
262
263                        if (l == &s->r)
264                                ustream_fixup_string(s, buf);
265                }
266                /* some chunks available at the tail */
267                if (buf->tail != buf->end)
268                        return true;
269                /* next buf available */
270                if (buf->next) {
271                        l->data_tail = buf->next;
272                        return true;
273                }
274        }
275
276        if (!ustream_can_alloc(l))
277                return false;
278
279        if (l->alloc(s, l) < 0)
280                return false;
281
282        l->data_tail = l->tail;
283        return true;
284}
285
286char *ustream_reserve(struct ustream *s, int len, int *maxlen)
287{
288        struct ustream_buf *buf;
289
290        if (!ustream_prepare_buf(s, &s->r, len)) {
291                __ustream_set_read_blocked(s, s->read_blocked | READ_BLOCKED_FULL);
292                *maxlen = 0;
293                return NULL;
294        }
295
296        buf = s->r.data_tail;
297        *maxlen = buf->end - buf->tail;
298        return buf->tail;
299}
300
301void ustream_fill_read(struct ustream *s, int len)
302{
303        struct ustream_buf *buf = s->r.data_tail;
304        int n = len;
305        int maxlen;
306
307        s->r.data_bytes += len;
308        do {
309                if (!buf)
310                        abort();
311
312                maxlen = buf->end - buf->tail;
313                if (len < maxlen)
314                        maxlen = len;
315
316                len -= maxlen;
317                buf->tail += maxlen;
318                ustream_fixup_string(s, buf);
319
320                s->r.data_tail = buf;
321                buf = buf->next;
322        } while (len);
323
324        if (s->notify_read)
325                s->notify_read(s, n);
326}
327
328char *ustream_get_read_buf(struct ustream *s, int *buflen)
329{
330        char *data = NULL;
331        int len = 0;
332
333        if (s->r.head) {
334                len = s->r.head->tail - s->r.head->data;
335                if (len > 0)
336                        data = s->r.head->data;
337        }
338
339        if (buflen)
340                *buflen = len;
341
342        return data;
343}
344
345int ustream_read(struct ustream *s, char *buf, int buflen)
346{
347        char *chunk;
348        int chunk_len;
349        int len = 0;
350
351        do {
352                chunk = ustream_get_read_buf(s, &chunk_len);
353                if (!chunk)
354                        break;
355                if (chunk_len > buflen - len)
356                        chunk_len = buflen - len;
357                memcpy(buf + len, chunk, chunk_len);
358                ustream_consume(s, chunk_len);
359                len += chunk_len;
360        } while (len < buflen);
361
362        return len;
363}
364
365static void ustream_write_error(struct ustream *s)
366{
367        if (!s->write_error)
368                ustream_state_change(s);
369        s->write_error = true;
370}
371
372bool ustream_write_pending(struct ustream *s)
373{
374        struct ustream_buf *buf = s->w.head;
375        int wr = 0, len;
376
377        if (s->write_error)
378                return false;
379
380        while (buf && s->w.data_bytes) {
381                struct ustream_buf *next = buf->next;
382                int maxlen = buf->tail - buf->data;
383
384                len = s->write(s, buf->data, maxlen, !!buf->next);
385                if (len < 0) {
386                        ustream_write_error(s);
387                        break;
388                }
389
390                if (len == 0)
391                        break;
392
393                wr += len;
394                s->w.data_bytes -= len;
395                if (len < maxlen) {
396                        buf->data += len;
397                        break;
398                }
399
400                ustream_free_buf(&s->w, buf);
401                buf = next;
402        }
403
404        if (s->notify_write)
405                s->notify_write(s, wr);
406
407        if (s->eof && wr && !s->w.data_bytes)
408                ustream_state_change(s);
409
410        return !s->w.data_bytes;
411}
412
413static int ustream_write_buffered(struct ustream *s, const char *data, int len, int wr)
414{
415        struct ustream_buf_list *l = &s->w;
416        struct ustream_buf *buf;
417        int maxlen;
418
419        while (len) {
420                if (!ustream_prepare_buf(s, &s->w, len))
421                        break;
422
423                buf = l->data_tail;
424
425                maxlen = buf->end - buf->tail;
426                if (maxlen > len)
427                        maxlen = len;
428
429                memcpy(buf->tail, data, maxlen);
430                buf->tail += maxlen;
431                data += maxlen;
432                len -= maxlen;
433                wr += maxlen;
434                l->data_bytes += maxlen;
435        }
436
437        return wr;
438}
439
440int ustream_write(struct ustream *s, const char *data, int len, bool more)
441{
442        struct ustream_buf_list *l = &s->w;
443        int wr = 0;
444
445        if (s->write_error)
446                return 0;
447
448        if (!l->data_bytes) {
449                wr = s->write(s, data, len, more);
450                if (wr == len)
451                        return wr;
452
453                if (wr < 0) {
454                        ustream_write_error(s);
455                        return wr;
456                }
457
458                data += wr;
459                len -= wr;
460        }
461
462        return ustream_write_buffered(s, data, len, wr);
463}
464
465#define MAX_STACK_BUFLEN        256
466
467int ustream_vprintf(struct ustream *s, const char *format, va_list arg)
468{
469        struct ustream_buf_list *l = &s->w;
470        char *buf;
471        va_list arg2;
472        int wr, maxlen, buflen;
473
474        if (s->write_error)
475                return 0;
476
477        if (!l->data_bytes) {
478                buf = alloca(MAX_STACK_BUFLEN);
479                va_copy(arg2, arg);
480                maxlen = vsnprintf(buf, MAX_STACK_BUFLEN, format, arg2);
481                va_end(arg2);
482                if (maxlen < MAX_STACK_BUFLEN) {
483                        wr = s->write(s, buf, maxlen, false);
484                        if (wr < 0) {
485                                ustream_write_error(s);
486                                return wr;
487                        }
488                        if (wr == maxlen)
489                                return wr;
490
491                        buf += wr;
492                        maxlen -= wr;
493                        return ustream_write_buffered(s, buf, maxlen, wr);
494                } else {
495                        buf = malloc(maxlen + 1);
496                        if (!buf)
497                                return 0;
498                        wr = vsnprintf(buf, maxlen + 1, format, arg);
499                        wr = ustream_write(s, buf, wr, false);
500                        free(buf);
501                        return wr;
502                }
503        }
504
505        if (!ustream_prepare_buf(s, l, 1))
506                return 0;
507
508        buf = l->data_tail->tail;
509        buflen = l->data_tail->end - buf;
510
511        va_copy(arg2, arg);
512        maxlen = vsnprintf(buf, buflen, format, arg2);
513        va_end(arg2);
514
515        wr = maxlen;
516        if (wr >= buflen)
517                wr = buflen - 1;
518
519        l->data_tail->tail += wr;
520        l->data_bytes += wr;
521        if (maxlen < buflen)
522                return wr;
523
524        buf = malloc(maxlen + 1);
525        if (!buf)
526                return wr;
527        maxlen = vsnprintf(buf, maxlen + 1, format, arg);
528        wr = ustream_write_buffered(s, buf + wr, maxlen - wr, wr);
529        free(buf);
530
531        return wr;
532}
533
534int ustream_printf(struct ustream *s, const char *format, ...)
535{
536        va_list arg;
537        int ret;
538
539        if (s->write_error)
540                return 0;
541
542        va_start(arg, format);
543        ret = ustream_vprintf(s, format, arg);
544        va_end(arg);
545
546        return ret;
547}
Note: See TracBrowser for help on using the repository browser.