datasketches-cpp
ebpps_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _EBPPS_SKETCH_IMPL_HPP_
21 #define _EBPPS_SKETCH_IMPL_HPP_
22 
23 #include <memory>
24 #include <sstream>
25 #include <cmath>
26 #include <random>
27 #include <algorithm>
28 #include <stdexcept>
29 #include <utility>
30 
31 #include "ebpps_sketch.hpp"
32 
33 namespace datasketches {
34 
35 template<typename T, typename A>
36 ebpps_sketch<T, A>::ebpps_sketch(uint32_t k, const A& allocator) :
37  allocator_(allocator),
38  k_(k),
39  n_(0),
40  cumulative_wt_(0.0),
41  wt_max_(0.0),
42  rho_(1.0),
43  sample_(check_k(k), allocator)
44  {}
45 
46 template<typename T, typename A>
47 ebpps_sketch<T,A>::ebpps_sketch(uint32_t k, uint64_t n, double cumulative_wt,
48  double wt_max, double rho,
49  ebpps_sample<T,A>&& sample, const A& allocator) :
50  allocator_(allocator),
51  k_(k),
52  n_(n),
53  cumulative_wt_(cumulative_wt),
54  wt_max_(wt_max),
55  rho_(rho),
56  sample_(sample)
57  {}
58 
59 template<typename T, typename A>
60 uint32_t ebpps_sketch<T, A>::get_k() const {
61  return k_;
62 }
63 
64 template<typename T, typename A>
65 uint64_t ebpps_sketch<T, A>::get_n() const {
66  return n_;
67 }
68 
69 template<typename T, typename A>
70 double ebpps_sketch<T, A>::get_c() const {
71  return sample_.get_c();
72 }
73 
74 template<typename T, typename A>
76  return cumulative_wt_;
77 }
78 
79 template<typename T, typename A>
81  return n_ == 0;
82 }
83 
84 template<typename T, typename A>
86  n_ = 0;
87  cumulative_wt_ = 0.0;
88  wt_max_ = 0.0;
89  rho_ = 1.0;
90  sample_.reset();
91 }
92 
93 template<typename T, typename A>
94 string<A> ebpps_sketch<T, A>::to_string() const {
95  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
96  // The stream does not support passing an allocator instance, and alternatives are complicated.
97  std::ostringstream os;
98  os << "### EBPPS Sketch SUMMARY:" << std::endl;
99  os << " k : " << k_ << std::endl;
100  os << " n : " << n_ << std::endl;
101  os << " cum. weight : " << cumulative_wt_ << std::endl;
102  os << " wt_mac : " << wt_max_ << std::endl;
103  os << " rho : " << rho_ << std::endl;
104  os << " C : " << sample_.get_c() << std::endl;
105  os << "### END SKETCH SUMMARY" << std::endl;
106  return string<A>(os.str().c_str(), allocator_);
107 }
108 
109 template<typename T, typename A>
111  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
112  // The stream does not support passing an allocator instance, and alternatives are complicated.
113  std::ostringstream os;
114  os << "### Sketch Items" << std::endl;
115  os << sample_.to_string(); // assumes std::endl at end
116  return string<A>(os.str().c_str(), allocator_);
117 }
118 
119 template<typename T, typename A>
121  return allocator_;
122 }
123 
124 template<typename T, typename A>
125 void ebpps_sketch<T, A>::update(const T& item, double weight) {
126  return internal_update(item, weight);
127 }
128 
129 template<typename T, typename A>
130 void ebpps_sketch<T, A>::update(T&& item, double weight) {
131  return internal_update(std::move(item), weight);
132 }
133 
134 template<typename T, typename A>
135 template<typename FwdItem>
136 void ebpps_sketch<T, A>::internal_update(FwdItem&& item, double weight) {
137  if (weight < 0.0 || std::isnan(weight) || std::isinf(weight)) {
138  throw std::invalid_argument("Item weights must be nonnegative and finite. Found: "
139  + std::to_string(weight));
140  } else if (weight == 0.0) {
141  return;
142  }
143 
144  const double new_cum_wt = cumulative_wt_ + weight;
145  const double new_wt_max = std::max(wt_max_, weight);
146  const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
147 
148  if (cumulative_wt_ > 0.0)
149  sample_.downsample(new_rho / rho_);
150 
151  ebpps_sample<T,A> tmp(conditional_forward<FwdItem>(item), new_rho * weight, allocator_);
152 
153  sample_.merge(tmp);
154 
155  cumulative_wt_ = new_cum_wt;
156  wt_max_ = new_wt_max;
157  rho_ = new_rho;
158  ++n_;
159 }
160 
161 template<typename T, typename A>
162 auto ebpps_sketch<T,A>::get_result() const -> result_type {
163  return sample_.get_sample();
164 }
165 
166 /* Merging
167  * There is a trivial merge algorithm that involves downsampling each sketch A and B
168  * as A.cum_wt / (A.cum_wt + B.cum_wt) and B.cum_wt / (A.cum_wt + B.cum_wt),
169  * respectively. That merge does preserve first-order probabilities, specifically
170  * the probability proportional to size property, and like all other known merge
171  * algorithms distorts second-order probabilities (co-occurrences). There are
172  * pathological cases, most obvious with k=2 and A.cum_wt == B.cum_wt where that
173  * approach will always take exactly 1 item from A and 1 from B, meaning the
174  * co-occurrence rate for two items from either sketch is guaranteed to be 0.0.
175  *
176  * With EBPPS, once an item is accepted into the sketch we no longer need to
177  * track the item's weight: All accepted items are treated equally. As a result, we
178  * can take inspiration from the reservoir sampling merge in the datasketches-java
179  * library. We need to merge the smaller sketch into the larger one, swapping as
180  * needed to ensure that, at which point we simply call update() with the items
181  * in the smaller sketch as long as we adjust the weight appropriately.
182  * Merging smaller into larger is essential to ensure that no item has a
183  * contribution to C > 1.0.
184  */
185 
186 template<typename T, typename A>
188  if (sk.get_cumulative_weight() == 0.0) return;
189  else if (sk.get_cumulative_weight() > get_cumulative_weight()) {
190  // need to swap this with sk to merge smaller into larger
191  std::swap(*this, sk);
192  }
193 
194  internal_merge(sk);
195 }
196 
197 template<typename T, typename A>
199  if (sk.get_cumulative_weight() == 0.0) return;
200  else if (sk.get_cumulative_weight() > get_cumulative_weight()) {
201  // need to swap this with sk to merge, so make a copy, swap,
202  // and use that to merge
203  ebpps_sketch sk_copy(sk);
204  swap(*this, sk_copy);
205  internal_merge(sk_copy);
206  } else {
207  internal_merge(sk);
208  }
209 }
210 
211 template<typename T, typename A>
212 template<typename O>
214  // assumes that sk.cumulative_wt_ <= cumulative_wt_,
215  // which must be checked before calling this
216 
217  const ebpps_sample<T,A>& other_sample = sk.sample_;
218 
219  const double final_cum_wt = cumulative_wt_ + sk.cumulative_wt_;
220  const double new_wt_max = std::max(wt_max_, sk.wt_max_);
221  k_ = std::min(k_, sk.k_);
222  const uint64_t new_n = n_ + sk.n_;
223 
224  // Insert sk's items with the cumulative weight
225  // split between the input items. We repeat the same process
226  // for full items and the partial item, scaling the input
227  // weight appropriately.
228  // We handle all C input items, meaning we always process
229  // the partial item using a scaled down weight.
230  // Handling the partial item by probabilistically including
231  // it as a full item would be correct on average but would
232  // introduce bias for any specific merge operation.
233  const double avg_wt = sk.get_cumulative_weight() / sk.get_c();
234  auto items = other_sample.get_full_items();
235  for (size_t i = 0; i < items.size(); ++i) {
236  // new_wt_max is pre-computed
237  const double new_cum_wt = cumulative_wt_ + avg_wt;
238  const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
239 
240  if (cumulative_wt_ > 0.0)
241  sample_.downsample(new_rho / rho_);
242 
243  ebpps_sample<T,A> tmp(conditional_forward<O>(items[i]), new_rho * avg_wt, allocator_);
244 
245  sample_.merge(tmp);
246 
247  cumulative_wt_ = new_cum_wt;
248  rho_ = new_rho;
249  }
250 
251  // insert partial item with weight scaled by the fractional part of C
252  if (other_sample.has_partial_item()) {
253  double unused;
254  const double other_c_frac = std::modf(other_sample.get_c(), &unused);
255 
256  const double new_cum_wt = cumulative_wt_ + (other_c_frac * avg_wt);
257  const double new_rho = std::min(1.0 / new_wt_max, k_ / new_cum_wt);
258 
259  if (cumulative_wt_ > 0.0)
260  sample_.downsample(new_rho / rho_);
261 
262  ebpps_sample<T,A> tmp(conditional_forward<O>(other_sample.get_partial_item()), new_rho * other_c_frac * avg_wt, allocator_);
263 
264  sample_.merge(tmp);
265 
266  cumulative_wt_ = new_cum_wt;
267  rho_ = new_rho;
268  }
269 
270  // avoid numeric issues by setting cumulative weight to the
271  // pre-computed value
272  cumulative_wt_ = final_cum_wt;
273  n_ = new_n;
274 }
275 
276 /*
277  * An empty sketch requires 8 bytes.
278  *
279  * <pre>
280  * Long || Start Byte Adr:
281  * Adr:
282  * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
283  * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------|
284  * </pre>
285  *
286  * A non-empty sketch requires 40 bytes of preamble. C looks like part of
287  * the preamble but is serialized as part of the internal sample state.
288  *
289  * The count of items seen is not used but preserved as the value seems like a useful
290  * count to track.
291  *
292  * <pre>
293  * Long || Start Byte Adr:
294  * Adr:
295  * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
296  * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------|
297  *
298  * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
299  * 1 ||---------------------------Items Seen Count (N)--------------------------------|
300  *
301  * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 |
302  * 2 ||----------------------------Cumulative Weight----------------------------------|
303  *
304  * || 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 |
305  * 3 ||-----------------------------Max Item Weight-----------------------------------|
306  *
307  * || 32 | 33 | 34 | 35 | 36 | 37 | 38 | 39 |
308  * 4 ||----------------------------------Rho------------------------------------------|
309  *
310  * || 40 | 41 | 42 | 43 | 44 | 45 | 46 | 47 |
311  * 5 ||-----------------------------------C-------------------------------------------|
312  *
313  * || 40+ |
314  * 6+ || {Items Array} |
315  * || {Optional Item (if needed)} |
316  * </pre>
317  */
318 
319 template<typename T, typename A>
320 template<typename SerDe>
321 size_t ebpps_sketch<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
322  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
323  return (PREAMBLE_LONGS_FULL << 3) + sample_.get_serialized_size_bytes(sd);
324 }
325 
326 template<typename T, typename A>
327 template<typename SerDe>
328 auto ebpps_sketch<T,A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
329  const uint8_t prelongs = (is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_FULL);
330 
331  const size_t size = header_size_bytes + (prelongs << 3) + sample_.get_serialized_size_bytes(sd);
332  vector_bytes bytes(size, 0, allocator_);
333  uint8_t* ptr = bytes.data() + header_size_bytes;
334  const uint8_t* end_ptr = ptr + size;
335 
336  uint8_t flags = 0;
337  if (is_empty()) {
338  flags |= EMPTY_FLAG_MASK;
339  } else {
340  flags |= sample_.has_partial_item() ? HAS_PARTIAL_ITEM_MASK : 0;
341  }
342 
343  // first prelong
344  const uint8_t ser_ver = SER_VER;
345  const uint8_t family = FAMILY_ID;
346  ptr += copy_to_mem(prelongs, ptr);
347  ptr += copy_to_mem(ser_ver, ptr);
348  ptr += copy_to_mem(family, ptr);
349  ptr += copy_to_mem(flags, ptr);
350  ptr += copy_to_mem(k_, ptr);
351 
352  if (!is_empty()) {
353  // remaining preamble
354  ptr += copy_to_mem(n_, ptr);
355  ptr += copy_to_mem(cumulative_wt_, ptr);
356  ptr += copy_to_mem(wt_max_, ptr);
357  ptr += copy_to_mem(rho_, ptr);
358  ptr += sample_.serialize(ptr, end_ptr, sd);
359  }
360 
361  return bytes;
362 }
363 
364 template<typename T, typename A>
365 template<typename SerDe>
366 void ebpps_sketch<T,A>::serialize(std::ostream& os, const SerDe& sd) const {
367  const uint8_t prelongs = (is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_FULL);
368 
369  uint8_t flags = 0;
370  if (is_empty()) {
371  flags |= EMPTY_FLAG_MASK;
372  } else {
373  flags |= sample_.has_partial_item() ? HAS_PARTIAL_ITEM_MASK : 0;
374  }
375 
376  // first prelong
377  const uint8_t ser_ver = SER_VER;
378  const uint8_t family = FAMILY_ID;
379  write(os, prelongs);
380  write(os, ser_ver);
381  write(os, family);
382  write(os, flags);
383  write(os, k_);
384 
385  if (!is_empty()) {
386  // remaining preamble
387  write(os, n_);
388  write(os, cumulative_wt_);
389  write(os, wt_max_);
390  write(os, rho_);
391  sample_.serialize(os, sd);
392  }
393 
394  if (!os.good()) throw std::runtime_error("error writing to std::ostream");
395 }
396 
397 template<typename T, typename A>
398 template<typename SerDe>
399 ebpps_sketch<T,A> ebpps_sketch<T,A>::deserialize(const void* bytes, size_t size, const SerDe& sd, const A& allocator) {
400  ensure_minimum_memory(size, 8);
401  const uint8_t* ptr = static_cast<const uint8_t*>(bytes);
402  const uint8_t* end_ptr = ptr + size;
403  uint8_t prelongs;
404  ptr += copy_from_mem(ptr, prelongs);
405  uint8_t serial_version;
406  ptr += copy_from_mem(ptr, serial_version);
407  uint8_t family_id;
408  ptr += copy_from_mem(ptr, family_id);
409  uint8_t flags;
410  ptr += copy_from_mem(ptr, flags);
411  uint32_t k;
412  ptr += copy_from_mem(ptr, k);
413 
414  check_k(k);
415  check_preamble_longs(prelongs, flags);
416  check_family_and_serialization_version(family_id, serial_version);
417  ensure_minimum_memory(size, prelongs << 3);
418 
419  const bool empty = flags & EMPTY_FLAG_MASK;
420  if (empty)
421  return ebpps_sketch(k, allocator);
422 
423  uint64_t n;
424  ptr += copy_from_mem(ptr, n);
425  double cumulative_wt;
426  ptr += copy_from_mem(ptr, cumulative_wt);
427  double wt_max;
428  ptr += copy_from_mem(ptr, wt_max);
429  double rho;
430  ptr += copy_from_mem(ptr, rho);
431 
432  auto pair = ebpps_sample<T, A>::deserialize(ptr, end_ptr - ptr, sd, allocator);
433  ebpps_sample<T, A> sample = pair.first;
434  ptr += pair.second;
435 
436  if (sample.has_partial_item() != bool(flags & HAS_PARTIAL_ITEM_MASK))
437  throw std::runtime_error("sketch fails internal consistency check");
438 
439  return ebpps_sketch(k, n, cumulative_wt, wt_max, rho, std::move(sample), allocator);
440 }
441 
442 template<typename T, typename A>
443 template<typename SerDe>
444 ebpps_sketch<T,A> ebpps_sketch<T,A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
445  const uint8_t prelongs = read<uint8_t>(is);
446  const uint8_t ser_ver = read<uint8_t>(is);
447  const uint8_t family = read<uint8_t>(is);
448  const uint8_t flags = read<uint8_t>(is);
449  const uint32_t k = read<uint32_t>(is);
450 
451  check_k(k);
452  check_family_and_serialization_version(family, ser_ver);
453  check_preamble_longs(prelongs, flags);
454 
455  const bool empty = (flags & EMPTY_FLAG_MASK);
456 
457  if (empty)
458  return ebpps_sketch(k, allocator);
459 
460  const uint64_t n = read<uint64_t>(is);
461  const double cumulative_wt = read<double>(is);
462  const double wt_max = read<double>(is);
463  const double rho = read<double>(is);
464 
465  auto sample = ebpps_sample<T,A>::deserialize(is, sd, allocator);
466 
467  if (sample.has_partial_item() != bool(flags & HAS_PARTIAL_ITEM_MASK))
468  throw std::runtime_error("sketch fails internal consistency check");
469 
470  return ebpps_sketch(k, n, cumulative_wt, wt_max, rho, std::move(sample), allocator);
471 }
472 
473 template <typename T, typename A>
474 inline uint32_t ebpps_sketch<T, A>::check_k(uint32_t k)
475 {
476  if (k == 0 || k > MAX_K)
477  throw std::invalid_argument("k must be strictly positive and less than " + std::to_string(MAX_K));
478  return k;
479 }
480 
481 template<typename T, typename A>
482 void ebpps_sketch<T, A>::check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver) {
483  if (family_id == FAMILY_ID) {
484  if (ser_ver != SER_VER) {
485  throw std::invalid_argument("Possible corruption: EBPPS serialization version must be "
486  + std::to_string(SER_VER) + ". Found: " + std::to_string(ser_ver));
487  }
488  return;
489  }
490 
491  throw std::invalid_argument("Possible corruption: EBPPS Sketch family id must be "
492  + std::to_string(FAMILY_ID) + ". Found: " + std::to_string(family_id));
493 }
494 
495 template <typename T, typename A>
496 void ebpps_sketch<T, A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags)
497 {
498  const bool is_empty(flags & EMPTY_FLAG_MASK);
499 
500  if (is_empty) {
501  if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
502  throw std::invalid_argument("Possible corruption: Preamble longs must be "
503  + std::to_string(PREAMBLE_LONGS_EMPTY) + " for an empty sketch. Found: "
504  + std::to_string(preamble_longs));
505  }
506  if (flags & HAS_PARTIAL_ITEM_MASK) {
507  throw std::invalid_argument("Possible corruption: Empty sketch must not "
508  "contain indications of the presence of any item");
509  }
510  } else {
511  if (preamble_longs != PREAMBLE_LONGS_FULL) {
512  throw std::invalid_argument("Possible corruption: Preamble longs must be "
513  + std::to_string(PREAMBLE_LONGS_FULL)
514  + " for a non-empty sketch. Found: " + std::to_string(preamble_longs));
515  }
516  }
517 }
518 
519 template<typename T, typename A>
520 typename ebpps_sample<T, A>::const_iterator ebpps_sketch<T, A>::begin() const {
521  return sample_.begin();
522 }
523 
524 template<typename T, typename A>
525 typename ebpps_sample<T, A>::const_iterator ebpps_sketch<T, A>::end() const {
526  return sample_.end();
527 }
528 
529 } // namespace datasketches
530 
531 #endif // _EBPPS_SKETCH_IMPL_HPP_
An implementation of an Exact and Bounded Sampling Proportional to Size sketch.
Definition: ebpps_sketch.hpp:59
string< A > items_to_string() const
Prints the raw sketch items to a string.
Definition: ebpps_sketch_impl.hpp:110
void update(const T &item, double weight=1.0)
Updates this sketch with the given data item with the given weight.
Definition: ebpps_sketch_impl.hpp:125
ebpps_sketch(uint32_t k, const A &allocator=A())
Constructor.
Definition: ebpps_sketch_impl.hpp:36
vector_bytes serialize(unsigned header_size_bytes=0, const SerDe &sd=SerDe()) const
This method serializes the sketch as a vector of bytes.
ebpps_sample< T, A >::const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: ebpps_sketch_impl.hpp:525
bool is_empty() const
Returns true if the sketch is empty.
Definition: ebpps_sketch_impl.hpp:80
ebpps_sample< T, A >::const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: ebpps_sketch_impl.hpp:520
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: ebpps_sketch_impl.hpp:321
double get_cumulative_weight() const
Returns the cumulative weight of items processed by the sketch.
Definition: ebpps_sketch_impl.hpp:75
A get_allocator() const
Returns an instance of the allocator for this sketch.
Definition: ebpps_sketch_impl.hpp:120
void merge(const ebpps_sketch< T, A > &sketch)
Merges the provided sketch into the current one.
Definition: ebpps_sketch_impl.hpp:198
static ebpps_sketch deserialize(const void *bytes, size_t size, const SerDe &sd=SerDe(), const A &allocator=A())
This method deserializes a sketch from a given array of bytes.
void reset()
Resets the sketch to its default, empty state.
Definition: ebpps_sketch_impl.hpp:85
double get_c() const
Returns the expected number of samples returned upon a call to get_result() or the creation of an ite...
Definition: ebpps_sketch_impl.hpp:70
string< A > to_string() const
Prints a summary of the sketch.
Definition: ebpps_sketch_impl.hpp:94
result_type get_result() const
Returns a copy of the current sample, as a std::vector.
Definition: ebpps_sketch_impl.hpp:162
uint32_t get_k() const
Returns the configured maximum sample size.
Definition: ebpps_sketch_impl.hpp:60
uint64_t get_n() const
Returns the number of items processed by the sketch, regardless of item weight.
Definition: ebpps_sketch_impl.hpp:65
DataSketches namespace.
Definition: binomial_bounds.hpp:38