datasketches-cpp
ebpps_sample_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _EBPPS_SAMPLE_IMPL_HPP_
21 #define _EBPPS_SAMPLE_IMPL_HPP_
22 
23 #include "common_defs.hpp"
24 #include "conditional_forward.hpp"
25 #include "ebpps_sample.hpp"
26 #include "serde.hpp"
27 
28 #include <cmath>
29 #include <string>
30 #include <sstream>
31 
32 namespace datasketches {
33 
34 template<typename T, typename A>
35 ebpps_sample<T,A>::ebpps_sample(uint32_t reserved_size, const A& allocator) :
36  allocator_(allocator),
37  c_(0.0),
38  partial_item_(),
39  data_(allocator)
40  {
41  data_.reserve(reserved_size);
42  }
43 
44 template<typename T, typename A>
45 ebpps_sample<T,A>::ebpps_sample(std::vector<T, A>&& data, optional<T>&& partial_item, double c, const A& allocator) :
46  allocator_(allocator),
47  c_(c),
48  partial_item_(partial_item),
49  data_(data, allocator)
50  {}
51 
52 template<typename T, typename A>
53 template<typename TT>
54 void ebpps_sample<T,A>::replace_content(TT&& item, double theta) {
55  c_ = theta;
56  data_.clear();
57  partial_item_.reset();
58  if (theta == 1.0) {
59  data_.emplace_back(std::forward<TT>(item));
60  } else {
61  partial_item_.emplace(std::forward<TT>(item));
62  }
63 }
64 
65 template<typename T, typename A>
66 auto ebpps_sample<T,A>::get_sample() const -> result_type {
67  double unused;
68  const double c_frac = std::modf(c_, &unused);
69  const bool include_partial = next_double() < c_frac;
70  const uint32_t result_size = static_cast<uint32_t>(data_.size()) + (include_partial ? 1 : 0);
71 
72  result_type result;
73  result.reserve(result_size);
74  std::copy(data_.begin(), data_.end(), std::back_inserter(result));
75  if (include_partial)
76  result.emplace_back(static_cast<const T&>(*partial_item_));
77 
78  return result;
79 }
80 
81 template<typename T, typename A>
82 void ebpps_sample<T,A>::downsample(double theta) {
83  if (theta >= 1.0) return;
84 
85  const double new_c = theta * c_;
86  double new_c_int;
87  const double new_c_frac = std::modf(new_c, &new_c_int);
88  double c_int;
89  const double c_frac = std::modf(c_, &c_int);
90 
91  if (new_c_int == 0.0) {
92  // no full items retained
93  if (next_double() > (c_frac / c_)) {
94  swap_with_partial();
95  }
96  data_.clear();
97  } else if (new_c_int == c_int) {
98  // no items deleted
99  if (next_double() > (1 - theta * c_frac)/(1 - new_c_frac)) {
100  swap_with_partial();
101  }
102  } else {
103  if (next_double() < theta * c_frac) {
104  // subsample data in random order; last item is partial
105  // create sample size new_c_int then swap_with_partial()
106  subsample(static_cast<uint32_t>(new_c_int));
107  swap_with_partial();
108  } else {
109  // create sample size new_c_int + 1 then move_one_to_partial)
110  subsample(static_cast<uint32_t>(new_c_int) + 1);
111  move_one_to_partial();
112  }
113  }
114 
115  if (new_c == new_c_int)
116  partial_item_.reset();
117 
118  c_ = new_c;
119 }
120 
121 template<typename T, typename A>
122 template<typename FwdSample>
123 void ebpps_sample<T,A>::merge(FwdSample&& other) {
124  double c_int;
125  const double c_frac = std::modf(c_, &c_int);
126 
127  double unused;
128  const double other_c_frac = std::modf(other.c_, &unused);
129 
130  // update c_ here but do NOT recompute fractional part yet
131  c_ += other.c_;
132 
133  for (uint32_t i = 0; i < other.data_.size(); ++i)
134  data_.emplace_back(conditional_forward<FwdSample>(other.data_[i]));
135 
136  // This modifies the original algorithm slightly due to numeric
137  // precision issues. Specifically, the test if c_frac + other_c_frac == 1.0
138  // happens before tests for < 1.0 or > 1.0 and can also be triggered
139  // if c_ == floor(c_) (the updated value of c_, not the input).
140  //
141  // We can still run into issues where c_frac + other_c_frac == epsilon
142  // and the first case would have ideally triggered. As a result, we must
143  // check if the partial item exists before adding to the data_ vector.
144 
145  if (c_frac == 0.0 && other_c_frac == 0.0) {
146  partial_item_.reset();
147  } else if (c_frac + other_c_frac == 1.0 || c_ == std::floor(c_)) {
148  if (next_double() <= c_frac) {
149  if (partial_item_)
150  data_.emplace_back(std::move(*partial_item_));
151  } else {
152  if (other.partial_item_)
153  data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
154  }
155  partial_item_.reset();
156  } else if (c_frac + other_c_frac < 1.0) {
157  if (next_double() > c_frac / (c_frac + other_c_frac)) {
158  set_partial(conditional_forward<FwdSample>(*other.partial_item_));
159  }
160  } else { // c_frac + other_c_frac > 1
161  if (next_double() <= (1 - c_frac) / ((1 - c_frac) + (1 - other_c_frac))) {
162  data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
163  } else {
164  data_.emplace_back(std::move(*partial_item_));
165  partial_item_.reset();
166  set_partial(conditional_forward<FwdSample>(*other.partial_item_));
167  }
168  }
169 }
170 
171 template<typename T, typename A>
172 string<A> ebpps_sample<T ,A>::to_string() const {
173  std::ostringstream oss;
174  oss << " sample:" << std::endl;
175  uint32_t idx = 0;
176  for (const T& item : data_)
177  oss << "\t" << idx++ << ":\t" << item << std::endl;
178  oss << " partial: ";
179  if (bool(partial_item_))
180  oss << (*partial_item_) << std::endl;
181  else
182  oss << "NULL" << std::endl;
183 
184  return oss.str();
185 }
186 
187 template<typename T, typename A>
188 void ebpps_sample<T,A>::subsample(uint32_t num_samples) {
189  // we can perform a Fisher-Yates style shuffle, stopping after
190  // num_samples points since subsequent swaps would only be
191  // between items after num_samples. This is valid since a
192  // point from anywhere in the initial array would be eligible
193  // to end up in the final subsample.
194 
195  if (num_samples == data_.size()) return;
196 
197  auto erase_start = data_.begin();
198  uint32_t data_len = static_cast<uint32_t>(data_.size());
199  for (uint32_t i = 0; i < num_samples; ++i, ++erase_start) {
200  uint32_t j = i + random_idx(data_len - i);
201  std::swap(data_[i], data_[j]);
202  }
203 
204  data_.erase(erase_start, data_.end());
205 }
206 
207 template<typename T, typename A>
208 template<typename FwdItem>
209 void ebpps_sample<T,A>::set_partial(FwdItem&& item) {
210  if (partial_item_)
211  *partial_item_ = conditional_forward<FwdItem>(item);
212  else
213  partial_item_.emplace(conditional_forward<FwdItem>(item));
214 }
215 
216 template<typename T, typename A>
217 void ebpps_sample<T,A>::move_one_to_partial() {
218  const size_t idx = random_idx(static_cast<uint32_t>(data_.size()));
219  // swap selected item to end so we can delete it easily
220  const size_t last_idx = data_.size() - 1;
221  if (idx != last_idx) {
222  std::swap(data_[idx], data_[last_idx]);
223  }
224 
225  set_partial(std::move(data_[last_idx]));
226 
227  data_.pop_back();
228 }
229 
230 template<typename T, typename A>
231 void ebpps_sample<T,A>::swap_with_partial() {
232  if (partial_item_) {
233  const size_t idx = random_idx(static_cast<uint32_t>(data_.size()));
234  std::swap(data_[idx], *partial_item_);
235  } else {
236  move_one_to_partial();
237  }
238 }
239 
240 template<typename T, typename A>
241 void ebpps_sample<T,A>::reset() {
242  c_ = 0.0;
243  partial_item_.reset();
244  data_.clear();
245 }
246 
247 template<typename T, typename A>
248 double ebpps_sample<T,A>::get_c() const {
249  return c_;
250 }
251 
252 template<typename T, typename A>
253 auto ebpps_sample<T,A>::get_full_items() const -> result_type {
254  return result_type(data_);
255 }
256 
257 template<typename T, typename A>
258 bool ebpps_sample<T,A>::has_partial_item() const {
259  return bool(partial_item_);
260 }
261 
262 template<typename T, typename A>
263 T ebpps_sample<T,A>::get_partial_item() const {
264  if (!partial_item_) throw std::runtime_error("Call to get_partial_item() when no partial item exists");
265  return *partial_item_;
266 }
267 
268 template<typename T, typename A>
269 uint32_t ebpps_sample<T,A>::random_idx(uint32_t max) {
270  static std::uniform_int_distribution<uint32_t> dist;
271  return dist(random_utils::rand, std::uniform_int_distribution<uint32_t>::param_type(0, max - 1));
272 }
273 
274 template<typename T, typename A>
275 double ebpps_sample<T,A>::next_double() {
276  return random_utils::next_double(random_utils::rand);
277 }
278 
279 template<typename T, typename A>
280 uint32_t ebpps_sample<T,A>::get_num_retained_items() const {
281  return static_cast<uint32_t>(data_.size() + (partial_item_ ? 1 : 0));
282 }
283 
284 // implementation for fixed-size arithmetic types (integral and floating point)
285 template<typename T, typename A>
286 template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
287 size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe&) const {
288  if (c_ == 0.0)
289  return 0;
290  else
291  return sizeof(c_) + (data_.size() + (partial_item_ ? 1 : 0)) * sizeof(T);
292 }
293 
294 // implementation for all other types
295 template<typename T, typename A>
296 template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
297 size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
298  if (c_ == 0.0) return 0;
299 
300  size_t num_bytes = sizeof(c_);
301  for (auto it : data_)
302  num_bytes += sd.size_of_item(it);
303 
304  if (partial_item_)
305  num_bytes += sd.size_of_item(*partial_item_);
306 
307  return num_bytes;
308 }
309 
310 template<typename T, typename A>
311 template<typename SerDe>
312 size_t ebpps_sample<T,A>::serialize(uint8_t* ptr, const uint8_t* end_ptr, const SerDe& sd) const {
313  uint8_t* st_ptr = ptr;
314 
315  ensure_minimum_memory(end_ptr - ptr, sizeof(c_));
316  ptr += copy_to_mem(c_, ptr);
317 
318  ptr += sd.serialize(ptr, end_ptr - ptr, data_.data(), static_cast<unsigned>(data_.size()));
319 
320  if (partial_item_) {
321  ptr += sd.serialize(ptr, end_ptr - ptr, &*partial_item_, 1);
322  }
323 
324  return ptr - st_ptr;
325 }
326 
327 template<typename T, typename A>
328 template<typename SerDe>
329 void ebpps_sample<T,A>::serialize(std::ostream& os, const SerDe& sd) const {
330  write(os, c_);
331 
332  sd.serialize(os, data_.data(), static_cast<unsigned>(data_.size()));
333 
334  if (partial_item_)
335  sd.serialize(os, &*partial_item_, 1);
336 
337  if (!os.good()) throw std::runtime_error("error writing to std::ostream");
338 }
339 
340 template<typename T, typename A>
341 template<typename SerDe>
342 std::pair<ebpps_sample<T, A>, size_t> ebpps_sample<T, A>::deserialize(const uint8_t* ptr, size_t size, const SerDe& sd, const A& allocator) {
343  const uint8_t* st_ptr = ptr;
344  const uint8_t* end_ptr = ptr + size;
345 
346  ensure_minimum_memory(size, sizeof(double));
347  double c;
348  ptr += copy_from_mem(ptr, c);
349  if (c < 0.0)
350  throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
351 
352  double c_int;
353  const double c_frac = std::modf(c, &c_int);
354  const bool has_partial = c_frac != 0.0;
355 
356  const uint32_t num_full_items = static_cast<uint32_t>(c_int);
357  A alloc(allocator);
358  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
359  ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), num_full_items);
360  // serde did not throw, enable destructors
361  items.get_deleter().set_destroy(true);
362  std::vector<T, A> data(std::make_move_iterator(items.get()),
363  std::make_move_iterator(items.get() + num_full_items),
364  allocator);
365 
366  optional<T> partial_item;
367  if (has_partial) {
368  optional<T> tmp; // space to deserialize
369  ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
370  // serde did not throw so place item and clean up
371  partial_item.emplace(*tmp);
372  (*tmp).~T();
373  }
374 
375  return std::pair<ebpps_sample<T,A>, size_t>(
376  ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator),
377  ptr - st_ptr);
378 }
379 
380 template<typename T, typename A>
381 template<typename SerDe>
382 ebpps_sample<T, A> ebpps_sample<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
383  const double c = read<double>(is);
384  if (c < 0.0)
385  throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
386 
387  double c_int;
388  const double c_frac = std::modf(c, &c_int);
389  const bool has_partial = c_frac != 0.0;
390 
391  const uint32_t num_full_items = static_cast<uint32_t>(c_int);
392  A alloc(allocator);
393  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
394  sd.deserialize(is, items.get(), num_full_items);
395  // serde did not throw, enable destructors
396  items.get_deleter().set_destroy(true);
397  std::vector<T, A> data(std::make_move_iterator(items.get()),
398  std::make_move_iterator(items.get() + num_full_items),
399  allocator);
400 
401  optional<T> partial_item;
402  if (has_partial) {
403  optional<T> tmp; // space to deserialize
404  sd.deserialize(is, &*tmp, 1);
405  // serde did not throw so place item and clean up
406  partial_item.emplace(*tmp);
407  (*tmp).~T();
408  }
409 
410  if (!is.good()) throw std::runtime_error("error reading from std::istream");
411 
412  return ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator);
413 }
414 
415 
416 template<typename T, typename A>
417 typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::begin() const {
418  return const_iterator(this);
419 }
420 
421 template<typename T, typename A>
422 typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::end() const {
423  return const_iterator(nullptr);
424 }
425 
426 
427 // -------- ebpps_sample::const_iterator implementation ---------
428 
429 template<typename T, typename A>
430 ebpps_sample<T, A>::const_iterator::const_iterator(const ebpps_sample* sample) :
431  sample_(sample),
432  idx_(0),
433  use_partial_(false)
434 {
435  if (sample == nullptr)
436  return;
437 
438  // determine in advance if we use the partial item
439  double c_int;
440  const double c_frac = std::modf(sample_->get_c(), &c_int);
441  use_partial_ = sample->next_double() < c_frac;
442 
443  // sample with no items
444  if (sample_->data_.size() == 0 && use_partial_) {
445  idx_ = PARTIAL_IDX;
446  }
447 
448  if (sample_->c_== 0.0 || (sample_->data_.size() == 0 && !sample_->has_partial_item())) { sample_ = nullptr; }
449 }
450 
451 template<typename T, typename A>
452 ebpps_sample<T, A>::const_iterator::const_iterator(const const_iterator& other) :
453  sample_(other.sample_),
454  idx_(other.idx_),
455  use_partial_(other.use_partial_)
456 {}
457 
458 template<typename T, typename A>
459 typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++() {
460  if (sample_ == nullptr)
461  return *this;
462  else if (idx_ == PARTIAL_IDX) {
463  idx_ = sample_->data_.size();
464  sample_ = nullptr;
465  return * this;
466  }
467 
468  ++idx_;
469 
470  if (idx_ == sample_->data_.size()) {
471  if (use_partial_)
472  idx_ = PARTIAL_IDX;
473  else
474  sample_ = nullptr;
475  }
476 
477  return *this;
478 }
479 
480 template<typename T, typename A>
481 typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++(int) {
482  const_iterator tmp(*this);
483  operator++();
484  return tmp;
485 }
486 
487 template<typename T, typename A>
488 bool ebpps_sample<T, A>::const_iterator::operator==(const const_iterator& other) const {
489  if (sample_ != other.sample_) return false;
490  if (sample_ == nullptr) return true; // end (and we know other.sample_ is also null)
491  return idx_ == other.idx_;
492 }
493 
494 template<typename T, typename A>
495 bool ebpps_sample<T, A>::const_iterator::operator!=(const const_iterator& other) const {
496  return !operator==(other);
497 }
498 
499 template<typename T, typename A>
500 auto ebpps_sample<T, A>::const_iterator::operator*() const -> reference {
501  if (idx_ == PARTIAL_IDX)
502  return *(sample_->partial_item_);
503  else
504  return sample_->data_[idx_];
505 }
506 
507 template<typename T, typename A>
508 auto ebpps_sample<T, A>::const_iterator::operator->() const -> pointer {
509  return **this;
510 }
511 
512 template<typename T, typename A>
513 class ebpps_sample<T, A>::items_deleter {
514  public:
515  items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
516  void operator() (T* ptr) {
517  if (ptr != nullptr) {
518  if (destroy_) {
519  for (size_t i = 0; i < num_; ++i) {
520  ptr[i].~T();
521  }
522  }
523  allocator_.deallocate(ptr, num_);
524  }
525  }
526  void set_destroy(bool destroy) { destroy_ = destroy; }
527  private:
528  A allocator_;
529  bool destroy_;
530  size_t num_;
531 };
532 
533 
534 } // namespace datasketches
535 
536 #endif // _EBPPS_SAMPLE_IMPL_HPP_
DataSketches namespace.
Definition: binomial_bounds.hpp:38