20#ifndef KLL_SKETCH_IMPL_HPP_
21#define KLL_SKETCH_IMPL_HPP_
29#include "conditional_forward.hpp"
30#include "count_zeros.hpp"
31#include "memory_operations.hpp"
32#include "kll_helper.hpp"
36template<
typename T,
typename C,
typename A>
38comparator_(comparator),
41m_(kll_constants::DEFAULT_M),
44is_level_zero_sorted_(false),
46levels_(2, 0, allocator),
57 levels_[0] = levels_[1] = k;
58 items_ = allocator_.allocate(items_size_);
61template<
typename T,
typename C,
typename A>
63comparator_(other.comparator_),
64allocator_(other.allocator_),
68num_levels_(other.num_levels_),
69is_level_zero_sorted_(other.is_level_zero_sorted_),
71levels_(other.levels_),
73items_size_(other.items_size_),
74min_item_(other.min_item_),
75max_item_(other.max_item_),
78 items_ = allocator_.allocate(items_size_);
79 for (
auto i = levels_[0]; i < levels_[num_levels_]; ++i)
new (&items_[i]) T(other.items_[i]);
82template<
typename T,
typename C,
typename A>
84comparator_(std::move(other.comparator_)),
85allocator_(std::move(other.allocator_)),
89num_levels_(other.num_levels_),
90is_level_zero_sorted_(other.is_level_zero_sorted_),
92levels_(std::move(other.levels_)),
94items_size_(other.items_size_),
95min_item_(std::move(other.min_item_)),
96max_item_(std::move(other.max_item_)),
99 other.items_ =
nullptr;
102template<
typename T,
typename C,
typename A>
105 std::swap(comparator_, copy.comparator_);
106 std::swap(allocator_, copy.allocator_);
107 std::swap(k_, copy.k_);
108 std::swap(m_, copy.m_);
109 std::swap(min_k_, copy.min_k_);
110 std::swap(num_levels_, copy.num_levels_);
111 std::swap(is_level_zero_sorted_, copy.is_level_zero_sorted_);
112 std::swap(n_, copy.n_);
113 std::swap(levels_, copy.levels_);
114 std::swap(items_, copy.items_);
115 std::swap(items_size_, copy.items_size_);
116 std::swap(min_item_, copy.min_item_);
117 std::swap(max_item_, copy.max_item_);
122template<
typename T,
typename C,
typename A>
124 std::swap(comparator_, other.comparator_);
125 std::swap(allocator_, other.allocator_);
126 std::swap(k_, other.k_);
127 std::swap(m_, other.m_);
128 std::swap(min_k_, other.min_k_);
129 std::swap(num_levels_, other.num_levels_);
130 std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);
131 std::swap(n_, other.n_);
132 std::swap(levels_, other.levels_);
133 std::swap(items_, other.items_);
134 std::swap(items_size_, other.items_size_);
135 std::swap(min_item_, other.min_item_);
136 std::swap(max_item_, other.max_item_);
141template<
typename T,
typename C,
typename A>
143 if (items_ !=
nullptr) {
144 const uint32_t begin = levels_[0];
145 const uint32_t end = levels_[num_levels_];
146 for (uint32_t i = begin; i < end; i++) items_[i].~T();
147 allocator_.deallocate(items_, items_size_);
152template<
typename T,
typename C,
typename A>
153template<
typename TT,
typename CC,
typename AA>
154kll_sketch<T, C, A>::kll_sketch(
const kll_sketch<TT, CC, AA>& other,
const C& comparator,
const A& allocator):
155comparator_(comparator),
156allocator_(allocator),
160num_levels_(other.num_levels_),
161is_level_zero_sorted_(other.is_level_zero_sorted_),
163levels_(other.levels_, allocator_),
165items_size_(other.items_size_),
166min_item_(other.min_item_),
167max_item_(other.max_item_),
171 std::is_constructible<T, TT>::value,
172 "Type converting constructor requires new type to be constructible from existing type"
174 items_ = allocator_.allocate(items_size_);
175 for (
auto i = levels_[0]; i < levels_[num_levels_]; ++i)
new (&items_[i]) T(other.items_[i]);
179template<
typename T,
typename C,
typename A>
180template<
typename FwdT>
182 if (!check_update_item(item)) {
return; }
183 update_min_max(
static_cast<const T&
>(item));
184 const uint32_t index = internal_update();
185 new (&items_[index]) T(std::forward<FwdT>(item));
189template<
typename T,
typename C,
typename A>
192 min_item_.emplace(item);
193 max_item_.emplace(item);
195 if (comparator_(item, *min_item_)) *min_item_ = item;
196 if (comparator_(*max_item_, item)) *max_item_ = item;
200template<
typename T,
typename C,
typename A>
201uint32_t kll_sketch<T, C, A>::internal_update() {
202 if (levels_[0] == 0) { compress_while_updating(); }
204 is_level_zero_sorted_ =
false;
208template<
typename T,
typename C,
typename A>
209template<
typename FwdSk>
211 if (other.is_empty()) {
return; }
212 if (m_ != other.m_) {
213 throw std::invalid_argument(
"incompatible M: " + std::to_string(m_) +
" and " + std::to_string(other.m_));
216 min_item_.emplace(conditional_forward<FwdSk>(*other.min_item_));
217 max_item_.emplace(conditional_forward<FwdSk>(*other.max_item_));
219 if (comparator_(*other.min_item_, *min_item_)) *min_item_ = conditional_forward<FwdSk>(*other.min_item_);
220 if (comparator_(*max_item_, *other.max_item_)) *max_item_ = conditional_forward<FwdSk>(*other.max_item_);
222 const uint64_t final_n = n_ + other.n_;
223 for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
224 const uint32_t index = internal_update();
225 new (&items_[index]) T(conditional_forward<FwdSk>(other.items_[i]));
227 if (other.num_levels_ >= 2) { merge_higher_levels(other, final_n); }
229 if (other.is_estimation_mode()) { min_k_ = std::min(min_k_, other.min_k_); }
230 assert_correct_total_weight();
234template<
typename T,
typename C,
typename A>
239template<
typename T,
typename C,
typename A>
244template<
typename T,
typename C,
typename A>
249template<
typename T,
typename C,
typename A>
251 return levels_[num_levels_] - levels_[0];
254template<
typename T,
typename C,
typename A>
256 return num_levels_ > 1;
259template<
typename T,
typename C,
typename A>
261 if (is_empty()) {
throw std::runtime_error(
"operation is undefined for an empty sketch"); }
265template<
typename T,
typename C,
typename A>
267 if (is_empty()) {
throw std::runtime_error(
"operation is undefined for an empty sketch"); }
271template<
typename T,
typename C,
typename A>
276template<
typename T,
typename C,
typename A>
281template<
typename T,
typename C,
typename A>
283 if (is_empty()) {
throw std::runtime_error(
"operation is undefined for an empty sketch"); }
285 return sorted_view_->get_rank(item, inclusive);
288template<
typename T,
typename C,
typename A>
290 if (is_empty()) {
throw std::runtime_error(
"operation is undefined for an empty sketch"); }
292 return sorted_view_->get_PMF(split_points, size, inclusive);
295template<
typename T,
typename C,
typename A>
297 if (is_empty()) {
throw std::runtime_error(
"operation is undefined for an empty sketch"); }
299 return sorted_view_->get_CDF(split_points, size, inclusive);
302template<
typename T,
typename C,
typename A>
304 if (is_empty()) {
throw std::runtime_error(
"operation is undefined for an empty sketch"); }
305 if ((rank < 0.0) || (rank > 1.0)) {
306 throw std::invalid_argument(
"normalized rank cannot be less than zero or greater than 1.0");
310 return sorted_view_->get_quantile(rank, inclusive);
313template<
typename T,
typename C,
typename A>
315 return get_normalized_rank_error(min_k_, pmf);
319template<
typename T,
typename C,
typename A>
320template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value,
int>::type>
322 if (is_empty()) {
return EMPTY_SIZE_BYTES; }
323 if (num_levels_ == 1 && get_num_retained() == 1) {
324 return DATA_START_SINGLE_ITEM +
sizeof(TT);
327 return DATA_START + num_levels_ *
sizeof(uint32_t) + (get_num_retained() + 2) *
sizeof(TT);
331template<
typename T,
typename C,
typename A>
332template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value,
int>::type>
334 if (is_empty()) {
return EMPTY_SIZE_BYTES; }
335 if (num_levels_ == 1 && get_num_retained() == 1) {
336 return DATA_START_SINGLE_ITEM + sd.size_of_item(items_[levels_[0]]);
339 size_t size = DATA_START + num_levels_ *
sizeof(uint32_t);
340 size += sd.size_of_item(*min_item_);
341 size += sd.size_of_item(*max_item_);
342 for (
auto it: *this) size += sd.size_of_item(it.first);
347template<
typename T,
typename C,
typename A>
348template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value,
int>::type>
350 const uint8_t num_levels = kll_helper::ub_on_num_levels(n);
351 const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, kll_constants::DEFAULT_M, num_levels);
353 return DATA_START + num_levels *
sizeof(uint32_t) + (max_num_retained + 2) *
sizeof(TT);
357template<
typename T,
typename C,
typename A>
358template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value,
int>::type>
360 const uint8_t num_levels = kll_helper::ub_on_num_levels(n);
361 const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, kll_constants::DEFAULT_M, num_levels);
363 return DATA_START + num_levels *
sizeof(uint32_t) + (max_num_retained + 2) * max_item_size_bytes;
366template<
typename T,
typename C,
typename A>
367template<
typename SerDe>
369 const bool is_single_item = n_ == 1;
370 const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
371 write(os, preamble_ints);
372 const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
373 write(os, serial_version);
374 const uint8_t family(FAMILY);
376 const uint8_t flags_byte(
377 (is_empty() ? 1 << flags::IS_EMPTY : 0)
378 | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
379 | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
381 write(os, flags_byte);
384 const uint8_t unused = 0;
386 if (is_empty())
return;
387 if (!is_single_item) {
390 write(os, num_levels_);
392 write(os, levels_.data(),
sizeof(levels_[0]) * num_levels_);
393 sd.serialize(os, &*min_item_, 1);
394 sd.serialize(os, &*max_item_, 1);
396 sd.serialize(os, &items_[levels_[0]], get_num_retained());
399template<
typename T,
typename C,
typename A>
400template<
typename SerDe>
402 const bool is_single_item = n_ == 1;
403 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
404 vector_bytes bytes(size, 0, allocator_);
405 uint8_t* ptr = bytes.data() + header_size_bytes;
406 const uint8_t* end_ptr = ptr + size;
407 const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
408 ptr += copy_to_mem(preamble_ints, ptr);
409 const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
410 ptr += copy_to_mem(serial_version, ptr);
411 const uint8_t family(FAMILY);
412 ptr += copy_to_mem(family, ptr);
413 const uint8_t flags_byte(
414 (is_empty() ? 1 << flags::IS_EMPTY : 0)
415 | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
416 | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
418 ptr += copy_to_mem(flags_byte, ptr);
419 ptr += copy_to_mem(k_, ptr);
420 ptr += copy_to_mem(m_, ptr);
421 ptr +=
sizeof(uint8_t);
423 if (!is_single_item) {
424 ptr += copy_to_mem(n_, ptr);
425 ptr += copy_to_mem(min_k_, ptr);
426 ptr += copy_to_mem(num_levels_, ptr);
427 ptr +=
sizeof(uint8_t);
428 ptr += copy_to_mem(levels_.data(), ptr,
sizeof(levels_[0]) * num_levels_);
429 ptr += sd.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
430 ptr += sd.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
432 const size_t bytes_remaining = end_ptr - ptr;
433 ptr += sd.serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
435 const size_t delta = ptr - bytes.data();
436 if (delta != size)
throw std::logic_error(
"serialized size mismatch: " + std::to_string(delta)
437 +
" != " + std::to_string(size));
441template<
typename T,
typename C,
typename A>
442template<
typename SerDe>
444 const C& comparator,
const A& allocator) {
445 const auto preamble_ints = read<uint8_t>(is);
446 const auto serial_version = read<uint8_t>(is);
447 const auto family_id = read<uint8_t>(is);
448 const auto flags_byte = read<uint8_t>(is);
449 const auto k = read<uint16_t>(is);
450 const auto m = read<uint8_t>(is);
454 check_preamble_ints(preamble_ints, flags_byte);
455 check_serial_version(serial_version);
456 check_family_id(family_id);
458 if (!is.good())
throw std::runtime_error(
"error reading from std::istream");
459 const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
460 if (is_empty)
return kll_sketch(k, comparator, allocator);
465 const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
466 if (is_single_item) {
471 n = read<uint64_t>(is);
472 min_k = read<uint16_t>(is);
473 num_levels = read<uint8_t>(is);
476 vector_u32 levels(num_levels + 1, 0, allocator);
477 const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
478 if (is_single_item) {
479 levels[0] = capacity - 1;
482 read(is, levels.data(),
sizeof(levels[0]) * num_levels);
484 levels[num_levels] = capacity;
485 optional<T> min_item;
486 optional<T> max_item;
487 if (!is_single_item) {
490 typename std::aligned_storage<
sizeof(T),
alignof(T)>::type tmp_storage;
491 T* tmp =
reinterpret_cast<T*
>(&tmp_storage);
493 sd.deserialize(is, tmp, 1);
495 min_item.emplace(std::move(*tmp));
497 sd.deserialize(is, tmp, 1);
499 max_item.emplace(std::move(*tmp));
503 auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
504 std::unique_ptr<T,
decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
505 const auto num_items = levels[num_levels] - levels[0];
506 sd.deserialize(is, &items_buffer.get()[levels[0]], num_items);
508 std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
509 const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
510 if (is_single_item) {
511 min_item.emplace(items.get()[levels[0]]);
512 max_item.emplace(items.get()[levels[0]]);
515 throw std::runtime_error(
"error reading from std::istream");
516 return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
517 std::move(min_item), std::move(max_item), is_level_zero_sorted, comparator);
520template<
typename T,
typename C,
typename A>
521template<
typename SerDe>
523 const C& comparator,
const A& allocator) {
524 ensure_minimum_memory(size, 8);
525 const char* ptr =
static_cast<const char*
>(bytes);
526 uint8_t preamble_ints;
527 ptr += copy_from_mem(ptr, preamble_ints);
528 uint8_t serial_version;
529 ptr += copy_from_mem(ptr, serial_version);
531 ptr += copy_from_mem(ptr, family_id);
533 ptr += copy_from_mem(ptr, flags_byte);
535 ptr += copy_from_mem(ptr, k);
537 ptr += copy_from_mem(ptr, m);
538 ptr +=
sizeof(uint8_t);
541 check_preamble_ints(preamble_ints, flags_byte);
542 check_serial_version(serial_version);
543 check_family_id(family_id);
544 ensure_minimum_memory(size, preamble_ints *
sizeof(uint32_t));
546 const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
547 if (is_empty)
return kll_sketch(k, comparator, allocator);
552 const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
553 const char* end_ptr =
static_cast<const char*
>(bytes) + size;
554 if (is_single_item) {
559 ptr += copy_from_mem(ptr, n);
560 ptr += copy_from_mem(ptr, min_k);
561 ptr += copy_from_mem(ptr, num_levels);
562 ptr +=
sizeof(uint8_t);
564 vector_u32 levels(num_levels + 1, 0, allocator);
565 const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
566 if (is_single_item) {
567 levels[0] = capacity - 1;
570 ptr += copy_from_mem(ptr, levels.data(),
sizeof(levels[0]) * num_levels);
572 levels[num_levels] = capacity;
573 optional<T> min_item;
574 optional<T> max_item;
575 if (!is_single_item) {
578 typename std::aligned_storage<
sizeof(T),
alignof(T)>::type tmp_storage;
579 T* tmp =
reinterpret_cast<T*
>(&tmp_storage);
581 ptr += sd.deserialize(ptr, end_ptr - ptr, tmp, 1);
583 min_item.emplace(std::move(*tmp));
585 ptr += sd.deserialize(ptr, end_ptr - ptr, tmp, 1);
587 max_item.emplace(std::move(*tmp));
591 auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
592 std::unique_ptr<T,
decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
593 const auto num_items = levels[num_levels] - levels[0];
594 ptr += sd.deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
596 std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
597 const size_t delta = ptr -
static_cast<const char*
>(bytes);
598 if (delta != size)
throw std::logic_error(
"deserialized size mismatch: " + std::to_string(delta) +
" != " + std::to_string(size));
599 const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
600 if (is_single_item) {
601 min_item.emplace(items.get()[levels[0]]);
602 max_item.emplace(items.get()[levels[0]]);
604 return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
605 std::move(min_item), std::move(max_item), is_level_zero_sorted, comparator);
615template<
typename T,
typename C,
typename A>
618 ? 2.446 / pow(k, 0.9433)
619 : 2.296 / pow(k, 0.9723);
623template<
typename T,
typename C,
typename A>
624kll_sketch<T, C, A>::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32&& levels,
625 std::unique_ptr<T, items_deleter> items, uint32_t items_size, optional<T>&& min_item,
626 optional<T>&& max_item,
bool is_level_zero_sorted,
const C& comparator):
627comparator_(comparator),
628allocator_(levels.get_allocator()),
630m_(kll_constants::DEFAULT_M),
632num_levels_(num_levels),
633is_level_zero_sorted_(is_level_zero_sorted),
635levels_(std::move(levels)),
636items_(items.release()),
637items_size_(items_size),
638min_item_(std::move(min_item)),
639max_item_(std::move(max_item)),
645template<
typename T,
typename C,
typename A>
646void kll_sketch<T, C, A>::compress_while_updating(
void) {
647 const uint8_t level = find_level_to_compact();
652 if (level == (num_levels_ - 1)) {
653 add_empty_top_level_to_completely_full_sketch();
656 const uint32_t raw_beg = levels_[level];
657 const uint32_t raw_lim = levels_[level + 1];
659 const uint32_t pop_above = levels_[level + 2] - raw_lim;
660 const uint32_t raw_pop = raw_lim - raw_beg;
661 const bool odd_pop = kll_helper::is_odd(raw_pop);
662 const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
663 const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
664 const uint32_t half_adj_pop = adj_pop / 2;
665 const uint32_t destroy_beg = levels_[0];
669 if ((level == 0) && !is_level_zero_sorted_) {
670 std::sort(items_ + adj_beg, items_ + adj_beg + adj_pop, comparator_);
672 if (pop_above == 0) {
673 kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
675 kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
676 kll_helper::merge_sorted_arrays<T, C>(items_, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
678 levels_[level + 1] -= half_adj_pop;
680 levels_[level] = levels_[level + 1] - 1;
681 if (levels_[level] != raw_beg) items_[levels_[level]] = std::move(items_[raw_beg]);
683 levels_[level] = levels_[level + 1];
687 if (levels_[level] != (raw_beg + half_adj_pop))
throw std::logic_error(
"compaction error");
692 const uint32_t amount = raw_beg - levels_[0];
693 std::move_backward(items_ + levels_[0], items_ + levels_[0] + amount, items_ + levels_[0] + half_adj_pop + amount);
694 for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
696 for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + destroy_beg].~T();
699template<
typename T,
typename C,
typename A>
700uint8_t kll_sketch<T, C, A>::find_level_to_compact()
const {
703 if (level >= num_levels_)
throw std::logic_error(
"capacity calculation error");
704 const uint32_t pop = levels_[level + 1] - levels_[level];
705 const uint32_t cap = kll_helper::level_capacity(k_, num_levels_, level, m_);
713template<
typename T,
typename C,
typename A>
714void kll_sketch<T, C, A>::add_empty_top_level_to_completely_full_sketch() {
715 const uint32_t cur_total_cap = levels_[num_levels_];
718 if (levels_[0] != 0)
throw std::logic_error(
"full sketch expected");
719 if (items_size_ != cur_total_cap)
throw std::logic_error(
"current capacity mismatch");
722 const uint8_t new_levels_size = num_levels_ + 2;
723 if (levels_.size() < new_levels_size) {
724 levels_.resize(new_levels_size);
727 const uint32_t delta_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
728 const uint32_t new_total_cap = cur_total_cap + delta_cap;
731 T* new_buf = allocator_.allocate(new_total_cap);
732 kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap,
true);
733 allocator_.deallocate(items_, items_size_);
735 items_size_ = new_total_cap;
738 for (uint8_t i = 0; i <= num_levels_; i++) {
739 levels_[i] += delta_cap;
742 if (levels_[num_levels_] != new_total_cap)
throw std::logic_error(
"new capacity mismatch");
745 levels_[num_levels_] = new_total_cap;
748template<
typename T,
typename C,
typename A>
749void kll_sketch<T, C, A>::sort_level_zero() {
750 if (!is_level_zero_sorted_) {
751 std::sort(items_ + levels_[0], items_ + levels_[1], comparator_);
752 is_level_zero_sorted_ =
true;
// Consistency check: verifies that the items of every level from 1 up to
// num_levels_ - 1 are ordered according to comparator_.
// Throws std::logic_error("levels must be sorted") at the first unsorted level.
756template<
typename T,
typename C,
typename A>
757void kll_sketch<T, C, A>::check_sorting()
const {
// The loop starts at level 1, so level 0 is not examined here.
759 for (uint8_t level = 1; level < num_levels_; ++level) {
// [from, to) is the slice of items_ that belongs to this level.
760 const auto from = items_ + levels_[level];
761 const auto to = items_ + levels_[level + 1];
762 if (!std::is_sorted(from, to, comparator_)) {
763 throw std::logic_error(
"levels must be sorted");
768template<
typename T,
typename C,
typename A>
770 const_cast<kll_sketch*
>(
this)->sort_level_zero();
772 for (uint8_t level = 0; level < num_levels_; ++level) {
773 const auto from = items_ + levels_[level];
774 const auto to = items_ + levels_[level + 1];
775 view.add(from, to, 1ULL << level);
777 view.convert_to_cummulative();
781template<
typename T,
typename C,
typename A>
784 const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
786 auto tmp_items_deleter = [tmp_num_items, &alloc](T* ptr) { alloc.deallocate(ptr, tmp_num_items); };
787 const std::unique_ptr<T,
decltype(tmp_items_deleter)> workbuf(allocator_.allocate(tmp_num_items), tmp_items_deleter);
788 const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
789 const size_t work_levels_size = ub + 2;
790 vector_u32 worklevels(work_levels_size, 0, allocator_);
791 vector_u32 outlevels(work_levels_size, 0, allocator_);
793 const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
795 populate_work_arrays(std::forward<O>(other), workbuf.get(), worklevels.data(), provisional_num_levels);
797 const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
798 worklevels.data(), outlevels.data(), is_level_zero_sorted_);
801 if (result.final_num_levels > ub)
throw std::logic_error(
"merge error");
804 if (result.final_capacity != items_size_) {
805 allocator_.deallocate(items_, items_size_);
806 items_size_ = result.final_capacity;
807 items_ = allocator_.allocate(items_size_);
809 const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
810 kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom,
true);
812 const size_t new_levels_size = result.final_num_levels + 1;
813 if (levels_.size() < new_levels_size) {
814 levels_.resize(new_levels_size);
816 const uint32_t offset = free_space_at_bottom - outlevels[0];
817 for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) {
818 levels_[lvl] = outlevels[lvl] + offset;
820 num_levels_ = result.final_num_levels;
824template<
typename T,
typename C,
typename A>
825template<
typename FwdSk>
826void kll_sketch<T, C, A>::populate_work_arrays(FwdSk&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
830 kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0,
true);
831 worklevels[1] = safe_level_size(0);
833 for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
834 const uint32_t self_pop = safe_level_size(lvl);
835 const uint32_t other_pop = other.safe_level_size(lvl);
836 worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
838 if ((self_pop > 0) && (other_pop == 0)) {
839 kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl],
true);
840 }
else if ((self_pop == 0) && (other_pop > 0)) {
841 for (
auto i = other.levels_[lvl], j = worklevels[lvl]; i < other.levels_[lvl] + other_pop; ++i, ++j) {
842 new (&workbuf[j]) T(conditional_forward<FwdSk>(other.items_[i]));
844 }
else if ((self_pop > 0) && (other_pop > 0)) {
845 kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
850template<
typename T,
typename C,
typename A>
851void kll_sketch<T, C, A>::assert_correct_total_weight()
const {
852 const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_.data()));
854 throw std::logic_error(
"Total weight does not match N");
858template<
typename T,
typename C,
typename A>
859uint32_t kll_sketch<T, C, A>::safe_level_size(uint8_t level)
const {
860 if (level >= num_levels_)
return 0;
861 return levels_[level + 1] - levels_[level];
864template<
typename T,
typename C,
typename A>
865uint32_t kll_sketch<T, C, A>::get_num_retained_above_level_zero()
const {
866 if (num_levels_ == 1)
return 0;
867 return levels_[num_levels_] - levels_[1];
870template<
typename T,
typename C,
typename A>
871void kll_sketch<T, C, A>::check_m(uint8_t m) {
872 if (m != kll_constants::DEFAULT_M) {
873 throw std::invalid_argument(
"Possible corruption: M must be " + std::to_string(kll_constants::DEFAULT_M)
874 +
": " + std::to_string(m));
878template<
typename T,
typename C,
typename A>
879void kll_sketch<T, C, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
880 const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
881 const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
882 if (is_empty || is_single_item) {
883 if (preamble_ints != PREAMBLE_INTS_SHORT) {
884 throw std::invalid_argument(
"Possible corruption: preamble ints must be "
885 + std::to_string(PREAMBLE_INTS_SHORT) +
" for an empty or single item sketch: " + std::to_string(preamble_ints));
888 if (preamble_ints != PREAMBLE_INTS_FULL) {
889 throw std::invalid_argument(
"Possible corruption: preamble ints must be "
890 + std::to_string(PREAMBLE_INTS_FULL) +
" for a sketch with more than one item: " + std::to_string(preamble_ints));
895template<
typename T,
typename C,
typename A>
896void kll_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
897 if (serial_version != SERIAL_VERSION_1 && serial_version != SERIAL_VERSION_2) {
898 throw std::invalid_argument(
"Possible corruption: serial version mismatch: expected "
899 + std::to_string(SERIAL_VERSION_1) +
" or " + std::to_string(SERIAL_VERSION_2)
900 +
", got " + std::to_string(serial_version));
904template<
typename T,
typename C,
typename A>
905void kll_sketch<T, C, A>::check_family_id(uint8_t family_id) {
906 if (family_id != FAMILY) {
907 throw std::invalid_argument(
"Possible corruption: family mismatch: expected "
908 + std::to_string(FAMILY) +
", got " + std::to_string(family_id));
912template <
typename T,
typename C,
typename A>
916 std::ostringstream os;
917 os <<
"### KLL sketch summary:" << std::endl;
918 os <<
" K : " << k_ << std::endl;
919 os <<
" min K : " << min_k_ << std::endl;
920 os <<
" M : " << (
unsigned int) m_ << std::endl;
921 os <<
" N : " << n_ << std::endl;
922 os <<
" Epsilon : " << std::setprecision(3) << get_normalized_rank_error(
false) * 100 <<
"%" << std::endl;
923 os <<
" Epsilon PMF : " << get_normalized_rank_error(
true) * 100 <<
"%" << std::endl;
924 os <<
" Empty : " << (is_empty() ?
"true" :
"false") << std::endl;
925 os <<
" Estimation mode: " << (is_estimation_mode() ?
"true" :
"false") << std::endl;
926 os <<
" Levels : " << (
unsigned int) num_levels_ << std::endl;
927 os <<
" Sorted : " << (is_level_zero_sorted_ ?
"true" :
"false") << std::endl;
928 os <<
" Capacity items : " << items_size_ << std::endl;
929 os <<
" Retained items : " << get_num_retained() << std::endl;
931 os <<
" Min item : " << *min_item_ << std::endl;
932 os <<
" Max item : " << *max_item_ << std::endl;
934 os <<
"### End sketch summary" << std::endl;
937 os <<
"### KLL sketch levels:" << std::endl;
938 os <<
" index: nominal capacity, actual size" << std::endl;
939 for (uint8_t i = 0; i < num_levels_; i++) {
940 os <<
" " << (
unsigned int) i <<
": " << kll_helper::level_capacity(k_, num_levels_, i, m_) <<
", " << safe_level_size(i) << std::endl;
942 os <<
"### End sketch levels" << std::endl;
946 os <<
"### KLL sketch data:" << std::endl;
948 while (level < num_levels_) {
949 const uint32_t from_index = levels_[level];
950 const uint32_t to_index = levels_[level + 1];
951 if (from_index < to_index) {
952 os <<
" level " << (
unsigned int) level <<
":" << std::endl;
954 for (uint32_t i = from_index; i < to_index; i++) {
955 os <<
" " << items_[i] << std::endl;
959 os <<
"### End sketch data" << std::endl;
961 return string<A>(os.str().c_str(), allocator_);
964template <
typename T,
typename C,
typename A>
969template <
typename T,
typename C,
typename A>
974template<
typename T,
typename C,
typename A>
977 items_deleter(uint32_t start, uint32_t num,
const A& allocator):
978 allocator_(allocator), start_(start), num_(num) {}
979 void operator() (T* ptr) {
980 if (ptr !=
nullptr) {
981 for (uint32_t i = start_; i < num_; ++i) ptr[i].~T();
982 allocator_.deallocate(ptr, num_);
991template<
typename T,
typename C,
typename A>
992void kll_sketch<T, C, A>::setup_sorted_view()
const {
993 if (sorted_view_ ==
nullptr) {
994 using AllocSortedView =
typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
995 sorted_view_ =
new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
999template<
typename T,
typename C,
typename A>
1000void kll_sketch<T, C, A>::reset_sorted_view() {
1001 if (sorted_view_ !=
nullptr) {
1002 sorted_view_->~quantiles_sorted_view();
1003 using AllocSortedView =
typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1004 AllocSortedView(allocator_).deallocate(sorted_view_, 1);
1005 sorted_view_ =
nullptr;
1011template<
typename T,
typename C,
typename A>
1012kll_sketch<T, C, A>::const_iterator::const_iterator(
const T* items,
const uint32_t* levels,
const uint8_t num_levels):
1013items(items), levels(levels), num_levels(num_levels), index(items == nullptr ? levels[num_levels] : levels[0]), level(items == nullptr ? num_levels : 0), weight(1)
1016template<
typename T,
typename C,
typename A>
1017typename kll_sketch<T, C, A>::const_iterator& kll_sketch<T, C, A>::const_iterator::operator++() {
1019 if (index == levels[level + 1]) {
1023 }
while (level < num_levels && levels[level] == levels[level + 1]);
1028template<
typename T,
typename C,
typename A>
1029typename kll_sketch<T, C, A>::const_iterator& kll_sketch<T, C, A>::const_iterator::operator++(
int) {
1030 const_iterator tmp(*
this);
1035template<
typename T,
typename C,
typename A>
1036bool kll_sketch<T, C, A>::const_iterator::operator==(
const const_iterator& other)
const {
1037 return index == other.index;
1040template<
typename T,
typename C,
typename A>
1041bool kll_sketch<T, C, A>::const_iterator::operator!=(
const const_iterator& other)
const {
1042 return !operator==(other);
1045template<
typename T,
typename C,
typename A>
1046auto kll_sketch<T, C, A>::const_iterator::operator*() const -> reference {
1047 return value_type(items[index], weight);
1050template<
typename T,
typename C,
typename A>
1051auto kll_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
Implementation of a very compact quantiles sketch with lazy compaction scheme and nearly optimal accuracy per retained item.
Definition kll_sketch.hpp:171
C get_comparator() const
Returns an instance of the comparator for this sketch.
Definition kll_sketch_impl.hpp:272
quantiles_sorted_view< T, C, A > get_sorted_view() const
Gets the sorted view of this sketch.
Definition kll_sketch_impl.hpp:769
kll_sketch & operator=(const kll_sketch &other)
Copy assignment.
Definition kll_sketch_impl.hpp:103
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analog of the PMF, of the input stream given a set of split points (items).
Definition kll_sketch_impl.hpp:296
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition kll_sketch_impl.hpp:965
uint32_t get_num_retained() const
Returns the number of retained items (samples) in the sketch.
Definition kll_sketch_impl.hpp:250
static kll_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const C &comparator=C(), const A &allocator=A())
This method deserializes a sketch from a given stream.
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of split points (items).
Definition kll_sketch_impl.hpp:289
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition kll_sketch_impl.hpp:210
T get_max_item() const
Returns the max item of the stream.
Definition kll_sketch_impl.hpp:266
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition kll_sketch_impl.hpp:970
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition kll_sketch_impl.hpp:181
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition kll_sketch_impl.hpp:368
string< A > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition kll_sketch_impl.hpp:913
uint16_t get_k() const
Returns configured parameter k.
Definition kll_sketch_impl.hpp:240
bool is_empty() const
Returns true if this sketch is empty.
Definition kll_sketch_impl.hpp:235
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1, inclusive.
Definition kll_sketch_impl.hpp:282
A get_allocator() const
Returns an instance of the allocator for this sketch.
Definition kll_sketch_impl.hpp:277
T get_min_item() const
Returns the min item of the stream.
Definition kll_sketch_impl.hpp:260
typename quantiles_sorted_view< T, C, A >::quantile_return_type quantile_return_type
Quantile return type.
Definition kll_sketch.hpp:183
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an item from the sketch that is the best approximation to an item from the original stream with the given rank.
Definition kll_sketch_impl.hpp:303
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition kll_sketch_impl.hpp:321
static size_t get_max_serialized_size_bytes(uint16_t k, uint64_t n)
Returns upper bound on the serialized size of a sketch given a parameter k and stream length.
Definition kll_sketch_impl.hpp:349
double get_normalized_rank_error(bool pmf) const
Gets the approximate rank error of this sketch normalized as a fraction between zero and one.
Definition kll_sketch_impl.hpp:314
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition kll_sketch_impl.hpp:255
uint64_t get_n() const
Returns the length of the input stream.
Definition kll_sketch_impl.hpp:245
Sorted view for quantiles sketches (REQ, KLL and Quantiles)
Definition quantiles_sorted_view.hpp:38
const uint16_t MAX_K
max value of parameter K
Definition kll_sketch.hpp:41
const uint16_t MIN_K
min value of parameter K
Definition kll_sketch.hpp:39
DataSketches namespace.
Definition binomial_bounds.hpp:38