datasketches-cpp
Loading...
Searching...
No Matches
req_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef REQ_SKETCH_IMPL_HPP_
21#define REQ_SKETCH_IMPL_HPP_
22
23#include <sstream>
24#include <stdexcept>
25
26namespace datasketches {
27
28template<typename T, typename C, typename A>
29req_sketch<T, C, A>::req_sketch(uint16_t k, bool hra, const C& comparator, const A& allocator):
30comparator_(comparator),
31allocator_(allocator),
32k_(std::max<uint8_t>(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
33hra_(hra),
34max_nom_size_(0),
35num_retained_(0),
36n_(0),
37compactors_(allocator),
38min_item_(),
39max_item_(),
40sorted_view_(nullptr)
41{
42 grow();
43}
44
45template<typename T, typename C, typename A>
46req_sketch<T, C, A>::~req_sketch() {
47 reset_sorted_view();
48}
49
50template<typename T, typename C, typename A>
52comparator_(other.comparator_),
53allocator_(other.allocator_),
54k_(other.k_),
55hra_(other.hra_),
56max_nom_size_(other.max_nom_size_),
57num_retained_(other.num_retained_),
58n_(other.n_),
59compactors_(other.compactors_),
60min_item_(other.min_item_),
61max_item_(other.max_item_),
62sorted_view_(nullptr)
63{}
64
65template<typename T, typename C, typename A>
67comparator_(std::move(other.comparator_)),
68allocator_(std::move(other.allocator_)),
69k_(other.k_),
70hra_(other.hra_),
71max_nom_size_(other.max_nom_size_),
72num_retained_(other.num_retained_),
73n_(other.n_),
74compactors_(std::move(other.compactors_)),
75min_item_(std::move(other.min_item_)),
76max_item_(std::move(other.max_item_)),
77sorted_view_(nullptr)
78{}
79
80template<typename T, typename C, typename A>
82 req_sketch copy(other);
83 std::swap(comparator_, copy.comparator_);
84 std::swap(allocator_, copy.allocator_);
85 std::swap(k_, copy.k_);
86 std::swap(hra_, copy.hra_);
87 std::swap(max_nom_size_, copy.max_nom_size_);
88 std::swap(num_retained_, copy.num_retained_);
89 std::swap(n_, copy.n_);
90 std::swap(compactors_, copy.compactors_);
91 std::swap(min_item_, copy.min_item_);
92 std::swap(max_item_, copy.max_item_);
93 reset_sorted_view();
94 return *this;
95}
96
97template<typename T, typename C, typename A>
99 std::swap(comparator_, other.comparator_);
100 std::swap(allocator_, other.allocator_);
101 std::swap(k_, other.k_);
102 std::swap(hra_, other.hra_);
103 std::swap(max_nom_size_, other.max_nom_size_);
104 std::swap(num_retained_, other.num_retained_);
105 std::swap(n_, other.n_);
106 std::swap(compactors_, other.compactors_);
107 std::swap(min_item_, other.min_item_);
108 std::swap(max_item_, other.max_item_);
109 reset_sorted_view();
110 return *this;
111}
112
113template<typename T, typename C, typename A>
114template<typename TT, typename CC, typename AA>
115req_sketch<T, C, A>::req_sketch(const req_sketch<TT, CC, AA>& other, const C& comparator, const A& allocator):
116comparator_(comparator),
117allocator_(allocator),
118k_(other.k_),
119hra_(other.hra_),
120max_nom_size_(other.max_nom_size_),
121num_retained_(other.num_retained_),
122n_(other.n_),
123compactors_(allocator),
124min_item_(other.min_item_),
125max_item_(other.max_item_),
126sorted_view_(nullptr)
127{
128 static_assert(
129 std::is_constructible<T, TT>::value,
130 "Type converting constructor requires new type to be constructible from existing type"
131 );
132 compactors_.reserve(other.compactors_.size());
133 for (const auto& compactor: other.compactors_) {
134 compactors_.push_back(req_compactor<T, C, A>(compactor, comparator_, allocator_));
135 }
136}
137
138template<typename T, typename C, typename A>
140 return k_;
141}
142
143template<typename T, typename C, typename A>
145 return hra_;
146}
147
148template<typename T, typename C, typename A>
150 return n_ == 0;
151}
152
153template<typename T, typename C, typename A>
155 return n_;
156}
157
158template<typename T, typename C, typename A>
160 return num_retained_;
161}
162
163template<typename T, typename C, typename A>
165 return compactors_.size() > 1;
166}
167
168template<typename T, typename C, typename A>
169template<typename FwdT>
171 if (!check_update_item(item)) { return; }
172 if (is_empty()) {
173 min_item_.emplace(item);
174 max_item_.emplace(item);
175 } else {
176 if (comparator_(item, *min_item_)) *min_item_ = item;
177 if (comparator_(*max_item_, item)) *max_item_ = item;
178 }
179 compactors_[0].append(std::forward<FwdT>(item));
180 ++num_retained_;
181 ++n_;
182 if (num_retained_ == max_nom_size_) compress();
183 reset_sorted_view();
184}
185
186template<typename T, typename C, typename A>
187template<typename FwdSk>
188void req_sketch<T, C, A>::merge(FwdSk&& other) {
189 if (is_HRA() != other.is_HRA()) throw std::invalid_argument("merging HRA and LRA is not valid");
190 if (other.is_empty()) return;
191 if (is_empty()) {
192 min_item_.emplace(conditional_forward<FwdSk>(*other.min_item_));
193 max_item_.emplace(conditional_forward<FwdSk>(*other.max_item_));
194 } else {
195 if (comparator_(*other.min_item_, *min_item_)) *min_item_ = conditional_forward<FwdSk>(*other.min_item_);
196 if (comparator_(*max_item_, *other.max_item_)) *max_item_ = conditional_forward<FwdSk>(*other.max_item_);
197 }
198 // grow until this has at least as many compactors as other
199 while (get_num_levels() < other.get_num_levels()) grow();
200 // merge the items in all height compactors
201 for (size_t i = 0; i < other.get_num_levels(); ++i) {
202 compactors_[i].merge(conditional_forward<FwdSk>(other.compactors_[i]));
203 }
204 n_ += other.n_;
205 update_max_nom_size();
206 update_num_retained();
207 if (num_retained_ >= max_nom_size_) compress();
208 reset_sorted_view();
209}
210
211template<typename T, typename C, typename A>
213 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
214 return *min_item_;
215}
216
217template<typename T, typename C, typename A>
219 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
220 return *max_item_;
221}
222
223template<typename T, typename C, typename A>
225 return comparator_;
226}
227
228template<typename T, typename C, typename A>
230 return allocator_;
231}
232
233template<typename T, typename C, typename A>
234double req_sketch<T, C, A>::get_rank(const T& item, bool inclusive) const {
235 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
236 uint64_t weight = 0;
237 for (const auto& compactor: compactors_) {
238 weight += compactor.compute_weight(item, inclusive);
239 }
240 return static_cast<double>(weight) / n_;
241}
242
243template<typename T, typename C, typename A>
244auto req_sketch<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
245 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
246 setup_sorted_view();
247 return sorted_view_->get_PMF(split_points, size, inclusive);
248}
249
250template<typename T, typename C, typename A>
251auto req_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
252 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
253 setup_sorted_view();
254 return sorted_view_->get_CDF(split_points, size, inclusive);
255}
256
257template<typename T, typename C, typename A>
258auto req_sketch<T, C, A>::get_quantile(double rank, bool inclusive) const -> quantile_return_type {
259 if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
260 if ((rank < 0.0) || (rank > 1.0)) {
261 throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
262 }
263 // possible side-effect of sorting level zero
264 setup_sorted_view();
265 return sorted_view_->get_quantile(rank, inclusive);
266}
267
268template<typename T, typename C, typename A>
270 if (!compactors_[0].is_sorted()) {
271 const_cast<Compactor&>(compactors_[0]).sort(); // allow this side effect
272 }
273 quantiles_sorted_view<T, C, A> view(get_num_retained(), comparator_, allocator_);
274
275 for (auto& compactor: compactors_) {
276 view.add(compactor.begin(), compactor.end(), 1ULL << compactor.get_lg_weight());
277 }
278
279 view.convert_to_cummulative();
280 return view;
281}
282
283template<typename T, typename C, typename A>
284double req_sketch<T, C, A>::get_rank_lower_bound(double rank, uint8_t num_std_dev) const {
285 return get_rank_lb(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
286}
287
288template<typename T, typename C, typename A>
289double req_sketch<T, C, A>::get_rank_upper_bound(double rank, uint8_t num_std_dev) const {
290 return get_rank_ub(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
291}
292
293template<typename T, typename C, typename A>
294double req_sketch<T, C, A>::get_RSE(uint16_t k, double rank, bool hra, uint64_t n) {
295 return get_rank_lb(k, 2, rank, 1, n, hra);
296}
297
298template<typename T, typename C, typename A>
299double req_sketch<T, C, A>::get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
300 if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
301 const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
302 const double fixed = FIXED_RSE_FACTOR / k;
303 const double lb_rel = rank - num_std_dev * relative;
304 const double lb_fix = rank - num_std_dev * fixed;
305 return std::max(lb_rel, lb_fix);
306}
307
308template<typename T, typename C, typename A>
309double req_sketch<T, C, A>::get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
310 if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
311 const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
312 const double fixed = FIXED_RSE_FACTOR / k;
313 const double ub_rel = rank + num_std_dev * relative;
314 const double ub_fix = rank + num_std_dev * fixed;
315 return std::min(ub_rel, ub_fix);
316}
317
318template<typename T, typename C, typename A>
319bool req_sketch<T, C, A>::is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra) {
320 const unsigned base_cap = k * req_constants::INIT_NUM_SECTIONS;
321 if (num_levels == 1 || n <= base_cap) return true;
322 const double exact_rank_thresh = static_cast<double>(base_cap) / n;
323 return (hra && rank >= 1.0 - exact_rank_thresh) || (!hra && rank <= exact_rank_thresh);
324}
325
326template<typename T, typename C, typename A>
327double req_sketch<T, C, A>::relative_rse_factor() {
328 return sqrt(0.0512 / req_constants::INIT_NUM_SECTIONS);
329}
330
331// implementation for fixed-size arithmetic types (integral and floating point)
332template<typename T, typename C, typename A>
333template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
335 size_t size = PREAMBLE_SIZE_BYTES;
336 if (is_empty()) return size;
337 if (is_estimation_mode()) {
338 size += sizeof(n_) + sizeof(TT) * 2; // min and max
339 }
340 if (n_ == 1) {
341 size += sizeof(TT);
342 } else {
343 for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(sd);
344 }
345 return size;
346}
347
348// implementation for all other types
349template<typename T, typename C, typename A>
350template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
351size_t req_sketch<T, C, A>::get_serialized_size_bytes(const SerDe& sd) const {
352 size_t size = PREAMBLE_SIZE_BYTES;
353 if (is_empty()) return size;
354 if (is_estimation_mode()) {
355 size += sizeof(n_);
356 size += sd.size_of_item(*min_item_);
357 size += sd.size_of_item(*max_item_);
358 }
359 if (n_ == 1) {
360 size += sd.size_of_item(*compactors_[0].begin());
361 } else {
362 for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(sd);
363 }
364 return size;
365}
366
367template<typename T, typename C, typename A>
368template<typename SerDe>
369void req_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& sd) const {
370 const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
371 write(os, preamble_ints);
372 const uint8_t serial_version = SERIAL_VERSION;
373 write(os, serial_version);
374 const uint8_t family = FAMILY;
375 write(os, family);
376 const bool raw_items = n_ <= req_constants::MIN_K;
377 const uint8_t flags_byte(
378 (is_empty() ? 1 << flags::IS_EMPTY : 0)
379 | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
380 | (raw_items ? 1 << flags::RAW_ITEMS : 0)
381 | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
382 );
383 write(os, flags_byte);
384 write(os, k_);
385 const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
386 write(os, num_levels);
387 const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
388 write(os, num_raw_items);
389 if (is_empty()) return;
390 if (is_estimation_mode()) {
391 write(os, n_);
392 sd.serialize(os, &*min_item_, 1);
393 sd.serialize(os, &*max_item_, 1);
394 }
395 if (raw_items) {
396 sd.serialize(os, compactors_[0].begin(), num_raw_items);
397 } else {
398 for (const auto& compactor: compactors_) compactor.serialize(os, sd);
399 }
400}
401
402template<typename T, typename C, typename A>
403template<typename SerDe>
404auto req_sketch<T, C, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
405 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
406 vector_bytes bytes(size, 0, allocator_);
407 uint8_t* ptr = bytes.data() + header_size_bytes;
408 const uint8_t* end_ptr = ptr + size;
409
410 const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
411 ptr += copy_to_mem(preamble_ints, ptr);
412 const uint8_t serial_version = SERIAL_VERSION;
413 ptr += copy_to_mem(serial_version, ptr);
414 const uint8_t family = FAMILY;
415 ptr += copy_to_mem(family, ptr);
416 const bool raw_items = n_ <= req_constants::MIN_K;
417 const uint8_t flags_byte(
418 (is_empty() ? 1 << flags::IS_EMPTY : 0)
419 | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
420 | (raw_items ? 1 << flags::RAW_ITEMS : 0)
421 | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
422 );
423 ptr += copy_to_mem(flags_byte, ptr);
424 ptr += copy_to_mem(k_, ptr);
425 const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
426 ptr += copy_to_mem(num_levels, ptr);
427 const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
428 ptr += copy_to_mem(num_raw_items, ptr);
429 if (!is_empty()) {
430 if (is_estimation_mode()) {
431 ptr += copy_to_mem(n_, ptr);
432 ptr += sd.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
433 ptr += sd.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
434 }
435 if (raw_items) {
436 ptr += sd.serialize(ptr, end_ptr - ptr, compactors_[0].begin(), num_raw_items);
437 } else {
438 for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, sd);
439 }
440 }
441 return bytes;
442}
443
// Deserializes a sketch from a stream.
// Reads the 8-byte preamble (preamble ints, serial version, family id, flags, k,
// number of levels, number of raw items), validates it, and then reads the body.
// Throws std::invalid_argument from the check_* helpers on a preamble mismatch and
// std::runtime_error if the stream is not in a good state after reading.
444template<typename T, typename C, typename A>
445template<typename SerDe>
446req_sketch<T, C, A> req_sketch<T, C, A>::deserialize(std::istream& is, const SerDe& sd, const C& comparator, const A& allocator) {
447 const auto preamble_ints = read<uint8_t>(is);
448 const auto serial_version = read<uint8_t>(is);
449 const auto family_id = read<uint8_t>(is);
450 const auto flags_byte = read<uint8_t>(is);
451 const auto k = read<uint16_t>(is);
452 const auto num_levels = read<uint8_t>(is);
453 const auto num_raw_items = read<uint8_t>(is);
454
455 check_preamble_ints(preamble_ints, num_levels);
456 check_serial_version(serial_version);
457 check_family_id(family_id);
458
459 if (!is.good()) throw std::runtime_error("error reading from std::istream");
460 const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
461 const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
// an empty sketch carries nothing after the preamble
462 if (is_empty) return req_sketch(k, hra, comparator, allocator);
463
// NOTE(review): tmp is never engaged; &*tmp is handed to the serde as destination storage and
// the item is destroyed by hand afterwards — presumably the serde constructs the item in place; confirm.
464 optional<T> tmp; // space to deserialize min and max
465 optional<T> min_item;
466 optional<T> max_item;
467
468 const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
469 const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
470 std::vector<Compactor, AllocCompactor> compactors(allocator);
471
472 uint64_t n = 1;
// with more than one level, n and the min and max items are stored explicitly
473 if (num_levels > 1) {
474 n = read<uint64_t>(is);
475 sd.deserialize(is, &*tmp, 1);
476 // serde call did not throw, repackage and cleanup
477 min_item.emplace(*tmp);
478 (*tmp).~T();
479 sd.deserialize(is, &*tmp, 1);
480 // serde call did not throw, repackage and cleanup
481 max_item.emplace(*tmp);
482 (*tmp).~T();
483 }
484
485 if (raw_items) {
486 compactors.push_back(Compactor::deserialize(is, sd, comparator, allocator, is_level_0_sorted, k, num_raw_items, hra));
487 } else {
// only level zero can be unsorted; the flag applies to it, higher levels are passed as sorted
488 for (size_t i = 0; i < num_levels; ++i) {
489 compactors.push_back(Compactor::deserialize(is, sd, comparator, allocator, i == 0 ? is_level_0_sorted : true, hra));
490 }
491 }
// with a single level, n is the item count of level zero and min/max are found by scanning it
// NOTE(review): *min_it / *max_it are read even if level zero turned out to hold no items — confirm the input cannot produce that
492 if (num_levels == 1) {
493 const auto begin = compactors[0].begin();
494 const auto end = compactors[0].end();
495 n = compactors[0].get_num_items();
496 auto min_it = begin;
497 auto max_it = begin;
498 for (auto it = begin; it != end; ++it) {
499 if (comparator(*it, *min_it)) min_it = it;
500 if (comparator(*max_it, *it)) max_it = it;
501 }
502 min_item.emplace(*min_it);
503 max_item.emplace(*max_it);
504 }
505
506 if (!is.good()) throw std::runtime_error("error reading from std::istream");
507 return req_sketch(k, hra, n, std::move(min_item), std::move(max_item), std::move(compactors), comparator);
508}
509
// Deserializes a sketch from a memory buffer of `size` bytes.
// Reads the 8-byte preamble (preamble ints, serial version, family id, flags, k,
// number of levels, number of raw items), validates it, and then reads the body,
// advancing ptr by the number of bytes each step reports as consumed.
// Throws std::invalid_argument from the check_* helpers on a preamble mismatch.
510template<typename T, typename C, typename A>
511template<typename SerDe>
512req_sketch<T, C, A> req_sketch<T, C, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const C& comparator, const A& allocator) {
// the fixed preamble read below is 8 bytes long
513 ensure_minimum_memory(size, 8);
514 const char* ptr = static_cast<const char*>(bytes);
515 const char* end_ptr = static_cast<const char*>(bytes) + size;
516
517 uint8_t preamble_ints;
518 ptr += copy_from_mem(ptr, preamble_ints);
519 uint8_t serial_version;
520 ptr += copy_from_mem(ptr, serial_version);
521 uint8_t family_id;
522 ptr += copy_from_mem(ptr, family_id);
523 uint8_t flags_byte;
524 ptr += copy_from_mem(ptr, flags_byte);
525 uint16_t k;
526 ptr += copy_from_mem(ptr, k);
527 uint8_t num_levels;
528 ptr += copy_from_mem(ptr, num_levels);
529 uint8_t num_raw_items;
530 ptr += copy_from_mem(ptr, num_raw_items);
531
532 check_preamble_ints(preamble_ints, num_levels);
533 check_serial_version(serial_version);
534 check_family_id(family_id);
535
536 const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
537 const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
// an empty sketch carries nothing after the preamble
538 if (is_empty) return req_sketch(k, hra, comparator, allocator);
539
// NOTE(review): tmp is never engaged; &*tmp is handed to the serde as destination storage and
// the item is destroyed by hand afterwards — presumably the serde constructs the item in place; confirm.
540 optional<T> tmp; // space to deserialize min and max
541 optional<T> min_item;
542 optional<T> max_item;
543
544 const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
545 const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
546 std::vector<Compactor, AllocCompactor> compactors(allocator);
547
548 uint64_t n = 1;
// with more than one level, n and the min and max items are stored explicitly
549 if (num_levels > 1) {
550 ensure_minimum_memory(end_ptr - ptr, sizeof(n));
551 ptr += copy_from_mem(ptr, n);
552 ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
553 // serde call did not throw, repackage and cleanup
554 min_item.emplace(*tmp);
555 (*tmp).~T();
556 ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
557 // serde call did not throw, repackage and cleanup
558 max_item.emplace(*tmp);
559 (*tmp).~T();
560 }
561
// each compactor deserializer returns the compactor together with the number of bytes it consumed
562 if (raw_items) {
563 auto pair = Compactor::deserialize(ptr, end_ptr - ptr, sd, comparator, allocator, is_level_0_sorted, k, num_raw_items, hra);
564 compactors.push_back(std::move(pair.first));
565 ptr += pair.second;
566 } else {
// only level zero can be unsorted; the flag applies to it, higher levels are passed as sorted
567 for (size_t i = 0; i < num_levels; ++i) {
568 auto pair = Compactor::deserialize(ptr, end_ptr - ptr, sd, comparator, allocator, i == 0 ? is_level_0_sorted : true, hra);
569 compactors.push_back(std::move(pair.first));
570 ptr += pair.second;
571 }
572 }
// with a single level, n is the item count of level zero and min/max are found by scanning it
// NOTE(review): *min_it / *max_it are read even if level zero turned out to hold no items — confirm the input cannot produce that
573 if (num_levels == 1) {
574 const auto begin = compactors[0].begin();
575 const auto end = compactors[0].end();
576 n = compactors[0].get_num_items();
577 auto min_it = begin;
578 auto max_it = begin;
579 for (auto it = begin; it != end; ++it) {
580 if (comparator(*it, *min_it)) min_it = it;
581 if (comparator(*max_it, *it)) max_it = it;
582 }
583 min_item.emplace(*min_it);
584 max_item.emplace(*max_it);
585 }
586
587 return req_sketch(k, hra, n, std::move(min_item), std::move(max_item), std::move(compactors), comparator);
588}
589
590template<typename T, typename C, typename A>
591void req_sketch<T, C, A>::grow() {
592 const uint8_t lg_weight = get_num_levels();
593 compactors_.push_back(Compactor(hra_, lg_weight, k_, comparator_, allocator_));
594 update_max_nom_size();
595}
596
597template<typename T, typename C, typename A>
598uint8_t req_sketch<T, C, A>::get_num_levels() const {
599 return static_cast<uint8_t>(compactors_.size());
600}
601
602template<typename T, typename C, typename A>
603void req_sketch<T, C, A>::update_max_nom_size() {
604 max_nom_size_ = 0;
605 for (const auto& compactor: compactors_) max_nom_size_ += compactor.get_nom_capacity();
606}
607
608template<typename T, typename C, typename A>
609void req_sketch<T, C, A>::update_num_retained() {
610 num_retained_ = 0;
611 for (const auto& compactor: compactors_) num_retained_ += compactor.get_num_items();
612}
613
614template<typename T, typename C, typename A>
615void req_sketch<T, C, A>::compress() {
616 for (size_t h = 0; h < compactors_.size(); ++h) {
617 if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
618 if (h == 0) compactors_[0].sort();
619 if (h + 1 >= get_num_levels()) { // at the top?
620 grow(); // add a level, increases max_nom_size
621 }
622 auto pair = compactors_[h].compact(compactors_[h + 1]);
623 num_retained_ -= pair.first;
624 max_nom_size_ += pair.second;
625 if (LAZY_COMPRESSION && num_retained_ < max_nom_size_) break;
626 }
627 }
628}
629
630template<typename T, typename C, typename A>
631string<A> req_sketch<T, C, A>::to_string(bool print_levels, bool print_items) const {
632 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
633 // The stream does not support passing an allocator instance, and alternatives are complicated.
634 std::ostringstream os;
635 os << "### REQ sketch summary:" << std::endl;
636 os << " K : " << k_ << std::endl;
637 os << " High Rank Acc : " << (hra_ ? "true" : "false") << std::endl;
638 os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
639 os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
640 os << " Sorted : " << (compactors_[0].is_sorted() ? "true" : "false") << std::endl;
641 os << " N : " << n_ << std::endl;
642 os << " Levels : " << compactors_.size() << std::endl;
643 os << " Retained items : " << num_retained_ << std::endl;
644 os << " Capacity items : " << max_nom_size_ << std::endl;
645 if (!is_empty()) {
646 os << " Min item : " << *min_item_ << std::endl;
647 os << " Max item : " << *max_item_ << std::endl;
648 }
649 os << "### End sketch summary" << std::endl;
650
651 if (print_levels) {
652 os << "### REQ sketch levels:" << std::endl;
653 os << " index: nominal capacity, actual size" << std::endl;
654 for (uint8_t i = 0; i < compactors_.size(); i++) {
655 os << " " << (unsigned int) i << ": "
656 << compactors_[i].get_nom_capacity() << ", "
657 << compactors_[i].get_num_items() << std::endl;
658 }
659 os << "### End sketch levels" << std::endl;
660 }
661
662 if (print_items) {
663 os << "### REQ sketch data:" << std::endl;
664 unsigned level = 0;
665 for (const auto& compactor: compactors_) {
666 os << " level " << level << ": " << std::endl;
667 for (auto it = compactor.begin(); it != compactor.end(); ++it) {
668 os << " " << *it << std::endl;
669 }
670 ++level;
671 }
672 os << "### End sketch data" << std::endl;
673 }
674 return string<A>(os.str().c_str(), allocator_);
675}
676
677template<typename T, typename C, typename A>
678req_sketch<T, C, A>::req_sketch(uint16_t k, bool hra, uint64_t n,
679 optional<T>&& min_item, optional<T>&& max_item,
680 std::vector<Compactor, AllocCompactor>&& compactors, const C& comparator):
681comparator_(comparator),
682allocator_(compactors.get_allocator()),
683k_(k),
684hra_(hra),
685max_nom_size_(0),
686num_retained_(0),
687n_(n),
688compactors_(std::move(compactors)),
689min_item_(std::move(min_item)),
690max_item_(std::move(max_item)),
691sorted_view_(nullptr)
692{
693 update_max_nom_size();
694 update_num_retained();
695}
696
697template<typename T, typename C, typename A>
698void req_sketch<T, C, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels) {
699 const uint8_t expected_preamble_ints = num_levels > 1 ? 4 : 2;
700 if (preamble_ints != expected_preamble_ints) {
701 throw std::invalid_argument("Possible corruption: preamble ints must be "
702 + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
703 }
704}
705
706template<typename T, typename C, typename A>
707void req_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
708 if (serial_version != SERIAL_VERSION) {
709 throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
710 + std::to_string(SERIAL_VERSION)
711 + ", got " + std::to_string(serial_version));
712 }
713}
714
715template<typename T, typename C, typename A>
716void req_sketch<T, C, A>::check_family_id(uint8_t family_id) {
717 if (family_id != FAMILY) {
718 throw std::invalid_argument("Possible corruption: family mismatch: expected "
719 + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
720 }
721}
722
723template<typename T, typename C, typename A>
724auto req_sketch<T, C, A>::begin() const -> const_iterator {
725 return const_iterator(compactors_.begin(), compactors_.end());
726}
727
728template<typename T, typename C, typename A>
729auto req_sketch<T, C, A>::end() const -> const_iterator {
730 return const_iterator(compactors_.end(), compactors_.end());
731}
732
733template<typename T, typename C, typename A>
735 if (sorted_view_ == nullptr) {
736 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
737 sorted_view_ = new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
738 }
739}
740
741template<typename T, typename C, typename A>
742void req_sketch<T, C, A>::reset_sorted_view() {
743 if (sorted_view_ != nullptr) {
744 sorted_view_->~quantiles_sorted_view();
745 using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
746 AllocSortedView(allocator_).deallocate(sorted_view_, 1);
747 sorted_view_ = nullptr;
748 }
749}
750
751// iterator
752
753template<typename T, typename C, typename A>
754req_sketch<T, C, A>::const_iterator::const_iterator(LevelsIterator begin, LevelsIterator end):
755 levels_it_(begin),
756 levels_end_(end),
757 compactor_it_(begin == end ? nullptr : (*levels_it_).begin())
758{}
759
760template<typename T, typename C, typename A>
761auto req_sketch<T, C, A>::const_iterator::operator++() -> const_iterator& {
762 ++compactor_it_;
763 if (compactor_it_ == (*levels_it_).end()) {
764 ++levels_it_;
765 if (levels_it_ != levels_end_) compactor_it_ = (*levels_it_).begin();
766 }
767 return *this;
768}
769
// Post-increment: copies the current position, advances this iterator, and returns the copy.
// NOTE(review): the return type is const_iterator&, but the object returned is the local `tmp`,
// so the caller gets a reference to an object whose lifetime has already ended.
// Returning by value would fix it, but that also means changing the declaration, which is not
// in this file — confirm and change both together.
770template<typename T, typename C, typename A>
771auto req_sketch<T, C, A>::const_iterator::operator++(int) -> const_iterator& {
772 const_iterator tmp(*this);
773 operator++();
774 return tmp;
775}
776
777template<typename T, typename C, typename A>
778bool req_sketch<T, C, A>::const_iterator::operator==(const const_iterator& other) const {
779 if (levels_it_ != other.levels_it_) return false;
780 if (levels_it_ == levels_end_) return true;
781 return compactor_it_ == other.compactor_it_;
782}
783
784template<typename T, typename C, typename A>
785bool req_sketch<T, C, A>::const_iterator::operator!=(const const_iterator& other) const {
786 return !operator==(other);
787}
788
789template<typename T, typename C, typename A>
790auto req_sketch<T, C, A>::const_iterator::operator*() const -> reference {
791 return value_type(*compactor_it_, 1ULL << (*levels_it_).get_lg_weight());
792}
793
794template<typename T, typename C, typename A>
795auto req_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
796 return **this;
797}
798
799} /* namespace datasketches */
800
801#endif
Sorted view for quantiles sketches (REQ, KLL and Quantiles)
Definition quantiles_sorted_view.hpp:38
Relative Error Quantiles Sketch.
Definition req_sketch.hpp:79
Comparator get_comparator() const
Returns an instance of the comparator for this sketch.
Definition req_sketch_impl.hpp:224
quantiles_sorted_view< T, Comparator, Allocator > get_sorted_view() const
Gets the sorted view of this sketch.
Definition req_sketch_impl.hpp:269
static req_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const Comparator &comparator=Comparator(), const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analo...
Definition req_sketch_impl.hpp:251
uint32_t get_num_retained() const
Returns the number of retained items in the sketch.
Definition req_sketch_impl.hpp:159
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of sp...
Definition req_sketch_impl.hpp:244
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition req_sketch_impl.hpp:188
typename quantiles_sorted_view< T, Comparator, Allocator >::quantile_return_type quantile_return_type
Quantile return type.
Definition req_sketch.hpp:92
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition req_sketch_impl.hpp:170
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition req_sketch_impl.hpp:369
string< Allocator > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition req_sketch_impl.hpp:631
uint16_t get_k() const
Returns configured parameter K.
Definition req_sketch_impl.hpp:139
bool is_empty() const
Returns true if this sketch is empty.
Definition req_sketch_impl.hpp:149
const T & get_max_item() const
Returns the max item of the stream.
Definition req_sketch_impl.hpp:218
double get_rank_upper_bound(double rank, uint8_t num_std_dev) const
Returns an approximate upper bound of the given normalized rank.
Definition req_sketch_impl.hpp:289
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1 inclusive.
Definition req_sketch_impl.hpp:234
static double get_RSE(uint16_t k, double rank, bool hra, uint64_t n)
Returns an a priori estimate of relative standard error (RSE, expressed as a number in [0,...
Definition req_sketch_impl.hpp:294
double get_rank_lower_bound(double rank, uint8_t num_std_dev) const
Returns an approximate lower bound of the given normalized rank.
Definition req_sketch_impl.hpp:284
Allocator get_allocator() const
Returns an instance of the allocator for this sketch.
Definition req_sketch_impl.hpp:229
req_sketch & operator=(const req_sketch &other)
Copy assignment.
Definition req_sketch_impl.hpp:81
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an approximate quantile of the given normalized rank.
Definition req_sketch_impl.hpp:258
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition req_sketch_impl.hpp:724
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition req_sketch_impl.hpp:334
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition req_sketch_impl.hpp:729
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition req_sketch_impl.hpp:164
const T & get_min_item() const
Returns the min item of the stream.
Definition req_sketch_impl.hpp:212
bool is_HRA() const
Returns configured parameter High Rank Accuracy.
Definition req_sketch_impl.hpp:144
uint64_t get_n() const
Returns the length of the input stream.
Definition req_sketch_impl.hpp:154
const uint16_t MIN_K
min value of parameter K
Definition kll_sketch.hpp:39
const uint8_t INIT_NUM_SECTIONS
initial number of sections
Definition req_common.hpp:35
const uint16_t MIN_K
minimum value of parameter K
Definition req_common.hpp:33
DataSketches namespace.
Definition binomial_bounds.hpp:38