20 #ifndef KLL_SKETCH_IMPL_HPP_
21 #define KLL_SKETCH_IMPL_HPP_
28 #include "conditional_forward.hpp"
29 #include "count_zeros.hpp"
30 #include "memory_operations.hpp"
31 #include "kll_helper.hpp"
35 template<
typename T,
typename C,
typename A>
37 comparator_(comparator),
38 allocator_(allocator),
40 m_(kll_constants::DEFAULT_M),
43 is_level_zero_sorted_(false),
45 levels_(2, 0, allocator),
56 levels_[0] = levels_[1] = k;
57 items_ = allocator_.allocate(items_size_);
60 template<
typename T,
typename C,
typename A>
62 comparator_(other.comparator_),
63 allocator_(other.allocator_),
67 num_levels_(other.num_levels_),
68 is_level_zero_sorted_(other.is_level_zero_sorted_),
70 levels_(other.levels_),
72 items_size_(other.items_size_),
73 min_item_(other.min_item_),
74 max_item_(other.max_item_),
77 items_ = allocator_.allocate(items_size_);
78 for (
auto i = levels_[0]; i < levels_[num_levels_]; ++i)
new (&items_[i]) T(other.items_[i]);
81 template<
typename T,
typename C,
typename A>
83 comparator_(std::move(other.comparator_)),
84 allocator_(std::move(other.allocator_)),
88 num_levels_(other.num_levels_),
89 is_level_zero_sorted_(other.is_level_zero_sorted_),
91 levels_(std::move(other.levels_)),
93 items_size_(other.items_size_),
94 min_item_(std::move(other.min_item_)),
95 max_item_(std::move(other.max_item_)),
98 other.items_ =
nullptr;
101 template<
typename T,
typename C,
typename A>
104 std::swap(comparator_, copy.comparator_);
105 std::swap(allocator_, copy.allocator_);
106 std::swap(k_, copy.k_);
107 std::swap(m_, copy.m_);
108 std::swap(min_k_, copy.min_k_);
109 std::swap(num_levels_, copy.num_levels_);
110 std::swap(is_level_zero_sorted_, copy.is_level_zero_sorted_);
111 std::swap(n_, copy.n_);
112 std::swap(levels_, copy.levels_);
113 std::swap(items_, copy.items_);
114 std::swap(items_size_, copy.items_size_);
115 std::swap(min_item_, copy.min_item_);
116 std::swap(max_item_, copy.max_item_);
121 template<
typename T,
typename C,
typename A>
123 std::swap(comparator_, other.comparator_);
124 std::swap(allocator_, other.allocator_);
125 std::swap(k_, other.k_);
126 std::swap(m_, other.m_);
127 std::swap(min_k_, other.min_k_);
128 std::swap(num_levels_, other.num_levels_);
129 std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);
130 std::swap(n_, other.n_);
131 std::swap(levels_, other.levels_);
132 std::swap(items_, other.items_);
133 std::swap(items_size_, other.items_size_);
134 std::swap(min_item_, other.min_item_);
135 std::swap(max_item_, other.max_item_);
140 template<
typename T,
typename C,
typename A>
142 if (items_ !=
nullptr) {
143 const uint32_t begin = levels_[0];
144 const uint32_t end = levels_[num_levels_];
145 for (uint32_t i = begin; i < end; i++) items_[i].~T();
146 allocator_.deallocate(items_, items_size_);
// Type-converting copy constructor: builds a kll_sketch<T, C, A> from a
// kll_sketch<TT, CC, AA>, using the comparator and allocator supplied by the caller.
// Level boundaries, sizes and min/max items are taken from `other`; the retained
// items are converted one by one from TT to T.
// NOTE(review): listing lines 156-157, 161, 163, 168-169 and 172 are missing from
// this extract (remaining initializers and the opening of the static_assert).
151 template<
typename T,
typename C,
typename A>
152 template<
typename TT,
typename CC,
typename AA>
153 kll_sketch<T, C, A>::kll_sketch(
const kll_sketch<TT, CC, AA>& other,
const C& comparator,
const A& allocator):
154 comparator_(comparator),
155 allocator_(allocator),
158 min_k_(other.min_k_),
159 num_levels_(other.num_levels_),
160 is_level_zero_sorted_(other.is_level_zero_sorted_),
162 levels_(other.levels_, allocator_),
164 items_size_(other.items_size_),
165 min_item_(other.min_item_),
166 max_item_(other.max_item_),
167 sorted_view_(nullptr)
// Compile-time guard: T must be constructible from TT.
170 std::is_constructible<T, TT>::value,
171 "Type converting constructor requires new type to be constructible from existing type"
// Allocate the full capacity, but construct only the occupied range
// [levels_[0], levels_[num_levels_]) via placement new.
173 items_ = allocator_.allocate(items_size_);
174 for (
auto i = levels_[0]; i < levels_[num_levels_]; ++i)
new (&items_[i]) T(other.items_[i]);
178 template<
typename T,
typename C,
typename A>
179 template<
typename FwdT>
181 if (!check_update_item(item)) {
return; }
182 update_min_max(
static_cast<const T&
>(item));
183 const uint32_t index = internal_update();
184 new (&items_[index]) T(std::forward<FwdT>(item));
188 template<
typename T,
typename C,
typename A>
191 min_item_.emplace(item);
192 max_item_.emplace(item);
194 if (comparator_(item, *min_item_)) *min_item_ = item;
195 if (comparator_(*max_item_, item)) *max_item_ = item;
// Bookkeeping performed before one item is placed into level zero: when
// levels_[0] == 0 it first runs compress_while_updating(), then marks level
// zero as unsorted.
// Callers in this file use the uint32_t result as the items_ index for a
// placement new of the incoming item.
// NOTE(review): listing lines 202 and 204-205 are missing from this extract; the
// returned index is presumably produced there — confirm against the full file.
199 template<
typename T,
typename C,
typename A>
200 uint32_t kll_sketch<T, C, A>::internal_update() {
201 if (levels_[0] == 0) compress_while_updating();
203 is_level_zero_sorted_ =
false;
207 template<
typename T,
typename C,
typename A>
208 template<
typename FwdSk>
210 if (other.is_empty())
return;
211 if (m_ != other.m_) {
212 throw std::invalid_argument(
"incompatible M: " + std::to_string(m_) +
" and " + std::to_string(other.m_));
215 min_item_.emplace(conditional_forward<FwdSk>(*other.min_item_));
216 max_item_.emplace(conditional_forward<FwdSk>(*other.max_item_));
218 if (comparator_(*other.min_item_, *min_item_)) *min_item_ = conditional_forward<FwdSk>(*other.min_item_);
219 if (comparator_(*max_item_, *other.max_item_)) *max_item_ = conditional_forward<FwdSk>(*other.max_item_);
221 const uint64_t final_n = n_ + other.n_;
222 for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
223 const uint32_t index = internal_update();
224 new (&items_[index]) T(conditional_forward<FwdSk>(other.items_[i]));
226 if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
228 if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
229 assert_correct_total_weight();
233 template<
typename T,
typename C,
typename A>
238 template<
typename T,
typename C,
typename A>
243 template<
typename T,
typename C,
typename A>
248 template<
typename T,
typename C,
typename A>
250 return levels_[num_levels_] - levels_[0];
253 template<
typename T,
typename C,
typename A>
255 return num_levels_ > 1;
258 template<
typename T,
typename C,
typename A>
260 if (is_empty())
throw std::runtime_error(
"operation is undefined for an empty sketch");
264 template<
typename T,
typename C,
typename A>
266 if (is_empty())
throw std::runtime_error(
"operation is undefined for an empty sketch");
270 template<
typename T,
typename C,
typename A>
275 template<
typename T,
typename C,
typename A>
280 template<
typename T,
typename C,
typename A>
282 if (is_empty())
throw std::runtime_error(
"operation is undefined for an empty sketch");
284 return sorted_view_->get_rank(item, inclusive);
287 template<
typename T,
typename C,
typename A>
289 if (is_empty())
throw std::runtime_error(
"operation is undefined for an empty sketch");
291 return sorted_view_->get_PMF(split_points, size, inclusive);
294 template<
typename T,
typename C,
typename A>
296 if (is_empty())
throw std::runtime_error(
"operation is undefined for an empty sketch");
298 return sorted_view_->get_CDF(split_points, size, inclusive);
301 template<
typename T,
typename C,
typename A>
303 if (is_empty())
throw std::runtime_error(
"operation is undefined for an empty sketch");
304 if ((rank < 0.0) || (rank > 1.0)) {
305 throw std::invalid_argument(
"normalized rank cannot be less than zero or greater than 1.0");
309 return sorted_view_->get_quantile(rank, inclusive);
312 template<
typename T,
typename C,
typename A>
314 return get_normalized_rank_error(min_k_, pmf);
318 template<
typename T,
typename C,
typename A>
319 template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value,
int>::type>
321 if (is_empty()) {
return EMPTY_SIZE_BYTES; }
322 if (num_levels_ == 1 && get_num_retained() == 1) {
323 return DATA_START_SINGLE_ITEM +
sizeof(TT);
326 return DATA_START + num_levels_ *
sizeof(uint32_t) + (get_num_retained() + 2) *
sizeof(TT);
330 template<
typename T,
typename C,
typename A>
331 template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value,
int>::type>
333 if (is_empty()) {
return EMPTY_SIZE_BYTES; }
334 if (num_levels_ == 1 && get_num_retained() == 1) {
335 return DATA_START_SINGLE_ITEM + sd.size_of_item(items_[levels_[0]]);
338 size_t size = DATA_START + num_levels_ *
sizeof(uint32_t);
339 size += sd.size_of_item(*min_item_);
340 size += sd.size_of_item(*max_item_);
341 for (
auto it: *
this) size += sd.size_of_item(it.first);
346 template<
typename T,
typename C,
typename A>
347 template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value,
int>::type>
349 const uint8_t num_levels = kll_helper::ub_on_num_levels(n);
350 const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, kll_constants::DEFAULT_M, num_levels);
352 return DATA_START + num_levels *
sizeof(uint32_t) + (max_num_retained + 2) *
sizeof(TT);
356 template<
typename T,
typename C,
typename A>
357 template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value,
int>::type>
359 const uint8_t num_levels = kll_helper::ub_on_num_levels(n);
360 const uint32_t max_num_retained = kll_helper::compute_total_capacity(k, kll_constants::DEFAULT_M, num_levels);
362 return DATA_START + num_levels *
sizeof(uint32_t) + (max_num_retained + 2) * max_item_size_bytes;
365 template<
typename T,
typename C,
typename A>
366 template<
typename SerDe>
368 const bool is_single_item = n_ == 1;
369 const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
370 write(os, preamble_ints);
371 const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
372 write(os, serial_version);
373 const uint8_t family(FAMILY);
375 const uint8_t flags_byte(
376 (is_empty() ? 1 << flags::IS_EMPTY : 0)
377 | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
378 | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
380 write(os, flags_byte);
383 const uint8_t unused = 0;
385 if (is_empty())
return;
386 if (!is_single_item) {
389 write(os, num_levels_);
391 write(os, levels_.data(),
sizeof(levels_[0]) * num_levels_);
392 sd.serialize(os, &*min_item_, 1);
393 sd.serialize(os, &*max_item_, 1);
395 sd.serialize(os, &items_[levels_[0]], get_num_retained());
398 template<
typename T,
typename C,
typename A>
399 template<
typename SerDe>
401 const bool is_single_item = n_ == 1;
402 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
403 vector_bytes bytes(size, 0, allocator_);
404 uint8_t* ptr = bytes.data() + header_size_bytes;
405 const uint8_t* end_ptr = ptr + size;
406 const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
407 ptr += copy_to_mem(preamble_ints, ptr);
408 const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
409 ptr += copy_to_mem(serial_version, ptr);
410 const uint8_t family(FAMILY);
411 ptr += copy_to_mem(family, ptr);
412 const uint8_t flags_byte(
413 (is_empty() ? 1 << flags::IS_EMPTY : 0)
414 | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
415 | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
417 ptr += copy_to_mem(flags_byte, ptr);
418 ptr += copy_to_mem(k_, ptr);
419 ptr += copy_to_mem(m_, ptr);
420 ptr +=
sizeof(uint8_t);
422 if (!is_single_item) {
423 ptr += copy_to_mem(n_, ptr);
424 ptr += copy_to_mem(min_k_, ptr);
425 ptr += copy_to_mem(num_levels_, ptr);
426 ptr +=
sizeof(uint8_t);
427 ptr += copy_to_mem(levels_.data(), ptr,
sizeof(levels_[0]) * num_levels_);
428 ptr += sd.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
429 ptr += sd.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
431 const size_t bytes_remaining = end_ptr - ptr;
432 ptr += sd.serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
434 const size_t delta = ptr - bytes.data();
435 if (delta != size)
throw std::logic_error(
"serialized size mismatch: " + std::to_string(delta)
436 +
" != " + std::to_string(size));
440 template<
typename T,
typename C,
typename A>
441 template<
typename SerDe>
443 const C& comparator,
const A& allocator) {
444 const auto preamble_ints = read<uint8_t>(is);
445 const auto serial_version = read<uint8_t>(is);
446 const auto family_id = read<uint8_t>(is);
447 const auto flags_byte = read<uint8_t>(is);
448 const auto k = read<uint16_t>(is);
449 const auto m = read<uint8_t>(is);
453 check_preamble_ints(preamble_ints, flags_byte);
454 check_serial_version(serial_version);
455 check_family_id(family_id);
457 if (!is.good())
throw std::runtime_error(
"error reading from std::istream");
458 const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
459 if (is_empty)
return kll_sketch(k, comparator, allocator);
464 const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
465 if (is_single_item) {
470 n = read<uint64_t>(is);
471 min_k = read<uint16_t>(is);
472 num_levels = read<uint8_t>(is);
475 vector_u32 levels(num_levels + 1, 0, allocator);
476 const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
477 if (is_single_item) {
478 levels[0] = capacity - 1;
481 read(is, levels.data(),
sizeof(levels[0]) * num_levels);
483 levels[num_levels] = capacity;
485 optional<T> min_item;
486 optional<T> max_item;
487 if (!is_single_item) {
488 sd.deserialize(is, &*tmp, 1);
490 min_item.emplace(*tmp);
492 sd.deserialize(is, &*tmp, 1);
494 max_item.emplace(*tmp);
498 auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
499 std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
500 const auto num_items = levels[num_levels] - levels[0];
501 sd.deserialize(is, &items_buffer.get()[levels[0]], num_items);
503 std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
504 const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
505 if (is_single_item) {
506 min_item.emplace(items.get()[levels[0]]);
507 max_item.emplace(items.get()[levels[0]]);
510 throw std::runtime_error(
"error reading from std::istream");
511 return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
512 std::move(min_item), std::move(max_item), is_level_zero_sorted, comparator);
515 template<
typename T,
typename C,
typename A>
516 template<
typename SerDe>
518 const C& comparator,
const A& allocator) {
519 ensure_minimum_memory(size, 8);
520 const char* ptr =
static_cast<const char*
>(bytes);
521 uint8_t preamble_ints;
522 ptr += copy_from_mem(ptr, preamble_ints);
523 uint8_t serial_version;
524 ptr += copy_from_mem(ptr, serial_version);
526 ptr += copy_from_mem(ptr, family_id);
528 ptr += copy_from_mem(ptr, flags_byte);
530 ptr += copy_from_mem(ptr, k);
532 ptr += copy_from_mem(ptr, m);
533 ptr +=
sizeof(uint8_t);
536 check_preamble_ints(preamble_ints, flags_byte);
537 check_serial_version(serial_version);
538 check_family_id(family_id);
539 ensure_minimum_memory(size, preamble_ints *
sizeof(uint32_t));
541 const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
542 if (is_empty)
return kll_sketch(k, comparator, allocator);
547 const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
548 const char* end_ptr =
static_cast<const char*
>(bytes) + size;
549 if (is_single_item) {
554 ptr += copy_from_mem(ptr, n);
555 ptr += copy_from_mem(ptr, min_k);
556 ptr += copy_from_mem(ptr, num_levels);
557 ptr +=
sizeof(uint8_t);
559 vector_u32 levels(num_levels + 1, 0, allocator);
560 const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
561 if (is_single_item) {
562 levels[0] = capacity - 1;
565 ptr += copy_from_mem(ptr, levels.data(),
sizeof(levels[0]) * num_levels);
567 levels[num_levels] = capacity;
569 optional<T> min_item;
570 optional<T> max_item;
571 if (!is_single_item) {
572 ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
574 min_item.emplace(*tmp);
576 ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
578 max_item.emplace(*tmp);
582 auto items_buffer_deleter = [capacity, &alloc](T* ptr) { alloc.deallocate(ptr, capacity); };
583 std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(alloc.allocate(capacity), items_buffer_deleter);
584 const auto num_items = levels[num_levels] - levels[0];
585 ptr += sd.deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
587 std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity, allocator));
588 const size_t delta = ptr -
static_cast<const char*
>(bytes);
589 if (delta != size)
throw std::logic_error(
"deserialized size mismatch: " + std::to_string(delta) +
" != " + std::to_string(size));
590 const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
591 if (is_single_item) {
592 min_item.emplace(items.get()[levels[0]]);
593 max_item.emplace(items.get()[levels[0]]);
595 return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
596 std::move(min_item), std::move(max_item), is_level_zero_sorted, comparator);
606 template<
typename T,
typename C,
typename A>
609 ? 2.446 / pow(k, 0.9433)
610 : 2.296 / pow(k, 0.9723);
// Constructor that assembles a sketch from already-built parts; the deserialize
// code paths in this file call it with the values they have read.
// - the allocator is taken from `levels` (levels.get_allocator())
// - m_ is always set to kll_constants::DEFAULT_M
// - `levels`, `min_item` and `max_item` are moved in
// - ownership of the items buffer is taken over via items.release()
// NOTE(review): listing lines 620, 622 and 625 (further initializers) and the
// constructor body are missing from this extract.
614 template<
typename T,
typename C,
typename A>
615 kll_sketch<T, C, A>::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32&& levels,
616 std::unique_ptr<T, items_deleter> items, uint32_t items_size, optional<T>&& min_item,
617 optional<T>&& max_item,
bool is_level_zero_sorted,
const C& comparator):
618 comparator_(comparator),
619 allocator_(levels.get_allocator()),
621 m_(kll_constants::DEFAULT_M),
623 num_levels_(num_levels),
624 is_level_zero_sorted_(is_level_zero_sorted),
626 levels_(std::move(levels)),
627 items_(items.release()),
628 items_size_(items_size),
629 min_item_(std::move(min_item)),
630 max_item_(std::move(max_item)),
631 sorted_view_(nullptr)
// Compacts the level chosen by find_level_to_compact() to free space.
// Steps visible in this extract:
//  - if the chosen level is the current top level, an empty top level is added first
//  - for an odd-sized level the first item (raw_beg) is excluded from the
//    halving (adj_beg / adj_pop) and later moved to the new start of the level
//  - level zero is sorted first when it is not already sorted
//  - the adjusted range is randomly halved: upward when the level above is empty
//    (pop_above == 0), otherwise downward followed by merge_sorted_arrays with the
//    level above
//  - levels_[level + 1] is lowered by half_adj_pop and levels_[level] is recomputed;
//    a std::logic_error("compaction error") guards the expected boundary
//  - items of the lower levels are shifted up by half_adj_pop (std::move_backward)
//    and their boundaries adjusted
//  - half_adj_pop slots starting at the old levels_[0] are destroyed
// NOTE(review): many listing lines (else branches, closing braces, enclosing
// conditions) are missing from this extract, so the exact branch structure must be
// confirmed against the full file.
636 template<
typename T,
typename C,
typename A>
637 void kll_sketch<T, C, A>::compress_while_updating(
void) {
638 const uint8_t level = find_level_to_compact();
643 if (level == (num_levels_ - 1)) {
644 add_empty_top_level_to_completely_full_sketch();
647 const uint32_t raw_beg = levels_[level];
648 const uint32_t raw_lim = levels_[level + 1];
650 const uint32_t pop_above = levels_[level + 2] - raw_lim;
651 const uint32_t raw_pop = raw_lim - raw_beg;
652 const bool odd_pop = kll_helper::is_odd(raw_pop);
653 const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
654 const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
655 const uint32_t half_adj_pop = adj_pop / 2;
// Remember where the occupied region started so the vacated slots can be destroyed at the end.
656 const uint32_t destroy_beg = levels_[0];
660 if ((level == 0) && !is_level_zero_sorted_) {
661 std::sort(items_ + adj_beg, items_ + adj_beg + adj_pop, comparator_);
663 if (pop_above == 0) {
664 kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
666 kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
667 kll_helper::merge_sorted_arrays<T, C>(items_, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
669 levels_[level + 1] -= half_adj_pop;
671 levels_[level] = levels_[level + 1] - 1;
672 if (levels_[level] != raw_beg) items_[levels_[level]] = std::move(items_[raw_beg]);
674 levels_[level] = levels_[level + 1];
678 if (levels_[level] != (raw_beg + half_adj_pop))
throw std::logic_error(
"compaction error");
683 const uint32_t amount = raw_beg - levels_[0];
684 std::move_backward(items_ + levels_[0], items_ + levels_[0] + amount, items_ + levels_[0] + half_adj_pop + amount);
685 for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
687 for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + destroy_beg].~T();
// Selects the level that compress_while_updating() should compact.
// Visible logic: for a candidate `level` it compares the level's population
// (levels_[level + 1] - levels_[level]) with its nominal capacity from
// kll_helper::level_capacity(k_, num_levels_, level, m_).
// Throws std::logic_error("capacity calculation error") if `level` reaches num_levels_.
// NOTE(review): the loop header, the pop/cap comparison and the return statement
// (listing lines 692-693, 697-702) are missing from this extract.
690 template<
typename T,
typename C,
typename A>
691 uint8_t kll_sketch<T, C, A>::find_level_to_compact()
const {
694 if (level >= num_levels_)
throw std::logic_error(
"capacity calculation error");
695 const uint32_t pop = levels_[level + 1] - levels_[level];
696 const uint32_t cap = kll_helper::level_capacity(k_, num_levels_, level, m_);
// Grows the sketch by one empty top level; expects a completely full sketch.
// Visible steps:
//  - checks levels_[0] == 0 and items_size_ == levels_[num_levels_], throwing
//    std::logic_error otherwise
//  - makes sure levels_ has at least num_levels_ + 2 entries
//  - allocates a buffer that is larger by delta_cap, the level-0 capacity of a
//    sketch with num_levels_ + 1 levels
//  - move-constructs the existing items into the new buffer at offset delta_cap
//    and releases the old buffer
//  - shifts boundaries 0..num_levels_ up by delta_cap
// NOTE(review): listing lines such as 725, 731-732 and 734-735 are missing from
// this extract; reassigning items_ and incrementing num_levels_ presumably happen
// there — confirm against the full file.
704 template<
typename T,
typename C,
typename A>
705 void kll_sketch<T, C, A>::add_empty_top_level_to_completely_full_sketch() {
706 const uint32_t cur_total_cap = levels_[num_levels_];
709 if (levels_[0] != 0)
throw std::logic_error(
"full sketch expected");
710 if (items_size_ != cur_total_cap)
throw std::logic_error(
"current capacity mismatch");
713 const uint8_t new_levels_size = num_levels_ + 2;
714 if (levels_.size() < new_levels_size) {
715 levels_.resize(new_levels_size);
718 const uint32_t delta_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
719 const uint32_t new_total_cap = cur_total_cap + delta_cap;
722 T* new_buf = allocator_.allocate(new_total_cap);
723 kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap,
true);
724 allocator_.deallocate(items_, items_size_);
726 items_size_ = new_total_cap;
729 for (uint8_t i = 0; i <= num_levels_; i++) {
730 levels_[i] += delta_cap;
733 if (levels_[num_levels_] != new_total_cap)
throw std::logic_error(
"new capacity mismatch");
736 levels_[num_levels_] = new_total_cap;
// Sorts the level-zero range [levels_[0], levels_[1]) of items_ with comparator_
// and records that fact in is_level_zero_sorted_. Does nothing if level zero is
// already flagged as sorted.
739 template<
typename T,
typename C,
typename A>
740 void kll_sketch<T, C, A>::sort_level_zero() {
741 if (!is_level_zero_sorted_) {
742 std::sort(items_ + levels_[0], items_ + levels_[1], comparator_);
743 is_level_zero_sorted_ =
true;
// Consistency check: every level from 1 upward must be sorted according to
// comparator_. Level zero is not examined by this loop (it starts at level 1).
// Throws std::logic_error("levels must be sorted") on the first unsorted level.
747 template<
typename T,
typename C,
typename A>
748 void kll_sketch<T, C, A>::check_sorting()
const {
750 for (uint8_t level = 1; level < num_levels_; ++level) {
751 const auto from = items_ + levels_[level];
752 const auto to = items_ + levels_[level + 1];
753 if (!std::is_sorted(from, to, comparator_)) {
754 throw std::logic_error(
"levels must be sorted");
759 template<
typename T,
typename C,
typename A>
761 const_cast<kll_sketch*
>(
this)->sort_level_zero();
763 for (uint8_t level = 0; level < num_levels_; ++level) {
764 const auto from = items_ + levels_[level];
765 const auto to = items_ + levels_[level + 1];
766 view.add(from, to, 1ULL << level);
768 view.convert_to_cummulative();
772 template<
typename T,
typename C,
typename A>
775 const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
777 auto tmp_items_deleter = [tmp_num_items, &alloc](T* ptr) { alloc.deallocate(ptr, tmp_num_items); };
778 const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(allocator_.allocate(tmp_num_items), tmp_items_deleter);
779 const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
780 const size_t work_levels_size = ub + 2;
781 vector_u32 worklevels(work_levels_size, 0, allocator_);
782 vector_u32 outlevels(work_levels_size, 0, allocator_);
784 const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
786 populate_work_arrays(std::forward<O>(other), workbuf.get(), worklevels.data(), provisional_num_levels);
788 const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
789 worklevels.data(), outlevels.data(), is_level_zero_sorted_);
792 if (result.final_num_levels > ub)
throw std::logic_error(
"merge error");
795 if (result.final_capacity != items_size_) {
796 allocator_.deallocate(items_, items_size_);
797 items_size_ = result.final_capacity;
798 items_ = allocator_.allocate(items_size_);
800 const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
801 kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom,
true);
803 const size_t new_levels_size = result.final_num_levels + 1;
804 if (levels_.size() < new_levels_size) {
805 levels_.resize(new_levels_size);
807 const uint32_t offset = free_space_at_bottom - outlevels[0];
808 for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) {
809 levels_[lvl] = outlevels[lvl] + offset;
811 num_levels_ = result.final_num_levels;
// Fills the caller-provided work buffer and work level boundaries with the
// combined contents of this sketch and `other`, level by level.
//  - level 0: only this sketch's level-zero items are moved into workbuf at
//    offset 0; `other`'s level zero is not read in this function
//  - levels >= 1: worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop, and
//      * only this sketch has items  -> they are move-constructed into workbuf
//      * only `other` has items      -> they are constructed in place, forwarded
//                                       according to the value category of FwdSk
//      * both have items             -> the two ranges are merged via
//                                       kll_helper::merge_sorted_arrays
// NOTE(review): listing lines 818-820 (presumably initializing worklevels[0]) and
// the closing braces are missing from this extract.
815 template<
typename T,
typename C,
typename A>
816 template<
typename FwdSk>
817 void kll_sketch<T, C, A>::populate_work_arrays(FwdSk&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
821 kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0,
true);
822 worklevels[1] = safe_level_size(0);
824 for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
// safe_level_size() yields 0 for levels a sketch does not have, so sketches of different heights can be combined.
825 const uint32_t self_pop = safe_level_size(lvl);
826 const uint32_t other_pop = other.safe_level_size(lvl);
827 worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
829 if ((self_pop > 0) && (other_pop == 0)) {
830 kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl],
true);
831 }
else if ((self_pop == 0) && (other_pop > 0)) {
832 for (
auto i = other.levels_[lvl], j = worklevels[lvl]; i < other.levels_[lvl] + other_pop; ++i, ++j) {
833 new (&workbuf[j]) T(conditional_forward<FwdSk>(other.items_[i]));
835 }
else if ((self_pop > 0) && (other_pop > 0)) {
836 kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
// Consistency check on the total sample weight, computed by
// kll_helper::sum_the_sample_weights from num_levels_ and the level boundaries.
// Throws std::logic_error("Total weight does not match N") on mismatch.
// NOTE(review): the comparison itself (listing line 844) is missing from this
// extract; judging by the message it compares `total` with n_ — confirm.
841 template<
typename T,
typename C,
typename A>
842 void kll_sketch<T, C, A>::assert_correct_total_weight()
const {
843 const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_.data()));
845 throw std::logic_error(
"Total weight does not match N");
// Returns the number of items stored in `level`, i.e.
// levels_[level + 1] - levels_[level], or 0 when `level` is not below num_levels_.
849 template<
typename T,
typename C,
typename A>
850 uint32_t kll_sketch<T, C, A>::safe_level_size(uint8_t level)
const {
851 if (level >= num_levels_)
return 0;
852 return levels_[level + 1] - levels_[level];
// Returns the number of items held in levels 1 and above
// (levels_[num_levels_] - levels_[1]); 0 when the sketch has a single level.
855 template<
typename T,
typename C,
typename A>
856 uint32_t kll_sketch<T, C, A>::get_num_retained_above_level_zero()
const {
857 if (num_levels_ == 1)
return 0;
858 return levels_[num_levels_] - levels_[1];
// Validates the parameter M: only kll_constants::DEFAULT_M is accepted.
// Throws std::invalid_argument naming the expected and the actual value otherwise.
861 template<
typename T,
typename C,
typename A>
862 void kll_sketch<T, C, A>::check_m(uint8_t m) {
863 if (m != kll_constants::DEFAULT_M) {
864 throw std::invalid_argument(
"Possible corruption: M must be " + std::to_string(kll_constants::DEFAULT_M)
865 +
": " + std::to_string(m));
// Validates the preamble-ints header field against the flags byte.
// An empty or single-item sketch (IS_EMPTY / IS_SINGLE_ITEM bit set) must carry
// PREAMBLE_INTS_SHORT; the second check requires PREAMBLE_INTS_FULL and reports
// "a sketch with more than one item".
// Throws std::invalid_argument on mismatch.
// NOTE(review): the line joining the two checks (listing line 878, presumably an
// `else`) is missing from this extract.
869 template<
typename T,
typename C,
typename A>
870 void kll_sketch<T, C, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
871 const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
872 const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
873 if (is_empty || is_single_item) {
874 if (preamble_ints != PREAMBLE_INTS_SHORT) {
875 throw std::invalid_argument(
"Possible corruption: preamble ints must be "
876 + std::to_string(PREAMBLE_INTS_SHORT) +
" for an empty or single item sketch: " + std::to_string(preamble_ints));
879 if (preamble_ints != PREAMBLE_INTS_FULL) {
880 throw std::invalid_argument(
"Possible corruption: preamble ints must be "
881 + std::to_string(PREAMBLE_INTS_FULL) +
" for a sketch with more than one item: " + std::to_string(preamble_ints));
// Validates the serial version header field: SERIAL_VERSION_1 and
// SERIAL_VERSION_2 are both accepted.
// Throws std::invalid_argument listing the accepted versions and the value found.
886 template<
typename T,
typename C,
typename A>
887 void kll_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
888 if (serial_version != SERIAL_VERSION_1 && serial_version != SERIAL_VERSION_2) {
889 throw std::invalid_argument(
"Possible corruption: serial version mismatch: expected "
890 + std::to_string(SERIAL_VERSION_1) +
" or " + std::to_string(SERIAL_VERSION_2)
891 +
", got " + std::to_string(serial_version));
// Validates the family id header field: it must equal FAMILY.
// Throws std::invalid_argument with the expected and the actual id otherwise.
895 template<
typename T,
typename C,
typename A>
896 void kll_sketch<T, C, A>::check_family_id(uint8_t family_id) {
897 if (family_id != FAMILY) {
898 throw std::invalid_argument(
"Possible corruption: family mismatch: expected "
899 + std::to_string(FAMILY) +
", got " + std::to_string(family_id));
903 template <
typename T,
typename C,
typename A>
907 std::ostringstream os;
908 os <<
"### KLL sketch summary:" << std::endl;
909 os <<
" K : " << k_ << std::endl;
910 os <<
" min K : " << min_k_ << std::endl;
911 os <<
" M : " << (
unsigned int) m_ << std::endl;
912 os <<
" N : " << n_ << std::endl;
913 os <<
" Epsilon : " << std::setprecision(3) << get_normalized_rank_error(
false) * 100 <<
"%" << std::endl;
914 os <<
" Epsilon PMF : " << get_normalized_rank_error(
true) * 100 <<
"%" << std::endl;
915 os <<
" Empty : " << (is_empty() ?
"true" :
"false") << std::endl;
916 os <<
" Estimation mode: " << (is_estimation_mode() ?
"true" :
"false") << std::endl;
917 os <<
" Levels : " << (
unsigned int) num_levels_ << std::endl;
918 os <<
" Sorted : " << (is_level_zero_sorted_ ?
"true" :
"false") << std::endl;
919 os <<
" Capacity items : " << items_size_ << std::endl;
920 os <<
" Retained items : " << get_num_retained() << std::endl;
922 os <<
" Min item : " << *min_item_ << std::endl;
923 os <<
" Max item : " << *max_item_ << std::endl;
925 os <<
"### End sketch summary" << std::endl;
928 os <<
"### KLL sketch levels:" << std::endl;
929 os <<
" index: nominal capacity, actual size" << std::endl;
930 for (uint8_t i = 0; i < num_levels_; i++) {
931 os <<
" " << (
unsigned int) i <<
": " << kll_helper::level_capacity(k_, num_levels_, i, m_) <<
", " << safe_level_size(i) << std::endl;
933 os <<
"### End sketch levels" << std::endl;
937 os <<
"### KLL sketch data:" << std::endl;
939 while (level < num_levels_) {
940 const uint32_t from_index = levels_[level];
941 const uint32_t to_index = levels_[level + 1];
942 if (from_index < to_index) {
943 os <<
" level " << (
unsigned int) level <<
":" << std::endl;
945 for (uint32_t i = from_index; i < to_index; i++) {
946 os <<
" " << items_[i] << std::endl;
950 os <<
"### End sketch data" << std::endl;
952 return string<A>(os.str().c_str(), allocator_);
955 template <
typename T,
typename C,
typename A>
960 template <
typename T,
typename C,
typename A>
965 template<
typename T,
typename C,
typename A>
968 items_deleter(uint32_t start, uint32_t num,
const A& allocator):
969 allocator_(allocator), start_(start), num_(num) {}
970 void operator() (T* ptr) {
971 if (ptr !=
nullptr) {
972 for (uint32_t i = start_; i < num_; ++i) ptr[i].~T();
973 allocator_.deallocate(ptr, num_);
// Creates the cached sorted view on first use: if sorted_view_ is null, storage
// for one quantiles_sorted_view is obtained from an allocator rebound from
// allocator_, and the view returned by get_sorted_view() is constructed into it
// with placement new. Does nothing if the view already exists.
982 template<
typename T,
typename C,
typename A>
983 void kll_sketch<T, C, A>::setup_sorted_view()
const {
984 if (sorted_view_ ==
nullptr) {
985 using AllocSortedView =
typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
986 sorted_view_ =
new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
// Discards the cached sorted view, if any: runs its destructor explicitly,
// returns the storage through the same rebound allocator type used to create it,
// and sets sorted_view_ back to nullptr.
990 template<
typename T,
typename C,
typename A>
991 void kll_sketch<T, C, A>::reset_sorted_view() {
992 if (sorted_view_ !=
nullptr) {
993 sorted_view_->~quantiles_sorted_view();
994 using AllocSortedView =
typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
995 AllocSortedView(allocator_).deallocate(sorted_view_, 1);
996 sorted_view_ =
nullptr;
// Iterator over (item, weight) pairs of a sketch.
// With a non-null `items` pointer the iterator starts at levels[0] in level 0;
// with items == nullptr it starts at levels[num_levels] in level num_levels
// (presumably the past-the-end position used by end() — confirm). The weight
// starts at 1.
// NOTE(review): the constructor body (listing lines 1005-1006) is missing from
// this extract.
1002 template<
typename T,
typename C,
typename A>
1003 kll_sketch<T, C, A>::const_iterator::const_iterator(
const T* items,
const uint32_t* levels,
const uint8_t num_levels):
1004 items(items), levels(levels), num_levels(num_levels), index(items == nullptr ? levels[num_levels] : levels[0]), level(items == nullptr ? num_levels : 0), weight(1)
// Pre-increment. When `index` reaches the end of the current level
// (levels[level + 1]), a do/while loop keeps going while the level is still below
// num_levels and empty (levels[level] == levels[level + 1]).
// NOTE(review): the statements that advance index, level and weight and the
// return (listing lines 1009, 1011-1013, 1015-1017) are missing from this extract.
1007 template<
typename T,
typename C,
typename A>
1008 typename kll_sketch<T, C, A>::const_iterator& kll_sketch<T, C, A>::const_iterator::operator++() {
1010 if (index == levels[level + 1]) {
1014 }
while (level < num_levels && levels[level] == levels[level + 1]);
// Post-increment: takes a copy of the current iterator before advancing.
// NOTE(review): the declared return type is a reference (const_iterator&) while
// the pre-increment state is held in the local `tmp`; if the missing lines
// (listing 1022-1024) return `tmp`, the caller receives a dangling reference.
// Confirm against the full file and the declaration in kll_sketch.hpp.
1019 template<
typename T,
typename C,
typename A>
1020 typename kll_sketch<T, C, A>::const_iterator& kll_sketch<T, C, A>::const_iterator::operator++(
int) {
1021 const_iterator tmp(*
this);
// Equality is decided by `index` alone; the items/levels pointers are not
// compared, so the result is only meaningful for iterators over the same sketch.
1026 template<
typename T,
typename C,
typename A>
1027 bool kll_sketch<T, C, A>::const_iterator::operator==(
const const_iterator& other)
const {
1028 return index == other.index;
// Inequality, defined as the negation of operator==.
1031 template<
typename T,
typename C,
typename A>
1032 bool kll_sketch<T, C, A>::const_iterator::operator!=(
const const_iterator& other)
const {
1033 return !operator==(other);
// Dereference: builds a value_type from the item at the current index and the
// iterator's current weight.
1036 template<
typename T,
typename C,
typename A>
1037 auto kll_sketch<T, C, A>::const_iterator::operator*() const -> reference {
1038 return value_type(items[index], weight);
1041 template<
typename T,
typename C,
typename A>
1042 auto kll_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
Implementation of a very compact quantiles sketch with lazy compaction scheme and nearly optimal accuracy per retained item.
Definition: kll_sketch.hpp:166
C get_comparator() const
Returns an instance of the comparator for this sketch.
Definition: kll_sketch_impl.hpp:271
quantiles_sorted_view< T, C, A > get_sorted_view() const
Gets the sorted view of this sketch.
Definition: kll_sketch_impl.hpp:760
kll_sketch & operator=(const kll_sketch &other)
Copy assignment.
Definition: kll_sketch_impl.hpp:102
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analog of the PMF, of the input stream given a set of split points.
Definition: kll_sketch_impl.hpp:295
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: kll_sketch_impl.hpp:956
uint32_t get_num_retained() const
Returns the number of retained items (samples) in the sketch.
Definition: kll_sketch_impl.hpp:249
static kll_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const C &comparator=C(), const A &allocator=A())
This method deserializes a sketch from a given stream.
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of split points.
Definition: kll_sketch_impl.hpp:288
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition: kll_sketch_impl.hpp:209
T get_max_item() const
Returns the max item of the stream.
Definition: kll_sketch_impl.hpp:265
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: kll_sketch_impl.hpp:961
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition: kll_sketch_impl.hpp:180
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition: kll_sketch_impl.hpp:367
string< A > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition: kll_sketch_impl.hpp:904
uint16_t get_k() const
Returns configured parameter k.
Definition: kll_sketch_impl.hpp:239
bool is_empty() const
Returns true if this sketch is empty.
Definition: kll_sketch_impl.hpp:234
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1, inclusive.
Definition: kll_sketch_impl.hpp:281
A get_allocator() const
Returns an instance of the allocator for this sketch.
Definition: kll_sketch_impl.hpp:276
T get_min_item() const
Returns the min item of the stream.
Definition: kll_sketch_impl.hpp:259
typename quantiles_sorted_view< T, C, A >::quantile_return_type quantile_return_type
Quantile return type.
Definition: kll_sketch.hpp:178
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an item from the sketch that is the best approximation to an item from the original stream with the given rank.
Definition: kll_sketch_impl.hpp:302
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: kll_sketch_impl.hpp:320
static size_t get_max_serialized_size_bytes(uint16_t k, uint64_t n)
Returns upper bound on the serialized size of a sketch given a parameter k and stream length.
Definition: kll_sketch_impl.hpp:348
double get_normalized_rank_error(bool pmf) const
Gets the approximate rank error of this sketch normalized as a fraction between zero and one.
Definition: kll_sketch_impl.hpp:313
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition: kll_sketch_impl.hpp:254
uint64_t get_n() const
Returns the length of the input stream.
Definition: kll_sketch_impl.hpp:244
const uint16_t MAX_K
max value of parameter K
Definition: kll_sketch.hpp:41
const uint16_t MIN_K
min value of parameter K
Definition: kll_sketch.hpp:39
DataSketches namespace.
Definition: binomial_bounds.hpp:38