datasketches-cpp
Loading...
Searching...
No Matches
theta_update_sketch_base_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef THETA_UPDATE_SKETCH_BASE_IMPL_HPP_
21#define THETA_UPDATE_SKETCH_BASE_IMPL_HPP_
22
23#include <iostream>
24#include <sstream>
25#include <algorithm>
26#include <stdexcept>
27
28#include "theta_helpers.hpp"
29
30namespace datasketches {
31
32template<typename EN, typename EK, typename A>
33theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(uint8_t lg_cur_size, uint8_t lg_nom_size, resize_factor rf, float p, uint64_t theta, uint64_t seed, const A& allocator, bool is_empty):
34allocator_(allocator),
35is_empty_(is_empty),
36lg_cur_size_(lg_cur_size),
37lg_nom_size_(lg_nom_size),
38rf_(rf),
39p_(p),
40num_entries_(0),
41theta_(theta),
42seed_(seed),
43entries_(nullptr)
44{
45 if (lg_cur_size > 0) {
46 const size_t size = 1ULL << lg_cur_size;
47 entries_ = allocator_.allocate(size);
48 for (size_t i = 0; i < size; ++i) EK()(entries_[i]) = 0;
49 }
50}
51
52template<typename EN, typename EK, typename A>
53theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(const theta_update_sketch_base& other):
54allocator_(other.allocator_),
55is_empty_(other.is_empty_),
56lg_cur_size_(other.lg_cur_size_),
57lg_nom_size_(other.lg_nom_size_),
58rf_(other.rf_),
59p_(other.p_),
60num_entries_(other.num_entries_),
61theta_(other.theta_),
62seed_(other.seed_),
63entries_(nullptr)
64{
65 if (other.entries_ != nullptr) {
66 const size_t size = 1ULL << lg_cur_size_;
67 entries_ = allocator_.allocate(size);
68 for (size_t i = 0; i < size; ++i) {
69 if (EK()(other.entries_[i]) != 0) {
70 new (&entries_[i]) EN(other.entries_[i]);
71 } else {
72 EK()(entries_[i]) = 0;
73 }
74 }
75 }
76}
77
78template<typename EN, typename EK, typename A>
79theta_update_sketch_base<EN, EK, A>::theta_update_sketch_base(theta_update_sketch_base&& other) noexcept:
80allocator_(std::move(other.allocator_)),
81is_empty_(other.is_empty_),
82lg_cur_size_(other.lg_cur_size_),
83lg_nom_size_(other.lg_nom_size_),
84rf_(other.rf_),
85p_(other.p_),
86num_entries_(other.num_entries_),
87theta_(other.theta_),
88seed_(other.seed_),
89entries_(other.entries_)
90{
91 other.entries_ = nullptr;
92}
93
94template<typename EN, typename EK, typename A>
95theta_update_sketch_base<EN, EK, A>::~theta_update_sketch_base()
96{
97 if (entries_ != nullptr) {
98 const size_t size = 1ULL << lg_cur_size_;
99 for (size_t i = 0; i < size; ++i) {
100 if (EK()(entries_[i]) != 0) entries_[i].~EN();
101 }
102 allocator_.deallocate(entries_, size);
104}
105
106template<typename EN, typename EK, typename A>
107theta_update_sketch_base<EN, EK, A>& theta_update_sketch_base<EN, EK, A>::operator=(const theta_update_sketch_base& other) {
108 theta_update_sketch_base<EN, EK, A> copy(other);
109 std::swap(allocator_, copy.allocator_);
110 std::swap(is_empty_, copy.is_empty_);
111 std::swap(lg_cur_size_, copy.lg_cur_size_);
112 std::swap(lg_nom_size_, copy.lg_nom_size_);
113 std::swap(rf_, copy.rf_);
114 std::swap(p_, copy.p_);
115 std::swap(num_entries_, copy.num_entries_);
116 std::swap(theta_, copy.theta_);
117 std::swap(seed_, copy.seed_);
118 std::swap(entries_, copy.entries_);
119 return *this;
120}
121
122template<typename EN, typename EK, typename A>
123theta_update_sketch_base<EN, EK, A>& theta_update_sketch_base<EN, EK, A>::operator=(theta_update_sketch_base&& other) {
124 std::swap(allocator_, other.allocator_);
125 std::swap(is_empty_, other.is_empty_);
126 std::swap(lg_cur_size_, other.lg_cur_size_);
127 std::swap(lg_nom_size_, other.lg_nom_size_);
128 std::swap(rf_, other.rf_);
129 std::swap(p_, other.p_);
130 std::swap(num_entries_, other.num_entries_);
131 std::swap(theta_, other.theta_);
132 std::swap(seed_, other.seed_);
133 std::swap(entries_, other.entries_);
134 return *this;
136
137template<typename EN, typename EK, typename A>
138uint64_t theta_update_sketch_base<EN, EK, A>::hash_and_screen(const void* data, size_t length) {
139 is_empty_ = false;
140 const uint64_t hash = compute_hash(data, length, seed_);
141 if (hash >= theta_) return 0; // hash == 0 is reserved to mark empty slots in the table
142 return hash;
143}
144
145template<typename EN, typename EK, typename A>
146auto theta_update_sketch_base<EN, EK, A>::find(uint64_t key) const -> std::pair<iterator, bool> {
147 return find(entries_, lg_cur_size_, key);
148}
149
150template<typename EN, typename EK, typename A>
151auto theta_update_sketch_base<EN, EK, A>::find(EN* entries, uint8_t lg_size, uint64_t key) -> std::pair<iterator, bool> {
152 const uint32_t size = 1 << lg_size;
153 const uint32_t mask = size - 1;
154 const uint32_t stride = get_stride(key, lg_size);
155 uint32_t index = static_cast<uint32_t>(key) & mask;
156 // search for duplicate or zero
157 const uint32_t loop_index = index;
158 do {
159 const uint64_t probe = EK()(entries[index]);
160 if (probe == 0) {
161 return std::pair<iterator, bool>(&entries[index], false);
162 } else if (probe == key) {
163 return std::pair<iterator, bool>(&entries[index], true);
164 }
165 index = (index + stride) & mask;
166 } while (index != loop_index);
167 throw std::logic_error("key not found and no empty slots!");
168}
169
170template<typename EN, typename EK, typename A>
171template<typename Fwd>
172void theta_update_sketch_base<EN, EK, A>::insert(iterator it, Fwd&& entry) {
173 new (it) EN(std::forward<Fwd>(entry));
174 ++num_entries_;
175 if (num_entries_ > get_capacity(lg_cur_size_, lg_nom_size_)) {
176 if (lg_cur_size_ <= lg_nom_size_) {
177 resize();
178 } else {
179 rebuild();
180 }
181 }
182}
183
184template<typename EN, typename EK, typename A>
185auto theta_update_sketch_base<EN, EK, A>::begin() const -> iterator {
186 return entries_;
187}
188
189template<typename EN, typename EK, typename A>
190auto theta_update_sketch_base<EN, EK, A>::end() const -> iterator {
191 return entries_ + (1ULL << lg_cur_size_);
192}
193
194template<typename EN, typename EK, typename A>
195uint32_t theta_update_sketch_base<EN, EK, A>::get_capacity(uint8_t lg_cur_size, uint8_t lg_nom_size) {
196 const double fraction = (lg_cur_size <= lg_nom_size) ? RESIZE_THRESHOLD : REBUILD_THRESHOLD;
197 return static_cast<uint32_t>(std::floor(fraction * (1 << lg_cur_size)));
198}
199
200template<typename EN, typename EK, typename A>
201uint32_t theta_update_sketch_base<EN, EK, A>::get_stride(uint64_t key, uint8_t lg_size) {
202 // odd and independent of index assuming lg_size lowest bits of the key were used for the index
203 return (2 * static_cast<uint32_t>((key >> lg_size) & STRIDE_MASK)) + 1;
204}
205
206template<typename EN, typename EK, typename A>
207void theta_update_sketch_base<EN, EK, A>::resize() {
208 const size_t old_size = 1ULL << lg_cur_size_;
209 const uint8_t lg_new_size = std::min<uint8_t>(lg_cur_size_ + static_cast<uint8_t>(rf_), lg_nom_size_ + 1);
210 const size_t new_size = 1ULL << lg_new_size;
211 EN* new_entries = allocator_.allocate(new_size);
212 for (size_t i = 0; i < new_size; ++i) EK()(new_entries[i]) = 0;
213 for (size_t i = 0; i < old_size; ++i) {
214 const uint64_t key = EK()(entries_[i]);
215 if (key != 0) {
216 // always finds an empty slot in a larger table
217 new (find(new_entries, lg_new_size, key).first) EN(std::move(entries_[i]));
218 entries_[i].~EN();
219 EK()(entries_[i]) = 0;
220 }
221 }
222 std::swap(entries_, new_entries);
223 lg_cur_size_ = lg_new_size;
224 allocator_.deallocate(new_entries, old_size);
225}
226
227// assumes number of entries > nominal size
228template<typename EN, typename EK, typename A>
229void theta_update_sketch_base<EN, EK, A>::rebuild() {
230 const size_t size = 1ULL << lg_cur_size_;
231 const uint32_t nominal_size = 1 << lg_nom_size_;
232
233 // empty entries have uninitialized payloads
234 // TODO: avoid this for empty or trivial payloads (arithmetic types)
235 consolidate_non_empty(entries_, size, num_entries_);
236
237 std::nth_element(entries_, entries_ + nominal_size, entries_ + num_entries_, comparator());
238 this->theta_ = EK()(entries_[nominal_size]);
239 EN* old_entries = entries_;
240 const size_t num_old_entries = num_entries_;
241 entries_ = allocator_.allocate(size);
242 for (size_t i = 0; i < size; ++i) EK()(entries_[i]) = 0;
243 num_entries_ = nominal_size;
244 // relies on consolidating non-empty entries to the front
245 for (size_t i = 0; i < nominal_size; ++i) {
246 new (find(EK()(old_entries[i])).first) EN(std::move(old_entries[i]));
247 old_entries[i].~EN();
248 }
249 for (size_t i = nominal_size; i < num_old_entries; ++i) old_entries[i].~EN();
250 allocator_.deallocate(old_entries, size);
251}
252
253template<typename EN, typename EK, typename A>
254void theta_update_sketch_base<EN, EK, A>::trim() {
255 if (num_entries_ > static_cast<uint32_t>(1 << lg_nom_size_)) rebuild();
256}
257
258template<typename EN, typename EK, typename A>
259void theta_update_sketch_base<EN, EK, A>::reset() {
260 const size_t cur_size = 1ULL << lg_cur_size_;
261 for (size_t i = 0; i < cur_size; ++i) {
262 if (EK()(entries_[i]) != 0) {
263 entries_[i].~EN();
264 EK()(entries_[i]) = 0;
265 }
266 }
267 const uint8_t starting_lg_size = theta_build_helper<true>::starting_sub_multiple(
268 lg_nom_size_ + 1, theta_constants::MIN_LG_K, static_cast<uint8_t>(rf_));
269 if (starting_lg_size != lg_cur_size_) {
270 allocator_.deallocate(entries_, cur_size);
271 lg_cur_size_ = starting_lg_size;
272 const size_t new_size = 1ULL << starting_lg_size;
273 entries_ = allocator_.allocate(new_size);
274 for (size_t i = 0; i < new_size; ++i) EK()(entries_[i]) = 0;
275 }
276 num_entries_ = 0;
277 theta_ = theta_build_helper<true>::starting_theta_from_p(p_);
278 is_empty_ = true;
279}
280
281template<typename EN, typename EK, typename A>
282void theta_update_sketch_base<EN, EK, A>::consolidate_non_empty(EN* entries, size_t size, size_t num) {
283 // find the first empty slot
284 size_t i = 0;
285 while (i < size) {
286 if (EK()(entries[i]) == 0) break;
287 ++i;
288 }
289 // scan the rest and move non-empty entries to the front
290 for (size_t j = i + 1; j < size; ++j) {
291 if (EK()(entries[j]) != 0) {
292 new (&entries[i]) EN(std::move(entries[j]));
293 entries[j].~EN();
294 EK()(entries[j]) = 0;
295 ++i;
296 if (i == num) break;
297 }
298 }
299}
300
301// builder
302
303template<typename Derived, typename Allocator>
305allocator_(allocator),
306lg_k_(theta_constants::DEFAULT_LG_K),
307rf_(theta_constants::DEFAULT_RESIZE_FACTOR),
308p_(1),
309seed_(DEFAULT_SEED) {}
310
311template<typename Derived, typename Allocator>
313 if (lg_k < theta_constants::MIN_LG_K) {
314 throw std::invalid_argument("lg_k must not be less than " + std::to_string(theta_constants::MIN_LG_K) + ": " + std::to_string(lg_k));
315 }
316 if (lg_k > theta_constants::MAX_LG_K) {
317 throw std::invalid_argument("lg_k must not be greater than " + std::to_string(theta_constants::MAX_LG_K) + ": " + std::to_string(lg_k));
318 }
319 lg_k_ = lg_k;
320 return static_cast<Derived&>(*this);
321}
322
323template<typename Derived, typename Allocator>
325 rf_ = rf;
326 return static_cast<Derived&>(*this);
327}
328
329template<typename Derived, typename Allocator>
331 if (p <= 0 || p > 1) throw std::invalid_argument("sampling probability must be between 0 and 1");
332 p_ = p;
333 return static_cast<Derived&>(*this);
334}
335
336template<typename Derived, typename Allocator>
338 seed_ = seed;
339 return static_cast<Derived&>(*this);
340}
341
342template<typename Derived, typename Allocator>
344 return theta_build_helper<true>::starting_theta_from_p(p_);
345}
346
347template<typename Derived, typename Allocator>
348uint8_t theta_base_builder<Derived, Allocator>::starting_lg_size() const {
349 return theta_build_helper<true>::starting_sub_multiple(lg_k_ + 1, theta_constants::MIN_LG_K, static_cast<uint8_t>(rf_));
350}
351
352// iterator
353
354template<typename Entry, typename ExtractKey>
355theta_iterator<Entry, ExtractKey>::theta_iterator(Entry* entries, uint32_t size, uint32_t index):
356entries_(entries), size_(size), index_(index) {
357 while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
358}
359
360template<typename Entry, typename ExtractKey>
361auto theta_iterator<Entry, ExtractKey>::operator++() -> theta_iterator& {
362 ++index_;
363 while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
364 return *this;
365}
366
367template<typename Entry, typename ExtractKey>
368auto theta_iterator<Entry, ExtractKey>::operator++(int) -> theta_iterator {
369 theta_iterator tmp(*this);
370 operator++();
371 return tmp;
372}
373
374template<typename Entry, typename ExtractKey>
375bool theta_iterator<Entry, ExtractKey>::operator!=(const theta_iterator& other) const {
376 return index_ != other.index_;
377}
378
379template<typename Entry, typename ExtractKey>
380bool theta_iterator<Entry, ExtractKey>::operator==(const theta_iterator& other) const {
381 return index_ == other.index_;
382}
383
384template<typename Entry, typename ExtractKey>
385auto theta_iterator<Entry, ExtractKey>::operator*() const -> reference {
386 return entries_[index_];
387}
388
389template<typename Entry, typename ExtractKey>
390auto theta_iterator<Entry, ExtractKey>::operator->() const -> pointer {
391 return entries_ + index_;
392}
393
394// const iterator
395
396template<typename Entry, typename ExtractKey>
397theta_const_iterator<Entry, ExtractKey>::theta_const_iterator(const Entry* entries, uint32_t size, uint32_t index):
398entries_(entries), size_(size), index_(index) {
399 while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
400}
401
402template<typename Entry, typename ExtractKey>
403auto theta_const_iterator<Entry, ExtractKey>::operator++() -> theta_const_iterator& {
404 ++index_;
405 while (index_ < size_ && ExtractKey()(entries_[index_]) == 0) ++index_;
406 return *this;
407}
408
409template<typename Entry, typename ExtractKey>
410auto theta_const_iterator<Entry, ExtractKey>::operator++(int) -> theta_const_iterator {
411 theta_const_iterator tmp(*this);
412 operator++();
413 return tmp;
414}
415
416template<typename Entry, typename ExtractKey>
417bool theta_const_iterator<Entry, ExtractKey>::operator!=(const theta_const_iterator& other) const {
418 return index_ != other.index_;
419}
420
421template<typename Entry, typename ExtractKey>
422bool theta_const_iterator<Entry, ExtractKey>::operator==(const theta_const_iterator& other) const {
423 return index_ == other.index_;
424}
425
426template<typename Entry, typename ExtractKey>
427auto theta_const_iterator<Entry, ExtractKey>::operator*() const -> reference {
428 return entries_[index_];
429}
430
431template<typename Entry, typename ExtractKey>
432auto theta_const_iterator<Entry, ExtractKey>::operator->() const -> pointer {
433 return entries_ + index_;
434}
435
436} /* namespace datasketches */
437
438#endif
Theta base builder.
Definition theta_update_sketch_base.hpp:97
Derived & set_lg_k(uint8_t lg_k)
Set log2(k), where k is a nominal number of entries in the sketch.
Definition theta_update_sketch_base_impl.hpp:312
Derived & set_resize_factor(resize_factor rf)
Set resize factor for the internal hash table (defaults to 8)
Definition theta_update_sketch_base_impl.hpp:324
Derived & set_p(float p)
Set sampling probability (initial theta).
Definition theta_update_sketch_base_impl.hpp:330
Derived & set_seed(uint64_t seed)
Set the seed for the hash function.
Definition theta_update_sketch_base_impl.hpp:337
const uint8_t MIN_LG_K
min log2 of K
Definition theta_constants.hpp:38
const uint8_t MAX_LG_K
max log2 of K
Definition theta_constants.hpp:40
DataSketches namespace.
Definition binomial_bounds.hpp:38