datasketches-cpp
theta_update_sketch_base_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef THETA_UPDATE_SKETCH_BASE_IMPL_HPP_
21 #define THETA_UPDATE_SKETCH_BASE_IMPL_HPP_
22 
23 #include <iostream>
24 #include <sstream>
25 #include <algorithm>
26 #include <stdexcept>
27 
28 #include "theta_helpers.hpp"
29 
30 namespace datasketches {
31 
32 template<typename EN, typename EK, typename A>
33 theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(uint8_t lg_cur_size, uint8_t lg_nom_size, resize_factor rf, float p, uint64_t theta, uint64_t seed, const A& allocator, bool is_empty):
34 allocator_(allocator),
35 is_empty_(is_empty),
36 lg_cur_size_(lg_cur_size),
37 lg_nom_size_(lg_nom_size),
38 rf_(rf),
39 p_(p),
40 num_entries_(0),
41 theta_(theta),
42 seed_(seed),
43 entries_(nullptr)
44 {
45  if (lg_cur_size > 0) {
46  const size_t size = 1ULL << lg_cur_size;
47  entries_ = allocator_.allocate(size);
48  for (size_t i = 0; i < size; ++i) EK()(entries_[i]) = 0;
49  }
50 }
51 
52 template<typename EN, typename EK, typename A>
53 theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(const theta_update_sketch_base& other):
54 allocator_(other.allocator_),
55 is_empty_(other.is_empty_),
56 lg_cur_size_(other.lg_cur_size_),
57 lg_nom_size_(other.lg_nom_size_),
58 rf_(other.rf_),
59 p_(other.p_),
60 num_entries_(other.num_entries_),
61 theta_(other.theta_),
62 seed_(other.seed_),
63 entries_(nullptr)
64 {
65  if (other.entries_ != nullptr) {
66  const size_t size = 1ULL << lg_cur_size_;
67  entries_ = allocator_.allocate(size);
68  for (size_t i = 0; i < size; ++i) {
69  if (EK()(other.entries_[i]) != 0) {
70  new (&entries_[i]) EN(other.entries_[i]);
71  } else {
72  EK()(entries_[i]) = 0;
73  }
74  }
75  }
76 }
77 
78 template<typename EN, typename EK, typename A>
79 theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(theta_update_sketch_base&& other) noexcept:
80 allocator_(std::move(other.allocator_)),
81 is_empty_(other.is_empty_),
82 lg_cur_size_(other.lg_cur_size_),
83 lg_nom_size_(other.lg_nom_size_),
84 rf_(other.rf_),
85 p_(other.p_),
86 num_entries_(other.num_entries_),
87 theta_(other.theta_),
88 seed_(other.seed_),
89 entries_(other.entries_)
90 {
91  other.entries_ = nullptr;
92 }
93 
94 template<typename EN, typename EK, typename A>
95 theta_update_sketch_base<EN, EK, A>::~theta_update_sketch_base()
96 {
97  if (entries_ != nullptr) {
98  const size_t size = 1ULL << lg_cur_size_;
99  for (size_t i = 0; i < size; ++i) {
100  if (EK()(entries_[i]) != 0) entries_[i].~EN();
101  }
102  allocator_.deallocate(entries_, size);
103  }
104 }
105 
106 template<typename EN, typename EK, typename A>
107 theta_update_sketch_base<EN, EK, A>& theta_update_sketch_base<EN, EK, A>::operator=(const theta_update_sketch_base& other) {
108  theta_update_sketch_base<EN, EK, A> copy(other);
109  std::swap(allocator_, copy.allocator_);
110  std::swap(is_empty_, copy.is_empty_);
111  std::swap(lg_cur_size_, copy.lg_cur_size_);
112  std::swap(lg_nom_size_, copy.lg_nom_size_);
113  std::swap(rf_, copy.rf_);
114  std::swap(p_, copy.p_);
115  std::swap(num_entries_, copy.num_entries_);
116  std::swap(theta_, copy.theta_);
117  std::swap(seed_, copy.seed_);
118  std::swap(entries_, copy.entries_);
119  return *this;
120 }
121 
122 template<typename EN, typename EK, typename A>
123 theta_update_sketch_base<EN, EK, A>& theta_update_sketch_base<EN, EK, A>::operator=(theta_update_sketch_base&& other) {
124  std::swap(allocator_, other.allocator_);
125  std::swap(is_empty_, other.is_empty_);
126  std::swap(lg_cur_size_, other.lg_cur_size_);
127  std::swap(lg_nom_size_, other.lg_nom_size_);
128  std::swap(rf_, other.rf_);
129  std::swap(p_, other.p_);
130  std::swap(num_entries_, other.num_entries_);
131  std::swap(theta_, other.theta_);
132  std::swap(seed_, other.seed_);
133  std::swap(entries_, other.entries_);
134  return *this;
135 }
136 
137 template<typename EN, typename EK, typename A>
138 uint64_t theta_update_sketch_base<EN, EK, A>::hash_and_screen(const void* data, size_t length) {
139  is_empty_ = false;
140  const uint64_t hash = compute_hash(data, length, seed_);
141  if (hash >= theta_) return 0; // hash == 0 is reserved to mark empty slots in the table
142  return hash;
143 }
144 
145 template<typename EN, typename EK, typename A>
146 auto theta_update_sketch_base<EN, EK, A>::find(uint64_t key) const -> std::pair<iterator, bool> {
147  return find(entries_, lg_cur_size_, key);
148 }
149 
150 template<typename EN, typename EK, typename A>
151 auto theta_update_sketch_base<EN, EK, A>::find(EN* entries, uint8_t lg_size, uint64_t key) -> std::pair<iterator, bool> {
152  const uint32_t size = 1 << lg_size;
153  const uint32_t mask = size - 1;
154  const uint32_t stride = get_stride(key, lg_size);
155  uint32_t index = static_cast<uint32_t>(key) & mask;
156  // search for duplicate or zero
157  const uint32_t loop_index = index;
158  do {
159  const uint64_t probe = EK()(entries[index]);
160  if (probe == 0) {
161  return std::pair<iterator, bool>(&entries[index], false);
162  } else if (probe == key) {
163  return std::pair<iterator, bool>(&entries[index], true);
164  }
165  index = (index + stride) & mask;
166  } while (index != loop_index);
167  throw std::logic_error("key not found and no empty slots!");
168 }
169 
170 template<typename EN, typename EK, typename A>
171 template<typename Fwd>
172 void theta_update_sketch_base<EN, EK, A>::insert(iterator it, Fwd&& entry) {
173  new (it) EN(std::forward<Fwd>(entry));
174  ++num_entries_;
175  if (num_entries_ > get_capacity(lg_cur_size_, lg_nom_size_)) {
176  if (lg_cur_size_ <= lg_nom_size_) {
177  resize();
178  } else {
179  rebuild();
180  }
181  }
182 }
183 
184 template<typename EN, typename EK, typename A>
185 auto theta_update_sketch_base<EN, EK, A>::begin() const -> iterator {
186  return entries_;
187 }
188 
189 template<typename EN, typename EK, typename A>
190 auto theta_update_sketch_base<EN, EK, A>::end() const -> iterator {
191  return entries_ + (1ULL << lg_cur_size_);
192 }
193 
194 template<typename EN, typename EK, typename A>
195 uint32_t theta_update_sketch_base<EN, EK, A>::get_capacity(uint8_t lg_cur_size, uint8_t lg_nom_size) {
196  const double fraction = (lg_cur_size <= lg_nom_size) ? RESIZE_THRESHOLD : REBUILD_THRESHOLD;
197  return static_cast<uint32_t>(std::floor(fraction * (1 << lg_cur_size)));
198 }
199 
200 template<typename EN, typename EK, typename A>
201 uint32_t theta_update_sketch_base<EN, EK, A>::get_stride(uint64_t key, uint8_t lg_size) {
202  // odd and independent of index assuming lg_size lowest bits of the key were used for the index
203  return (2 * static_cast<uint32_t>((key >> lg_size) & STRIDE_MASK)) + 1;
204 }
205 
206 template<typename EN, typename EK, typename A>
207 void theta_update_sketch_base<EN, EK, A>::resize() {
208  const size_t old_size = 1ULL << lg_cur_size_;
209  const uint8_t lg_new_size = std::min<uint8_t>(lg_cur_size_ + static_cast<uint8_t>(rf_), lg_nom_size_ + 1);
210  const size_t new_size = 1ULL << lg_new_size;
211  EN* new_entries = allocator_.allocate(new_size);
212  for (size_t i = 0; i < new_size; ++i) EK()(new_entries[i]) = 0;
213  for (size_t i = 0; i < old_size; ++i) {
214  const uint64_t key = EK()(entries_[i]);
215  if (key != 0) {
216  // always finds an empty slot in a larger table
217  new (find(new_entries, lg_new_size, key).first) EN(std::move(entries_[i]));
218  entries_[i].~EN();
219  EK()(entries_[i]) = 0;
220  }
221  }
222  std::swap(entries_, new_entries);
223  lg_cur_size_ = lg_new_size;
224  allocator_.deallocate(new_entries, old_size);
225 }
226 
227 // assumes number of entries > nominal size
228 template<typename EN, typename EK, typename A>
229 void theta_update_sketch_base<EN, EK, A>::rebuild() {
230  const size_t size = 1ULL << lg_cur_size_;
231  const uint32_t nominal_size = 1 << lg_nom_size_;
232 
233  // empty entries have uninitialized payloads
234  // TODO: avoid this for empty or trivial payloads (arithmetic types)
235  consolidate_non_empty(entries_, size, num_entries_);
236 
237  std::nth_element(entries_, entries_ + nominal_size, entries_ + num_entries_, comparator());
238  this->theta_ = EK()(entries_[nominal_size]);
239  EN* old_entries = entries_;
240  const size_t num_old_entries = num_entries_;
241  entries_ = allocator_.allocate(size);
242  for (size_t i = 0; i < size; ++i) EK()(entries_[i]) = 0;
243  num_entries_ = nominal_size;
244  // relies on consolidating non-empty entries to the front
245  for (size_t i = 0; i < nominal_size; ++i) {
246  new (find(EK()(old_entries[i])).first) EN(std::move(old_entries[i]));
247  old_entries[i].~EN();
248  }
249  for (size_t i = nominal_size; i < num_old_entries; ++i) old_entries[i].~EN();
250  allocator_.deallocate(old_entries, size);
251 }
252 
253 template<typename EN, typename EK, typename A>
254 void theta_update_sketch_base<EN, EK, A>::trim() {
255  if (num_entries_ > static_cast<uint32_t>(1 << lg_nom_size_)) rebuild();
256 }
257 
258 template<typename EN, typename EK, typename A>
259 void theta_update_sketch_base<EN, EK, A>::reset() {
260  const size_t cur_size = 1ULL << lg_cur_size_;
261  for (size_t i = 0; i < cur_size; ++i) {
262  if (EK()(entries_[i]) != 0) {
263  entries_[i].~EN();
264  EK()(entries_[i]) = 0;
265  }
266  }
267  const uint8_t starting_lg_size = theta_build_helper<true>::starting_sub_multiple(
268  lg_nom_size_ + 1, theta_constants::MIN_LG_K, static_cast<uint8_t>(rf_));
269  if (starting_lg_size != lg_cur_size_) {
270  allocator_.deallocate(entries_, cur_size);
271  lg_cur_size_ = starting_lg_size;
272  const size_t new_size = 1ULL << starting_lg_size;
273  entries_ = allocator_.allocate(new_size);
274  for (size_t i = 0; i < new_size; ++i) EK()(entries_[i]) = 0;
275  }
276  num_entries_ = 0;
277  theta_ = theta_build_helper<true>::starting_theta_from_p(p_);
278  is_empty_ = true;
279 }
280 
281 template<typename EN, typename EK, typename A>
282 void theta_update_sketch_base<EN, EK, A>::consolidate_non_empty(EN* entries, size_t size, size_t num) {
283  // find the first empty slot
284  size_t i = 0;
285  while (i < size) {
286  if (EK()(entries[i]) == 0) break;
287  ++i;
288  }
289  // scan the rest and move non-empty entries to the front
290  for (size_t j = i + 1; j < size; ++j) {
291  if (EK()(entries[j]) != 0) {
292  new (&entries[i]) EN(std::move(entries[j]));
293  entries[j].~EN();
294  EK()(entries[j]) = 0;
295  ++i;
296  if (i == num) break;
297  }
298  }
299 }
300 
301 // builder
302 
303 template<typename Derived, typename Allocator>
305 allocator_(allocator),
306 lg_k_(theta_constants::DEFAULT_LG_K),
307 rf_(theta_constants::DEFAULT_RESIZE_FACTOR),
308 p_(1),
309 seed_(DEFAULT_SEED) {}
310 
311 template<typename Derived, typename Allocator>
313  if (lg_k < theta_constants::MIN_LG_K) {
314  throw std::invalid_argument("lg_k must not be less than " + std::to_string(theta_constants::MIN_LG_K) + ": " + std::to_string(lg_k));
315  }
316  if (lg_k > theta_constants::MAX_LG_K) {
317  throw std::invalid_argument("lg_k must not be greater than " + std::to_string(theta_constants::MAX_LG_K) + ": " + std::to_string(lg_k));
318  }
319  lg_k_ = lg_k;
320  return static_cast<Derived&>(*this);
321 }
322 
323 template<typename Derived, typename Allocator>
325  rf_ = rf;
326  return static_cast<Derived&>(*this);
327 }
328 
329 template<typename Derived, typename Allocator>
331  if (p <= 0 || p > 1) throw std::invalid_argument("sampling probability must be between 0 and 1");
332  p_ = p;
333  return static_cast<Derived&>(*this);
334 }
335 
336 template<typename Derived, typename Allocator>
338  seed_ = seed;
339  return static_cast<Derived&>(*this);
340 }
341 
342 template<typename Derived, typename Allocator>
344  return theta_build_helper<true>::starting_theta_from_p(p_);
345 }
346 
347 template<typename Derived, typename Allocator>
348 uint8_t theta_base_builder<Derived, Allocator>::starting_lg_size() const {
349  return theta_build_helper<true>::starting_sub_multiple(lg_k_ + 1, theta_constants::MIN_LG_K, static_cast<uint8_t>(rf_));
350 }
351 
352 // iterator
353 
354 template<typename Entry, typename ExtractKey>
355 theta_iterator<Entry, ExtractKey>::theta_iterator(Entry* entries, uint32_t size, uint32_t index):
356 entries_(entries), size_(size), index_(index) {
357  while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
358 }
359 
360 template<typename Entry, typename ExtractKey>
361 auto theta_iterator<Entry, ExtractKey>::operator++() -> theta_iterator& {
362  ++index_;
363  while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
364  return *this;
365 }
366 
367 template<typename Entry, typename ExtractKey>
368 auto theta_iterator<Entry, ExtractKey>::operator++(int) -> theta_iterator {
369  theta_iterator tmp(*this);
370  operator++();
371  return tmp;
372 }
373 
374 template<typename Entry, typename ExtractKey>
375 bool theta_iterator<Entry, ExtractKey>::operator!=(const theta_iterator& other) const {
376  return index_ != other.index_;
377 }
378 
379 template<typename Entry, typename ExtractKey>
380 bool theta_iterator<Entry, ExtractKey>::operator==(const theta_iterator& other) const {
381  return index_ == other.index_;
382 }
383 
384 template<typename Entry, typename ExtractKey>
385 auto theta_iterator<Entry, ExtractKey>::operator*() const -> reference {
386  return entries_[index_];
387 }
388 
389 template<typename Entry, typename ExtractKey>
390 auto theta_iterator<Entry, ExtractKey>::operator->() const -> pointer {
391  return entries_ + index_;
392 }
393 
394 // const iterator
395 
396 template<typename Entry, typename ExtractKey>
397 theta_const_iterator<Entry, ExtractKey>::theta_const_iterator(const Entry* entries, uint32_t size, uint32_t index):
398 entries_(entries), size_(size), index_(index) {
399  while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
400 }
401 
402 template<typename Entry, typename ExtractKey>
403 auto theta_const_iterator<Entry, ExtractKey>::operator++() -> theta_const_iterator& {
404  ++index_;
405  while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
406  return *this;
407 }
408 
409 template<typename Entry, typename ExtractKey>
410 auto theta_const_iterator<Entry, ExtractKey>::operator++(int) -> theta_const_iterator {
411  theta_const_iterator tmp(*this);
412  operator++();
413  return tmp;
414 }
415 
416 template<typename Entry, typename ExtractKey>
417 bool theta_const_iterator<Entry, ExtractKey>::operator!=(const theta_const_iterator& other) const {
418  return index_ != other.index_;
419 }
420 
421 template<typename Entry, typename ExtractKey>
422 bool theta_const_iterator<Entry, ExtractKey>::operator==(const theta_const_iterator& other) const {
423  return index_ == other.index_;
424 }
425 
426 template<typename Entry, typename ExtractKey>
427 auto theta_const_iterator<Entry, ExtractKey>::operator*() const -> reference {
428  return entries_[index_];
429 }
430 
431 template<typename Entry, typename ExtractKey>
432 auto theta_const_iterator<Entry, ExtractKey>::operator->() const -> pointer {
433  return entries_ + index_;
434 }
435 
436 } /* namespace datasketches */
437 
438 #endif
Theta base builder.
Definition: theta_update_sketch_base.hpp:97
Derived & set_lg_k(uint8_t lg_k)
Set log2(k), where k is a nominal number of entries in the sketch.
Definition: theta_update_sketch_base_impl.hpp:312
Derived & set_resize_factor(resize_factor rf)
Set resize factor for the internal hash table (defaults to 8)
Definition: theta_update_sketch_base_impl.hpp:324
Derived & set_p(float p)
Set sampling probability (initial theta).
Definition: theta_update_sketch_base_impl.hpp:330
Derived & set_seed(uint64_t seed)
Set the seed for the hash function.
Definition: theta_update_sketch_base_impl.hpp:337
const uint8_t MIN_LG_K
min log2 of K
Definition: theta_constants.hpp:38
const uint8_t MAX_LG_K
max log2 of K
Definition: theta_constants.hpp:40
DataSketches namespace.
Definition: binomial_bounds.hpp:38