datasketches-cpp
Loading...
Searching...
No Matches
var_opt_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _VAR_OPT_SKETCH_IMPL_HPP_
21#define _VAR_OPT_SKETCH_IMPL_HPP_
22
23#include <memory>
24#include <sstream>
25#include <cmath>
26#include <random>
27#include <algorithm>
28#include <stdexcept>
29
30#include "var_opt_sketch.hpp"
31#include "serde.hpp"
32#include "bounds_binomial_proportions.hpp"
33#include "count_zeros.hpp"
34#include "memory_operations.hpp"
35#include "ceiling_power_of_2.hpp"
36
37namespace datasketches {
38
39/*
40 * Implementation code for the VarOpt sketch.
41 *
42 * author Kevin Lang
43 * author Jon Malkin
44 */
45template<typename T, typename A>
46var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, resize_factor rf, const A& allocator) :
47 var_opt_sketch(k, rf, false, allocator) {}
48
49template<typename T, typename A>
51 k_(other.k_),
52 h_(other.h_),
53 m_(other.m_),
54 r_(other.r_),
55 n_(other.n_),
56 total_wt_r_(other.total_wt_r_),
57 rf_(other.rf_),
58 curr_items_alloc_(other.curr_items_alloc_),
59 filled_data_(other.filled_data_),
60 allocator_(other.allocator_),
61 data_(nullptr),
62 weights_(nullptr),
63 num_marks_in_h_(other.num_marks_in_h_),
64 marks_(nullptr)
65 {
66 data_ = allocator_.allocate(curr_items_alloc_);
67 // skip gap or anything unused at the end
68 for (size_t i = 0; i < h_; ++i)
69 new (&data_[i]) T(other.data_[i]);
70 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
71 new (&data_[i]) T(other.data_[i]);
72
73 // we skipped the gap
74 filled_data_ = false;
75
76 weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
77 // doubles so can successfully copy regardless of the internal state
78 std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
80 if (other.marks_ != nullptr) {
81 marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
82 std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
83 }
84 }
85
86template<typename T, typename A>
87var_opt_sketch<T, A>::var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n) :
88 k_(other.k_),
89 h_(other.h_),
90 m_(other.m_),
91 r_(other.r_),
92 n_(adjusted_n),
93 total_wt_r_(other.total_wt_r_),
94 rf_(other.rf_),
95 curr_items_alloc_(other.curr_items_alloc_),
96 filled_data_(other.filled_data_),
97 allocator_(other.allocator_),
98 data_(nullptr),
99 weights_(nullptr),
100 num_marks_in_h_(other.num_marks_in_h_),
101 marks_(nullptr)
103 data_ = allocator_.allocate(curr_items_alloc_);
104 // skip gap or anything unused at the end
105 for (size_t i = 0; i < h_; ++i)
106 new (&data_[i]) T(other.data_[i]);
107 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
108 new (&data_[i]) T(other.data_[i]);
110 // we skipped the gap
111 filled_data_ = false;
112
113 weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
114 // doubles so can successfully copy regardless of the internal state
115 std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
116
117 if (!as_sketch && other.marks_ != nullptr) {
118 marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
119 std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
120 }
121 }
122
123template<typename T, typename A>
125 k_(other.k_),
126 h_(other.h_),
127 m_(other.m_),
128 r_(other.r_),
129 n_(other.n_),
130 total_wt_r_(other.total_wt_r_),
131 rf_(other.rf_),
132 curr_items_alloc_(other.curr_items_alloc_),
133 filled_data_(other.filled_data_),
134 allocator_(other.allocator_),
135 data_(other.data_),
136 weights_(other.weights_),
137 num_marks_in_h_(other.num_marks_in_h_),
138 marks_(other.marks_)
139 {
140 other.data_ = nullptr;
141 other.weights_ = nullptr;
142 other.marks_ = nullptr;
144
145template<typename T, typename A>
146var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, const A& allocator) :
147 k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf), allocator_(allocator) {
148 if (k == 0 || k_ > MAX_K) {
149 throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
150 }
151
152 uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
153 uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
154 curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
155 if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
156 ++curr_items_alloc_;
157 }
158
159 allocate_data_arrays(curr_items_alloc_, is_gadget);
160 num_marks_in_h_ = 0;
161}
162
163template<typename T, typename A>
164var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf,
165 uint32_t curr_items_alloc, bool filled_data, std::unique_ptr<T, items_deleter> items,
166 std::unique_ptr<double, weights_deleter> weights, uint32_t num_marks_in_h,
167 std::unique_ptr<bool, marks_deleter> marks, const A& allocator) :
168 k_(k),
169 h_(h),
170 m_(m),
171 r_(r),
172 n_(n),
173 total_wt_r_(total_wt_r),
174 rf_(rf),
175 curr_items_alloc_(curr_items_alloc),
176 filled_data_(filled_data),
177 allocator_(allocator),
178 data_(items.release()),
179 weights_(weights.release()),
180 num_marks_in_h_(num_marks_in_h),
181 marks_(marks.release())
182{}
183
184
185template<typename T, typename A>
187 if (data_ != nullptr) {
188 if (filled_data_) {
189 // destroy everything
190 const size_t num_to_destroy = std::min(k_ + 1, curr_items_alloc_);
191 for (size_t i = 0; i < num_to_destroy; ++i) {
192 data_[i].~T();
193 }
194 } else {
195 // skip gap or anything unused at the end
196 for (size_t i = 0; i < h_; ++i) {
197 data_[i].~T();
198 }
199
200 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i) {
201 data_[i].~T();
202 }
203 }
204 allocator_.deallocate(data_, curr_items_alloc_);
205 }
207 if (weights_ != nullptr) {
208 AllocDouble(allocator_).deallocate(weights_, curr_items_alloc_);
209 }
210
211 if (marks_ != nullptr) {
212 AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
213 }
214}
215
216template<typename T, typename A>
218 var_opt_sketch sk_copy(other);
219 std::swap(k_, sk_copy.k_);
220 std::swap(h_, sk_copy.h_);
221 std::swap(m_, sk_copy.m_);
222 std::swap(r_, sk_copy.r_);
223 std::swap(n_, sk_copy.n_);
224 std::swap(total_wt_r_, sk_copy.total_wt_r_);
225 std::swap(rf_, sk_copy.rf_);
226 std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
227 std::swap(filled_data_, sk_copy.filled_data_);
228 std::swap(allocator_, sk_copy.allocator_);
229 std::swap(data_, sk_copy.data_);
230 std::swap(weights_, sk_copy.weights_);
231 std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
232 std::swap(marks_, sk_copy.marks_);
233 return *this;
234}
235
236template<typename T, typename A>
238 std::swap(k_, other.k_);
239 std::swap(h_, other.h_);
240 std::swap(m_, other.m_);
241 std::swap(r_, other.r_);
242 std::swap(n_, other.n_);
243 std::swap(total_wt_r_, other.total_wt_r_);
244 std::swap(rf_, other.rf_);
245 std::swap(curr_items_alloc_, other.curr_items_alloc_);
246 std::swap(filled_data_, other.filled_data_);
247 std::swap(allocator_, other.allocator_);
248 std::swap(data_, other.data_);
249 std::swap(weights_, other.weights_);
250 std::swap(num_marks_in_h_, other.num_marks_in_h_);
251 std::swap(marks_, other.marks_);
252 return *this;
253}
254
255/*
256 * An empty sketch requires 8 bytes.
257 *
258 * <pre>
259 * Long || Start Byte Adr:
260 * Adr:
261 * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
262 * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------|
263 * </pre>
264 *
265 * A non-empty sketch requires 24 bytes of preamble for an under-full sample; once there are
266 * at least k items the sketch uses 32 bytes of preamble.
267 *
268 * The count of items seen is limited to 48 bits (~256 trillion) even though there are adjacent
269 * unused preamble bits. The acceptance probability for an item is a double in the range [0,1),
270 * limiting us to 53 bits of randomness due to details of the IEEE floating point format. To
271 * ensure meaningful probabilities as the items seen count approaches capacity, we intentionally
272 * use slightly fewer bits.
273 *
274 * Following the header are weights for the heavy items, then marks in the event this is a gadget.
275 * The serialized items come last.
276 *
277 * <pre>
278 * Long || Start Byte Adr:
279 * Adr:
280 * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
281 * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------|
282 *
283 * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
284 * 1 ||---------------------------Items Seen Count (N)--------------------------------|
285 *
286 * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 |
287 * 2 ||-------------Item Count in H---------------|-------Item Count in R-------------|
288 *
289 * || 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 |
290 * 3 ||-------------------------------Total Weight in R-------------------------------|
291 * </pre>
292 */
293
294// implementation for fixed-size arithmetic types (integral and floating point)
295template<typename T, typename A>
296template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
298 if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
299 size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
300 num_bytes += h_ * sizeof(double); // weights
301 if (marks_ != nullptr) { // marks
302 num_bytes += (h_ / 8) + (h_ % 8 > 0);
303 }
304 num_bytes += (h_ + r_) * sizeof(TT); // the actual items
305 return num_bytes;
306}
307
308// implementation for all other types
309template<typename T, typename A>
310template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
311size_t var_opt_sketch<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
312 if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
313 size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
314 num_bytes += h_ * sizeof(double); // weights
315 if (marks_ != nullptr) { // marks
316 num_bytes += (h_ / 8) + (h_ % 8 > 0);
317 }
318 // must iterate over the items
319 for (auto it: *this)
320 num_bytes += sd.size_of_item(it.first);
321 return num_bytes;
322}
323
324template<typename T, typename A>
325template<typename SerDe>
326std::vector<uint8_t, AllocU8<A>> var_opt_sketch<T, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const {
327 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
328 std::vector<uint8_t, AllocU8<A>> bytes(size, 0, allocator_);
329 uint8_t* ptr = bytes.data() + header_size_bytes;
330 uint8_t* end_ptr = ptr + size;
331
332 bool empty = is_empty();
333 uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
334 : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
335 uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
336 uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
337
338 if (empty) {
339 flags |= EMPTY_FLAG_MASK;
340 }
341
342 // first prelong
343 uint8_t ser_ver(SER_VER);
344 uint8_t family(FAMILY_ID);
345 ptr += copy_to_mem(first_byte, ptr);
346 ptr += copy_to_mem(ser_ver, ptr);
347 ptr += copy_to_mem(family, ptr);
348 ptr += copy_to_mem(flags, ptr);
349 ptr += copy_to_mem(k_, ptr);
350
351 if (!empty) {
352 // second and third prelongs
353 ptr += copy_to_mem(n_, ptr);
354 ptr += copy_to_mem(h_, ptr);
355 ptr += copy_to_mem(r_, ptr);
356
357 // fourth prelong, if needed
358 if (r_ > 0) {
359 ptr += copy_to_mem(total_wt_r_, ptr);
360 }
361
362 // first h_ weights
363 ptr += copy_to_mem(weights_, ptr, h_ * sizeof(double));
364
365 // first h_ marks as packed bytes iff we have a gadget
366 if (marks_ != nullptr) {
367 uint8_t val = 0;
368 for (uint32_t i = 0; i < h_; ++i) {
369 if (marks_[i]) {
370 val |= 0x1 << (i & 0x7);
371 }
372
373 if ((i & 0x7) == 0x7) {
374 ptr += copy_to_mem(val, ptr);
375 val = 0;
376 }
377 }
378
379 // write out any remaining values
380 if ((h_ & 0x7) > 0) {
381 ptr += copy_to_mem(val, ptr);
382 }
383 }
384
385 // write the sample items, skipping the gap. Either h_ or r_ may be 0
386 ptr += sd.serialize(ptr, end_ptr - ptr, data_, h_);
387 ptr += sd.serialize(ptr, end_ptr - ptr, &data_[h_ + 1], r_);
388 }
389
390 size_t bytes_written = ptr - bytes.data();
391 if (bytes_written != size) {
392 throw std::logic_error("serialized size mismatch: " + std::to_string(bytes_written) + " != " + std::to_string(size));
393 }
394
395 return bytes;
396}
397
398template<typename T, typename A>
399template<typename SerDe>
400void var_opt_sketch<T, A>::serialize(std::ostream& os, const SerDe& sd) const {
401 const bool empty = (h_ == 0) && (r_ == 0);
402
403 const uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
404 : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
405 const uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
406 uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
407
408 if (empty) {
409 flags |= EMPTY_FLAG_MASK;
410 }
411
412 // first prelong
413 const uint8_t ser_ver(SER_VER);
414 const uint8_t family(FAMILY_ID);
415 write(os, first_byte);
416 write(os, ser_ver);
417 write(os, family);
418 write(os, flags);
419 write(os, k_);
420
421 if (!empty) {
422 // second and third prelongs
423 write(os, n_);
424 write(os, h_);
425 write(os, r_);
426
427 // fourth prelong, if needed
428 if (r_ > 0) {
429 write(os, total_wt_r_);
430 }
431
432 // write the first h_ weights
433 write(os, weights_, h_ * sizeof(double));
434
435 // write the first h_ marks as packed bytes iff we have a gadget
436 if (marks_ != nullptr) {
437 uint8_t val = 0;
438 for (uint32_t i = 0; i < h_; ++i) {
439 if (marks_[i]) {
440 val |= 0x1 << (i & 0x7);
441 }
442
443 if ((i & 0x7) == 0x7) {
444 write(os, val);
445 val = 0;
446 }
447 }
448
449 // write out any remaining values
450 if ((h_ & 0x7) > 0) {
451 write(os, val);
452 }
453 }
454
455 // write the sample items, skipping the gap. Either h_ or r_ may be 0
456 sd.serialize(os, data_, h_);
457 sd.serialize(os, &data_[h_ + 1], r_);
458 }
459}
460
461template<typename T, typename A>
462template<typename SerDe>
463var_opt_sketch<T, A> var_opt_sketch<T, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const A& allocator) {
464 ensure_minimum_memory(size, 8);
465 const char* ptr = static_cast<const char*>(bytes);
466 const char* base = ptr;
467 const char* end_ptr = ptr + size;
468 uint8_t first_byte;
469 ptr += copy_from_mem(ptr, first_byte);
470 uint8_t preamble_longs = first_byte & 0x3f;
471 resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
472 uint8_t serial_version;
473 ptr += copy_from_mem(ptr, serial_version);
474 uint8_t family_id;
475 ptr += copy_from_mem(ptr, family_id);
476 uint8_t flags;
477 ptr += copy_from_mem(ptr, flags);
478 uint32_t k;
479 ptr += copy_from_mem(ptr, k);
480
481 check_preamble_longs(preamble_longs, flags);
482 check_family_and_serialization_version(family_id, serial_version);
483 ensure_minimum_memory(size, preamble_longs << 3);
484
485 const bool is_empty = flags & EMPTY_FLAG_MASK;
486 const bool is_gadget = flags & GADGET_FLAG_MASK;
487
488 if (is_empty) {
489 return var_opt_sketch(k, rf, is_gadget, allocator);
490 }
491
492 // second and third prelongs
493 uint64_t n;
494 uint32_t h, r;
495 ptr += copy_from_mem(ptr, n);
496 ptr += copy_from_mem(ptr, h);
497 ptr += copy_from_mem(ptr, r);
498
499 const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
500
501 // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
502 double total_wt_r = 0.0;
503 if (preamble_longs == PREAMBLE_LONGS_FULL) {
504 ptr += copy_from_mem(ptr, total_wt_r);
505 if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
506 throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
507 "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
508 }
509 } else {
510 total_wt_r = 0.0;
511 }
512
513 // read the first h_ weights, fill in rest of array with -1.0
514 check_memory_size(ptr - base + (h * sizeof(double)), size);
515 std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
516 weights_deleter(array_size, allocator));
517 double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
518 ptr += copy_from_mem(ptr, wts, h * sizeof(double));
519 for (size_t i = 0; i < h; ++i) {
520 if (!(wts[i] > 0.0)) {
521 throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
522 }
523 }
524 std::fill(wts + h, wts + array_size, -1.0);
525
526 // read the first h_ marks as packed bytes iff we have a gadget
527 uint32_t num_marks_in_h = 0;
528 std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
529 if (is_gadget) {
530 uint8_t val = 0;
531 marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
532 const size_t size_marks = (h / 8) + (h % 8 > 0 ? 1 : 0);
533 check_memory_size(ptr - base + size_marks, size);
534 for (uint32_t i = 0; i < h; ++i) {
535 if ((i & 0x7) == 0x0) { // should trigger on first iteration
536 ptr += copy_from_mem(ptr, val);
537 }
538 marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
539 num_marks_in_h += (marks.get()[i] ? 1 : 0);
540 }
541 }
542
543 // read the sample items, skipping the gap. Either h_ or r_ may be 0
544 items_deleter deleter(array_size, allocator);
545 std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
546
547 ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), h);
548 items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
549
550 ptr += sd.deserialize(ptr, end_ptr - ptr, &(items.get()[h + 1]), r);
551 items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
552
553 return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
554 std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
555}
556
557template<typename T, typename A>
558template<typename SerDe>
559var_opt_sketch<T, A> var_opt_sketch<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
560 const auto first_byte = read<uint8_t>(is);
561 uint8_t preamble_longs = first_byte & 0x3f;
562 const resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
563 const auto serial_version = read<uint8_t>(is);
564 const auto family_id = read<uint8_t>(is);
565 const auto flags = read<uint8_t>(is);
566 const auto k = read<uint32_t>(is);
567
568 check_preamble_longs(preamble_longs, flags);
569 check_family_and_serialization_version(family_id, serial_version);
570
571 const bool is_empty = flags & EMPTY_FLAG_MASK;
572 const bool is_gadget = flags & GADGET_FLAG_MASK;
573
574 if (is_empty) {
575 if (!is.good())
576 throw std::runtime_error("error reading from std::istream");
577 else
578 return var_opt_sketch(k, rf, is_gadget, allocator);
579 }
580
581 // second and third prelongs
582 const auto n = read<uint64_t>(is);
583 const auto h = read<uint32_t>(is);
584 const auto r = read<uint32_t>(is);
585
586 const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
587
588 // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
589 double total_wt_r = 0.0;
590 if (preamble_longs == PREAMBLE_LONGS_FULL) {
591 total_wt_r = read<double>(is);
592 if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
593 throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
594 "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
595 }
596 }
597
598 // read the first h weights, fill remainder with -1.0
599 std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
600 weights_deleter(array_size, allocator));
601 double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
602 read(is, wts, h * sizeof(double));
603 for (size_t i = 0; i < h; ++i) {
604 if (!(wts[i] > 0.0)) {
605 throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
606 }
607 }
608 std::fill(wts + h, wts + array_size, -1.0);
609
610 // read the first h_ marks as packed bytes iff we have a gadget
611 uint32_t num_marks_in_h = 0;
612 std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
613 if (is_gadget) {
614 marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
615 uint8_t val = 0;
616 for (uint32_t i = 0; i < h; ++i) {
617 if ((i & 0x7) == 0x0) { // should trigger on first iteration
618 val = read<uint8_t>(is);
619 }
620 marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
621 num_marks_in_h += (marks.get()[i] ? 1 : 0);
622 }
623 }
624
625 // read the sample items, skipping the gap. Either h or r may be 0
626 items_deleter deleter(array_size, allocator);
627 std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
628
629 sd.deserialize(is, items.get(), h); // aka &data_[0]
630 items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
631
632 sd.deserialize(is, &(items.get()[h + 1]), r);
633 items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
634
635 if (!is.good())
636 throw std::runtime_error("error reading from std::istream");
637
638 return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
639 std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
640}
641
642template<typename T, typename A>
644 return (h_ == 0 && r_ == 0);
645}
646
647template<typename T, typename A>
649 const uint32_t prev_alloc = curr_items_alloc_;
650 const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
651 const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
652 curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
653 if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
654 ++curr_items_alloc_;
655 }
656
657 if (filled_data_) {
658 // destroy everything
659 const size_t num_to_destroy = std::min(k_ + 1, prev_alloc);
660 for (size_t i = 0; i < num_to_destroy; ++i)
661 data_[i].~T();
662 } else {
663 // skip gap or anything unused at the end
664 for (size_t i = 0; i < h_; ++i)
665 data_[i].~T();
666
667 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
668 data_[i].~T();
669 }
670
671 if (curr_items_alloc_ < prev_alloc) {
672 const bool is_gadget = (marks_ != nullptr);
673
674 allocator_.deallocate(data_, prev_alloc);
675 AllocDouble(allocator_).deallocate(weights_, prev_alloc);
676
677 if (marks_ != nullptr)
678 AllocBool(allocator_).deallocate(marks_, prev_alloc);
679
680 allocate_data_arrays(curr_items_alloc_, is_gadget);
681 }
682
683 n_ = 0;
684 h_ = 0;
685 m_ = 0;
686 r_ = 0;
687 num_marks_in_h_ = 0;
688 total_wt_r_ = 0.0;
689 filled_data_ = false;
690}
691
692template<typename T, typename A>
694 return n_;
695}
696
697template<typename T, typename A>
699 return k_;
700}
701
702template<typename T, typename A>
704 const uint32_t num_in_sketch = h_ + r_;
705 return (num_in_sketch < k_ ? num_in_sketch : k_);
706}
707
708template<typename T, typename A>
709void var_opt_sketch<T, A>::update(const T& item, double weight) {
710 update(item, weight, false);
711}
712
713template<typename T, typename A>
714void var_opt_sketch<T, A>::update(T&& item, double weight) {
715 update(std::move(item), weight, false);
716}
717
718template<typename T, typename A>
720 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
721 // The stream does not support passing an allocator instance, and alternatives are complicated.
722 std::ostringstream os;
723 os << "### VarOpt SUMMARY:" << std::endl;
724 os << " k : " << k_ << std::endl;
725 os << " h : " << h_ << std::endl;
726 os << " r : " << r_ << std::endl;
727 os << " weight_r : " << total_wt_r_ << std::endl;
728 os << " Current size : " << curr_items_alloc_ << std::endl;
729 os << " Resize factor: " << (1 << rf_) << std::endl;
730 os << "### END SKETCH SUMMARY" << std::endl;
731 return string<A>(os.str().c_str(), allocator_);
732}
733
734template<typename T, typename A>
736 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
737 // The stream does not support passing an allocator instance, and alternatives are complicated.
738 std::ostringstream os;
739 os << "### Sketch Items" << std::endl;
740 int idx = 0;
741 for (auto record : *this) {
742 os << idx << ": " << record.first << "\twt = " << record.second << std::endl;
743 ++idx;
744 }
745 return string<A>(os.str().c_str(), allocator_);
746}
747
748template<typename T, typename A>
749string<A> var_opt_sketch<T, A>::items_to_string(bool print_gap) const {
750 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
751 // The stream does not support passing an allocator instance, and alternatives are complicated.
752 std::ostringstream os;
753 os << "### Sketch Items" << std::endl;
754 const uint32_t array_length = (n_ < k_ ? n_ : k_ + 1);
755 for (uint32_t i = 0, display_idx = 0; i < array_length; ++i) {
756 if (i == h_ && print_gap) {
757 os << display_idx << ": GAP" << std::endl;
758 ++display_idx;
759 } else {
760 os << display_idx << ": " << data_[i] << "\twt = ";
761 if (weights_[i] == -1.0) {
762 os << get_tau() << "\t(-1.0)" << std::endl;
763 } else {
764 os << weights_[i] << std::endl;
765 }
766 ++display_idx;
767 }
768 }
769 return string<A>(os.str().c_str(), allocator_);
770}
771
772template<typename T, typename A>
773template<typename O>
774void var_opt_sketch<T, A>::update(O&& item, double weight, bool mark) {
775 if (weight < 0.0 || std::isnan(weight) || std::isinf(weight)) {
776 throw std::invalid_argument("Item weights must be nonnegative and finite. Found: "
777 + std::to_string(weight));
778 } else if (weight == 0.0) {
779 return;
780 }
781 ++n_;
782
783 if (r_ == 0) {
784 // exact mode
785 update_warmup_phase(std::forward<O>(item), weight, mark);
786 } else {
787 // sketch is in estimation mode so we can make the following check,
788 // although very conservative to check every time
789 if ((h_ != 0) && (peek_min() < get_tau()))
790 throw std::logic_error("sketch not in valid estimation mode");
791
792 // what tau would be if deletion candidates turn out to be R plus the new item
793 // note: (r_ + 1) - 1 is intentional
794 const double hypothetical_tau = (weight + total_wt_r_) / ((r_ + 1) - 1);
795
796 // is new item's turn to be considered for reservoir?
797 const double condition1 = (h_ == 0) || (weight <= peek_min());
798
799 // is new item light enough for reservoir?
800 const double condition2 = weight < hypothetical_tau;
801
802 if (condition1 && condition2) {
803 update_light(std::forward<O>(item), weight, mark);
804 } else if (r_ == 1) {
805 update_heavy_r_eq1(std::forward<O>(item), weight, mark);
806 } else {
807 update_heavy_general(std::forward<O>(item), weight, mark);
808 }
809 }
810}
811
812template<typename T, typename A>
813template<typename O>
814void var_opt_sketch<T, A>::update_warmup_phase(O&& item, double weight, bool mark) {
815 // seems overly cautious
816 if (r_ > 0 || m_ != 0 || h_ > k_) throw std::logic_error("invalid sketch state during warmup");
817
818 if (h_ >= curr_items_alloc_) {
819 grow_data_arrays();
820 }
821
822 // store items as they come in until full
823 new (&data_[h_]) T(std::forward<O>(item));
824 weights_[h_] = weight;
825 if (marks_ != nullptr) {
826 marks_[h_] = mark;
827 }
828 ++h_;
829 num_marks_in_h_ += mark ? 1 : 0;
830
831 // check if need to heapify
832 if (h_ > k_) {
833 filled_data_ = true;
834 transition_from_warmup();
835 }
836}
837
838/* In the "light" case the new item has weight <= old_tau, so
839 would appear to the right of the R items in a hypothetical reverse-sorted
840 list. It is easy to prove that it is light enough to be part of this
841 round's downsampling */
842template<typename T, typename A>
843template<typename O>
844void var_opt_sketch<T, A>::update_light(O&& item, double weight, bool mark) {
845 if (r_ == 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during light warmup");
846
847 const uint32_t m_slot = h_; // index of the gap, which becomes the M region
848 if (filled_data_) {
849 if (&data_[m_slot] != &item)
850 data_[m_slot] = std::forward<O>(item);
851 } else {
852 new (&data_[m_slot]) T(std::forward<O>(item));
853 filled_data_ = true;
854 }
855 weights_[m_slot] = weight;
856 if (marks_ != nullptr) { marks_[m_slot] = mark; }
857 ++m_;
858
859 grow_candidate_set(total_wt_r_ + weight, r_ + 1);
860}
861
862/* In the "heavy" case the new item has weight > old_tau, so would
863 appear to the left of items in R in a hypothetical reverse-sorted list and
864 might or might not be light enough be part of this round's downsampling.
865 [After first splitting off the R=1 case] we greatly simplify the code by
866 putting the new item into the H heap whether it needs to be there or not.
867 In other words, it might go into the heap and then come right back out,
868 but that should be okay because pseudo_heavy items cannot predominate
869 in long streams unless (max wt) / (min wt) > o(exp(N)) */
870template<typename T, typename A>
871template<typename O>
872void var_opt_sketch<T, A>::update_heavy_general(O&& item, double weight, bool mark) {
873 if (r_ < 2 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy general update");
874
875 // put into H, although may come back out momentarily
876 push(std::forward<O>(item), weight, mark);
877
878 grow_candidate_set(total_wt_r_, r_);
879}
880
881/* The analysis of this case is similar to that of the general heavy case.
882 The one small technical difference is that since R < 2, we must grab an M item
883 to have a valid starting point for continue_by_growing_candidate_set () */
884template<typename T, typename A>
885template<typename O>
886void var_opt_sketch<T, A>::update_heavy_r_eq1(O&& item, double weight, bool mark) {
887 if (r_ != 1 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy r=1 update");
888
889 push(std::forward<O>(item), weight, mark); // new item into H
890 pop_min_to_m_region(); // pop lightest back into M
891
892 // Any set of two items is downsample-able to one item,
893 // so the two lightest items are a valid starting point for the following
894 const uint32_t m_slot = k_ - 1; // array is k+1, 1 in R, so slot before is M
895 grow_candidate_set(weights_[m_slot] + total_wt_r_, 2);
896}
897
898/*
899 * Decreases sketch's value of k by 1, updating stored values as needed.
900 *
901 * <p>Subject to certain pre-conditions, decreasing k causes tau to increase. This fact is used by
902 * the unioning algorithm to force "marked" items out of H and into the reservoir region.</p>
903 */
904template<typename T, typename A>
905void var_opt_sketch<T, A>::decrease_k_by_1() {
906 if (k_ <= 1) {
907 throw std::logic_error("Cannot decrease k below 1 in union");
908 }
909
910 if ((h_ == 0) && (r_ == 0)) {
911 // exact mode, but no data yet; this reduction is somewhat gratuitous
912 --k_;
913 } else if ((h_ > 0) && (r_ == 0)) {
914 // exact mode, but we have some data
915 --k_;
916 if (h_ > k_) {
917 transition_from_warmup();
918 }
919 } else if ((h_ > 0) && (r_ > 0)) {
920 // reservoir mode, but we have some exact samples.
921 // Our strategy will be to pull an item out of H (which we are allowed to do since it's
922 // still just data), reduce k, and then re-insert the item
923
924 // first, slide the R zone to the left by 1, temporarily filling the gap
925 const uint32_t old_gap_idx = h_;
926 const uint32_t old_final_r_idx = (h_ + 1 + r_) - 1;
927 if (old_final_r_idx != k_) throw std::logic_error("gadget in invalid state");
928
929 swap_values(old_final_r_idx, old_gap_idx);
930 filled_data_ = true; // we just filled the gap, and no need to check previous state
931
932 // now we pull an item out of H; any item is ok, but if we grab the rightmost and then
933 // reduce h_, the heap invariant will be preserved (and the gap will be restored), plus
934 // the push() of the item that will probably happen later will be cheap.
935
936 const uint32_t pulled_idx = h_ - 1;
937 double pulled_weight = weights_[pulled_idx];
938 bool pulled_mark = marks_[pulled_idx];
939 // will move the pulled item below; don't do antying to it here
940
941 if (pulled_mark) { --num_marks_in_h_; }
942 weights_[pulled_idx] = -1.0; // to make bugs easier to spot
943
944 --h_;
945 --k_;
946 --n_; // will be re-incremented with the update
947
948 update(std::move(data_[pulled_idx]), pulled_weight, pulled_mark);
949 } else if ((h_ == 0) && (r_ > 0)) {
950 // pure reservoir mode, so can simply eject a randomly chosen sample from the reservoir
951 if (r_ < 2) throw std::logic_error("r_ too small for pure reservoir mode");
952
953 const uint32_t r_idx_to_delete = 1 + next_int(r_); // 1 for the gap
954 const uint32_t rightmost_r_idx = (1 + r_) - 1;
955 swap_values(r_idx_to_delete, rightmost_r_idx);
956 weights_[rightmost_r_idx] = -1.0;
957
958 --k_;
959 --r_;
960 }
961}
962
963template<typename T, typename A>
964void var_opt_sketch<T, A>::allocate_data_arrays(uint32_t tgt_size, bool use_marks) {
965 filled_data_ = false;
966
967 data_ = allocator_.allocate(tgt_size);
968 weights_ = AllocDouble(allocator_).allocate(tgt_size);
969
970 if (use_marks) {
971 marks_ = AllocBool(allocator_).allocate(tgt_size);
972 } else {
973 marks_ = nullptr;
974 }
975}
976
977template<typename T, typename A>
978void var_opt_sketch<T, A>::grow_data_arrays() {
979 const uint32_t prev_size = curr_items_alloc_;
980 curr_items_alloc_ = get_adjusted_size(k_, curr_items_alloc_ << rf_);
981 if (curr_items_alloc_ == k_) {
982 ++curr_items_alloc_;
983 }
984
985 if (prev_size < curr_items_alloc_) {
986 filled_data_ = false;
987
988 T* tmp_data = allocator_.allocate(curr_items_alloc_);
989 double* tmp_weights = AllocDouble(allocator_).allocate(curr_items_alloc_);
990
991 for (uint32_t i = 0; i < prev_size; ++i) {
992 new (&tmp_data[i]) T(std::move(data_[i]));
993 data_[i].~T();
994 tmp_weights[i] = weights_[i];
995 }
996
997 allocator_.deallocate(data_, prev_size);
998 AllocDouble(allocator_).deallocate(weights_, prev_size);
999
1000 data_ = tmp_data;
1001 weights_ = tmp_weights;
1002
1003 if (marks_ != nullptr) {
1004 bool* tmp_marks = AllocBool(allocator_).allocate(curr_items_alloc_);
1005 for (uint32_t i = 0; i < prev_size; ++i) {
1006 tmp_marks[i] = marks_[i];
1007 }
1008 AllocBool(allocator_).deallocate(marks_, prev_size);
1009 marks_ = tmp_marks;
1010 }
1011 }
1012}
1013
1014template<typename T, typename A>
1015void var_opt_sketch<T, A>::transition_from_warmup() {
1016 // Move the 2 lightest items from H to M
1017 // But the lighter really belongs in R, so update counts to reflect that
1018 convert_to_heap();
1019 pop_min_to_m_region();
1020 pop_min_to_m_region();
1021 --m_;
1022 ++r_;
1023
1024 if (h_ != (k_ -1) || m_ != 1 || r_ != 1)
1025 throw std::logic_error("invalid state for transitioning from warmup");
1026
1027 // Update total weight in R and then, having grabbed the value, overwrite
1028 // in weight_ array to help make bugs more obvious
1029 total_wt_r_ = weights_[k_]; // only one item, known location
1030 weights_[k_] = -1.0;
1031
1032 // The two lightest items are ncessarily downsample-able to one item,
1033 // and are therefore a valid initial candidate set
1034 grow_candidate_set(weights_[k_ - 1] + total_wt_r_, 2);
1035}
1036
1037template<typename T, typename A>
1038void var_opt_sketch<T, A>::convert_to_heap() {
1039 if (h_ < 2) {
1040 return; // nothing to do
1041 }
1042
1043 const uint32_t last_slot = h_ - 1;
1044 const int last_non_leaf = ((last_slot + 1) / 2) - 1;
1045
1046 for (int j = last_non_leaf; j >= 0; --j) {
1047 restore_towards_leaves(j);
1048 }
1049
1050 // validates heap, used for initial debugging
1051 //for (uint32_t j = h_ - 1; j >= 1; --j) {
1052 // uint32_t p = ((j + 1) / 2) - 1;
1053 // if (weights_[p] > weights_[j]) throw std::logic_error("invalid heap");
1054 //}
1055}
1056
1057template<typename T, typename A>
1058void var_opt_sketch<T, A>::restore_towards_leaves(uint32_t slot_in) {
1059 const uint32_t last_slot = h_ - 1;
1060 if (h_ == 0 || slot_in > last_slot) throw std::logic_error("invalid heap state");
1061
1062 uint32_t slot = slot_in;
1063 uint32_t child = (2 * slot_in) + 1; // might be invalid, need to check
1064
1065 while (child <= last_slot) {
1066 uint32_t child2 = child + 1; // might also be invalid
1067 if ((child2 <= last_slot) && (weights_[child2] < weights_[child])) {
1068 // siwtch to other child if it's both valid and smaller
1069 child = child2;
1070 }
1071
1072 if (weights_[slot] <= weights_[child]) {
1073 // invariant holds so we're done
1074 break;
1075 }
1076
1077 // swap and continue
1078 swap_values(slot, child);
1079
1080 slot = child;
1081 child = (2 * slot) + 1; // might be invalid, checked on next loop
1082 }
1083}
1084
1085template<typename T, typename A>
1086void var_opt_sketch<T, A>::restore_towards_root(uint32_t slot_in) {
1087 uint32_t slot = slot_in;
1088 uint32_t p = (((slot + 1) / 2) - 1); // valid if slot >= 1
1089 while ((slot > 0) && (weights_[slot] < weights_[p])) {
1090 swap_values(slot, p);
1091 slot = p;
1092 p = (((slot + 1) / 2) - 1); // valid if slot >= 1
1093 }
1094}
1095
1096template<typename T, typename A>
1097template<typename O>
1098void var_opt_sketch<T, A>::push(O&& item, double wt, bool mark) {
1099 if (filled_data_) {
1100 if (&data_[h_] != &item)
1101 data_[h_] = std::forward<O>(item);
1102 } else {
1103 new (&data_[h_]) T(std::forward<O>(item));
1104 filled_data_ = true;
1105 }
1106 weights_[h_] = wt;
1107 if (marks_ != nullptr) {
1108 marks_[h_] = mark;
1109 num_marks_in_h_ += (mark ? 1 : 0);
1110 }
1111 ++h_;
1112
1113 restore_towards_root(h_ - 1); // need use old h_, but want accurate h_
1114}
1115
1116template<typename T, typename A>
1117void var_opt_sketch<T, A>::pop_min_to_m_region() {
1118 if (h_ == 0 || (h_ + m_ + r_ != k_ + 1))
1119 throw std::logic_error("invalid heap state popping min to M region");
1120
1121 if (h_ == 1) {
1122 // just update bookkeeping
1123 ++m_;
1124 --h_;
1125 } else {
1126 // main case
1127 uint32_t tgt = h_ - 1; // last slot, will swap with root
1128 swap_values(0, tgt);
1129 ++m_;
1130 --h_;
1131
1132 restore_towards_leaves(0);
1133 }
1134
1135 if (is_marked(h_)) {
1136 --num_marks_in_h_;
1137 }
1138}
1139
1140
1141template<typename T, typename A>
1142void var_opt_sketch<T, A>::swap_values(uint32_t src, uint32_t dst) {
1143 std::swap(data_[src], data_[dst]);
1144 std::swap(weights_[src], weights_[dst]);
1145
1146 if (marks_ != nullptr) {
1147 std::swap(marks_[src], marks_[dst]);
1148 }
1149}
1150
1151/* When entering here we should be in a well-characterized state where the
1152 new item has been placed in either h or m and we have a valid but not necessarily
1153 maximal sampling plan figured out. The array is completely full at this point.
1154 Everyone in h and m has an explicit weight. The candidates are right-justified
1155 and are either just the r set or the r set + exactly one m item. The number
1156 of cands is at least 2. We will now grow the candidate set as much as possible
1157 by pulling sufficiently light items from h to m.
1158*/
1159template<typename T, typename A>
1160void var_opt_sketch<T, A>::grow_candidate_set(double wt_cands, uint32_t num_cands) {
1161 if ((h_ + m_ + r_ != k_ + 1) || (num_cands < 1) || (num_cands != m_ + r_) || (m_ >= 2))
1162 throw std::logic_error("invariant violated when growing candidate set");
1163
1164 while (h_ > 0) {
1165 const double next_wt = peek_min();
1166 const double next_tot_wt = wt_cands + next_wt;
1167
1168 // test for strict lightness of next prospect (denominator multiplied through)
1169 // ideally: (next_wt * (next_num_cands-1) < next_tot_wt)
1170 // but can use num_cands directly
1171 if ((next_wt * num_cands) < next_tot_wt) {
1172 wt_cands = next_tot_wt;
1173 ++num_cands;
1174 pop_min_to_m_region(); // adjusts h_ and m_
1175 } else {
1176 break;
1177 }
1178 }
1179
1180 downsample_candidate_set(wt_cands, num_cands);
1181}
1182
1183template<typename T, typename A>
1184void var_opt_sketch<T, A>::downsample_candidate_set(double wt_cands, uint32_t num_cands) {
1185 if (num_cands < 2 || h_ + num_cands != k_ + 1)
1186 throw std::logic_error("invalid num_cands when downsampling");
1187
1188 // need this before overwriting anything
1189 const uint32_t delete_slot = choose_delete_slot(wt_cands, num_cands);
1190 const uint32_t leftmost_cand_slot = h_;
1191 if (delete_slot < leftmost_cand_slot || delete_slot > k_)
1192 throw std::logic_error("invalid delete slot index when downsampling");
1193
1194 // Overwrite weights for items from M moving into R,
1195 // to make bugs more obvious. Also needed so anyone reading the
1196 // weight knows if it's invalid without checking h_ and m_
1197 const uint32_t stop_idx = leftmost_cand_slot + m_;
1198 for (uint32_t j = leftmost_cand_slot; j < stop_idx; ++j) {
1199 weights_[j] = -1.0;
1200 }
1201
1202 // The next line works even when delete_slot == leftmost_cand_slot
1203 data_[delete_slot] = std::move(data_[leftmost_cand_slot]);
1204
1205 m_ = 0;
1206 r_ = num_cands - 1;
1207 total_wt_r_ = wt_cands;
1208}
1209
1210template<typename T, typename A>
1211uint32_t var_opt_sketch<T, A>::choose_delete_slot(double wt_cands, uint32_t num_cands) const {
1212 if (r_ == 0) throw std::logic_error("choosing delete slot while in exact mode");
1213
1214 if (m_ == 0) {
1215 // this happens if we insert a really heavy item
1216 return pick_random_slot_in_r();
1217 } else if (m_ == 1) {
1218 // check if we keep th item in M or pick oen from R
1219 // p(keep) = (num_cand - 1) * wt_M / wt_cand
1220 double wt_m_cand = weights_[h_]; // slot of item in M is h_
1221 if ((wt_cands * next_double_exclude_zero()) < ((num_cands - 1) * wt_m_cand)) {
1222 return pick_random_slot_in_r(); // keep item in M
1223 } else {
1224 return h_; // indext of item in M
1225 }
1226 } else {
1227 // general case
1228 const uint32_t delete_slot = choose_weighted_delete_slot(wt_cands, num_cands);
1229 const uint32_t first_r_slot = h_ + m_;
1230 if (delete_slot == first_r_slot) {
1231 return pick_random_slot_in_r();
1232 } else {
1233 return delete_slot;
1234 }
1235 }
1236}
1237
1238template<typename T, typename A>
1239uint32_t var_opt_sketch<T, A>::choose_weighted_delete_slot(double wt_cands, uint32_t num_cands) const {
1240 if (m_ < 1) throw std::logic_error("must have weighted delete slot");
1241
1242 const uint32_t offset = h_;
1243 const uint32_t final_m = (offset + m_) - 1;
1244 const uint32_t num_to_keep = num_cands - 1;
1245
1246 double left_subtotal = 0.0;
1247 double right_subtotal = -1.0 * wt_cands * next_double_exclude_zero();
1248
1249 for (uint32_t i = offset; i <= final_m; ++i) {
1250 left_subtotal += num_to_keep * weights_[i];
1251 right_subtotal += wt_cands;
1252
1253 if (left_subtotal < right_subtotal) {
1254 return i;
1255 }
1256 }
1257
1258 // this slot tells caller that we need to delete out of R
1259 return final_m + 1;
1260}
1261
1262template<typename T, typename A>
1263uint32_t var_opt_sketch<T, A>::pick_random_slot_in_r() const {
1264 if (r_ == 0) throw std::logic_error("r_ = 0 when picking slot in R region");
1265 const uint32_t offset = h_ + m_;
1266 if (r_ == 1) {
1267 return offset;
1268 } else {
1269 return offset + next_int(r_);
1270 }
1271}
1272
1273template<typename T, typename A>
1274double var_opt_sketch<T, A>::peek_min() const {
1275 if (h_ == 0) throw std::logic_error("h_ = 0 when checking min in H region");
1276 return weights_[0];
1277}
1278
1279template<typename T, typename A>
1280inline bool var_opt_sketch<T, A>::is_marked(uint32_t idx) const {
1281 return marks_ == nullptr ? false : marks_[idx];
1282}
1283
1284template<typename T, typename A>
1285double var_opt_sketch<T, A>::get_tau() const {
1286 return r_ == 0 ? std::nan("1") : (total_wt_r_ / r_);
1287}
1288
1289template<typename T, typename A>
1290void var_opt_sketch<T, A>::strip_marks() {
1291 if (marks_ == nullptr) throw std::logic_error("request to strip marks from non-gadget");
1292 num_marks_in_h_ = 0;
1293 AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
1294 marks_ = nullptr;
1295}
1296
1297template<typename T, typename A>
1298void var_opt_sketch<T, A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags) {
1299 const bool is_empty(flags & EMPTY_FLAG_MASK);
1300
1301 if (is_empty) {
1302 if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
1303 throw std::invalid_argument("Possible corruption: Preamble longs must be "
1304 + std::to_string(PREAMBLE_LONGS_EMPTY) + " for an empty sketch. Found: "
1305 + std::to_string(preamble_longs));
1306 }
1307 } else {
1308 if (preamble_longs != PREAMBLE_LONGS_WARMUP
1309 && preamble_longs != PREAMBLE_LONGS_FULL) {
1310 throw std::invalid_argument("Possible corruption: Preamble longs must be "
1311 + std::to_string(PREAMBLE_LONGS_WARMUP) + " or "
1312 + std::to_string(PREAMBLE_LONGS_FULL)
1313 + " for a non-empty sketch. Found: " + std::to_string(preamble_longs));
1314 }
1315 }
1316}
1317
1318template<typename T, typename A>
1319void var_opt_sketch<T, A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
1320 if (family_id == FAMILY_ID) {
1321 if (ser_ver != SER_VER) {
1322 throw std::invalid_argument("Possible corruption: VarOpt serialization version must be "
1323 + std::to_string(SER_VER) + ". Found: " + std::to_string(ser_ver));
1324 }
1325 return;
1326 }
1327 // TODO: extend to handle reservoir sampling
1328
1329 throw std::invalid_argument("Possible corruption: VarOpt family id must be "
1330 + std::to_string(FAMILY_ID) + ". Found: " + std::to_string(family_id));
1331}
1332
1333template<typename T, typename A>
1334uint32_t var_opt_sketch<T, A>::validate_and_get_target_size(uint32_t preamble_longs, uint32_t k, uint64_t n,
1335 uint32_t h, uint32_t r, resize_factor rf) {
1336 if (k == 0 || k > MAX_K) {
1337 throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
1338 }
1339
1340 uint32_t array_size;
1341
1342 if (n <= k) {
1343 if (preamble_longs != PREAMBLE_LONGS_WARMUP) {
1344 throw std::invalid_argument("Possible corruption: deserializing with n <= k but not in warmup mode. "
1345 "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
1346 }
1347 if (n != h) {
1348 throw std::invalid_argument("Possible corruption: deserializing in warmup mode but n != h. "
1349 "Found n = " + std::to_string(n) + ", h = " + std::to_string(h));
1350 }
1351 if (r > 0) {
1352 throw std::invalid_argument("Possible corruption: deserializing in warmup mode but r > 0. "
1353 "Found r = " + std::to_string(r));
1354 }
1355
1356 const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k));
1357 const uint32_t min_lg_size = to_log_2(ceiling_power_of_2(h));
1358 const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf, min_lg_size);
1359 array_size = get_adjusted_size(k, 1 << initial_lg_size);
1360 if (array_size == k) { // if full size, need to leave 1 for the gap
1361 ++array_size;
1362 }
1363 } else { // n > k
1364 if (preamble_longs != PREAMBLE_LONGS_FULL) {
1365 throw std::invalid_argument("Possible corruption: deserializing with n > k but not in full mode. "
1366 "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
1367 }
1368 if (h + r != k) {
1369 throw std::invalid_argument("Possible corruption: deserializing in full mode but h + r != n. "
1370 "Found h = " + std::to_string(h) + ", r = " + std::to_string(r) + ", n = " + std::to_string(n));
1371 }
1372
1373 array_size = k + 1;
1374 }
1375
1376 return array_size;
1377}
1378
1379template<typename T, typename A>
1380template<typename P>
1381subset_summary var_opt_sketch<T, A>::estimate_subset_sum(P predicate) const {
1382 if (n_ == 0) {
1383 return {0.0, 0.0, 0.0, 0.0};
1384 }
1385
1386 double total_wt_h = 0.0;
1387 double h_true_wt = 0.0;
1388 size_t idx = 0;
1389 for (; idx < h_; ++idx) {
1390 double wt = weights_[idx];
1391 total_wt_h += wt;
1392 if (predicate(data_[idx])) {
1393 h_true_wt += wt;
1394 }
1395 }
1396
1397 // if only heavy items, we have an exact answer
1398 if (r_ == 0) {
1399 return {h_true_wt, h_true_wt, h_true_wt, h_true_wt};
1400 }
1401
1402 // since r_ > 0, we know we have samples
1403 const uint64_t num_samples = n_ - h_;
1404 double effective_sampling_rate = r_ / static_cast<double>(num_samples);
1405 if (effective_sampling_rate < 0.0 || effective_sampling_rate > 1.0)
1406 throw std::logic_error("invalid sampling rate outside [0.0, 1.0]");
1407
1408 uint32_t r_true_count = 0;
1409 ++idx; // skip the gap
1410 for (; idx < (k_ + 1); ++idx) {
1411 if (predicate(data_[idx])) {
1412 ++r_true_count;
1413 }
1414 }
1415
1416 double lb_true_fraction = pseudo_hypergeometric_lb_on_p(r_, r_true_count, effective_sampling_rate);
1417 double estimated_true_fraction = (1.0 * r_true_count) / r_;
1418 double ub_true_fraction = pseudo_hypergeometric_ub_on_p(r_, r_true_count, effective_sampling_rate);
1419
1420 return { h_true_wt + (total_wt_r_ * lb_true_fraction),
1421 h_true_wt + (total_wt_r_ * estimated_true_fraction),
1422 h_true_wt + (total_wt_r_ * ub_true_fraction),
1423 total_wt_h + total_wt_r_
1424 };
1425}
1426
1427template<typename T, typename A>
1428class var_opt_sketch<T, A>::items_deleter {
1429 public:
1430 items_deleter(uint32_t num, const A& allocator) : num(num), h_count(0), r_count(0), allocator(allocator) {}
1431 void set_h(uint32_t h) { h_count = h; }
1432 void set_r(uint32_t r) { r_count = r; }
1433 void operator() (T* ptr) {
1434 if (h_count > 0) {
1435 for (size_t i = 0; i < h_count; ++i) {
1436 ptr[i].~T();
1437 }
1438 }
1439 if (r_count > 0) {
1440 uint32_t end = h_count + r_count + 1;
1441 for (size_t i = h_count + 1; i < end; ++i) {
1442 ptr[i].~T();
1443 }
1444 }
1445 if (ptr != nullptr) {
1446 allocator.deallocate(ptr, num);
1447 }
1448 }
1449 private:
1450 uint32_t num;
1451 uint32_t h_count;
1452 uint32_t r_count;
1453 A allocator;
1454};
1455
1456template<typename T, typename A>
1457class var_opt_sketch<T, A>::weights_deleter {
1458 public:
1459 weights_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {}
1460 void operator() (double* ptr) {
1461 if (ptr != nullptr) {
1462 allocator.deallocate(ptr, num);
1463 }
1464 }
1465 private:
1466 uint32_t num;
1467 AllocDouble allocator;
1468};
1469
1470template<typename T, typename A>
1471class var_opt_sketch<T, A>::marks_deleter {
1472 public:
1473 marks_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {}
1474 void operator() (bool* ptr) {
1475 if (ptr != nullptr) {
1476 allocator.deallocate(ptr, 1);
1477 }
1478 }
1479 private:
1480 uint32_t num;
1481 AllocBool allocator;
1482};
1483
1484
1485template<typename T, typename A>
1487 return const_iterator(*this, false);
1488}
1489
1490template<typename T, typename A>
1492 return const_iterator(*this, true);
1493}
1494
1495// -------- var_opt_sketch::const_iterator implementation ---------
1496
1497template<typename T, typename A>
1499 sk_(&sk),
1500 cum_r_weight_(0.0),
1501 r_item_wt_(sk.get_tau()),
1502 final_idx_(sk.r_ > 0 ? sk.h_ + sk.r_ + 1 : sk.h_)
1503{
1504 // index logic easier to read if not inline
1505 if (is_end) {
1506 idx_ = final_idx_;
1507 sk_ = nullptr;
1508 } else {
1509 idx_ = (sk.h_ == 0 && sk.r_ > 0 ? 1 : 0); // skip if gap is at start
1510 }
1511
1512 // should only apply if sketch is empty
1513 if (idx_ == final_idx_) { sk_ = nullptr; }
1514}
1515
1516template<typename T, typename A>
1517var_opt_sketch<T, A>::const_iterator::const_iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region) :
1518 sk_(&sk),
1519 cum_r_weight_(0.0),
1520 r_item_wt_(sk.get_tau()),
1521 final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
1522{
1523 if (use_r_region) {
1524 idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
1525 } else { // H region
1526 // gap at start only if h_ == 0, so index always starts at 0
1527 idx_ = (is_end ? sk.h_ : 0);
1528 }
1529
1530 // unlike in full iterator case, may happen even if sketch is not empty
1531 if (idx_ == final_idx_) { sk_ = nullptr; }
1532}
1533
1534template<typename T, typename A>
1535var_opt_sketch<T, A>::const_iterator::const_iterator(const const_iterator& other) :
1536 sk_(other.sk_),
1537 cum_r_weight_(other.cum_r_weight_),
1538 r_item_wt_(other.r_item_wt_),
1539 idx_(other.idx_),
1540 final_idx_(other.final_idx_)
1541{}
1542
1543template<typename T, typename A>
1544typename var_opt_sketch<T, A>::const_iterator& var_opt_sketch<T, A>::const_iterator::operator++() {
1545 // accumulate weight already visited
1546 if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
1547
1548 ++idx_;
1549
1550 if (idx_ == final_idx_) {
1551 sk_ = nullptr;
1552 return *this;
1553 } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
1554 ++idx_;
1555 }
1556 return *this;
1557}
1558
1559template<typename T, typename A>
1560typename var_opt_sketch<T, A>::const_iterator& var_opt_sketch<T, A>::const_iterator::operator++(int) {
1561 const_iterator tmp(*this);
1562 operator++();
1563 return tmp;
1564}
1565
1566template<typename T, typename A>
1567bool var_opt_sketch<T, A>::const_iterator::operator==(const const_iterator& other) const {
1568 if (sk_ != other.sk_) return false;
1569 if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
1570 return idx_ == other.idx_;
1571}
1572
1573template<typename T, typename A>
1574bool var_opt_sketch<T, A>::const_iterator::operator!=(const const_iterator& other) const {
1575 return !operator==(other);
1576}
1577
1578template<typename T, typename A>
1579auto var_opt_sketch<T, A>::const_iterator::operator*() const -> reference {
1580 double wt;
1581 if (idx_ < sk_->h_) {
1582 wt = sk_->weights_[idx_];
1583 } else {
1584 wt = r_item_wt_;
1585 }
1586 return value_type(sk_->data_[idx_], wt);
1587}
1588
1589template<typename T, typename A>
1590auto var_opt_sketch<T, A>::const_iterator::operator->() const -> pointer {
1591 return **this;
1592}
1593
1594template<typename T, typename A>
1595bool var_opt_sketch<T, A>::const_iterator::get_mark() const {
1596 return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
1597}
1598
1599
1600// -------- var_opt_sketch::iterator implementation ---------
1601
1602template<typename T, typename A>
1603var_opt_sketch<T, A>::iterator::iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region) :
1604 sk_(&sk),
1605 cum_r_weight_(0.0),
1606 r_item_wt_(sk.get_tau()),
1607 final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
1608{
1609 if (use_r_region) {
1610 idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
1611 } else { // H region
1612 // gap at start only if h_ == 0, so index always starts at 0
1613 idx_ = (is_end ? sk.h_ : 0);
1614 }
1615
1616 // unlike in full iterator case, may happen even if sketch is not empty
1617 if (idx_ == final_idx_) { sk_ = nullptr; }
1618}
1619
1620template<typename T, typename A>
1621var_opt_sketch<T, A>::iterator::iterator(const iterator& other) :
1622 sk_(other.sk_),
1623 cum_r_weight_(other.cum_r_weight_),
1624 r_item_wt_(other.r_item_wt_),
1625 idx_(other.idx_),
1626 final_idx_(other.final_idx_)
1627{}
1628
1629template<typename T, typename A>
1630typename var_opt_sketch<T, A>::iterator& var_opt_sketch<T, A>::iterator::operator++() {
1631 // accumulate weight already visited
1632 if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
1633
1634 ++idx_;
1635
1636 if (idx_ == final_idx_) {
1637 sk_ = nullptr;
1638 return *this;
1639 } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
1640 ++idx_;
1641 }
1642
1643 return *this;
1644}
1645
1646template<typename T, typename A>
1647typename var_opt_sketch<T, A>::iterator& var_opt_sketch<T, A>::iterator::operator++(int) {
1648 const_iterator tmp(*this);
1649 operator++();
1650 return tmp;
1651}
1652
1653template<typename T, typename A>
1654bool var_opt_sketch<T, A>::iterator::operator==(const iterator& other) const {
1655 if (sk_ != other.sk_) return false;
1656 if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
1657 return idx_ == other.idx_;
1658}
1659
1660template<typename T, typename A>
1661bool var_opt_sketch<T, A>::iterator::operator!=(const iterator& other) const {
1662 return !operator==(other);
1663}
1664
1665template<typename T, typename A>
1666auto var_opt_sketch<T, A>::iterator::operator*() -> reference {
1667 double wt;
1668 if (idx_ < sk_->h_) {
1669 wt = sk_->weights_[idx_];
1670 } else if (idx_ == final_idx_ - 1) {
1671 wt = sk_->total_wt_r_ - cum_r_weight_;
1672 } else {
1673 wt = r_item_wt_;
1674 }
1675 return value_type(sk_->data_[idx_], wt);
1676}
1677
1678template<typename T, typename A>
1679auto var_opt_sketch<T, A>::iterator::operator->() -> pointer {
1680 return **this;
1681}
1682
1683template<typename T, typename A>
1684bool var_opt_sketch<T, A>::iterator::get_mark() const {
1685 return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
1686}
1687
1688/*
1689 * Checks if target sampling allocation is more than 50% of max sampling size.
1690 * If so, returns max sampling size, otherwise passes through target size.
1691 */
1692template<typename T, typename A>
1693uint32_t var_opt_sketch<T, A>::get_adjusted_size(uint32_t max_size, uint32_t resize_target) {
1694 if (max_size < (resize_target << 1)) {
1695 return max_size;
1696 }
1697 return resize_target;
1698}
1699
1700template<typename T, typename A>
1701uint32_t var_opt_sketch<T, A>::starting_sub_multiple(uint32_t lg_target, uint32_t lg_rf, uint32_t lg_min) {
1702 return (lg_target <= lg_min)
1703 ? lg_min : (lg_rf == 0) ? lg_target
1704 : (lg_target - lg_min) % lg_rf + lg_min;
1705}
1706
1707template<typename T, typename A>
1708double var_opt_sketch<T, A>::pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate) {
1709 const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
1711}
1712
1713template<typename T, typename A>
1714double var_opt_sketch<T, A>::pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate) {
1715 const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
1717}
1718
1719template<typename T, typename A>
1720bool var_opt_sketch<T, A>::is_power_of_2(uint32_t v) {
1721 return v && !(v & (v - 1));
1722}
1723
1724template<typename T, typename A>
1725uint32_t var_opt_sketch<T, A>::to_log_2(uint32_t v) {
1726 if (is_power_of_2(v)) {
1727 return count_trailing_zeros_in_u32(v);
1728 } else {
1729 throw std::invalid_argument("Attempt to compute integer log2 of non-positive or non-power of 2");
1730 }
1731}
1732
1733// Returns an integer in the range [0, max_value) -- excludes max_value
1734template<typename T, typename A>
1735uint32_t var_opt_sketch<T, A>::next_int(uint32_t max_value) {
1736 std::uniform_int_distribution<uint32_t> dist(0, max_value - 1);
1737 return dist(random_utils::rand);
1738}
1739
1740template<typename T, typename A>
1741double var_opt_sketch<T, A>::next_double_exclude_zero() {
1742 double r = random_utils::next_double(random_utils::rand);
1743 while (r == 0.0) {
1744 r = random_utils::next_double(random_utils::rand);
1745 }
1746 return r;
1747}
1748
1749}
1750
1751// namespace datasketches
1752
1753#endif // _VAR_OPT_SKETCH_IMPL_HPP_
static double approximate_upper_bound_on_p(uint64_t n, uint64_t k, double num_std_devs)
Computes upper bound of approximate Clopper-Pearson confidence interval for a binomial proportion.
Definition bounds_binomial_proportions.hpp:148
static double approximate_lower_bound_on_p(uint64_t n, uint64_t k, double num_std_devs)
Computes lower bound of approximate Clopper-Pearson confidence interval for a binomial proportion.
Definition bounds_binomial_proportions.hpp:113
This sketch samples data from a stream of items.
Definition var_opt_sketch.hpp:67
var_opt_sketch(uint32_t k, resize_factor rf=var_opt_constants::DEFAULT_RESIZE_FACTOR, const A &allocator=A())
Constructor.
Definition var_opt_sketch_impl.hpp:46
DataSketches namespace.
Definition binomial_bounds.hpp:38