20 #ifndef _EBPPS_SKETCH_IMPL_HPP_
21 #define _EBPPS_SKETCH_IMPL_HPP_
31 #include "ebpps_sketch.hpp"
35 template<
typename T,
typename A>
37 allocator_(allocator),
43 sample_(check_k(k), allocator),
47 template<
typename T,
typename A>
49 double wt_max,
double rho,
50 ebpps_sample<T,A>&& sample,
const A& allocator) :
51 allocator_(allocator),
54 cumulative_wt_(cumulative_wt),
61 template<
typename T,
typename A>
66 template<
typename T,
typename A>
71 template<
typename T,
typename A>
73 return sample_.get_c();
76 template<
typename T,
typename A>
78 return cumulative_wt_;
81 template<
typename T,
typename A>
86 template<
typename T,
typename A>
95 template<
typename T,
typename A>
// Builds a human-readable summary of the sketch state: k, n, cumulative
// weight, maximum item weight, rho and the expected sample size C.
99 std::ostringstream os;
100 os <<
"### EBPPS Sketch SUMMARY:" << std::endl;
101 os <<
" k : " << k_ << std::endl;
102 os <<
" n : " << n_ << std::endl;
103 os <<
" cum. weight : " << cumulative_wt_ << std::endl;
// Label fixed: this line prints wt_max_, so it must read "wt_max", not "wt_mac".
104 os <<
" wt_max : " << wt_max_ << std::endl;
105 os <<
" rho : " << rho_ << std::endl;
106 os <<
" C : " << sample_.get_c() << std::endl;
107 os <<
"### END SKETCH SUMMARY" << std::endl;
108 return string<A>(os.str().c_str(), allocator_);
111 template<
typename T,
typename A>
115 std::ostringstream os;
116 os <<
"### Sketch Items" << std::endl;
117 os << sample_.to_string();
118 return string<A>(os.str().c_str(), allocator_);
121 template<
typename T,
typename A>
126 template<
typename T,
typename A>
128 return internal_update(item, weight);
131 template<
typename T,
typename A>
133 return internal_update(std::move(item), weight);
136 template<
typename T,
typename A>
137 template<
typename FwdItem>
139 if (weight < 0.0 || std::isnan(weight) || std::isinf(weight)) {
140 throw std::invalid_argument(
"Item weights must be nonnegative and finite. Found: "
141 + std::to_string(weight));
142 }
else if (weight == 0.0) {
146 const double new_cum_wt = cumulative_wt_ + weight;
147 const double new_wt_max = std::max(wt_max_, weight);
148 const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
150 if (cumulative_wt_ > 0.0)
151 sample_.downsample(new_rho / rho_);
153 tmp_.replace_content(conditional_forward<FwdItem>(item), new_rho * weight);
156 cumulative_wt_ = new_cum_wt;
157 wt_max_ = new_wt_max;
162 template<
typename T,
typename A>
164 return sample_.get_sample();
187 template<
typename T,
typename A>
189 if (sk.get_cumulative_weight() == 0.0)
return;
190 else if (sk.get_cumulative_weight() > get_cumulative_weight()) {
192 std::swap(*
this, sk);
198 template<
typename T,
typename A>
205 swap(*
this, sk_copy);
206 internal_merge(sk_copy);
212 template<
typename T,
typename A>
218 const ebpps_sample<T,A>& other_sample = sk.sample_;
220 const double final_cum_wt = cumulative_wt_ + sk.cumulative_wt_;
221 const double new_wt_max = std::max(wt_max_, sk.wt_max_);
222 k_ = std::min(k_, sk.k_);
223 const uint64_t new_n = n_ + sk.n_;
234 const double avg_wt = sk.get_cumulative_weight() / sk.get_c();
235 auto items = other_sample.get_full_items();
236 for (
size_t i = 0; i < items.size(); ++i) {
238 const double new_cum_wt = cumulative_wt_ + avg_wt;
239 const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
241 if (cumulative_wt_ > 0.0)
242 sample_.downsample(new_rho / rho_);
244 tmp_.replace_content(conditional_forward<O>(items[i]), new_rho * avg_wt);
247 cumulative_wt_ = new_cum_wt;
252 if (other_sample.has_partial_item()) {
254 const double other_c_frac = std::modf(other_sample.get_c(), &unused);
256 const double new_cum_wt = cumulative_wt_ + (other_c_frac * avg_wt);
257 const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
259 if (cumulative_wt_ > 0.0)
260 sample_.downsample(new_rho / rho_);
262 tmp_.replace_content(conditional_forward<O>(other_sample.get_partial_item()), new_rho * other_c_frac * avg_wt);
265 cumulative_wt_ = new_cum_wt;
271 cumulative_wt_ = final_cum_wt;
318 template<
typename T,
typename A>
319 template<
typename SerDe>
321 if (is_empty()) {
return PREAMBLE_LONGS_EMPTY << 3; }
322 return (PREAMBLE_LONGS_FULL << 3) + sample_.get_serialized_size_bytes(sd);
325 template<
typename T,
typename A>
326 template<
typename SerDe>
// Serializes the sketch into a byte vector, leaving header_size_bytes of
// zeroed space at the front for the caller.
328 const uint8_t prelongs = (is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_FULL);
// Total buffer size already includes the caller-requested header bytes.
330 const size_t size = header_size_bytes + (prelongs << 3) + sample_.get_serialized_size_bytes(sd);
331 vector_bytes bytes(size, 0, allocator_);
332 uint8_t* ptr = bytes.data() + header_size_bytes;
// end_ptr must mark the true end of the allocated buffer. `size` already
// counts header_size_bytes, so measuring from `ptr` (which is offset by the
// header) would land header_size_bytes past the end. end_ptr is handed to
// sample_.serialize() below, presumably as its write bound.
333 const uint8_t* end_ptr = bytes.data() + size;
337 flags |= EMPTY_FLAG_MASK;
339 flags |= sample_.has_partial_item() ? HAS_PARTIAL_ITEM_MASK : 0;
343 const uint8_t ser_ver = SER_VER;
344 const uint8_t family = FAMILY_ID;
345 ptr += copy_to_mem(prelongs, ptr);
346 ptr += copy_to_mem(ser_ver, ptr);
347 ptr += copy_to_mem(family, ptr);
348 ptr += copy_to_mem(flags, ptr);
349 ptr += copy_to_mem(k_, ptr);
353 ptr += copy_to_mem(n_, ptr);
354 ptr += copy_to_mem(cumulative_wt_, ptr);
355 ptr += copy_to_mem(wt_max_, ptr);
356 ptr += copy_to_mem(rho_, ptr);
357 ptr += sample_.serialize(ptr, end_ptr, sd);
363 template<
typename T,
typename A>
364 template<
typename SerDe>
366 const uint8_t prelongs = (is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_FULL);
370 flags |= EMPTY_FLAG_MASK;
372 flags |= sample_.has_partial_item() ? HAS_PARTIAL_ITEM_MASK : 0;
376 const uint8_t ser_ver = SER_VER;
377 const uint8_t family = FAMILY_ID;
387 write(os, cumulative_wt_);
390 sample_.serialize(os, sd);
393 if (!os.good())
throw std::runtime_error(
"error writing to std::ostream");
396 template<
typename T,
typename A>
397 template<
typename SerDe>
// Reconstructs a sketch from a byte buffer of the given size: reads the
// preamble fields, validates them, then reads n, cumulative weight, wt_max,
// rho and the embedded sample.
399 ensure_minimum_memory(size, 8);
400 const uint8_t* ptr =
static_cast<const uint8_t*
>(bytes);
401 const uint8_t* end_ptr = ptr + size;
403 ptr += copy_from_mem(ptr, prelongs);
404 uint8_t serial_version;
405 ptr += copy_from_mem(ptr, serial_version);
407 ptr += copy_from_mem(ptr, family_id);
409 ptr += copy_from_mem(ptr, flags);
411 ptr += copy_from_mem(ptr, k);
414 check_preamble_longs(prelongs, flags);
415 check_family_and_serialization_version(family_id, serial_version);
416 ensure_minimum_memory(size, prelongs << 3);
418 const bool empty = flags & EMPTY_FLAG_MASK;
423 ptr += copy_from_mem(ptr, n);
424 double cumulative_wt;
425 ptr += copy_from_mem(ptr, cumulative_wt);
427 ptr += copy_from_mem(ptr, wt_max);
429 ptr += copy_from_mem(ptr, rho);
431 auto pair = ebpps_sample<T, A>::deserialize(ptr, end_ptr - ptr, sd, allocator);
// Move the freshly deserialized sample out of the pair instead of copying
// every item; pair.first is not used again afterwards.
432 ebpps_sample<T, A> sample = std::move(pair.first);
435 if (sample.has_partial_item() !=
bool(flags & HAS_PARTIAL_ITEM_MASK))
436 throw std::runtime_error(
"sketch fails internal consistency check");
438 return ebpps_sketch(k, n, cumulative_wt, wt_max, rho, std::move(sample), allocator);
441 template<
typename T,
typename A>
442 template<
typename SerDe>
444 const uint8_t prelongs = read<uint8_t>(is);
445 const uint8_t ser_ver = read<uint8_t>(is);
446 const uint8_t family = read<uint8_t>(is);
447 const uint8_t flags = read<uint8_t>(is);
448 const uint32_t k = read<uint32_t>(is);
451 check_family_and_serialization_version(family, ser_ver);
452 check_preamble_longs(prelongs, flags);
454 const bool empty = (flags & EMPTY_FLAG_MASK);
457 return ebpps_sketch(k, allocator);
459 const uint64_t n = read<uint64_t>(is);
460 const double cumulative_wt = read<double>(is);
461 const double wt_max = read<double>(is);
462 const double rho = read<double>(is);
464 auto sample = ebpps_sample<T,A>::deserialize(is, sd, allocator);
466 if (sample.has_partial_item() !=
bool(flags & HAS_PARTIAL_ITEM_MASK))
467 throw std::runtime_error(
"sketch fails internal consistency check");
469 return ebpps_sketch(k, n, cumulative_wt, wt_max, rho, std::move(sample), allocator);
472 template <
typename T,
typename A>
473 inline uint32_t ebpps_sketch<T, A>::check_k(uint32_t k)
// Validates the requested maximum sample size. The test is `k > MAX_K`, so
// MAX_K itself is accepted; the message therefore says "no greater than"
// rather than "less than".
475 if (k == 0 || k > MAX_K)
476 throw std::invalid_argument(
"k must be strictly positive and no greater than " + std::to_string(MAX_K));
480 template<
typename T,
typename A>
481 void ebpps_sketch<T, A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
482 if (family_id == FAMILY_ID) {
483 if (ser_ver != SER_VER) {
484 throw std::invalid_argument(
"Possible corruption: EBPPS serialization version must be "
485 + std::to_string(SER_VER) +
". Found: " + std::to_string(ser_ver));
490 throw std::invalid_argument(
"Possible corruption: EBPPS Sketch family id must be "
491 + std::to_string(FAMILY_ID) +
". Found: " + std::to_string(family_id));
494 template <
typename T,
typename A>
495 void ebpps_sketch<T, A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags)
497 const bool is_empty(flags & EMPTY_FLAG_MASK);
500 if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
501 throw std::invalid_argument(
"Possible corruption: Preamble longs must be "
502 + std::to_string(PREAMBLE_LONGS_EMPTY) +
" for an empty sketch. Found: "
503 + std::to_string(preamble_longs));
505 if (flags & HAS_PARTIAL_ITEM_MASK) {
506 throw std::invalid_argument(
"Possible corruption: Empty sketch must not "
507 "contain indications of the presence of any item");
510 if (preamble_longs != PREAMBLE_LONGS_FULL) {
511 throw std::invalid_argument(
"Possible corruption: Preamble longs must be "
512 + std::to_string(PREAMBLE_LONGS_FULL)
513 +
" for a non-empty sketch. Found: " + std::to_string(preamble_longs));
518 template<
typename T,
typename A>
520 return sample_.begin();
523 template<
typename T,
typename A>
525 return sample_.end();
An implementation of an Exact and Bounded Sampling Proportional to Size sketch.
Definition: ebpps_sketch.hpp:59
string< A > items_to_string() const
Prints the raw sketch items to a string.
Definition: ebpps_sketch_impl.hpp:112
void update(const T &item, double weight=1.0)
Updates this sketch with the given data item with the given weight.
Definition: ebpps_sketch_impl.hpp:127
ebpps_sketch(uint32_t k, const A &allocator=A())
Constructor.
Definition: ebpps_sketch_impl.hpp:36
vector_bytes serialize(unsigned header_size_bytes=0, const SerDe &sd=SerDe()) const
This method serializes the sketch as a vector of bytes.
ebpps_sample< T, A >::const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: ebpps_sketch_impl.hpp:524
bool is_empty() const
Returns true if the sketch is empty.
Definition: ebpps_sketch_impl.hpp:82
ebpps_sample< T, A >::const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: ebpps_sketch_impl.hpp:519
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: ebpps_sketch_impl.hpp:320
double get_cumulative_weight() const
Returns the cumulative weight of items processed by the sketch.
Definition: ebpps_sketch_impl.hpp:77
A get_allocator() const
Returns an instance of the allocator for this sketch.
Definition: ebpps_sketch_impl.hpp:122
void merge(const ebpps_sketch< T, A > &sketch)
Merges the provided sketch into the current one.
Definition: ebpps_sketch_impl.hpp:199
static ebpps_sketch deserialize(const void *bytes, size_t size, const SerDe &sd=SerDe(), const A &allocator=A())
This method deserializes a sketch from a given array of bytes.
void reset()
Resets the sketch to its default, empty state.
Definition: ebpps_sketch_impl.hpp:87
double get_c() const
Returns the expected number of samples returned upon a call to get_result() or the creation of an iterator.
Definition: ebpps_sketch_impl.hpp:72
string< A > to_string() const
Prints a summary of the sketch.
Definition: ebpps_sketch_impl.hpp:96
result_type get_result() const
Returns a copy of the current sample, as a std::vector.
Definition: ebpps_sketch_impl.hpp:163
uint32_t get_k() const
Returns the configured maximum sample size.
Definition: ebpps_sketch_impl.hpp:62
uint64_t get_n() const
Returns the number of items processed by the sketch, regardless of item weight.
Definition: ebpps_sketch_impl.hpp:67
DataSketches namespace.
Definition: binomial_bounds.hpp:38