datasketches-cpp
Loading...
Searching...
No Matches
ebpps_sample_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _EBPPS_SAMPLE_IMPL_HPP_
21#define _EBPPS_SAMPLE_IMPL_HPP_
22
23#include "common_defs.hpp"
24#include "conditional_forward.hpp"
25#include "ebpps_sample.hpp"
26#include "serde.hpp"
27
28#include <cmath>
29#include <string>
30#include <sstream>
31#include <type_traits>
32
33namespace datasketches {
34
35template<typename T, typename A>
36ebpps_sample<T,A>::ebpps_sample(uint32_t reserved_size, const A& allocator) :
37 allocator_(allocator),
38 c_(0.0),
39 partial_item_(),
40 data_(allocator)
41 {
42 data_.reserve(reserved_size);
43 }
44
45template<typename T, typename A>
46ebpps_sample<T,A>::ebpps_sample(std::vector<T, A>&& data, optional<T>&& partial_item, double c, const A& allocator) :
47 allocator_(allocator),
48 c_(c),
49 partial_item_(partial_item),
50 data_(data, allocator)
51 {}
52
53template<typename T, typename A>
54template<typename TT>
55void ebpps_sample<T,A>::replace_content(TT&& item, double theta) {
56 c_ = theta;
57 data_.clear();
58 partial_item_.reset();
59 if (theta == 1.0) {
60 data_.emplace_back(std::forward<TT>(item));
61 } else {
62 partial_item_.emplace(std::forward<TT>(item));
63 }
64}
65
66template<typename T, typename A>
67auto ebpps_sample<T,A>::get_sample() const -> result_type {
68 double unused;
69 const double c_frac = std::modf(c_, &unused);
70 const bool include_partial = next_double() < c_frac;
71 const uint32_t result_size = static_cast<uint32_t>(data_.size()) + (include_partial ? 1 : 0);
72
73 result_type result;
74 result.reserve(result_size);
75 std::copy(data_.begin(), data_.end(), std::back_inserter(result));
76 if (include_partial)
77 result.emplace_back(static_cast<const T&>(*partial_item_));
78
79 return result;
80}
81
82template<typename T, typename A>
83void ebpps_sample<T,A>::downsample(double theta) {
84 if (theta >= 1.0) return;
85
86 const double new_c = theta * c_;
87 double new_c_int;
88 const double new_c_frac = std::modf(new_c, &new_c_int);
89 double c_int;
90 const double c_frac = std::modf(c_, &c_int);
91
92 if (new_c_int == 0.0) {
93 // no full items retained
94 if (next_double() > (c_frac / c_)) {
95 swap_with_partial();
96 }
97 data_.clear();
98 } else if (new_c_int == c_int) {
99 // no items deleted
100 if (next_double() > (1 - theta * c_frac)/(1 - new_c_frac)) {
101 swap_with_partial();
102 }
103 } else {
104 if (next_double() < theta * c_frac) {
105 // subsample data in random order; last item is partial
106 // create sample size new_c_int then swap_with_partial()
107 subsample(static_cast<uint32_t>(new_c_int));
108 swap_with_partial();
109 } else {
110 // create sample size new_c_int + 1 then move_one_to_partial)
111 subsample(static_cast<uint32_t>(new_c_int) + 1);
112 move_one_to_partial();
113 }
114 }
115
116 if (new_c == new_c_int)
117 partial_item_.reset();
118
119 c_ = new_c;
120}
121
122template<typename T, typename A>
123template<typename FwdSample>
124void ebpps_sample<T,A>::merge(FwdSample&& other) {
125 double c_int;
126 const double c_frac = std::modf(c_, &c_int);
127
128 double unused;
129 const double other_c_frac = std::modf(other.c_, &unused);
130
131 // update c_ here but do NOT recompute fractional part yet
132 c_ += other.c_;
133
134 for (uint32_t i = 0; i < other.data_.size(); ++i)
135 data_.emplace_back(conditional_forward<FwdSample>(other.data_[i]));
136
137 // This modifies the original algorithm slightly due to numeric
138 // precision issues. Specifically, the test if c_frac + other_c_frac == 1.0
139 // happens before tests for < 1.0 or > 1.0 and can also be triggered
140 // if c_ == floor(c_) (the updated value of c_, not the input).
141 //
142 // We can still run into issues where c_frac + other_c_frac == epsilon
143 // and the first case would have ideally triggered. As a result, we must
144 // check if the partial item exists before adding to the data_ vector.
145
146 if (c_frac == 0.0 && other_c_frac == 0.0) {
147 partial_item_.reset();
148 } else if (c_frac + other_c_frac == 1.0 || c_ == std::floor(c_)) {
149 if (next_double() <= c_frac) {
150 if (partial_item_)
151 data_.emplace_back(std::move(*partial_item_));
152 } else {
153 if (other.partial_item_)
154 data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
155 }
156 partial_item_.reset();
157 } else if (c_frac + other_c_frac < 1.0) {
158 if (next_double() > c_frac / (c_frac + other_c_frac)) {
159 set_partial(conditional_forward<FwdSample>(*other.partial_item_));
160 }
161 } else { // c_frac + other_c_frac > 1
162 if (next_double() <= (1 - c_frac) / ((1 - c_frac) + (1 - other_c_frac))) {
163 data_.emplace_back(conditional_forward<FwdSample>(*other.partial_item_));
164 } else {
165 data_.emplace_back(std::move(*partial_item_));
166 partial_item_.reset();
167 set_partial(conditional_forward<FwdSample>(*other.partial_item_));
168 }
169 }
170}
171
172template<typename T, typename A>
173string<A> ebpps_sample<T ,A>::to_string() const {
174 std::ostringstream oss;
175 oss << " sample:" << std::endl;
176 uint32_t idx = 0;
177 for (const T& item : data_)
178 oss << "\t" << idx++ << ":\t" << item << std::endl;
179 oss << " partial: ";
180 if (bool(partial_item_))
181 oss << (*partial_item_) << std::endl;
182 else
183 oss << "NULL" << std::endl;
184
185 return oss.str();
186}
187
188template<typename T, typename A>
189void ebpps_sample<T,A>::subsample(uint32_t num_samples) {
190 // we can perform a Fisher-Yates style shuffle, stopping after
191 // num_samples points since subsequent swaps would only be
192 // between items after num_samples. This is valid since a
193 // point from anywhere in the initial array would be eligible
194 // to end up in the final subsample.
195
196 if (num_samples == data_.size()) return;
197
198 auto erase_start = data_.begin();
199 uint32_t data_len = static_cast<uint32_t>(data_.size());
200 for (uint32_t i = 0; i < num_samples; ++i, ++erase_start) {
201 uint32_t j = i + random_idx(data_len - i);
202 std::swap(data_[i], data_[j]);
203 }
204
205 data_.erase(erase_start, data_.end());
206}
207
208template<typename T, typename A>
209template<typename FwdItem>
210void ebpps_sample<T,A>::set_partial(FwdItem&& item) {
211 if (partial_item_)
212 *partial_item_ = conditional_forward<FwdItem>(item);
213 else
214 partial_item_.emplace(conditional_forward<FwdItem>(item));
215}
216
217template<typename T, typename A>
218void ebpps_sample<T,A>::move_one_to_partial() {
219 const size_t idx = random_idx(static_cast<uint32_t>(data_.size()));
220 // swap selected item to end so we can delete it easily
221 const size_t last_idx = data_.size() - 1;
222 if (idx != last_idx) {
223 std::swap(data_[idx], data_[last_idx]);
224 }
225
226 set_partial(std::move(data_[last_idx]));
227
228 data_.pop_back();
229}
230
231template<typename T, typename A>
232void ebpps_sample<T,A>::swap_with_partial() {
233 if (partial_item_) {
234 const size_t idx = random_idx(static_cast<uint32_t>(data_.size()));
235 std::swap(data_[idx], *partial_item_);
236 } else {
237 move_one_to_partial();
238 }
239}
240
241template<typename T, typename A>
242void ebpps_sample<T,A>::reset() {
243 c_ = 0.0;
244 partial_item_.reset();
245 data_.clear();
246}
247
248template<typename T, typename A>
249double ebpps_sample<T,A>::get_c() const {
250 return c_;
251}
252
253template<typename T, typename A>
254auto ebpps_sample<T,A>::get_full_items() const -> result_type {
255 return result_type(data_);
256}
257
258template<typename T, typename A>
259bool ebpps_sample<T,A>::has_partial_item() const {
260 return bool(partial_item_);
261}
262
263template<typename T, typename A>
264T ebpps_sample<T,A>::get_partial_item() const {
265 if (!partial_item_) throw std::runtime_error("Call to get_partial_item() when no partial item exists");
266 return *partial_item_;
267}
268
269template<typename T, typename A>
270uint32_t ebpps_sample<T,A>::random_idx(uint32_t max) {
271 static std::uniform_int_distribution<uint32_t> dist;
272 return dist(random_utils::rand, std::uniform_int_distribution<uint32_t>::param_type(0, max - 1));
273}
274
275template<typename T, typename A>
276double ebpps_sample<T,A>::next_double() {
277 return random_utils::next_double(random_utils::rand);
278}
279
280template<typename T, typename A>
281uint32_t ebpps_sample<T,A>::get_num_retained_items() const {
282 return static_cast<uint32_t>(data_.size() + (partial_item_ ? 1 : 0));
283}
284
285// implementation for fixed-size arithmetic types (integral and floating point)
286template<typename T, typename A>
287template<typename TT, typename SerDe, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
288size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe&) const {
289 if (c_ == 0.0)
290 return 0;
291 else
292 return sizeof(c_) + (data_.size() + (partial_item_ ? 1 : 0)) * sizeof(T);
293}
294
295// implementation for all other types
296template<typename T, typename A>
297template<typename TT, typename SerDe, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
298size_t ebpps_sample<T, A>::get_serialized_size_bytes(const SerDe& sd) const {
299 if (c_ == 0.0) return 0;
300
301 size_t num_bytes = sizeof(c_);
302 for (auto it : data_)
303 num_bytes += sd.size_of_item(it);
304
305 if (partial_item_)
306 num_bytes += sd.size_of_item(*partial_item_);
307
308 return num_bytes;
309}
310
311template<typename T, typename A>
312template<typename SerDe>
313size_t ebpps_sample<T,A>::serialize(uint8_t* ptr, const uint8_t* end_ptr, const SerDe& sd) const {
314 uint8_t* st_ptr = ptr;
315
316 ensure_minimum_memory(end_ptr - ptr, sizeof(c_));
317 ptr += copy_to_mem(c_, ptr);
318
319 ptr += sd.serialize(ptr, end_ptr - ptr, data_.data(), static_cast<unsigned>(data_.size()));
320
321 if (partial_item_) {
322 ptr += sd.serialize(ptr, end_ptr - ptr, &*partial_item_, 1);
323 }
324
325 return ptr - st_ptr;
326}
327
328template<typename T, typename A>
329template<typename SerDe>
330void ebpps_sample<T,A>::serialize(std::ostream& os, const SerDe& sd) const {
331 write(os, c_);
332
333 sd.serialize(os, data_.data(), static_cast<unsigned>(data_.size()));
334
335 if (partial_item_)
336 sd.serialize(os, &*partial_item_, 1);
337
338 if (!os.good()) throw std::runtime_error("error writing to std::ostream");
339}
340
341template<typename T, typename A>
342template<typename SerDe>
343std::pair<ebpps_sample<T, A>, size_t> ebpps_sample<T, A>::deserialize(const uint8_t* ptr, size_t size, const SerDe& sd, const A& allocator) {
344 const uint8_t* st_ptr = ptr;
345 const uint8_t* end_ptr = ptr + size;
346
347 ensure_minimum_memory(size, sizeof(double));
348 double c;
349 ptr += copy_from_mem(ptr, c);
350 if (c < 0.0)
351 throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
352
353 double c_int;
354 const double c_frac = std::modf(c, &c_int);
355 const bool has_partial = c_frac != 0.0;
356
357 const uint32_t num_full_items = static_cast<uint32_t>(c_int);
358 A alloc(allocator);
359 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
360 ptr += sd.deserialize(ptr, end_ptr - ptr, items.get(), num_full_items);
361 // serde did not throw, enable destructors
362 items.get_deleter().set_destroy(true);
363 std::vector<T, A> data(std::make_move_iterator(items.get()),
364 std::make_move_iterator(items.get() + num_full_items),
365 allocator);
366
367 optional<T> partial_item;
368 if (has_partial) {
369 // Space to deserialize.
370 // serde::deserialize expects allocated but not initialized storage.
371 typename std::aligned_storage<sizeof(T), alignof(T)>::type tmp_storage;
372 T* tmp = reinterpret_cast<T*>(&tmp_storage);
373
374 ptr += sd.deserialize(ptr, end_ptr - ptr, tmp, 1);
375 // serde did not throw so place item and clean up
376 partial_item.emplace(std::move(*tmp));
377 tmp->~T();
378 }
379
380 return std::pair<ebpps_sample<T,A>, size_t>(
381 ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator),
382 ptr - st_ptr);
383}
384
385template<typename T, typename A>
386template<typename SerDe>
387ebpps_sample<T, A> ebpps_sample<T, A>::deserialize(std::istream& is, const SerDe& sd, const A& allocator) {
388 const double c = read<double>(is);
389 if (c < 0.0)
390 throw std::runtime_error("sketch image has C < 0.0 during deserializaiton");
391
392 double c_int;
393 const double c_frac = std::modf(c, &c_int);
394 const bool has_partial = c_frac != 0.0;
395
396 const uint32_t num_full_items = static_cast<uint32_t>(c_int);
397 A alloc(allocator);
398 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_full_items), items_deleter(allocator, false, num_full_items));
399 sd.deserialize(is, items.get(), num_full_items);
400 // serde did not throw, enable destructors
401 items.get_deleter().set_destroy(true);
402 std::vector<T, A> data(std::make_move_iterator(items.get()),
403 std::make_move_iterator(items.get() + num_full_items),
404 allocator);
405
406 optional<T> partial_item;
407 if (has_partial) {
408 // Space to deserialize.
409 // serde::deserialize expects allocated but not initialized storage.
410 typename std::aligned_storage<sizeof(T), alignof(T)>::type tmp_storage;
411 T* tmp = reinterpret_cast<T*>(&tmp_storage);
412
413 sd.deserialize(is, tmp, 1);
414 // serde did not throw so place item and clean up
415 partial_item.emplace(std::move(*tmp));
416 tmp->~T();
417 }
418
419 if (!is.good()) throw std::runtime_error("error reading from std::istream");
420
421 return ebpps_sample<T,A>(std::move(data), std::move(partial_item), c, allocator);
422}
423
424
425template<typename T, typename A>
426typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::begin() const {
427 return const_iterator(this);
428}
429
430template<typename T, typename A>
431typename ebpps_sample<T, A>::const_iterator ebpps_sample<T, A>::end() const {
432 return const_iterator(nullptr);
433}
434
435
436// -------- ebpps_sketch::const_iterator implementation ---------
437
438template<typename T, typename A>
439ebpps_sample<T, A>::const_iterator::const_iterator(const ebpps_sample* sample) :
440 sample_(sample),
441 idx_(0),
442 use_partial_(false)
443{
444 if (sample == nullptr)
445 return;
446
447 // determine in advance if we use the partial item
448 double c_int;
449 const double c_frac = std::modf(sample_->get_c(), &c_int);
450 use_partial_ = sample->next_double() < c_frac;
451
452 // sample with no items
453 if (sample_->data_.size() == 0 && use_partial_) {
454 idx_ = PARTIAL_IDX;
455 }
456
457 if (sample_->c_== 0.0 || (sample_->data_.size() == 0 && !sample_->has_partial_item())) { sample_ = nullptr; }
458}
459
460template<typename T, typename A>
461ebpps_sample<T, A>::const_iterator::const_iterator(const const_iterator& other) :
462 sample_(other.sample_),
463 idx_(other.idx_),
464 use_partial_(other.use_partial_)
465{}
466
467template<typename T, typename A>
468typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++() {
469 if (sample_ == nullptr)
470 return *this;
471 else if (idx_ == PARTIAL_IDX) {
472 idx_ = sample_->data_.size();
473 sample_ = nullptr;
474 return * this;
475 }
476
477 ++idx_;
478
479 if (idx_ == sample_->data_.size()) {
480 if (use_partial_)
481 idx_ = PARTIAL_IDX;
482 else
483 sample_ = nullptr;
484 }
485
486 return *this;
487}
488
489template<typename T, typename A>
490typename ebpps_sample<T, A>::const_iterator& ebpps_sample<T, A>::const_iterator::operator++(int) {
491 const_iterator tmp(*this);
492 operator++();
493 return tmp;
494}
495
496template<typename T, typename A>
497bool ebpps_sample<T, A>::const_iterator::operator==(const const_iterator& other) const {
498 if (sample_ != other.sample_) return false;
499 if (sample_ == nullptr) return true; // end (and we know other.sample_ is also null)
500 return idx_ == other.idx_;
501}
502
503template<typename T, typename A>
504bool ebpps_sample<T, A>::const_iterator::operator!=(const const_iterator& other) const {
505 return !operator==(other);
506}
507
508template<typename T, typename A>
509auto ebpps_sample<T, A>::const_iterator::operator*() const -> reference {
510 if (idx_ == PARTIAL_IDX)
511 return *(sample_->partial_item_);
512 else
513 return sample_->data_[idx_];
514}
515
516template<typename T, typename A>
517auto ebpps_sample<T, A>::const_iterator::operator->() const -> pointer {
518 return **this;
519}
520
521template<typename T, typename A>
522class ebpps_sample<T, A>::items_deleter {
523 public:
524 items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
525 void operator() (T* ptr) {
526 if (ptr != nullptr) {
527 if (destroy_) {
528 for (size_t i = 0; i < num_; ++i) {
529 ptr[i].~T();
530 }
531 }
532 allocator_.deallocate(ptr, num_);
533 }
534 }
535 void set_destroy(bool destroy) { destroy_ = destroy; }
536 private:
537 A allocator_;
538 bool destroy_;
539 size_t num_;
540};
541
542
543} // namespace datasketches
544
545#endif // _EBPPS_SAMPLE_IMPL_HPP_
DataSketches namespace.
Definition binomial_bounds.hpp:38