datasketches-cpp
Loading...
Searching...
No Matches
quantiles_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _QUANTILES_SKETCH_IMPL_HPP_
21#define _QUANTILES_SKETCH_IMPL_HPP_
22
23#include <cmath>
24#include <algorithm>
25#include <stdexcept>
26#include <iomanip>
27#include <sstream>
28#include <type_traits>
29
30#include "count_zeros.hpp"
31#include "conditional_forward.hpp"
32
33namespace datasketches {
34
35template<typename T, typename C, typename A>
36quantiles_sketch<T, C, A>::quantiles_sketch(uint16_t k, const C& comparator, const A& allocator):
37comparator_(comparator),
38allocator_(allocator),
39is_base_buffer_sorted_(true),
40k_(k),
41n_(0),
42bit_pattern_(0),
43base_buffer_(allocator_),
44levels_(allocator_),
45min_item_(),
46max_item_(),
47sorted_view_(nullptr)
48{
49 check_k(k_);
50 base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k));
51}
52
53template<typename T, typename C, typename A>
55comparator_(other.comparator_),
56allocator_(other.allocator_),
57is_base_buffer_sorted_(other.is_base_buffer_sorted_),
58k_(other.k_),
59n_(other.n_),
60bit_pattern_(other.bit_pattern_),
61base_buffer_(other.base_buffer_),
62levels_(other.levels_),
63min_item_(other.min_item_),
64max_item_(other.max_item_),
65sorted_view_(nullptr)
66{
67 for (size_t i = 0; i < levels_.size(); ++i) {
68 if (levels_[i].capacity() != other.levels_[i].capacity()) {
69 levels_[i].reserve(other.levels_[i].capacity());
70 }
71 }
72}
73
74template<typename T, typename C, typename A>
76comparator_(other.comparator_),
77allocator_(other.allocator_),
78is_base_buffer_sorted_(other.is_base_buffer_sorted_),
79k_(other.k_),
80n_(other.n_),
81bit_pattern_(other.bit_pattern_),
82base_buffer_(std::move(other.base_buffer_)),
83levels_(std::move(other.levels_)),
84min_item_(std::move(other.min_item_)),
85max_item_(std::move(other.max_item_)),
86sorted_view_(nullptr)
87{}
88
89template<typename T, typename C, typename A>
91 quantiles_sketch<T, C, A> copy(other);
92 std::swap(comparator_, copy.comparator_);
93 std::swap(allocator_, copy.allocator_);
94 std::swap(is_base_buffer_sorted_, copy.is_base_buffer_sorted_);
95 std::swap(k_, copy.k_);
96 std::swap(n_, copy.n_);
97 std::swap(bit_pattern_, copy.bit_pattern_);
98 std::swap(base_buffer_, copy.base_buffer_);
99 std::swap(levels_, copy.levels_);
100 std::swap(min_item_, copy.min_item_);
101 std::swap(max_item_, copy.max_item_);
102 reset_sorted_view();
103 return *this;
104}
105
106template<typename T, typename C, typename A>
108 std::swap(comparator_, other.comparator_);
109 std::swap(allocator_, other.allocator_);
110 std::swap(is_base_buffer_sorted_, other.is_base_buffer_sorted_);
111 std::swap(k_, other.k_);
112 std::swap(n_, other.n_);
113 std::swap(bit_pattern_, other.bit_pattern_);
114 std::swap(base_buffer_, other.base_buffer_);
115 std::swap(levels_, other.levels_);
116 std::swap(min_item_, other.min_item_);
117 std::swap(max_item_, other.max_item_);
118 reset_sorted_view();
119 return *this;
120}
121
122template<typename T, typename C, typename A>
123quantiles_sketch<T, C, A>::quantiles_sketch(uint16_t k, uint64_t n, uint64_t bit_pattern,
124 Level&& base_buffer, VectorLevels&& levels,
125 optional<T>&& min_item, optional<T>&& max_item,
126 bool is_sorted, const C& comparator, const A& allocator):
127comparator_(comparator),
128allocator_(allocator),
129is_base_buffer_sorted_(is_sorted),
130k_(k),
131n_(n),
132bit_pattern_(bit_pattern),
133base_buffer_(std::move(base_buffer)),
134levels_(std::move(levels)),
135min_item_(std::move(min_item)),
136max_item_(std::move(max_item)),
137sorted_view_(nullptr)
138{
139 uint32_t item_count = static_cast<uint32_t>(base_buffer_.size());
140 for (Level& lvl : levels_) {
141 item_count += static_cast<uint32_t>(lvl.size());
142 }
143 if (item_count != compute_retained_items(k_, n_))
144 throw std::logic_error("Item count does not match value computed from k, n");
145}
146
147template<typename T, typename C, typename A>
148template<typename From, typename FC, typename FA>
149quantiles_sketch<T, C, A>::quantiles_sketch(const quantiles_sketch<From, FC, FA>& other,
150 const C& comparator, const A& allocator):
151comparator_(comparator),
152allocator_(allocator),
153is_base_buffer_sorted_(false),
154k_(other.get_k()),
155n_(other.get_n()),
156bit_pattern_(compute_bit_pattern(other.get_k(), other.get_n())),
157base_buffer_(allocator),
158levels_(allocator),
159min_item_(other.min_item_),
160max_item_(other.max_item_),
161sorted_view_(nullptr)
162{
163 static_assert(std::is_constructible<T, From>::value,
164 "Type converting constructor requires new type to be constructible from existing type");
165
166 base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k_));
167
168 if (!other.is_empty()) {
169 // reserve space in levels
170 const uint8_t num_levels = compute_levels_needed(k_, n_);
171 levels_.reserve(num_levels);
172 for (int i = 0; i < num_levels; ++i) {
173 Level level(allocator);
174 level.reserve(k_);
175 levels_.push_back(std::move(level));
176 }
177
178 // iterate through points, assigning to the correct level as needed
179 for (auto pair : other) {
180 const uint64_t wt = pair.second;
181 if (wt == 1) {
182 base_buffer_.push_back(T(pair.first));
183 // resize where needed as if adding points via update()
184 if (base_buffer_.size() + 1 > base_buffer_.capacity()) {
185 const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
186 base_buffer_.reserve(new_size);
187 }
188 }
189 else {
190 const uint8_t idx = count_trailing_zeros_in_u64(pair.second) - 1;
191 levels_[idx].push_back(T(pair.first));
192 }
193 }
194
195 // validate that ordering within each level is preserved
196 // base_buffer_ can be considered unsorted for this purpose
197 for (int i = 0; i < num_levels; ++i) {
198 if (!std::is_sorted(levels_[i].begin(), levels_[i].end(), comparator_)) {
199 throw std::logic_error("Copy construction across types produces invalid sorting");
200 }
201 }
202 }
203}
204
205
206template<typename T, typename C, typename A>
207quantiles_sketch<T, C, A>::~quantiles_sketch() {
208 reset_sorted_view();
209}
210
211template<typename T, typename C, typename A>
212template<typename FwdT>
214 if (!check_update_item(item)) { return; }
215 if (is_empty()) {
216 min_item_.emplace(item);
217 max_item_.emplace(item);
218 } else {
219 if (comparator_(item, *min_item_)) *min_item_ = item;
220 if (comparator_(*max_item_, item)) *max_item_ = item;
221 }
222
223 // if exceed capacity, grow until size 2k -- assumes eager processing
224 if (base_buffer_.size() + 1 > base_buffer_.capacity()) grow_base_buffer();
225
226 base_buffer_.push_back(std::forward<FwdT>(item));
227 ++n_;
228
229 if (base_buffer_.size() > 1) is_base_buffer_sorted_ = false;
230 if (base_buffer_.size() == 2 * k_) process_full_base_buffer();
231 reset_sorted_view();
232}
233
234template<typename T, typename C, typename A>
235template<typename FwdSk>
237 if (other.is_empty()) {
238 return; // nothing to do
239 } else if (!other.is_estimation_mode()) {
240 // other is exact, stream in regardless of k
241 for (auto item : other.base_buffer_) {
242 update(conditional_forward<FwdSk>(item));
243 }
244 reset_sorted_view();
245 return;
246 }
247
248 // other has data and is in estimation mode
249 if (is_estimation_mode()) {
250 if (k_ == other.get_k()) {
251 standard_merge(*this, std::forward<FwdSk>(other));
252 } else if (k_ > other.get_k()) {
253 quantiles_sketch sk_copy(std::forward<FwdSk>(other));
254 downsampling_merge(sk_copy, std::move(*this));
255 *this = std::move(sk_copy);
256 } else { // k_ < other.get_k()
257 downsampling_merge(*this, std::forward<FwdSk>(other));
258 }
259 } else {
260 // exact or empty
261 quantiles_sketch sk_copy(std::forward<FwdSk>(other));
262 if (k_ <= other.get_k()) {
263 if (!is_empty()) {
264 for (uint16_t i = 0; i < base_buffer_.size(); ++i) {
265 sk_copy.update(std::move(base_buffer_[i]));
266 }
267 }
268 } else { // k_ > other.get_k()
269 downsampling_merge(sk_copy, std::move(*this));
270 }
271 *this = std::move(sk_copy);
272 }
273 reset_sorted_view();
274}
275
276template<typename T, typename C, typename A>
277template<typename SerDe>
278void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde) const {
279 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
280 write(os, preamble_longs);
281 const uint8_t ser_ver = SERIAL_VERSION;
282 write(os, ser_ver);
283 const uint8_t family = FAMILY;
284 write(os, family);
285
286 // side-effect: sort base buffer since always compact
287 std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
288 const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
289
290 // empty, ordered, compact are valid flags
291 const uint8_t flags_byte(
292 (is_empty() ? 1 << flags::IS_EMPTY : 0)
293 | (1 << flags::IS_SORTED) // always sorted as side effect noted above
294 | (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types?
295 );
296 write(os, flags_byte);
297 write(os, k_);
298 const uint16_t unused = 0;
299 write(os, unused);
300
301 if (!is_empty()) {
302 write(os, n_);
303
304 // min and max
305 serde.serialize(os, &*min_item_, 1);
306 serde.serialize(os, &*max_item_, 1);
307
308 // base buffer items
309 serde.serialize(os, base_buffer_.data(), static_cast<unsigned>(base_buffer_.size()));
310
311 // levels, only when data is present
312 for (Level lvl : levels_) {
313 if (lvl.size() > 0)
314 serde.serialize(os, lvl.data(), static_cast<unsigned>(lvl.size()));
315 }
316 }
317}
318
319template<typename T, typename C, typename A>
320template<typename SerDe>
321auto quantiles_sketch<T, C, A>::serialize(unsigned header_size_bytes, const SerDe& serde) const -> vector_bytes {
322 const size_t size = get_serialized_size_bytes(serde) + header_size_bytes;
323 vector_bytes bytes(size, 0, allocator_);
324 uint8_t* ptr = bytes.data() + header_size_bytes;
325 const uint8_t* end_ptr = ptr + size;
326
327 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
328 ptr += copy_to_mem(preamble_longs, ptr);
329 const uint8_t ser_ver = SERIAL_VERSION;
330 ptr += copy_to_mem(ser_ver, ptr);
331 const uint8_t family = FAMILY;
332 ptr += copy_to_mem(family, ptr);
333
334 // side-effect: sort base buffer since always compact
335 std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
336 const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
337
338 // empty, ordered, compact are valid flags
339 const uint8_t flags_byte(
340 (is_empty() ? 1 << flags::IS_EMPTY : 0)
341 | (1 << flags::IS_SORTED) // always sorted as side effect noted above
342 | (1 << flags::IS_COMPACT) // always compact
343 );
344 ptr += copy_to_mem(flags_byte, ptr);
345 ptr += copy_to_mem(k_, ptr);
346 ptr += sizeof(uint16_t); // 2 unused bytes
347
348 if (!is_empty()) {
349
350 ptr += copy_to_mem(n_, ptr);
351
352 // min and max
353 ptr += serde.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
354 ptr += serde.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
355
356 // base buffer items
357 if (base_buffer_.size() > 0)
358 ptr += serde.serialize(ptr, end_ptr - ptr, base_buffer_.data(), static_cast<unsigned>(base_buffer_.size()));
359
360 // levels, only when data is present
361 for (Level lvl : levels_) {
362 if (lvl.size() > 0)
363 ptr += serde.serialize(ptr, end_ptr - ptr, lvl.data(), static_cast<unsigned>(lvl.size()));
364 }
365 }
366
367 return bytes;
368}
369
370template<typename T, typename C, typename A>
371template<typename SerDe>
372auto quantiles_sketch<T, C, A>::deserialize(std::istream &is, const SerDe& serde,
373 const C& comparator, const A &allocator) -> quantiles_sketch {
374 const auto preamble_longs = read<uint8_t>(is);
375 const auto serial_version = read<uint8_t>(is);
376 const auto family_id = read<uint8_t>(is);
377 const auto flags_byte = read<uint8_t>(is);
378 const auto k = read<uint16_t>(is);
379 read<uint16_t>(is); // unused
380
381 check_k(k);
382 check_serial_version(serial_version); // a little redundant with the header check
383 check_family_id(family_id);
384 check_header_validity(preamble_longs, flags_byte, serial_version);
385
386 if (!is.good()) throw std::runtime_error("error reading from std::istream");
387 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
388 if (is_empty) {
389 return quantiles_sketch(k, comparator, allocator);
390 }
391
392 const auto items_seen = read<uint64_t>(is);
393
394 const bool is_compact = (serial_version == 2) | ((flags_byte & (1 << flags::IS_COMPACT)) > 0);
395 const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;
396
397 optional<T> min_item;
398 optional<T> max_item;
399
400 // Space to deserialize min and max.
401 // serde::deserialize expects allocated but not initialized storage.
402 typename std::aligned_storage<sizeof(T), alignof(T)>::type tmp_storage;
403 T* tmp = reinterpret_cast<T*>(&tmp_storage);
404
405 serde.deserialize(is, tmp, 1);
406 // serde call did not throw, repackage and cleanup
407 min_item.emplace(std::move(*tmp));
408 tmp->~T();
409 serde.deserialize(is, tmp, 1);
410 // serde call did not throw, repackage and cleanup
411 max_item.emplace(std::move(*tmp));
412 tmp->~T();
413
414 if (serial_version == 1) {
415 read<uint64_t>(is); // no longer used
416 }
417
418 // allocate buffers as needed
419 const uint8_t levels_needed = compute_levels_needed(k, items_seen);
420 const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);
421
422 // Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
423 // does not currently operate sketches in compact mode, but will only serialize as compact
424 // to avoid complications around serialization of empty values for generic type T. We also need
425 // to be able to ingest either serialized format from Java.
426
427 // load base buffer
428 const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
429 uint32_t items_to_read = (levels_needed == 0 || is_compact) ? bb_items : 2 * k;
430 Level base_buffer = deserialize_array(is, bb_items, 2 * k, serde, allocator);
431 if (items_to_read > bb_items) { // either equal or greater, never read fewer items
432 // read remaining items, but don't store them
433 deserialize_array(is, items_to_read - bb_items, items_to_read - bb_items, serde, allocator);
434 }
435
436 // populate vector of Levels directly
437 VectorLevels levels(allocator);
438 levels.reserve(levels_needed);
439 if (levels_needed > 0) {
440 uint64_t working_pattern = bit_pattern;
441 for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {
442 if ((working_pattern & 0x01) == 1) {
443 Level level = deserialize_array(is, k, k, serde, allocator);
444 levels.push_back(std::move(level));
445 } else {
446 Level level(allocator);
447 level.reserve(k);
448 levels.push_back(std::move(level));
449 }
450 }
451 }
452
453 return quantiles_sketch(k, items_seen, bit_pattern,
454 std::move(base_buffer), std::move(levels), std::move(min_item), std::move(max_item), is_sorted,
455 comparator, allocator);
456}
457
458template<typename T, typename C, typename A>
459template<typename SerDe>
460auto quantiles_sketch<T, C, A>::deserialize_array(std::istream& is, uint32_t num_items, uint32_t capacity, const SerDe& serde, const A& allocator) -> Level {
461 A alloc(allocator);
462 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
463 serde.deserialize(is, items.get(), num_items);
464 // serde did not throw, enable destructors
465 items.get_deleter().set_destroy(true);
466 if (!is.good()) throw std::runtime_error("error reading from std::istream");
467
468 // successfully read, now put into a Level
469 Level level(allocator);
470 level.reserve(capacity);
471 level.insert(level.begin(),
472 std::make_move_iterator(items.get()),
473 std::make_move_iterator(items.get() + num_items));
474 return level;
475}
476
477template<typename T, typename C, typename A>
478template<typename SerDe>
479auto quantiles_sketch<T, C, A>::deserialize(const void* bytes, size_t size, const SerDe& serde,
480 const C& comparator, const A &allocator) -> quantiles_sketch {
481 ensure_minimum_memory(size, 8);
482 const char* ptr = static_cast<const char*>(bytes);
483 const char* end_ptr = static_cast<const char*>(bytes) + size;
484
485 uint8_t preamble_longs;
486 ptr += copy_from_mem(ptr, preamble_longs);
487 uint8_t serial_version;
488 ptr += copy_from_mem(ptr, serial_version);
489 uint8_t family_id;
490 ptr += copy_from_mem(ptr, family_id);
491 uint8_t flags_byte;
492 ptr += copy_from_mem(ptr, flags_byte);
493 uint16_t k;
494 ptr += copy_from_mem(ptr, k);
495 uint16_t unused;
496 ptr += copy_from_mem(ptr, unused);
497
498 check_k(k);
499 check_serial_version(serial_version); // a little redundant with the header check
500 check_family_id(family_id);
501 check_header_validity(preamble_longs, flags_byte, serial_version);
502
503 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
504 if (is_empty) {
505 return quantiles_sketch(k, comparator, allocator);
506 }
507
508 ensure_minimum_memory(size, 16);
509 uint64_t items_seen;
510 ptr += copy_from_mem(ptr, items_seen);
511
512 const bool is_compact = (serial_version == 2) | ((flags_byte & (1 << flags::IS_COMPACT)) > 0);
513 const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;
514
515 optional<T> min_item;
516 optional<T> max_item;
517
518 // Space to deserialize min and max.
519 // serde::deserialize expects allocated but not initialized storage.
520 typename std::aligned_storage<sizeof(T), alignof(T)>::type tmp_storage;
521 T* tmp = reinterpret_cast<T*>(&tmp_storage);
522
523 ptr += serde.deserialize(ptr, end_ptr - ptr, tmp, 1);
524 // serde call did not throw, repackage and cleanup
525 min_item.emplace(std::move(*tmp));
526 tmp->~T();
527 ptr += serde.deserialize(ptr, end_ptr - ptr, tmp, 1);
528 // serde call did not throw, repackage and cleanup
529 max_item.emplace(std::move(*tmp));
530 tmp->~T();
531
532 if (serial_version == 1) {
533 uint64_t unused_long;
534 ptr += copy_from_mem(ptr, unused_long); // no longer used
535 }
536
537 // allocate buffers as needed
538 const uint8_t levels_needed = compute_levels_needed(k, items_seen);
539 const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);
540
541 // Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
542 // does not currently operate sketches in compact mode, but will only serialize as compact
543 // to avoid complications around serialization of empty values for generic type T. We also need
544 // to be able to ingest either serialized format from Java.
545
546 // load base buffer
547 const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
548 uint32_t items_to_read = (levels_needed == 0 || is_compact) ? bb_items : 2 * k;
549 auto base_buffer_pair = deserialize_array(ptr, end_ptr - ptr, bb_items, 2 * k, serde, allocator);
550 ptr += base_buffer_pair.second;
551 if (items_to_read > bb_items) { // either equal or greater, never read fewer items
552 // read remaining items, only use to advance the pointer
553 auto extras = deserialize_array(ptr, end_ptr - ptr, items_to_read - bb_items, items_to_read - bb_items, serde, allocator);
554 ptr += extras.second;
555 }
556
557 // populate vector of Levels directly
558 VectorLevels levels(allocator);
559 levels.reserve(levels_needed);
560 if (levels_needed > 0) {
561 uint64_t working_pattern = bit_pattern;
562 for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {
563
564 if ((working_pattern & 0x01) == 1) {
565 auto pair = deserialize_array(ptr, end_ptr - ptr, k, k, serde, allocator);
566 ptr += pair.second;
567 levels.push_back(std::move(pair.first));
568 } else {
569 Level level(allocator);
570 level.reserve(k);
571 levels.push_back(std::move(level));
572 }
573 }
574 }
575
576 return quantiles_sketch(k, items_seen, bit_pattern,
577 std::move(base_buffer_pair.first), std::move(levels), std::move(min_item), std::move(max_item), is_sorted,
578 comparator, allocator);
579}
580
581template<typename T, typename C, typename A>
582template<typename SerDe>
583auto quantiles_sketch<T, C, A>::deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capacity, const SerDe& serde, const A& allocator)
584 -> std::pair<Level, size_t> {
585 const char* ptr = static_cast<const char*>(bytes);
586 const char* end_ptr = static_cast<const char*>(bytes) + size;
587 A alloc(allocator);
588 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
589 ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num_items);
590 // serde did not throw, enable destructors
591 items.get_deleter().set_destroy(true);
592
593 // successfully read, now put into a Level
594 Level level(allocator);
595 level.reserve(capacity);
596 level.insert(level.begin(),
597 std::make_move_iterator(items.get()),
598 std::make_move_iterator(items.get() + num_items));
599
600 return std::pair<Level, size_t>(std::move(level), ptr - static_cast<const char*>(bytes));
601}
602
603template<typename T, typename C, typename A>
604string<A> quantiles_sketch<T, C, A>::to_string(bool print_levels, bool print_items) const {
605 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
606 // The stream does not support passing an allocator instance, and alternatives are complicated.
607 std::ostringstream os;
608 os << "### Quantiles Sketch summary:" << std::endl;
609 os << " K : " << k_ << std::endl;
610 os << " N : " << n_ << std::endl;
611 os << " Epsilon : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
612 os << " Epsilon PMF : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
613 os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
614 os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
615 os << " Levels (w/o BB): " << levels_.size() << std::endl;
616 os << " Used Levels : " << count_valid_levels(bit_pattern_) << std::endl;
617 os << " Retained items : " << get_num_retained() << std::endl;
618 if (!is_empty()) {
619 os << " Min item : " << *min_item_ << std::endl;
620 os << " Max item : " << *max_item_ << std::endl;
621 }
622 os << "### End sketch summary" << std::endl;
623
624 if (print_levels) {
625 os << "### Quantiles Sketch levels:" << std::endl;
626 os << " index: items in use" << std::endl;
627 os << " BB: " << base_buffer_.size() << std::endl;
628 for (uint8_t i = 0; i < levels_.size(); i++) {
629 os << " " << static_cast<unsigned int>(i) << ": " << levels_[i].size() << std::endl;
630 }
631 os << "### End sketch levels" << std::endl;
632 }
633
634 if (print_items) {
635 os << "### Quantiles Sketch data:" << std::endl;
636 uint8_t level = 0;
637 os << " BB:" << std::endl;
638 for (const T& item : base_buffer_) {
639 os << " " << item << std::endl;
640 }
641 for (uint8_t i = 0; i < levels_.size(); ++i) {
642 os << " level " << static_cast<unsigned int>(level) << ":" << std::endl;
643 for (const T& item : levels_[i]) {
644 os << " " << item << std::endl;
645 }
646 }
647 os << "### End sketch data" << std::endl;
648 }
649 return string<A>(os.str().c_str(), allocator_);
650}
651
652template<typename T, typename C, typename A>
654 return k_;
655}
656
657template<typename T, typename C, typename A>
659 return n_;
660}
661
662template<typename T, typename C, typename A>
664 return n_ == 0;
665}
666
667template<typename T, typename C, typename A>
669 return bit_pattern_ != 0;
670}
671
672template<typename T, typename C, typename A>
674 return compute_retained_items(k_, n_);
675}
676
677template<typename T, typename C, typename A>
679 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
680 return *min_item_;
681}
682
683template<typename T, typename C, typename A>
685 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
686 return *max_item_;
687}
688
689template<typename T, typename C, typename A>
691 return comparator_;
692}
693
694template<typename T, typename C, typename A>
696 return allocator_;
697}
698
699// implementation for fixed-size arithmetic types (integral and floating point)
700template<typename T, typename C, typename A>
701template<typename SerDe, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
703 if (is_empty()) { return EMPTY_SIZE_BYTES; }
704 return DATA_START + ((get_num_retained() + 2) * sizeof(TT));
705}
706
707// implementation for all other types
708template<typename T, typename C, typename A>
709template<typename SerDe, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
711 if (is_empty()) { return EMPTY_SIZE_BYTES; }
712 size_t size = DATA_START;
713 size += serde.size_of_item(*min_item_);
714 size += serde.size_of_item(*max_item_);
715 for (auto it: *this) size += serde.size_of_item(it.first);
716 return size;
717}
718
719template<typename T, typename C, typename A>
721 return get_normalized_rank_error(k_, is_pmf);
722}
723
724template<typename T, typename C, typename A>
726 return is_pmf
727 ? 1.854 / std::pow(k, 0.9657)
728 : 1.576 / std::pow(k, 0.9726);
729}
730
731template<typename T, typename C, typename A>
733 // allow side-effect of sorting the base buffer
734 if (!is_base_buffer_sorted_) {
735 std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
736 const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
737 }
738 quantiles_sorted_view<T, C, A> view(get_num_retained(), comparator_, allocator_);
739
740 uint64_t weight = 1;
741 view.add(base_buffer_.begin(), base_buffer_.end(), weight);
742 for (const auto& level: levels_) {
743 weight <<= 1;
744 if (level.empty()) { continue; }
745 view.add(level.begin(), level.end(), weight);
746 }
747
748 view.convert_to_cummulative();
749 return view;
750}
751
752template<typename T, typename C, typename A>
753auto quantiles_sketch<T, C, A>::get_quantile(double rank, bool inclusive) const -> quantile_return_type {
754 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
755 if ((rank < 0.0) || (rank > 1.0)) {
756 throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
757 }
758 // possible side-effect: sorting base buffer
759 setup_sorted_view();
760 return sorted_view_->get_quantile(rank, inclusive);
761}
762
763template<typename T, typename C, typename A>
764double quantiles_sketch<T, C, A>::get_rank(const T& item, bool inclusive) const {
765 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
766 setup_sorted_view();
767 return sorted_view_->get_rank(item, inclusive);
768}
769
770template<typename T, typename C, typename A>
771auto quantiles_sketch<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
772 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
773 setup_sorted_view();
774 return sorted_view_->get_PMF(split_points, size, inclusive);
775}
776
777template<typename T, typename C, typename A>
778auto quantiles_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
779 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
780 setup_sorted_view();
781 return sorted_view_->get_CDF(split_points, size, inclusive);
782}
783
784template<typename T, typename C, typename A>
785uint32_t quantiles_sketch<T, C, A>::compute_retained_items(uint16_t k, uint64_t n) {
786 const uint32_t bb_count = compute_base_buffer_items(k, n);
787 const uint64_t bit_pattern = compute_bit_pattern(k, n);
788 const uint32_t valid_levels = count_valid_levels(bit_pattern);
789 return bb_count + (k * valid_levels);
790}
791
792template<typename T, typename C, typename A>
793uint32_t quantiles_sketch<T, C, A>::compute_base_buffer_items(uint16_t k, uint64_t n) {
794 return n % (static_cast<uint64_t>(2) * k);
795}
796
797template<typename T, typename C, typename A>
798uint64_t quantiles_sketch<T, C, A>::compute_bit_pattern(uint16_t k, uint64_t n) {
799 return n / (static_cast<uint64_t>(2) * k);
800}
801
802template<typename T, typename C, typename A>
803uint32_t quantiles_sketch<T, C, A>::count_valid_levels(uint64_t bit_pattern) {
804 uint32_t count = 0;
805 for (; bit_pattern > 0; ++count) bit_pattern &= bit_pattern - 1;
806 return count;
807}
808
809template<typename T, typename C, typename A>
810uint8_t quantiles_sketch<T, C, A>::compute_levels_needed(uint16_t k, uint64_t n) {
811 return static_cast<uint8_t>(64U) - count_leading_zeros_in_u64(n / (2 * k));
812}
813
814template<typename T, typename C, typename A>
815void quantiles_sketch<T, C, A>::check_k(uint16_t k) {
816 if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & (k - 1)) != 0) {
817 throw std::invalid_argument("k must be a power of 2 that is >= "
818 + std::to_string(quantiles_constants::MIN_K) + " and <= "
819 + std::to_string(quantiles_constants::MAX_K) + ". Found: " + std::to_string(k));
820 }
821}
822
823template<typename T, typename C, typename A>
824void quantiles_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
825 if (serial_version == SERIAL_VERSION || serial_version == SERIAL_VERSION_1 || serial_version == SERIAL_VERSION_2)
826 return;
827 else
828 throw std::invalid_argument("Possible corruption. Unrecognized serialization version: " + std::to_string(serial_version));
829}
830
831template<typename T, typename C, typename A>
832void quantiles_sketch<T, C, A>::check_family_id(uint8_t family_id) {
833 if (family_id == FAMILY)
834 return;
835 else
836 throw std::invalid_argument("Possible corruption. Family id does not indicate quantiles sketch: " + std::to_string(family_id));
837}
838
839template<typename T, typename C, typename A>
840void quantiles_sketch<T, C, A>::check_header_validity(uint8_t preamble_longs, uint8_t flags_byte, uint8_t serial_version) {
841 const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
842 const bool compact = (flags_byte & (1 << flags::IS_COMPACT)) > 0;
843
844 const uint8_t sw = (compact ? 1 : 0) + (2 * (empty ? 1 : 0))
845 + (4 * (serial_version & 0xF)) + (32 * (preamble_longs & 0x3F));
846 bool valid = true;
847
848 switch (sw) { // exhaustive list and description of all valid cases
849 case 38 : break;
850 case 164 : break;
851 case 42 : break;
852 case 72 : break;
853 case 47 : break; // compact, empty, serVer = 3, preLongs = 1;
854 case 46 : break;
855 case 79 : break; // compact, empty, serVer = 3, preLongs = 2;
856 case 78 : break;
857 case 77 : break; // compact, !empty, serVer = 3, preLongs = 2;
858 case 76 : break;
859 default : //all other case values are invalid
860 valid = false;
861 }
862
863 if (!valid) {
864 std::ostringstream os;
865 os << "Possible sketch corruption. Inconsistent state: "
866 << "preamble_longs = " << preamble_longs
867 << ", empty = " << (empty ? "true" : "false")
868 << ", serialization_version = " << serial_version
869 << ", compact = " << (compact ? "true" : "false");
870 throw std::invalid_argument(os.str());
871 }
872}
873
874template <typename T, typename C, typename A>
878
879template <typename T, typename C, typename A>
883
884template<typename T, typename C, typename A>
886 const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
887 base_buffer_.reserve(new_size);
888}
889
890template<typename T, typename C, typename A>
891void quantiles_sketch<T, C, A>::process_full_base_buffer() {
892 // make sure there will be enough levels for the propagation
893 grow_levels_if_needed(); // note: n_ was already incremented by update() before this
894
895 std::sort(base_buffer_.begin(), base_buffer_.end(), comparator_);
896 in_place_propagate_carry(0,
897 levels_[0], // unused here, but 0 is guaranteed to exist
898 base_buffer_,
899 true, *this);
900 base_buffer_.clear();
901 is_base_buffer_sorted_ = true;
902 if (n_ / (2 * k_) != bit_pattern_) {
903 throw std::logic_error("Internal error: n / 2k (" + std::to_string(n_ / 2 * k_)
904 + " != bit_pattern " + std::to_string(bit_pattern_));
905 }
906}
907
908template<typename T, typename C, typename A>
909bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
910 const uint8_t levels_needed = compute_levels_needed(k_, n_);
911 if (levels_needed == 0)
912 return false; // don't need levels and might have small base buffer. Possible during merges.
913
914 // from here on, assume full size base buffer (2k) and at least one additional level
915 if (levels_needed <= levels_.size())
916 return false;
917
918 Level empty_level(allocator_);
919 empty_level.reserve(k_);
920 levels_.push_back(std::move(empty_level));
921 return true;
922}
923
924template<typename T, typename C, typename A>
925template<typename FwdV>
926void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
927 FwdV&& buf_size_k, Level& buf_size_2k,
928 bool apply_as_update,
929 quantiles_sketch& sketch) {
930 const uint64_t bit_pattern = sketch.bit_pattern_;
931 const int k = sketch.k_;
932
933 uint8_t ending_level = lowest_zero_bit_starting_at(bit_pattern, starting_level);
934
935 if (apply_as_update) {
936 // update version of computation
937 // its is okay for buf_size_k to be null in this case
938 zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
939 } else {
940 // merge_into version of computation
941 for (uint16_t i = 0; i < k; ++i) {
942 sketch.levels_[ending_level].push_back(conditional_forward<FwdV>(buf_size_k[i]));
943 }
944 }
945
946 for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
947 if ((bit_pattern & (static_cast<uint64_t>(1) << lvl)) == 0) {
948 throw std::logic_error("unexpected empty level in bit_pattern");
949 }
950 merge_two_size_k_buffers(
951 sketch.levels_[lvl],
952 sketch.levels_[ending_level],
953 buf_size_2k, sketch.get_comparator());
954 sketch.levels_[lvl].clear();
955 sketch.levels_[ending_level].clear();
956 zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
957 } // end of loop over lower levels
958
959 // update bit pattern with binary-arithmetic ripple carry
960 sketch.bit_pattern_ = bit_pattern + (static_cast<uint64_t>(1) << starting_level);
961}
962
963template<typename T, typename C, typename A>
964void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
965#ifdef QUANTILES_VALIDATION
966 static uint32_t next_offset = 0;
967 uint32_t rand_offset = next_offset;
968 next_offset = 1 - next_offset;
969#else
970 uint32_t rand_offset = random_utils::random_bit();
971#endif
972 if ((buf_in.size() != 2 * buf_out.capacity())
973 || (buf_out.size() > 0)) {
974 throw std::logic_error("zip_buffer requires buf_in.size() == "
975 "2*buf_out.capacity() and empty buf_out");
976 }
977
978 size_t k = buf_out.capacity();
979 for (uint32_t i = rand_offset, o = 0; o < k; i += 2, ++o) {
980 buf_out.push_back(std::move(buf_in[i]));
981 }
982 buf_in.clear();
983}
984
985template<typename T, typename C, typename A>
986template<typename FwdV>
987void quantiles_sketch<T, C, A>::zip_buffer_with_stride(FwdV&& buf_in, Level& buf_out, uint16_t stride) {
988 // Random offset in range [0, stride)
989 std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
990 const uint16_t rand_offset = dist(random_utils::rand);
991
992 if ((buf_in.size() != stride * buf_out.capacity())
993 || (buf_out.size() > 0)) {
994 throw std::logic_error("zip_buffer_with_stride requires buf_in.size() == "
995 "stride*buf_out.capacity() and empty buf_out");
996 }
997
998 const size_t k = buf_out.capacity();
999 for (uint16_t i = rand_offset, o = 0; o < k; i += stride, ++o) {
1000 buf_out.push_back(conditional_forward<FwdV>(buf_in[i]));
1001 }
1002 // do not clear input buffer
1003}
1004
1005template<typename T, typename C, typename A>
1006void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2,
1007 Level& dst, const C& comparator) {
1008 if (src_1.size() != src_2.size()
1009 || src_1.size() * 2 != dst.capacity()
1010 || dst.size() != 0) {
1011 throw std::logic_error("Input invariants violated in merge_two_size_k_buffers()");
1012 }
1013
1014 auto end1 = src_1.end(), end2 = src_2.end();
1015 auto it1 = src_1.begin(), it2 = src_2.begin();
1016
1017 // TODO: probably actually doing copies given Level&?
1018 while (it1 != end1 && it2 != end2) {
1019 if (comparator(*it1, *it2)) {
1020 dst.push_back(std::move(*it1++));
1021 } else {
1022 dst.push_back(std::move(*it2++));
1023 }
1024 }
1025
1026 if (it1 != end1) {
1027 dst.insert(dst.end(), it1, end1);
1028 } else {
1029 if (it2 == end2) { throw std::logic_error("it2 unexpectedly already at end of range"); }
1030 dst.insert(dst.end(), it2, end2);
1031 }
1032}
1033
1034template<typename T, typename C, typename A>
1035template<typename FwdSk>
1036void quantiles_sketch<T, C, A>::standard_merge(quantiles_sketch& tgt, FwdSk&& src) {
1037 if (src.get_k() != tgt.get_k()) {
1038 throw std::invalid_argument("src.get_k() != tgt.get_k()");
1039 }
1040 if (src.is_empty()) {
1041 return;
1042 }
1043
1044 uint64_t new_n = src.get_n() + tgt.get_n();
1045
1046 // move items from src's base buffer
1047 for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
1048 tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
1049 }
1050
1051 // check (after moving raw items) if we need to extend levels array
1052 uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
1053 if (levels_needed > tgt.levels_.size()) {
1054 tgt.levels_.reserve(levels_needed);
1055 while (tgt.levels_.size() < levels_needed) {
1056 Level empty_level(tgt.allocator_);
1057 empty_level.reserve(tgt.get_k());
1058 tgt.levels_.push_back(std::move(empty_level));
1059 }
1060 }
1061
1062 Level scratch_buf(tgt.allocator_);
1063 scratch_buf.reserve(2 * tgt.get_k());
1064
1065 uint64_t src_pattern = src.bit_pattern_;
1066 for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
1067 if ((src_pattern & 1) > 0) {
1068 scratch_buf.clear();
1069
1070 // propagate-carry
1071 in_place_propagate_carry(src_lvl,
1072 src.levels_[src_lvl], scratch_buf,
1073 false, tgt);
1074 // update n_ at the end
1075 }
1076 }
1077 tgt.n_ = new_n;
1078 if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
1079 throw std::logic_error("Failed internal consistency check after standard_merge()");
1080 }
1081
1082 // update min and max items
1083 // can't just check is_empty() since min and max might not have been set if
1084 // there were no base buffer items added via update()
1085 if (!tgt.min_item_) {
1086 tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
1087 } else {
1088 if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
1089 *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
1090 }
1091 if (!tgt.max_item_) {
1092 tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
1093 } else {
1094 if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
1095 *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
1096 }
1097}
1098
1099template<typename T, typename C, typename A>
1100template<typename FwdSk>
1101void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch& tgt, FwdSk&& src) {
1102 if (src.get_k() % tgt.get_k() != 0) {
1103 throw std::invalid_argument("src.get_k() is not a multiple of tgt.get_k()");
1104 }
1105 if (src.is_empty()) {
1106 return;
1107 }
1108
1109 const uint16_t downsample_factor = src.get_k() / tgt.get_k();
1110 const uint8_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);
1111
1112 const uint64_t new_n = src.get_n() + tgt.get_n();
1113
1114 // move items from src's base buffer
1115 for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
1116 tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
1117 }
1118
1119 // check (after moving raw items) if we need to extend levels array
1120 const uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
1121 if (levels_needed > tgt.levels_.size()) {
1122 tgt.levels_.reserve(levels_needed);
1123 while (tgt.levels_.size() < levels_needed) {
1124 Level empty_level(tgt.allocator_);
1125 empty_level.reserve(tgt.get_k());
1126 tgt.levels_.push_back(std::move(empty_level));
1127 }
1128 }
1129
1130 Level down_buf(tgt.allocator_);
1131 down_buf.reserve(tgt.get_k());
1132
1133 Level scratch_buf(tgt.allocator_);
1134 scratch_buf.reserve(2 * tgt.get_k());
1135
1136 uint64_t src_pattern = src.bit_pattern_;
1137 for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
1138 if ((src_pattern & 1) > 0) {
1139 down_buf.clear();
1140 scratch_buf.clear();
1141
1142 // zip with stride, leaving input buffer intact
1143 zip_buffer_with_stride(src.levels_[src_lvl], down_buf, downsample_factor);
1144
1145 // propagate-carry
1146 in_place_propagate_carry(src_lvl + lg_sample_factor,
1147 down_buf, scratch_buf,
1148 false, tgt);
1149 // update n_ at the end
1150 }
1151 }
1152 tgt.n_ = new_n;
1153 if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
1154 throw std::logic_error("Failed internal consistency check after downsampling_merge()");
1155 }
1156
1157 // update min and max items
1158 // can't just check is_empty() since min and max might not have been set if
1159 // there were no base buffer items added via update()
1160 if (!tgt.min_item_) {
1161 tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
1162 } else {
1163 if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
1164 *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
1165 }
1166 if (!tgt.max_item_) {
1167 tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
1168 } else {
1169 if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
1170 *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
1171 }
1172}
1173
1174template<typename T, typename C, typename A>
1175uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
1176 uint8_t pos = starting_bit & 0X3F;
1177 uint64_t my_bits = bits >> pos;
1178
1179 while ((my_bits & static_cast<uint64_t>(1)) != 0) {
1180 my_bits >>= 1;
1181 pos++;
1182 }
1183 return pos;
1184}
1185
1186template<typename T, typename C, typename A>
1187class quantiles_sketch<T, C, A>::items_deleter {
1188 public:
1189 items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
1190 void operator() (T* ptr) {
1191 if (ptr != nullptr) {
1192 if (destroy_) {
1193 for (size_t i = 0; i < num_; ++i) {
1194 ptr[i].~T();
1195 }
1196 }
1197 allocator_.deallocate(ptr, num_);
1198 }
1199 }
1200 void set_destroy(bool destroy) { destroy_ = destroy; }
1201 private:
1202 A allocator_;
1203 bool destroy_;
1204 size_t num_;
1205};
1206
1207template<typename T, typename C, typename A>
1208void quantiles_sketch<T, C, A>::setup_sorted_view() const {
1209 if (sorted_view_ == nullptr) {
1210 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1211 sorted_view_ = new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
1212 }
1213}
1214
1215template<typename T, typename C, typename A>
1216void quantiles_sketch<T, C, A>::reset_sorted_view() {
1217 if (sorted_view_ != nullptr) {
1218 sorted_view_->~quantiles_sorted_view();
1219 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1220 AllocSortedView(allocator_).deallocate(sorted_view_, 1);
1221 sorted_view_ = nullptr;
1222 }
1223}
1224
1225// quantiles_sketch::const_iterator implementation
1226
1227template<typename T, typename C, typename A>
1228quantiles_sketch<T, C, A>::const_iterator::const_iterator(const Level& base_buffer,
1229 const std::vector<Level, AllocLevel>& levels,
1230 uint16_t k,
1231 uint64_t n,
1232 bool is_end):
1233base_buffer_(base_buffer),
1234levels_(levels),
1235level_(-1),
1236index_(0),
1237bb_count_(compute_base_buffer_items(k, n)),
1238bit_pattern_(compute_bit_pattern(k, n)),
1239weight_(1),
1240k_(k)
1241{
1242 if (is_end) {
1243 // if exact mode: index_ = n is end
1244 // if sampling, level_ = max_level + 1 and index_ = 0 is end
1245 if (bit_pattern_ == 0) // only a valid check for exact mode in constructor
1246 index_ = static_cast<uint32_t>(n);
1247 else
1248 level_ = static_cast<int>(levels_.size());
1249 } else { // find first non-empty item
1250 if (bb_count_ == 0 && bit_pattern_ > 0) {
1251 level_ = 0;
1252 weight_ = 2;
1253 while ((bit_pattern_ & 0x01) == 0) {
1254 weight_ *= 2;
1255 ++level_;
1256 bit_pattern_ >>= 1;
1257 }
1258 }
1259 }
1260}
1261
1262template<typename T, typename C, typename A>
1263typename quantiles_sketch<T, C, A>::const_iterator& quantiles_sketch<T, C, A>::const_iterator::operator++() {
1264 ++index_;
1265
1266 if ((level_ == -1 && index_ == base_buffer_.size() && levels_.size() > 0) || (level_ >= 0 && index_ == k_)) { // go to the next non-empty level
1267 index_ = 0;
1268 do {
1269 ++level_;
1270 if (level_ > 0) bit_pattern_ = bit_pattern_ >> 1;
1271 if (bit_pattern_ == 0) return *this;
1272 weight_ *= 2;
1273 } while ((bit_pattern_ & static_cast<uint64_t>(1)) == 0);
1274 }
1275 return *this;
1276}
1277
1278template<typename T, typename C, typename A>
1279typename quantiles_sketch<T, C, A>::const_iterator& quantiles_sketch<T, C, A>::const_iterator::operator++(int) {
1280 const_iterator tmp(*this);
1281 operator++();
1282 return tmp;
1283}
1284
1285template<typename T, typename C, typename A>
1286bool quantiles_sketch<T, C, A>::const_iterator::operator==(const const_iterator& other) const {
1287 return level_ == other.level_ && index_ == other.index_;
1288}
1289
1290template<typename T, typename C, typename A>
1291bool quantiles_sketch<T, C, A>::const_iterator::operator!=(const const_iterator& other) const {
1292 return !operator==(other);
1293}
1294
1295template<typename T, typename C, typename A>
1296auto quantiles_sketch<T, C, A>::const_iterator::operator*() const -> reference {
1297 return value_type(level_ == -1 ? base_buffer_[index_] : levels_[level_][index_], weight_);
1298}
1299
1300template<typename T, typename C, typename A>
1301auto quantiles_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
1302 return **this;
1303}
1304
1305} /* namespace datasketches */
1306
1307#endif // _QUANTILES_SKETCH_IMPL_HPP_
This is a stochastic streaming sketch that enables near-real time analysis of the approximate distrib...
Definition quantiles_sketch.hpp:158
Comparator get_comparator() const
Returns an instance of the comparator for this sketch.
Definition quantiles_sketch_impl.hpp:690
quantiles_sorted_view< T, Comparator, Allocator > get_sorted_view() const
Gets the sorted view of this sketch.
Definition quantiles_sketch_impl.hpp:732
quantiles_sketch & operator=(const quantiles_sketch &other)
Copy assignment.
Definition quantiles_sketch_impl.hpp:90
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analo...
Definition quantiles_sketch_impl.hpp:778
uint32_t get_num_retained() const
Returns the number of retained items (samples) in the sketch.
Definition quantiles_sketch_impl.hpp:673
double get_normalized_rank_error(bool is_pmf) const
Gets the normalized rank error for this sketch.
Definition quantiles_sketch_impl.hpp:720
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of sp...
Definition quantiles_sketch_impl.hpp:771
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition quantiles_sketch_impl.hpp:236
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition quantiles_sketch_impl.hpp:213
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition quantiles_sketch_impl.hpp:278
string< Allocator > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition quantiles_sketch_impl.hpp:604
uint16_t get_k() const
Returns configured parameter k.
Definition quantiles_sketch_impl.hpp:653
bool is_empty() const
Returns true if this sketch is empty.
Definition quantiles_sketch_impl.hpp:663
const T & get_max_item() const
Returns the max item of the stream.
Definition quantiles_sketch_impl.hpp:684
static quantiles_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const Comparator &comparator=Comparator(), const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1, inclusive.
Definition quantiles_sketch_impl.hpp:764
allocator_type get_allocator() const
Returns the allocator for this sketch.
Definition quantiles_sketch_impl.hpp:695
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an approximation to the data item associated with the given rank of a hypothetical sorted ver...
Definition quantiles_sketch_impl.hpp:753
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition quantiles_sketch_impl.hpp:875
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition quantiles_sketch_impl.hpp:702
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition quantiles_sketch_impl.hpp:668
const T & get_min_item() const
Returns the min item of the stream.
Definition quantiles_sketch_impl.hpp:678
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition quantiles_sketch_impl.hpp:880
uint64_t get_n() const
Returns the length of the input stream.
Definition quantiles_sketch_impl.hpp:658
Sorted view for quantiles sketches (REQ, KLL and Quantiles)
Definition quantiles_sorted_view.hpp:38
const uint16_t MAX_K
maximum value of parameter K
Definition quantiles_sketch.hpp:41
const uint16_t MIN_K
minimum value of parameter K
Definition quantiles_sketch.hpp:39
DataSketches namespace.
Definition binomial_bounds.hpp:38
Interface for serializing and deserializing items.
Definition serde.hpp:34
void serialize(std::ostream &os, const T *items, unsigned num) const
Stream serialization.