20 #ifndef _EBPPS_SKETCH_IMPL_HPP_
21 #define _EBPPS_SKETCH_IMPL_HPP_
31 #include "ebpps_sketch.hpp"
35 template<
typename T,
typename A>
37 allocator_(allocator),
43 sample_(check_k(k), allocator)
46 template<
typename T,
typename A>
48 double wt_max,
double rho,
49 ebpps_sample<T,A>&& sample,
const A& allocator) :
50 allocator_(allocator),
53 cumulative_wt_(cumulative_wt),
59 template<
typename T,
typename A>
64 template<
typename T,
typename A>
69 template<
typename T,
typename A>
71 return sample_.get_c();
74 template<
typename T,
typename A>
76 return cumulative_wt_;
79 template<
typename T,
typename A>
84 template<
typename T,
typename A>
93 template<
typename T,
typename A>
97 std::ostringstream os;
98 os <<
"### EBPPS Sketch SUMMARY:" << std::endl;
99 os <<
" k : " << k_ << std::endl;
100 os <<
" n : " << n_ << std::endl;
101 os <<
" cum. weight : " << cumulative_wt_ << std::endl;
102 os <<
" wt_mac : " << wt_max_ << std::endl;
103 os <<
" rho : " << rho_ << std::endl;
104 os <<
" C : " << sample_.get_c() << std::endl;
105 os <<
"### END SKETCH SUMMARY" << std::endl;
106 return string<A>(os.str().c_str(), allocator_);
109 template<
typename T,
typename A>
113 std::ostringstream os;
114 os <<
"### Sketch Items" << std::endl;
115 os << sample_.to_string();
116 return string<A>(os.str().c_str(), allocator_);
119 template<
typename T,
typename A>
124 template<
typename T,
typename A>
126 return internal_update(item, weight);
129 template<
typename T,
typename A>
131 return internal_update(std::move(item), weight);
134 template<
typename T,
typename A>
135 template<
typename FwdItem>
137 if (weight < 0.0 || std::isnan(weight) || std::isinf(weight)) {
138 throw std::invalid_argument(
"Item weights must be nonnegative and finite. Found: "
139 + std::to_string(weight));
140 }
else if (weight == 0.0) {
144 const double new_cum_wt = cumulative_wt_ + weight;
145 const double new_wt_max = std::max(wt_max_, weight);
146 const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
148 if (cumulative_wt_ > 0.0)
149 sample_.downsample(new_rho / rho_);
151 ebpps_sample<T,A> tmp(conditional_forward<FwdItem>(item), new_rho * weight, allocator_);
155 cumulative_wt_ = new_cum_wt;
156 wt_max_ = new_wt_max;
161 template<
typename T,
typename A>
163 return sample_.get_sample();
186 template<
typename T,
typename A>
188 if (sk.get_cumulative_weight() == 0.0)
return;
189 else if (sk.get_cumulative_weight() > get_cumulative_weight()) {
191 std::swap(*
this, sk);
197 template<
typename T,
typename A>
204 swap(*
this, sk_copy);
205 internal_merge(sk_copy);
211 template<
typename T,
typename A>
217 const ebpps_sample<T,A>& other_sample = sk.sample_;
219 const double final_cum_wt = cumulative_wt_ + sk.cumulative_wt_;
220 const double new_wt_max = std::max(wt_max_, sk.wt_max_);
221 k_ = std::min(k_, sk.k_);
222 const uint64_t new_n = n_ + sk.n_;
233 const double avg_wt = sk.get_cumulative_weight() / sk.get_c();
234 auto items = other_sample.get_full_items();
235 for (
size_t i = 0; i < items.size(); ++i) {
237 const double new_cum_wt = cumulative_wt_ + avg_wt;
238 const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
240 if (cumulative_wt_ > 0.0)
241 sample_.downsample(new_rho / rho_);
243 ebpps_sample<T,A> tmp(conditional_forward<O>(items[i]), new_rho * avg_wt, allocator_);
247 cumulative_wt_ = new_cum_wt;
252 if (other_sample.has_partial_item()) {
254 const double other_c_frac = std::modf(other_sample.get_c(), &unused);
256 const double new_cum_wt = cumulative_wt_ + (other_c_frac * avg_wt);
257 const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
259 if (cumulative_wt_ > 0.0)
260 sample_.downsample(new_rho / rho_);
262 ebpps_sample<T,A> tmp(conditional_forward<O>(other_sample.get_partial_item()), new_rho * other_c_frac * avg_wt, allocator_);
266 cumulative_wt_ = new_cum_wt;
272 cumulative_wt_ = final_cum_wt;
319 template<
typename T,
typename A>
320 template<
typename SerDe>
322 if (is_empty()) {
return PREAMBLE_LONGS_EMPTY << 3; }
323 return (PREAMBLE_LONGS_FULL << 3) + sample_.get_serialized_size_bytes(sd);
326 template<
typename T,
typename A>
327 template<
typename SerDe>
329 const uint8_t prelongs = (is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_FULL);
331 const size_t size = header_size_bytes + (prelongs << 3) + sample_.get_serialized_size_bytes(sd);
332 vector_bytes bytes(size, 0, allocator_);
333 uint8_t* ptr = bytes.data() + header_size_bytes;
334 const uint8_t* end_ptr = ptr + size;
338 flags |= EMPTY_FLAG_MASK;
340 flags |= sample_.has_partial_item() ? HAS_PARTIAL_ITEM_MASK : 0;
344 const uint8_t ser_ver = SER_VER;
345 const uint8_t family = FAMILY_ID;
346 ptr += copy_to_mem(prelongs, ptr);
347 ptr += copy_to_mem(ser_ver, ptr);
348 ptr += copy_to_mem(family, ptr);
349 ptr += copy_to_mem(flags, ptr);
350 ptr += copy_to_mem(k_, ptr);
354 ptr += copy_to_mem(n_, ptr);
355 ptr += copy_to_mem(cumulative_wt_, ptr);
356 ptr += copy_to_mem(wt_max_, ptr);
357 ptr += copy_to_mem(rho_, ptr);
358 ptr += sample_.serialize(ptr, end_ptr, sd);
364 template<
typename T,
typename A>
365 template<
typename SerDe>
367 const uint8_t prelongs = (is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_FULL);
371 flags |= EMPTY_FLAG_MASK;
373 flags |= sample_.has_partial_item() ? HAS_PARTIAL_ITEM_MASK : 0;
377 const uint8_t ser_ver = SER_VER;
378 const uint8_t family = FAMILY_ID;
388 write(os, cumulative_wt_);
391 sample_.serialize(os, sd);
394 if (!os.good())
throw std::runtime_error(
"error writing to std::ostream");
397 template<
typename T,
typename A>
398 template<
typename SerDe>
400 ensure_minimum_memory(size, 8);
401 const uint8_t* ptr =
static_cast<const uint8_t*
>(bytes);
402 const uint8_t* end_ptr = ptr + size;
404 ptr += copy_from_mem(ptr, prelongs);
405 uint8_t serial_version;
406 ptr += copy_from_mem(ptr, serial_version);
408 ptr += copy_from_mem(ptr, family_id);
410 ptr += copy_from_mem(ptr, flags);
412 ptr += copy_from_mem(ptr, k);
415 check_preamble_longs(prelongs, flags);
416 check_family_and_serialization_version(family_id, serial_version);
417 ensure_minimum_memory(size, prelongs << 3);
419 const bool empty = flags & EMPTY_FLAG_MASK;
424 ptr += copy_from_mem(ptr, n);
425 double cumulative_wt;
426 ptr += copy_from_mem(ptr, cumulative_wt);
428 ptr += copy_from_mem(ptr, wt_max);
430 ptr += copy_from_mem(ptr, rho);
432 auto pair = ebpps_sample<T, A>::deserialize(ptr, end_ptr - ptr, sd, allocator);
433 ebpps_sample<T, A> sample = pair.first;
436 if (sample.has_partial_item() !=
bool(flags & HAS_PARTIAL_ITEM_MASK))
437 throw std::runtime_error(
"sketch fails internal consistency check");
439 return ebpps_sketch(k, n, cumulative_wt, wt_max, rho, std::move(sample), allocator);
442 template<
typename T,
typename A>
443 template<
typename SerDe>
445 const uint8_t prelongs = read<uint8_t>(is);
446 const uint8_t ser_ver = read<uint8_t>(is);
447 const uint8_t family = read<uint8_t>(is);
448 const uint8_t flags = read<uint8_t>(is);
449 const uint32_t k = read<uint32_t>(is);
452 check_family_and_serialization_version(family, ser_ver);
453 check_preamble_longs(prelongs, flags);
455 const bool empty = (flags & EMPTY_FLAG_MASK);
458 return ebpps_sketch(k, allocator);
460 const uint64_t n = read<uint64_t>(is);
461 const double cumulative_wt = read<double>(is);
462 const double wt_max = read<double>(is);
463 const double rho = read<double>(is);
465 auto sample = ebpps_sample<T,A>::deserialize(is, sd, allocator);
467 if (sample.has_partial_item() !=
bool(flags & HAS_PARTIAL_ITEM_MASK))
468 throw std::runtime_error(
"sketch fails internal consistency check");
470 return ebpps_sketch(k, n, cumulative_wt, wt_max, rho, std::move(sample), allocator);
473 template <
typename T,
typename A>
474 inline uint32_t ebpps_sketch<T, A>::check_k(uint32_t k)
476 if (k == 0 || k > MAX_K)
477 throw std::invalid_argument(
"k must be strictly positive and less than " + std::to_string(MAX_K));
481 template<
typename T,
typename A>
482 void ebpps_sketch<T, A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
483 if (family_id == FAMILY_ID) {
484 if (ser_ver != SER_VER) {
485 throw std::invalid_argument(
"Possible corruption: EBPPS serialization version must be "
486 + std::to_string(SER_VER) +
". Found: " + std::to_string(ser_ver));
491 throw std::invalid_argument(
"Possible corruption: EBPPS Sketch family id must be "
492 + std::to_string(FAMILY_ID) +
". Found: " + std::to_string(family_id));
495 template <
typename T,
typename A>
496 void ebpps_sketch<T, A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags)
498 const bool is_empty(flags & EMPTY_FLAG_MASK);
501 if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
502 throw std::invalid_argument(
"Possible corruption: Preamble longs must be "
503 + std::to_string(PREAMBLE_LONGS_EMPTY) +
" for an empty sketch. Found: "
504 + std::to_string(preamble_longs));
506 if (flags & HAS_PARTIAL_ITEM_MASK) {
507 throw std::invalid_argument(
"Possible corruption: Empty sketch must not "
508 "contain indications of the presence of any item");
511 if (preamble_longs != PREAMBLE_LONGS_FULL) {
512 throw std::invalid_argument(
"Possible corruption: Preamble longs must be "
513 + std::to_string(PREAMBLE_LONGS_FULL)
514 +
" for a non-empty sketch. Found: " + std::to_string(preamble_longs));
519 template<
typename T,
typename A>
521 return sample_.begin();
524 template<
typename T,
typename A>
526 return sample_.end();
An implementation of an Exact and Bounded Sampling Proportional to Size sketch.
Definition: ebpps_sketch.hpp:59
string< A > items_to_string() const
Prints the raw sketch items to a string.
Definition: ebpps_sketch_impl.hpp:110
void update(const T &item, double weight=1.0)
Updates this sketch with the given data item with the given weight.
Definition: ebpps_sketch_impl.hpp:125
ebpps_sketch(uint32_t k, const A &allocator=A())
Constructor.
Definition: ebpps_sketch_impl.hpp:36
vector_bytes serialize(unsigned header_size_bytes=0, const SerDe &sd=SerDe()) const
This method serializes the sketch as a vector of bytes.
ebpps_sample< T, A >::const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: ebpps_sketch_impl.hpp:525
bool is_empty() const
Returns true if the sketch is empty.
Definition: ebpps_sketch_impl.hpp:80
ebpps_sample< T, A >::const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: ebpps_sketch_impl.hpp:520
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: ebpps_sketch_impl.hpp:321
double get_cumulative_weight() const
Returns the cumulative weight of items processed by the sketch.
Definition: ebpps_sketch_impl.hpp:75
A get_allocator() const
Returns an instance of the allocator for this sketch.
Definition: ebpps_sketch_impl.hpp:120
void merge(const ebpps_sketch< T, A > &sketch)
Merges the provided sketch into the current one.
Definition: ebpps_sketch_impl.hpp:198
static ebpps_sketch deserialize(const void *bytes, size_t size, const SerDe &sd=SerDe(), const A &allocator=A())
This method deserializes a sketch from a given array of bytes.
void reset()
Resets the sketch to its default, empty state.
Definition: ebpps_sketch_impl.hpp:85
double get_c() const
Returns the expected number of samples returned upon a call to get_result() or the creation of an iterator.
Definition: ebpps_sketch_impl.hpp:70
string< A > to_string() const
Prints a summary of the sketch.
Definition: ebpps_sketch_impl.hpp:94
result_type get_result() const
Returns a copy of the current sample, as a std::vector.
Definition: ebpps_sketch_impl.hpp:162
uint32_t get_k() const
Returns the configured maximum sample size.
Definition: ebpps_sketch_impl.hpp:60
uint64_t get_n() const
Returns the number of items processed by the sketch, regardless of item weight.
Definition: ebpps_sketch_impl.hpp:65
DataSketches namespace.
Definition: binomial_bounds.hpp:38