datasketches-cpp
Loading...
Searching...
No Matches
req_compactor_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef REQ_COMPACTOR_IMPL_HPP_
21#define REQ_COMPACTOR_IMPL_HPP_
22
23#include <stdexcept>
24#include <cmath>
25#include <algorithm>
26
27#include "count_zeros.hpp"
28#include "conditional_forward.hpp"
29#include "common_defs.hpp"
30
31namespace datasketches {
32
33template<typename T, typename C, typename A>
34req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, uint32_t section_size,
35 const C& comparator, const A& allocator, bool sorted):
36comparator_(comparator),
37allocator_(allocator),
38lg_weight_(lg_weight),
39hra_(hra),
40coin_(false),
41sorted_(sorted),
42section_size_raw_(static_cast<float>(section_size)),
43section_size_(section_size),
44num_sections_(req_constants::INIT_NUM_SECTIONS),
45state_(0),
46num_items_(0),
47capacity_(2 * get_nom_capacity()),
48items_(allocator_.allocate(capacity_))
49{}
50
51template<typename T, typename C, typename A>
52req_compactor<T, C, A>::~req_compactor() {
53 if (items_ != nullptr) {
54 for (auto it = begin(); it != end(); ++it) (*it).~T();
55 allocator_.deallocate(items_, capacity_);
56 }
57}
58
59template<typename T, typename C, typename A>
60req_compactor<T, C, A>::req_compactor(const req_compactor& other):
61comparator_(other.comparator_),
62allocator_(other.allocator_),
63lg_weight_(other.lg_weight_),
64hra_(other.hra_),
65coin_(other.coin_),
66sorted_(other.sorted_),
67section_size_raw_(other.section_size_raw_),
68section_size_(other.section_size_),
69num_sections_(other.num_sections_),
70state_(other.state_),
71num_items_(other.num_items_),
72capacity_(other.capacity_),
73items_(nullptr)
74{
75 if (other.items_ != nullptr) {
76 items_ = allocator_.allocate(capacity_);
77 const uint32_t from = hra_ ? capacity_ - num_items_ : 0;
78 const uint32_t to = hra_ ? capacity_ : num_items_;
79 for (uint32_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]);
80 }
81}
82
83template<typename T, typename C, typename A>
84req_compactor<T, C, A>::req_compactor(req_compactor&& other) noexcept :
85comparator_(std::move(other.comparator_)),
86allocator_(std::move(other.allocator_)),
87lg_weight_(other.lg_weight_),
88hra_(other.hra_),
89coin_(other.coin_),
90sorted_(other.sorted_),
91section_size_raw_(other.section_size_raw_),
92section_size_(other.section_size_),
93num_sections_(other.num_sections_),
94state_(other.state_),
95num_items_(other.num_items_),
96capacity_(other.capacity_),
97items_(other.items_)
98{
99 other.items_ = nullptr;
100}
101
102template<typename T, typename C, typename A>
103req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(const req_compactor& other) {
104 req_compactor copy(other);
105 std::swap(comparator_, copy.comparator_);
106 std::swap(allocator_, copy.allocator_);
107 std::swap(lg_weight_, copy.lg_weight_);
108 std::swap(hra_, copy.hra_);
109 std::swap(coin_, copy.coin_);
110 std::swap(sorted_, copy.sorted_);
111 std::swap(section_size_raw_, copy.section_size_raw_);
112 std::swap(section_size_, copy.section_size_);
113 std::swap(num_sections_, copy.num_sections_);
114 std::swap(state_, copy.state_);
115 std::swap(num_items_, copy.num_items_);
116 std::swap(capacity_, copy.capacity_);
117 std::swap(items_, copy.items_);
118 return *this;
119}
120
121template<typename T, typename C, typename A>
122req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(req_compactor&& other) {
123 std::swap(comparator_, other.comparator_);
124 std::swap(allocator_, other.allocator_);
125 std::swap(lg_weight_, other.lg_weight_);
126 std::swap(hra_, other.hra_);
127 std::swap(coin_, other.coin_);
128 std::swap(sorted_, other.sorted_);
129 std::swap(section_size_raw_, other.section_size_raw_);
130 std::swap(section_size_, other.section_size_);
131 std::swap(num_sections_, other.num_sections_);
132 std::swap(state_, other.state_);
133 std::swap(num_items_, other.num_items_);
134 std::swap(capacity_, other.capacity_);
135 std::swap(items_, other.items_);
136 return *this;
137}
138
139template<typename T, typename C, typename A>
140template<typename TT, typename CC, typename AA>
141req_compactor<T, C, A>::req_compactor(const req_compactor<TT, CC, AA>& other, const C& comparator, const A& allocator):
142comparator_(comparator),
143allocator_(allocator),
144lg_weight_(other.lg_weight_),
145hra_(other.hra_),
146coin_(other.coin_),
147sorted_(other.sorted_),
148section_size_raw_(other.section_size_raw_),
149section_size_(other.section_size_),
150num_sections_(other.num_sections_),
151state_(other.state_),
152num_items_(other.num_items_),
153capacity_(other.capacity_),
154items_(nullptr)
155{
156 if (other.items_ != nullptr) {
157 items_ = allocator_.allocate(capacity_);
158 const uint32_t from = hra_ ? capacity_ - num_items_ : 0;
159 const uint32_t to = hra_ ? capacity_ : num_items_;
160 for (uint32_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]);
161 if (sorted_ && !std::is_sorted(items_ + from, items_ + to, comparator_)) {
162 throw std::logic_error("items must be sorted");
163 }
164 }
165}
166
167template<typename T, typename C, typename A>
168bool req_compactor<T, C, A>::is_sorted() const {
169 return sorted_;
170}
171
172template<typename T, typename C, typename A>
173uint32_t req_compactor<T, C, A>::get_num_items() const {
174 return num_items_;
175}
176
177template<typename T, typename C, typename A>
178uint32_t req_compactor<T, C, A>::get_nom_capacity() const {
179 return req_constants::MULTIPLIER * num_sections_ * section_size_;
180}
181
182template<typename T, typename C, typename A>
183uint8_t req_compactor<T, C, A>::get_lg_weight() const {
184 return lg_weight_;
185}
186
187template<typename T, typename C, typename A>
188uint64_t req_compactor<T, C, A>::compute_weight(const T& item, bool inclusive) const {
189 if (!sorted_) const_cast<req_compactor*>(this)->sort(); // allow sorting as a side effect
190 auto it = inclusive ?
191 std::upper_bound(begin(), end(), item, comparator_) :
192 std::lower_bound(begin(), end(), item, comparator_);
193 return std::distance(begin(), it) << lg_weight_;
194}
195
196template<typename T, typename C, typename A>
197template<typename FwdT>
198void req_compactor<T, C, A>::append(FwdT&& item) {
199 if (num_items_ == capacity_) grow(capacity_ + get_nom_capacity());
200 const uint32_t i = hra_ ? capacity_ - num_items_ - 1 : num_items_;
201 new (items_ + i) T(std::forward<FwdT>(item));
202 ++num_items_;
203 if (num_items_ > 1) sorted_ = false;
204}
205
206template<typename T, typename C, typename A>
207void req_compactor<T, C, A>::grow(uint32_t new_capacity) {
208 T* new_items = allocator_.allocate(new_capacity);
209 uint32_t new_i = hra_ ? new_capacity - num_items_ : 0;
210 for (auto it = begin(); it != end(); ++it, ++new_i) {
211 new (new_items + new_i) T(std::move(*it));
212 (*it).~T();
213 }
214 allocator_.deallocate(items_, capacity_);
215 items_ = new_items;
216 capacity_ = new_capacity;
217}
218
219template<typename T, typename C, typename A>
220void req_compactor<T, C, A>::ensure_space(uint32_t num) {
221 if (num_items_ + num > capacity_) grow(num_items_ + num + get_nom_capacity());
222}
223
224template<typename T, typename C, typename A>
225const T* req_compactor<T, C, A>::begin() const {
226 return items_ + (hra_ ? capacity_ - num_items_ : 0);
227}
228
229template<typename T, typename C, typename A>
230const T* req_compactor<T, C, A>::end() const {
231 return items_ + (hra_ ? capacity_ : num_items_);
232}
233
234template<typename T, typename C, typename A>
235T* req_compactor<T, C, A>::begin() {
236 return items_ + (hra_ ? capacity_ - num_items_ : 0);
237}
238
239template<typename T, typename C, typename A>
240T* req_compactor<T, C, A>::end() {
241 return items_ + (hra_ ? capacity_ : num_items_);
242}
243
244template<typename T, typename C, typename A>
245template<typename FwdC>
246void req_compactor<T, C, A>::merge(FwdC&& other) {
247 // TODO: swap if other is larger?
248 if (lg_weight_ != other.lg_weight_) throw std::logic_error("weight mismatch");
249 state_ |= other.state_;
250 while (ensure_enough_sections()) {}
251 ensure_space(other.get_num_items());
252 sort();
253 auto offset = hra_ ? capacity_ - num_items_ : num_items_;
254 auto from = hra_ ? begin() - other.get_num_items() : end();
255 auto to = from + other.get_num_items();
256 auto other_it = other.begin();
257 for (auto it = from; it != to; ++it, ++other_it) new (it) T(conditional_forward<FwdC>(*other_it));
258 if (!other.sorted_) std::sort(from, to, comparator_);
259 if (num_items_ > 0) std::inplace_merge(hra_ ? from : begin(), items_ + offset, hra_ ? end() : to, C());
260 num_items_ += other.get_num_items();
261}
262
263template<typename T, typename C, typename A>
264void req_compactor<T, C, A>::sort() {
265 if (!sorted_) {
266 std::sort(begin(), end(), comparator_);
267 sorted_ = true;
268 }
269}
270
271template<typename T, typename C, typename A>
272std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compact(req_compactor& next) {
273 const uint32_t starting_nom_capacity = get_nom_capacity();
274 // choose a part of the buffer to compact
275 const uint32_t secs_to_compact = std::min<uint32_t>(count_trailing_zeros_in_u64(~state_) + 1, num_sections_);
276 auto compaction_range = compute_compaction_range(secs_to_compact);
277 if (compaction_range.second - compaction_range.first < 2) throw std::logic_error("compaction range error");
278
279 if ((state_ & 1) == 1) { coin_ = !coin_; } // for odd flip coin;
280 else { coin_ = random_utils::random_bit(); } // random coin flip
281
282 const auto num = (compaction_range.second - compaction_range.first) / 2;
283 next.ensure_space(num);
284 auto next_middle = hra_ ? next.begin() : next.end();
285 auto next_empty = hra_ ? next.begin() - num : next.end();
286 promote_evens_or_odds(begin() + compaction_range.first, begin() + compaction_range.second, coin_, next_empty);
287 next.num_items_ += num;
288 std::inplace_merge(next.begin(), next_middle, next.end(), comparator_);
289 for (size_t i = compaction_range.first; i < compaction_range.second; ++i) (*(begin() + i)).~T();
290 num_items_ -= compaction_range.second - compaction_range.first;
291
292 ++state_;
293 ensure_enough_sections();
294 return std::pair<uint32_t, uint32_t>(
295 num,
296 get_nom_capacity() - starting_nom_capacity
297 );
298}
299
300template<typename T, typename C, typename A>
301bool req_compactor<T, C, A>::ensure_enough_sections() {
302 const float ssr = section_size_raw_ / sqrtf(2);
303 const uint32_t ne = nearest_even(ssr);
304 if (state_ >= static_cast<uint64_t>(1ULL << (num_sections_ - 1)) && ne >= req_constants::MIN_K) {
305 section_size_raw_ = ssr;
306 section_size_ = ne;
307 num_sections_ <<= 1;
308 if (capacity_ < 2 * get_nom_capacity()) grow(2 * get_nom_capacity());
309 return true;
310 }
311 return false;
312}
313
314template<typename T, typename C, typename A>
315std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compute_compaction_range(uint32_t secs_to_compact) const {
316 uint32_t non_compact = get_nom_capacity() / 2 + (num_sections_ - secs_to_compact) * section_size_;
317 // make compacted region even
318 if (((num_items_ - non_compact) & 1) == 1) ++non_compact;
319 const uint32_t low = hra_ ? 0 : non_compact;
320 const uint32_t high = hra_ ? num_items_ - non_compact : num_items_;
321 return std::pair<uint32_t, uint32_t>(low, high);
322}
323
324template<typename T, typename C, typename A>
325uint32_t req_compactor<T, C, A>::nearest_even(float value) {
326 return static_cast<uint32_t>(round(value / 2)) << 1;
327}
328
329template<typename T, typename C, typename A>
330template<typename InIter, typename OutIter>
331void req_compactor<T, C, A>::promote_evens_or_odds(InIter from, InIter to, bool odds, OutIter dst) {
332 if (from == to) return;
333 InIter i = from;
334 if (odds) ++i;
335 while (i != to) {
336 new (dst) T(std::move(*i));
337 ++dst;
338 ++i;
339 if (i == to) break;
340 ++i;
341 }
342}
343
344// implementation for fixed-size arithmetic types (integral and floating point)
345template<typename T, typename C, typename A>
346template<typename S, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
347size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S&) const {
348 return sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
349 sizeof(uint16_t) + // padding
350 sizeof(uint32_t) + // num_items
351 sizeof(TT) * num_items_;
352}
353
354// implementation for all other types
355template<typename T, typename C, typename A>
356template<typename S, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
357size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S& serde) const {
358 size_t size = sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
359 sizeof(uint16_t) + // padding
360 sizeof(uint32_t); // num_items
361 for (auto it = begin(); it != end(); ++it) size += serde.size_of_item(*it);
362 return size;
363}
364
365template<typename T, typename C, typename A>
366template<typename S>
367void req_compactor<T, C, A>::serialize(std::ostream& os, const S& serde) const {
368 write(os, state_);
369 write(os, section_size_raw_);
370 write(os, lg_weight_);
371 write(os, num_sections_);
372 const uint16_t padding = 0;
373 write(os, padding);
374 write(os, num_items_);
375 serde.serialize(os, begin(), num_items_);
376}
377
378template<typename T, typename C, typename A>
379template<typename S>
380size_t req_compactor<T, C, A>::serialize(void* dst, size_t capacity, const S& serde) const {
381 uint8_t* ptr = static_cast<uint8_t*>(dst);
382 const uint8_t* end_ptr = ptr + capacity;
383 ptr += copy_to_mem(state_, ptr);
384 ptr += copy_to_mem(section_size_raw_, ptr);
385 ptr += copy_to_mem(lg_weight_, ptr);
386 ptr += copy_to_mem(num_sections_, ptr);
387 const uint16_t padding = 0;
388 ptr += copy_to_mem(padding, ptr);
389 ptr += copy_to_mem(num_items_, ptr);
390 ptr += serde.serialize(ptr, end_ptr - ptr, begin(), num_items_);
391 return ptr - static_cast<uint8_t*>(dst);
392}
393
394template<typename T, typename C, typename A>
395template<typename S>
396req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde,
397 const C& comparator, const A& allocator, bool sorted, bool hra) {
398 auto state = read<decltype(state_)>(is);
399 auto section_size_raw = read<decltype(section_size_raw_)>(is);
400 auto lg_weight = read<decltype(lg_weight_)>(is);
401 auto num_sections = read<decltype(num_sections_)>(is);
402 read<uint16_t>(is); // padding
403 auto num_items = read<uint32_t>(is);
404 auto items = deserialize_items(is, serde, allocator, num_items);
405 return req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(items), num_items,
406 comparator, allocator);
407}
408
409template<typename T, typename C, typename A>
410template<typename S>
411req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde,
412 const C& comparator, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
413 auto items = deserialize_items(is, serde, allocator, num_items);
414 return req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items), num_items,
415 comparator, allocator);
416}
417
418template<typename T, typename C, typename A>
419template<typename S>
420auto req_compactor<T, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, uint32_t num)
421-> std::unique_ptr<T, items_deleter> {
422 A alloc(allocator);
423 std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
424 serde.deserialize(is, items.get(), num);
425 // serde did not throw, enable destructors
426 items.get_deleter().set_destroy(true);
427 if (!is.good()) throw std::runtime_error("error reading from std::istream");
428 return items;
429}
430
431template<typename T, typename C, typename A>
432template<typename S>
433std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size,
434 const S& serde, const C& comparator, const A& allocator, bool sorted, bool hra) {
435 ensure_minimum_memory(size, 8);
436 const char* ptr = static_cast<const char*>(bytes);
437 const char* end_ptr = static_cast<const char*>(bytes) + size;
438
439 uint64_t state;
440 ptr += copy_from_mem(ptr, state);
441 float section_size_raw;
442 ptr += copy_from_mem(ptr, section_size_raw);
443 uint8_t lg_weight;
444 ptr += copy_from_mem(ptr, lg_weight);
445 uint8_t num_sections;
446 ptr += copy_from_mem(ptr, num_sections);
447 ptr += 2; // padding
448 uint32_t num_items;
449 ptr += copy_from_mem(ptr, num_items);
450 auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items);
451 ptr += pair.second;
452 return std::pair<req_compactor, size_t>(
453 req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first), num_items,
454 comparator, allocator),
455 ptr - static_cast<const char*>(bytes)
456 );
457}
458
459template<typename T, typename C, typename A>
460template<typename S>
461std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size,
462 const S& serde, const C& comparator, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
463 auto pair = deserialize_items(bytes, size, serde, allocator, num_items);
464 return std::pair<req_compactor, size_t>(
465 req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first), num_items,
466 comparator, allocator),
467 pair.second
468 );
469}
470
471template<typename T, typename C, typename A>
472template<typename S>
473auto req_compactor<T, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, uint32_t num)
474-> std::pair<std::unique_ptr<T, items_deleter>, size_t> {
475 const char* ptr = static_cast<const char*>(bytes);
476 const char* end_ptr = static_cast<const char*>(bytes) + size;
477 A alloc(allocator);
478 std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
479 ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num);
480 // serde did not throw, enable destructors
481 items.get_deleter().set_destroy(true);
482 return std::pair<std::unique_ptr<T, items_deleter>, size_t>(
483 std::move(items),
484 ptr - static_cast<const char*>(bytes)
485 );
486}
487
488
489template<typename T, typename C, typename A>
490req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections,
491 uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const C& comparator, const A& allocator):
492comparator_(comparator),
493allocator_(allocator),
494lg_weight_(lg_weight),
495hra_(hra),
496coin_(random_utils::random_bit()),
497sorted_(sorted),
498section_size_raw_(section_size_raw),
499section_size_(nearest_even(section_size_raw)),
500num_sections_(num_sections),
501state_(state),
502num_items_(num_items),
503capacity_(num_items),
504items_(items.release())
505{}
506
507template<typename T, typename C, typename A>
508class req_compactor<T, C, A>::items_deleter {
509 public:
510 items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
511 void operator() (T* ptr) {
512 if (ptr != nullptr) {
513 if (destroy_) {
514 for (size_t i = 0; i < num_; ++i) {
515 ptr[i].~T();
516 }
517 }
518 allocator_.deallocate(ptr, num_);
519 }
520 }
521 void set_destroy(bool destroy) { destroy_ = destroy; }
522 private:
523 A allocator_;
524 bool destroy_;
525 size_t num_;
526};
527
528} /* namespace datasketches */
529
530#endif
const uint8_t INIT_NUM_SECTIONS
initial number of sections
Definition req_common.hpp:35
DataSketches namespace.
Definition binomial_bounds.hpp:38