datasketches-cpp
Loading...
Searching...
No Matches
quantiles_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _QUANTILES_SKETCH_IMPL_HPP_
21#define _QUANTILES_SKETCH_IMPL_HPP_
22
23#include <cmath>
24#include <algorithm>
25#include <stdexcept>
26#include <iomanip>
27#include <sstream>
28
29#include "count_zeros.hpp"
30#include "conditional_forward.hpp"
31
32namespace datasketches {
33
34template<typename T, typename C, typename A>
35quantiles_sketch<T, C, A>::quantiles_sketch(uint16_t k, const C& comparator, const A& allocator):
36comparator_(comparator),
37allocator_(allocator),
38is_base_buffer_sorted_(true),
39k_(k),
40n_(0),
41bit_pattern_(0),
42base_buffer_(allocator_),
43levels_(allocator_),
44min_item_(),
45max_item_(),
46sorted_view_(nullptr)
47{
48 check_k(k_);
49 base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k));
50}
51
52template<typename T, typename C, typename A>
54comparator_(other.comparator_),
55allocator_(other.allocator_),
56is_base_buffer_sorted_(other.is_base_buffer_sorted_),
57k_(other.k_),
58n_(other.n_),
59bit_pattern_(other.bit_pattern_),
60base_buffer_(other.base_buffer_),
61levels_(other.levels_),
62min_item_(other.min_item_),
63max_item_(other.max_item_),
64sorted_view_(nullptr)
65{
66 for (size_t i = 0; i < levels_.size(); ++i) {
67 if (levels_[i].capacity() != other.levels_[i].capacity()) {
68 levels_[i].reserve(other.levels_[i].capacity());
69 }
70 }
71}
72
73template<typename T, typename C, typename A>
75comparator_(other.comparator_),
76allocator_(other.allocator_),
77is_base_buffer_sorted_(other.is_base_buffer_sorted_),
78k_(other.k_),
79n_(other.n_),
80bit_pattern_(other.bit_pattern_),
81base_buffer_(std::move(other.base_buffer_)),
82levels_(std::move(other.levels_)),
83min_item_(std::move(other.min_item_)),
84max_item_(std::move(other.max_item_)),
85sorted_view_(nullptr)
86{}
87
88template<typename T, typename C, typename A>
90 quantiles_sketch<T, C, A> copy(other);
91 std::swap(comparator_, copy.comparator_);
92 std::swap(allocator_, copy.allocator_);
93 std::swap(is_base_buffer_sorted_, copy.is_base_buffer_sorted_);
94 std::swap(k_, copy.k_);
95 std::swap(n_, copy.n_);
96 std::swap(bit_pattern_, copy.bit_pattern_);
97 std::swap(base_buffer_, copy.base_buffer_);
98 std::swap(levels_, copy.levels_);
99 std::swap(min_item_, copy.min_item_);
100 std::swap(max_item_, copy.max_item_);
101 reset_sorted_view();
102 return *this;
103}
104
105template<typename T, typename C, typename A>
107 std::swap(comparator_, other.comparator_);
108 std::swap(allocator_, other.allocator_);
109 std::swap(is_base_buffer_sorted_, other.is_base_buffer_sorted_);
110 std::swap(k_, other.k_);
111 std::swap(n_, other.n_);
112 std::swap(bit_pattern_, other.bit_pattern_);
113 std::swap(base_buffer_, other.base_buffer_);
114 std::swap(levels_, other.levels_);
115 std::swap(min_item_, other.min_item_);
116 std::swap(max_item_, other.max_item_);
117 reset_sorted_view();
118 return *this;
119}
120
121template<typename T, typename C, typename A>
122quantiles_sketch<T, C, A>::quantiles_sketch(uint16_t k, uint64_t n, uint64_t bit_pattern,
123 Level&& base_buffer, VectorLevels&& levels,
124 optional<T>&& min_item, optional<T>&& max_item,
125 bool is_sorted, const C& comparator, const A& allocator):
126comparator_(comparator),
127allocator_(allocator),
128is_base_buffer_sorted_(is_sorted),
129k_(k),
130n_(n),
131bit_pattern_(bit_pattern),
132base_buffer_(std::move(base_buffer)),
133levels_(std::move(levels)),
134min_item_(std::move(min_item)),
135max_item_(std::move(max_item)),
136sorted_view_(nullptr)
137{
138 uint32_t item_count = static_cast<uint32_t>(base_buffer_.size());
139 for (Level& lvl : levels_) {
140 item_count += static_cast<uint32_t>(lvl.size());
141 }
142 if (item_count != compute_retained_items(k_, n_))
143 throw std::logic_error("Item count does not match value computed from k, n");
144}
145
146template<typename T, typename C, typename A>
147template<typename From, typename FC, typename FA>
148quantiles_sketch<T, C, A>::quantiles_sketch(const quantiles_sketch<From, FC, FA>& other,
149 const C& comparator, const A& allocator):
150comparator_(comparator),
151allocator_(allocator),
152is_base_buffer_sorted_(false),
153k_(other.get_k()),
154n_(other.get_n()),
155bit_pattern_(compute_bit_pattern(other.get_k(), other.get_n())),
156base_buffer_(allocator),
157levels_(allocator),
158min_item_(other.min_item_),
159max_item_(other.max_item_),
160sorted_view_(nullptr)
161{
162 static_assert(std::is_constructible<T, From>::value,
163 "Type converting constructor requires new type to be constructible from existing type");
164
165 base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k_));
166
167 if (!other.is_empty()) {
168 // reserve space in levels
169 const uint8_t num_levels = compute_levels_needed(k_, n_);
170 levels_.reserve(num_levels);
171 for (int i = 0; i < num_levels; ++i) {
172 Level level(allocator);
173 level.reserve(k_);
174 levels_.push_back(std::move(level));
175 }
176
177 // iterate through points, assigning to the correct level as needed
178 for (auto pair : other) {
179 const uint64_t wt = pair.second;
180 if (wt == 1) {
181 base_buffer_.push_back(T(pair.first));
182 // resize where needed as if adding points via update()
183 if (base_buffer_.size() + 1 > base_buffer_.capacity()) {
184 const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
185 base_buffer_.reserve(new_size);
186 }
187 }
188 else {
189 const uint8_t idx = count_trailing_zeros_in_u64(pair.second) - 1;
190 levels_[idx].push_back(T(pair.first));
191 }
192 }
193
194 // validate that ordering within each level is preserved
195 // base_buffer_ can be considered unsorted for this purpose
196 for (int i = 0; i < num_levels; ++i) {
197 if (!std::is_sorted(levels_[i].begin(), levels_[i].end(), comparator_)) {
198 throw std::logic_error("Copy construction across types produces invalid sorting");
199 }
200 }
201 }
202}
203
204
205template<typename T, typename C, typename A>
206quantiles_sketch<T, C, A>::~quantiles_sketch() {
207 reset_sorted_view();
208}
209
210template<typename T, typename C, typename A>
211template<typename FwdT>
213 if (!check_update_item(item)) { return; }
214 if (is_empty()) {
215 min_item_.emplace(item);
216 max_item_.emplace(item);
217 } else {
218 if (comparator_(item, *min_item_)) *min_item_ = item;
219 if (comparator_(*max_item_, item)) *max_item_ = item;
220 }
221
222 // if exceed capacity, grow until size 2k -- assumes eager processing
223 if (base_buffer_.size() + 1 > base_buffer_.capacity()) grow_base_buffer();
224
225 base_buffer_.push_back(std::forward<FwdT>(item));
226 ++n_;
227
228 if (base_buffer_.size() > 1) is_base_buffer_sorted_ = false;
229 if (base_buffer_.size() == 2 * k_) process_full_base_buffer();
230 reset_sorted_view();
231}
232
233template<typename T, typename C, typename A>
234template<typename FwdSk>
236 if (other.is_empty()) {
237 return; // nothing to do
238 } else if (!other.is_estimation_mode()) {
239 // other is exact, stream in regardless of k
240 for (auto item : other.base_buffer_) {
241 update(conditional_forward<FwdSk>(item));
242 }
243 reset_sorted_view();
244 return;
245 }
246
247 // other has data and is in estimation mode
248 if (is_estimation_mode()) {
249 if (k_ == other.get_k()) {
250 standard_merge(*this, std::forward<FwdSk>(other));
251 } else if (k_ > other.get_k()) {
252 quantiles_sketch sk_copy(std::forward<FwdSk>(other));
253 downsampling_merge(sk_copy, std::move(*this));
254 *this = std::move(sk_copy);
255 } else { // k_ < other.get_k()
256 downsampling_merge(*this, std::forward<FwdSk>(other));
257 }
258 } else {
259 // exact or empty
260 quantiles_sketch sk_copy(std::forward<FwdSk>(other));
261 if (k_ <= other.get_k()) {
262 if (!is_empty()) {
263 for (uint16_t i = 0; i < base_buffer_.size(); ++i) {
264 sk_copy.update(std::move(base_buffer_[i]));
265 }
266 }
267 } else { // k_ > other.get_k()
268 downsampling_merge(sk_copy, std::move(*this));
269 }
270 *this = std::move(sk_copy);
271 }
272 reset_sorted_view();
273}
274
275template<typename T, typename C, typename A>
276template<typename SerDe>
277void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde) const {
278 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
279 write(os, preamble_longs);
280 const uint8_t ser_ver = SERIAL_VERSION;
281 write(os, ser_ver);
282 const uint8_t family = FAMILY;
283 write(os, family);
284
285 // side-effect: sort base buffer since always compact
286 std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
287 const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
288
289 // empty, ordered, compact are valid flags
290 const uint8_t flags_byte(
291 (is_empty() ? 1 << flags::IS_EMPTY : 0)
292 | (1 << flags::IS_SORTED) // always sorted as side effect noted above
293 | (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types?
294 );
295 write(os, flags_byte);
296 write(os, k_);
297 const uint16_t unused = 0;
298 write(os, unused);
299
300 if (!is_empty()) {
301 write(os, n_);
302
303 // min and max
304 serde.serialize(os, &*min_item_, 1);
305 serde.serialize(os, &*max_item_, 1);
306
307 // base buffer items
308 serde.serialize(os, base_buffer_.data(), static_cast<unsigned>(base_buffer_.size()));
309
310 // levels, only when data is present
311 for (Level lvl : levels_) {
312 if (lvl.size() > 0)
313 serde.serialize(os, lvl.data(), static_cast<unsigned>(lvl.size()));
314 }
315 }
316}
317
318template<typename T, typename C, typename A>
319template<typename SerDe>
320auto quantiles_sketch<T, C, A>::serialize(unsigned header_size_bytes, const SerDe& serde) const -> vector_bytes {
321 const size_t size = get_serialized_size_bytes(serde) + header_size_bytes;
322 vector_bytes bytes(size, 0, allocator_);
323 uint8_t* ptr = bytes.data() + header_size_bytes;
324 const uint8_t* end_ptr = ptr + size;
325
326 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
327 ptr += copy_to_mem(preamble_longs, ptr);
328 const uint8_t ser_ver = SERIAL_VERSION;
329 ptr += copy_to_mem(ser_ver, ptr);
330 const uint8_t family = FAMILY;
331 ptr += copy_to_mem(family, ptr);
332
333 // side-effect: sort base buffer since always compact
334 std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
335 const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
336
337 // empty, ordered, compact are valid flags
338 const uint8_t flags_byte(
339 (is_empty() ? 1 << flags::IS_EMPTY : 0)
340 | (1 << flags::IS_SORTED) // always sorted as side effect noted above
341 | (1 << flags::IS_COMPACT) // always compact
342 );
343 ptr += copy_to_mem(flags_byte, ptr);
344 ptr += copy_to_mem(k_, ptr);
345 ptr += sizeof(uint16_t); // 2 unused bytes
346
347 if (!is_empty()) {
348
349 ptr += copy_to_mem(n_, ptr);
350
351 // min and max
352 ptr += serde.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
353 ptr += serde.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
354
355 // base buffer items
356 if (base_buffer_.size() > 0)
357 ptr += serde.serialize(ptr, end_ptr - ptr, base_buffer_.data(), static_cast<unsigned>(base_buffer_.size()));
358
359 // levels, only when data is present
360 for (Level lvl : levels_) {
361 if (lvl.size() > 0)
362 ptr += serde.serialize(ptr, end_ptr - ptr, lvl.data(), static_cast<unsigned>(lvl.size()));
363 }
364 }
365
366 return bytes;
367}
368
369template<typename T, typename C, typename A>
370template<typename SerDe>
371auto quantiles_sketch<T, C, A>::deserialize(std::istream &is, const SerDe& serde,
372 const C& comparator, const A &allocator) -> quantiles_sketch {
373 const auto preamble_longs = read<uint8_t>(is);
374 const auto serial_version = read<uint8_t>(is);
375 const auto family_id = read<uint8_t>(is);
376 const auto flags_byte = read<uint8_t>(is);
377 const auto k = read<uint16_t>(is);
378 read<uint16_t>(is); // unused
379
380 check_k(k);
381 check_serial_version(serial_version); // a little redundant with the header check
382 check_family_id(family_id);
383 check_header_validity(preamble_longs, flags_byte, serial_version);
384
385 if (!is.good()) throw std::runtime_error("error reading from std::istream");
386 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
387 if (is_empty) {
388 return quantiles_sketch(k, comparator, allocator);
389 }
390
391 const auto items_seen = read<uint64_t>(is);
392
393 const bool is_compact = (serial_version == 2) | ((flags_byte & (1 << flags::IS_COMPACT)) > 0);
394 const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;
395
396 optional<T> tmp; // space to deserialize min and max
397 optional<T> min_item;
398 optional<T> max_item;
399
400 serde.deserialize(is, &*tmp, 1);
401 // serde call did not throw, repackage and cleanup
402 min_item.emplace(*tmp);
403 (*tmp).~T();
404 serde.deserialize(is, &*tmp, 1);
405 // serde call did not throw, repackage and cleanup
406 max_item.emplace(*tmp);
407 (*tmp).~T();
408
409 if (serial_version == 1) {
410 read<uint64_t>(is); // no longer used
411 }
412
413 // allocate buffers as needed
414 const uint8_t levels_needed = compute_levels_needed(k, items_seen);
415 const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);
416
417 // Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
418 // does not currently operate sketches in compact mode, but will only serialize as compact
419 // to avoid complications around serialization of empty values for generic type T. We also need
420 // to be able to ingest either serialized format from Java.
421
422 // load base buffer
423 const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
424 uint32_t items_to_read = (levels_needed == 0 || is_compact) ? bb_items : 2 * k;
425 Level base_buffer = deserialize_array(is, bb_items, 2 * k, serde, allocator);
426 if (items_to_read > bb_items) { // either equal or greater, never read fewer items
427 // read remaining items, but don't store them
428 deserialize_array(is, items_to_read - bb_items, items_to_read - bb_items, serde, allocator);
429 }
430
431 // populate vector of Levels directly
432 VectorLevels levels(allocator);
433 levels.reserve(levels_needed);
434 if (levels_needed > 0) {
435 uint64_t working_pattern = bit_pattern;
436 for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {
437 if ((working_pattern & 0x01) == 1) {
438 Level level = deserialize_array(is, k, k, serde, allocator);
439 levels.push_back(std::move(level));
440 } else {
441 Level level(allocator);
442 level.reserve(k);
443 levels.push_back(std::move(level));
444 }
445 }
446 }
447
448 return quantiles_sketch(k, items_seen, bit_pattern,
449 std::move(base_buffer), std::move(levels), std::move(min_item), std::move(max_item), is_sorted,
450 comparator, allocator);
451}
452
453template<typename T, typename C, typename A>
454template<typename SerDe>
455auto quantiles_sketch<T, C, A>::deserialize_array(std::istream& is, uint32_t num_items, uint32_t capacity, const SerDe& serde, const A& allocator) -> Level {
456 A alloc(allocator);
457 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
458 serde.deserialize(is, items.get(), num_items);
459 // serde did not throw, enable destructors
460 items.get_deleter().set_destroy(true);
461 if (!is.good()) throw std::runtime_error("error reading from std::istream");
462
463 // successfully read, now put into a Level
464 Level level(allocator);
465 level.reserve(capacity);
466 level.insert(level.begin(),
467 std::make_move_iterator(items.get()),
468 std::make_move_iterator(items.get() + num_items));
469 return level;
470}
471
472template<typename T, typename C, typename A>
473template<typename SerDe>
474auto quantiles_sketch<T, C, A>::deserialize(const void* bytes, size_t size, const SerDe& serde,
475 const C& comparator, const A &allocator) -> quantiles_sketch {
476 ensure_minimum_memory(size, 8);
477 const char* ptr = static_cast<const char*>(bytes);
478 const char* end_ptr = static_cast<const char*>(bytes) + size;
479
480 uint8_t preamble_longs;
481 ptr += copy_from_mem(ptr, preamble_longs);
482 uint8_t serial_version;
483 ptr += copy_from_mem(ptr, serial_version);
484 uint8_t family_id;
485 ptr += copy_from_mem(ptr, family_id);
486 uint8_t flags_byte;
487 ptr += copy_from_mem(ptr, flags_byte);
488 uint16_t k;
489 ptr += copy_from_mem(ptr, k);
490 uint16_t unused;
491 ptr += copy_from_mem(ptr, unused);
492
493 check_k(k);
494 check_serial_version(serial_version); // a little redundant with the header check
495 check_family_id(family_id);
496 check_header_validity(preamble_longs, flags_byte, serial_version);
497
498 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
499 if (is_empty) {
500 return quantiles_sketch(k, comparator, allocator);
501 }
502
503 ensure_minimum_memory(size, 16);
504 uint64_t items_seen;
505 ptr += copy_from_mem(ptr, items_seen);
506
507 const bool is_compact = (serial_version == 2) | ((flags_byte & (1 << flags::IS_COMPACT)) > 0);
508 const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;
509
510 optional<T> tmp; // space to deserialize min and max
511 optional<T> min_item;
512 optional<T> max_item;
513
514 ptr += serde.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
515 // serde call did not throw, repackage and cleanup
516 min_item.emplace(*tmp);
517 (*tmp).~T();
518 ptr += serde.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
519 // serde call did not throw, repackage and cleanup
520 max_item.emplace(*tmp);
521 (*tmp).~T();
522
523 if (serial_version == 1) {
524 uint64_t unused_long;
525 ptr += copy_from_mem(ptr, unused_long); // no longer used
526 }
527
528 // allocate buffers as needed
529 const uint8_t levels_needed = compute_levels_needed(k, items_seen);
530 const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);
531
532 // Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
533 // does not currently operate sketches in compact mode, but will only serialize as compact
534 // to avoid complications around serialization of empty values for generic type T. We also need
535 // to be able to ingest either serialized format from Java.
536
537 // load base buffer
538 const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
539 uint32_t items_to_read = (levels_needed == 0 || is_compact) ? bb_items : 2 * k;
540 auto base_buffer_pair = deserialize_array(ptr, end_ptr - ptr, bb_items, 2 * k, serde, allocator);
541 ptr += base_buffer_pair.second;
542 if (items_to_read > bb_items) { // either equal or greater, never read fewer items
543 // read remaining items, only use to advance the pointer
544 auto extras = deserialize_array(ptr, end_ptr - ptr, items_to_read - bb_items, items_to_read - bb_items, serde, allocator);
545 ptr += extras.second;
546 }
547
548 // populate vector of Levels directly
549 VectorLevels levels(allocator);
550 levels.reserve(levels_needed);
551 if (levels_needed > 0) {
552 uint64_t working_pattern = bit_pattern;
553 for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {
554
555 if ((working_pattern & 0x01) == 1) {
556 auto pair = deserialize_array(ptr, end_ptr - ptr, k, k, serde, allocator);
557 ptr += pair.second;
558 levels.push_back(std::move(pair.first));
559 } else {
560 Level level(allocator);
561 level.reserve(k);
562 levels.push_back(std::move(level));
563 }
564 }
565 }
566
567 return quantiles_sketch(k, items_seen, bit_pattern,
568 std::move(base_buffer_pair.first), std::move(levels), std::move(min_item), std::move(max_item), is_sorted,
569 comparator, allocator);
570}
571
572template<typename T, typename C, typename A>
573template<typename SerDe>
574auto quantiles_sketch<T, C, A>::deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capacity, const SerDe& serde, const A& allocator)
575 -> std::pair<Level, size_t> {
576 const char* ptr = static_cast<const char*>(bytes);
577 const char* end_ptr = static_cast<const char*>(bytes) + size;
578 A alloc(allocator);
579 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
580 ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num_items);
581 // serde did not throw, enable destructors
582 items.get_deleter().set_destroy(true);
583
584 // succesfully read, now put into a Level
585 Level level(allocator);
586 level.reserve(capacity);
587 level.insert(level.begin(),
588 std::make_move_iterator(items.get()),
589 std::make_move_iterator(items.get() + num_items));
590
591 return std::pair<Level, size_t>(std::move(level), ptr - static_cast<const char*>(bytes));
592}
593
594template<typename T, typename C, typename A>
595string<A> quantiles_sketch<T, C, A>::to_string(bool print_levels, bool print_items) const {
596 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
597 // The stream does not support passing an allocator instance, and alternatives are complicated.
598 std::ostringstream os;
599 os << "### Quantiles Sketch summary:" << std::endl;
600 os << " K : " << k_ << std::endl;
601 os << " N : " << n_ << std::endl;
602 os << " Epsilon : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
603 os << " Epsilon PMF : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
604 os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
605 os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
606 os << " Levels (w/o BB): " << levels_.size() << std::endl;
607 os << " Used Levels : " << count_valid_levels(bit_pattern_) << std::endl;
608 os << " Retained items : " << get_num_retained() << std::endl;
609 if (!is_empty()) {
610 os << " Min item : " << *min_item_ << std::endl;
611 os << " Max item : " << *max_item_ << std::endl;
612 }
613 os << "### End sketch summary" << std::endl;
614
615 if (print_levels) {
616 os << "### Quantiles Sketch levels:" << std::endl;
617 os << " index: items in use" << std::endl;
618 os << " BB: " << base_buffer_.size() << std::endl;
619 for (uint8_t i = 0; i < levels_.size(); i++) {
620 os << " " << static_cast<unsigned int>(i) << ": " << levels_[i].size() << std::endl;
621 }
622 os << "### End sketch levels" << std::endl;
623 }
624
625 if (print_items) {
626 os << "### Quantiles Sketch data:" << std::endl;
627 uint8_t level = 0;
628 os << " BB:" << std::endl;
629 for (const T& item : base_buffer_) {
630 os << " " << item << std::endl;
631 }
632 for (uint8_t i = 0; i < levels_.size(); ++i) {
633 os << " level " << static_cast<unsigned int>(level) << ":" << std::endl;
634 for (const T& item : levels_[i]) {
635 os << " " << item << std::endl;
636 }
637 }
638 os << "### End sketch data" << std::endl;
639 }
640 return string<A>(os.str().c_str(), allocator_);
641}
642
643template<typename T, typename C, typename A>
645 return k_;
646}
647
648template<typename T, typename C, typename A>
650 return n_;
651}
652
653template<typename T, typename C, typename A>
655 return n_ == 0;
656}
657
658template<typename T, typename C, typename A>
660 return bit_pattern_ != 0;
661}
662
663template<typename T, typename C, typename A>
665 return compute_retained_items(k_, n_);
666}
667
668template<typename T, typename C, typename A>
670 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
671 return *min_item_;
672}
673
674template<typename T, typename C, typename A>
676 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
677 return *max_item_;
678}
679
680template<typename T, typename C, typename A>
682 return comparator_;
683}
684
685template<typename T, typename C, typename A>
687 return allocator_;
688}
689
690// implementation for fixed-size arithmetic types (integral and floating point)
691template<typename T, typename C, typename A>
692template<typename SerDe, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
694 if (is_empty()) { return EMPTY_SIZE_BYTES; }
695 return DATA_START + ((get_num_retained() + 2) * sizeof(TT));
696}
697
698// implementation for all other types
699template<typename T, typename C, typename A>
700template<typename SerDe, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
702 if (is_empty()) { return EMPTY_SIZE_BYTES; }
703 size_t size = DATA_START;
704 size += serde.size_of_item(*min_item_);
705 size += serde.size_of_item(*max_item_);
706 for (auto it: *this) size += serde.size_of_item(it.first);
707 return size;
708}
709
710template<typename T, typename C, typename A>
712 return get_normalized_rank_error(k_, is_pmf);
713}
714
715template<typename T, typename C, typename A>
717 return is_pmf
718 ? 1.854 / std::pow(k, 0.9657)
719 : 1.576 / std::pow(k, 0.9726);
720}
721
722template<typename T, typename C, typename A>
724 // allow side-effect of sorting the base buffer
725 if (!is_base_buffer_sorted_) {
726 std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
727 const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
728 }
729 quantiles_sorted_view<T, C, A> view(get_num_retained(), comparator_, allocator_);
730
731 uint64_t weight = 1;
732 view.add(base_buffer_.begin(), base_buffer_.end(), weight);
733 for (const auto& level: levels_) {
734 weight <<= 1;
735 if (level.empty()) { continue; }
736 view.add(level.begin(), level.end(), weight);
737 }
738
739 view.convert_to_cummulative();
740 return view;
741}
742
743template<typename T, typename C, typename A>
744auto quantiles_sketch<T, C, A>::get_quantile(double rank, bool inclusive) const -> quantile_return_type {
745 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
746 if ((rank < 0.0) || (rank > 1.0)) {
747 throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
748 }
749 // possible side-effect: sorting base buffer
750 setup_sorted_view();
751 return sorted_view_->get_quantile(rank, inclusive);
752}
753
754template<typename T, typename C, typename A>
755double quantiles_sketch<T, C, A>::get_rank(const T& item, bool inclusive) const {
756 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
757 setup_sorted_view();
758 return sorted_view_->get_rank(item, inclusive);
759}
760
761template<typename T, typename C, typename A>
762auto quantiles_sketch<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
763 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
764 setup_sorted_view();
765 return sorted_view_->get_PMF(split_points, size, inclusive);
766}
767
768template<typename T, typename C, typename A>
769auto quantiles_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
770 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
771 setup_sorted_view();
772 return sorted_view_->get_CDF(split_points, size, inclusive);
773}
774
775template<typename T, typename C, typename A>
776uint32_t quantiles_sketch<T, C, A>::compute_retained_items(uint16_t k, uint64_t n) {
777 const uint32_t bb_count = compute_base_buffer_items(k, n);
778 const uint64_t bit_pattern = compute_bit_pattern(k, n);
779 const uint32_t valid_levels = count_valid_levels(bit_pattern);
780 return bb_count + (k * valid_levels);
781}
782
783template<typename T, typename C, typename A>
784uint32_t quantiles_sketch<T, C, A>::compute_base_buffer_items(uint16_t k, uint64_t n) {
785 return n % (static_cast<uint64_t>(2) * k);
786}
787
788template<typename T, typename C, typename A>
789uint64_t quantiles_sketch<T, C, A>::compute_bit_pattern(uint16_t k, uint64_t n) {
790 return n / (static_cast<uint64_t>(2) * k);
791}
792
793template<typename T, typename C, typename A>
794uint32_t quantiles_sketch<T, C, A>::count_valid_levels(uint64_t bit_pattern) {
795 uint32_t count = 0;
796 for (; bit_pattern > 0; ++count) bit_pattern &= bit_pattern - 1;
797 return count;
798}
799
800template<typename T, typename C, typename A>
801uint8_t quantiles_sketch<T, C, A>::compute_levels_needed(uint16_t k, uint64_t n) {
802 return static_cast<uint8_t>(64U) - count_leading_zeros_in_u64(n / (2 * k));
803}
804
805template<typename T, typename C, typename A>
806void quantiles_sketch<T, C, A>::check_k(uint16_t k) {
807 if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & (k - 1)) != 0) {
808 throw std::invalid_argument("k must be a power of 2 that is >= "
809 + std::to_string(quantiles_constants::MIN_K) + " and <= "
810 + std::to_string(quantiles_constants::MAX_K) + ". Found: " + std::to_string(k));
811 }
812}
813
814template<typename T, typename C, typename A>
815void quantiles_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
816 if (serial_version == SERIAL_VERSION || serial_version == SERIAL_VERSION_1 || serial_version == SERIAL_VERSION_2)
817 return;
818 else
819 throw std::invalid_argument("Possible corruption. Unrecognized serialization version: " + std::to_string(serial_version));
820}
821
822template<typename T, typename C, typename A>
823void quantiles_sketch<T, C, A>::check_family_id(uint8_t family_id) {
824 if (family_id == FAMILY)
825 return;
826 else
827 throw std::invalid_argument("Possible corruption. Family id does not indicate quantiles sketch: " + std::to_string(family_id));
828}
829
830template<typename T, typename C, typename A>
831void quantiles_sketch<T, C, A>::check_header_validity(uint8_t preamble_longs, uint8_t flags_byte, uint8_t serial_version) {
832 const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
833 const bool compact = (flags_byte & (1 << flags::IS_COMPACT)) > 0;
834
835 const uint8_t sw = (compact ? 1 : 0) + (2 * (empty ? 1 : 0))
836 + (4 * (serial_version & 0xF)) + (32 * (preamble_longs & 0x3F));
837 bool valid = true;
838
839 switch (sw) { // exhaustive list and description of all valid cases
840 case 38 : break;
841 case 164 : break;
842 case 42 : break;
843 case 72 : break;
844 case 47 : break; // compact, empty, serVer = 3, preLongs = 1;
845 case 46 : break;
846 case 79 : break; // compact, empty, serVer = 3, preLongs = 2;
847 case 78 : break;
848 case 77 : break; // compact, !empty, serVer = 3, preLongs = 2;
849 case 76 : break;
850 default : //all other case values are invalid
851 valid = false;
852 }
853
854 if (!valid) {
855 std::ostringstream os;
856 os << "Possible sketch corruption. Inconsistent state: "
857 << "preamble_longs = " << preamble_longs
858 << ", empty = " << (empty ? "true" : "false")
859 << ", serialization_version = " << serial_version
860 << ", compact = " << (compact ? "true" : "false");
861 throw std::invalid_argument(os.str());
862 }
863}
864
865template <typename T, typename C, typename A>
869
870template <typename T, typename C, typename A>
874
875template<typename T, typename C, typename A>
877 const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
878 base_buffer_.reserve(new_size);
879}
880
881template<typename T, typename C, typename A>
882void quantiles_sketch<T, C, A>::process_full_base_buffer() {
883 // make sure there will be enough levels for the propagation
884 grow_levels_if_needed(); // note: n_ was already incremented by update() before this
885
886 std::sort(base_buffer_.begin(), base_buffer_.end(), comparator_);
887 in_place_propagate_carry(0,
888 levels_[0], // unused here, but 0 is guaranteed to exist
889 base_buffer_,
890 true, *this);
891 base_buffer_.clear();
892 is_base_buffer_sorted_ = true;
893 if (n_ / (2 * k_) != bit_pattern_) {
894 throw std::logic_error("Internal error: n / 2k (" + std::to_string(n_ / 2 * k_)
895 + " != bit_pattern " + std::to_string(bit_pattern_));
896 }
897}
898
899template<typename T, typename C, typename A>
900bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
901 const uint8_t levels_needed = compute_levels_needed(k_, n_);
902 if (levels_needed == 0)
903 return false; // don't need levels and might have small base buffer. Possible during merges.
904
905 // from here on, assume full size base buffer (2k) and at least one additional level
906 if (levels_needed <= levels_.size())
907 return false;
908
909 Level empty_level(allocator_);
910 empty_level.reserve(k_);
911 levels_.push_back(std::move(empty_level));
912 return true;
913}
914
/**
 * Inserts one block of k items at starting_level of sketch's levels and resolves the
 * resulting "carry" upward, mirroring binary addition of 1 << starting_level to
 * bit_pattern_.
 *
 * The block lands in ending_level, the lowest level at or above starting_level whose
 * bit is clear. Every occupied level in [starting_level, ending_level) is then merged
 * with the contents of ending_level into buf_size_2k and zipped back (halved) into
 * ending_level, after which the lower level is cleared.
 *
 * @param starting_level level at which the incoming block is added
 * @param buf_size_k merge mode only: the k sorted items to insert; each is passed
 *        through conditional_forward<FwdV> (NOTE(review): whether that moves or copies
 *        depends on FwdV — see conditional_forward.hpp)
 * @param buf_size_2k update mode: 2k sorted items to zip into ending_level;
 *        in both modes it is also used as the 2k scratch buffer for merges
 * @param apply_as_update true for the update path (zip buf_size_2k), false for merges
 *        (copy/move buf_size_k)
 * @param sketch sketch whose levels_ and bit_pattern_ are modified; levels_ must
 *        already contain ending_level (operator[] is used without bounds checks)
 * @throws std::logic_error if a level that bit_pattern_ says is occupied is not
 */
template<typename T, typename C, typename A>
template<typename FwdV>
void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
                                                         FwdV&& buf_size_k, Level& buf_size_2k,
                                                         bool apply_as_update,
                                                         quantiles_sketch& sketch) {
  // Snapshot: the loop below checks the pre-carry bit pattern.
  const uint64_t bit_pattern = sketch.bit_pattern_;
  const int k = sketch.k_;

  // First clear bit at or above starting_level: where the carry comes to rest.
  uint8_t ending_level = lowest_zero_bit_starting_at(bit_pattern, starting_level);

  if (apply_as_update) {
    // update version of computation
    // buf_size_k is not read on this path
    zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
  } else {
    // merge_into version of computation
    for (uint16_t i = 0; i < k; ++i) {
      sketch.levels_[ending_level].push_back(conditional_forward<FwdV>(buf_size_k[i]));
    }
  }

  // Each occupied lower level combines with the carry now in ending_level:
  // merge both (2k items) into scratch, then zip back down to k items.
  for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
    if ((bit_pattern & (static_cast<uint64_t>(1) << lvl)) == 0) {
      throw std::logic_error("unexpected empty level in bit_pattern");
    }
    merge_two_size_k_buffers(
        sketch.levels_[lvl],
        sketch.levels_[ending_level],
        buf_size_2k, sketch.get_comparator());
    sketch.levels_[lvl].clear();
    sketch.levels_[ending_level].clear();
    zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
  } // end of loop over lower levels

  // update bit pattern with binary-arithmetic ripple carry
  // (clears bits starting_level..ending_level-1 and sets bit ending_level)
  sketch.bit_pattern_ = bit_pattern + (static_cast<uint64_t>(1) << starting_level);
}
953
954template<typename T, typename C, typename A>
955void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
956#ifdef QUANTILES_VALIDATION
957 static uint32_t next_offset = 0;
958 uint32_t rand_offset = next_offset;
959 next_offset = 1 - next_offset;
960#else
961 uint32_t rand_offset = random_utils::random_bit();
962#endif
963 if ((buf_in.size() != 2 * buf_out.capacity())
964 || (buf_out.size() > 0)) {
965 throw std::logic_error("zip_buffer requires buf_in.size() == "
966 "2*buf_out.capacity() and empty buf_out");
967 }
968
969 size_t k = buf_out.capacity();
970 for (uint32_t i = rand_offset, o = 0; o < k; i += 2, ++o) {
971 buf_out.push_back(std::move(buf_in[i]));
972 }
973 buf_in.clear();
974}
975
976template<typename T, typename C, typename A>
977template<typename FwdV>
978void quantiles_sketch<T, C, A>::zip_buffer_with_stride(FwdV&& buf_in, Level& buf_out, uint16_t stride) {
979 // Random offset in range [0, stride)
980 std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
981 const uint16_t rand_offset = dist(random_utils::rand);
982
983 if ((buf_in.size() != stride * buf_out.capacity())
984 || (buf_out.size() > 0)) {
985 throw std::logic_error("zip_buffer_with_stride requires buf_in.size() == "
986 "stride*buf_out.capacity() and empty buf_out");
987 }
988
989 const size_t k = buf_out.capacity();
990 for (uint16_t i = rand_offset, o = 0; o < k; i += stride, ++o) {
991 buf_out.push_back(conditional_forward<FwdV>(buf_in[i]));
992 }
993 // do not clear input buffer
994}
995
996template<typename T, typename C, typename A>
997void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2,
998 Level& dst, const C& comparator) {
999 if (src_1.size() != src_2.size()
1000 || src_1.size() * 2 != dst.capacity()
1001 || dst.size() != 0) {
1002 throw std::logic_error("Input invariants violated in merge_two_size_k_buffers()");
1003 }
1004
1005 auto end1 = src_1.end(), end2 = src_2.end();
1006 auto it1 = src_1.begin(), it2 = src_2.begin();
1007
1008 // TODO: probably actually doing copies given Level&?
1009 while (it1 != end1 && it2 != end2) {
1010 if (comparator(*it1, *it2)) {
1011 dst.push_back(std::move(*it1++));
1012 } else {
1013 dst.push_back(std::move(*it2++));
1014 }
1015 }
1016
1017 if (it1 != end1) {
1018 dst.insert(dst.end(), it1, end1);
1019 } else {
1020 if (it2 == end2) { throw std::logic_error("it2 unexpectedly already at end of range"); }
1021 dst.insert(dst.end(), it2, end2);
1022 }
1023}
1024
/**
 * Merges src into tgt when both sketches use the same k.
 *
 * src's base-buffer items are fed one by one through tgt.update(). tgt's levels are then
 * grown to cover the combined n, and each populated level of src is carried into tgt at
 * the same level index via in_place_propagate_carry() in merge mode. Finally tgt.n_ is
 * set to the combined n and min/max items are updated. Items taken from src go through
 * conditional_forward<FwdSk> (NOTE(review): presumably moved when src is an rvalue —
 * confirm in conditional_forward.hpp).
 *
 * @param tgt sketch receiving the items
 * @param src sketch being merged in; nothing happens if it is empty
 * @throws std::invalid_argument if src and tgt have different k
 * @throws std::logic_error if tgt's n / 2k does not match its bit pattern afterwards
 */
template<typename T, typename C, typename A>
template<typename FwdSk>
void quantiles_sketch<T, C, A>::standard_merge(quantiles_sketch& tgt, FwdSk&& src) {
  if (src.get_k() != tgt.get_k()) {
    throw std::invalid_argument("src.get_k() != tgt.get_k()");
  }
  if (src.is_empty()) {
    return;
  }

  // Captured before the updates below modify tgt's n.
  uint64_t new_n = src.get_n() + tgt.get_n();

  // move items from src's base buffer
  for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
    tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
  }

  // check (after moving raw items) if we need to extend levels array
  // (in_place_propagate_carry indexes levels_ with operator[], so they must exist first)
  uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
  if (levels_needed > tgt.levels_.size()) {
    tgt.levels_.reserve(levels_needed);
    while (tgt.levels_.size() < levels_needed) {
      Level empty_level(tgt.allocator_);
      empty_level.reserve(tgt.get_k());
      tgt.levels_.push_back(std::move(empty_level));
    }
  }

  // 2k scratch space reused by every carry propagation below.
  Level scratch_buf(tgt.allocator_);
  scratch_buf.reserve(2 * tgt.get_k());

  // Walk src's set bits from the lowest level upward; each one is a full level of k items.
  uint64_t src_pattern = src.bit_pattern_;
  for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
    if ((src_pattern & 1) > 0) {
      scratch_buf.clear();

      // propagate-carry
      in_place_propagate_carry(src_lvl,
                               src.levels_[src_lvl], scratch_buf,
                               false, tgt);
      // update n_ at the end
    }
  }
  // Overwrites whatever the update() calls above accumulated in tgt.n_.
  tgt.n_ = new_n;
  if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
    throw std::logic_error("Failed internal consistency check after standard_merge()");
  }

  // update min and max items
  // can't just check is_empty() since min and max might not have been set if
  // there were no base buffer items added via update()
  if (!tgt.min_item_) {
    tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
  } else {
    if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
      *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
  }
  if (!tgt.max_item_) {
    tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
  } else {
    if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
      *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
  }
}
1089
1090template<typename T, typename C, typename A>
1091template<typename FwdSk>
1092void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch& tgt, FwdSk&& src) {
1093 if (src.get_k() % tgt.get_k() != 0) {
1094 throw std::invalid_argument("src.get_k() is not a multiple of tgt.get_k()");
1095 }
1096 if (src.is_empty()) {
1097 return;
1098 }
1099
1100 const uint16_t downsample_factor = src.get_k() / tgt.get_k();
1101 const uint8_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);
1102
1103 const uint64_t new_n = src.get_n() + tgt.get_n();
1104
1105 // move items from src's base buffer
1106 for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
1107 tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
1108 }
1109
1110 // check (after moving raw items) if we need to extend levels array
1111 const uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
1112 if (levels_needed > tgt.levels_.size()) {
1113 tgt.levels_.reserve(levels_needed);
1114 while (tgt.levels_.size() < levels_needed) {
1115 Level empty_level(tgt.allocator_);
1116 empty_level.reserve(tgt.get_k());
1117 tgt.levels_.push_back(std::move(empty_level));
1118 }
1119 }
1120
1121 Level down_buf(tgt.allocator_);
1122 down_buf.reserve(tgt.get_k());
1123
1124 Level scratch_buf(tgt.allocator_);
1125 scratch_buf.reserve(2 * tgt.get_k());
1126
1127 uint64_t src_pattern = src.bit_pattern_;
1128 for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
1129 if ((src_pattern & 1) > 0) {
1130 down_buf.clear();
1131 scratch_buf.clear();
1132
1133 // zip with stride, leaving input buffer intact
1134 zip_buffer_with_stride(src.levels_[src_lvl], down_buf, downsample_factor);
1135
1136 // propagate-carry
1137 in_place_propagate_carry(src_lvl + lg_sample_factor,
1138 down_buf, scratch_buf,
1139 false, tgt);
1140 // update n_ at the end
1141 }
1142 }
1143 tgt.n_ = new_n;
1144 if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
1145 throw std::logic_error("Failed internal consistency check after downsampling_merge()");
1146 }
1147
1148 // update min and max items
1149 // can't just check is_empty() since min and max might not have been set if
1150 // there were no base buffer items added via update()
1151 if (!tgt.min_item_) {
1152 tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
1153 } else {
1154 if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
1155 *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
1156 }
1157 if (!tgt.max_item_) {
1158 tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
1159 } else {
1160 if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
1161 *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
1162 }
1163}
1164
1165template<typename T, typename C, typename A>
1166uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
1167 uint8_t pos = starting_bit & 0X3F;
1168 uint64_t my_bits = bits >> pos;
1169
1170 while ((my_bits & static_cast<uint64_t>(1)) != 0) {
1171 my_bits >>= 1;
1172 pos++;
1173 }
1174 return pos;
1175}
1176
1177template<typename T, typename C, typename A>
1178class quantiles_sketch<T, C, A>::items_deleter {
1179 public:
1180 items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
1181 void operator() (T* ptr) {
1182 if (ptr != nullptr) {
1183 if (destroy_) {
1184 for (size_t i = 0; i < num_; ++i) {
1185 ptr[i].~T();
1186 }
1187 }
1188 allocator_.deallocate(ptr, num_);
1189 }
1190 }
1191 void set_destroy(bool destroy) { destroy_ = destroy; }
1192 private:
1193 A allocator_;
1194 bool destroy_;
1195 size_t num_;
1196};
1197
1198template<typename T, typename C, typename A>
1199void quantiles_sketch<T, C, A>::setup_sorted_view() const {
1200 if (sorted_view_ == nullptr) {
1201 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1202 sorted_view_ = new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
1203 }
1204}
1205
1206template<typename T, typename C, typename A>
1207void quantiles_sketch<T, C, A>::reset_sorted_view() {
1208 if (sorted_view_ != nullptr) {
1209 sorted_view_->~quantiles_sorted_view();
1210 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1211 AllocSortedView(allocator_).deallocate(sorted_view_, 1);
1212 sorted_view_ = nullptr;
1213 }
1214}
1215
1216// quantiles_sketch::const_iterator implementation
1217
1218template<typename T, typename C, typename A>
1219quantiles_sketch<T, C, A>::const_iterator::const_iterator(const Level& base_buffer,
1220 const std::vector<Level, AllocLevel>& levels,
1221 uint16_t k,
1222 uint64_t n,
1223 bool is_end):
1224base_buffer_(base_buffer),
1225levels_(levels),
1226level_(-1),
1227index_(0),
1228bb_count_(compute_base_buffer_items(k, n)),
1229bit_pattern_(compute_bit_pattern(k, n)),
1230weight_(1),
1231k_(k)
1232{
1233 if (is_end) {
1234 // if exact mode: index_ = n is end
1235 // if sampling, level_ = max_level + 1 and index_ = 0 is end
1236 if (bit_pattern_ == 0) // only a valid check for exact mode in constructor
1237 index_ = static_cast<uint32_t>(n);
1238 else
1239 level_ = static_cast<int>(levels_.size());
1240 } else { // find first non-empty item
1241 if (bb_count_ == 0 && bit_pattern_ > 0) {
1242 level_ = 0;
1243 weight_ = 2;
1244 while ((bit_pattern_ & 0x01) == 0) {
1245 weight_ *= 2;
1246 ++level_;
1247 bit_pattern_ >>= 1;
1248 }
1249 }
1250 }
1251}
1252
/**
 * Pre-increment: advances to the next retained item.
 *
 * Items are visited base buffer first (level_ == -1), then each populated level in
 * ascending order. When the current buffer is exhausted, index_ resets to 0 and level_
 * moves up, shifting bit_pattern_ so that its bit 0 tracks the current level; weight_
 * doubles per level step. If no populated level remains, it returns as soon as
 * bit_pattern_ becomes 0, leaving (level_, index_ == 0).
 * NOTE(review): that state equals end() only if level_ ends at levels_.size(), i.e. no
 * unpopulated levels above the highest set bit — confirm.
 */
template<typename T, typename C, typename A>
typename quantiles_sketch<T, C, A>::const_iterator& quantiles_sketch<T, C, A>::const_iterator::operator++() {
  ++index_;

  // Leaving the base buffer (only when levels exist), or finishing a k-item level.
  if ((level_ == -1 && index_ == base_buffer_.size() && levels_.size() > 0) || (level_ >= 0 && index_ == k_)) { // go to the next non-empty level
    index_ = 0;
    do {
      ++level_;
      // Moving from the base buffer to level 0 does not consume a bit.
      if (level_ > 0) bit_pattern_ = bit_pattern_ >> 1;
      if (bit_pattern_ == 0) return *this;
      weight_ *= 2;
    } while ((bit_pattern_ & static_cast<uint64_t>(1)) == 0);
  }
  return *this;
}
1268
/**
 * Post-increment: advances this iterator and is intended to return its prior position.
 *
 * NOTE(review): the declared return type is `const_iterator&`, but the function returns
 * `tmp`, a local copy, so the caller receives a dangling reference (undefined behavior
 * if used). Fixing this requires changing the declaration in quantiles_sketch.hpp to
 * return `const_iterator` by value together with this definition. Until then, prefer
 * pre-increment.
 */
template<typename T, typename C, typename A>
typename quantiles_sketch<T, C, A>::const_iterator& quantiles_sketch<T, C, A>::const_iterator::operator++(int) {
  const_iterator tmp(*this);
  operator++();
  return tmp;
}
1275
1276template<typename T, typename C, typename A>
1277bool quantiles_sketch<T, C, A>::const_iterator::operator==(const const_iterator& other) const {
1278 return level_ == other.level_ && index_ == other.index_;
1279}
1280
1281template<typename T, typename C, typename A>
1282bool quantiles_sketch<T, C, A>::const_iterator::operator!=(const const_iterator& other) const {
1283 return !operator==(other);
1284}
1285
1286template<typename T, typename C, typename A>
1287auto quantiles_sketch<T, C, A>::const_iterator::operator*() const -> reference {
1288 return value_type(level_ == -1 ? base_buffer_[index_] : levels_[level_][index_], weight_);
1289}
1290
1291template<typename T, typename C, typename A>
1292auto quantiles_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
1293 return **this;
1294}
1295
1296} /* namespace datasketches */
1297
1298#endif // _QUANTILES_SKETCH_IMPL_HPP_
This is a stochastic streaming sketch that enables near-real time analysis of the approximate distrib...
Definition quantiles_sketch.hpp:153
Comparator get_comparator() const
Returns an instance of the comparator for this sketch.
Definition quantiles_sketch_impl.hpp:681
quantiles_sorted_view< T, Comparator, Allocator > get_sorted_view() const
Gets the sorted view of this sketch.
Definition quantiles_sketch_impl.hpp:723
quantiles_sketch & operator=(const quantiles_sketch &other)
Copy assignment.
Definition quantiles_sketch_impl.hpp:89
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analo...
Definition quantiles_sketch_impl.hpp:769
uint32_t get_num_retained() const
Returns the number of retained items (samples) in the sketch.
Definition quantiles_sketch_impl.hpp:664
double get_normalized_rank_error(bool is_pmf) const
Gets the normalized rank error for this sketch.
Definition quantiles_sketch_impl.hpp:711
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of sp...
Definition quantiles_sketch_impl.hpp:762
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition quantiles_sketch_impl.hpp:235
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition quantiles_sketch_impl.hpp:212
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition quantiles_sketch_impl.hpp:277
string< Allocator > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition quantiles_sketch_impl.hpp:595
uint16_t get_k() const
Returns configured parameter k.
Definition quantiles_sketch_impl.hpp:644
bool is_empty() const
Returns true if this sketch is empty.
Definition quantiles_sketch_impl.hpp:654
const T & get_max_item() const
Returns the max item of the stream.
Definition quantiles_sketch_impl.hpp:675
static quantiles_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const Comparator &comparator=Comparator(), const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1, inclusive.
Definition quantiles_sketch_impl.hpp:755
allocator_type get_allocator() const
Returns the allocator for this sketch.
Definition quantiles_sketch_impl.hpp:686
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an approximation to the data item associated with the given rank of a hypothetical sorted ver...
Definition quantiles_sketch_impl.hpp:744
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition quantiles_sketch_impl.hpp:866
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition quantiles_sketch_impl.hpp:693
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition quantiles_sketch_impl.hpp:659
const T & get_min_item() const
Returns the min item of the stream.
Definition quantiles_sketch_impl.hpp:669
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition quantiles_sketch_impl.hpp:871
uint64_t get_n() const
Returns the length of the input stream.
Definition quantiles_sketch_impl.hpp:649
Sorted view for quantiles sketches (REQ, KLL and Quantiles)
Definition quantiles_sorted_view.hpp:38
const uint16_t MAX_K
maximum value of parameter K
Definition quantiles_sketch.hpp:41
const uint16_t MIN_K
minimum value of parameter K
Definition quantiles_sketch.hpp:39
DataSketches namespace.
Definition binomial_bounds.hpp:38
Interface for serializing and deserializing items.
Definition serde.hpp:34
void serialize(std::ostream &os, const T *items, unsigned num) const
Stream serialization.