datasketches-cpp
req_compactor_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef REQ_COMPACTOR_IMPL_HPP_
21 #define REQ_COMPACTOR_IMPL_HPP_
22 
23 #include <stdexcept>
24 #include <cmath>
25 #include <algorithm>
26 
27 #include "count_zeros.hpp"
28 #include "conditional_forward.hpp"
29 #include "common_defs.hpp"
30 
31 namespace datasketches {
32 
33 template<typename T, typename C, typename A>
34 req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, uint32_t section_size,
35  const C& comparator, const A& allocator, bool sorted):
36 comparator_(comparator),
37 allocator_(allocator),
38 lg_weight_(lg_weight),
39 hra_(hra),
40 coin_(false),
41 sorted_(sorted),
42 section_size_raw_(static_cast<float>(section_size)),
43 section_size_(section_size),
44 num_sections_(req_constants::INIT_NUM_SECTIONS),
45 state_(0),
46 num_items_(0),
47 capacity_(2 * get_nom_capacity()),
48 items_(allocator_.allocate(capacity_))
49 {}
50 
51 template<typename T, typename C, typename A>
52 req_compactor<T, C, A>::~req_compactor() {
53  if (items_ != nullptr) {
54  for (auto it = begin(); it != end(); ++it) (*it).~T();
55  allocator_.deallocate(items_, capacity_);
56  }
57 }
58 
59 template<typename T, typename C, typename A>
60 req_compactor<T, C, A>::req_compactor(const req_compactor& other):
61 comparator_(other.comparator_),
62 allocator_(other.allocator_),
63 lg_weight_(other.lg_weight_),
64 hra_(other.hra_),
65 coin_(other.coin_),
66 sorted_(other.sorted_),
67 section_size_raw_(other.section_size_raw_),
68 section_size_(other.section_size_),
69 num_sections_(other.num_sections_),
70 state_(other.state_),
71 num_items_(other.num_items_),
72 capacity_(other.capacity_),
73 items_(nullptr)
74 {
75  if (other.items_ != nullptr) {
76  items_ = allocator_.allocate(capacity_);
77  const uint32_t from = hra_ ? capacity_ - num_items_ : 0;
78  const uint32_t to = hra_ ? capacity_ : num_items_;
79  for (uint32_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]);
80  }
81 }
82 
83 template<typename T, typename C, typename A>
84 req_compactor<T, C, A>::req_compactor(req_compactor&& other) noexcept :
85 comparator_(std::move(other.comparator_)),
86 allocator_(std::move(other.allocator_)),
87 lg_weight_(other.lg_weight_),
88 hra_(other.hra_),
89 coin_(other.coin_),
90 sorted_(other.sorted_),
91 section_size_raw_(other.section_size_raw_),
92 section_size_(other.section_size_),
93 num_sections_(other.num_sections_),
94 state_(other.state_),
95 num_items_(other.num_items_),
96 capacity_(other.capacity_),
97 items_(other.items_)
98 {
99  other.items_ = nullptr;
100 }
101 
102 template<typename T, typename C, typename A>
103 req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(const req_compactor& other) {
104  req_compactor copy(other);
105  std::swap(comparator_, copy.comparator_);
106  std::swap(allocator_, copy.allocator_);
107  std::swap(lg_weight_, copy.lg_weight_);
108  std::swap(hra_, copy.hra_);
109  std::swap(coin_, copy.coin_);
110  std::swap(sorted_, copy.sorted_);
111  std::swap(section_size_raw_, copy.section_size_raw_);
112  std::swap(section_size_, copy.section_size_);
113  std::swap(num_sections_, copy.num_sections_);
114  std::swap(state_, copy.state_);
115  std::swap(num_items_, copy.num_items_);
116  std::swap(capacity_, copy.capacity_);
117  std::swap(items_, copy.items_);
118  return *this;
119 }
120 
121 template<typename T, typename C, typename A>
122 req_compactor<T, C, A>& req_compactor<T, C, A>::operator=(req_compactor&& other) {
123  std::swap(comparator_, other.comparator_);
124  std::swap(allocator_, other.allocator_);
125  std::swap(lg_weight_, other.lg_weight_);
126  std::swap(hra_, other.hra_);
127  std::swap(coin_, other.coin_);
128  std::swap(sorted_, other.sorted_);
129  std::swap(section_size_raw_, other.section_size_raw_);
130  std::swap(section_size_, other.section_size_);
131  std::swap(num_sections_, other.num_sections_);
132  std::swap(state_, other.state_);
133  std::swap(num_items_, other.num_items_);
134  std::swap(capacity_, other.capacity_);
135  std::swap(items_, other.items_);
136  return *this;
137 }
138 
139 template<typename T, typename C, typename A>
140 template<typename TT, typename CC, typename AA>
141 req_compactor<T, C, A>::req_compactor(const req_compactor<TT, CC, AA>& other, const C& comparator, const A& allocator):
142 comparator_(comparator),
143 allocator_(allocator),
144 lg_weight_(other.lg_weight_),
145 hra_(other.hra_),
146 coin_(other.coin_),
147 sorted_(other.sorted_),
148 section_size_raw_(other.section_size_raw_),
149 section_size_(other.section_size_),
150 num_sections_(other.num_sections_),
151 state_(other.state_),
152 num_items_(other.num_items_),
153 capacity_(other.capacity_),
154 items_(nullptr)
155 {
156  if (other.items_ != nullptr) {
157  items_ = allocator_.allocate(capacity_);
158  const uint32_t from = hra_ ? capacity_ - num_items_ : 0;
159  const uint32_t to = hra_ ? capacity_ : num_items_;
160  for (uint32_t i = from; i < to; ++i) new (items_ + i) T(other.items_[i]);
161  if (sorted_ && !std::is_sorted(items_ + from, items_ + to, comparator_)) {
162  throw std::logic_error("items must be sorted");
163  }
164  }
165 }
166 
167 template<typename T, typename C, typename A>
168 bool req_compactor<T, C, A>::is_sorted() const {
169  return sorted_;
170 }
171 
172 template<typename T, typename C, typename A>
173 uint32_t req_compactor<T, C, A>::get_num_items() const {
174  return num_items_;
175 }
176 
177 template<typename T, typename C, typename A>
178 uint32_t req_compactor<T, C, A>::get_nom_capacity() const {
179  return req_constants::MULTIPLIER * num_sections_ * section_size_;
180 }
181 
182 template<typename T, typename C, typename A>
183 uint8_t req_compactor<T, C, A>::get_lg_weight() const {
184  return lg_weight_;
185 }
186 
187 template<typename T, typename C, typename A>
188 uint64_t req_compactor<T, C, A>::compute_weight(const T& item, bool inclusive) const {
189  if (!sorted_) const_cast<req_compactor*>(this)->sort(); // allow sorting as a side effect
190  auto it = inclusive ?
191  std::upper_bound(begin(), end(), item, comparator_) :
192  std::lower_bound(begin(), end(), item, comparator_);
193  return std::distance(begin(), it) << lg_weight_;
194 }
195 
196 template<typename T, typename C, typename A>
197 template<typename FwdT>
198 void req_compactor<T, C, A>::append(FwdT&& item) {
199  if (num_items_ == capacity_) grow(capacity_ + get_nom_capacity());
200  const uint32_t i = hra_ ? capacity_ - num_items_ - 1 : num_items_;
201  new (items_ + i) T(std::forward<FwdT>(item));
202  ++num_items_;
203  if (num_items_ > 1) sorted_ = false;
204 }
205 
206 template<typename T, typename C, typename A>
207 void req_compactor<T, C, A>::grow(uint32_t new_capacity) {
208  T* new_items = allocator_.allocate(new_capacity);
209  uint32_t new_i = hra_ ? new_capacity - num_items_ : 0;
210  for (auto it = begin(); it != end(); ++it, ++new_i) {
211  new (new_items + new_i) T(std::move(*it));
212  (*it).~T();
213  }
214  allocator_.deallocate(items_, capacity_);
215  items_ = new_items;
216  capacity_ = new_capacity;
217 }
218 
219 template<typename T, typename C, typename A>
220 void req_compactor<T, C, A>::ensure_space(uint32_t num) {
221  if (num_items_ + num > capacity_) grow(num_items_ + num + get_nom_capacity());
222 }
223 
224 template<typename T, typename C, typename A>
225 const T* req_compactor<T, C, A>::begin() const {
226  return items_ + (hra_ ? capacity_ - num_items_ : 0);
227 }
228 
229 template<typename T, typename C, typename A>
230 const T* req_compactor<T, C, A>::end() const {
231  return items_ + (hra_ ? capacity_ : num_items_);
232 }
233 
234 template<typename T, typename C, typename A>
235 T* req_compactor<T, C, A>::begin() {
236  return items_ + (hra_ ? capacity_ - num_items_ : 0);
237 }
238 
239 template<typename T, typename C, typename A>
240 T* req_compactor<T, C, A>::end() {
241  return items_ + (hra_ ? capacity_ : num_items_);
242 }
243 
244 template<typename T, typename C, typename A>
245 template<typename FwdC>
246 void req_compactor<T, C, A>::merge(FwdC&& other) {
247  // TODO: swap if other is larger?
248  if (lg_weight_ != other.lg_weight_) throw std::logic_error("weight mismatch");
249  state_ |= other.state_;
250  while (ensure_enough_sections()) {}
251  ensure_space(other.get_num_items());
252  sort();
253  auto offset = hra_ ? capacity_ - num_items_ : num_items_;
254  auto from = hra_ ? begin() - other.get_num_items() : end();
255  auto to = from + other.get_num_items();
256  auto other_it = other.begin();
257  for (auto it = from; it != to; ++it, ++other_it) new (it) T(conditional_forward<FwdC>(*other_it));
258  if (!other.sorted_) std::sort(from, to, comparator_);
259  if (num_items_ > 0) std::inplace_merge(hra_ ? from : begin(), items_ + offset, hra_ ? end() : to, C());
260  num_items_ += other.get_num_items();
261 }
262 
263 template<typename T, typename C, typename A>
264 void req_compactor<T, C, A>::sort() {
265  if (!sorted_) {
266  std::sort(begin(), end(), comparator_);
267  sorted_ = true;
268  }
269 }
270 
271 template<typename T, typename C, typename A>
272 std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compact(req_compactor& next) {
273  const uint32_t starting_nom_capacity = get_nom_capacity();
274  // choose a part of the buffer to compact
275  const uint32_t secs_to_compact = std::min<uint32_t>(count_trailing_zeros_in_u64(~state_) + 1, num_sections_);
276  auto compaction_range = compute_compaction_range(secs_to_compact);
277  if (compaction_range.second - compaction_range.first < 2) throw std::logic_error("compaction range error");
278 
279  if ((state_ & 1) == 1) { coin_ = !coin_; } // for odd flip coin;
280  else { coin_ = random_utils::random_bit(); } // random coin flip
281 
282  const auto num = (compaction_range.second - compaction_range.first) / 2;
283  next.ensure_space(num);
284  auto next_middle = hra_ ? next.begin() : next.end();
285  auto next_empty = hra_ ? next.begin() - num : next.end();
286  promote_evens_or_odds(begin() + compaction_range.first, begin() + compaction_range.second, coin_, next_empty);
287  next.num_items_ += num;
288  std::inplace_merge(next.begin(), next_middle, next.end(), comparator_);
289  for (size_t i = compaction_range.first; i < compaction_range.second; ++i) (*(begin() + i)).~T();
290  num_items_ -= compaction_range.second - compaction_range.first;
291 
292  ++state_;
293  ensure_enough_sections();
294  return std::pair<uint32_t, uint32_t>(
295  num,
296  get_nom_capacity() - starting_nom_capacity
297  );
298 }
299 
300 template<typename T, typename C, typename A>
301 bool req_compactor<T, C, A>::ensure_enough_sections() {
302  const float ssr = section_size_raw_ / sqrtf(2);
303  const uint32_t ne = nearest_even(ssr);
304  if (state_ >= static_cast<uint64_t>(1ULL << (num_sections_ - 1)) && ne >= req_constants::MIN_K) {
305  section_size_raw_ = ssr;
306  section_size_ = ne;
307  num_sections_ <<= 1;
308  if (capacity_ < 2 * get_nom_capacity()) grow(2 * get_nom_capacity());
309  return true;
310  }
311  return false;
312 }
313 
314 template<typename T, typename C, typename A>
315 std::pair<uint32_t, uint32_t> req_compactor<T, C, A>::compute_compaction_range(uint32_t secs_to_compact) const {
316  uint32_t non_compact = get_nom_capacity() / 2 + (num_sections_ - secs_to_compact) * section_size_;
317  // make compacted region even
318  if (((num_items_ - non_compact) & 1) == 1) ++non_compact;
319  const uint32_t low = hra_ ? 0 : non_compact;
320  const uint32_t high = hra_ ? num_items_ - non_compact : num_items_;
321  return std::pair<uint32_t, uint32_t>(low, high);
322 }
323 
324 template<typename T, typename C, typename A>
325 uint32_t req_compactor<T, C, A>::nearest_even(float value) {
326  return static_cast<uint32_t>(round(value / 2)) << 1;
327 }
328 
329 template<typename T, typename C, typename A>
330 template<typename InIter, typename OutIter>
331 void req_compactor<T, C, A>::promote_evens_or_odds(InIter from, InIter to, bool odds, OutIter dst) {
332  if (from == to) return;
333  InIter i = from;
334  if (odds) ++i;
335  while (i != to) {
336  new (dst) T(std::move(*i));
337  ++dst;
338  ++i;
339  if (i == to) break;
340  ++i;
341  }
342 }
343 
344 // implementation for fixed-size arithmetic types (integral and floating point)
345 template<typename T, typename C, typename A>
346 template<typename S, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
347 size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S&) const {
348  return sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
349  sizeof(uint16_t) + // padding
350  sizeof(uint32_t) + // num_items
351  sizeof(TT) * num_items_;
352 }
353 
354 // implementation for all other types
355 template<typename T, typename C, typename A>
356 template<typename S, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
357 size_t req_compactor<T, C, A>::get_serialized_size_bytes(const S& serde) const {
358  size_t size = sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
359  sizeof(uint16_t) + // padding
360  sizeof(uint32_t); // num_items
361  for (auto it = begin(); it != end(); ++it) size += serde.size_of_item(*it);
362  return size;
363 }
364 
365 template<typename T, typename C, typename A>
366 template<typename S>
367 void req_compactor<T, C, A>::serialize(std::ostream& os, const S& serde) const {
368  write(os, state_);
369  write(os, section_size_raw_);
370  write(os, lg_weight_);
371  write(os, num_sections_);
372  const uint16_t padding = 0;
373  write(os, padding);
374  write(os, num_items_);
375  serde.serialize(os, begin(), num_items_);
376 }
377 
378 template<typename T, typename C, typename A>
379 template<typename S>
380 size_t req_compactor<T, C, A>::serialize(void* dst, size_t capacity, const S& serde) const {
381  uint8_t* ptr = static_cast<uint8_t*>(dst);
382  const uint8_t* end_ptr = ptr + capacity;
383  ptr += copy_to_mem(state_, ptr);
384  ptr += copy_to_mem(section_size_raw_, ptr);
385  ptr += copy_to_mem(lg_weight_, ptr);
386  ptr += copy_to_mem(num_sections_, ptr);
387  const uint16_t padding = 0;
388  ptr += copy_to_mem(padding, ptr);
389  ptr += copy_to_mem(num_items_, ptr);
390  ptr += serde.serialize(ptr, end_ptr - ptr, begin(), num_items_);
391  return ptr - static_cast<uint8_t*>(dst);
392 }
393 
394 template<typename T, typename C, typename A>
395 template<typename S>
396 req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde,
397  const C& comparator, const A& allocator, bool sorted, bool hra) {
398  auto state = read<decltype(state_)>(is);
399  auto section_size_raw = read<decltype(section_size_raw_)>(is);
400  auto lg_weight = read<decltype(lg_weight_)>(is);
401  auto num_sections = read<decltype(num_sections_)>(is);
402  read<uint16_t>(is); // padding
403  auto num_items = read<uint32_t>(is);
404  auto items = deserialize_items(is, serde, allocator, num_items);
405  return req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(items), num_items,
406  comparator, allocator);
407 }
408 
409 template<typename T, typename C, typename A>
410 template<typename S>
411 req_compactor<T, C, A> req_compactor<T, C, A>::deserialize(std::istream& is, const S& serde,
412  const C& comparator, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
413  auto items = deserialize_items(is, serde, allocator, num_items);
414  return req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items), num_items,
415  comparator, allocator);
416 }
417 
418 template<typename T, typename C, typename A>
419 template<typename S>
420 auto req_compactor<T, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, uint32_t num)
421 -> std::unique_ptr<T, items_deleter> {
422  A alloc(allocator);
423  std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
424  serde.deserialize(is, items.get(), num);
425  // serde did not throw, enable destructors
426  items.get_deleter().set_destroy(true);
427  if (!is.good()) throw std::runtime_error("error reading from std::istream");
428  return items;
429 }
430 
431 template<typename T, typename C, typename A>
432 template<typename S>
433 std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size,
434  const S& serde, const C& comparator, const A& allocator, bool sorted, bool hra) {
435  ensure_minimum_memory(size, 8);
436  const char* ptr = static_cast<const char*>(bytes);
437  const char* end_ptr = static_cast<const char*>(bytes) + size;
438 
439  uint64_t state;
440  ptr += copy_from_mem(ptr, state);
441  float section_size_raw;
442  ptr += copy_from_mem(ptr, section_size_raw);
443  uint8_t lg_weight;
444  ptr += copy_from_mem(ptr, lg_weight);
445  uint8_t num_sections;
446  ptr += copy_from_mem(ptr, num_sections);
447  ptr += 2; // padding
448  uint32_t num_items;
449  ptr += copy_from_mem(ptr, num_items);
450  auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items);
451  ptr += pair.second;
452  return std::pair<req_compactor, size_t>(
453  req_compactor(hra, lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first), num_items,
454  comparator, allocator),
455  ptr - static_cast<const char*>(bytes)
456  );
457 }
458 
459 template<typename T, typename C, typename A>
460 template<typename S>
461 std::pair<req_compactor<T, C, A>, size_t> req_compactor<T, C, A>::deserialize(const void* bytes, size_t size,
462  const S& serde, const C& comparator, const A& allocator, bool sorted, uint16_t k, uint8_t num_items, bool hra) {
463  auto pair = deserialize_items(bytes, size, serde, allocator, num_items);
464  return std::pair<req_compactor, size_t>(
465  req_compactor(hra, 0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first), num_items,
466  comparator, allocator),
467  pair.second
468  );
469 }
470 
471 template<typename T, typename C, typename A>
472 template<typename S>
473 auto req_compactor<T, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, uint32_t num)
474 -> std::pair<std::unique_ptr<T, items_deleter>, size_t> {
475  const char* ptr = static_cast<const char*>(bytes);
476  const char* end_ptr = static_cast<const char*>(bytes) + size;
477  A alloc(allocator);
478  std::unique_ptr<T, items_deleter> items(alloc.allocate(num), items_deleter(allocator, false, num));
479  ptr += serde.deserialize(ptr, end_ptr - ptr, items.get(), num);
480  // serde did not throw, enable destructors
481  items.get_deleter().set_destroy(true);
482  return std::pair<std::unique_ptr<T, items_deleter>, size_t>(
483  std::move(items),
484  ptr - static_cast<const char*>(bytes)
485  );
486 }
487 
488 
489 template<typename T, typename C, typename A>
490 req_compactor<T, C, A>::req_compactor(bool hra, uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections,
491  uint64_t state, std::unique_ptr<T, items_deleter> items, uint32_t num_items, const C& comparator, const A& allocator):
492 comparator_(comparator),
493 allocator_(allocator),
494 lg_weight_(lg_weight),
495 hra_(hra),
496 coin_(random_utils::random_bit()),
497 sorted_(sorted),
498 section_size_raw_(section_size_raw),
499 section_size_(nearest_even(section_size_raw)),
500 num_sections_(num_sections),
501 state_(state),
502 num_items_(num_items),
503 capacity_(num_items),
504 items_(items.release())
505 {}
506 
507 template<typename T, typename C, typename A>
508 class req_compactor<T, C, A>::items_deleter {
509  public:
510  items_deleter(const A& allocator, bool destroy, size_t num): allocator_(allocator), destroy_(destroy), num_(num) {}
511  void operator() (T* ptr) {
512  if (ptr != nullptr) {
513  if (destroy_) {
514  for (size_t i = 0; i < num_; ++i) {
515  ptr[i].~T();
516  }
517  }
518  allocator_.deallocate(ptr, num_);
519  }
520  }
521  void set_destroy(bool destroy) { destroy_ = destroy; }
522  private:
523  A allocator_;
524  bool destroy_;
525  size_t num_;
526 };
527 
528 } /* namespace datasketches */
529 
530 #endif
const uint8_t INIT_NUM_SECTIONS
initial number of sections
Definition: req_common.hpp:35
DataSketches namespace.
Definition: binomial_bounds.hpp:38