datasketches-cpp
All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
ebpps_sample_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _EBPPS_SAMPLE_IMPL_HPP_
21#define _EBPPS_SAMPLE_IMPL_HPP_
22
#include "common_defs.hpp"
#include "conditional_forward.hpp"
#include "ebpps_sample.hpp"
#include "serde.hpp"

#include <cmath>
#include <limits>
#include <sstream>
#include <string>
31
32namespace datasketches {
33
34template<typename T, typename A>
35ebpps_sample<T,A>::ebpps_sample(uint32_t reserved_size, const A& allocator) :
36 allocator_(allocator),
37 c_(0.0),
38 partial_item_(),
39 data_(allocator)
40 {
41 data_.reserve(reserved_size);
42 }
43
44template<typename T, typename A>
45ebpps_sample<T,A>::ebpps_sample(std::vector<T, A>&& data, optional<T>&& partial_item, double c, const A& allocator) :
46 allocator_(allocator),
47 c_(c),
48 partial_item_(partial_item),
49 data_(data, allocator)
50 {}
51
52template<typename T, typename A>
53template<typename TT>
54void ebpps_sample<T,A>::replace_content(TT&& item, double theta) {
55 c_ = theta;
56 data_.clear();
57 partial_item_.reset();
58 if (theta == 1.0) {
59 data_.emplace_back(std::forward<TT>(item));
60 } else {
61 partial_item_.emplace(std::forward<TT>(item));
62 }
63}
64
65template<typename T, typename A>
66auto ebpps_sample<T,A>::get_sample() const -> result_type {
67 double unused;
68 const double c_frac = std::modf(c_, &unused);
69 const bool include_partial = next_double() < c_frac;
70 const uint32_t result_size = static_cast<uint32_t>(data_.size()) + (include_partial ? 1 : 0);
71
72 result_type result;
73 result.reserve(result_size);
74 std::copy(data_.begin(), data_.end(), std::back_inserter(result));
75 if (include_partial)
76 result.emplace_back(static_cast<const T&>(*partial_item_));
77
78 return result;
79}
80
// Scales the sample down by theta: c_ becomes theta * c_, and the full items
// and the partial item are adjusted to match the new value.
// A theta of 1.0 or more leaves the sample untouched.
// The order of the next_double() draws and of the helper calls below
// determines the outcome, so statements must not be reordered.
template<typename T, typename A>
void ebpps_sample<T,A>::downsample(double theta) {
  if (theta >= 1.0) return;

  // split both the new and the current c into integer and fractional parts
  const double new_c = theta * c_;
  double new_c_int;
  const double new_c_frac = std::modf(new_c, &new_c_int);
  double c_int;
  const double c_frac = std::modf(c_, &c_int);

  if (new_c_int == 0.0) {
    // no full items retained
    // depending on the draw, one of the full items replaces the partial item
    // before all full items are dropped
    if (next_double() > (c_frac / c_)) {
      swap_with_partial();
    }
    data_.clear();
  } else if (new_c_int == c_int) {
    // no items deleted
    // only the identity of the partial item may change
    if (next_double() > (1 - theta * c_frac)/(1 - new_c_frac)) {
      swap_with_partial();
    }
  } else {
    if (next_double() < theta * c_frac) {
      // subsample data in random order; last item is partial
      // create sample size new_c_int then swap_with_partial()
      subsample(static_cast<uint32_t>(new_c_int));
      swap_with_partial();
    } else {
      // create sample size new_c_int + 1 then move_one_to_partial()
      subsample(static_cast<uint32_t>(new_c_int) + 1);
      move_one_to_partial();
    }
  }

  // a whole-number c leaves no room for a partial item
  if (new_c == new_c_int)
    partial_item_.reset();

  c_ = new_c;
}
120
// Merges another sample into this one.
// c_ becomes the sum of both c values, every full item of other is appended
// to data_, and the two partial items are resolved into at most one partial
// item (possibly promoting one of them to a full item) based on the
// fractional parts of the two original c values and a random draw.
// Items taken from other go through conditional_forward<FwdSample>
// (declared in conditional_forward.hpp; presumably moves when FwdSample is
// an rvalue and copies otherwise -- confirm against that header).
template<typename T, typename A>
template<typename FwdSample>
void ebpps_sample<T,A>::merge(FwdSample&& other) {
  // fractional parts are captured BEFORE c_ is updated
  double c_int;
  const double c_frac = std::modf(c_, &c_int);

  double unused;
  const double other_c_frac = std::modf(other.c_, &unused);

  // update c_ here but do NOT recompute fractional part yet
  c_ += other.c_;

  for (uint32_t i = 0; i < other.data_.size(); ++i)
    data_.emplace_back(conditional_forward<FwdSample>(other.data_[i]));

  // This modifies the original algorithm slightly due to numeric
  // precision issues. Specifically, the test if c_frac + other_c_frac == 1.0
  // happens before tests for < 1.0 or > 1.0 and can also be triggered
  // if c_ == floor(c_) (the updated value of c_, not the input).
  //
  // We can still run into issues where c_frac + other_c_frac == epsilon
  // and the first case would have ideally triggered. As a result, we must
  // check if the partial item exists before adding to the data_ vector.

  if (c_frac == 0.0 && other_c_frac == 0.0) {
    // neither side had a fractional part: no partial item afterwards
    partial_item_.reset();
  } else if (c_frac + other_c_frac == 1.0 || c_ == std::floor(c_)) {
    // the fractions add up to a whole item: one of the two partial items
    // becomes a full item and no partial item remains
    if (next_double() <= c_frac) {
      if (partial_item_)
        data_.emplace_back(std::move(*partial_item_));
    } else {
      if (other.partial_item_)
        data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
    }
    partial_item_.reset();
  } else if (c_frac + other_c_frac < 1.0) {
    // still less than one item in total: keep a single partial item,
    // choosing between ours and other's
    if (next_double() > c_frac / (c_frac + other_c_frac)) {
      set_partial(conditional_forward<FwdSample>(*other.partial_item_));
    }
  } else { // c_frac + other_c_frac > 1
    // more than one item in total: one partial item becomes a full item and
    // the other one is the partial item afterwards
    if (next_double() <= (1 - c_frac) / ((1 - c_frac) + (1 - other_c_frac))) {
      data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
    } else {
      data_.emplace_back(std::move(*partial_item_));
      partial_item_.reset();
      set_partial(conditional_forward<FwdSample>(*other.partial_item_));
    }
  }
}
170
171template<typename T, typename A>
172string<A> ebpps_sample<T ,A>::to_string() const {
173 std::ostringstream oss;
174 oss << " sample:" << std::endl;
175 uint32_t idx = 0;
176 for (const T& item : data_)
177 oss << "\t" << idx++ << ":\t" << item << std::endl;
178 oss << " partial: ";
179 if (bool(partial_item_))
180 oss << (*partial_item_) << std::endl;
181 else
182 oss << "NULL" << std::endl;
183
184 return oss.str();
185}
186
187template<typename T, typename A>
188void ebpps_sample<T,A>::subsample(uint32_t num_samples) {
189 // we can perform a Fisher-Yates style shuffle, stopping after
190 // num_samples points since subsequent swaps would only be
191 // between items after num_samples. This is valid since a
192 // point from anywhere in the initial array would be eligible
193 // to end up in the final subsample.
194
195 if (num_samples == data_.size()) return;
196
197 auto erase_start = data_.begin();
198 uint32_t data_len = static_cast<uint32_t>(data_.size());
199 for (uint32_t i = 0; i < num_samples; ++i, ++erase_start) {
200 uint32_t j = i + random_idx(data_len - i);
201 std::swap(data_[i], data_[j]);
202 }
203
204 data_.erase(erase_start, data_.end());
205}
206
207template<typename T, typename A>
208template<typename FwdItem>
209void ebpps_sample<T,A>::set_partial(FwdItem&& item) {
210 if (partial_item_)
211 *partial_item_ = conditional_forward<FwdItem>(item);
212 else
213 partial_item_.emplace(conditional_forward<FwdItem>(item));
214}
215
216template<typename T, typename A>
217void ebpps_sample<T,A>::move_one_to_partial() {
218 const size_t idx = random_idx(static_cast<uint32_t>(data_.size()));
219 // swap selected item to end so we can delete it easily
220 const size_t last_idx = data_.size() - 1;
221 if (idx != last_idx) {
222 std::swap(data_[idx], data_[last_idx]);
223 }
224
225 set_partial(std::move(data_[last_idx]));
226
227 data_.pop_back();
228}
229
230template<typename T, typename A>
231void ebpps_sample<T,A>::swap_with_partial() {
232 if (partial_item_) {
233 const size_t idx = random_idx(static_cast<uint32_t>(data_.size()));
234 std::swap(data_[idx], *partial_item_);
235 } else {
236 move_one_to_partial();
237 }
238}
239
240template<typename T, typename A>
241void ebpps_sample<T,A>::reset() {
242 c_ = 0.0;
243 partial_item_.reset();
244 data_.clear();
245}
246
247template<typename T, typename A>
248double ebpps_sample<T,A>::get_c() const {
249 return c_;
250}
251
252template<typename T, typename A>
253auto ebpps_sample<T,A>::get_full_items() const -> result_type {
254 return result_type(data_);
255}
256
257template<typename T, typename A>
258bool ebpps_sample<T,A>::has_partial_item() const {
259 return bool(partial_item_);
260}
261
262template<typename T, typename A>
263T ebpps_sample<T,A>::get_partial_item() const {
264 if (!partial_item_) throw std::runtime_error("Call to get_partial_item() when no partial item exists");
265 return *partial_item_;
266}
267
268template<typename T, typename A>
269uint32_t ebpps_sample<T,A>::random_idx(uint32_t max) {
270 static std::uniform_int_distribution<uint32_t> dist;
271 return dist(random_utils::rand, std::uniform_int_distribution<uint32_t>::param_type(0, max - 1));
272}
273
274template<typename T, typename A>
275double ebpps_sample<T,A>::next_double() {
276 return random_utils::next_double(random_utils::rand);
277}
278
279template<typename T, typename A>
280uint32_t ebpps_sample<T,A>::get_num_retained_items() const {
281 return static_cast<uint32_t>(data_.size() + (partial_item_ ? 1 : 0));
282}
283
284// implementation for fixed-size arithmetic types (integral and floating point)
285template<typename T, typename A>
286template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
287size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe&) const {
288 if (c_ == 0.0)
289 return 0;
290 else
291 return sizeof(c_) + (data_.size() + (partial_item_ ? 1 : 0)) * sizeof(T);
292}
293
294// implementation for all other types
295template<typename T, typename A>
296template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
297size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
298 if (c_ == 0.0) return 0;
299
300 size_t num_bytes = sizeof(c_);
301 for (auto it : data_)
302 num_bytes += sd.size_of_item(it);
303
304 if (partial_item_)
305 num_bytes += sd.size_of_item(*partial_item_);
306
307 return num_bytes;
308}
309
310template<typename T, typename A>
311template<typename SerDe>
312size_t ebpps_sample<T,A>::serialize(uint8_t* ptr, const uint8_t* end_ptr, const SerDe& sd) const {
313 uint8_t* st_ptr = ptr;
314
315 ensure_minimum_memory(end_ptr - ptr, sizeof(c_));
316 ptr += copy_to_mem(c_, ptr);
317
318 ptr += sd.serialize(ptr, end_ptr - ptr, data_.data(), static_cast<unsigned>(data_.size()));
319
320 if (partial_item_) {
321 ptr += sd.serialize(ptr, end_ptr - ptr, &*partial_item_, 1);
322 }
323
324 return ptr - st_ptr;
325}
326
327template<typename T, typename A>
328template<typename SerDe>
329void ebpps_sample<T,A>::serialize(std::ostream& os, const SerDe& sd) const {
330 write(os, c_);
331
332 sd.serialize(os, data_.data(), static_cast<unsigned>(data_.size()));
333
334 if (partial_item_)
335 sd.serialize(os, &*partial_item_, 1);
336
337 if (!os.good()) throw std::runtime_error("error writing to std::ostream");
338}
339
340template<typename T, typename A>
341template<typename SerDe>
342std::pair<ebpps_sample<T, A>, size_t> ebpps_sample<T, A>::deserialize(const uint8_t* ptr, size_t size, const SerDe& sd, const A& allocator) {
343 const uint8_t* st_ptr = ptr;
344 const uint8_t* end_ptr = ptr + size;
345
346 ensure_minimum_memory(size, sizeof(double));
347 double c;
348 ptr += copy_from_mem(ptr, c);
349 if (c < 0.0)
350 throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
351
352 double c_int;
353 const double c_frac = std::modf(c, &c_int);
354 const bool has_partial = c_frac != 0.0;
355
356 const uint32_t num_full_items = static_cast<uint32_t>(c_int);
357 A alloc(allocator);
358 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
359 ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), num_full_items);
360 // serde did not throw, enable destructors
361 items.get_deleter().set_destroy(true);
362 std::vector<T, A> data(std::make_move_iterator(items.get()),
363 std::make_move_iterator(items.get() + num_full_items),
364 allocator);
365
366 optional<T> partial_item;
367 if (has_partial) {
368 optional<T> tmp; // space to deserialize
369 ptr += sd.deserialize(ptr, end_ptr - ptr, &*tmp, 1);
370 // serde did not throw so place item and clean up
371 partial_item.emplace(*tmp);
372 (*tmp).~T();
373 }
374
375 return std::pair<ebpps_sample<T,A>, size_t>(
376 ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator),
377 ptr - st_ptr);
378}
379
380template<typename T, typename A>
381template<typename SerDe>
382ebpps_sample<T, A> ebpps_sample<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
383 const double c = read<double>(is);
384 if (c < 0.0)
385 throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
386
387 double c_int;
388 const double c_frac = std::modf(c, &c_int);
389 const bool has_partial = c_frac != 0.0;
390
391 const uint32_t num_full_items = static_cast<uint32_t>(c_int);
392 A alloc(allocator);
393 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
394 sd.deserialize(is, items.get(), num_full_items);
395 // serde did not throw, enable destructors
396 items.get_deleter().set_destroy(true);
397 std::vector<T, A> data(std::make_move_iterator(items.get()),
398 std::make_move_iterator(items.get() + num_full_items),
399 allocator);
400
401 optional<T> partial_item;
402 if (has_partial) {
403 optional<T> tmp; // space to deserialize
404 sd.deserialize(is, &*tmp, 1);
405 // serde did not throw so place item and clean up
406 partial_item.emplace(*tmp);
407 (*tmp).~T();
408 }
409
410 if (!is.good()) throw std::runtime_error("error reading from std::istream");
411
412 return ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator);
413}
414
415
416template<typename T, typename A>
417typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::begin() const {
418 return const_iterator(this);
419}
420
421template<typename T, typename A>
422typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::end() const {
423 return const_iterator(nullptr);
424}
425
426
// -------- ebpps_sample::const_iterator implementation ---------
428
429template<typename T, typename A>
430ebpps_sample<T, A>::const_iterator::const_iterator(const ebpps_sample* sample) :
431 sample_(sample),
432 idx_(0),
433 use_partial_(false)
434{
435 if (sample == nullptr)
436 return;
437
438 // determine in advance if we use the partial item
439 double c_int;
440 const double c_frac = std::modf(sample_->get_c(), &c_int);
441 use_partial_ = sample->next_double() < c_frac;
442
443 // sample with no items
444 if (sample_->data_.size() == 0 && use_partial_) {
445 idx_ = PARTIAL_IDX;
446 }
447
448 if (sample_->c_== 0.0 || (sample_->data_.size() == 0 && !sample_->has_partial_item())) { sample_ = nullptr; }
449}
450
451template<typename T, typename A>
452ebpps_sample<T, A>::const_iterator::const_iterator(const const_iterator& other) :
453 sample_(other.sample_),
454 idx_(other.idx_),
455 use_partial_(other.use_partial_)
456{}
457
458template<typename T, typename A>
459typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++() {
460 if (sample_ == nullptr)
461 return *this;
462 else if (idx_ == PARTIAL_IDX) {
463 idx_ = sample_->data_.size();
464 sample_ = nullptr;
465 return * this;
466 }
467
468 ++idx_;
469
470 if (idx_ == sample_->data_.size()) {
471 if (use_partial_)
472 idx_ = PARTIAL_IDX;
473 else
474 sample_ = nullptr;
475 }
476
477 return *this;
478}
479
// Post-increment: copies the iterator, advances *this, and returns the copy.
// NOTE(review): the return type is a reference, but the returned object is
// the local variable tmp, which is destroyed when this function returns, so
// the caller receives a dangling reference. Fixing this needs the return
// type changed to a value here and in the class declaration; prefer the
// pre-increment form until then.
template<typename T, typename A>
typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++(int) {
  const_iterator tmp(*this);
  operator++();
  return tmp;
}
486
487template<typename T, typename A>
488bool ebpps_sample<T, A>::const_iterator::operator==(const const_iterator& other) const {
489 if (sample_ != other.sample_) return false;
490 if (sample_ == nullptr) return true; // end (and we know other.sample_ is also null)
491 return idx_ == other.idx_;
492}
493
494template<typename T, typename A>
495bool ebpps_sample<T, A>::const_iterator::operator!=(const const_iterator& other) const {
496 return !operator==(other);
497}
498
499template<typename T, typename A>
500auto ebpps_sample<T, A>::const_iterator::operator*() const -> reference {
501 if (idx_ == PARTIAL_IDX)
502 return *(sample_->partial_item_);
503 else
504 return sample_->data_[idx_];
505}
506
507template<typename T, typename A>
508auto ebpps_sample<T, A>::const_iterator::operator->() const -> pointer {
509 return **this;
510}
511
512template<typename T, typename A>
513class ebpps_sample<T, A>::items_deleter {
514 public:
515 items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
516 void operator() (T* ptr) {
517 if (ptr != nullptr) {
518 if (destroy_) {
519 for (size_t i = 0; i < num_; ++i) {
520 ptr[i].~T();
521 }
522 }
523 allocator_.deallocate(ptr, num_);
524 }
525 }
526 void set_destroy(bool destroy) { destroy_ = destroy; }
527 private:
528 A allocator_;
529 bool destroy_;
530 size_t num_;
531};
532
533
534} // namespace datasketches
535
536#endif // _EBPPS_SAMPLE_IMPL_HPP_
DataSketches namespace.
Definition binomial_bounds.hpp:38