datasketches-cpp
req_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef REQ_SKETCH_IMPL_HPP_
21 #define REQ_SKETCH_IMPL_HPP_
22 
23 #include <sstream>
24 #include <stdexcept>
25 
26 namespace datasketches {
27 
28 template<typename T, typename C, typename A>
29 req_sketch<T, C, A>::req_sketch(uint16_t k, bool hra, const C& comparator, const A& allocator):
30 comparator_(comparator),
31 allocator_(allocator),
32 k_(std::max<uint8_t>(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
33 hra_(hra),
34 max_nom_size_(0),
35 num_retained_(0),
36 n_(0),
37 compactors_(allocator),
38 min_item_(),
39 max_item_(),
40 sorted_view_(nullptr)
41 {
42  grow();
43 }
44 
45 template<typename T, typename C, typename A>
46 req_sketch<T, C, A>::~req_sketch() {
47  reset_sorted_view();
48 }
49 
50 template<typename T, typename C, typename A>
52 comparator_(other.comparator_),
53 allocator_(other.allocator_),
54 k_(other.k_),
55 hra_(other.hra_),
56 max_nom_size_(other.max_nom_size_),
57 num_retained_(other.num_retained_),
58 n_(other.n_),
59 compactors_(other.compactors_),
60 min_item_(other.min_item_),
61 max_item_(other.max_item_),
62 sorted_view_(nullptr)
63 {}
64 
65 template<typename T, typename C, typename A>
67 comparator_(std::move(other.comparator_)),
68 allocator_(std::move(other.allocator_)),
69 k_(other.k_),
70 hra_(other.hra_),
71 max_nom_size_(other.max_nom_size_),
72 num_retained_(other.num_retained_),
73 n_(other.n_),
74 compactors_(std::move(other.compactors_)),
75 min_item_(std::move(other.min_item_)),
76 max_item_(std::move(other.max_item_)),
77 sorted_view_(nullptr)
78 {}
79 
80 template<typename T, typename C, typename A>
82  req_sketch copy(other);
83  std::swap(comparator_, copy.comparator_);
84  std::swap(allocator_, copy.allocator_);
85  std::swap(k_, copy.k_);
86  std::swap(hra_, copy.hra_);
87  std::swap(max_nom_size_, copy.max_nom_size_);
88  std::swap(num_retained_, copy.num_retained_);
89  std::swap(n_, copy.n_);
90  std::swap(compactors_, copy.compactors_);
91  std::swap(min_item_, copy.min_item_);
92  std::swap(max_item_, copy.max_item_);
93  reset_sorted_view();
94  return *this;
95 }
96 
97 template<typename T, typename C, typename A>
99  std::swap(comparator_, other.comparator_);
100  std::swap(allocator_, other.allocator_);
101  std::swap(k_, other.k_);
102  std::swap(hra_, other.hra_);
103  std::swap(max_nom_size_, other.max_nom_size_);
104  std::swap(num_retained_, other.num_retained_);
105  std::swap(n_, other.n_);
106  std::swap(compactors_, other.compactors_);
107  std::swap(min_item_, other.min_item_);
108  std::swap(max_item_, other.max_item_);
109  reset_sorted_view();
110  return *this;
111 }
112 
113 template<typename T, typename C, typename A>
114 template<typename TT, typename CC, typename AA>
115 req_sketch<T, C, A>::req_sketch(const req_sketch<TT, CC, AA>& other, const C& comparator, const A& allocator):
116 comparator_(comparator),
117 allocator_(allocator),
118 k_(other.k_),
119 hra_(other.hra_),
120 max_nom_size_(other.max_nom_size_),
121 num_retained_(other.num_retained_),
122 n_(other.n_),
123 compactors_(allocator),
124 min_item_(other.min_item_),
125 max_item_(other.max_item_),
126 sorted_view_(nullptr)
127 {
128  static_assert(
129  std::is_constructible<T, TT>::value,
130  "Type converting constructor requires new type to be constructible from existing type"
131  );
132  compactors_.reserve(other.compactors_.size());
133  for (const auto& compactor: other.compactors_) {
134  compactors_.push_back(req_compactor<T, C, A>(compactor, comparator_, allocator_));
135  }
136 }
137 
138 template<typename T, typename C, typename A>
139 uint16_t req_sketch<T, C, A>::get_k() const {
140  return k_;
141 }
142 
143 template<typename T, typename C, typename A>
145  return hra_;
146 }
147 
148 template<typename T, typename C, typename A>
150  return n_ == 0;
151 }
152 
153 template<typename T, typename C, typename A>
154 uint64_t req_sketch<T, C, A>::get_n() const {
155  return n_;
156 }
157 
158 template<typename T, typename C, typename A>
160  return num_retained_;
161 }
162 
163 template<typename T, typename C, typename A>
165  return compactors_.size() > 1;
166 }
167 
168 template<typename T, typename C, typename A>
169 template<typename FwdT>
170 void req_sketch<T, C, A>::update(FwdT&& item) {
171  if (!check_update_item(item)) { return; }
172  if (is_empty()) {
173  min_item_.emplace(item);
174  max_item_.emplace(item);
175  } else {
176  if (comparator_(item, *min_item_)) *min_item_ = item;
177  if (comparator_(*max_item_, item)) *max_item_ = item;
178  }
179  compactors_[0].append(std::forward<FwdT>(item));
180  ++num_retained_;
181  ++n_;
182  if (num_retained_ == max_nom_size_) compress();
183  reset_sorted_view();
184 }
185 
186 template<typename T, typename C, typename A>
187 template<typename FwdSk>
188 void req_sketch<T, C, A>::merge(FwdSk&& other) {
189  if (is_HRA() != other.is_HRA()) throw std::invalid_argument("merging HRA and LRA is not valid");
190  if (other.is_empty()) return;
191  if (is_empty()) {
192  min_item_.emplace(conditional_forward<FwdSk>(*other.min_item_));
193  max_item_.emplace(conditional_forward<FwdSk>(*other.max_item_));
194  } else {
195  if (comparator_(*other.min_item_, *min_item_)) *min_item_ = conditional_forward<FwdSk>(*other.min_item_);
196  if (comparator_(*max_item_, *other.max_item_)) *max_item_ = conditional_forward<FwdSk>(*other.max_item_);
197  }
198  // grow until this has at least as many compactors as other
199  while (get_num_levels() < other.get_num_levels()) grow();
200  // merge the items in all height compactors
201  for (size_t i = 0; i < other.get_num_levels(); ++i) {
202  compactors_[i].merge(conditional_forward<FwdSk>(other.compactors_[i]));
203  }
204  n_ += other.n_;
205  update_max_nom_size();
206  update_num_retained();
207  if (num_retained_ >= max_nom_size_) compress();
208  reset_sorted_view();
209 }
210 
211 template<typename T, typename C, typename A>
213  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
214  return *min_item_;
215 }
216 
217 template<typename T, typename C, typename A>
219  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
220  return *max_item_;
221 }
222 
223 template<typename T, typename C, typename A>
225  return comparator_;
226 }
227 
228 template<typename T, typename C, typename A>
230  return allocator_;
231 }
232 
233 template<typename T, typename C, typename A>
234 double req_sketch<T, C, A>::get_rank(const T& item, bool inclusive) const {
235  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
236  uint64_t weight = 0;
237  for (const auto& compactor: compactors_) {
238  weight += compactor.compute_weight(item, inclusive);
239  }
240  return static_cast<double>(weight) / n_;
241 }
242 
243 template<typename T, typename C, typename A>
244 auto req_sketch<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
245  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
246  setup_sorted_view();
247  return sorted_view_->get_PMF(split_points, size, inclusive);
248 }
249 
250 template<typename T, typename C, typename A>
251 auto req_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
252  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
253  setup_sorted_view();
254  return sorted_view_->get_CDF(split_points, size, inclusive);
255 }
256 
257 template<typename T, typename C, typename A>
258 auto req_sketch<T, C, A>::get_quantile(double rank, bool inclusive) const -> quantile_return_type {
259  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
260  if ((rank < 0.0) || (rank > 1.0)) {
261  throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
262  }
263  // possible side-effect of sorting level zero
264  setup_sorted_view();
265  return sorted_view_->get_quantile(rank, inclusive);
266 }
267 
268 template<typename T, typename C, typename A>
270  if (!compactors_[0].is_sorted()) {
271  const_cast<Compactor&>(compactors_[0]).sort(); // allow this side effect
272  }
273  quantiles_sorted_view<T, C, A> view(get_num_retained(), comparator_, allocator_);
274 
275  for (auto& compactor: compactors_) {
276  view.add(compactor.begin(), compactor.end(), 1ULL << compactor.get_lg_weight());
277  }
278 
279  view.convert_to_cummulative();
280  return view;
281 }
282 
283 template<typename T, typename C, typename A>
284 double req_sketch<T, C, A>::get_rank_lower_bound(double rank, uint8_t num_std_dev) const {
285  return get_rank_lb(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
286 }
287 
288 template<typename T, typename C, typename A>
289 double req_sketch<T, C, A>::get_rank_upper_bound(double rank, uint8_t num_std_dev) const {
290  return get_rank_ub(get_k(), get_num_levels(), rank, num_std_dev, get_n(), hra_);
291 }
292 
293 template<typename T, typename C, typename A>
294 double req_sketch<T, C, A>::get_RSE(uint16_t k, double rank, bool hra, uint64_t n) {
295  return get_rank_lb(k, 2, rank, 1, n, hra);
296 }
297 
298 template<typename T, typename C, typename A>
299 double req_sketch<T, C, A>::get_rank_lb(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
300  if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
301  const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
302  const double fixed = FIXED_RSE_FACTOR / k;
303  const double lb_rel = rank - num_std_dev * relative;
304  const double lb_fix = rank - num_std_dev * fixed;
305  return std::max(lb_rel, lb_fix);
306 }
307 
308 template<typename T, typename C, typename A>
309 double req_sketch<T, C, A>::get_rank_ub(uint16_t k, uint8_t num_levels, double rank, uint8_t num_std_dev, uint64_t n, bool hra) {
310  if (is_exact_rank(k, num_levels, rank, n, hra)) return rank;
311  const double relative = relative_rse_factor() / k * (hra ? 1.0 - rank : rank);
312  const double fixed = FIXED_RSE_FACTOR / k;
313  const double ub_rel = rank + num_std_dev * relative;
314  const double ub_fix = rank + num_std_dev * fixed;
315  return std::min(ub_rel, ub_fix);
316 }
317 
318 template<typename T, typename C, typename A>
319 bool req_sketch<T, C, A>::is_exact_rank(uint16_t k, uint8_t num_levels, double rank, uint64_t n, bool hra) {
320  const unsigned base_cap = k * req_constants::INIT_NUM_SECTIONS;
321  if (num_levels == 1 || n <= base_cap) return true;
322  const double exact_rank_thresh = static_cast<double>(base_cap) / n;
323  return (hra && rank >= 1.0 - exact_rank_thresh) || (!hra && rank <= exact_rank_thresh);
324 }
325 
326 template<typename T, typename C, typename A>
327 double req_sketch<T, C, A>::relative_rse_factor() {
328  return sqrt(0.0512 / req_constants::INIT_NUM_SECTIONS);
329 }
330 
331 // implementation for fixed-size arithmetic types (integral and floating point)
332 template<typename T, typename C, typename A>
333 template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
334 size_t req_sketch<T, C, A>::get_serialized_size_bytes(const SerDe& sd) const {
335  size_t size = PREAMBLE_SIZE_BYTES;
336  if (is_empty()) return size;
337  if (is_estimation_mode()) {
338  size += sizeof(n_) + sizeof(TT) * 2; // min and max
339  }
340  if (n_ == 1) {
341  size += sizeof(TT);
342  } else {
343  for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(sd);
344  }
345  return size;
346 }
347 
348 // implementation for all other types
349 template<typename T, typename C, typename A>
350 template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
351 size_t req_sketch<T, C, A>::get_serialized_size_bytes(const SerDe& sd) const {
352  size_t size = PREAMBLE_SIZE_BYTES;
353  if (is_empty()) return size;
354  if (is_estimation_mode()) {
355  size += sizeof(n_);
356  size += sd.size_of_item(*min_item_);
357  size += sd.size_of_item(*max_item_);
358  }
359  if (n_ == 1) {
360  size += sd.size_of_item(*compactors_[0].begin());
361  } else {
362  for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(sd);
363  }
364  return size;
365 }
366 
367 template<typename T, typename C, typename A>
368 template<typename SerDe>
369 void req_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& sd) const {
370  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
371  write(os, preamble_ints);
372  const uint8_t serial_version = SERIAL_VERSION;
373  write(os, serial_version);
374  const uint8_t family = FAMILY;
375  write(os, family);
376  const bool raw_items = n_ <= req_constants::MIN_K;
377  const uint8_t flags_byte(
378  (is_empty() ? 1 << flags::IS_EMPTY : 0)
379  | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
380  | (raw_items ? 1 << flags::RAW_ITEMS : 0)
381  | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
382  );
383  write(os, flags_byte);
384  write(os, k_);
385  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
386  write(os, num_levels);
387  const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
388  write(os, num_raw_items);
389  if (is_empty()) return;
390  if (is_estimation_mode()) {
391  write(os, n_);
392  sd.serialize(os, &*min_item_, 1);
393  sd.serialize(os, &*max_item_, 1);
394  }
395  if (raw_items) {
396  sd.serialize(os, compactors_[0].begin(), num_raw_items);
397  } else {
398  for (const auto& compactor: compactors_) compactor.serialize(os, sd);
399  }
400 }
401 
402 template<typename T, typename C, typename A>
403 template<typename SerDe>
404 auto req_sketch<T, C, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
405  const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
406  vector_bytes bytes(size, 0, allocator_);
407  uint8_t* ptr = bytes.data() + header_size_bytes;
408  const uint8_t* end_ptr = ptr + size;
409 
410  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
411  ptr += copy_to_mem(preamble_ints, ptr);
412  const uint8_t serial_version = SERIAL_VERSION;
413  ptr += copy_to_mem(serial_version, ptr);
414  const uint8_t family = FAMILY;
415  ptr += copy_to_mem(family, ptr);
416  const bool raw_items = n_ <= req_constants::MIN_K;
417  const uint8_t flags_byte(
418  (is_empty() ? 1 << flags::IS_EMPTY : 0)
419  | (hra_ ? 1 << flags::IS_HIGH_RANK : 0)
420  | (raw_items ? 1 << flags::RAW_ITEMS : 0)
421  | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
422  );
423  ptr += copy_to_mem(flags_byte, ptr);
424  ptr += copy_to_mem(k_, ptr);
425  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
426  ptr += copy_to_mem(num_levels, ptr);
427  const uint8_t num_raw_items = raw_items ? static_cast<uint8_t>(n_) : 0;
428  ptr += copy_to_mem(num_raw_items, ptr);
429  if (!is_empty()) {
430  if (is_estimation_mode()) {
431  ptr += copy_to_mem(n_, ptr);
432  ptr += sd.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
433  ptr += sd.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
434  }
435  if (raw_items) {
436  ptr += sd.serialize(ptr, end_ptr - ptr, compactors_[0].begin(), num_raw_items);
437  } else {
438  for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, sd);
439  }
440  }
441  return bytes;
442 }
443 
444 template<typename T, typename C, typename A>
445 template<typename SerDe>
446 req_sketch<T, C, A> req_sketch<T, C, A>::deserialize(std::istream& is, const SerDe& sd, const C& comparator, const A& allocator) {
447  const auto preamble_ints = read<uint8_t>(is);
448  const auto serial_version = read<uint8_t>(is);
449  const auto family_id = read<uint8_t>(is);
450  const auto flags_byte = read<uint8_t>(is);
451  const auto k = read<uint16_t>(is);
452  const auto num_levels = read<uint8_t>(is);
453  const auto num_raw_items = read<uint8_t>(is);
454 
455  check_preamble_ints(preamble_ints, num_levels);
456  check_serial_version(serial_version);
457  check_family_id(family_id);
458 
459  if (!is.good()) throw std::runtime_error("error reading from std::istream");
460  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
461  const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
462  if (is_empty) return req_sketch(k, hra, comparator, allocator);
463 
464  optional<T> tmp; // space to deserialize min and max
465  optional<T> min_item;
466  optional<T> max_item;
467 
468  const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
469  const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
470  std::vector<Compactor, AllocCompactor> compactors(allocator);
471 
472  uint64_t n = 1;
473  if (num_levels > 1) {
474  n = read<uint64_t>(is);
475  sd.deserialize(is, &*tmp, 1);
476  // serde call did not throw, repackage and cleanup
477  min_item.emplace(*tmp);
478  (*tmp).~T();
479  sd.deserialize(is, &*tmp, 1);
480  // serde call did not throw, repackage and cleanup
481  max_item.emplace(*tmp);
482  (*tmp).~T();
483  }
484 
485  if (raw_items) {
486  compactors.push_back(Compactor::deserialize(is, sd, comparator, allocator, is_level_0_sorted, k, num_raw_items, hra));
487  } else {
488  for (size_t i = 0; i < num_levels; ++i) {
489  compactors.push_back(Compactor::deserialize(is, sd, comparator, allocator, i == 0 ? is_level_0_sorted : true, hra));
490  }
491  }
492  if (num_levels == 1) {
493  const auto begin = compactors[0].begin();
494  const auto end = compactors[0].end();
495  n = compactors[0].get_num_items();
496  auto min_it = begin;
497  auto max_it = begin;
498  for (auto it = begin; it != end; ++it) {
499  if (comparator(*it, *min_it)) min_it = it;
500  if (comparator(*max_it, *it)) max_it = it;
501  }
502  min_item.emplace(*min_it);
503  max_item.emplace(*max_it);
504  }
505 
506  if (!is.good()) throw std::runtime_error("error reading from std::istream");
507  return req_sketch(k, hra, n, std::move(min_item), std::move(max_item), std::move(compactors), comparator);
508 }
509 
510 template<typename T, typename C, typename A>
511 template<typename SerDe>
512 req_sketch<T, C, A> req_sketch<T, C, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const C& comparator, const A& allocator) {
513  ensure_minimum_memory(size, 8);
514  const char* ptr = static_cast<const char*>(bytes);
515  const char* end_ptr = static_cast<const char*>(bytes) + size;
516 
517  uint8_t preamble_ints;
518  ptr += copy_from_mem(ptr, preamble_ints);
519  uint8_t serial_version;
520  ptr += copy_from_mem(ptr, serial_version);
521  uint8_t family_id;
522  ptr += copy_from_mem(ptr, family_id);
523  uint8_t flags_byte;
524  ptr += copy_from_mem(ptr, flags_byte);
525  uint16_t k;
526  ptr += copy_from_mem(ptr, k);
527  uint8_t num_levels;
528  ptr += copy_from_mem(ptr, num_levels);
529  uint8_t num_raw_items;
530  ptr += copy_from_mem(ptr, num_raw_items);
531 
532  check_preamble_ints(preamble_ints, num_levels);
533  check_serial_version(serial_version);
534  check_family_id(family_id);
535 
536  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
537  const bool hra = flags_byte & (1 << flags::IS_HIGH_RANK);
538  if (is_empty) return req_sketch(k, hra, comparator, allocator);
539 
540  optional<T> tmp; // space to deserialize min and max
541  optional<T> min_item;
542  optional<T> max_item;
543 
544  const bool raw_items = flags_byte & (1 << flags::RAW_ITEMS);
545  const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
546  std::vector<Compactor, AllocCompactor> compactors(allocator);
547 
548  uint64_t n = 1;
549  if (num_levels > 1) {
550  ensure_minimum_memory(end_ptr - ptr, sizeof(n));
551  ptr += copy_from_mem(ptr, n);
552  ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
553  // serde call did not throw, repackage and cleanup
554  min_item.emplace(*tmp);
555  (*tmp).~T();
556  ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
557  // serde call did not throw, repackage and cleanup
558  max_item.emplace(*tmp);
559  (*tmp).~T();
560  }
561 
562  if (raw_items) {
563  auto pair = Compactor::deserialize(ptr, end_ptr - ptr, sd, comparator, allocator, is_level_0_sorted, k, num_raw_items, hra);
564  compactors.push_back(std::move(pair.first));
565  ptr += pair.second;
566  } else {
567  for (size_t i = 0; i < num_levels; ++i) {
568  auto pair = Compactor::deserialize(ptr, end_ptr - ptr, sd, comparator, allocator, i == 0 ? is_level_0_sorted : true, hra);
569  compactors.push_back(std::move(pair.first));
570  ptr += pair.second;
571  }
572  }
573  if (num_levels == 1) {
574  const auto begin = compactors[0].begin();
575  const auto end = compactors[0].end();
576  n = compactors[0].get_num_items();
577  auto min_it = begin;
578  auto max_it = begin;
579  for (auto it = begin; it != end; ++it) {
580  if (comparator(*it, *min_it)) min_it = it;
581  if (comparator(*max_it, *it)) max_it = it;
582  }
583  min_item.emplace(*min_it);
584  max_item.emplace(*max_it);
585  }
586 
587  return req_sketch(k, hra, n, std::move(min_item), std::move(max_item), std::move(compactors), comparator);
588 }
589 
590 template<typename T, typename C, typename A>
591 void req_sketch<T, C, A>::grow() {
592  const uint8_t lg_weight = get_num_levels();
593  compactors_.push_back(Compactor(hra_, lg_weight, k_, comparator_, allocator_));
594  update_max_nom_size();
595 }
596 
597 template<typename T, typename C, typename A>
598 uint8_t req_sketch<T, C, A>::get_num_levels() const {
599  return static_cast<uint8_t>(compactors_.size());
600 }
601 
602 template<typename T, typename C, typename A>
603 void req_sketch<T, C, A>::update_max_nom_size() {
604  max_nom_size_ = 0;
605  for (const auto& compactor: compactors_) max_nom_size_ += compactor.get_nom_capacity();
606 }
607 
608 template<typename T, typename C, typename A>
609 void req_sketch<T, C, A>::update_num_retained() {
610  num_retained_ = 0;
611  for (const auto& compactor: compactors_) num_retained_ += compactor.get_num_items();
612 }
613 
614 template<typename T, typename C, typename A>
615 void req_sketch<T, C, A>::compress() {
616  for (size_t h = 0; h < compactors_.size(); ++h) {
617  if (compactors_[h].get_num_items() >= compactors_[h].get_nom_capacity()) {
618  if (h == 0) compactors_[0].sort();
619  if (h + 1 >= get_num_levels()) { // at the top?
620  grow(); // add a level, increases max_nom_size
621  }
622  auto pair = compactors_[h].compact(compactors_[h + 1]);
623  num_retained_ -= pair.first;
624  max_nom_size_ += pair.second;
625  if (LAZY_COMPRESSION && num_retained_ < max_nom_size_) break;
626  }
627  }
628 }
629 
630 template<typename T, typename C, typename A>
631 string<A> req_sketch<T, C, A>::to_string(bool print_levels, bool print_items) const {
632  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
633  // The stream does not support passing an allocator instance, and alternatives are complicated.
634  std::ostringstream os;
635  os << "### REQ sketch summary:" << std::endl;
636  os << " K : " << k_ << std::endl;
637  os << " High Rank Acc : " << (hra_ ? "true" : "false") << std::endl;
638  os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
639  os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
640  os << " Sorted : " << (compactors_[0].is_sorted() ? "true" : "false") << std::endl;
641  os << " N : " << n_ << std::endl;
642  os << " Levels : " << compactors_.size() << std::endl;
643  os << " Retained items : " << num_retained_ << std::endl;
644  os << " Capacity items : " << max_nom_size_ << std::endl;
645  if (!is_empty()) {
646  os << " Min item : " << *min_item_ << std::endl;
647  os << " Max item : " << *max_item_ << std::endl;
648  }
649  os << "### End sketch summary" << std::endl;
650 
651  if (print_levels) {
652  os << "### REQ sketch levels:" << std::endl;
653  os << " index: nominal capacity, actual size" << std::endl;
654  for (uint8_t i = 0; i < compactors_.size(); i++) {
655  os << " " << (unsigned int) i << ": "
656  << compactors_[i].get_nom_capacity() << ", "
657  << compactors_[i].get_num_items() << std::endl;
658  }
659  os << "### End sketch levels" << std::endl;
660  }
661 
662  if (print_items) {
663  os << "### REQ sketch data:" << std::endl;
664  unsigned level = 0;
665  for (const auto& compactor: compactors_) {
666  os << " level " << level << ": " << std::endl;
667  for (auto it = compactor.begin(); it != compactor.end(); ++it) {
668  os << " " << *it << std::endl;
669  }
670  ++level;
671  }
672  os << "### End sketch data" << std::endl;
673  }
674  return string<A>(os.str().c_str(), allocator_);
675 }
676 
677 template<typename T, typename C, typename A>
678 req_sketch<T, C, A>::req_sketch(uint16_t k, bool hra, uint64_t n,
679  optional<T>&& min_item, optional<T>&& max_item,
680  std::vector<Compactor, AllocCompactor>&& compactors, const C& comparator):
681 comparator_(comparator),
682 allocator_(compactors.get_allocator()),
683 k_(k),
684 hra_(hra),
685 max_nom_size_(0),
686 num_retained_(0),
687 n_(n),
688 compactors_(std::move(compactors)),
689 min_item_(std::move(min_item)),
690 max_item_(std::move(max_item)),
691 sorted_view_(nullptr)
692 {
693  update_max_nom_size();
694  update_num_retained();
695 }
696 
697 template<typename T, typename C, typename A>
698 void req_sketch<T, C, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t num_levels) {
699  const uint8_t expected_preamble_ints = num_levels > 1 ? 4 : 2;
700  if (preamble_ints != expected_preamble_ints) {
701  throw std::invalid_argument("Possible corruption: preamble ints must be "
702  + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
703  }
704 }
705 
706 template<typename T, typename C, typename A>
707 void req_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
708  if (serial_version != SERIAL_VERSION) {
709  throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
710  + std::to_string(SERIAL_VERSION)
711  + ", got " + std::to_string(serial_version));
712  }
713 }
714 
715 template<typename T, typename C, typename A>
716 void req_sketch<T, C, A>::check_family_id(uint8_t family_id) {
717  if (family_id != FAMILY) {
718  throw std::invalid_argument("Possible corruption: family mismatch: expected "
719  + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
720  }
721 }
722 
723 template<typename T, typename C, typename A>
724 auto req_sketch<T, C, A>::begin() const -> const_iterator {
725  return const_iterator(compactors_.begin(), compactors_.end());
726 }
727 
728 template<typename T, typename C, typename A>
729 auto req_sketch<T, C, A>::end() const -> const_iterator {
730  return const_iterator(compactors_.end(), compactors_.end());
731 }
732 
733 template<typename T, typename C, typename A>
735  if (sorted_view_ == nullptr) {
736  using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
737  sorted_view_ = new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
738  }
739 }
740 
741 template<typename T, typename C, typename A>
742 void req_sketch<T, C, A>::reset_sorted_view() {
743  if (sorted_view_ != nullptr) {
744  sorted_view_->~quantiles_sorted_view();
745  using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
746  AllocSortedView(allocator_).deallocate(sorted_view_, 1);
747  sorted_view_ = nullptr;
748  }
749 }
750 
751 // iterator
752 
753 template<typename T, typename C, typename A>
754 req_sketch<T, C, A>::const_iterator::const_iterator(LevelsIterator begin, LevelsIterator end):
755  levels_it_(begin),
756  levels_end_(end),
757  compactor_it_(begin == end ? nullptr : (*levels_it_).begin())
758 {}
759 
760 template<typename T, typename C, typename A>
761 auto req_sketch<T, C, A>::const_iterator::operator++() -> const_iterator& {
762  ++compactor_it_;
763  if (compactor_it_ == (*levels_it_).end()) {
764  ++levels_it_;
765  if (levels_it_ != levels_end_) compactor_it_ = (*levels_it_).begin();
766  }
767  return *this;
768 }
769 
770 template<typename T, typename C, typename A>
771 auto req_sketch<T, C, A>::const_iterator::operator++(int) -> const_iterator& {
772  const_iterator tmp(*this);
773  operator++();
774  return tmp;
775 }
776 
777 template<typename T, typename C, typename A>
778 bool req_sketch<T, C, A>::const_iterator::operator==(const const_iterator& other) const {
779  if (levels_it_ != other.levels_it_) return false;
780  if (levels_it_ == levels_end_) return true;
781  return compactor_it_ == other.compactor_it_;
782 }
783 
784 template<typename T, typename C, typename A>
785 bool req_sketch<T, C, A>::const_iterator::operator!=(const const_iterator& other) const {
786  return !operator==(other);
787 }
788 
789 template<typename T, typename C, typename A>
790 auto req_sketch<T, C, A>::const_iterator::operator*() const -> reference {
791  return value_type(*compactor_it_, 1ULL << (*levels_it_).get_lg_weight());
792 }
793 
794 template<typename T, typename C, typename A>
795 auto req_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
796  return **this;
797 }
798 
799 } /* namespace datasketches */
800 
801 #endif
Relative Error Quantiles Sketch.
Definition: req_sketch.hpp:79
Comparator get_comparator() const
Returns an instance of the comparator for this sketch.
Definition: req_sketch_impl.hpp:224
quantiles_sorted_view< T, Comparator, Allocator > get_sorted_view() const
Gets the sorted view of this sketch.
Definition: req_sketch_impl.hpp:269
static req_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const Comparator &comparator=Comparator(), const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analo...
Definition: req_sketch_impl.hpp:251
uint32_t get_num_retained() const
Returns the number of retained items in the sketch.
Definition: req_sketch_impl.hpp:159
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of sp...
Definition: req_sketch_impl.hpp:244
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition: req_sketch_impl.hpp:188
typename quantiles_sorted_view< T, Comparator, Allocator >::quantile_return_type quantile_return_type
Quantile return type.
Definition: req_sketch.hpp:92
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition: req_sketch_impl.hpp:170
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition: req_sketch_impl.hpp:369
string< Allocator > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition: req_sketch_impl.hpp:631
uint16_t get_k() const
Returns configured parameter K.
Definition: req_sketch_impl.hpp:139
bool is_empty() const
Returns true if this sketch is empty.
Definition: req_sketch_impl.hpp:149
const T & get_max_item() const
Returns the max item of the stream.
Definition: req_sketch_impl.hpp:218
double get_rank_upper_bound(double rank, uint8_t num_std_dev) const
Returns an approximate upper bound of the given normalized rank.
Definition: req_sketch_impl.hpp:289
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1 inclusive.
Definition: req_sketch_impl.hpp:234
static double get_RSE(uint16_t k, double rank, bool hra, uint64_t n)
Returns an a priori estimate of relative standard error (RSE, expressed as a number in [0,...
Definition: req_sketch_impl.hpp:294
double get_rank_lower_bound(double rank, uint8_t num_std_dev) const
Returns an approximate lower bound of the given normalized rank.
Definition: req_sketch_impl.hpp:284
Allocator get_allocator() const
Returns an instance of the allocator for this sketch.
Definition: req_sketch_impl.hpp:229
req_sketch & operator=(const req_sketch &other)
Copy assignment.
Definition: req_sketch_impl.hpp:81
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an approximate quantile of the given normalized rank.
Definition: req_sketch_impl.hpp:258
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: req_sketch_impl.hpp:724
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: req_sketch_impl.hpp:334
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: req_sketch_impl.hpp:729
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition: req_sketch_impl.hpp:164
const T & get_min_item() const
Returns the min item of the stream.
Definition: req_sketch_impl.hpp:212
bool is_HRA() const
Returns configured parameter High Rank Accuracy.
Definition: req_sketch_impl.hpp:144
uint64_t get_n() const
Returns the length of the input stream.
Definition: req_sketch_impl.hpp:154
const uint16_t MIN_K
min value of parameter K
Definition: kll_sketch.hpp:39
const uint8_t INIT_NUM_SECTIONS
initial number of sections
Definition: req_common.hpp:35
const uint16_t MIN_K
minimum value of parameter K
Definition: req_common.hpp:33
DataSketches namespace.
Definition: binomial_bounds.hpp:38