123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- /*
- * Copyright (c) 2016 Fastly, Inc.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
- /*
- * A streaming quantile library.
- *
- *
- * A `gkc_summary` structure is used to summarize observations
- * within a given error range. Observations are inserted using
- * `gkc_insert_value`, quantile queries can then be performed with
- * `gkc_query` against the summary.
- * Provided two summaries are using the same epsilon, they can be merged
- * by calling `gkc_combine`.
- *
- * The algorithm guarantees a bounded memory usage of:
- * (11/(2 x epsilon))*log(2 * epsilon * N)
- *
- * For epsilon = 0.01 and N = 2^64, this is only 10k max in the
- * theoretical worst case. In practice, it reliably uses less:
- * inserting random data gets us ~100 max insertions for > 50 million
- * entries.
- *
- * See www.cis.upenn.edu/~sanjeev/papers/sigmod01_quantiles.pdf for
- * the paper describing this algorithm and data structure.
- *
- */
#include <inttypes.h>
#include <limits.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * Link embedded in a structure that lives on a circular, doubly-linked
 * list. A list with nothing on it is a lone link pointing at itself.
 */
struct list {
    struct list *prev;
    struct list *next;
};
- struct gkc_summary {
- size_t nr_elems;
- double epsilon;
- uint64_t alloced;
- uint64_t max_alloced;
- struct list head;
- struct freelist *fl;
- };
- static inline int list_empty(struct list *l)
- {
- return l->next == l;
- }
- static inline void list_init(struct list *n)
- {
- n->next = n;
- n->prev = n;
- }
- static inline void list_del(struct list *n)
- {
- n->next->prev = n->prev;
- n->prev->next = n->next;
- }
- static inline void list_add(struct list *l, struct list *n)
- {
- n->next = l->next;
- n->next->prev = n;
- l->next = n;
- n->prev = l;
- }
- static inline void list_add_tail(struct list *l, struct list *n)
- {
- list_add(l->prev, n);
- }
/*
 * container_of - recover the enclosing object from a pointer to one of
 * its members.
 *
 *   ptr:    address of the member
 *   type:   type of the enclosing structure
 *   member: name of the member inside `type`
 *
 * offsetof() comes from <stddef.h>. The standard macro is a constant
 * expression and avoids the undefined behaviour of computing a member
 * address through a null pointer, so it is no longer redefined here.
 */
#define container_of(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
/*
 * Node of the singly-linked stack of released tuples; it is overlaid on
 * the tuple's own memory while the tuple waits to be reused.
 */
struct freelist {
    struct freelist *next;
};
/*
 * Returns floor(log2(x)), i.e. the index of the highest set bit of `x`.
 *
 * __builtin_clzll() is undefined for a zero argument, so zero is answered
 * with 0 here rather than being passed to the builtin.
 */
static int ullog2(uint64_t x)
{
    if (x == 0) {
        return 0;
    }
    return 63 - __builtin_clzll(x);
}
- struct gkc_tuple {
- uint64_t value;
- double g;
- uint64_t delta;
- struct list node;
- };
- #define list_to_tuple(ln) (container_of((ln), struct gkc_tuple, node))
- void gkc_summary_init(struct gkc_summary *s, double epsilon)
- {
- list_init(&s->head);
- s->epsilon = epsilon;
- }
- struct gkc_summary *gkc_summary_alloc(double epsilon)
- {
- struct gkc_summary *s;
- s = calloc(1, sizeof(*s));
- gkc_summary_init(s, epsilon);
- return s;
- }
- #include <assert.h>
- /* debug only, checks a number of properties that s must satisfy at all times */
- void gkc_sanity_check(struct gkc_summary *s)
- {
- uint64_t nr_elems, nr_alloced;
- struct list *cur;
- struct gkc_tuple *tcur;
- nr_elems = 0;
- nr_alloced = 0;
- cur = s->head.next;
- while (cur != &s->head) {
- tcur = list_to_tuple(cur);
- cur = cur->next;
- nr_elems += tcur->g;
- nr_alloced++;
- if (s->nr_elems > (1/s->epsilon)) {
- /* there must be enough observations for this to become true */
- assert(tcur->g + tcur->delta <= (s->nr_elems * s->epsilon * 2));
- }
- assert(nr_alloced <= s->alloced);
- }
- assert(nr_elems == s->nr_elems);
- assert(nr_alloced == s->alloced);
- }
- static struct gkc_tuple *gkc_alloc(struct gkc_summary *s)
- {
- s->alloced++;
- if (s->alloced > s->max_alloced) {
- s->max_alloced = s->alloced;
- }
- if (s->fl) {
- void *ret;
- ret = s->fl;
- s->fl = s->fl->next;
- return ret;
- }
- return malloc(sizeof(struct gkc_tuple));
- }
- static void gkc_free(struct gkc_summary *s, struct gkc_tuple *p)
- {
- struct freelist *flp = (void *)p;
- s->alloced--;
- flp->next = s->fl;
- s->fl = flp;
- }
- void gkc_summary_free(struct gkc_summary *s)
- {
- struct freelist *fl;
- struct list *cur;
- cur = s->head.next;
- while (cur != &s->head) {
- struct list *next;
- next = cur->next;
- gkc_free(s, list_to_tuple(cur));
- cur = next;
- }
- fl = s->fl;
- while (fl) {
- void *p;
- p = fl;
- fl = fl->next;
- free(p);
- }
- free(s);
- }
- uint64_t gkc_query(struct gkc_summary *s, double q)
- {
- struct list *cur, *next;
- int rank;
- double gi;
- double ne;
- rank = 0.5 + q * s->nr_elems;
- ne = s->nr_elems * s->epsilon;
- gi = 0;
- if (list_empty(&s->head)) {
- return 0;
- }
- cur = s->head.next;
- while (1) {
- struct gkc_tuple *tcur, *tnext;
- tcur = list_to_tuple(cur);
- next = cur->next;
- if (next == &s->head) {
- return tcur->value;
- }
- tnext = list_to_tuple(next);
- gi += tcur->g;
- if ((rank + ne) < (gi + tnext->g + tnext->delta)) {
- if ((rank + ne) < (gi + tnext->g)) {
- return tcur->value;
- }
- return tnext->value;
- }
- cur = next;
- }
- }
- static uint64_t band(struct gkc_summary *s, uint64_t delta)
- {
- uint64_t diff;
- diff = 1 + (s->epsilon * s->nr_elems * 2) - delta;
- return ullog2(diff);
- }
- static void gkc_compress(struct gkc_summary *s)
- {
- int max_compress;
- struct list *cur, *prev;
- struct gkc_tuple *tcur, *tprev;
- uint64_t bi, b_plus_1;
- max_compress = 2 * s->epsilon * s->nr_elems;
- if (s->nr_elems < 2) {
- return;
- }
- prev = s->head.prev;
- cur = prev->prev;
- while (cur != &s->head) {
- tcur = list_to_tuple(cur);
- tprev = list_to_tuple(prev);
- b_plus_1 = band(s, tprev->delta);
- bi = band(s, tcur->delta);
- if (bi <= b_plus_1 && ((tcur->g + tprev->g + tprev->delta) <= max_compress)) {
- tprev->g += tcur->g;
- list_del(cur);
- gkc_free(s, tcur);
- cur = prev->prev;
- continue;
- }
- prev = cur;
- cur = cur->prev;
- }
- }
- void gkc_insert_value(struct gkc_summary *s, double value)
- {
- struct list *cur = NULL;
- struct gkc_tuple *new, *tcur, *tnext = NULL;
- new = gkc_alloc(s);
- memset(new, 0, sizeof(*new));
- new->value = value;
- new->g = 1;
- list_init(&new->node);
- s->nr_elems++;
- /* first insert */
- if (list_empty(&s->head)) {
- list_add(&s->head, &new->node);
- return;
- }
- cur = s->head.next;
- tcur = list_to_tuple(cur);
- /* v < v0, new min */
- if (tcur->value > new->value) {
- list_add(&s->head, &new->node);
- goto out;
- }
- double gi = 0;
- while (cur->next != &s->head) {
- tnext = list_to_tuple(cur->next);
- tcur = list_to_tuple(cur);
- gi += tcur->g;
- if (tcur->value <= new->value && new->value < tnext->value) {
- /* INSERT "(v, 1, Δ)" into S between vi and vi+1; */
- new->delta = tcur->g + tcur->delta - 1;
- list_add(cur, &new->node);
- goto out;
- }
- cur = cur->next;
- }
- /* v > vs-1, new max */
- list_add_tail(&s->head, &new->node);
- out:
- if (s->nr_elems % (int)(1/(2*s->epsilon))) {
- gkc_compress(s);
- }
- }
- void gkc_print_summary(struct gkc_summary *s)
- {
- struct gkc_tuple *tcur;
- struct list *cur;
- fprintf(stderr, "nr_elems: %zu, epsilon: %.02f, alloced: %" PRIu64 ", overfilled: %.02f, max_alloced: %" PRIu64 "\n",
- s->nr_elems, s->epsilon, s->alloced, 2 * s->epsilon * s->nr_elems, s->max_alloced);
- if (list_empty(&s->head)) {
- fprintf(stderr, "Empty summary\n");
- return;
- }
- cur = s->head.next;
- while (cur != &s->head) {
- tcur = list_to_tuple(cur);
- fprintf(stderr, "(v: %" PRIu64 ", g: %.02f, d: %" PRIu64 ") ", tcur->value, tcur->g, tcur->delta);
- cur = cur->next;
- }
- fprintf(stderr, "\n");
- }
- struct gkc_summary *gkc_combine(struct gkc_summary *s1, struct gkc_summary *s2)
- {
- struct gkc_summary *snew;
- struct list *cur1, *cur2;
- struct gkc_tuple *tcur1, *tcur2, *tnew;
- if (s1->epsilon != s2->epsilon) {
- return NULL;
- }
- snew = gkc_summary_alloc(s1->epsilon);
- cur1 = s1->head.next;
- cur2 = s2->head.next;
- while (cur1 != &s1->head && cur2 != &s2->head) {
- tcur1 = list_to_tuple(cur1);
- tcur2 = list_to_tuple(cur2);
- tnew = gkc_alloc(snew);
- if (tcur1->value < tcur2->value) {
- tnew->value = tcur1->value;
- tnew->g = tcur1->g;
- tnew->delta = tcur1->delta;
- cur1 = cur1->next;
- } else {
- tnew->value = tcur2->value;
- tnew->g = tcur2->g;
- tnew->delta = tcur2->delta;
- cur2 = cur2->next;
- }
- list_add_tail(&snew->head, &tnew->node);
- snew->nr_elems += tnew->g;
- }
- while (cur1 != &s1->head) {
- tcur1 = list_to_tuple(cur1);
- tnew = gkc_alloc(snew);
- tnew->value = tcur1->value;
- tnew->g = tcur1->g;
- tnew->delta = tcur1->delta;
- list_add_tail(&snew->head, &tnew->node);
- snew->nr_elems += tnew->g;
- cur1 = cur1->next;
- }
- while (cur2 != &s2->head) {
- tcur2 = list_to_tuple(cur2);
- tnew = gkc_alloc(snew);
- tnew->value = tcur2->value;
- tnew->g = tcur2->g;
- tnew->delta = tcur2->delta;
- list_add_tail(&snew->head, &tnew->node);
- snew->nr_elems += tnew->g;
- cur2 = cur2->next;
- }
- snew->max_alloced = snew->alloced;
- gkc_compress(snew);
- return snew;
- }
|