var_opt_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _VAR_OPT_SKETCH_IMPL_HPP_
21 #define _VAR_OPT_SKETCH_IMPL_HPP_
22 
23 #include <memory>
24 #include <sstream>
25 #include <cmath>
26 #include <random>
27 #include <algorithm>
28 #include <stdexcept>
29 
30 #include "var_opt_sketch.hpp"
31 #include "serde.hpp"
32 #include "bounds_binomial_proportions.hpp"
33 #include "count_zeros.hpp"
34 #include "memory_operations.hpp"
35 #include "ceiling_power_of_2.hpp"
36 
37 namespace datasketches {
38 
39 /*
40  * Implementation code for the VarOpt sketch.
41  *
42  * author Kevin Lang
43  * author Jon Malkin
44  */
45 template<typename T, typename A>
46 var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, resize_factor rf, const A& allocator) :
47  var_opt_sketch(k, rf, false, allocator) {}
48 
49 template<typename T, typename A>
50 var_opt_sketch<T, A>::var_opt_sketch(const var_opt_sketch& other) :
51  k_(other.k_),
52  h_(other.h_),
53  m_(other.m_),
54  r_(other.r_),
55  n_(other.n_),
56  total_wt_r_(other.total_wt_r_),
57  rf_(other.rf_),
58  curr_items_alloc_(other.curr_items_alloc_),
59  filled_data_(other.filled_data_),
60  allocator_(other.allocator_),
61  data_(nullptr),
62  weights_(nullptr),
63  num_marks_in_h_(other.num_marks_in_h_),
64  marks_(nullptr)
65  {
66  data_ = allocator_.allocate(curr_items_alloc_);
67  // skip gap or anything unused at the end
68  for (size_t i = 0; i < h_; ++i)
69  new (&data_[i]) T(other.data_[i]);
70  for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
71  new (&data_[i]) T(other.data_[i]);
72 
73  // we skipped the gap
74  filled_data_ = false;
75 
76  weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
77  // doubles so can successfully copy regardless of the internal state
78  std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
79 
80  if (other.marks_ != nullptr) {
81  marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
82  std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
83  }
84  }
85 
86 template<typename T, typename A>
87 var_opt_sketch<T, A>::var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n) :
88  k_(other.k_),
89  h_(other.h_),
90  m_(other.m_),
91  r_(other.r_),
92  n_(adjusted_n),
93  total_wt_r_(other.total_wt_r_),
94  rf_(other.rf_),
95  curr_items_alloc_(other.curr_items_alloc_),
96  filled_data_(other.filled_data_),
97  allocator_(other.allocator_),
98  data_(nullptr),
99  weights_(nullptr),
100  num_marks_in_h_(other.num_marks_in_h_),
101  marks_(nullptr)
102  {
103  data_ = allocator_.allocate(curr_items_alloc_);
104  // skip gap or anything unused at the end
105  for (size_t i = 0; i < h_; ++i)
106  new (&data_[i]) T(other.data_[i]);
107  for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
108  new (&data_[i]) T(other.data_[i]);
109 
110  // we skipped the gap
111  filled_data_ = false;
112 
113  weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
114  // doubles so can successfully copy regardless of the internal state
115  std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
116 
117  if (!as_sketch && other.marks_ != nullptr) {
118  marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
119  std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
120  }
121  }
122 
123 template<typename T, typename A>
124 var_opt_sketch<T, A>::var_opt_sketch(var_opt_sketch&& other) noexcept :
125  k_(other.k_),
126  h_(other.h_),
127  m_(other.m_),
128  r_(other.r_),
129  n_(other.n_),
130  total_wt_r_(other.total_wt_r_),
131  rf_(other.rf_),
132  curr_items_alloc_(other.curr_items_alloc_),
133  filled_data_(other.filled_data_),
134  allocator_(other.allocator_),
135  data_(other.data_),
136  weights_(other.weights_),
137  num_marks_in_h_(other.num_marks_in_h_),
138  marks_(other.marks_)
139  {
140  other.data_ = nullptr;
141  other.weights_ = nullptr;
142  other.marks_ = nullptr;
143  }
144 
145 template<typename T, typename A>
146 var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, const A& allocator) :
147  k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf), allocator_(allocator) {
148  if (k == 0 || k_ > MAX_K) {
149  throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
150  }
151 
152  uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
153  uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
154  curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
155  if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
156  ++curr_items_alloc_;
157  }
158 
159  allocate_data_arrays(curr_items_alloc_, is_gadget);
160  num_marks_in_h_ = 0;
161 }
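
/*
 * Construction sketch (illustrative, not part of the original source): the resize factor
 * controls how the backing arrays grow from a sub-multiple of k up to k + 1 slots.
 * Assuming the resize_factor enumerators (X1, X2, X4, X8) and the defaults exposed
 * through var_opt_sketch.hpp:
 *
 *   datasketches::var_opt_sketch<int64_t> sk(4096);  // default resize factor
 *   datasketches::var_opt_sketch<int64_t> sk2(4096,
 *       datasketches::var_opt_sketch<int64_t>::resize_factor::X2);
 *   // with X2, each grow_data_arrays() call doubles curr_items_alloc_
 *   // until it reaches k (plus 1 slot reserved for the gap)
 */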
162 
163 template<typename T, typename A>
164 var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf,
165  uint32_t curr_items_alloc, bool filled_data, std::unique_ptr<T, items_deleter> items,
166  std::unique_ptr<double, weights_deleter> weights, uint32_t num_marks_in_h,
167  std::unique_ptr<bool, marks_deleter> marks, const A& allocator) :
168  k_(k),
169  h_(h),
170  m_(m),
171  r_(r),
172  n_(n),
173  total_wt_r_(total_wt_r),
174  rf_(rf),
175  curr_items_alloc_(curr_items_alloc),
176  filled_data_(filled_data),
177  allocator_(allocator),
178  data_(items.release()),
179  weights_(weights.release()),
180  num_marks_in_h_(num_marks_in_h),
181  marks_(marks.release())
182 {}
183 
184 
185 template<typename T, typename A>
186 var_opt_sketch<T, A>::~var_opt_sketch() {
187  if (data_ != nullptr) {
188  if (filled_data_) {
189  // destroy everything
190  const size_t num_to_destroy = std::min(k_ + 1, curr_items_alloc_);
191  for (size_t i = 0; i < num_to_destroy; ++i) {
192  data_[i].~T();
193  }
194  } else {
195  // skip gap or anything unused at the end
196  for (size_t i = 0; i < h_; ++i) {
197  data_[i].~T();
198  }
199 
200  for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i) {
201  data_[i].~T();
202  }
203  }
204  allocator_.deallocate(data_, curr_items_alloc_);
205  }
206 
207  if (weights_ != nullptr) {
208  AllocDouble(allocator_).deallocate(weights_, curr_items_alloc_);
209  }
210 
211  if (marks_ != nullptr) {
212  AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
213  }
214 }
215 
216 template<typename T, typename A>
217 var_opt_sketch<T, A>& var_opt_sketch<T, A>::operator=(const var_opt_sketch& other) {
218  var_opt_sketch sk_copy(other);
219  std::swap(k_, sk_copy.k_);
220  std::swap(h_, sk_copy.h_);
221  std::swap(m_, sk_copy.m_);
222  std::swap(r_, sk_copy.r_);
223  std::swap(n_, sk_copy.n_);
224  std::swap(total_wt_r_, sk_copy.total_wt_r_);
225  std::swap(rf_, sk_copy.rf_);
226  std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
227  std::swap(filled_data_, sk_copy.filled_data_);
228  std::swap(allocator_, sk_copy.allocator_);
229  std::swap(data_, sk_copy.data_);
230  std::swap(weights_, sk_copy.weights_);
231  std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
232  std::swap(marks_, sk_copy.marks_);
233  return *this;
234 }
235 
236 template<typename T, typename A>
237 var_opt_sketch<T, A>& var_opt_sketch<T, A>::operator=(var_opt_sketch&& other) {
238  std::swap(k_, other.k_);
239  std::swap(h_, other.h_);
240  std::swap(m_, other.m_);
241  std::swap(r_, other.r_);
242  std::swap(n_, other.n_);
243  std::swap(total_wt_r_, other.total_wt_r_);
244  std::swap(rf_, other.rf_);
245  std::swap(curr_items_alloc_, other.curr_items_alloc_);
246  std::swap(filled_data_, other.filled_data_);
247  std::swap(allocator_, other.allocator_);
248  std::swap(data_, other.data_);
249  std::swap(weights_, other.weights_);
250  std::swap(num_marks_in_h_, other.num_marks_in_h_);
251  std::swap(marks_, other.marks_);
252  return *this;
253 }
254 
255 /*
256  * An empty sketch requires 8 bytes.
257  *
258  * <pre>
259  * Long || Start Byte Adr:
260  * Adr:
261  *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
262  *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
263  * </pre>
264  *
265  * A non-empty sketch requires 24 bytes of preamble for an under-full sample; once there are
266  * at least k items the sketch uses 32 bytes of preamble.
267  *
268  * The count of items seen is limited to 48 bits (~256 trillion) even though there are adjacent
269  * unused preamble bits. The acceptance probability for an item is a double in the range [0,1),
270  * limiting us to 53 bits of randomness due to details of the IEEE floating point format. To
271  * ensure meaningful probabilities as the items seen count approaches capacity, we intentionally
272  * use slightly fewer bits.
273  *
274  * Following the header are weights for the heavy items, then marks in the event this is a gadget.
275  * The serialized items come last.
276  *
277  * <pre>
278  * Long || Start Byte Adr:
279  * Adr:
280  *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
281  *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
282  *
283  *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
284  *  1   ||---------------------------Items Seen Count (N)--------------------------------|
285  *
286  *      ||      16        |   17   |   18   |   19   |   20   |   21   |   22   |   23   |
287  *  2   ||-------------Item Count in H---------------|-------Item Count in R-------------|
288  *
289  *      ||      24        |   25   |   26   |   27   |   28   |   29   |   30   |   31   |
290  *  3   ||-------------------------------Total Weight in R-------------------------------|
291  * </pre>
292  */
293 
294 // implementation for fixed-size arithmetic types (integral and floating point)
295 template<typename T, typename A>
296 template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
297 size_t var_opt_sketch<T, A>::get_serialized_size_bytes(const SerDe&) const {
298  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
299  size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
300  num_bytes += h_ * sizeof(double); // weights
301  if (marks_ != nullptr) { // marks
302  num_bytes += (h_ / 8) + (h_ % 8 > 0);
303  }
304  num_bytes += (h_ + r_) * sizeof(TT); // the actual items
305  return num_bytes;
306 }
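
/*
 * Worked example of the size formula above (illustrative only), assuming double items,
 * full mode (r_ > 0), and no marks array:
 *
 *   bytes = (PREAMBLE_LONGS_FULL << 3)       // 32-byte preamble
 *         + h_ * sizeof(double)              // one weight per H-region item
 *         + (h_ + r_) * sizeof(double)       // the items themselves
 *
 * e.g. h_ = 3, r_ = 1021 (k = 1024): 32 + 24 + 8192 = 8248 bytes.
 */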
307 
308 // implementation for all other types
309 template<typename T, typename A>
310 template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
311 size_t var_opt_sketch<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
312  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
313  size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
314  num_bytes += h_ * sizeof(double); // weights
315  if (marks_ != nullptr) { // marks
316  num_bytes += (h_ / 8) + (h_ % 8 > 0);
317  }
318  // must iterate over the items
319  for (auto it: *this)
320  num_bytes += sd.size_of_item(it.first);
321  return num_bytes;
322 }
323 
324 template<typename T, typename A>
325 template<typename SerDe>
326 std::vector<uint8_t, AllocU8<A>> var_opt_sketch<T, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const {
327  const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
328  std::vector<uint8_t, AllocU8<A>> bytes(size, 0, allocator_);
329  uint8_t* ptr = bytes.data() + header_size_bytes;
330  uint8_t* end_ptr = ptr + size;
331 
332  bool empty = is_empty();
333  uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
334  : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
335  uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
336  uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
337 
338  if (empty) {
339  flags |= EMPTY_FLAG_MASK;
340  }
341 
342  // first prelong
343  uint8_t ser_ver(SER_VER);
344  uint8_t family(FAMILY_ID);
345  ptr += copy_to_mem(first_byte, ptr);
346  ptr += copy_to_mem(ser_ver, ptr);
347  ptr += copy_to_mem(family, ptr);
348  ptr += copy_to_mem(flags, ptr);
349  ptr += copy_to_mem(k_, ptr);
350 
351  if (!empty) {
352  // second and third prelongs
353  ptr += copy_to_mem(n_, ptr);
354  ptr += copy_to_mem(h_, ptr);
355  ptr += copy_to_mem(r_, ptr);
356 
357  // fourth prelong, if needed
358  if (r_ > 0) {
359  ptr += copy_to_mem(total_wt_r_, ptr);
360  }
361 
362  // first h_ weights
363  ptr += copy_to_mem(weights_, ptr, h_ * sizeof(double));
364 
365  // first h_ marks as packed bytes iff we have a gadget
366  if (marks_ != nullptr) {
367  uint8_t val = 0;
368  for (uint32_t i = 0; i < h_; ++i) {
369  if (marks_[i]) {
370  val |= 0x1 << (i & 0x7);
371  }
372 
373  if ((i & 0x7) == 0x7) {
374  ptr += copy_to_mem(val, ptr);
375  val = 0;
376  }
377  }
378 
379  // write out any remaining values
380  if ((h_ & 0x7) > 0) {
381  ptr += copy_to_mem(val, ptr);
382  }
383  }
384 
385  // write the sample items, skipping the gap. Either h_ or r_ may be 0
386  ptr += sd.serialize(ptr, end_ptr - ptr, data_, h_);
387  ptr += sd.serialize(ptr, end_ptr - ptr, &data_[h_ + 1], r_);
388  }
389 
390  size_t bytes_written = ptr - bytes.data();
391  if (bytes_written != size) {
392  throw std::logic_error("serialized size mismatch: " + std::to_string(bytes_written) + " != " + std::to_string(size));
393  }
394 
395  return bytes;
396 }
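
/*
 * Round-trip sketch of the byte-array path (illustrative), assuming the default SerDe
 * and allocator arguments declared in var_opt_sketch.hpp:
 *
 *   datasketches::var_opt_sketch<float> sk(128);
 *   sk.update(1.5f, 10.0);
 *   auto bytes = sk.serialize();             // vector of bytes, no extra header
 *   auto copy  = datasketches::var_opt_sketch<float>::deserialize(bytes.data(), bytes.size());
 *   // copy preserves n, k, h, r, the weights, and the sampled items
 */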
397 
398 template<typename T, typename A>
399 template<typename SerDe>
400 void var_opt_sketch<T, A>::serialize(std::ostream& os, const SerDe& sd) const {
401  const bool empty = (h_ == 0) && (r_ == 0);
402 
403  const uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
404  : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
405  const uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
406  uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
407 
408  if (empty) {
409  flags |= EMPTY_FLAG_MASK;
410  }
411 
412  // first prelong
413  const uint8_t ser_ver(SER_VER);
414  const uint8_t family(FAMILY_ID);
415  write(os, first_byte);
416  write(os, ser_ver);
417  write(os, family);
418  write(os, flags);
419  write(os, k_);
420 
421  if (!empty) {
422  // second and third prelongs
423  write(os, n_);
424  write(os, h_);
425  write(os, r_);
426 
427  // fourth prelong, if needed
428  if (r_ > 0) {
429  write(os, total_wt_r_);
430  }
431 
432  // write the first h_ weights
433  write(os, weights_, h_ * sizeof(double));
434 
435  // write the first h_ marks as packed bytes iff we have a gadget
436  if (marks_ != nullptr) {
437  uint8_t val = 0;
438  for (uint32_t i = 0; i < h_; ++i) {
439  if (marks_[i]) {
440  val |= 0x1 << (i & 0x7);
441  }
442 
443  if ((i & 0x7) == 0x7) {
444  write(os, val);
445  val = 0;
446  }
447  }
448 
449  // write out any remaining values
450  if ((h_ & 0x7) > 0) {
451  write(os, val);
452  }
453  }
454 
455  // write the sample items, skipping the gap. Either h_ or r_ may be 0
456  sd.serialize(os, data_, h_);
457  sd.serialize(os, &data_[h_ + 1], r_);
458  }
459 }
460 
461 template<typename T, typename A>
462 template<typename SerDe>
463 var_opt_sketch<T, A> var_opt_sketch<T, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const A& allocator) {
464  ensure_minimum_memory(size, 8);
465  const char* ptr = static_cast<const char*>(bytes);
466  const char* base = ptr;
467  const char* end_ptr = ptr + size;
468  uint8_t first_byte;
469  ptr += copy_from_mem(ptr, first_byte);
470  uint8_t preamble_longs = first_byte & 0x3f;
471  resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
472  uint8_t serial_version;
473  ptr += copy_from_mem(ptr, serial_version);
474  uint8_t family_id;
475  ptr += copy_from_mem(ptr, family_id);
476  uint8_t flags;
477  ptr += copy_from_mem(ptr, flags);
478  uint32_t k;
479  ptr += copy_from_mem(ptr, k);
480 
481  check_preamble_longs(preamble_longs, flags);
482  check_family_and_serialization_version(family_id, serial_version);
483  ensure_minimum_memory(size, preamble_longs << 3);
484 
485  const bool is_empty = flags & EMPTY_FLAG_MASK;
486  const bool is_gadget = flags & GADGET_FLAG_MASK;
487 
488  if (is_empty) {
489  return var_opt_sketch(k, rf, is_gadget, allocator);
490  }
491 
492  // second and third prelongs
493  uint64_t n;
494  uint32_t h, r;
495  ptr += copy_from_mem(ptr, n);
496  ptr += copy_from_mem(ptr, h);
497  ptr += copy_from_mem(ptr, r);
498 
499  const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
500 
501  // array_size is now known, but validate the R-region weight (4th prelong), if present, before allocating
502  double total_wt_r = 0.0;
503  if (preamble_longs == PREAMBLE_LONGS_FULL) {
504  ptr += copy_from_mem(ptr, total_wt_r);
505  if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
506  throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
507  "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
508  }
509  } else {
510  total_wt_r = 0.0;
511  }
512 
513  // read the first h_ weights, fill in rest of array with -1.0
514  check_memory_size(ptr - base + (h * sizeof(double)), size);
515  std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
516  weights_deleter(array_size, allocator));
517  double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
518  ptr += copy_from_mem(ptr, wts, h * sizeof(double));
519  for (size_t i = 0; i < h; ++i) {
520  if (!(wts[i] > 0.0)) {
521  throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
522  }
523  }
524  std::fill(wts + h, wts + array_size, -1.0);
525 
526  // read the first h_ marks as packed bytes iff we have a gadget
527  uint32_t num_marks_in_h = 0;
528  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
529  if (is_gadget) {
530  uint8_t val = 0;
531  marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
532  const size_t size_marks = (h / 8) + (h % 8 > 0 ? 1 : 0);
533  check_memory_size(ptr - base + size_marks, size);
534  for (uint32_t i = 0; i < h; ++i) {
535  if ((i & 0x7) == 0x0) { // should trigger on first iteration
536  ptr += copy_from_mem(ptr, val);
537  }
538  marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
539  num_marks_in_h += (marks.get()[i] ? 1 : 0);
540  }
541  }
542 
543  // read the sample items, skipping the gap. Either h_ or r_ may be 0
544  items_deleter deleter(array_size, allocator);
545  std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
546 
547  ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), h);
548  items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
549 
550  ptr += sd.deserialize(ptr, end_ptr - ptr, &(items.get()[h + 1]), r);
551  items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
552 
553  return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
554  std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
555 }
556 
557 template<typename T, typename A>
558 template<typename SerDe>
559 var_opt_sketch<T, A> var_opt_sketch<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
560  const auto first_byte = read<uint8_t>(is);
561  uint8_t preamble_longs = first_byte & 0x3f;
562  const resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
563  const auto serial_version = read<uint8_t>(is);
564  const auto family_id = read<uint8_t>(is);
565  const auto flags = read<uint8_t>(is);
566  const auto k = read<uint32_t>(is);
567 
568  check_preamble_longs(preamble_longs, flags);
569  check_family_and_serialization_version(family_id, serial_version);
570 
571  const bool is_empty = flags & EMPTY_FLAG_MASK;
572  const bool is_gadget = flags & GADGET_FLAG_MASK;
573 
574  if (is_empty) {
575  if (!is.good())
576  throw std::runtime_error("error reading from std::istream");
577  else
578  return var_opt_sketch(k, rf, is_gadget, allocator);
579  }
580 
581  // second and third prelongs
582  const auto n = read<uint64_t>(is);
583  const auto h = read<uint32_t>(is);
584  const auto r = read<uint32_t>(is);
585 
586  const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
587 
588  // array_size is now known, but validate the R-region weight (4th prelong), if present, before allocating
589  double total_wt_r = 0.0;
590  if (preamble_longs == PREAMBLE_LONGS_FULL) {
591  total_wt_r = read<double>(is);
592  if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
593  throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
594  "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
595  }
596  }
597 
598  // read the first h weights, fill remainder with -1.0
599  std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
600  weights_deleter(array_size, allocator));
601  double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
602  read(is, wts, h * sizeof(double));
603  for (size_t i = 0; i < h; ++i) {
604  if (!(wts[i] > 0.0)) {
605  throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
606  }
607  }
608  std::fill(wts + h, wts + array_size, -1.0);
609 
610  // read the first h_ marks as packed bytes iff we have a gadget
611  uint32_t num_marks_in_h = 0;
612  std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
613  if (is_gadget) {
614  marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
615  uint8_t val = 0;
616  for (uint32_t i = 0; i < h; ++i) {
617  if ((i & 0x7) == 0x0) { // should trigger on first iteration
618  val = read<uint8_t>(is);
619  }
620  marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
621  num_marks_in_h += (marks.get()[i] ? 1 : 0);
622  }
623  }
624 
625  // read the sample items, skipping the gap. Either h or r may be 0
626  items_deleter deleter(array_size, allocator);
627  std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
628 
629  sd.deserialize(is, items.get(), h); // aka &data_[0]
630  items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
631 
632  sd.deserialize(is, &(items.get()[h + 1]), r);
633  items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
634 
635  if (!is.good())
636  throw std::runtime_error("error reading from std::istream");
637 
638  return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
639  std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
640 }
641 
642 template<typename T, typename A>
643 bool var_opt_sketch<T, A>::is_empty() const {
644  return (h_ == 0 && r_ == 0);
645 }
646 
647 template<typename T, typename A>
648 void var_opt_sketch<T, A>::reset() {
649  const uint32_t prev_alloc = curr_items_alloc_;
650  const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
651  const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
652  curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
653  if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
654  ++curr_items_alloc_;
655  }
656 
657  if (filled_data_) {
658  // destroy everything
659  const size_t num_to_destroy = std::min(k_ + 1, prev_alloc);
660  for (size_t i = 0; i < num_to_destroy; ++i)
661  data_[i].~T();
662  } else {
663  // skip gap or anything unused at the end
664  for (size_t i = 0; i < h_; ++i)
665  data_[i].~T();
666 
667  for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
668  data_[i].~T();
669  }
670 
671  if (curr_items_alloc_ < prev_alloc) {
672  const bool is_gadget = (marks_ != nullptr);
673 
674  allocator_.deallocate(data_, prev_alloc);
675  AllocDouble(allocator_).deallocate(weights_, prev_alloc);
676 
677  if (marks_ != nullptr)
678  AllocBool(allocator_).deallocate(marks_, prev_alloc);
679 
680  allocate_data_arrays(curr_items_alloc_, is_gadget);
681  }
682 
683  n_ = 0;
684  h_ = 0;
685  m_ = 0;
686  r_ = 0;
687  num_marks_in_h_ = 0;
688  total_wt_r_ = 0.0;
689  filled_data_ = false;
690 }
691 
692 template<typename T, typename A>
693 uint64_t var_opt_sketch<T, A>::get_n() const {
694  return n_;
695 }
696 
697 template<typename T, typename A>
698 uint32_t var_opt_sketch<T, A>::get_k() const {
699  return k_;
700 }
701 
702 template<typename T, typename A>
703 uint32_t var_opt_sketch<T, A>::get_num_samples() const {
704  const uint32_t num_in_sketch = h_ + r_;
705  return (num_in_sketch < k_ ? num_in_sketch : k_);
706 }
707 
708 template<typename T, typename A>
709 void var_opt_sketch<T, A>::update(const T& item, double weight) {
710  update(item, weight, false);
711 }
712 
713 template<typename T, typename A>
714 void var_opt_sketch<T, A>::update(T&& item, double weight) {
715  update(std::move(item), weight, false);
716 }
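
/*
 * Typical update-and-iterate usage (illustrative), assuming the public API declared in
 * var_opt_sketch.hpp where the default weight is 1.0:
 *
 *   datasketches::var_opt_sketch<std::string> sk(32);   // k = 32
 *   sk.update("heavy", 1000.0);                         // likely retained exactly in H
 *   for (int i = 0; i < 100000; ++i) sk.update("light_" + std::to_string(i));
 *   for (const auto& p : sk) {
 *     // p.first is the sampled item; p.second is its adjusted weight
 *     // (items in the R region are all reported with weight tau)
 *   }
 */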
717 
718 template<typename T, typename A>
719 string<A> var_opt_sketch<T, A>::to_string() const {
720  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
721  // The stream does not support passing an allocator instance, and alternatives are complicated.
722  std::ostringstream os;
723  os << "### VarOpt SUMMARY:" << std::endl;
724  os << " k : " << k_ << std::endl;
725  os << " h : " << h_ << std::endl;
726  os << " r : " << r_ << std::endl;
727  os << " weight_r : " << total_wt_r_ << std::endl;
728  os << " Current size : " << curr_items_alloc_ << std::endl;
729  os << " Resize factor: " << (1 << rf_) << std::endl;
730  os << "### END SKETCH SUMMARY" << std::endl;
731  return string<A>(os.str().c_str(), allocator_);
732 }
733 
734 template<typename T, typename A>
735 string<A> var_opt_sketch<T, A>::items_to_string() const {
736  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
737  // The stream does not support passing an allocator instance, and alternatives are complicated.
738  std::ostringstream os;
739  os << "### Sketch Items" << std::endl;
740  int idx = 0;
741  for (auto record : *this) {
742  os << idx << ": " << record.first << "\twt = " << record.second << std::endl;
743  ++idx;
744  }
745  return string<A>(os.str().c_str(), allocator_);
746 }
747 
748 template<typename T, typename A>
749 string<A> var_opt_sketch<T, A>::items_to_string(bool print_gap) const {
750  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
751  // The stream does not support passing an allocator instance, and alternatives are complicated.
752  std::ostringstream os;
753  os << "### Sketch Items" << std::endl;
754  const uint32_t array_length = (n_ < k_ ? n_ : k_ + 1);
755  for (uint32_t i = 0, display_idx = 0; i < array_length; ++i) {
756  if (i == h_ && print_gap) {
757  os << display_idx << ": GAP" << std::endl;
758  ++display_idx;
759  } else {
760  os << display_idx << ": " << data_[i] << "\twt = ";
761  if (weights_[i] == -1.0) {
762  os << get_tau() << "\t(-1.0)" << std::endl;
763  } else {
764  os << weights_[i] << std::endl;
765  }
766  ++display_idx;
767  }
768  }
769  return string<A>(os.str().c_str(), allocator_);
770 }
771 
772 template<typename T, typename A>
773 template<typename O>
774 void var_opt_sketch<T, A>::update(O&& item, double weight, bool mark) {
775  if (weight < 0.0 || std::isnan(weight) || std::isinf(weight)) {
776  throw std::invalid_argument("Item weights must be nonnegative and finite. Found: "
777  + std::to_string(weight));
778  } else if (weight == 0.0) {
779  return;
780  }
781  ++n_;
782 
783  if (r_ == 0) {
784  // exact mode
785  update_warmup_phase(std::forward<O>(item), weight, mark);
786  } else {
787  // sketch is in estimation mode so we can make the following check,
788  // although very conservative to check every time
789  if ((h_ != 0) && (peek_min() < get_tau()))
790  throw std::logic_error("sketch not in valid estimation mode");
791 
792  // what tau would be if deletion candidates turn out to be R plus the new item
793  // note: (r_ + 1) - 1 is intentional
794  const double hypothetical_tau = (weight + total_wt_r_) / ((r_ + 1) - 1);
795 
796  // is new item's turn to be considered for reservoir?
797  const bool condition1 = (h_ == 0) || (weight <= peek_min());
798 
799  // is new item light enough for reservoir?
800  const bool condition2 = weight < hypothetical_tau;
801 
802  if (condition1 && condition2) {
803  update_light(std::forward<O>(item), weight, mark);
804  } else if (r_ == 1) {
805  update_heavy_r_eq1(std::forward<O>(item), weight, mark);
806  } else {
807  update_heavy_general(std::forward<O>(item), weight, mark);
808  }
809  }
810 }
811 
812 template<typename T, typename A>
813 template<typename O>
814 void var_opt_sketch<T, A>::update_warmup_phase(O&& item, double weight, bool mark) {
815  // seems overly cautious
816  if (r_ > 0 || m_ != 0 || h_ > k_) throw std::logic_error("invalid sketch state during warmup");
817 
818  if (h_ >= curr_items_alloc_) {
819  grow_data_arrays();
820  }
821 
822  // store items as they come in until full
823  new (&data_[h_]) T(std::forward<O>(item));
824  weights_[h_] = weight;
825  if (marks_ != nullptr) {
826  marks_[h_] = mark;
827  }
828  ++h_;
829  num_marks_in_h_ += mark ? 1 : 0;
830 
831  // check if need to heapify
832  if (h_ > k_) {
833  filled_data_ = true;
834  transition_from_warmup();
835  }
836 }
837 
838 /* In the "light" case the new item has weight <= old_tau, so
839  would appear to the right of the R items in a hypothetical reverse-sorted
840  list. It is easy to prove that it is light enough to be part of this
841  round's downsampling */
842 template<typename T, typename A>
843 template<typename O>
844 void var_opt_sketch<T, A>::update_light(O&& item, double weight, bool mark) {
845  if (r_ == 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during light update");
846 
847  const uint32_t m_slot = h_; // index of the gap, which becomes the M region
848  if (filled_data_) {
849  if (&data_[m_slot] != &item)
850  data_[m_slot] = std::forward<O>(item);
851  } else {
852  new (&data_[m_slot]) T(std::forward<O>(item));
853  filled_data_ = true;
854  }
855  weights_[m_slot] = weight;
856  if (marks_ != nullptr) { marks_[m_slot] = mark; }
857  ++m_;
858 
859  grow_candidate_set(total_wt_r_ + weight, r_ + 1);
860 }
861 
862 /* In the "heavy" case the new item has weight > old_tau, so would
863  appear to the left of items in R in a hypothetical reverse-sorted list and
864  might or might not be light enough be part of this round's downsampling.
865  [After first splitting off the R=1 case] we greatly simplify the code by
866  putting the new item into the H heap whether it needs to be there or not.
867  In other words, it might go into the heap and then come right back out,
868  but that should be okay because pseudo_heavy items cannot predominate
869  in long streams unless (max wt) / (min wt) > o(exp(N)) */
870 template<typename T, typename A>
871 template<typename O>
872 void var_opt_sketch<T, A>::update_heavy_general(O&& item, double weight, bool mark) {
873  if (r_ < 2 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy general update");
874 
875  // put into H, although may come back out momentarily
876  push(std::forward<O>(item), weight, mark);
877 
878  grow_candidate_set(total_wt_r_, r_);
879 }
880 
881 /* The analysis of this case is similar to that of the general heavy case.
882  The one small technical difference is that since R < 2, we must grab an M item
883  to have a valid starting point for continue_by_growing_candidate_set () */
884 template<typename T, typename A>
885 template<typename O>
886 void var_opt_sketch<T, A>::update_heavy_r_eq1(O&& item, double weight, bool mark) {
887  if (r_ != 1 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy r=1 update");
888 
889  push(std::forward<O>(item), weight, mark); // new item into H
890  pop_min_to_m_region(); // pop lightest back into M
891 
892  // Any set of two items is downsample-able to one item,
893  // so the two lightest items are a valid starting point for the following
894  const uint32_t m_slot = k_ - 1; // array is k+1, 1 in R, so slot before is M
895  grow_candidate_set(weights_[m_slot] + total_wt_r_, 2);
896 }
897 
898 /*
899  * Decreases sketch's value of k by 1, updating stored values as needed.
900  *
901  * <p>Subject to certain pre-conditions, decreasing k causes tau to increase. This fact is used by
902  * the unioning algorithm to force "marked" items out of H and into the reservoir region.</p>
903  */
904 template<typename T, typename A>
905 void var_opt_sketch<T, A>::decrease_k_by_1() {
906  if (k_ <= 1) {
907  throw std::logic_error("Cannot decrease k below 1 in union");
908  }
909 
910  if ((h_ == 0) && (r_ == 0)) {
911  // exact mode, but no data yet; this reduction is somewhat gratuitous
912  --k_;
913  } else if ((h_ > 0) && (r_ == 0)) {
914  // exact mode, but we have some data
915  --k_;
916  if (h_ > k_) {
917  transition_from_warmup();
918  }
919  } else if ((h_ > 0) && (r_ > 0)) {
920  // reservoir mode, but we have some exact samples.
921  // Our strategy will be to pull an item out of H (which we are allowed to do since it's
922  // still just data), reduce k, and then re-insert the item
923 
924  // first, slide the R zone to the left by 1, temporarily filling the gap
925  const uint32_t old_gap_idx = h_;
926  const uint32_t old_final_r_idx = (h_ + 1 + r_) - 1;
927  if (old_final_r_idx != k_) throw std::logic_error("gadget in invalid state");
928 
929  swap_values(old_final_r_idx, old_gap_idx);
930  filled_data_ = true; // we just filled the gap, and no need to check previous state
931 
932  // now we pull an item out of H; any item is ok, but if we grab the rightmost and then
933  // reduce h_, the heap invariant will be preserved (and the gap will be restored), plus
934  // the push() of the item that will probably happen later will be cheap.
935 
936  const uint32_t pulled_idx = h_ - 1;
937  double pulled_weight = weights_[pulled_idx];
938  bool pulled_mark = marks_[pulled_idx];
939  // will move the pulled item below; don't do anything to it here
940 
941  if (pulled_mark) { --num_marks_in_h_; }
942  weights_[pulled_idx] = -1.0; // to make bugs easier to spot
943 
944  --h_;
945  --k_;
946  --n_; // will be re-incremented with the update
947 
948  update(std::move(data_[pulled_idx]), pulled_weight, pulled_mark);
949  } else if ((h_ == 0) && (r_ > 0)) {
950  // pure reservoir mode, so can simply eject a randomly chosen sample from the reservoir
951  if (r_ < 2) throw std::logic_error("r_ too small for pure reservoir mode");
952 
953  const uint32_t r_idx_to_delete = 1 + next_int(r_); // 1 for the gap
954  const uint32_t rightmost_r_idx = (1 + r_) - 1;
955  swap_values(r_idx_to_delete, rightmost_r_idx);
956  weights_[rightmost_r_idx] = -1.0;
957 
958  --k_;
959  --r_;
960  }
961 }
962 
963 template<typename T, typename A>
964 void var_opt_sketch<T, A>::allocate_data_arrays(uint32_t tgt_size, bool use_marks) {
965  filled_data_ = false;
966 
967  data_ = allocator_.allocate(tgt_size);
968  weights_ = AllocDouble(allocator_).allocate(tgt_size);
969 
970  if (use_marks) {
971  marks_ = AllocBool(allocator_).allocate(tgt_size);
972  } else {
973  marks_ = nullptr;
974  }
975 }
976 
977 template<typename T, typename A>
978 void var_opt_sketch<T, A>::grow_data_arrays() {
979  const uint32_t prev_size = curr_items_alloc_;
980  curr_items_alloc_ = get_adjusted_size(k_, curr_items_alloc_ << rf_);
981  if (curr_items_alloc_ == k_) {
982  ++curr_items_alloc_;
983  }
984 
985  if (prev_size < curr_items_alloc_) {
986  filled_data_ = false;
987 
988  T* tmp_data = allocator_.allocate(curr_items_alloc_);
989  double* tmp_weights = AllocDouble(allocator_).allocate(curr_items_alloc_);
990 
991  for (uint32_t i = 0; i < prev_size; ++i) {
992  new (&tmp_data[i]) T(std::move(data_[i]));
993  data_[i].~T();
994  tmp_weights[i] = weights_[i];
995  }
996 
997  allocator_.deallocate(data_, prev_size);
998  AllocDouble(allocator_).deallocate(weights_, prev_size);
999 
1000  data_ = tmp_data;
1001  weights_ = tmp_weights;
1002 
1003  if (marks_ != nullptr) {
1004  bool* tmp_marks = AllocBool(allocator_).allocate(curr_items_alloc_);
1005  for (uint32_t i = 0; i < prev_size; ++i) {
1006  tmp_marks[i] = marks_[i];
1007  }
1008  AllocBool(allocator_).deallocate(marks_, prev_size);
1009  marks_ = tmp_marks;
1010  }
1011  }
1012 }
1013 
1014 template<typename T, typename A>
1015 void var_opt_sketch<T, A>::transition_from_warmup() {
1016  // Move the 2 lightest items from H to M
1017  // But the lighter really belongs in R, so update counts to reflect that
1018  convert_to_heap();
1019  pop_min_to_m_region();
1020  pop_min_to_m_region();
1021  --m_;
1022  ++r_;
1023 
1024  if (h_ != (k_ -1) || m_ != 1 || r_ != 1)
1025  throw std::logic_error("invalid state for transitioning from warmup");
1026 
1027  // Update total weight in R and then, having grabbed the value, overwrite
1028  // in weight_ array to help make bugs more obvious
1029  total_wt_r_ = weights_[k_]; // only one item, known location
1030  weights_[k_] = -1.0;
1031 
1032  // The two lightest items are necessarily downsample-able to one item,
1033  // and are therefore a valid initial candidate set
1034  grow_candidate_set(weights_[k_ - 1] + total_wt_r_, 2);
1035 }
1036 
1037 template<typename T, typename A>
1038 void var_opt_sketch<T, A>::convert_to_heap() {
1039  if (h_ < 2) {
1040  return; // nothing to do
1041  }
1042 
1043  const uint32_t last_slot = h_ - 1;
1044  const int last_non_leaf = ((last_slot + 1) / 2) - 1;
1045 
1046  for (int j = last_non_leaf; j >= 0; --j) {
1047  restore_towards_leaves(j);
1048  }
1049 
1050  // validates heap, used for initial debugging
1051  //for (uint32_t j = h_ - 1; j >= 1; --j) {
1052  // uint32_t p = ((j + 1) / 2) - 1;
1053  // if (weights_[p] > weights_[j]) throw std::logic_error("invalid heap");
1054  //}
1055 }
1056 
1057 template<typename T, typename A>
1058 void var_opt_sketch<T, A>::restore_towards_leaves(uint32_t slot_in) {
1059  const uint32_t last_slot = h_ - 1;
1060  if (h_ == 0 || slot_in > last_slot) throw std::logic_error("invalid heap state");
1061 
1062  uint32_t slot = slot_in;
1063  uint32_t child = (2 * slot_in) + 1; // might be invalid, need to check
1064 
1065  while (child <= last_slot) {
1066  uint32_t child2 = child + 1; // might also be invalid
1067  if ((child2 <= last_slot) && (weights_[child2] < weights_[child])) {
1068  // switch to the other child if it's both valid and smaller
1069  child = child2;
1070  }
1071 
1072  if (weights_[slot] <= weights_[child]) {
1073  // invariant holds so we're done
1074  break;
1075  }
1076 
1077  // swap and continue
1078  swap_values(slot, child);
1079 
1080  slot = child;
1081  child = (2 * slot) + 1; // might be invalid, checked on next loop
1082  }
1083 }
1084 
1085 template<typename T, typename A>
1086 void var_opt_sketch<T, A>::restore_towards_root(uint32_t slot_in) {
1087  uint32_t slot = slot_in;
1088  uint32_t p = (((slot + 1) / 2) - 1); // valid if slot >= 1
1089  while ((slot > 0) && (weights_[slot] < weights_[p])) {
1090  swap_values(slot, p);
1091  slot = p;
1092  p = (((slot + 1) / 2) - 1); // valid if slot >= 1
1093  }
1094 }
1095 
1096 template<typename T, typename A>
1097 template<typename O>
1098 void var_opt_sketch<T, A>::push(O&& item, double wt, bool mark) {
1099  if (filled_data_) {
1100  if (&data_[h_] != &item)
1101  data_[h_] = std::forward<O>(item);
1102  } else {
1103  new (&data_[h_]) T(std::forward<O>(item));
1104  filled_data_ = true;
1105  }
1106  weights_[h_] = wt;
1107  if (marks_ != nullptr) {
1108  marks_[h_] = mark;
1109  num_marks_in_h_ += (mark ? 1 : 0);
1110  }
1111  ++h_;
1112 
1113  restore_towards_root(h_ - 1); // operates on the old h_ slot, now that h_ has been incremented
1114 }
1115 
1116 template<typename T, typename A>
1117 void var_opt_sketch<T, A>::pop_min_to_m_region() {
1118  if (h_ == 0 || (h_ + m_ + r_ != k_ + 1))
1119  throw std::logic_error("invalid heap state popping min to M region");
1120 
1121  if (h_ == 1) {
1122  // just update bookkeeping
1123  ++m_;
1124  --h_;
1125  } else {
1126  // main case
1127  uint32_t tgt = h_ - 1; // last slot, will swap with root
1128  swap_values(0, tgt);
1129  ++m_;
1130  --h_;
1131 
1132  restore_towards_leaves(0);
1133  }
1134 
1135  if (is_marked(h_)) {
1136  --num_marks_in_h_;
1137  }
1138 }
1139 
1140 
1141 template<typename T, typename A>
1142 void var_opt_sketch<T, A>::swap_values(uint32_t src, uint32_t dst) {
1143  std::swap(data_[src], data_[dst]);
1144  std::swap(weights_[src], weights_[dst]);
1145 
1146  if (marks_ != nullptr) {
1147  std::swap(marks_[src], marks_[dst]);
1148  }
1149 }
1150 
1151 /* When entering here we should be in a well-characterized state where the
1152  new item has been placed in either h or m and we have a valid but not necessarily
1153  maximal sampling plan figured out. The array is completely full at this point.
1154  Everyone in h and m has an explicit weight. The candidates are right-justified
1155  and are either just the r set or the r set + exactly one m item. The number
1156  of cands is at least 2. We will now grow the candidate set as much as possible
1157  by pulling sufficiently light items from h to m.
1158 */
1159 template<typename T, typename A>
1160 void var_opt_sketch<T, A>::grow_candidate_set(double wt_cands, uint32_t num_cands) {
1161  if ((h_ + m_ + r_ != k_ + 1) || (num_cands < 1) || (num_cands != m_ + r_) || (m_ >= 2))
1162  throw std::logic_error("invariant violated when growing candidate set");
1163 
1164  while (h_ > 0) {
1165  const double next_wt = peek_min();
1166  const double next_tot_wt = wt_cands + next_wt;
1167 
1168  // test for strict lightness of next prospect (denominator multiplied through)
1169  // ideally: (next_wt * (next_num_cands-1) < next_tot_wt)
1170  // but can use num_cands directly
1171  if ((next_wt * num_cands) < next_tot_wt) {
1172  wt_cands = next_tot_wt;
1173  ++num_cands;
1174  pop_min_to_m_region(); // adjusts h_ and m_
1175  } else {
1176  break;
1177  }
1178  }
1179 
1180  downsample_candidate_set(wt_cands, num_cands);
1181 }
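
/*
 * Numeric illustration of the lightness test above: with wt_cands = 10.0 and
 * num_cands = 4, a heap minimum of 3.0 is pulled into M because 3.0 * 4 = 12.0 < 13.0,
 * giving wt_cands = 13.0 and num_cands = 5; a subsequent heap minimum of 4.0 stops the
 * loop, since 4.0 * 5 = 20.0 is not less than 17.0, and the candidate set is then
 * downsampled.
 */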
1182 
1183 template<typename T, typename A>
1184 void var_opt_sketch<T, A>::downsample_candidate_set(double wt_cands, uint32_t num_cands) {
1185  if (num_cands < 2 || h_ + num_cands != k_ + 1)
1186  throw std::logic_error("invalid num_cands when downsampling");
1187 
1188  // need this before overwriting anything
1189  const uint32_t delete_slot = choose_delete_slot(wt_cands, num_cands);
1190  const uint32_t leftmost_cand_slot = h_;
1191  if (delete_slot < leftmost_cand_slot || delete_slot > k_)
1192  throw std::logic_error("invalid delete slot index when downsampling");
1193 
1194  // Overwrite weights for items from M moving into R,
1195  // to make bugs more obvious. Also needed so anyone reading the
1196  // weight knows if it's invalid without checking h_ and m_
1197  const uint32_t stop_idx = leftmost_cand_slot + m_;
1198  for (uint32_t j = leftmost_cand_slot; j < stop_idx; ++j) {
1199  weights_[j] = -1.0;
1200  }
1201 
1202  // The next line works even when delete_slot == leftmost_cand_slot
1203  data_[delete_slot] = std::move(data_[leftmost_cand_slot]);
1204 
1205  m_ = 0;
1206  r_ = num_cands - 1;
1207  total_wt_r_ = wt_cands;
1208 }
1209 
1210 template<typename T, typename A>
1211 uint32_t var_opt_sketch<T, A>::choose_delete_slot(double wt_cands, uint32_t num_cands) const {
1212  if (r_ == 0) throw std::logic_error("choosing delete slot while in exact mode");
1213 
1214  if (m_ == 0) {
1215  // this happens if we insert a really heavy item
1216  return pick_random_slot_in_r();
1217  } else if (m_ == 1) {
1218  // check if we keep the item in M or pick one from R
1219  // p(keep) = (num_cand - 1) * wt_M / wt_cand
1220  double wt_m_cand = weights_[h_]; // slot of item in M is h_
1221  if ((wt_cands * next_double_exclude_zero()) < ((num_cands - 1) * wt_m_cand)) {
1222  return pick_random_slot_in_r(); // keep item in M
1223  } else {
1224  return h_; // index of item in M
1225  }
1226  } else {
1227  // general case
1228  const uint32_t delete_slot = choose_weighted_delete_slot(wt_cands, num_cands);
1229  const uint32_t first_r_slot = h_ + m_;
1230  if (delete_slot == first_r_slot) {
1231  return pick_random_slot_in_r();
1232  } else {
1233  return delete_slot;
1234  }
1235  }
1236 }
1237 
1238 template<typename T, typename A>
1239 uint32_t var_opt_sketch<T, A>::choose_weighted_delete_slot(double wt_cands, uint32_t num_cands) const {
1240  if (m_ < 1) throw std::logic_error("must have weighted delete slot");
1241 
1242  const uint32_t offset = h_;
1243  const uint32_t final_m = (offset + m_) - 1;
1244  const uint32_t num_to_keep = num_cands - 1;
1245 
1246  double left_subtotal = 0.0;
1247  double right_subtotal = -1.0 * wt_cands * next_double_exclude_zero();
1248 
1249  for (uint32_t i = offset; i <= final_m; ++i) {
1250  left_subtotal += num_to_keep * weights_[i];
1251  right_subtotal += wt_cands;
1252 
1253  if (left_subtotal < right_subtotal) {
1254  return i;
1255  }
1256  }
1257 
1258  // this slot tells caller that we need to delete out of R
1259  return final_m + 1;
1260 }
1261 
1262 template<typename T, typename A>
1263 uint32_t var_opt_sketch<T, A>::pick_random_slot_in_r() const {
1264  if (r_ == 0) throw std::logic_error("r_ = 0 when picking slot in R region");
1265  const uint32_t offset = h_ + m_;
1266  if (r_ == 1) {
1267  return offset;
1268  } else {
1269  return offset + next_int(r_);
1270  }
1271 }
1272 
1273 template<typename T, typename A>
1274 double var_opt_sketch<T, A>::peek_min() const {
1275  if (h_ == 0) throw std::logic_error("h_ = 0 when checking min in H region");
1276  return weights_[0];
1277 }
1278 
1279 template<typename T, typename A>
1280 inline bool var_opt_sketch<T, A>::is_marked(uint32_t idx) const {
1281  return marks_ == nullptr ? false : marks_[idx];
1282 }
1283 
1284 template<typename T, typename A>
1285 double var_opt_sketch<T, A>::get_tau() const {
1286  return r_ == 0 ? std::nan("1") : (total_wt_r_ / r_);
1287 }
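
/*
 * For example, with total_wt_r_ = 100.0 spread over r_ = 25 reservoir items, every
 * R-region item is reported with the adjusted weight tau = 4.0; before any item enters
 * R (r_ == 0), tau is undefined and returned as NaN.
 */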
1288 
1289 template<typename T, typename A>
1290 void var_opt_sketch<T, A>::strip_marks() {
1291  if (marks_ == nullptr) throw std::logic_error("request to strip marks from non-gadget");
1292  num_marks_in_h_ = 0;
1293  AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
1294  marks_ = nullptr;
1295 }
1296 
1297 template<typename T, typename A>
1298 void var_opt_sketch<T, A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags) {
1299  const bool is_empty(flags & EMPTY_FLAG_MASK);
1300 
1301  if (is_empty) {
1302  if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
1303  throw std::invalid_argument("Possible corruption: Preamble longs must be "
1304  + std::to_string(PREAMBLE_LONGS_EMPTY) + " for an empty sketch. Found: "
1305  + std::to_string(preamble_longs));
1306  }
1307  } else {
1308  if (preamble_longs != PREAMBLE_LONGS_WARMUP
1309  && preamble_longs != PREAMBLE_LONGS_FULL) {
1310  throw std::invalid_argument("Possible corruption: Preamble longs must be "
1311  + std::to_string(PREAMBLE_LONGS_WARMUP) + " or "
1312  + std::to_string(PREAMBLE_LONGS_FULL)
1313  + " for a non-empty sketch. Found: " + std::to_string(preamble_longs));
1314  }
1315  }
1316 }
1317 
1318 template<typename T, typename A>
1319 void var_opt_sketch<T, A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
1320  if (family_id == FAMILY_ID) {
1321  if (ser_ver != SER_VER) {
1322  throw std::invalid_argument("Possible corruption: VarOpt serialization version must be "
1323  + std::to_string(SER_VER) + ". Found: " + std::to_string(ser_ver));
1324  }
1325  return;
1326  }
1327  // TODO: extend to handle reservoir sampling
1328 
1329  throw std::invalid_argument("Possible corruption: VarOpt family id must be "
1330  + std::to_string(FAMILY_ID) + ". Found: " + std::to_string(family_id));
1331 }
1332 
1333 template<typename T, typename A>
1334 uint32_t var_opt_sketch<T, A>::validate_and_get_target_size(uint32_t preamble_longs, uint32_t k, uint64_t n,
1335  uint32_t h, uint32_t r, resize_factor rf) {
1336  if (k == 0 || k > MAX_K) {
1337  throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
1338  }
1339 
1340  uint32_t array_size;
1341 
1342  if (n <= k) {
1343  if (preamble_longs != PREAMBLE_LONGS_WARMUP) {
1344  throw std::invalid_argument("Possible corruption: deserializing with n <= k but not in warmup mode. "
1345  "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
1346  }
1347  if (n != h) {
1348  throw std::invalid_argument("Possible corruption: deserializing in warmup mode but n != h. "
1349  "Found n = " + std::to_string(n) + ", h = " + std::to_string(h));
1350  }
1351  if (r > 0) {
1352  throw std::invalid_argument("Possible corruption: deserializing in warmup mode but r > 0. "
1353  "Found r = " + std::to_string(r));
1354  }
1355 
1356  const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k));
1357  const uint32_t min_lg_size = to_log_2(ceiling_power_of_2(h));
1358  const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf, min_lg_size);
1359  array_size = get_adjusted_size(k, 1 << initial_lg_size);
1360  if (array_size == k) { // if full size, need to leave 1 for the gap
1361  ++array_size;
1362  }
1363  } else { // n > k
1364  if (preamble_longs != PREAMBLE_LONGS_FULL) {
1365  throw std::invalid_argument("Possible corruption: deserializing with n > k but not in full mode. "
1366  "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
1367  }
1368  if (h + r != k) {
1369  throw std::invalid_argument("Possible corruption: deserializing in full mode but h + r != n. "
1370  "Found h = " + std::to_string(h) + ", r = " + std::to_string(r) + ", n = " + std::to_string(n));
1371  }
1372 
1373  array_size = k + 1;
1374  }
1375 
1376  return array_size;
1377 }
1378 
1379 template<typename T, typename A>
1380 template<typename P>
1381 subset_summary var_opt_sketch<T, A>::estimate_subset_sum(P predicate) const {
1382  if (n_ == 0) {
1383  return {0.0, 0.0, 0.0, 0.0};
1384  }
1385 
1386  double total_wt_h = 0.0;
1387  double h_true_wt = 0.0;
1388  size_t idx = 0;
1389  for (; idx < h_; ++idx) {
1390  double wt = weights_[idx];
1391  total_wt_h += wt;
1392  if (predicate(data_[idx])) {
1393  h_true_wt += wt;
1394  }
1395  }
1396 
1397  // if only heavy items, we have an exact answer
1398  if (r_ == 0) {
1399  return {h_true_wt, h_true_wt, h_true_wt, h_true_wt};
1400  }
1401 
1402  // since r_ > 0, we know we have samples
1403  const uint64_t num_samples = n_ - h_;
1404  double effective_sampling_rate = r_ / static_cast<double>(num_samples);
1405  if (effective_sampling_rate < 0.0 || effective_sampling_rate > 1.0)
1406  throw std::logic_error("invalid sampling rate outside [0.0, 1.0]");
1407 
1408  uint32_t r_true_count = 0;
1409  ++idx; // skip the gap
1410  for (; idx < (k_ + 1); ++idx) {
1411  if (predicate(data_[idx])) {
1412  ++r_true_count;
1413  }
1414  }
1415 
1416  double lb_true_fraction = pseudo_hypergeometric_lb_on_p(r_, r_true_count, effective_sampling_rate);
1417  double estimated_true_fraction = (1.0 * r_true_count) / r_;
1418  double ub_true_fraction = pseudo_hypergeometric_ub_on_p(r_, r_true_count, effective_sampling_rate);
1419 
1420  return { h_true_wt + (total_wt_r_ * lb_true_fraction),
1421  h_true_wt + (total_wt_r_ * estimated_true_fraction),
1422  h_true_wt + (total_wt_r_ * ub_true_fraction),
1423  total_wt_h + total_wt_r_
1424  };
1425 }
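
/*
 * Example predicate query (illustrative); the field names below are assumed from the
 * subset_summary declaration in var_opt_sketch.hpp:
 *
 *   auto s = sk.estimate_subset_sum(
 *       [](const std::string& item) { return item.rfind("heavy", 0) == 0; });
 *   // s.lower_bound <= s.estimate <= s.upper_bound, and s.total_sketch_weight
 *   // is the total weight of the entire stream processed so far
 */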
1426 
1427 template<typename T, typename A>
1428 class var_opt_sketch<T, A>::items_deleter {
1429  public:
1430  items_deleter(uint32_t num, const A& allocator) : num(num), h_count(0), r_count(0), allocator(allocator) {}
1431  void set_h(uint32_t h) { h_count = h; }
1432  void set_r(uint32_t r) { r_count = r; }
1433  void operator() (T* ptr) {
1434  if (h_count > 0) {
1435  for (size_t i = 0; i < h_count; ++i) {
1436  ptr[i].~T();
1437  }
1438  }
1439  if (r_count > 0) {
1440  uint32_t end = h_count + r_count + 1;
1441  for (size_t i = h_count + 1; i < end; ++i) {
1442  ptr[i].~T();
1443  }
1444  }
1445  if (ptr != nullptr) {
1446  allocator.deallocate(ptr, num);
1447  }
1448  }
1449  private:
1450  uint32_t num;
1451  uint32_t h_count;
1452  uint32_t r_count;
1453  A allocator;
1454 };
1455 
1456 template<typename T, typename A>
1457 class var_opt_sketch<T, A>::weights_deleter {
1458  public:
1459  weights_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {}
1460  void operator() (double* ptr) {
1461  if (ptr != nullptr) {
1462  allocator.deallocate(ptr, num);
1463  }
1464  }
1465  private:
1466  uint32_t num;
1467  AllocDouble allocator;
1468 };
1469 
1470 template<typename T, typename A>
1471 class var_opt_sketch<T, A>::marks_deleter {
1472  public:
1473  marks_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {}
1474  void operator() (bool* ptr) {
1475  if (ptr != nullptr) {
1476  allocator.deallocate(ptr, num);
1477  }
1478  }
1479  private:
1480  uint32_t num;
1481  AllocBool allocator;
1482 };
1483 
1484 
1485 template<typename T, typename A>
1486 typename var_opt_sketch<T, A>::const_iterator var_opt_sketch<T, A>::begin() const {
1487  return const_iterator(*this, false);
1488 }
1489 
1490 template<typename T, typename A>
1491 typename var_opt_sketch<T, A>::const_iterator var_opt_sketch<T, A>::end() const {
1492  return const_iterator(*this, true);
1493 }
1494 
1495 // -------- var_opt_sketch::const_iterator implementation ---------
1496 
1497 template<typename T, typename A>
1498 var_opt_sketch<T, A>::const_iterator::const_iterator(const var_opt_sketch& sk, bool is_end) :
1499  sk_(&sk),
1500  cum_r_weight_(0.0),
1501  r_item_wt_(sk.get_tau()),
1502  final_idx_(sk.r_ > 0 ? sk.h_ + sk.r_ + 1 : sk.h_)
1503 {
1504  // index logic easier to read if not inline
1505  if (is_end) {
1506  idx_ = final_idx_;
1507  sk_ = nullptr;
1508  } else {
1509  idx_ = (sk.h_ == 0 && sk.r_ > 0 ? 1 : 0); // skip if gap is at start
1510  }
1511 
1512  // should only apply if sketch is empty
1513  if (idx_ == final_idx_) { sk_ = nullptr; }
1514 }
1515 
1516 template<typename T, typename A>
1517 var_opt_sketch<T, A>::const_iterator::const_iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region) :
1518  sk_(&sk),
1519  cum_r_weight_(0.0),
1520  r_item_wt_(sk.get_tau()),
1521  final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
1522 {
1523  if (use_r_region) {
1524  idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
1525  } else { // H region
1526  // gap at start only if h_ == 0, so index always starts at 0
1527  idx_ = (is_end ? sk.h_ : 0);
1528  }
1529 
1530  // unlike in full iterator case, may happen even if sketch is not empty
1531  if (idx_ == final_idx_) { sk_ = nullptr; }
1532 }
1533 
1534 template<typename T, typename A>
1535 var_opt_sketch<T, A>::const_iterator::const_iterator(const const_iterator& other) :
1536  sk_(other.sk_),
1537  cum_r_weight_(other.cum_r_weight_),
1538  r_item_wt_(other.r_item_wt_),
1539  idx_(other.idx_),
1540  final_idx_(other.final_idx_)
1541 {}
1542 
1543 template<typename T, typename A>
1544 typename var_opt_sketch<T, A>::const_iterator& var_opt_sketch<T, A>::const_iterator::operator++() {
1545  // accumulate weight already visited
1546  if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
1547 
1548  ++idx_;
1549 
1550  if (idx_ == final_idx_) {
1551  sk_ = nullptr;
1552  return *this;
1553  } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
1554  ++idx_;
1555  }
1556  return *this;
1557 }
1558 
1559 template<typename T, typename A>
1560 typename var_opt_sketch<T, A>::const_iterator& var_opt_sketch<T, A>::const_iterator::operator++(int) {
1561  const_iterator tmp(*this);
1562  operator++();
1563  return tmp;
1564 }
1565 
1566 template<typename T, typename A>
1567 bool var_opt_sketch<T, A>::const_iterator::operator==(const const_iterator& other) const {
1568  if (sk_ != other.sk_) return false;
1569  if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
1570  return idx_ == other.idx_;
1571 }
1572 
1573 template<typename T, typename A>
1574 bool var_opt_sketch<T, A>::const_iterator::operator!=(const const_iterator& other) const {
1575  return !operator==(other);
1576 }
1577 
1578 template<typename T, typename A>
1579 auto var_opt_sketch<T, A>::const_iterator::operator*() const -> reference {
1580  double wt;
1581  if (idx_ < sk_->h_) {
1582  wt = sk_->weights_[idx_];
1583  } else {
1584  wt = r_item_wt_;
1585  }
1586  return value_type(sk_->data_[idx_], wt);
1587 }
1588 
1589 template<typename T, typename A>
1590 auto var_opt_sketch<T, A>::const_iterator::operator->() const -> pointer {
1591  return **this;
1592 }
1593 
1594 template<typename T, typename A>
1595 bool var_opt_sketch<T, A>::const_iterator::get_mark() const {
1596  return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
1597 }
1598 
1599 
1600 // -------- var_opt_sketch::iterator implementation ---------
1601 
1602 template<typename T, typename A>
1603 var_opt_sketch<T, A>::iterator::iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region) :
1604  sk_(&sk),
1605  cum_r_weight_(0.0),
1606  r_item_wt_(sk.get_tau()),
1607  final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
1608 {
1609  if (use_r_region) {
1610  idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
1611  } else { // H region
1612  // gap at start only if h_ == 0, so index always starts at 0
1613  idx_ = (is_end ? sk.h_ : 0);
1614  }
1615 
1616  // unlike in full iterator case, may happen even if sketch is not empty
1617  if (idx_ == final_idx_) { sk_ = nullptr; }
1618 }
1619 
1620 template<typename T, typename A>
1621 var_opt_sketch<T, A>::iterator::iterator(const iterator& other) :
1622  sk_(other.sk_),
1623  cum_r_weight_(other.cum_r_weight_),
1624  r_item_wt_(other.r_item_wt_),
1625  idx_(other.idx_),
1626  final_idx_(other.final_idx_)
1627 {}
1628 
1629 template<typename T, typename A>
1630 typename var_opt_sketch<T, A>::iterator& var_opt_sketch<T, A>::iterator::operator++() {
1631  // accumulate weight already visited
1632  if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
1633 
1634  ++idx_;
1635 
1636  if (idx_ == final_idx_) {
1637  sk_ = nullptr;
1638  return *this;
1639  } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
1640  ++idx_;
1641  }
1642 
1643  return *this;
1644 }
1645 
1646 template<typename T, typename A>
1647 typename var_opt_sketch<T, A>::iterator& var_opt_sketch<T, A>::iterator::operator++(int) {
1648  iterator tmp(*this);
1649  operator++();
1650  return tmp;
1651 }
1652 
1653 template<typename T, typename A>
1654 bool var_opt_sketch<T, A>::iterator::operator==(const iterator& other) const {
1655  if (sk_ != other.sk_) return false;
1656  if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
1657  return idx_ == other.idx_;
1658 }
1659 
1660 template<typename T, typename A>
1661 bool var_opt_sketch<T, A>::iterator::operator!=(const iterator& other) const {
1662  return !operator==(other);
1663 }
1664 
1665 template<typename T, typename A>
1666 auto var_opt_sketch<T, A>::iterator::operator*() -> reference {
1667  double wt;
1668  if (idx_ < sk_->h_) {
1669  wt = sk_->weights_[idx_];
1670  } else if (idx_ == final_idx_ - 1) {
1671  wt = sk_->total_wt_r_ - cum_r_weight_;
1672  } else {
1673  wt = r_item_wt_;
1674  }
1675  return value_type(sk_->data_[idx_], wt);
1676 }
1677 
1678 template<typename T, typename A>
1679 auto var_opt_sketch<T, A>::iterator::operator->() -> pointer {
1680  return **this;
1681 }
1682 
1683 template<typename T, typename A>
1684 bool var_opt_sketch<T, A>::iterator::get_mark() const {
1685  return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
1686 }
1687 
1688 /*
1689  * Checks if target sampling allocation is more than 50% of max sampling size.
1690  * If so, returns max sampling size, otherwise passes through target size.
1691  */
1692 template<typename T, typename A>
1693 uint32_t var_opt_sketch<T, A>::get_adjusted_size(uint32_t max_size, uint32_t resize_target) {
1694  if (max_size < (resize_target << 1)) {
1695  return max_size;
1696  }
1697  return resize_target;
1698 }
1699 
1700 template<typename T, typename A>
1701 uint32_t var_opt_sketch<T, A>::starting_sub_multiple(uint32_t lg_target, uint32_t lg_rf, uint32_t lg_min) {
1702  return (lg_target <= lg_min)
1703  ? lg_min : (lg_rf == 0) ? lg_target
1704  : (lg_target - lg_min) % lg_rf + lg_min;
1705 }
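
/*
 * Worked example of the sizing helpers above: starting_sub_multiple(10, 3, 2)
 * = (10 - 2) % 3 + 2 = 4, so the initial array gets 2^4 = 16 slots, and
 * get_adjusted_size(1000, 16) returns 16 because 1000 >= 2 * 16. Only once the resize
 * target exceeds half of k (e.g. get_adjusted_size(1000, 512) -> 1000) does the
 * allocation jump straight to k (plus 1 for the gap, added by the callers).
 */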
1706 
1707 template<typename T, typename A>
1708 double var_opt_sketch<T, A>::pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate) {
1709  const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
1710  return bounds_binomial_proportions::approximate_upper_bound_on_p(n, k, adjusted_kappa);
1711 }
1712 
1713 template<typename T, typename A>
1714 double var_opt_sketch<T, A>::pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate) {
1715  const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
1716  return bounds_binomial_proportions::approximate_lower_bound_on_p(n, k, adjusted_kappa);
1717 }
1718 
1719 template<typename T, typename A>
1720 bool var_opt_sketch<T, A>::is_power_of_2(uint32_t v) {
1721  return v && !(v & (v - 1));
1722 }
1723 
1724 template<typename T, typename A>
1725 uint32_t var_opt_sketch<T, A>::to_log_2(uint32_t v) {
1726  if (is_power_of_2(v)) {
1727  return count_trailing_zeros_in_u32(v);
1728  } else {
1729  throw std::invalid_argument("Attempt to compute integer log2 of non-positive or non-power of 2");
1730  }
1731 }
1732 
1733 // Returns an integer in the range [0, max_value) -- excludes max_value
1734 template<typename T, typename A>
1735 uint32_t var_opt_sketch<T, A>::next_int(uint32_t max_value) {
1736  std::uniform_int_distribution<uint32_t> dist(0, max_value - 1);
1737  return dist(random_utils::rand);
1738 }
1739 
1740 template<typename T, typename A>
1741 double var_opt_sketch<T, A>::next_double_exclude_zero() {
1742  double r = random_utils::next_double(random_utils::rand);
1743  while (r == 0.0) {
1744  r = random_utils::next_double(random_utils::rand);
1745  }
1746  return r;
1747 }
1748 
1749 }
1750 
1751 // namespace datasketches
1752 
1753 #endif // _VAR_OPT_SKETCH_IMPL_HPP_