datasketches-cpp
Loading...
Searching...
No Matches
var_opt_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _VAR_OPT_SKETCH_IMPL_HPP_
21#define _VAR_OPT_SKETCH_IMPL_HPP_
22
23#include <memory>
24#include <sstream>
25#include <cmath>
26#include <random>
27#include <algorithm>
28#include <stdexcept>
29
30#include "var_opt_sketch.hpp"
31#include "serde.hpp"
32#include "bounds_binomial_proportions.hpp"
33#include "count_zeros.hpp"
34#include "memory_operations.hpp"
35#include "ceiling_power_of_2.hpp"
36
37namespace datasketches {
38
39/*
40 * Implementation code for the VarOpt sketch.
41 *
42 * author Kevin Lang
43 * author Jon Malkin
44 */
45template<typename T, typename A>
46var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, resize_factor rf, const A& allocator) :
47 var_opt_sketch(k, rf, false, allocator) {}
48
49template<typename T, typename A>
51 k_(other.k_),
52 h_(other.h_),
53 m_(other.m_),
54 r_(other.r_),
55 n_(other.n_),
56 total_wt_r_(other.total_wt_r_),
57 rf_(other.rf_),
58 curr_items_alloc_(other.curr_items_alloc_),
59 filled_data_(other.filled_data_),
60 allocator_(other.allocator_),
61 data_(nullptr),
62 weights_(nullptr),
63 num_marks_in_h_(other.num_marks_in_h_),
64 marks_(nullptr)
65 {
66 data_ = allocator_.allocate(curr_items_alloc_);
67 // skip gap or anything unused at the end
68 for (size_t i = 0; i < h_; ++i)
69 new (&data_[i]) T(other.data_[i]);
70 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
71 new (&data_[i]) T(other.data_[i]);
72
73 // we skipped the gap
74 filled_data_ = false;
75
76 weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
77 // doubles so can successfully copy regardless of the internal state
78 std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
79
80 if (other.marks_ != nullptr) {
81 marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
82 std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
83 }
84 }
85
86template<typename T, typename A>
87var_opt_sketch<T, A>::var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n) :
88 k_(other.k_),
89 h_(other.h_),
90 m_(other.m_),
91 r_(other.r_),
92 n_(adjusted_n),
93 total_wt_r_(other.total_wt_r_),
94 rf_(other.rf_),
95 curr_items_alloc_(other.curr_items_alloc_),
96 filled_data_(other.filled_data_),
97 allocator_(other.allocator_),
98 data_(nullptr),
99 weights_(nullptr),
100 num_marks_in_h_(other.num_marks_in_h_),
101 marks_(nullptr)
102 {
103 data_ = allocator_.allocate(curr_items_alloc_);
104 // skip gap or anything unused at the end
105 for (size_t i = 0; i < h_; ++i)
106 new (&data_[i]) T(other.data_[i]);
107 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
108 new (&data_[i]) T(other.data_[i]);
109
110 // we skipped the gap
111 filled_data_ = false;
112
113 weights_ = AllocDouble(allocator_).allocate(curr_items_alloc_);
114 // doubles so can successfully copy regardless of the internal state
115 std::copy(other.weights_, other.weights_ + curr_items_alloc_, weights_);
116
117 if (!as_sketch && other.marks_ != nullptr) {
118 marks_ = AllocBool(allocator_).allocate(curr_items_alloc_);
119 std::copy(other.marks_, other.marks_ + curr_items_alloc_, marks_);
120 }
121 }
122
123template<typename T, typename A>
125 k_(other.k_),
126 h_(other.h_),
127 m_(other.m_),
128 r_(other.r_),
129 n_(other.n_),
130 total_wt_r_(other.total_wt_r_),
131 rf_(other.rf_),
132 curr_items_alloc_(other.curr_items_alloc_),
133 filled_data_(other.filled_data_),
134 allocator_(other.allocator_),
135 data_(other.data_),
136 weights_(other.weights_),
137 num_marks_in_h_(other.num_marks_in_h_),
138 marks_(other.marks_)
139 {
140 other.data_ = nullptr;
141 other.weights_ = nullptr;
142 other.marks_ = nullptr;
143 }
144
145template<typename T, typename A>
146var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, const A& allocator) :
147 k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf), allocator_(allocator) {
148 if (k == 0 || k_ > MAX_K) {
149 throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
150 }
151
152 uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
153 uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
154 curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
155 if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
156 ++curr_items_alloc_;
157 }
158
159 allocate_data_arrays(curr_items_alloc_, is_gadget);
160 num_marks_in_h_ = 0;
161}
162
163template<typename T, typename A>
164var_opt_sketch<T, A>::var_opt_sketch(uint32_t k, uint32_t h, uint32_t m, uint32_t r, uint64_t n, double total_wt_r, resize_factor rf,
165 uint32_t curr_items_alloc, bool filled_data, std::unique_ptr<T, items_deleter> items,
166 std::unique_ptr<double, weights_deleter> weights, uint32_t num_marks_in_h,
167 std::unique_ptr<bool, marks_deleter> marks, const A& allocator) :
168 k_(k),
169 h_(h),
170 m_(m),
171 r_(r),
172 n_(n),
173 total_wt_r_(total_wt_r),
174 rf_(rf),
175 curr_items_alloc_(curr_items_alloc),
176 filled_data_(filled_data),
177 allocator_(allocator),
178 data_(items.release()),
179 weights_(weights.release()),
180 num_marks_in_h_(num_marks_in_h),
181 marks_(marks.release())
182{}
184
185template<typename T, typename A>
187 if (data_ != nullptr) {
188 if (filled_data_) {
189 // destroy everything
190 const size_t num_to_destroy = std::min(k_ + 1, curr_items_alloc_);
191 for (size_t i = 0; i < num_to_destroy; ++i) {
192 data_[i].~T();
193 }
194 } else {
195 // skip gap or anything unused at the end
196 for (size_t i = 0; i < h_; ++i) {
197 data_[i].~T();
198 }
199
200 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i) {
201 data_[i].~T();
202 }
203 }
204 allocator_.deallocate(data_, curr_items_alloc_);
205 }
206
207 if (weights_ != nullptr) {
208 AllocDouble(allocator_).deallocate(weights_, curr_items_alloc_);
209 }
210
211 if (marks_ != nullptr) {
212 AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
213 }
214}
216template<typename T, typename A>
218 var_opt_sketch sk_copy(other);
219 std::swap(k_, sk_copy.k_);
220 std::swap(h_, sk_copy.h_);
221 std::swap(m_, sk_copy.m_);
222 std::swap(r_, sk_copy.r_);
223 std::swap(n_, sk_copy.n_);
224 std::swap(total_wt_r_, sk_copy.total_wt_r_);
225 std::swap(rf_, sk_copy.rf_);
226 std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
227 std::swap(filled_data_, sk_copy.filled_data_);
228 std::swap(allocator_, sk_copy.allocator_);
229 std::swap(data_, sk_copy.data_);
230 std::swap(weights_, sk_copy.weights_);
231 std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
232 std::swap(marks_, sk_copy.marks_);
233 return *this;
234}
235
236template<typename T, typename A>
238 std::swap(k_, other.k_);
239 std::swap(h_, other.h_);
240 std::swap(m_, other.m_);
241 std::swap(r_, other.r_);
242 std::swap(n_, other.n_);
243 std::swap(total_wt_r_, other.total_wt_r_);
244 std::swap(rf_, other.rf_);
245 std::swap(curr_items_alloc_, other.curr_items_alloc_);
246 std::swap(filled_data_, other.filled_data_);
247 std::swap(allocator_, other.allocator_);
248 std::swap(data_, other.data_);
249 std::swap(weights_, other.weights_);
250 std::swap(num_marks_in_h_, other.num_marks_in_h_);
251 std::swap(marks_, other.marks_);
252 return *this;
253}
254
255/*
256 * An empty sketch requires 8 bytes.
257 *
258 * <pre>
259 * Long || Start Byte Adr:
260 * Adr:
261 * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
262 * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------|
263 * </pre>
264 *
265 * A non-empty sketch requires 24 bytes of preamble for an under-full sample; once there are
266 * at least k items the sketch uses 32 bytes of preamble.
267 *
268 * The count of items seen is limited to 48 bits (~256 trillion) even though there are adjacent
269 * unused preamble bits. The acceptance probability for an item is a double in the range [0,1),
270 * limiting us to 53 bits of randomness due to details of the IEEE floating point format. To
271 * ensure meaningful probabilities as the items seen count approaches capacity, we intentionally
272 * use slightly fewer bits.
273 *
274 * Following the header are weights for the heavy items, then marks in the event this is a gadget.
275 * The serialized items come last.
276 *
277 * <pre>
278 * Long || Start Byte Adr:
279 * Adr:
280 * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
281 * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------|
282 *
283 * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
284 * 1 ||---------------------------Items Seen Count (N)--------------------------------|
285 *
286 * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 |
287 * 2 ||-------------Item Count in H---------------|-------Item Count in R-------------|
288 *
289 * || 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 |
290 * 3 ||-------------------------------Total Weight in R-------------------------------|
291 * </pre>
292 */
293
294// implementation for fixed-size arithmetic types (integral and floating point)
295template<typename T, typename A>
296template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
298 if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
299 size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
300 num_bytes += h_ * sizeof(double); // weights
301 if (marks_ != nullptr) { // marks
302 num_bytes += (h_ / 8) + (h_ % 8 > 0);
303 }
304 num_bytes += (h_ + r_) * sizeof(TT); // the actual items
305 return num_bytes;
306}
307
308// implementation for all other types
309template<typename T, typename A>
310template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
311size_t var_opt_sketch<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
312 if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
313 size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
314 num_bytes += h_ * sizeof(double); // weights
315 if (marks_ != nullptr) { // marks
316 num_bytes += (h_ / 8) + (h_ % 8 > 0);
317 }
318 // must iterate over the items
319 for (auto it: *this)
320 num_bytes += sd.size_of_item(it.first);
321 return num_bytes;
322}
323
324template<typename T, typename A>
325template<typename SerDe>
326std::vector<uint8_t, AllocU8<A>> var_opt_sketch<T, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const {
327 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
328 std::vector<uint8_t, AllocU8<A>> bytes(size, 0, allocator_);
329 uint8_t* ptr = bytes.data() + header_size_bytes;
330 uint8_t* end_ptr = ptr + size;
331
332 bool empty = is_empty();
333 uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
334 : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
335 uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
336 uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
337
338 if (empty) {
339 flags |= EMPTY_FLAG_MASK;
340 }
341
342 // first prelong
343 uint8_t ser_ver(SER_VER);
344 uint8_t family(FAMILY_ID);
345 ptr += copy_to_mem(first_byte, ptr);
346 ptr += copy_to_mem(ser_ver, ptr);
347 ptr += copy_to_mem(family, ptr);
348 ptr += copy_to_mem(flags, ptr);
349 ptr += copy_to_mem(k_, ptr);
350
351 if (!empty) {
352 // second and third prelongs
353 ptr += copy_to_mem(n_, ptr);
354 ptr += copy_to_mem(h_, ptr);
355 ptr += copy_to_mem(r_, ptr);
356
357 // fourth prelong, if needed
358 if (r_ > 0) {
359 ptr += copy_to_mem(total_wt_r_, ptr);
360 }
361
362 // first h_ weights
363 ptr += copy_to_mem(weights_, ptr, h_ * sizeof(double));
364
365 // first h_ marks as packed bytes iff we have a gadget
366 if (marks_ != nullptr) {
367 uint8_t val = 0;
368 for (uint32_t i = 0; i < h_; ++i) {
369 if (marks_[i]) {
370 val |= 0x1 << (i & 0x7);
371 }
372
373 if ((i & 0x7) == 0x7) {
374 ptr += copy_to_mem(val, ptr);
375 val = 0;
376 }
377 }
378
379 // write out any remaining values
380 if ((h_ & 0x7) > 0) {
381 ptr += copy_to_mem(val, ptr);
382 }
383 }
384
385 // write the sample items, skipping the gap. Either h_ or r_ may be 0
386 ptr += sd.serialize(ptr, end_ptr - ptr, data_, h_);
387 ptr += sd.serialize(ptr, end_ptr - ptr, &data_[h_ + 1], r_);
388 }
389
390 size_t bytes_written = ptr - bytes.data();
391 if (bytes_written != size) {
392 throw std::logic_error("serialized size mismatch: " + std::to_string(bytes_written) + " != " + std::to_string(size));
393 }
394
395 return bytes;
396}
397
398template<typename T, typename A>
399template<typename SerDe>
400void var_opt_sketch<T, A>::serialize(std::ostream& os, const SerDe& sd) const {
401 const bool empty = (h_ == 0) && (r_ == 0);
402
403 const uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
404 : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
405 const uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
406 uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
407
408 if (empty) {
409 flags |= EMPTY_FLAG_MASK;
410 }
411
412 // first prelong
413 const uint8_t ser_ver(SER_VER);
414 const uint8_t family(FAMILY_ID);
415 write(os, first_byte);
416 write(os, ser_ver);
417 write(os, family);
418 write(os, flags);
419 write(os, k_);
420
421 if (!empty) {
422 // second and third prelongs
423 write(os, n_);
424 write(os, h_);
425 write(os, r_);
426
427 // fourth prelong, if needed
428 if (r_ > 0) {
429 write(os, total_wt_r_);
430 }
431
432 // write the first h_ weights
433 write(os, weights_, h_ * sizeof(double));
434
435 // write the first h_ marks as packed bytes iff we have a gadget
436 if (marks_ != nullptr) {
437 uint8_t val = 0;
438 for (uint32_t i = 0; i < h_; ++i) {
439 if (marks_[i]) {
440 val |= 0x1 << (i & 0x7);
441 }
442
443 if ((i & 0x7) == 0x7) {
444 write(os, val);
445 val = 0;
446 }
447 }
448
449 // write out any remaining values
450 if ((h_ & 0x7) > 0) {
451 write(os, val);
452 }
453 }
454
455 // write the sample items, skipping the gap. Either h_ or r_ may be 0
456 sd.serialize(os, data_, h_);
457 sd.serialize(os, &data_[h_ + 1], r_);
458 }
459}
460
461template<typename T, typename A>
462template<typename SerDe>
463var_opt_sketch<T, A> var_opt_sketch<T, A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const A& allocator) {
464 ensure_minimum_memory(size, 8);
465 const char* ptr = static_cast<const char*>(bytes);
466 const char* base = ptr;
467 const char* end_ptr = ptr + size;
468 uint8_t first_byte;
469 ptr += copy_from_mem(ptr, first_byte);
470 uint8_t preamble_longs = first_byte & 0x3f;
471 resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
472 uint8_t serial_version;
473 ptr += copy_from_mem(ptr, serial_version);
474 uint8_t family_id;
475 ptr += copy_from_mem(ptr, family_id);
476 uint8_t flags;
477 ptr += copy_from_mem(ptr, flags);
478 uint32_t k;
479 ptr += copy_from_mem(ptr, k);
480
481 check_preamble_longs(preamble_longs, flags);
482 check_family_and_serialization_version(family_id, serial_version);
483 ensure_minimum_memory(size, preamble_longs << 3);
484
485 const bool is_empty = flags & EMPTY_FLAG_MASK;
486 const bool is_gadget = flags & GADGET_FLAG_MASK;
487
488 if (is_empty) {
489 return var_opt_sketch(k, rf, is_gadget, allocator);
490 }
491
492 // second and third prelongs
493 uint64_t n;
494 uint32_t h, r;
495 ptr += copy_from_mem(ptr, n);
496 ptr += copy_from_mem(ptr, h);
497 ptr += copy_from_mem(ptr, r);
498
499 const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
500
501 // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
502 double total_wt_r = 0.0;
503 if (preamble_longs == PREAMBLE_LONGS_FULL) {
504 ptr += copy_from_mem(ptr, total_wt_r);
505 if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
506 throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
507 "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
508 }
509 } else {
510 total_wt_r = 0.0;
511 }
512
513 // read the first h_ weights, fill in rest of array with -1.0
514 check_memory_size(ptr - base + (h * sizeof(double)), size);
515 std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
516 weights_deleter(array_size, allocator));
517 double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
518 ptr += copy_from_mem(ptr, wts, h * sizeof(double));
519 for (size_t i = 0; i < h; ++i) {
520 if (!(wts[i] > 0.0)) {
521 throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
522 }
523 }
524 std::fill(wts + h, wts + array_size, -1.0);
525
526 // read the first h_ marks as packed bytes iff we have a gadget
527 uint32_t num_marks_in_h = 0;
528 std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
529 if (is_gadget) {
530 uint8_t val = 0;
531 marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
532 const size_t size_marks = (h / 8) + (h % 8 > 0 ? 1 : 0);
533 check_memory_size(ptr - base + size_marks, size);
534 for (uint32_t i = 0; i < h; ++i) {
535 if ((i & 0x7) == 0x0) { // should trigger on first iteration
536 ptr += copy_from_mem(ptr, val);
537 }
538 marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
539 num_marks_in_h += (marks.get()[i] ? 1 : 0);
540 }
541 }
542
543 // read the sample items, skipping the gap. Either h_ or r_ may be 0
544 items_deleter deleter(array_size, allocator);
545 std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
546
547 ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), h);
548 items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
549
550 ptr += sd.deserialize(ptr, end_ptr - ptr, &(items.get()[h + 1]), r);
551 items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
552
553 return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
554 std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
555}
556
557template<typename T, typename A>
558template<typename SerDe>
559var_opt_sketch<T, A> var_opt_sketch<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
560 const auto first_byte = read<uint8_t>(is);
561 uint8_t preamble_longs = first_byte & 0x3f;
562 const resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
563 const auto serial_version = read<uint8_t>(is);
564 const auto family_id = read<uint8_t>(is);
565 const auto flags = read<uint8_t>(is);
566 const auto k = read<uint32_t>(is);
567
568 check_preamble_longs(preamble_longs, flags);
569 check_family_and_serialization_version(family_id, serial_version);
570
571 const bool is_empty = flags & EMPTY_FLAG_MASK;
572 const bool is_gadget = flags & GADGET_FLAG_MASK;
573
574 if (is_empty) {
575 if (!is.good())
576 throw std::runtime_error("error reading from std::istream");
577 else
578 return var_opt_sketch(k, rf, is_gadget, allocator);
579 }
580
581 // second and third prelongs
582 const auto n = read<uint64_t>(is);
583 const auto h = read<uint32_t>(is);
584 const auto r = read<uint32_t>(is);
585
586 const uint32_t array_size = validate_and_get_target_size(preamble_longs, k, n, h, r, rf);
587
588 // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
589 double total_wt_r = 0.0;
590 if (preamble_longs == PREAMBLE_LONGS_FULL) {
591 total_wt_r = read<double>(is);
592 if (std::isnan(total_wt_r) || r == 0 || total_wt_r <= 0.0) {
593 throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
594 "Found r = " + std::to_string(r) + ", R region weight = " + std::to_string(total_wt_r));
595 }
596 }
597
598 // read the first h weights, fill remainder with -1.0
599 std::unique_ptr<double, weights_deleter> weights(AllocDouble(allocator).allocate(array_size),
600 weights_deleter(array_size, allocator));
601 double* wts = weights.get(); // to avoid lots of .get() calls -- do not delete
602 read(is, wts, h * sizeof(double));
603 for (size_t i = 0; i < h; ++i) {
604 if (!(wts[i] > 0.0)) {
605 throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(wts[i]));
606 }
607 }
608 std::fill(wts + h, wts + array_size, -1.0);
609
610 // read the first h_ marks as packed bytes iff we have a gadget
611 uint32_t num_marks_in_h = 0;
612 std::unique_ptr<bool, marks_deleter> marks(nullptr, marks_deleter(array_size, allocator));
613 if (is_gadget) {
614 marks = std::unique_ptr<bool, marks_deleter>(AllocBool(allocator).allocate(array_size), marks_deleter(array_size, allocator));
615 uint8_t val = 0;
616 for (uint32_t i = 0; i < h; ++i) {
617 if ((i & 0x7) == 0x0) { // should trigger on first iteration
618 val = read<uint8_t>(is);
619 }
620 marks.get()[i] = ((val >> (i & 0x7)) & 0x1) == 1;
621 num_marks_in_h += (marks.get()[i] ? 1 : 0);
622 }
623 }
624
625 // read the sample items, skipping the gap. Either h or r may be 0
626 items_deleter deleter(array_size, allocator);
627 std::unique_ptr<T, items_deleter> items(A(allocator).allocate(array_size), deleter);
628
629 sd.deserialize(is, items.get(), h); // aka &data_[0]
630 items.get_deleter().set_h(h); // serde didn't throw, so the items are now valid
631
632 sd.deserialize(is, &(items.get()[h + 1]), r);
633 items.get_deleter().set_r(r); // serde didn't throw, so the items are now valid
634
635 if (!is.good())
636 throw std::runtime_error("error reading from std::istream");
637
638 return var_opt_sketch(k, h, (r > 0 ? 1 : 0), r, n, total_wt_r, rf, array_size, false,
639 std::move(items), std::move(weights), num_marks_in_h, std::move(marks), allocator);
640}
641
642template<typename T, typename A>
644 return (h_ == 0 && r_ == 0);
645}
646
647template<typename T, typename A>
649 const uint32_t prev_alloc = curr_items_alloc_;
650 const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
651 const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
652 curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
653 if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
654 ++curr_items_alloc_;
655 }
656
657 if (filled_data_) {
658 // destroy everything
659 const size_t num_to_destroy = std::min(k_ + 1, prev_alloc);
660 for (size_t i = 0; i < num_to_destroy; ++i)
661 data_[i].~T();
662 } else {
663 // skip gap or anything unused at the end
664 for (size_t i = 0; i < h_; ++i)
665 data_[i].~T();
666
667 for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
668 data_[i].~T();
669 }
670
671 if (curr_items_alloc_ < prev_alloc) {
672 const bool is_gadget = (marks_ != nullptr);
673
674 allocator_.deallocate(data_, prev_alloc);
675 AllocDouble(allocator_).deallocate(weights_, prev_alloc);
676
677 if (marks_ != nullptr)
678 AllocBool(allocator_).deallocate(marks_, prev_alloc);
679
680 allocate_data_arrays(curr_items_alloc_, is_gadget);
681 }
682
683 n_ = 0;
684 h_ = 0;
685 m_ = 0;
686 r_ = 0;
687 num_marks_in_h_ = 0;
688 total_wt_r_ = 0.0;
689 filled_data_ = false;
690}
691
692template<typename T, typename A>
694 return n_;
695}
696
697template<typename T, typename A>
699 return k_;
700}
701
702template<typename T, typename A>
704 const uint32_t num_in_sketch = h_ + r_;
705 return (num_in_sketch < k_ ? num_in_sketch : k_);
706}
707
708template<typename T, typename A>
709void var_opt_sketch<T, A>::update(const T& item, double weight) {
710 update(item, weight, false);
711}
712
713template<typename T, typename A>
714void var_opt_sketch<T, A>::update(T&& item, double weight) {
715 update(std::move(item), weight, false);
716}
717
718template<typename T, typename A>
720 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
721 // The stream does not support passing an allocator instance, and alternatives are complicated.
722 std::ostringstream os;
723 os << "### VarOpt SUMMARY:" << std::endl;
724 os << " k : " << k_ << std::endl;
725 os << " h : " << h_ << std::endl;
726 os << " r : " << r_ << std::endl;
727 os << " weight_r : " << total_wt_r_ << std::endl;
728 os << " Current size : " << curr_items_alloc_ << std::endl;
729 os << " Resize factor: " << (1 << rf_) << std::endl;
730 os << "### END SKETCH SUMMARY" << std::endl;
731 return string<A>(os.str().c_str(), allocator_);
732}
733
734template<typename T, typename A>
736 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
737 // The stream does not support passing an allocator instance, and alternatives are complicated.
738 std::ostringstream os;
739 os << "### Sketch Items" << std::endl;
740 int idx = 0;
741 for (auto record : *this) {
742 os << idx << ": " << record.first << "\twt = " << record.second << std::endl;
743 ++idx;
744 }
745 return string<A>(os.str().c_str(), allocator_);
746}
747
748template<typename T, typename A>
749string<A> var_opt_sketch<T, A>::items_to_string(bool print_gap) const {
750 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
751 // The stream does not support passing an allocator instance, and alternatives are complicated.
752 std::ostringstream os;
753 os << "### Sketch Items" << std::endl;
754 const uint32_t array_length = (n_ < k_ ? n_ : k_ + 1);
755 for (uint32_t i = 0, display_idx = 0; i < array_length; ++i) {
756 if (i == h_ && print_gap) {
757 os << display_idx << ": GAP" << std::endl;
758 ++display_idx;
759 } else {
760 os << display_idx << ": " << data_[i] << "\twt = ";
761 if (weights_[i] == -1.0) {
762 os << get_tau() << "\t(-1.0)" << std::endl;
763 } else {
764 os << weights_[i] << std::endl;
765 }
766 ++display_idx;
767 }
768 }
769 return string<A>(os.str().c_str(), allocator_);
770}
771
772template<typename T, typename A>
773template<typename O>
774void var_opt_sketch<T, A>::update(O&& item, double weight, bool mark) {
775 if (weight <= 0.0 || std::isnan(weight) || std::isinf(weight)) {
776 throw std::invalid_argument("Item weights must be positive and finite. Found: "
777 + std::to_string(weight));
778 }
779
780 ++n_;
781
782 if (r_ == 0) {
783 // exact mode
784 update_warmup_phase(std::forward<O>(item), weight, mark);
785 } else {
786 // sketch is in estimation mode so we can make the following check,
787 // although very conservative to check every time
788 if ((h_ != 0) && (peek_min() < get_tau()))
789 throw std::logic_error("sketch not in valid estimation mode");
790
791 // what tau would be if deletion candidates turn out to be R plus the new item
792 // note: (r_ + 1) - 1 is intentional
793 const double hypothetical_tau = (weight + total_wt_r_) / ((r_ + 1) - 1);
794
795 // is new item's turn to be considered for reservoir?
796 const double condition1 = (h_ == 0) || (weight <= peek_min());
797
798 // is new item light enough for reservoir?
799 const double condition2 = weight < hypothetical_tau;
800
801 if (condition1 && condition2) {
802 update_light(std::forward<O>(item), weight, mark);
803 } else if (r_ == 1) {
804 update_heavy_r_eq1(std::forward<O>(item), weight, mark);
805 } else {
806 update_heavy_general(std::forward<O>(item), weight, mark);
807 }
808 }
809}
810
811template<typename T, typename A>
812template<typename O>
813void var_opt_sketch<T, A>::update_warmup_phase(O&& item, double weight, bool mark) {
814 // seems overly cautious
815 if (r_ > 0 || m_ != 0 || h_ > k_) throw std::logic_error("invalid sketch state during warmup");
816
817 if (h_ >= curr_items_alloc_) {
818 grow_data_arrays();
819 }
820
821 // store items as they come in until full
822 new (&data_[h_]) T(std::forward<O>(item));
823 weights_[h_] = weight;
824 if (marks_ != nullptr) {
825 marks_[h_] = mark;
826 }
827 ++h_;
828 num_marks_in_h_ += mark ? 1 : 0;
829
830 // check if need to heapify
831 if (h_ > k_) {
832 filled_data_ = true;
833 transition_from_warmup();
834 }
835}
836
837/* In the "light" case the new item has weight <= old_tau, so
838 would appear to the right of the R items in a hypothetical reverse-sorted
839 list. It is easy to prove that it is light enough to be part of this
840 round's downsampling */
841template<typename T, typename A>
842template<typename O>
843void var_opt_sketch<T, A>::update_light(O&& item, double weight, bool mark) {
844 if (r_ == 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during light warmup");
845
846 const uint32_t m_slot = h_; // index of the gap, which becomes the M region
847 if (filled_data_) {
848 if (&data_[m_slot] != &item)
849 data_[m_slot] = std::forward<O>(item);
850 } else {
851 new (&data_[m_slot]) T(std::forward<O>(item));
852 filled_data_ = true;
853 }
854 weights_[m_slot] = weight;
855 if (marks_ != nullptr) { marks_[m_slot] = mark; }
856 ++m_;
857
858 grow_candidate_set(total_wt_r_ + weight, r_ + 1);
859}
860
861/* In the "heavy" case the new item has weight > old_tau, so would
862 appear to the left of items in R in a hypothetical reverse-sorted list and
863 might or might not be light enough be part of this round's downsampling.
864 [After first splitting off the R=1 case] we greatly simplify the code by
865 putting the new item into the H heap whether it needs to be there or not.
866 In other words, it might go into the heap and then come right back out,
867 but that should be okay because pseudo_heavy items cannot predominate
868 in long streams unless (max wt) / (min wt) > o(exp(N)) */
869template<typename T, typename A>
870template<typename O>
871void var_opt_sketch<T, A>::update_heavy_general(O&& item, double weight, bool mark) {
872 if (r_ < 2 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy general update");
873
874 // put into H, although may come back out momentarily
875 push(std::forward<O>(item), weight, mark);
876
877 grow_candidate_set(total_wt_r_, r_);
878}
879
880/* The analysis of this case is similar to that of the general heavy case.
881 The one small technical difference is that since R < 2, we must grab an M item
882 to have a valid starting point for continue_by_growing_candidate_set () */
883template<typename T, typename A>
884template<typename O>
885void var_opt_sketch<T, A>::update_heavy_r_eq1(O&& item, double weight, bool mark) {
886 if (r_ != 1 || m_ != 0 || (r_ + h_) != k_) throw std::logic_error("invalid sketch state during heavy r=1 update");
887
888 push(std::forward<O>(item), weight, mark); // new item into H
889 pop_min_to_m_region(); // pop lightest back into M
890
891 // Any set of two items is downsample-able to one item,
892 // so the two lightest items are a valid starting point for the following
893 const uint32_t m_slot = k_ - 1; // array is k+1, 1 in R, so slot before is M
894 grow_candidate_set(weights_[m_slot] + total_wt_r_, 2);
895}
896
897/*
898 * Decreases sketch's value of k by 1, updating stored values as needed.
899 *
900 * <p>Subject to certain pre-conditions, decreasing k causes tau to increase. This fact is used by
901 * the unioning algorithm to force "marked" items out of H and into the reservoir region.</p>
902 */
903template<typename T, typename A>
904void var_opt_sketch<T, A>::decrease_k_by_1() {
905 if (k_ <= 1) {
906 throw std::logic_error("Cannot decrease k below 1 in union");
907 }
908
909 if ((h_ == 0) && (r_ == 0)) {
910 // exact mode, but no data yet; this reduction is somewhat gratuitous
911 --k_;
912 } else if ((h_ > 0) && (r_ == 0)) {
913 // exact mode, but we have some data
914 --k_;
915 if (h_ > k_) {
916 transition_from_warmup();
917 }
918 } else if ((h_ > 0) && (r_ > 0)) {
919 // reservoir mode, but we have some exact samples.
920 // Our strategy will be to pull an item out of H (which we are allowed to do since it's
921 // still just data), reduce k, and then re-insert the item
922
923 // first, slide the R zone to the left by 1, temporarily filling the gap
924 const uint32_t old_gap_idx = h_;
925 const uint32_t old_final_r_idx = (h_ + 1 + r_) - 1;
926 if (old_final_r_idx != k_) throw std::logic_error("gadget in invalid state");
927
928 swap_values(old_final_r_idx, old_gap_idx);
929 filled_data_ = true; // we just filled the gap, and no need to check previous state
930
931 // now we pull an item out of H; any item is ok, but if we grab the rightmost and then
932 // reduce h_, the heap invariant will be preserved (and the gap will be restored), plus
933 // the push() of the item that will probably happen later will be cheap.
934
935 const uint32_t pulled_idx = h_ - 1;
936 double pulled_weight = weights_[pulled_idx];
937 bool pulled_mark = marks_[pulled_idx];
938 // will move the pulled item below; don't do antying to it here
939
940 if (pulled_mark) { --num_marks_in_h_; }
941 weights_[pulled_idx] = -1.0; // to make bugs easier to spot
942
943 --h_;
944 --k_;
945 --n_; // will be re-incremented with the update
946
947 update(std::move(data_[pulled_idx]), pulled_weight, pulled_mark);
948 } else if ((h_ == 0) && (r_ > 0)) {
949 // pure reservoir mode, so can simply eject a randomly chosen sample from the reservoir
950 if (r_ < 2) throw std::logic_error("r_ too small for pure reservoir mode");
951
952 const uint32_t r_idx_to_delete = 1 + next_int(r_); // 1 for the gap
953 const uint32_t rightmost_r_idx = (1 + r_) - 1;
954 swap_values(r_idx_to_delete, rightmost_r_idx);
955 weights_[rightmost_r_idx] = -1.0;
956
957 --k_;
958 --r_;
959 }
960}
961
962template<typename T, typename A>
963void var_opt_sketch<T, A>::allocate_data_arrays(uint32_t tgt_size, bool use_marks) {
964 filled_data_ = false;
965
966 data_ = allocator_.allocate(tgt_size);
967 weights_ = AllocDouble(allocator_).allocate(tgt_size);
968
969 if (use_marks) {
970 marks_ = AllocBool(allocator_).allocate(tgt_size);
971 } else {
972 marks_ = nullptr;
973 }
974}
975
976template<typename T, typename A>
977void var_opt_sketch<T, A>::grow_data_arrays() {
978 const uint32_t prev_size = curr_items_alloc_;
979 curr_items_alloc_ = get_adjusted_size(k_, curr_items_alloc_ << rf_);
980 if (curr_items_alloc_ == k_) {
981 ++curr_items_alloc_;
982 }
983
984 if (prev_size < curr_items_alloc_) {
985 filled_data_ = false;
986
987 T* tmp_data = allocator_.allocate(curr_items_alloc_);
988 double* tmp_weights = AllocDouble(allocator_).allocate(curr_items_alloc_);
989
990 for (uint32_t i = 0; i < prev_size; ++i) {
991 new (&tmp_data[i]) T(std::move(data_[i]));
992 data_[i].~T();
993 tmp_weights[i] = weights_[i];
994 }
995
996 allocator_.deallocate(data_, prev_size);
997 AllocDouble(allocator_).deallocate(weights_, prev_size);
998
999 data_ = tmp_data;
1000 weights_ = tmp_weights;
1001
1002 if (marks_ != nullptr) {
1003 bool* tmp_marks = AllocBool(allocator_).allocate(curr_items_alloc_);
1004 for (uint32_t i = 0; i < prev_size; ++i) {
1005 tmp_marks[i] = marks_[i];
1006 }
1007 AllocBool(allocator_).deallocate(marks_, prev_size);
1008 marks_ = tmp_marks;
1009 }
1010 }
1011}
1012
1013template<typename T, typename A>
1014void var_opt_sketch<T, A>::transition_from_warmup() {
1015 // Move the 2 lightest items from H to M
1016 // But the lighter really belongs in R, so update counts to reflect that
1017 convert_to_heap();
1018 pop_min_to_m_region();
1019 pop_min_to_m_region();
1020 --m_;
1021 ++r_;
1022
1023 if (h_ != (k_ -1) || m_ != 1 || r_ != 1)
1024 throw std::logic_error("invalid state for transitioning from warmup");
1025
1026 // Update total weight in R and then, having grabbed the value, overwrite
1027 // in weight_ array to help make bugs more obvious
1028 total_wt_r_ = weights_[k_]; // only one item, known location
1029 weights_[k_] = -1.0;
1030
1031 // The two lightest items are necessarily downsample-able to one item,
1032 // and are therefore a valid initial candidate set
1033 grow_candidate_set(weights_[k_ - 1] + total_wt_r_, 2);
1034}
1035
1036template<typename T, typename A>
1037void var_opt_sketch<T, A>::convert_to_heap() {
1038 if (h_ < 2) {
1039 return; // nothing to do
1040 }
1041
1042 const uint32_t last_slot = h_ - 1;
1043 const int last_non_leaf = ((last_slot + 1) / 2) - 1;
1044
1045 for (int j = last_non_leaf; j >= 0; --j) {
1046 restore_towards_leaves(j);
1047 }
1048
1049 // validates heap, used for initial debugging
1050 //for (uint32_t j = h_ - 1; j >= 1; --j) {
1051 // uint32_t p = ((j + 1) / 2) - 1;
1052 // if (weights_[p] > weights_[j]) throw std::logic_error("invalid heap");
1053 //}
1054}
1055
1056template<typename T, typename A>
1057void var_opt_sketch<T, A>::restore_towards_leaves(uint32_t slot_in) {
1058 const uint32_t last_slot = h_ - 1;
1059 if (h_ == 0 || slot_in > last_slot) throw std::logic_error("invalid heap state");
1060
1061 uint32_t slot = slot_in;
1062 uint32_t child = (2 * slot_in) + 1; // might be invalid, need to check
1063
1064 while (child <= last_slot) {
1065 uint32_t child2 = child + 1; // might also be invalid
1066 if ((child2 <= last_slot) && (weights_[child2] < weights_[child])) {
1067 // switch to other child if it's both valid and smaller
1068 child = child2;
1069 }
1070
1071 if (weights_[slot] <= weights_[child]) {
1072 // invariant holds so we're done
1073 break;
1074 }
1075
1076 // swap and continue
1077 swap_values(slot, child);
1078
1079 slot = child;
1080 child = (2 * slot) + 1; // might be invalid, checked on next loop
1081 }
1082}
1083
1084template<typename T, typename A>
1085void var_opt_sketch<T, A>::restore_towards_root(uint32_t slot_in) {
1086 uint32_t slot = slot_in;
1087 uint32_t p = (((slot + 1) / 2) - 1); // valid if slot >= 1
1088 while ((slot > 0) && (weights_[slot] < weights_[p])) {
1089 swap_values(slot, p);
1090 slot = p;
1091 p = (((slot + 1) / 2) - 1); // valid if slot >= 1
1092 }
1093}
1094
1095template<typename T, typename A>
1096template<typename O>
1097void var_opt_sketch<T, A>::push(O&& item, double wt, bool mark) {
1098 if (filled_data_) {
1099 if (&data_[h_] != &item)
1100 data_[h_] = std::forward<O>(item);
1101 } else {
1102 new (&data_[h_]) T(std::forward<O>(item));
1103 filled_data_ = true;
1104 }
1105 weights_[h_] = wt;
1106 if (marks_ != nullptr) {
1107 marks_[h_] = mark;
1108 num_marks_in_h_ += (mark ? 1 : 0);
1109 }
1110 ++h_;
1111
1112 restore_towards_root(h_ - 1); // need use old h_, but want accurate h_
1113}
1114
1115template<typename T, typename A>
1116void var_opt_sketch<T, A>::pop_min_to_m_region() {
1117 if (h_ == 0 || (h_ + m_ + r_ != k_ + 1))
1118 throw std::logic_error("invalid heap state popping min to M region");
1119
1120 if (h_ == 1) {
1121 // just update bookkeeping
1122 ++m_;
1123 --h_;
1124 } else {
1125 // main case
1126 uint32_t tgt = h_ - 1; // last slot, will swap with root
1127 swap_values(0, tgt);
1128 ++m_;
1129 --h_;
1130
1131 restore_towards_leaves(0);
1132 }
1133
1134 if (is_marked(h_)) {
1135 --num_marks_in_h_;
1136 }
1137}
1138
1139
1140template<typename T, typename A>
1141void var_opt_sketch<T, A>::swap_values(uint32_t src, uint32_t dst) {
1142 std::swap(data_[src], data_[dst]);
1143 std::swap(weights_[src], weights_[dst]);
1144
1145 if (marks_ != nullptr) {
1146 std::swap(marks_[src], marks_[dst]);
1147 }
1148}
1149
1150/* When entering here we should be in a well-characterized state where the
1151 new item has been placed in either h or m and we have a valid but not necessarily
1152 maximal sampling plan figured out. The array is completely full at this point.
1153 Everyone in h and m has an explicit weight. The candidates are right-justified
1154 and are either just the r set or the r set + exactly one m item. The number
1155 of cands is at least 2. We will now grow the candidate set as much as possible
1156 by pulling sufficiently light items from h to m.
1157*/
1158template<typename T, typename A>
1159void var_opt_sketch<T, A>::grow_candidate_set(double wt_cands, uint32_t num_cands) {
1160 if ((h_ + m_ + r_ != k_ + 1) || (num_cands < 1) || (num_cands != m_ + r_) || (m_ >= 2))
1161 throw std::logic_error("invariant violated when growing candidate set");
1162
1163 while (h_ > 0) {
1164 const double next_wt = peek_min();
1165 const double next_tot_wt = wt_cands + next_wt;
1166
1167 // test for strict lightness of next prospect (denominator multiplied through)
1168 // ideally: (next_wt * (next_num_cands-1) < next_tot_wt)
1169 // but can use num_cands directly
1170 if ((next_wt * num_cands) < next_tot_wt) {
1171 wt_cands = next_tot_wt;
1172 ++num_cands;
1173 pop_min_to_m_region(); // adjusts h_ and m_
1174 } else {
1175 break;
1176 }
1177 }
1178
1179 downsample_candidate_set(wt_cands, num_cands);
1180}
1181
1182template<typename T, typename A>
1183void var_opt_sketch<T, A>::downsample_candidate_set(double wt_cands, uint32_t num_cands) {
1184 if (num_cands < 2 || h_ + num_cands != k_ + 1)
1185 throw std::logic_error("invalid num_cands when downsampling");
1186
1187 // need this before overwriting anything
1188 const uint32_t delete_slot = choose_delete_slot(wt_cands, num_cands);
1189 const uint32_t leftmost_cand_slot = h_;
1190 if (delete_slot < leftmost_cand_slot || delete_slot > k_)
1191 throw std::logic_error("invalid delete slot index when downsampling");
1192
1193 // Overwrite weights for items from M moving into R,
1194 // to make bugs more obvious. Also needed so anyone reading the
1195 // weight knows if it's invalid without checking h_ and m_
1196 const uint32_t stop_idx = leftmost_cand_slot + m_;
1197 for (uint32_t j = leftmost_cand_slot; j < stop_idx; ++j) {
1198 weights_[j] = -1.0;
1199 }
1200
1201 // The next line works even when delete_slot == leftmost_cand_slot
1202 data_[delete_slot] = std::move(data_[leftmost_cand_slot]);
1203
1204 m_ = 0;
1205 r_ = num_cands - 1;
1206 total_wt_r_ = wt_cands;
1207}
1208
1209template<typename T, typename A>
1210uint32_t var_opt_sketch<T, A>::choose_delete_slot(double wt_cands, uint32_t num_cands) const {
1211 if (r_ == 0) throw std::logic_error("choosing delete slot while in exact mode");
1212
1213 if (m_ == 0) {
1214 // this happens if we insert a really heavy item
1215 return pick_random_slot_in_r();
1216 } else if (m_ == 1) {
1217 // check if we keep th item in M or pick oen from R
1218 // p(keep) = (num_cand - 1) * wt_M / wt_cand
1219 double wt_m_cand = weights_[h_]; // slot of item in M is h_
1220 if ((wt_cands * next_double_exclude_zero()) < ((num_cands - 1) * wt_m_cand)) {
1221 return pick_random_slot_in_r(); // keep item in M
1222 } else {
1223 return h_; // index of item in M
1224 }
1225 } else {
1226 // general case
1227 const uint32_t delete_slot = choose_weighted_delete_slot(wt_cands, num_cands);
1228 const uint32_t first_r_slot = h_ + m_;
1229 if (delete_slot == first_r_slot) {
1230 return pick_random_slot_in_r();
1231 } else {
1232 return delete_slot;
1233 }
1234 }
1235}
1236
1237template<typename T, typename A>
1238uint32_t var_opt_sketch<T, A>::choose_weighted_delete_slot(double wt_cands, uint32_t num_cands) const {
1239 if (m_ < 1) throw std::logic_error("must have weighted delete slot");
1240
1241 const uint32_t offset = h_;
1242 const uint32_t final_m = (offset + m_) - 1;
1243 const uint32_t num_to_keep = num_cands - 1;
1244
1245 double left_subtotal = 0.0;
1246 double right_subtotal = -1.0 * wt_cands * next_double_exclude_zero();
1247
1248 for (uint32_t i = offset; i <= final_m; ++i) {
1249 left_subtotal += num_to_keep * weights_[i];
1250 right_subtotal += wt_cands;
1251
1252 if (left_subtotal < right_subtotal) {
1253 return i;
1254 }
1255 }
1256
1257 // this slot tells caller that we need to delete out of R
1258 return final_m + 1;
1259}
1260
1261template<typename T, typename A>
1262uint32_t var_opt_sketch<T, A>::pick_random_slot_in_r() const {
1263 if (r_ == 0) throw std::logic_error("r_ = 0 when picking slot in R region");
1264 const uint32_t offset = h_ + m_;
1265 if (r_ == 1) {
1266 return offset;
1267 } else {
1268 return offset + next_int(r_);
1269 }
1270}
1271
1272template<typename T, typename A>
1273double var_opt_sketch<T, A>::peek_min() const {
1274 if (h_ == 0) throw std::logic_error("h_ = 0 when checking min in H region");
1275 return weights_[0];
1276}
1277
1278template<typename T, typename A>
1279inline bool var_opt_sketch<T, A>::is_marked(uint32_t idx) const {
1280 return marks_ == nullptr ? false : marks_[idx];
1281}
1282
1283template<typename T, typename A>
1284double var_opt_sketch<T, A>::get_tau() const {
1285 return r_ == 0 ? std::nan("1") : (total_wt_r_ / r_);
1286}
1287
1288template<typename T, typename A>
1289void var_opt_sketch<T, A>::strip_marks() {
1290 if (marks_ == nullptr) throw std::logic_error("request to strip marks from non-gadget");
1291 num_marks_in_h_ = 0;
1292 AllocBool(allocator_).deallocate(marks_, curr_items_alloc_);
1293 marks_ = nullptr;
1294}
1295
1296template<typename T, typename A>
1297void var_opt_sketch<T, A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags) {
1298 const bool is_empty(flags & EMPTY_FLAG_MASK);
1299
1300 if (is_empty) {
1301 if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
1302 throw std::invalid_argument("Possible corruption: Preamble longs must be "
1303 + std::to_string(PREAMBLE_LONGS_EMPTY) + " for an empty sketch. Found: "
1304 + std::to_string(preamble_longs));
1305 }
1306 } else {
1307 if (preamble_longs != PREAMBLE_LONGS_WARMUP
1308 && preamble_longs != PREAMBLE_LONGS_FULL) {
1309 throw std::invalid_argument("Possible corruption: Preamble longs must be "
1310 + std::to_string(PREAMBLE_LONGS_WARMUP) + " or "
1311 + std::to_string(PREAMBLE_LONGS_FULL)
1312 + " for a non-empty sketch. Found: " + std::to_string(preamble_longs));
1313 }
1314 }
1315}
1316
1317template<typename T, typename A>
1318void var_opt_sketch<T, A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
1319 if (family_id == FAMILY_ID) {
1320 if (ser_ver != SER_VER) {
1321 throw std::invalid_argument("Possible corruption: VarOpt serialization version must be "
1322 + std::to_string(SER_VER) + ". Found: " + std::to_string(ser_ver));
1323 }
1324 return;
1325 }
1326 // TODO: extend to handle reservoir sampling
1327
1328 throw std::invalid_argument("Possible corruption: VarOpt family id must be "
1329 + std::to_string(FAMILY_ID) + ". Found: " + std::to_string(family_id));
1330}
1331
1332template<typename T, typename A>
1333uint32_t var_opt_sketch<T, A>::validate_and_get_target_size(uint32_t preamble_longs, uint32_t k, uint64_t n,
1334 uint32_t h, uint32_t r, resize_factor rf) {
1335 if (k == 0 || k > MAX_K) {
1336 throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
1337 }
1338
1339 uint32_t array_size;
1340
1341 if (n <= k) {
1342 if (preamble_longs != PREAMBLE_LONGS_WARMUP) {
1343 throw std::invalid_argument("Possible corruption: deserializing with n <= k but not in warmup mode. "
1344 "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
1345 }
1346 if (n != h) {
1347 throw std::invalid_argument("Possible corruption: deserializing in warmup mode but n != h. "
1348 "Found n = " + std::to_string(n) + ", h = " + std::to_string(h));
1349 }
1350 if (r > 0) {
1351 throw std::invalid_argument("Possible corruption: deserializing in warmup mode but r > 0. "
1352 "Found r = " + std::to_string(r));
1353 }
1354
1355 const uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k));
1356 const uint32_t min_lg_size = to_log_2(ceiling_power_of_2(h));
1357 const uint32_t initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf, min_lg_size);
1358 array_size = get_adjusted_size(k, 1 << initial_lg_size);
1359 if (array_size == k) { // if full size, need to leave 1 for the gap
1360 ++array_size;
1361 }
1362 } else { // n > k
1363 if (preamble_longs != PREAMBLE_LONGS_FULL) {
1364 throw std::invalid_argument("Possible corruption: deserializing with n > k but not in full mode. "
1365 "Found n = " + std::to_string(n) + ", k = " + std::to_string(k));
1366 }
1367 if (h + r != k) {
1368 throw std::invalid_argument("Possible corruption: deserializing in full mode but h + r != n. "
1369 "Found h = " + std::to_string(h) + ", r = " + std::to_string(r) + ", n = " + std::to_string(n));
1370 }
1371
1372 array_size = k + 1;
1373 }
1374
1375 return array_size;
1376}
1377
1378template<typename T, typename A>
1379template<typename P>
1380subset_summary var_opt_sketch<T, A>::estimate_subset_sum(P predicate) const {
1381 if (n_ == 0) {
1382 return {0.0, 0.0, 0.0, 0.0};
1383 }
1384
1385 double total_wt_h = 0.0;
1386 double h_true_wt = 0.0;
1387 size_t idx = 0;
1388 for (; idx < h_; ++idx) {
1389 double wt = weights_[idx];
1390 total_wt_h += wt;
1391 if (predicate(data_[idx])) {
1392 h_true_wt += wt;
1393 }
1394 }
1395
1396 // if only heavy items, we have an exact answer
1397 if (r_ == 0) {
1398 return {h_true_wt, h_true_wt, h_true_wt, h_true_wt};
1399 }
1400
1401 // since r_ > 0, we know we have samples
1402 const uint64_t num_samples = n_ - h_;
1403 double effective_sampling_rate = r_ / static_cast<double>(num_samples);
1404 if (effective_sampling_rate < 0.0 || effective_sampling_rate > 1.0)
1405 throw std::logic_error("invalid sampling rate outside [0.0, 1.0]");
1406
1407 uint32_t r_true_count = 0;
1408 ++idx; // skip the gap
1409 for (; idx < (k_ + 1); ++idx) {
1410 if (predicate(data_[idx])) {
1411 ++r_true_count;
1412 }
1413 }
1414
1415 double lb_true_fraction = pseudo_hypergeometric_lb_on_p(r_, r_true_count, effective_sampling_rate);
1416 double estimated_true_fraction = (1.0 * r_true_count) / r_;
1417 double ub_true_fraction = pseudo_hypergeometric_ub_on_p(r_, r_true_count, effective_sampling_rate);
1418
1419 return { h_true_wt + (total_wt_r_ * lb_true_fraction),
1420 h_true_wt + (total_wt_r_ * estimated_true_fraction),
1421 h_true_wt + (total_wt_r_ * ub_true_fraction),
1422 total_wt_h + total_wt_r_
1423 };
1424}
1425
1426template<typename T, typename A>
1427class var_opt_sketch<T, A>::items_deleter {
1428 public:
1429 items_deleter(uint32_t num, const A& allocator) : num(num), h_count(0), r_count(0), allocator(allocator) {}
1430 void set_h(uint32_t h) { h_count = h; }
1431 void set_r(uint32_t r) { r_count = r; }
1432 void operator() (T* ptr) {
1433 if (h_count > 0) {
1434 for (size_t i = 0; i < h_count; ++i) {
1435 ptr[i].~T();
1436 }
1437 }
1438 if (r_count > 0) {
1439 uint32_t end = h_count + r_count + 1;
1440 for (size_t i = h_count + 1; i < end; ++i) {
1441 ptr[i].~T();
1442 }
1443 }
1444 if (ptr != nullptr) {
1445 allocator.deallocate(ptr, num);
1446 }
1447 }
1448 private:
1449 uint32_t num;
1450 uint32_t h_count;
1451 uint32_t r_count;
1452 A allocator;
1453};
1454
1455template<typename T, typename A>
1456class var_opt_sketch<T, A>::weights_deleter {
1457 public:
1458 weights_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {}
1459 void operator() (double* ptr) {
1460 if (ptr != nullptr) {
1461 allocator.deallocate(ptr, num);
1462 }
1463 }
1464 private:
1465 uint32_t num;
1466 AllocDouble allocator;
1467};
1468
1469template<typename T, typename A>
1470class var_opt_sketch<T, A>::marks_deleter {
1471 public:
1472 marks_deleter(uint32_t num, const A& allocator) : num(num), allocator(allocator) {}
1473 void operator() (bool* ptr) {
1474 if (ptr != nullptr) {
1475 allocator.deallocate(ptr, 1);
1476 }
1477 }
1478 private:
1479 uint32_t num;
1480 AllocBool allocator;
1481};
1482
1483
1484template<typename T, typename A>
1486 return const_iterator(*this, false);
1487}
1488
1489template<typename T, typename A>
1491 return const_iterator(*this, true);
1492}
1493
1494// -------- var_opt_sketch::const_iterator implementation ---------
1495
1496template<typename T, typename A>
1498 sk_(&sk),
1499 cum_r_weight_(0.0),
1500 r_item_wt_(sk.get_tau()),
1501 final_idx_(sk.r_ > 0 ? sk.h_ + sk.r_ + 1 : sk.h_)
1502{
1503 // index logic easier to read if not inline
1504 if (is_end) {
1505 idx_ = final_idx_;
1506 sk_ = nullptr;
1507 } else {
1508 idx_ = (sk.h_ == 0 && sk.r_ > 0 ? 1 : 0); // skip if gap is at start
1509 }
1510
1511 // should only apply if sketch is empty
1512 if (idx_ == final_idx_) { sk_ = nullptr; }
1513}
1514
1515template<typename T, typename A>
1516var_opt_sketch<T, A>::const_iterator::const_iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region) :
1517 sk_(&sk),
1518 cum_r_weight_(0.0),
1519 r_item_wt_(sk.get_tau()),
1520 final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
1521{
1522 if (use_r_region) {
1523 idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
1524 } else { // H region
1525 // gap at start only if h_ == 0, so index always starts at 0
1526 idx_ = (is_end ? sk.h_ : 0);
1527 }
1528
1529 // unlike in full iterator case, may happen even if sketch is not empty
1530 if (idx_ == final_idx_) { sk_ = nullptr; }
1531}
1532
1533template<typename T, typename A>
1534var_opt_sketch<T, A>::const_iterator::const_iterator(const const_iterator& other) :
1535 sk_(other.sk_),
1536 cum_r_weight_(other.cum_r_weight_),
1537 r_item_wt_(other.r_item_wt_),
1538 idx_(other.idx_),
1539 final_idx_(other.final_idx_)
1540{}
1541
1542template<typename T, typename A>
1543typename var_opt_sketch<T, A>::const_iterator& var_opt_sketch<T, A>::const_iterator::operator++() {
1544 // accumulate weight already visited
1545 if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
1546
1547 ++idx_;
1548
1549 if (idx_ == final_idx_) {
1550 sk_ = nullptr;
1551 return *this;
1552 } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
1553 ++idx_;
1554 }
1555 return *this;
1556}
1557
1558template<typename T, typename A>
1559typename var_opt_sketch<T, A>::const_iterator& var_opt_sketch<T, A>::const_iterator::operator++(int) {
1560 const_iterator tmp(*this);
1561 operator++();
1562 return tmp;
1563}
1564
1565template<typename T, typename A>
1566bool var_opt_sketch<T, A>::const_iterator::operator==(const const_iterator& other) const {
1567 if (sk_ != other.sk_) return false;
1568 if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
1569 return idx_ == other.idx_;
1570}
1571
1572template<typename T, typename A>
1573bool var_opt_sketch<T, A>::const_iterator::operator!=(const const_iterator& other) const {
1574 return !operator==(other);
1575}
1576
1577template<typename T, typename A>
1578auto var_opt_sketch<T, A>::const_iterator::operator*() const -> reference {
1579 double wt;
1580 if (idx_ < sk_->h_) {
1581 wt = sk_->weights_[idx_];
1582 } else {
1583 wt = r_item_wt_;
1584 }
1585 return value_type(sk_->data_[idx_], wt);
1586}
1587
1588template<typename T, typename A>
1589auto var_opt_sketch<T, A>::const_iterator::operator->() const -> pointer {
1590 return **this;
1591}
1592
1593template<typename T, typename A>
1594bool var_opt_sketch<T, A>::const_iterator::get_mark() const {
1595 return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
1596}
1597
1598
1599// -------- var_opt_sketch::iterator implementation ---------
1600
1601template<typename T, typename A>
1602var_opt_sketch<T, A>::iterator::iterator(const var_opt_sketch& sk, bool is_end, bool use_r_region) :
1603 sk_(&sk),
1604 cum_r_weight_(0.0),
1605 r_item_wt_(sk.get_tau()),
1606 final_idx_(sk.h_ + (use_r_region ? 1 + sk.r_ : 0))
1607{
1608 if (use_r_region) {
1609 idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
1610 } else { // H region
1611 // gap at start only if h_ == 0, so index always starts at 0
1612 idx_ = (is_end ? sk.h_ : 0);
1613 }
1614
1615 // unlike in full iterator case, may happen even if sketch is not empty
1616 if (idx_ == final_idx_) { sk_ = nullptr; }
1617}
1618
1619template<typename T, typename A>
1620var_opt_sketch<T, A>::iterator::iterator(const iterator& other) :
1621 sk_(other.sk_),
1622 cum_r_weight_(other.cum_r_weight_),
1623 r_item_wt_(other.r_item_wt_),
1624 idx_(other.idx_),
1625 final_idx_(other.final_idx_)
1626{}
1627
1628template<typename T, typename A>
1629typename var_opt_sketch<T, A>::iterator& var_opt_sketch<T, A>::iterator::operator++() {
1630 // accumulate weight already visited
1631 if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
1632
1633 ++idx_;
1634
1635 if (idx_ == final_idx_) {
1636 sk_ = nullptr;
1637 return *this;
1638 } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
1639 ++idx_;
1640 }
1641
1642 return *this;
1643}
1644
1645template<typename T, typename A>
1646typename var_opt_sketch<T, A>::iterator& var_opt_sketch<T, A>::iterator::operator++(int) {
1647 const_iterator tmp(*this);
1648 operator++();
1649 return tmp;
1650}
1651
1652template<typename T, typename A>
1653bool var_opt_sketch<T, A>::iterator::operator==(const iterator& other) const {
1654 if (sk_ != other.sk_) return false;
1655 if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
1656 return idx_ == other.idx_;
1657}
1658
1659template<typename T, typename A>
1660bool var_opt_sketch<T, A>::iterator::operator!=(const iterator& other) const {
1661 return !operator==(other);
1662}
1663
1664template<typename T, typename A>
1665auto var_opt_sketch<T, A>::iterator::operator*() -> reference {
1666 double wt;
1667 if (idx_ < sk_->h_) {
1668 wt = sk_->weights_[idx_];
1669 } else if (idx_ == final_idx_ - 1) {
1670 wt = sk_->total_wt_r_ - cum_r_weight_;
1671 } else {
1672 wt = r_item_wt_;
1673 }
1674 return value_type(sk_->data_[idx_], wt);
1675}
1676
1677template<typename T, typename A>
1678auto var_opt_sketch<T, A>::iterator::operator->() -> pointer {
1679 return **this;
1680}
1681
1682template<typename T, typename A>
1683bool var_opt_sketch<T, A>::iterator::get_mark() const {
1684 return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
1685}
1686
1687/*
1688 * Checks if target sampling allocation is more than 50% of max sampling size.
1689 * If so, returns max sampling size, otherwise passes through target size.
1690 */
1691template<typename T, typename A>
1692uint32_t var_opt_sketch<T, A>::get_adjusted_size(uint32_t max_size, uint32_t resize_target) {
1693 if (max_size < (resize_target << 1)) {
1694 return max_size;
1695 }
1696 return resize_target;
1697}
1698
1699template<typename T, typename A>
1700uint32_t var_opt_sketch<T, A>::starting_sub_multiple(uint32_t lg_target, uint32_t lg_rf, uint32_t lg_min) {
1701 return (lg_target <= lg_min)
1702 ? lg_min : (lg_rf == 0) ? lg_target
1703 : (lg_target - lg_min) % lg_rf + lg_min;
1704}
1705
1706template<typename T, typename A>
1707double var_opt_sketch<T, A>::pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate) {
1708 const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
1710}
1711
1712template<typename T, typename A>
1713double var_opt_sketch<T, A>::pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate) {
1714 const double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
1716}
1717
1718template<typename T, typename A>
1719bool var_opt_sketch<T, A>::is_power_of_2(uint32_t v) {
1720 return v && !(v & (v - 1));
1721}
1722
1723template<typename T, typename A>
1724uint32_t var_opt_sketch<T, A>::to_log_2(uint32_t v) {
1725 if (is_power_of_2(v)) {
1726 return count_trailing_zeros_in_u32(v);
1727 } else {
1728 throw std::invalid_argument("Attempt to compute integer log2 of non-positive or non-power of 2");
1729 }
1730}
1731
1732// Returns an integer in the range [0, max_value) -- excludes max_value
1733template<typename T, typename A>
1734uint32_t var_opt_sketch<T, A>::next_int(uint32_t max_value) {
1735 std::uniform_int_distribution<uint32_t> dist(0, max_value - 1);
1736 return dist(random_utils::rand);
1737}
1738
1739template<typename T, typename A>
1740double var_opt_sketch<T, A>::next_double_exclude_zero() {
1741 double r = random_utils::next_double(random_utils::rand);
1742 while (r == 0.0) {
1743 r = random_utils::next_double(random_utils::rand);
1744 }
1745 return r;
1746}
1747
1748}
1749
1750// namespace datasketches
1751
1752#endif // _VAR_OPT_SKETCH_IMPL_HPP_
static double approximate_upper_bound_on_p(uint64_t n, uint64_t k, double num_std_devs)
Computes upper bound of approximate Clopper-Pearson confidence interval for a binomial proportion.
Definition bounds_binomial_proportions.hpp:148
static double approximate_lower_bound_on_p(uint64_t n, uint64_t k, double num_std_devs)
Computes lower bound of approximate Clopper-Pearson confidence interval for a binomial proportion.
Definition bounds_binomial_proportions.hpp:113
This sketch samples data from a stream of items.
Definition var_opt_sketch.hpp:72
var_opt_sketch(uint32_t k, resize_factor rf=var_opt_constants::DEFAULT_RESIZE_FACTOR, const A &allocator=A())
Constructor.
Definition var_opt_sketch_impl.hpp:46
DataSketches namespace.
Definition binomial_bounds.hpp:38