datasketches-cpp
Loading...
Searching...
No Matches
req_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef REQ_SKETCH_IMPL_HPP_
21#define REQ_SKETCH_IMPL_HPP_
22
23#include <sstream>
24#include <stdexcept>
25#include <type_traits>
26
27namespace datasketches {
28
29template<typename T, typename C, typename A>
30req_sketch<T, C, A>::req_sketch(uint16_t k, bool hra, const C& comparator, const A& allocator):
31comparator_(comparator),
32allocator_(allocator),
33k_(std::max<uint8_t>(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
34hra_(hra),
35max_nom_size_(0),
36num_retained_(0),
37n_(0),
38compactors_(allocator),
39min_item_(),
40max_item_(),
41sorted_view_(nullptr)
42{
43 grow();
44}
45
46template<typename T, typename C, typename A>
47req_sketch<T, C, A>::~req_sketch() {
48 reset_sorted_view();
49}
50
51template<typename T, typename C, typename A>
53comparator_(other.comparator_),
54allocator_(other.allocator_),
55k_(other.k_),
56hra_(other.hra_),
57max_nom_size_(other.max_nom_size_),
58num_retained_(other.num_retained_),
59n_(other.n_),
60compactors_(other.compactors_),
61min_item_(other.min_item_),
62max_item_(other.max_item_),
63sorted_view_(nullptr)
64{}
65
// Member initializer list of what appears to be the move constructor: comparator, allocator,
// compactors and min/max items are moved from other, while the scalar fields are copied.
// The cached sorted view is not taken over; sorted_view_ starts out as nullptr.
// NOTE(review): the declarator line (listing line 67) is missing from this extract -- restore it
// from the class declaration in the header before compiling.
66template<typename T, typename C, typename A>
68comparator_(std::move(other.comparator_)),
69allocator_(std::move(other.allocator_)),
70k_(other.k_),
71hra_(other.hra_),
72max_nom_size_(other.max_nom_size_),
73num_retained_(other.num_retained_),
74n_(other.n_),
75compactors_(std::move(other.compactors_)),
76min_item_(std::move(other.min_item_)),
77max_item_(std::move(other.max_item_)),
78sorted_view_(nullptr)
79{}
80
81template<typename T, typename C, typename A>
83 req_sketch copy(other);
84 std::swap(comparator_, copy.comparator_);
85 std::swap(allocator_, copy.allocator_);
86 std::swap(k_, copy.k_);
87 std::swap(hra_, copy.hra_);
88 std::swap(max_nom_size_, copy.max_nom_size_);
89 std::swap(num_retained_, copy.num_retained_);
90 std::swap(n_, copy.n_);
91 std::swap(compactors_, copy.compactors_);
92 std::swap(min_item_, copy.min_item_);
93 std::swap(max_item_, copy.max_item_);
94 reset_sorted_view();
95 return *this;
96}
97
// Body of what appears to be the move assignment operator: every data member is swapped
// with the corresponding member of other, then the cached sorted view of this sketch is discarded.
// NOTE(review): the declarator line (listing line 99) is missing from this extract -- restore it
// from the class declaration in the header before compiling.
98template<typename T, typename C, typename A>
100 std::swap(comparator_, other.comparator_);
101 std::swap(allocator_, other.allocator_);
102 std::swap(k_, other.k_);
103 std::swap(hra_, other.hra_);
104 std::swap(max_nom_size_, other.max_nom_size_);
105 std::swap(num_retained_, other.num_retained_);
106 std::swap(n_, other.n_);
107 std::swap(compactors_, other.compactors_);
108 std::swap(min_item_, other.min_item_);
109 std::swap(max_item_, other.max_item_);
110 reset_sorted_view();
111 return *this;
112}
113
114template<typename T, typename C, typename A>
115template<typename TT, typename CC, typename AA>
116req_sketch<T, C, A>::req_sketch(const req_sketch<TT, CC, AA>& other, const C& comparator, const A& allocator):
117comparator_(comparator),
118allocator_(allocator),
119k_(other.k_),
120hra_(other.hra_),
121max_nom_size_(other.max_nom_size_),
122num_retained_(other.num_retained_),
123n_(other.n_),
124compactors_(allocator),
125min_item_(other.min_item_),
126max_item_(other.max_item_),
127sorted_view_(nullptr)
128{
129 static_assert(
130 std::is_constructible<T, TT>::value,
131 "Type converting constructor requires new type to be constructible from existing type"
132 );
133 compactors_.reserve(other.compactors_.size());
134 for (const auto& compactor: other.compactors_) {
135 compactors_.push_back(req_compactor<T, C, A>(compactor, comparator_, allocator_));
136 }
137}
138
139template<typename T, typename C, typename A>
141 return k_;
142}
143
144template<typename T, typename C, typename A>
146 return hra_;
147}
148
149template<typename T, typename C, typename A>
151 return n_ == 0;
152}
153
154template<typename T, typename C, typename A>
156 return n_;
157}
158
159template<typename T, typename C, typename A>
161 return num_retained_;
162}
163
164template<typename T, typename C, typename A>
166 return compactors_.size() > 1;
167}
168
169template<typename T, typename C, typename A>
170template<typename FwdT>
172 if (!check_update_item(item)) { return; }
173 if (is_empty()) {
174 min_item_.emplace(item);
175 max_item_.emplace(item);
176 } else {
177 if (comparator_(item, *min_item_)) *min_item_ = item;
178 if (comparator_(*max_item_, item)) *max_item_ = item;
179 }
180 compactors_[0].append(std::forward<FwdT>(item));
181 ++num_retained_;
182 ++n_;
183 if (num_retained_ == max_nom_size_) compress();
184 reset_sorted_view();
185}
186
187template<typename T, typename C, typename A>
188template<typename FwdSk>
189void req_sketch<T, C, A>::merge(FwdSk&& other) {
190 if (is_HRA() != other.is_HRA()) throw std::invalid_argument("merging HRA and LRA is not valid");
191 if (other.is_empty()) return;
192 if (is_empty()) {
193 min_item_.emplace(conditional_forward<FwdSk>(*other.min_item_));
194 max_item_.emplace(conditional_forward<FwdSk>(*other.max_item_));
195 } else {
196 if (comparator_(*other.min_item_, *min_item_)) *min_item_ = conditional_forward<FwdSk>(*other.min_item_);
197 if (comparator_(*max_item_, *other.max_item_)) *max_item_ = conditional_forward<FwdSk>(*other.max_item_);
198 }
199 // grow until this has at least as many compactors as other
200 while (get_num_levels() < other.get_num_levels()) grow();
201 // merge the items in all height compactors
202 for (size_t i = 0; i < other.get_num_levels(); ++i) {
203 compactors_[i].merge(conditional_forward<FwdSk>(other.compactors_[i]));
204 }
205 n_ += other.n_;
206 update_max_nom_size();
207 update_num_retained();
208 if (num_retained_ >= max_nom_size_) compress();
209 reset_sorted_view();
210}
211
212template<typename T, typename C, typename A>
214 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
215 return *min_item_;
216}
217
218template<typename T, typename C, typename A>
220 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
221 return *max_item_;
222}
223
224template<typename T, typename C, typename A>
226 return comparator_;
227}
228
229template<typename T, typename C, typename A>
231 return allocator_;
232}
233
234template<typename T, typename C, typename A>
235double req_sketch<T, C, A>::get_rank(const T& item, bool inclusive) const {
236 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
237 uint64_t weight = 0;
238 for (const auto& compactor: compactors_) {
239 weight += compactor.compute_weight(item, inclusive);
240 }
241 return static_cast<double>(weight) / n_;
242}
243
244template<typename T, typename C, typename A>
245auto req_sketch<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
246 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
247 setup_sorted_view();
248 return sorted_view_->get_PMF(split_points, size, inclusive);
249}
250
251template<typename T, typename C, typename A>
252auto req_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
253 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
254 setup_sorted_view();
255 return sorted_view_->get_CDF(split_points, size, inclusive);
256}
257
258template<typename T, typename C, typename A>
259auto req_sketch<T, C, A>::get_quantile(double rank, bool inclusive) const -> quantile_return_type {
260 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
261 if ((rank < 0.0) || (rank > 1.0)) {
262 throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
263 }
264 // possible side-effect of sorting level zero
265 setup_sorted_view();
266 return sorted_view_->get_quantile(rank, inclusive);
267}
268
269template<typename T, typename C, typename A>
271 if (!compactors_[0].is_sorted()) {
272 const_cast<Compactor&>(compactors_[0]).sort(); // allow this side effect
273 }
274 quantiles_sorted_view<T, C, A> view(get_num_retained(), comparator_, allocator_);
275
276 for (auto& compactor: compactors_) {
277 view.add(compactor.begin(), compactor.end(), 1ULL << compactor.get_lg_weight());
278 }
279
280 view.convert_to_cummulative();
281 return view;
282}
283
284template<typename T, typename C, typename A>
285double req_sketch<T, C, A>::get_rank_lower_bound(double rank, uint8_t num_std_dev) const {
286 return get_rank_lb(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
287}
288
289template<typename T, typename C, typename A>
290double req_sketch<T, C, A>::get_rank_upper_bound(double rank, uint8_t num_std_dev) const {
291 return get_rank_ub(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
292}
293
294template<typename T, typename C, typename A>
295double req_sketch<T, C, A>::get_RSE(uint16_t k, double rank, bool hra, uint64_t n) {
296 return get_rank_ub(k, 2, rank, 1, n, hra) - rank;
297}
298
299template<typename T, typename C, typename A>
300double req_sketch<T, C, A>::get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
301 if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
302 const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
303 const double fixed = FIXED_RSE_FACTOR / k;
304 const double lb_rel = rank - num_std_dev * relative;
305 const double lb_fix = rank - num_std_dev * fixed;
306 return std::max(lb_rel, lb_fix);
307}
308
309template<typename T, typename C, typename A>
310double req_sketch<T, C, A>::get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
311 if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
312 const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
313 const double fixed = FIXED_RSE_FACTOR / k;
314 const double ub_rel = rank + num_std_dev * relative;
315 const double ub_fix = rank + num_std_dev * fixed;
316 return std::min(ub_rel, ub_fix);
317}
318
319template<typename T, typename C, typename A>
320bool req_sketch<T, C, A>::is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra) {
321 const unsigned base_cap = k * req_constants::INIT_NUM_SECTIONS;
322 if (num_levels == 1 || n <= base_cap) return true;
323 const double exact_rank_thresh = static_cast<double>(base_cap) / n;
324 return (hra && rank >= 1.0 - exact_rank_thresh) || (!hra && rank <= exact_rank_thresh);
325}
326
327template<typename T, typename C, typename A>
328double req_sketch<T, C, A>::relative_rse_factor() {
329 return sqrt(0.0512 / req_constants::INIT_NUM_SECTIONS);
330}
331
332// implementation for fixed-size arithmetic types (integral and floating point)
333template<typename T, typename C, typename A>
334template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
336 size_t size = PREAMBLE_SIZE_BYTES;
337 if (is_empty()) return size;
338 if (is_estimation_mode()) {
339 size += sizeof(n_) + sizeof(TT) * 2; // min and max
340 }
341 if (n_ == 1) {
342 size += sizeof(TT);
343 } else {
344 for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(sd);
345 }
346 return size;
347}
348
349// implementation for all other types
350template<typename T, typename C, typename A>
351template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
352size_t req_sketch<T, C, A>::get_serialized_size_bytes(const SerDe& sd) const {
353 size_t size = PREAMBLE_SIZE_BYTES;
354 if (is_empty()) return size;
355 if (is_estimation_mode()) {
356 size += sizeof(n_);
357 size += sd.size_of_item(*min_item_);
358 size += sd.size_of_item(*max_item_);
359 }
360 if (n_ == 1) {
361 size += sd.size_of_item(*compactors_[0].begin());
362 } else {
363 for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(sd);
364 }
365 return size;
366}
367
368template<typename T, typename C, typename A>
369template<typename SerDe>
370void req_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& sd) const {
371 const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
372 write(os, preamble_ints);
373 const uint8_t serial_version = SERIAL_VERSION;
374 write(os, serial_version);
375 const uint8_t family = FAMILY;
376 write(os, family);
377 const bool raw_items = n_ <= req_constants::MIN_K;
378 const uint8_t flags_byte(
379 (is_empty() ? 1 << flags::IS_EMPTY : 0)
380 | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
381 | (raw_items ? 1 << flags::RAW_ITEMS : 0)
382 | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
383 );
384 write(os, flags_byte);
385 write(os, k_);
386 const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
387 write(os, num_levels);
388 const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
389 write(os, num_raw_items);
390 if (is_empty()) return;
391 if (is_estimation_mode()) {
392 write(os, n_);
393 sd.serialize(os, &*min_item_, 1);
394 sd.serialize(os, &*max_item_, 1);
395 }
396 if (raw_items) {
397 sd.serialize(os, compactors_[0].begin(), num_raw_items);
398 } else {
399 for (const auto& compactor: compactors_) compactor.serialize(os, sd);
400 }
401}
402
403template<typename T, typename C, typename A>
404template<typename SerDe>
405auto req_sketch<T, C, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
406 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
407 vector_bytes bytes(size, 0, allocator_);
408 uint8_t* ptr = bytes.data() + header_size_bytes;
409 const uint8_t* end_ptr = ptr + size;
410
411 const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
412 ptr += copy_to_mem(preamble_ints, ptr);
413 const uint8_t serial_version = SERIAL_VERSION;
414 ptr += copy_to_mem(serial_version, ptr);
415 const uint8_t family = FAMILY;
416 ptr += copy_to_mem(family, ptr);
417 const bool raw_items = n_ <= req_constants::MIN_K;
418 const uint8_t flags_byte(
419 (is_empty() ? 1 << flags::IS_EMPTY : 0)
420 | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
421 | (raw_items ? 1 << flags::RAW_ITEMS : 0)
422 | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
423 );
424 ptr += copy_to_mem(flags_byte, ptr);
425 ptr += copy_to_mem(k_, ptr);
426 const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
427 ptr += copy_to_mem(num_levels, ptr);
428 const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
429 ptr += copy_to_mem(num_raw_items, ptr);
430 if (!is_empty()) {
431 if (is_estimation_mode()) {
432 ptr += copy_to_mem(n_, ptr);
433 ptr += sd.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
434 ptr += sd.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
435 }
436 if (raw_items) {
437 ptr += sd.serialize(ptr, end_ptr - ptr, compactors_[0].begin(), num_raw_items);
438 } else {
439 for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, sd);
440 }
441 }
442 return bytes;
443}
444
445template<typename T, typename C, typename A>
446template<typename SerDe>
447req_sketch<T, C, A> req_sketch<T, C, A>::deserialize(std::istream& is, const SerDe& sd, const C& comparator, const A& allocator) {
448 const auto preamble_ints = read<uint8_t>(is);
449 const auto serial_version = read<uint8_t>(is);
450 const auto family_id = read<uint8_t>(is);
451 const auto flags_byte = read<uint8_t>(is);
452 const auto k = read<uint16_t>(is);
453 const auto num_levels = read<uint8_t>(is);
454 const auto num_raw_items = read<uint8_t>(is);
455
456 check_preamble_ints(preamble_ints, num_levels);
457 check_serial_version(serial_version);
458 check_family_id(family_id);
459
460 if (!is.good()) throw std::runtime_error("error reading from std::istream");
461 const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
462 const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
463 if (is_empty) return req_sketch(k, hra, comparator, allocator);
464
465 optional<T> min_item;
466 optional<T> max_item;
467
468 const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
469 const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
470 std::vector<Compactor, AllocCompactor> compactors(allocator);
471
472 uint64_t n = 1;
473 if (num_levels > 1) {
474 n = read<uint64_t>(is);
475 // Space to deserialize min and max.
476 // serde::deserialize expects allocated but not initialized storage.
477 typename std::aligned_storage<sizeof(T), alignof(T)>::type tmp_storage;
478 T* tmp = reinterpret_cast<T*>(&tmp_storage);
479
480 sd.deserialize(is, tmp, 1);
481 // serde call did not throw, repackage and cleanup
482 min_item.emplace(std::move(*tmp));
483 tmp->~T();
484 sd.deserialize(is, tmp, 1);
485 // serde call did not throw, repackage and cleanup
486 max_item.emplace(std::move(*tmp));
487 tmp->~T();
488 }
489
490 if (raw_items) {
491 compactors.push_back(Compactor::deserialize(is, sd, comparator, allocator, is_level_0_sorted, k, num_raw_items, hra));
492 } else {
493 for (size_t i = 0; i < num_levels; ++i) {
494 compactors.push_back(Compactor::deserialize(is, sd, comparator, allocator, i == 0 ? is_level_0_sorted : true, hra));
495 }
496 }
497 if (num_levels == 1) {
498 const auto begin = compactors[0].begin();
499 const auto end = compactors[0].end();
500 n = compactors[0].get_num_items();
501 auto min_it = begin;
502 auto max_it = begin;
503 for (auto it = begin; it != end; ++it) {
504 if (comparator(*it, *min_it)) min_it = it;
505 if (comparator(*max_it, *it)) max_it = it;
506 }
507 min_item.emplace(*min_it);
508 max_item.emplace(*max_it);
509 }
510
511 if (!is.good()) throw std::runtime_error("error reading from std::istream");
512 return req_sketch(k, hra, n, std::move(min_item), std::move(max_item), std::move(compactors), comparator);
513}
514
515template<typename T, typename C, typename A>
516template<typename SerDe>
517req_sketch<T, C, A> req_sketch<T, C, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const C& comparator, const A& allocator) {
518 ensure_minimum_memory(size, 8);
519 const char* ptr = static_cast<const char*>(bytes);
520 const char* end_ptr = static_cast<const char*>(bytes) + size;
521
522 uint8_t preamble_ints;
523 ptr += copy_from_mem(ptr, preamble_ints);
524 uint8_t serial_version;
525 ptr += copy_from_mem(ptr, serial_version);
526 uint8_t family_id;
527 ptr += copy_from_mem(ptr, family_id);
528 uint8_t flags_byte;
529 ptr += copy_from_mem(ptr, flags_byte);
530 uint16_t k;
531 ptr += copy_from_mem(ptr, k);
532 uint8_t num_levels;
533 ptr += copy_from_mem(ptr, num_levels);
534 uint8_t num_raw_items;
535 ptr += copy_from_mem(ptr, num_raw_items);
536
537 check_preamble_ints(preamble_ints, num_levels);
538 check_serial_version(serial_version);
539 check_family_id(family_id);
540
541 const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
542 const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
543 if (is_empty) return req_sketch(k, hra, comparator, allocator);
544
545 optional<T> min_item;
546 optional<T> max_item;
547
548 const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
549 const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
550 std::vector<Compactor, AllocCompactor> compactors(allocator);
551
552 uint64_t n = 1;
553 if (num_levels > 1) {
554 ensure_minimum_memory(end_ptr - ptr, sizeof(n));
555 ptr += copy_from_mem(ptr, n);
556 // Space to deserialize min and max.
557 // serde::deserialize expects allocated but not initialized storage.
558 typename std::aligned_storage<sizeof(T), alignof(T)>::type tmp_storage;
559 T* tmp = reinterpret_cast<T*>(&tmp_storage);
560
561 ptr += sd.deserialize(ptr, end_ptr - ptr, tmp, 1);
562 // serde call did not throw, repackage and cleanup
563 min_item.emplace(std::move(*tmp));
564 tmp->~T();
565 ptr += sd.deserialize(ptr, end_ptr - ptr, tmp, 1);
566 // serde call did not throw, repackage and cleanup
567 max_item.emplace(std::move(*tmp));
568 tmp->~T();
569 }
570
571 if (raw_items) {
572 auto pair = Compactor::deserialize(ptr, end_ptr - ptr, sd, comparator, allocator, is_level_0_sorted, k, num_raw_items, hra);
573 compactors.push_back(std::move(pair.first));
574 ptr += pair.second;
575 } else {
576 for (size_t i = 0; i < num_levels; ++i) {
577 auto pair = Compactor::deserialize(ptr, end_ptr - ptr, sd, comparator, allocator, i == 0 ? is_level_0_sorted : true, hra);
578 compactors.push_back(std::move(pair.first));
579 ptr += pair.second;
580 }
581 }
582 if (num_levels == 1) {
583 const auto begin = compactors[0].begin();
584 const auto end = compactors[0].end();
585 n = compactors[0].get_num_items();
586 auto min_it = begin;
587 auto max_it = begin;
588 for (auto it = begin; it != end; ++it) {
589 if (comparator(*it, *min_it)) min_it = it;
590 if (comparator(*max_it, *it)) max_it = it;
591 }
592 min_item.emplace(*min_it);
593 max_item.emplace(*max_it);
594 }
595
596 return req_sketch(k, hra, n, std::move(min_item), std::move(max_item), std::move(compactors), comparator);
597}
598
599template<typename T, typename C, typename A>
600void req_sketch<T, C, A>::grow() {
601 const uint8_t lg_weight = get_num_levels();
602 compactors_.push_back(Compactor(hra_, lg_weight, k_, comparator_, allocator_));
603 update_max_nom_size();
604}
605
606template<typename T, typename C, typename A>
607uint8_t req_sketch<T, C, A>::get_num_levels() const {
608 return static_cast<uint8_t>(compactors_.size());
609}
610
611template<typename T, typename C, typename A>
612void req_sketch<T, C, A>::update_max_nom_size() {
613 max_nom_size_ = 0;
614 for (const auto& compactor: compactors_) max_nom_size_ += compactor.get_nom_capacity();
615}
616
617template<typename T, typename C, typename A>
618void req_sketch<T, C, A>::update_num_retained() {
619 num_retained_ = 0;
620 for (const auto& compactor: compactors_) num_retained_ += compactor.get_num_items();
621}
622
623template<typename T, typename C, typename A>
624void req_sketch<T, C, A>::compress() {
625 for (size_t h = 0; h < compactors_.size(); ++h) {
626 if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
627 if (h == 0) compactors_[0].sort();
628 if (h + 1 >= get_num_levels()) { // at the top?
629 grow(); // add a level, increases max_nom_size
630 }
631 auto pair = compactors_[h].compact(compactors_[h + 1]);
632 num_retained_ -= pair.first;
633 max_nom_size_ += pair.second;
634 if (LAZY_COMPRESSION && num_retained_ < max_nom_size_) break;
635 }
636 }
637}
638
639template<typename T, typename C, typename A>
640string<A> req_sketch<T, C, A>::to_string(bool print_levels, bool print_items) const {
641 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
642 // The stream does not support passing an allocator instance, and alternatives are complicated.
643 std::ostringstream os;
644 os << "### REQ sketch summary:" << std::endl;
645 os << " K : " << k_ << std::endl;
646 os << " High Rank Acc : " << (hra_ ? "true" : "false") << std::endl;
647 os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
648 os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
649 os << " Sorted : " << (compactors_[0].is_sorted() ? "true" : "false") << std::endl;
650 os << " N : " << n_ << std::endl;
651 os << " Levels : " << compactors_.size() << std::endl;
652 os << " Retained items : " << num_retained_ << std::endl;
653 os << " Capacity items : " << max_nom_size_ << std::endl;
654 if (!is_empty()) {
655 os << " Min item : " << *min_item_ << std::endl;
656 os << " Max item : " << *max_item_ << std::endl;
657 }
658 os << "### End sketch summary" << std::endl;
659
660 if (print_levels) {
661 os << "### REQ sketch levels:" << std::endl;
662 os << " index: nominal capacity, actual size" << std::endl;
663 for (uint8_t i = 0; i < compactors_.size(); i++) {
664 os << " " << (unsigned int) i << ": "
665 << compactors_[i].get_nom_capacity() << ", "
666 << compactors_[i].get_num_items() << std::endl;
667 }
668 os << "### End sketch levels" << std::endl;
669 }
670
671 if (print_items) {
672 os << "### REQ sketch data:" << std::endl;
673 unsigned level = 0;
674 for (const auto& compactor: compactors_) {
675 os << " level " << level << ": " << std::endl;
676 for (auto it = compactor.begin(); it != compactor.end(); ++it) {
677 os << " " << *it << std::endl;
678 }
679 ++level;
680 }
681 os << "### End sketch data" << std::endl;
682 }
683 return string<A>(os.str().c_str(), allocator_);
684}
685
686template<typename T, typename C, typename A>
687req_sketch<T, C, A>::req_sketch(uint16_t k, bool hra, uint64_t n,
688 optional<T>&& min_item, optional<T>&& max_item,
689 std::vector<Compactor, AllocCompactor>&& compactors, const C& comparator):
690comparator_(comparator),
691allocator_(compactors.get_allocator()),
692k_(k),
693hra_(hra),
694max_nom_size_(0),
695num_retained_(0),
696n_(n),
697compactors_(std::move(compactors)),
698min_item_(std::move(min_item)),
699max_item_(std::move(max_item)),
700sorted_view_(nullptr)
701{
702 update_max_nom_size();
703 update_num_retained();
704}
705
706template<typename T, typename C, typename A>
707void req_sketch<T, C, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels) {
708 const uint8_t expected_preamble_ints = num_levels > 1 ? 4 : 2;
709 if (preamble_ints != expected_preamble_ints) {
710 throw std::invalid_argument("Possible corruption: preamble ints must be "
711 + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
712 }
713}
714
715template<typename T, typename C, typename A>
716void req_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
717 if (serial_version != SERIAL_VERSION) {
718 throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
719 + std::to_string(SERIAL_VERSION)
720 + ", got " + std::to_string(serial_version));
721 }
722}
723
724template<typename T, typename C, typename A>
725void req_sketch<T, C, A>::check_family_id(uint8_t family_id) {
726 if (family_id != FAMILY) {
727 throw std::invalid_argument("Possible corruption: family mismatch: expected "
728 + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
729 }
730}
731
732template<typename T, typename C, typename A>
733auto req_sketch<T, C, A>::begin() const -> const_iterator {
734 return const_iterator(compactors_.begin(), compactors_.end());
735}
736
737template<typename T, typename C, typename A>
738auto req_sketch<T, C, A>::end() const -> const_iterator {
739 return const_iterator(compactors_.end(), compactors_.end());
740}
741
742template<typename T, typename C, typename A>
744 if (sorted_view_ == nullptr) {
745 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
746 sorted_view_ = new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
747 }
748}
749
750template<typename T, typename C, typename A>
751void req_sketch<T, C, A>::reset_sorted_view() {
752 if (sorted_view_ != nullptr) {
753 sorted_view_->~quantiles_sorted_view();
754 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
755 AllocSortedView(allocator_).deallocate(sorted_view_, 1);
756 sorted_view_ = nullptr;
757 }
758}
759
760// iterator
761
762template<typename T, typename C, typename A>
763req_sketch<T, C, A>::const_iterator::const_iterator(LevelsIterator begin, LevelsIterator end):
764 levels_it_(begin),
765 levels_end_(end),
766 compactor_it_(begin == end ? nullptr : (*levels_it_).begin())
767{}
768
769template<typename T, typename C, typename A>
770auto req_sketch<T, C, A>::const_iterator::operator++() -> const_iterator& {
771 ++compactor_it_;
772 if (compactor_it_ == (*levels_it_).end()) {
773 ++levels_it_;
774 if (levels_it_ != levels_end_) compactor_it_ = (*levels_it_).begin();
775 }
776 return *this;
777}
778
// Post-increment: copies the current position, advances this iterator, and returns the copy.
// NOTE(review): the declared return type is const_iterator&, so the reference returned here is
// bound to the local tmp, which is destroyed when the function returns -- using the result is
// undefined behavior. The fix is to return const_iterator by value, which also requires changing
// the declaration in the header.
779template<typename T, typename C, typename A>
780auto req_sketch<T, C, A>::const_iterator::operator++(int) -> const_iterator& {
781 const_iterator tmp(*this);
782 operator++();
783 return tmp;
784}
785
786template<typename T, typename C, typename A>
787bool req_sketch<T, C, A>::const_iterator::operator==(const const_iterator& other) const {
788 if (levels_it_ != other.levels_it_) return false;
789 if (levels_it_ == levels_end_) return true;
790 return compactor_it_ == other.compactor_it_;
791}
792
793template<typename T, typename C, typename A>
794bool req_sketch<T, C, A>::const_iterator::operator!=(const const_iterator& other) const {
795 return !operator==(other);
796}
797
798template<typename T, typename C, typename A>
799auto req_sketch<T, C, A>::const_iterator::operator*() const -> reference {
800 return value_type(*compactor_it_, 1ULL << (*levels_it_).get_lg_weight());
801}
802
803template<typename T, typename C, typename A>
804auto req_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
805 return **this;
806}
807
808} /* namespace datasketches */
809
810#endif
Sorted view for quantiles sketches (REQ, KLL and Quantiles)
Definition quantiles_sorted_view.hpp:38
Relative Error Quantiles Sketch.
Definition req_sketch.hpp:84
Comparator get_comparator() const
Returns an instance of the comparator for this sketch.
Definition req_sketch_impl.hpp:225
quantiles_sorted_view< T, Comparator, Allocator > get_sorted_view() const
Gets the sorted view of this sketch.
Definition req_sketch_impl.hpp:270
static req_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const Comparator &comparator=Comparator(), const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analo...
Definition req_sketch_impl.hpp:252
uint32_t get_num_retained() const
Returns the number of retained items in the sketch.
Definition req_sketch_impl.hpp:160
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of sp...
Definition req_sketch_impl.hpp:245
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition req_sketch_impl.hpp:189
typename quantiles_sorted_view< T, Comparator, Allocator >::quantile_return_type quantile_return_type
Quantile return type.
Definition req_sketch.hpp:97
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition req_sketch_impl.hpp:171
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition req_sketch_impl.hpp:370
string< Allocator > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition req_sketch_impl.hpp:640
uint16_t get_k() const
Returns configured parameter K.
Definition req_sketch_impl.hpp:140
bool is_empty() const
Returns true if this sketch is empty.
Definition req_sketch_impl.hpp:150
const T & get_max_item() const
Returns the max item of the stream.
Definition req_sketch_impl.hpp:219
double get_rank_upper_bound(double rank, uint8_t num_std_dev) const
Returns an approximate upper bound of the given normalized rank.
Definition req_sketch_impl.hpp:290
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1 inclusive.
Definition req_sketch_impl.hpp:235
static double get_RSE(uint16_t k, double rank, bool hra, uint64_t n)
Returns an a priori estimate of relative standard error (RSE, expressed as a number in [0,...
Definition req_sketch_impl.hpp:295
double get_rank_lower_bound(double rank, uint8_t num_std_dev) const
Returns an approximate lower bound of the given normalized rank.
Definition req_sketch_impl.hpp:285
Allocator get_allocator() const
Returns an instance of the allocator for this sketch.
Definition req_sketch_impl.hpp:230
req_sketch & operator=(const req_sketch &other)
Copy assignment.
Definition req_sketch_impl.hpp:82
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an approximate quantile of the given normalized rank.
Definition req_sketch_impl.hpp:259
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition req_sketch_impl.hpp:733
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition req_sketch_impl.hpp:335
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition req_sketch_impl.hpp:738
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition req_sketch_impl.hpp:165
const T & get_min_item() const
Returns the min item of the stream.
Definition req_sketch_impl.hpp:213
bool is_HRA() const
Returns configured parameter High Rank Accuracy.
Definition req_sketch_impl.hpp:145
uint64_t get_n() const
Returns the length of the input stream.
Definition req_sketch_impl.hpp:155
const uint16_t MIN_K
min value of parameter K
Definition kll_sketch.hpp:39
const uint8_t INIT_NUM_SECTIONS
initial number of sections
Definition req_common.hpp:35
const uint16_t MIN_K
minimum value of parameter K
Definition req_common.hpp:33
DataSketches namespace.
Definition binomial_bounds.hpp:38