datasketches-cpp
quantiles_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _QUANTILES_SKETCH_IMPL_HPP_
21 #define _QUANTILES_SKETCH_IMPL_HPP_
22 
23 #include <cmath>
24 #include <algorithm>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <sstream>
28 
29 #include "count_zeros.hpp"
30 #include "conditional_forward.hpp"
31 
32 namespace datasketches {
33 
34 template<typename T, typename C, typename A>
35 quantiles_sketch<T, C, A>::quantiles_sketch(uint16_t k, const C& comparator, const A& allocator):
36 comparator_(comparator),
37 allocator_(allocator),
38 is_base_buffer_sorted_(true),
39 k_(k),
40 n_(0),
41 bit_pattern_(0),
42 base_buffer_(allocator_),
43 levels_(allocator_),
44 min_item_(),
45 max_item_(),
46 sorted_view_(nullptr)
47 {
48  check_k(k_);
49  base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k));
50 }
51 
// Copy constructor. The declarator line is absent from this view; presumably
// `quantiles_sketch(const quantiles_sketch& other)` -- confirm against the header.
// Every member is copied from `other` except the cached sorted view, which starts
// out null (queries set it up again through setup_sorted_view()).
52 template<typename T, typename C, typename A>
54 comparator_(other.comparator_),
55 allocator_(other.allocator_),
56 is_base_buffer_sorted_(other.is_base_buffer_sorted_),
57 k_(other.k_),
58 n_(other.n_),
59 bit_pattern_(other.bit_pattern_),
60 base_buffer_(other.base_buffer_),
61 levels_(other.levels_),
62 min_item_(other.min_item_),
63 max_item_(other.max_item_),
64 sorted_view_(nullptr)
65 {
// std::vector's copy constructor is not required to preserve capacity, so
// re-reserve each copied level to match the capacity of the source level.
66  for (size_t i = 0; i < levels_.size(); ++i) {
67  if (levels_[i].capacity() != other.levels_[i].capacity()) {
68  levels_[i].reserve(other.levels_[i].capacity());
69  }
70  }
71 }
72 
// Move constructor. The declarator line is absent from this view; presumably
// `quantiles_sketch(quantiles_sketch&& other)` -- confirm against the header.
// Containers and min/max are moved; comparator, allocator and the scalar fields
// are copied, and the cached sorted view starts out null.
// NOTE(review): other's n_ / bit_pattern_ are left unchanged while its buffers
// are moved-from, so `other` should only be destroyed or assigned afterwards.
73 template<typename T, typename C, typename A>
75 comparator_(other.comparator_),
76 allocator_(other.allocator_),
77 is_base_buffer_sorted_(other.is_base_buffer_sorted_),
78 k_(other.k_),
79 n_(other.n_),
80 bit_pattern_(other.bit_pattern_),
81 base_buffer_(std::move(other.base_buffer_)),
82 levels_(std::move(other.levels_)),
83 min_item_(std::move(other.min_item_)),
84 max_item_(std::move(other.max_item_)),
85 sorted_view_(nullptr)
86 {}
87 
// Copy assignment (declarator line absent from this view; presumably
// `operator=(const quantiles_sketch& other)` -- confirm).
// Copy-and-swap: a full copy of `other` is built first, then every member is
// swapped in. If that copy throws, *this is untouched (assuming the member swaps
// themselves do not throw). The cached sorted view is reset afterwards.
88 template<typename T, typename C, typename A>
90  quantiles_sketch<T, C, A> copy(other);
91  std::swap(comparator_, copy.comparator_);
92  std::swap(allocator_, copy.allocator_);
93  std::swap(is_base_buffer_sorted_, copy.is_base_buffer_sorted_);
94  std::swap(k_, copy.k_);
95  std::swap(n_, copy.n_);
96  std::swap(bit_pattern_, copy.bit_pattern_);
97  std::swap(base_buffer_, copy.base_buffer_);
98  std::swap(levels_, copy.levels_);
99  std::swap(min_item_, copy.min_item_);
100  std::swap(max_item_, copy.max_item_);
101  reset_sorted_view();
102  return *this;
103 }
104 
// Move assignment (declarator line absent from this view; presumably
// `operator=(quantiles_sketch&& other)` -- confirm).
// Swaps every member with `other`, so `other` ends up holding this sketch's
// previous state; this sketch's cached sorted view is reset.
105 template<typename T, typename C, typename A>
107  std::swap(comparator_, other.comparator_);
108  std::swap(allocator_, other.allocator_);
109  std::swap(is_base_buffer_sorted_, other.is_base_buffer_sorted_);
110  std::swap(k_, other.k_);
111  std::swap(n_, other.n_);
112  std::swap(bit_pattern_, other.bit_pattern_);
113  std::swap(base_buffer_, other.base_buffer_);
114  std::swap(levels_, other.levels_);
115  std::swap(min_item_, other.min_item_);
116  std::swap(max_item_, other.max_item_);
117  reset_sorted_view();
118  return *this;
119 }
120 
121 template<typename T, typename C, typename A>
122 quantiles_sketch<T, C, A>::quantiles_sketch(uint16_t k, uint64_t n, uint64_t bit_pattern,
123  Level&& base_buffer, VectorLevels&& levels,
124  optional<T>&& min_item, optional<T>&& max_item,
125  bool is_sorted, const C& comparator, const A& allocator):
126 comparator_(comparator),
127 allocator_(allocator),
128 is_base_buffer_sorted_(is_sorted),
129 k_(k),
130 n_(n),
131 bit_pattern_(bit_pattern),
132 base_buffer_(std::move(base_buffer)),
133 levels_(std::move(levels)),
134 min_item_(std::move(min_item)),
135 max_item_(std::move(max_item)),
136 sorted_view_(nullptr)
137 {
138  uint32_t item_count = static_cast<uint32_t>(base_buffer_.size());
139  for (Level& lvl : levels_) {
140  item_count += static_cast<uint32_t>(lvl.size());
141  }
142  if (item_count != compute_retained_items(k_, n_))
143  throw std::logic_error("Item count does not match value computed from k, n");
144 }
145 
146 template<typename T, typename C, typename A>
147 template<typename From, typename FC, typename FA>
148 quantiles_sketch<T, C, A>::quantiles_sketch(const quantiles_sketch<From, FC, FA>& other,
149  const C& comparator, const A& allocator):
150 comparator_(comparator),
151 allocator_(allocator),
152 is_base_buffer_sorted_(false),
153 k_(other.get_k()),
154 n_(other.get_n()),
155 bit_pattern_(compute_bit_pattern(other.get_k(), other.get_n())),
156 base_buffer_(allocator),
157 levels_(allocator),
158 min_item_(other.min_item_),
159 max_item_(other.max_item_),
160 sorted_view_(nullptr)
161 {
162  static_assert(std::is_constructible<T, From>::value,
163  "Type converting constructor requires new type to be constructible from existing type");
164 
165  base_buffer_.reserve(2 * std::min(quantiles_constants::MIN_K, k_));
166 
167  if (!other.is_empty()) {
168  // reserve space in levels
169  const uint8_t num_levels = compute_levels_needed(k_, n_);
170  levels_.reserve(num_levels);
171  for (int i = 0; i < num_levels; ++i) {
172  Level level(allocator);
173  level.reserve(k_);
174  levels_.push_back(std::move(level));
175  }
176 
177  // iterate through points, assigning to the correct level as needed
178  for (auto pair : other) {
179  const uint64_t wt = pair.second;
180  if (wt == 1) {
181  base_buffer_.push_back(T(pair.first));
182  // resize where needed as if adding points via update()
183  if (base_buffer_.size() + 1 > base_buffer_.capacity()) {
184  const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
185  base_buffer_.reserve(new_size);
186  }
187  }
188  else {
189  const uint8_t idx = count_trailing_zeros_in_u64(pair.second) - 1;
190  levels_[idx].push_back(T(pair.first));
191  }
192  }
193 
194  // validate that ordering within each level is preserved
195  // base_buffer_ can be considered unsorted for this purpose
196  for (int i = 0; i < num_levels; ++i) {
197  if (!std::is_sorted(levels_[i].begin(), levels_[i].end(), comparator_)) {
198  throw std::logic_error("Copy construction across types produces invalid sorting");
199  }
200  }
201  }
202 }
203 
204 
205 template<typename T, typename C, typename A>
206 quantiles_sketch<T, C, A>::~quantiles_sketch() {
207  reset_sorted_view();
208 }
209 
// Adds one item to the sketch (declarator line absent from this view; presumably
// `update(FwdT&& item)` -- confirm). Items rejected by check_update_item() are ignored.
// Tracks min/max, appends the item to the base buffer (growing its capacity on
// demand), and once the base buffer holds 2k items hands it to
// process_full_base_buffer(). Any cached sorted view is invalidated.
210 template<typename T, typename C, typename A>
211 template<typename FwdT>
213  if (!check_update_item(item)) { return; }
// first item initializes both bounds; later items only tighten them
214  if (is_empty()) {
215  min_item_.emplace(item);
216  max_item_.emplace(item);
217  } else {
218  if (comparator_(item, *min_item_)) *min_item_ = item;
219  if (comparator_(*max_item_, item)) *max_item_ = item;
220  }
221
222  // if exceed capacity, grow until size 2k -- assumes eager processing
223  if (base_buffer_.size() + 1 > base_buffer_.capacity()) grow_base_buffer();
224
// the item itself is forwarded last, after min/max were copied from it
225  base_buffer_.push_back(std::forward<FwdT>(item));
226  ++n_;
227
228  if (base_buffer_.size() > 1) is_base_buffer_sorted_ = false;
229  if (base_buffer_.size() == 2 * k_) process_full_base_buffer();
230  reset_sorted_view();
231 }
232 
233 template<typename T, typename C, typename A>
234 template<typename FwdSk>
235 void quantiles_sketch<T, C, A>::merge(FwdSk&& other) {
236  if (other.is_empty()) {
237  return; // nothing to do
238  } else if (!other.is_estimation_mode()) {
239  // other is exact, stream in regardless of k
240  for (auto item : other.base_buffer_) {
241  update(conditional_forward<FwdSk>(item));
242  }
243  reset_sorted_view();
244  return;
245  }
246 
247  // other has data and is in estimation mode
248  if (is_estimation_mode()) {
249  if (k_ == other.get_k()) {
250  standard_merge(*this, std::forward<FwdSk>(other));
251  } else if (k_ > other.get_k()) {
252  quantiles_sketch sk_copy(std::forward<FwdSk>(other));
253  downsampling_merge(sk_copy, std::move(*this));
254  *this = std::move(sk_copy);
255  } else { // k_ < other.get_k()
256  downsampling_merge(*this, std::forward<FwdSk>(other));
257  }
258  } else {
259  // exact or empty
260  quantiles_sketch sk_copy(std::forward<FwdSk>(other));
261  if (k_ <= other.get_k()) {
262  if (!is_empty()) {
263  for (uint16_t i = 0; i < base_buffer_.size(); ++i) {
264  sk_copy.update(std::move(base_buffer_[i]));
265  }
266  }
267  } else { // k_ > other.get_k()
268  downsampling_merge(sk_copy, std::move(*this));
269  }
270  *this = std::move(sk_copy);
271  }
272  reset_sorted_view();
273 }
274 
275 template<typename T, typename C, typename A>
276 template<typename SerDe>
277 void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde) const {
278  const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
279  write(os, preamble_longs);
280  const uint8_t ser_ver = SERIAL_VERSION;
281  write(os, ser_ver);
282  const uint8_t family = FAMILY;
283  write(os, family);
284 
285  // side-effect: sort base buffer since always compact
286  std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
287  const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
288 
289  // empty, ordered, compact are valid flags
290  const uint8_t flags_byte(
291  (is_empty() ? 1 << flags::IS_EMPTY : 0)
292  | (1 << flags::IS_SORTED) // always sorted as side effect noted above
293  | (1 << flags::IS_COMPACT) // always compact -- could be optional for numeric types?
294  );
295  write(os, flags_byte);
296  write(os, k_);
297  const uint16_t unused = 0;
298  write(os, unused);
299 
300  if (!is_empty()) {
301  write(os, n_);
302 
303  // min and max
304  serde.serialize(os, &*min_item_, 1);
305  serde.serialize(os, &*max_item_, 1);
306 
307  // base buffer items
308  serde.serialize(os, base_buffer_.data(), static_cast<unsigned>(base_buffer_.size()));
309 
310  // levels, only when data is present
311  for (Level lvl : levels_) {
312  if (lvl.size() > 0)
313  serde.serialize(os, lvl.data(), static_cast<unsigned>(lvl.size()));
314  }
315  }
316 }
317 
318 template<typename T, typename C, typename A>
319 template<typename SerDe>
320 auto quantiles_sketch<T, C, A>::serialize(unsigned header_size_bytes, const SerDe& serde) const -> vector_bytes {
321  const size_t size = get_serialized_size_bytes(serde) + header_size_bytes;
322  vector_bytes bytes(size, 0, allocator_);
323  uint8_t* ptr = bytes.data() + header_size_bytes;
324  const uint8_t* end_ptr = ptr + size;
325 
326  const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
327  ptr += copy_to_mem(preamble_longs, ptr);
328  const uint8_t ser_ver = SERIAL_VERSION;
329  ptr += copy_to_mem(ser_ver, ptr);
330  const uint8_t family = FAMILY;
331  ptr += copy_to_mem(family, ptr);
332 
333  // side-effect: sort base buffer since always compact
334  std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
335  const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
336 
337  // empty, ordered, compact are valid flags
338  const uint8_t flags_byte(
339  (is_empty() ? 1 << flags::IS_EMPTY : 0)
340  | (1 << flags::IS_SORTED) // always sorted as side effect noted above
341  | (1 << flags::IS_COMPACT) // always compact
342  );
343  ptr += copy_to_mem(flags_byte, ptr);
344  ptr += copy_to_mem(k_, ptr);
345  ptr += sizeof(uint16_t); // 2 unused bytes
346 
347  if (!is_empty()) {
348 
349  ptr += copy_to_mem(n_, ptr);
350 
351  // min and max
352  ptr += serde.serialize(ptr, end_ptr - ptr, &*min_item_, 1);
353  ptr += serde.serialize(ptr, end_ptr - ptr, &*max_item_, 1);
354 
355  // base buffer items
356  if (base_buffer_.size() > 0)
357  ptr += serde.serialize(ptr, end_ptr - ptr, base_buffer_.data(), static_cast<unsigned>(base_buffer_.size()));
358 
359  // levels, only when data is present
360  for (Level lvl : levels_) {
361  if (lvl.size() > 0)
362  ptr += serde.serialize(ptr, end_ptr - ptr, lvl.data(), static_cast<unsigned>(lvl.size()));
363  }
364  }
365 
366  return bytes;
367 }
368 
369 template<typename T, typename C, typename A>
370 template<typename SerDe>
371 auto quantiles_sketch<T, C, A>::deserialize(std::istream &is, const SerDe& serde,
372  const C& comparator, const A &allocator) -> quantiles_sketch {
373  const auto preamble_longs = read<uint8_t>(is);
374  const auto serial_version = read<uint8_t>(is);
375  const auto family_id = read<uint8_t>(is);
376  const auto flags_byte = read<uint8_t>(is);
377  const auto k = read<uint16_t>(is);
378  read<uint16_t>(is); // unused
379 
380  check_k(k);
381  check_serial_version(serial_version); // a little redundant with the header check
382  check_family_id(family_id);
383  check_header_validity(preamble_longs, flags_byte, serial_version);
384 
385  if (!is.good()) throw std::runtime_error("error reading from std::istream");
386  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
387  if (is_empty) {
388  return quantiles_sketch(k, comparator, allocator);
389  }
390 
391  const auto items_seen = read<uint64_t>(is);
392 
393  const bool is_compact = (serial_version == 2) | ((flags_byte & (1 << flags::IS_COMPACT)) > 0);
394  const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;
395 
396  optional<T> tmp; // space to deserialize min and max
397  optional<T> min_item;
398  optional<T> max_item;
399 
400  serde.deserialize(is, &*tmp, 1);
401  // serde call did not throw, repackage and cleanup
402  min_item.emplace(*tmp);
403  (*tmp).~T();
404  serde.deserialize(is, &*tmp, 1);
405  // serde call did not throw, repackage and cleanup
406  max_item.emplace(*tmp);
407  (*tmp).~T();
408 
409  if (serial_version == 1) {
410  read<uint64_t>(is); // no longer used
411  }
412 
413  // allocate buffers as needed
414  const uint8_t levels_needed = compute_levels_needed(k, items_seen);
415  const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);
416 
417  // Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
418  // does not currently operate sketches in compact mode, but will only serialize as compact
419  // to avoid complications around serialization of empty values for generic type T. We also need
420  // to be able to ingest either serialized format from Java.
421 
422  // load base buffer
423  const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
424  uint32_t items_to_read = (levels_needed == 0 || is_compact) ? bb_items : 2 * k;
425  Level base_buffer = deserialize_array(is, bb_items, 2 * k, serde, allocator);
426  if (items_to_read > bb_items) { // either equal or greater, never read fewer items
427  // read remaining items, but don't store them
428  deserialize_array(is, items_to_read - bb_items, items_to_read - bb_items, serde, allocator);
429  }
430 
431  // populate vector of Levels directly
432  VectorLevels levels(allocator);
433  levels.reserve(levels_needed);
434  if (levels_needed > 0) {
435  uint64_t working_pattern = bit_pattern;
436  for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {
437  if ((working_pattern & 0x01) == 1) {
438  Level level = deserialize_array(is, k, k, serde, allocator);
439  levels.push_back(std::move(level));
440  } else {
441  Level level(allocator);
442  level.reserve(k);
443  levels.push_back(std::move(level));
444  }
445  }
446  }
447 
448  return quantiles_sketch(k, items_seen, bit_pattern,
449  std::move(base_buffer), std::move(levels), std::move(min_item), std::move(max_item), is_sorted,
450  comparator, allocator);
451 }
452 
453 template<typename T, typename C, typename A>
454 template<typename SerDe>
455 auto quantiles_sketch<T, C, A>::deserialize_array(std::istream& is, uint32_t num_items, uint32_t capacity, const SerDe& serde, const A& allocator) -> Level {
456  A alloc(allocator);
457  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
458  serde.deserialize(is, items.get(), num_items);
459  // serde did not throw, enable destructors
460  items.get_deleter().set_destroy(true);
461  if (!is.good()) throw std::runtime_error("error reading from std::istream");
462 
463  // successfully read, now put into a Level
464  Level level(allocator);
465  level.reserve(capacity);
466  level.insert(level.begin(),
467  std::make_move_iterator(items.get()),
468  std::make_move_iterator(items.get() + num_items));
469  return level;
470 }
471 
472 template<typename T, typename C, typename A>
473 template<typename SerDe>
474 auto quantiles_sketch<T, C, A>::deserialize(const void* bytes, size_t size, const SerDe& serde,
475  const C& comparator, const A &allocator) -> quantiles_sketch {
476  ensure_minimum_memory(size, 8);
477  const char* ptr = static_cast<const char*>(bytes);
478  const char* end_ptr = static_cast<const char*>(bytes) + size;
479 
480  uint8_t preamble_longs;
481  ptr += copy_from_mem(ptr, preamble_longs);
482  uint8_t serial_version;
483  ptr += copy_from_mem(ptr, serial_version);
484  uint8_t family_id;
485  ptr += copy_from_mem(ptr, family_id);
486  uint8_t flags_byte;
487  ptr += copy_from_mem(ptr, flags_byte);
488  uint16_t k;
489  ptr += copy_from_mem(ptr, k);
490  uint16_t unused;
491  ptr += copy_from_mem(ptr, unused);
492 
493  check_k(k);
494  check_serial_version(serial_version); // a little redundant with the header check
495  check_family_id(family_id);
496  check_header_validity(preamble_longs, flags_byte, serial_version);
497 
498  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
499  if (is_empty) {
500  return quantiles_sketch(k, comparator, allocator);
501  }
502 
503  ensure_minimum_memory(size, 16);
504  uint64_t items_seen;
505  ptr += copy_from_mem(ptr, items_seen);
506 
507  const bool is_compact = (serial_version == 2) | ((flags_byte & (1 << flags::IS_COMPACT)) > 0);
508  const bool is_sorted = (flags_byte & (1 << flags::IS_SORTED)) > 0;
509 
510  optional<T> tmp; // space to deserialize min and max
511  optional<T> min_item;
512  optional<T> max_item;
513 
514  ptr += serde.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
515  // serde call did not throw, repackage and cleanup
516  min_item.emplace(*tmp);
517  (*tmp).~T();
518  ptr += serde.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
519  // serde call did not throw, repackage and cleanup
520  max_item.emplace(*tmp);
521  (*tmp).~T();
522 
523  if (serial_version == 1) {
524  uint64_t unused_long;
525  ptr += copy_from_mem(ptr, unused_long); // no longer used
526  }
527 
528  // allocate buffers as needed
529  const uint8_t levels_needed = compute_levels_needed(k, items_seen);
530  const uint64_t bit_pattern = compute_bit_pattern(k, items_seen);
531 
532  // Java provides a compact storage layout for a sketch of primitive doubles. The C++ version
533  // does not currently operate sketches in compact mode, but will only serialize as compact
534  // to avoid complications around serialization of empty values for generic type T. We also need
535  // to be able to ingest either serialized format from Java.
536 
537  // load base buffer
538  const uint32_t bb_items = compute_base_buffer_items(k, items_seen);
539  uint32_t items_to_read = (levels_needed == 0 || is_compact) ? bb_items : 2 * k;
540  auto base_buffer_pair = deserialize_array(ptr, end_ptr - ptr, bb_items, 2 * k, serde, allocator);
541  ptr += base_buffer_pair.second;
542  if (items_to_read > bb_items) { // either equal or greater, never read fewer items
543  // read remaining items, only use to advance the pointer
544  auto extras = deserialize_array(ptr, end_ptr - ptr, items_to_read - bb_items, items_to_read - bb_items, serde, allocator);
545  ptr += extras.second;
546  }
547 
548  // populate vector of Levels directly
549  VectorLevels levels(allocator);
550  levels.reserve(levels_needed);
551  if (levels_needed > 0) {
552  uint64_t working_pattern = bit_pattern;
553  for (size_t i = 0; i < levels_needed; ++i, working_pattern >>= 1) {
554 
555  if ((working_pattern & 0x01) == 1) {
556  auto pair = deserialize_array(ptr, end_ptr - ptr, k, k, serde, allocator);
557  ptr += pair.second;
558  levels.push_back(std::move(pair.first));
559  } else {
560  Level level(allocator);
561  level.reserve(k);
562  levels.push_back(std::move(level));
563  }
564  }
565  }
566 
567  return quantiles_sketch(k, items_seen, bit_pattern,
568  std::move(base_buffer_pair.first), std::move(levels), std::move(min_item), std::move(max_item), is_sorted,
569  comparator, allocator);
570 }
571 
572 template<typename T, typename C, typename A>
573 template<typename SerDe>
574 auto quantiles_sketch<T, C, A>::deserialize_array(const void* bytes, size_t size, uint32_t num_items, uint32_t capacity, const SerDe& serde, const A& allocator)
575  -> std::pair<Level, size_t> {
576  const char* ptr = static_cast<const char*>(bytes);
577  const char* end_ptr = static_cast<const char*>(bytes) + size;
578  A alloc(allocator);
579  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(allocator, false, num_items));
580  ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num_items);
581  // serde did not throw, enable destructors
582  items.get_deleter().set_destroy(true);
583 
584  // succesfully read, now put into a Level
585  Level level(allocator);
586  level.reserve(capacity);
587  level.insert(level.begin(),
588  std::make_move_iterator(items.get()),
589  std::make_move_iterator(items.get() + num_items));
590 
591  return std::pair<Level, size_t>(std::move(level), ptr - static_cast<const char*>(bytes));
592 }
593 
594 template<typename T, typename C, typename A>
595 string<A> quantiles_sketch<T, C, A>::to_string(bool print_levels, bool print_items) const {
596  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
597  // The stream does not support passing an allocator instance, and alternatives are complicated.
598  std::ostringstream os;
599  os << "### Quantiles Sketch summary:" << std::endl;
600  os << " K : " << k_ << std::endl;
601  os << " N : " << n_ << std::endl;
602  os << " Epsilon : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
603  os << " Epsilon PMF : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
604  os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
605  os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
606  os << " Levels (w/o BB): " << levels_.size() << std::endl;
607  os << " Used Levels : " << count_valid_levels(bit_pattern_) << std::endl;
608  os << " Retained items : " << get_num_retained() << std::endl;
609  if (!is_empty()) {
610  os << " Min item : " << *min_item_ << std::endl;
611  os << " Max item : " << *max_item_ << std::endl;
612  }
613  os << "### End sketch summary" << std::endl;
614 
615  if (print_levels) {
616  os << "### Quantiles Sketch levels:" << std::endl;
617  os << " index: items in use" << std::endl;
618  os << " BB: " << base_buffer_.size() << std::endl;
619  for (uint8_t i = 0; i < levels_.size(); i++) {
620  os << " " << static_cast<unsigned int>(i) << ": " << levels_[i].size() << std::endl;
621  }
622  os << "### End sketch levels" << std::endl;
623  }
624 
625  if (print_items) {
626  os << "### Quantiles Sketch data:" << std::endl;
627  uint8_t level = 0;
628  os << " BB:" << std::endl;
629  for (const T& item : base_buffer_) {
630  os << " " << item << std::endl;
631  }
632  for (uint8_t i = 0; i < levels_.size(); ++i) {
633  os << " level " << static_cast<unsigned int>(level) << ":" << std::endl;
634  for (const T& item : levels_[i]) {
635  os << " " << item << std::endl;
636  }
637  }
638  os << "### End sketch data" << std::endl;
639  }
640  return string<A>(os.str().c_str(), allocator_);
641 }
642 
// Returns k_, the accuracy parameter (declarator line absent from this view;
// presumably `get_k() const` -- confirm).
643 template<typename T, typename C, typename A>
645  return k_;
646 }
647 
// Returns n_, the number of items this sketch has processed (update() increments
// it once per accepted item). Declarator line absent from this view; presumably
// `get_n() const` -- confirm.
648 template<typename T, typename C, typename A>
650  return n_;
651 }
652 
// True when no items have been processed (n_ == 0). Declarator line absent from
// this view; presumably `is_empty() const` -- confirm.
653 template<typename T, typename C, typename A>
655  return n_ == 0;
656 }
657 
// True once at least one full base buffer (2k items) has been propagated into the
// levels, i.e. bit_pattern_ (n / 2k) is non-zero. Declarator line absent from
// this view; presumably `is_estimation_mode() const` -- confirm.
658 template<typename T, typename C, typename A>
660  return bit_pattern_ != 0;
661 }
662 
// Number of items currently stored, derived from k_ and n_ rather than by counting
// the buffers. Declarator line absent from this view; presumably
// `get_num_retained() const` -- confirm.
663 template<typename T, typename C, typename A>
665  return compute_retained_items(k_, n_);
666 }
667 
// Smallest item seen so far (per comparator_). Throws std::runtime_error on an
// empty sketch. Declarator line absent from this view; presumably
// `get_min_item() const` -- confirm.
668 template<typename T, typename C, typename A>
670  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
671  return *min_item_;
672 }
673 
// Largest item seen so far (per comparator_). Throws std::runtime_error on an
// empty sketch. Declarator line absent from this view; presumably
// `get_max_item() const` -- confirm.
674 template<typename T, typename C, typename A>
676  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
677  return *max_item_;
678 }
679 
// Returns the comparator used to order items. Declarator line absent from this
// view; presumably `get_comparator() const` -- confirm.
680 template<typename T, typename C, typename A>
682  return comparator_;
683 }
684 
// Returns the allocator used for internal storage. Declarator line absent from
// this view; presumably `get_allocator() const` -- confirm.
685 template<typename T, typename C, typename A>
687  return allocator_;
688 }
689 
690 // implementation for fixed-size arithmetic types (integral and floating point)
// Serialized size in bytes (declarator line absent from this view). Empty
// sketches need only EMPTY_SIZE_BYTES. Otherwise: the DATA_START header plus one
// sizeof(TT) slot per retained item, plus 2 more slots for the min and max items
// that serialize() writes ahead of the data.
691 template<typename T, typename C, typename A>
692 template<typename SerDe, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
694  if (is_empty()) { return EMPTY_SIZE_BYTES; }
695  return DATA_START + ((get_num_retained() + 2) * sizeof(TT));
696 }
697 
698 // implementation for all other types
// Serialized size in bytes for variable-size items (declarator line absent from
// this view). The header plus serde.size_of_item() for the min item, the max
// item, and every retained item yielded by iterating this sketch.
699 template<typename T, typename C, typename A>
700 template<typename SerDe, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
702  if (is_empty()) { return EMPTY_SIZE_BYTES; }
703  size_t size = DATA_START;
704  size += serde.size_of_item(*min_item_);
705  size += serde.size_of_item(*max_item_);
706  for (auto it: *this) size += serde.size_of_item(it.first);
707  return size;
708 }
709 
// Normalized rank error for this sketch's k; delegates to the static overload
// taking (k, is_pmf). Declarator line absent from this view.
710 template<typename T, typename C, typename A>
712  return get_normalized_rank_error(k_, is_pmf);
713 }
714 
// Normalized rank error for a given k (declarator line absent from this view):
//   PMF error:        1.854 / k^0.9657
//   single-rank error: 1.576 / k^0.9726
// NOTE(review): the constants look empirically fitted; their provenance is not
// shown here.
715 template<typename T, typename C, typename A>
717  return is_pmf
718  ? 1.854 / std::pow(k, 0.9657)
719  : 1.576 / std::pow(k, 0.9726);
720 }
721 
// Builds a quantiles_sorted_view over all retained items. The declarator line is
// absent from this view; presumably the const get_sorted_view() accessor -- confirm.
// Base-buffer items get weight 1, and each successive level doubles the weight.
// Empty levels are skipped but still advance the weight. The view is then
// converted to cumulative weights before being returned.
722 template<typename T, typename C, typename A>
724  // allow side-effect of sorting the base buffer
725  if (!is_base_buffer_sorted_) {
726  std::sort(const_cast<Level&>(base_buffer_).begin(), const_cast<Level&>(base_buffer_).end(), comparator_);
727  const_cast<quantiles_sketch*>(this)->is_base_buffer_sorted_ = true;
728  }
729  quantiles_sorted_view<T, C, A> view(get_num_retained(), comparator_, allocator_);
730
731  uint64_t weight = 1;
732  view.add(base_buffer_.begin(), base_buffer_.end(), weight);
733  for (const auto& level: levels_) {
734  weight <<= 1;
735  if (level.empty()) { continue; }
736  view.add(level.begin(), level.end(), weight);
737  }
738
739  view.convert_to_cummulative();
740  return view;
741 }
742 
743 template<typename T, typename C, typename A>
744 auto quantiles_sketch<T, C, A>::get_quantile(double rank, bool inclusive) const -> quantile_return_type {
745  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
746  if ((rank < 0.0) || (rank > 1.0)) {
747  throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
748  }
749  // possible side-effect: sorting base buffer
750  setup_sorted_view();
751  return sorted_view_->get_quantile(rank, inclusive);
752 }
753 
754 template<typename T, typename C, typename A>
755 double quantiles_sketch<T, C, A>::get_rank(const T& item, bool inclusive) const {
756  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
757  setup_sorted_view();
758  return sorted_view_->get_rank(item, inclusive);
759 }
760 
761 template<typename T, typename C, typename A>
762 auto quantiles_sketch<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
763  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
764  setup_sorted_view();
765  return sorted_view_->get_PMF(split_points, size, inclusive);
766 }
767 
768 template<typename T, typename C, typename A>
769 auto quantiles_sketch<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
770  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
771  setup_sorted_view();
772  return sorted_view_->get_CDF(split_points, size, inclusive);
773 }
774 
775 template<typename T, typename C, typename A>
776 uint32_t quantiles_sketch<T, C, A>::compute_retained_items(uint16_t k, uint64_t n) {
777  const uint32_t bb_count = compute_base_buffer_items(k, n);
778  const uint64_t bit_pattern = compute_bit_pattern(k, n);
779  const uint32_t valid_levels = count_valid_levels(bit_pattern);
780  return bb_count + (k * valid_levels);
781 }
782 
783 template<typename T, typename C, typename A>
784 uint32_t quantiles_sketch<T, C, A>::compute_base_buffer_items(uint16_t k, uint64_t n) {
785  return n % (static_cast<uint64_t>(2) * k);
786 }
787 
788 template<typename T, typename C, typename A>
789 uint64_t quantiles_sketch<T, C, A>::compute_bit_pattern(uint16_t k, uint64_t n) {
790  return n / (static_cast<uint64_t>(2) * k);
791 }
792 
793 template<typename T, typename C, typename A>
794 uint32_t quantiles_sketch<T, C, A>::count_valid_levels(uint64_t bit_pattern) {
795  uint32_t count = 0;
796  for (; bit_pattern > 0; ++count) bit_pattern &= bit_pattern - 1;
797  return count;
798 }
799 
800 template<typename T, typename C, typename A>
801 uint8_t quantiles_sketch<T, C, A>::compute_levels_needed(uint16_t k, uint64_t n) {
802  return static_cast<uint8_t>(64U) - count_leading_zeros_in_u64(n / (2 * k));
803 }
804 
805 template<typename T, typename C, typename A>
806 void quantiles_sketch<T, C, A>::check_k(uint16_t k) {
807  if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & (k - 1)) != 0) {
808  throw std::invalid_argument("k must be a power of 2 that is >= "
809  + std::to_string(quantiles_constants::MIN_K) + " and <= "
810  + std::to_string(quantiles_constants::MAX_K) + ". Found: " + std::to_string(k));
811  }
812 }
813 
814 template<typename T, typename C, typename A>
815 void quantiles_sketch<T, C, A>::check_serial_version(uint8_t serial_version) {
816  if (serial_version == SERIAL_VERSION || serial_version == SERIAL_VERSION_1 || serial_version == SERIAL_VERSION_2)
817  return;
818  else
819  throw std::invalid_argument("Possible corruption. Unrecognized serialization version: " + std::to_string(serial_version));
820 }
821 
822 template<typename T, typename C, typename A>
823 void quantiles_sketch<T, C, A>::check_family_id(uint8_t family_id) {
824  if (family_id == FAMILY)
825  return;
826  else
827  throw std::invalid_argument("Possible corruption. Family id does not indicate quantiles sketch: " + std::to_string(family_id));
828 }
829 
830 template<typename T, typename C, typename A>
831 void quantiles_sketch<T, C, A>::check_header_validity(uint8_t preamble_longs, uint8_t flags_byte, uint8_t serial_version) {
832  const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
833  const bool compact = (flags_byte & (1 << flags::IS_COMPACT)) > 0;
834 
835  const uint8_t sw = (compact ? 1 : 0) + (2 * (empty ? 1 : 0))
836  + (4 * (serial_version & 0xF)) + (32 * (preamble_longs & 0x3F));
837  bool valid = true;
838 
839  switch (sw) { // exhaustive list and description of all valid cases
840  case 38 : break;
841  case 164 : break;
842  case 42 : break;
843  case 72 : break;
844  case 47 : break; // compact, empty, serVer = 3, preLongs = 1;
845  case 46 : break;
846  case 79 : break; // compact, empty, serVer = 3, preLongs = 2;
847  case 78 : break;
848  case 77 : break; // compact, !empty, serVer = 3, preLongs = 2;
849  case 76 : break;
850  default : //all other case values are invalid
851  valid = false;
852  }
853 
854  if (!valid) {
855  std::ostringstream os;
856  os << "Possible sketch corruption. Inconsistent state: "
857  << "preamble_longs = " << preamble_longs
858  << ", empty = " << (empty ? "true" : "false")
859  << ", serialization_version = " << serial_version
860  << ", compact = " << (compact ? "true" : "false");
861  throw std::invalid_argument(os.str());
862  }
863 }
864 
// Returns an iterator over base_buffer_ and levels_ positioned at the first
// retained item (the last const_iterator constructor argument, is_end, is false).
// NOTE(review): the signature line (source line 866) is absent from this listing;
// per the doxygen index entry it is `const_iterator begin() const` — confirm.
865 template <typename T, typename C, typename A>
867  return quantiles_sketch<T, C, A>::const_iterator(base_buffer_, levels_, k_, n_, false);
868 }
869 
// Returns the past-the-end iterator over base_buffer_ and levels_ (is_end = true;
// the const_iterator constructor then sets index_ = n in exact mode, or
// level_ = levels_.size() otherwise).
// NOTE(review): the signature line (source line 871) is absent from this listing;
// per the doxygen index entry it is `const_iterator end() const` — confirm.
870 template <typename T, typename C, typename A>
872  return quantiles_sketch<T, C, A>::const_iterator(base_buffer_, levels_, k_, n_, true);
873 }
874 
// Reserves more capacity in base_buffer_: twice its current size, capped at
// 2 * k_, and never less than 1 (so an empty buffer still gets room for one item).
// NOTE(review): the signature line (source line 876) is absent from this listing,
// so the function name is not visible here — presumably the base-buffer growth
// helper; verify against the class declaration.
875 template<typename T, typename C, typename A>
877  const size_t new_size = std::max(std::min(static_cast<size_t>(2 * k_), 2 * base_buffer_.size()), static_cast<size_t>(1));
878  base_buffer_.reserve(new_size);
879 }
880 
881 template<typename T, typename C, typename A>
882 void quantiles_sketch<T, C, A>::process_full_base_buffer() {
883  // make sure there will be enough levels for the propagation
884  grow_levels_if_needed(); // note: n_ was already incremented by update() before this
885 
886  std::sort(base_buffer_.begin(), base_buffer_.end(), comparator_);
887  in_place_propagate_carry(0,
888  levels_[0], // unused here, but 0 is guaranteed to exist
889  base_buffer_,
890  true, *this);
891  base_buffer_.clear();
892  is_base_buffer_sorted_ = true;
893  if (n_ / (2 * k_) != bit_pattern_) {
894  throw std::logic_error("Internal error: n / 2k (" + std::to_string(n_ / 2 * k_)
895  + " != bit_pattern " + std::to_string(bit_pattern_));
896  }
897 }
898 
899 template<typename T, typename C, typename A>
900 bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
901  const uint8_t levels_needed = compute_levels_needed(k_, n_);
902  if (levels_needed == 0)
903  return false; // don't need levels and might have small base buffer. Possible during merges.
904 
905  // from here on, assume full size base buffer (2k) and at least one additional level
906  if (levels_needed <= levels_.size())
907  return false;
908 
909  Level empty_level(allocator_);
910  empty_level.reserve(k_);
911  levels_.push_back(std::move(empty_level));
912  return true;
913 }
914 
915 template<typename T, typename C, typename A>
916 template<typename FwdV>
917 void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
918  FwdV&& buf_size_k, Level& buf_size_2k,
919  bool apply_as_update,
920  quantiles_sketch& sketch) {
921  const uint64_t bit_pattern = sketch.bit_pattern_;
922  const int k = sketch.k_;
923 
924  uint8_t ending_level = lowest_zero_bit_starting_at(bit_pattern, starting_level);
925 
926  if (apply_as_update) {
927  // update version of computation
928  // its is okay for buf_size_k to be null in this case
929  zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
930  } else {
931  // merge_into version of computation
932  for (uint16_t i = 0; i < k; ++i) {
933  sketch.levels_[ending_level].push_back(conditional_forward<FwdV>(buf_size_k[i]));
934  }
935  }
936 
937  for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
938  if ((bit_pattern & (static_cast<uint64_t>(1) << lvl)) == 0) {
939  throw std::logic_error("unexpected empty level in bit_pattern");
940  }
941  merge_two_size_k_buffers(
942  sketch.levels_[lvl],
943  sketch.levels_[ending_level],
944  buf_size_2k, sketch.get_comparator());
945  sketch.levels_[lvl].clear();
946  sketch.levels_[ending_level].clear();
947  zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
948  } // end of loop over lower levels
949 
950  // update bit pattern with binary-arithmetic ripple carry
951  sketch.bit_pattern_ = bit_pattern + (static_cast<uint64_t>(1) << starting_level);
952 }
953 
954 template<typename T, typename C, typename A>
955 void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
956 #ifdef QUANTILES_VALIDATION
957  static uint32_t next_offset = 0;
958  uint32_t rand_offset = next_offset;
959  next_offset = 1 - next_offset;
960 #else
961  uint32_t rand_offset = random_utils::random_bit();
962 #endif
963  if ((buf_in.size() != 2 * buf_out.capacity())
964  || (buf_out.size() > 0)) {
965  throw std::logic_error("zip_buffer requires buf_in.size() == "
966  "2*buf_out.capacity() and empty buf_out");
967  }
968 
969  size_t k = buf_out.capacity();
970  for (uint32_t i = rand_offset, o = 0; o < k; i += 2, ++o) {
971  buf_out.push_back(std::move(buf_in[i]));
972  }
973  buf_in.clear();
974 }
975 
976 template<typename T, typename C, typename A>
977 template<typename FwdV>
978 void quantiles_sketch<T, C, A>::zip_buffer_with_stride(FwdV&& buf_in, Level& buf_out, uint16_t stride) {
979  // Random offset in range [0, stride)
980  std::uniform_int_distribution<uint16_t> dist(0, stride - 1);
981  const uint16_t rand_offset = dist(random_utils::rand);
982 
983  if ((buf_in.size() != stride * buf_out.capacity())
984  || (buf_out.size() > 0)) {
985  throw std::logic_error("zip_buffer_with_stride requires buf_in.size() == "
986  "stride*buf_out.capacity() and empty buf_out");
987  }
988 
989  const size_t k = buf_out.capacity();
990  for (uint16_t i = rand_offset, o = 0; o < k; i += stride, ++o) {
991  buf_out.push_back(conditional_forward<FwdV>(buf_in[i]));
992  }
993  // do not clear input buffer
994 }
995 
996 template<typename T, typename C, typename A>
997 void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2,
998  Level& dst, const C& comparator) {
999  if (src_1.size() != src_2.size()
1000  || src_1.size() * 2 != dst.capacity()
1001  || dst.size() != 0) {
1002  throw std::logic_error("Input invariants violated in merge_two_size_k_buffers()");
1003  }
1004 
1005  auto end1 = src_1.end(), end2 = src_2.end();
1006  auto it1 = src_1.begin(), it2 = src_2.begin();
1007 
1008  // TODO: probably actually doing copies given Level&?
1009  while (it1 != end1 && it2 != end2) {
1010  if (comparator(*it1, *it2)) {
1011  dst.push_back(std::move(*it1++));
1012  } else {
1013  dst.push_back(std::move(*it2++));
1014  }
1015  }
1016 
1017  if (it1 != end1) {
1018  dst.insert(dst.end(), it1, end1);
1019  } else {
1020  if (it2 == end2) { throw std::logic_error("it2 unexpectedly already at end of range"); }
1021  dst.insert(dst.end(), it2, end2);
1022  }
1023 }
1024 
1025 template<typename T, typename C, typename A>
1026 template<typename FwdSk>
1027 void quantiles_sketch<T, C, A>::standard_merge(quantiles_sketch& tgt, FwdSk&& src) {
1028  if (src.get_k() != tgt.get_k()) {
1029  throw std::invalid_argument("src.get_k() != tgt.get_k()");
1030  }
1031  if (src.is_empty()) {
1032  return;
1033  }
1034 
1035  uint64_t new_n = src.get_n() + tgt.get_n();
1036 
1037  // move items from src's base buffer
1038  for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
1039  tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
1040  }
1041 
1042  // check (after moving raw items) if we need to extend levels array
1043  uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
1044  if (levels_needed > tgt.levels_.size()) {
1045  tgt.levels_.reserve(levels_needed);
1046  while (tgt.levels_.size() < levels_needed) {
1047  Level empty_level(tgt.allocator_);
1048  empty_level.reserve(tgt.get_k());
1049  tgt.levels_.push_back(std::move(empty_level));
1050  }
1051  }
1052 
1053  Level scratch_buf(tgt.allocator_);
1054  scratch_buf.reserve(2 * tgt.get_k());
1055 
1056  uint64_t src_pattern = src.bit_pattern_;
1057  for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
1058  if ((src_pattern & 1) > 0) {
1059  scratch_buf.clear();
1060 
1061  // propagate-carry
1062  in_place_propagate_carry(src_lvl,
1063  src.levels_[src_lvl], scratch_buf,
1064  false, tgt);
1065  // update n_ at the end
1066  }
1067  }
1068  tgt.n_ = new_n;
1069  if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
1070  throw std::logic_error("Failed internal consistency check after standard_merge()");
1071  }
1072 
1073  // update min and max items
1074  // can't just check is_empty() since min and max might not have been set if
1075  // there were no base buffer items added via update()
1076  if (!tgt.min_item_) {
1077  tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
1078  } else {
1079  if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
1080  *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
1081  }
1082  if (!tgt.max_item_) {
1083  tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
1084  } else {
1085  if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
1086  *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
1087  }
1088 }
1089 
1090 template<typename T, typename C, typename A>
1091 template<typename FwdSk>
1092 void quantiles_sketch<T, C, A>::downsampling_merge(quantiles_sketch& tgt, FwdSk&& src) {
1093  if (src.get_k() % tgt.get_k() != 0) {
1094  throw std::invalid_argument("src.get_k() is not a multiple of tgt.get_k()");
1095  }
1096  if (src.is_empty()) {
1097  return;
1098  }
1099 
1100  const uint16_t downsample_factor = src.get_k() / tgt.get_k();
1101  const uint8_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);
1102 
1103  const uint64_t new_n = src.get_n() + tgt.get_n();
1104 
1105  // move items from src's base buffer
1106  for (uint16_t i = 0; i < src.base_buffer_.size(); ++i) {
1107  tgt.update(conditional_forward<FwdSk>(src.base_buffer_[i]));
1108  }
1109 
1110  // check (after moving raw items) if we need to extend levels array
1111  const uint8_t levels_needed = compute_levels_needed(tgt.get_k(), new_n);
1112  if (levels_needed > tgt.levels_.size()) {
1113  tgt.levels_.reserve(levels_needed);
1114  while (tgt.levels_.size() < levels_needed) {
1115  Level empty_level(tgt.allocator_);
1116  empty_level.reserve(tgt.get_k());
1117  tgt.levels_.push_back(std::move(empty_level));
1118  }
1119  }
1120 
1121  Level down_buf(tgt.allocator_);
1122  down_buf.reserve(tgt.get_k());
1123 
1124  Level scratch_buf(tgt.allocator_);
1125  scratch_buf.reserve(2 * tgt.get_k());
1126 
1127  uint64_t src_pattern = src.bit_pattern_;
1128  for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
1129  if ((src_pattern & 1) > 0) {
1130  down_buf.clear();
1131  scratch_buf.clear();
1132 
1133  // zip with stride, leaving input buffer intact
1134  zip_buffer_with_stride(src.levels_[src_lvl], down_buf, downsample_factor);
1135 
1136  // propagate-carry
1137  in_place_propagate_carry(src_lvl + lg_sample_factor,
1138  down_buf, scratch_buf,
1139  false, tgt);
1140  // update n_ at the end
1141  }
1142  }
1143  tgt.n_ = new_n;
1144  if ((tgt.get_n() / (2 * tgt.get_k())) != tgt.bit_pattern_) {
1145  throw std::logic_error("Failed internal consistency check after downsampling_merge()");
1146  }
1147 
1148  // update min and max items
1149  // can't just check is_empty() since min and max might not have been set if
1150  // there were no base buffer items added via update()
1151  if (!tgt.min_item_) {
1152  tgt.min_item_.emplace(conditional_forward<FwdSk>(*src.min_item_));
1153  } else {
1154  if (tgt.comparator_(*src.min_item_, *tgt.min_item_))
1155  *tgt.min_item_ = conditional_forward<FwdSk>(*src.min_item_);
1156  }
1157  if (!tgt.max_item_) {
1158  tgt.max_item_.emplace(conditional_forward<FwdSk>(*src.max_item_));
1159  } else {
1160  if (tgt.comparator_(*tgt.max_item_, *src.max_item_))
1161  *tgt.max_item_ = conditional_forward<FwdSk>(*src.max_item_);
1162  }
1163 }
1164 
1165 template<typename T, typename C, typename A>
1166 uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
1167  uint8_t pos = starting_bit & 0X3F;
1168  uint64_t my_bits = bits >> pos;
1169 
1170  while ((my_bits & static_cast<uint64_t>(1)) != 0) {
1171  my_bits >>= 1;
1172  pos++;
1173  }
1174  return pos;
1175 }
1176 
1177 template<typename T, typename C, typename A>
1178 class quantiles_sketch<T, C, A>::items_deleter {
1179  public:
1180  items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
1181  void operator() (T* ptr) {
1182  if (ptr != nullptr) {
1183  if (destroy_) {
1184  for (size_t i = 0; i < num_; ++i) {
1185  ptr[i].~T();
1186  }
1187  }
1188  allocator_.deallocate(ptr, num_);
1189  }
1190  }
1191  void set_destroy(bool destroy) { destroy_ = destroy; }
1192  private:
1193  A allocator_;
1194  bool destroy_;
1195  size_t num_;
1196 };
1197 
1198 template<typename T, typename C, typename A>
1199 void quantiles_sketch<T, C, A>::setup_sorted_view() const {
1200  if (sorted_view_ == nullptr) {
1201  using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1202  sorted_view_ = new (AllocSortedView(allocator_).allocate(1)) quantiles_sorted_view<T, C, A>(get_sorted_view());
1203  }
1204 }
1205 
1206 template<typename T, typename C, typename A>
1207 void quantiles_sketch<T, C, A>::reset_sorted_view() {
1208  if (sorted_view_ != nullptr) {
1209  sorted_view_->~quantiles_sorted_view();
1210  using AllocSortedView = typename std::allocator_traits<A>::template rebind_alloc<quantiles_sorted_view<T, C, A>>;
1211  AllocSortedView(allocator_).deallocate(sorted_view_, 1);
1212  sorted_view_ = nullptr;
1213  }
1214 }
1215 
1216 // quantiles_sketch::const_iterator implementation
1217 
1218 template<typename T, typename C, typename A>
1219 quantiles_sketch<T, C, A>::const_iterator::const_iterator(const Level& base_buffer,
1220  const std::vector<Level, AllocLevel>& levels,
1221  uint16_t k,
1222  uint64_t n,
1223  bool is_end):
1224 base_buffer_(base_buffer),
1225 levels_(levels),
1226 level_(-1),
1227 index_(0),
1228 bb_count_(compute_base_buffer_items(k, n)),
1229 bit_pattern_(compute_bit_pattern(k, n)),
1230 weight_(1),
1231 k_(k)
1232 {
1233  if (is_end) {
1234  // if exact mode: index_ = n is end
1235  // if sampling, level_ = max_level + 1 and index_ = 0 is end
1236  if (bit_pattern_ == 0) // only a valid check for exact mode in constructor
1237  index_ = static_cast<uint32_t>(n);
1238  else
1239  level_ = static_cast<int>(levels_.size());
1240  } else { // find first non-empty item
1241  if (bb_count_ == 0 && bit_pattern_ > 0) {
1242  level_ = 0;
1243  weight_ = 2;
1244  while ((bit_pattern_ & 0x01) == 0) {
1245  weight_ *= 2;
1246  ++level_;
1247  bit_pattern_ >>= 1;
1248  }
1249  }
1250  }
1251 }
1252 
1253 template<typename T, typename C, typename A>
1254 typename quantiles_sketch<T, C, A>::const_iterator& quantiles_sketch<T, C, A>::const_iterator::operator++() {
1255  ++index_;
1256 
1257  if ((level_ == -1 && index_ == base_buffer_.size() && levels_.size() > 0) || (level_ >= 0 && index_ == k_)) { // go to the next non-empty level
1258  index_ = 0;
1259  do {
1260  ++level_;
1261  if (level_ > 0) bit_pattern_ = bit_pattern_ >> 1;
1262  if (bit_pattern_ == 0) return *this;
1263  weight_ *= 2;
1264  } while ((bit_pattern_ & static_cast<uint64_t>(1)) == 0);
1265  }
1266  return *this;
1267 }
1268 
// Post-increment: saves a copy of the current position, advances this iterator
// with the pre-increment operator, and returns the saved copy.
// NOTE(review): the return type is `const_iterator&`, but `tmp` is a local, so the
// returned reference dangles as soon as this function returns; any use of the
// result (e.g. `*it++`) is undefined behavior. Fix by returning `const_iterator`
// by value here AND in the matching declaration in the class definition (not
// visible in this chunk) — both must change together to compile.
1269 template<typename T, typename C, typename A>
1270 typename quantiles_sketch<T, C, A>::const_iterator& quantiles_sketch<T, C, A>::const_iterator::operator++(int) {
1271  const_iterator tmp(*this);
1272  operator++();
1273  return tmp;
1274 }
1275 
1276 template<typename T, typename C, typename A>
1277 bool quantiles_sketch<T, C, A>::const_iterator::operator==(const const_iterator& other) const {
1278  return level_ == other.level_ && index_ == other.index_;
1279 }
1280 
1281 template<typename T, typename C, typename A>
1282 bool quantiles_sketch<T, C, A>::const_iterator::operator!=(const const_iterator& other) const {
1283  return !operator==(other);
1284 }
1285 
1286 template<typename T, typename C, typename A>
1287 auto quantiles_sketch<T, C, A>::const_iterator::operator*() const -> reference {
1288  return value_type(level_ == -1 ? base_buffer_[index_] : levels_[level_][index_], weight_);
1289 }
1290 
1291 template<typename T, typename C, typename A>
1292 auto quantiles_sketch<T, C, A>::const_iterator::operator->() const -> pointer {
1293  return **this;
1294 }
1295 
1296 } /* namespace datasketches */
1297 
1298 #endif // _QUANTILES_SKETCH_IMPL_HPP_
This is a stochastic streaming sketch that enables near-real time analysis of the approximate distrib...
Definition: quantiles_sketch.hpp:153
Comparator get_comparator() const
Returns an instance of the comparator for this sketch.
Definition: quantiles_sketch_impl.hpp:681
quantiles_sorted_view< T, Comparator, Allocator > get_sorted_view() const
Gets the sorted view of this sketch.
Definition: quantiles_sketch_impl.hpp:723
quantiles_sketch & operator=(const quantiles_sketch &other)
Copy assignment.
Definition: quantiles_sketch_impl.hpp:89
vector_double get_CDF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Cumulative Distribution Function (CDF), which is the cumulative analo...
Definition: quantiles_sketch_impl.hpp:769
uint32_t get_num_retained() const
Returns the number of retained items (samples) in the sketch.
Definition: quantiles_sketch_impl.hpp:664
double get_normalized_rank_error(bool is_pmf) const
Gets the normalized rank error for this sketch.
Definition: quantiles_sketch_impl.hpp:711
vector_double get_PMF(const T *split_points, uint32_t size, bool inclusive=true) const
Returns an approximation to the Probability Mass Function (PMF) of the input stream given a set of sp...
Definition: quantiles_sketch_impl.hpp:762
void merge(FwdSk &&other)
Merges another sketch into this one.
Definition: quantiles_sketch_impl.hpp:235
void update(FwdT &&item)
Updates this sketch with the given data item.
Definition: quantiles_sketch_impl.hpp:212
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition: quantiles_sketch_impl.hpp:277
string< Allocator > to_string(bool print_levels=false, bool print_items=false) const
Prints a summary of the sketch.
Definition: quantiles_sketch_impl.hpp:595
uint16_t get_k() const
Returns configured parameter k.
Definition: quantiles_sketch_impl.hpp:644
bool is_empty() const
Returns true if this sketch is empty.
Definition: quantiles_sketch_impl.hpp:654
const T & get_max_item() const
Returns the max item of the stream.
Definition: quantiles_sketch_impl.hpp:675
static quantiles_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const Comparator &comparator=Comparator(), const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
double get_rank(const T &item, bool inclusive=true) const
Returns an approximation to the normalized rank of the given item from 0 to 1, inclusive.
Definition: quantiles_sketch_impl.hpp:755
allocator_type get_allocator() const
Returns the allocator for this sketch.
Definition: quantiles_sketch_impl.hpp:686
quantile_return_type get_quantile(double rank, bool inclusive=true) const
Returns an approximation to the data item associated with the given rank of a hypothetical sorted ver...
Definition: quantiles_sketch_impl.hpp:744
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: quantiles_sketch_impl.hpp:866
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: quantiles_sketch_impl.hpp:693
bool is_estimation_mode() const
Returns true if this sketch is in estimation mode.
Definition: quantiles_sketch_impl.hpp:659
const T & get_min_item() const
Returns the min item of the stream.
Definition: quantiles_sketch_impl.hpp:669
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: quantiles_sketch_impl.hpp:871
uint64_t get_n() const
Returns the length of the input stream.
Definition: quantiles_sketch_impl.hpp:649
const uint16_t MAX_K
maximum value of parameter K
Definition: quantiles_sketch.hpp:41
const uint16_t MIN_K
minimum value of parameter K
Definition: quantiles_sketch.hpp:39
DataSketches namespace.
Definition: binomial_bounds.hpp:38
Interface for serializing and deserializing items.
Definition: serde.hpp:34
void serialize(std::ostream &os, const T *items, unsigned num) const
Stream serialization.