datasketches-cpp
reverse_purge_hash_map_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef REVERSE_PURGE_HASH_MAP_IMPL_HPP_
21 #define REVERSE_PURGE_HASH_MAP_IMPL_HPP_
22 
23 #include <memory>
24 #include <algorithm>
25 #include <iterator>
26 #include <cmath>
27 
28 #include "MurmurHash3.h"
29 
30 namespace datasketches {
31 
32 // clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
33 template<typename K, typename V, typename H, typename E, typename A>
34 constexpr uint32_t reverse_purge_hash_map<K, V, H, E, A>::MAX_SAMPLE_SIZE;
35 
36 template<typename K, typename V, typename H, typename E, typename A>
37 reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(uint8_t lg_cur_size, uint8_t lg_max_size,
38  const E& equal, const A& allocator):
39 equal_(equal),
40 allocator_(allocator),
41 lg_cur_size_(lg_cur_size),
42 lg_max_size_(lg_max_size),
43 num_active_(0),
44 keys_(allocator_.allocate(1ULL << lg_cur_size)),
45 values_(nullptr),
46 states_(nullptr)
47 {
48  AllocV av(allocator_);
49  values_ = av.allocate(1ULL << lg_cur_size);
50  AllocU16 au16(allocator_);
51  states_ = au16.allocate(1ULL << lg_cur_size);
52  std::fill(states_, states_ + (1ULL << lg_cur_size), static_cast<uint16_t>(0));
53 }
54 
55 template<typename K, typename V, typename H, typename E, typename A>
56 reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(const reverse_purge_hash_map<K, V, H, E, A>& other):
57 equal_(other.equal_),
58 allocator_(other.allocator_),
59 lg_cur_size_(other.lg_cur_size_),
60 lg_max_size_(other.lg_max_size_),
61 num_active_(other.num_active_),
62 keys_(allocator_.allocate(1ULL << lg_cur_size_)),
63 values_(nullptr),
64 states_(nullptr)
65 {
66  AllocV av(allocator_);
67  values_ = av.allocate(1ULL << lg_cur_size_);
68  AllocU16 au16(allocator_);
69  states_ = au16.allocate(1ULL << lg_cur_size_);
70  const uint32_t size = 1 << lg_cur_size_;
71  if (num_active_ > 0) {
72  auto num = num_active_;
73  for (uint32_t i = 0; i < size; i++) {
74  if (other.states_[i] > 0) {
75  new (&keys_[i]) K(other.keys_[i]);
76  values_[i] = other.values_[i];
77  if (--num == 0) break;
78  }
79  }
80  }
81  std::copy(other.states_, other.states_ + size, states_);
82 }
83 
84 template<typename K, typename V, typename H, typename E, typename A>
85 reverse_purge_hash_map<K, V, H, E, A>::reverse_purge_hash_map(reverse_purge_hash_map<K, V, H, E, A>&& other) noexcept:
86 equal_(std::move(other.equal_)),
87 allocator_(std::move(other.allocator_)),
88 lg_cur_size_(other.lg_cur_size_),
89 lg_max_size_(other.lg_max_size_),
90 num_active_(other.num_active_),
91 keys_(nullptr),
92 values_(nullptr),
93 states_(nullptr)
94 {
95  std::swap(keys_, other.keys_);
96  std::swap(values_, other.values_);
97  std::swap(states_, other.states_);
98  other.num_active_ = 0;
99 }
100 
101 template<typename K, typename V, typename H, typename E, typename A>
102 reverse_purge_hash_map<K, V, H, E, A>::~reverse_purge_hash_map() {
103  const uint32_t size = 1 << lg_cur_size_;
104  if (num_active_ > 0) {
105  for (uint32_t i = 0; i < size; i++) {
106  if (is_active(i)) {
107  keys_[i].~K();
108  if (--num_active_ == 0) break;
109  }
110  }
111  }
112  if (keys_ != nullptr) {
113  allocator_.deallocate(keys_, size);
114  }
115  if (values_ != nullptr) {
116  AllocV av(allocator_);
117  av.deallocate(values_, size);
118  }
119  if (states_ != nullptr) {
120  AllocU16 au16(allocator_);
121  au16.deallocate(states_, size);
122  }
123 }
124 
125 template<typename K, typename V, typename H, typename E, typename A>
126 reverse_purge_hash_map<K, V, H, E, A>& reverse_purge_hash_map<K, V, H, E, A>::operator=(const reverse_purge_hash_map<K, V, H, E, A>& other) {
127  reverse_purge_hash_map copy(other);
128  std::swap(equal_, copy.equal_);
129  std::swap(allocator_, copy.allocator_);
130  std::swap(lg_cur_size_, copy.lg_cur_size_);
131  std::swap(lg_max_size_, copy.lg_max_size_);
132  std::swap(num_active_, copy.num_active_);
133  std::swap(keys_, copy.keys_);
134  std::swap(values_, copy.values_);
135  std::swap(states_, copy.states_);
136  return *this;
137 }
138 
139 template<typename K, typename V, typename H, typename E, typename A>
140 reverse_purge_hash_map<K, V, H, E, A>& reverse_purge_hash_map<K, V, H, E, A>::operator=(reverse_purge_hash_map<K, V, H, E, A>&& other) {
141  std::swap(equal_, other.equal_);
142  std::swap(allocator_, other.allocator_);
143  std::swap(lg_cur_size_, other.lg_cur_size_);
144  std::swap(lg_max_size_, other.lg_max_size_);
145  std::swap(num_active_, other.num_active_);
146  std::swap(keys_, other.keys_);
147  std::swap(values_, other.values_);
148  std::swap(states_, other.states_);
149  return *this;
150 }
151 
152 template<typename K, typename V, typename H, typename E, typename A>
153 template<typename FwdK>
154 V reverse_purge_hash_map<K, V, H, E, A>::adjust_or_insert(FwdK&& key, V value) {
155  const uint32_t num_active_before = num_active_;
156  const uint32_t index = internal_adjust_or_insert(key, value);
157  if (num_active_ > num_active_before) {
158  new (&keys_[index]) K(std::forward<FwdK>(key));
159  return resize_or_purge_if_needed();
160  }
161  return 0;
162 }
163 
164 template<typename K, typename V, typename H, typename E, typename A>
165 V reverse_purge_hash_map<K, V, H, E, A>::get(const K& key) const {
166  const uint32_t mask = (1 << lg_cur_size_) - 1;
167  uint32_t probe = fmix64(H()(key)) & mask;
168  while (is_active(probe)) {
169  if (E()(keys_[probe], key)) return values_[probe];
170  probe = (probe + 1) & mask;
171  }
172  return 0;
173 }
174 
175 template<typename K, typename V, typename H, typename E, typename A>
176 uint8_t reverse_purge_hash_map<K, V, H, E, A>::get_lg_cur_size() const {
177  return lg_cur_size_;
178 }
179 
180 template<typename K, typename V, typename H, typename E, typename A>
181 uint8_t reverse_purge_hash_map<K, V, H, E, A>::get_lg_max_size() const {
182  return lg_max_size_;
183 }
184 
185 template<typename K, typename V, typename H, typename E, typename A>
186 uint32_t reverse_purge_hash_map<K, V, H, E, A>::get_capacity() const {
187  return static_cast<uint32_t>((1 << lg_cur_size_) * LOAD_FACTOR);
188 }
189 
190 template<typename K, typename V, typename H, typename E, typename A>
191 uint32_t reverse_purge_hash_map<K, V, H, E, A>::get_num_active() const {
192  return num_active_;
193 }
194 
195 template<typename K, typename V, typename H, typename E, typename A>
196 const A& reverse_purge_hash_map<K, V, H, E, A>::get_allocator() const {
197  return allocator_;
198 }
199 
200 template<typename K, typename V, typename H, typename E, typename A>
201 typename reverse_purge_hash_map<K, V, H, E, A>::iterator reverse_purge_hash_map<K, V, H, E, A>::begin() const {
202  const uint32_t size = 1 << lg_cur_size_;
203  uint32_t i = 0;
204  while (i < size && !is_active(i)) i++;
205  return reverse_purge_hash_map<K, V, H, E, A>::iterator(this, i, 0);
206 }
207 
208 template<typename K, typename V, typename H, typename E, typename A>
209 typename reverse_purge_hash_map<K, V, H, E, A>::iterator reverse_purge_hash_map<K, V, H, E, A>::end() const {
210  return reverse_purge_hash_map<K, V, H, E, A>::iterator(this, 1 << lg_cur_size_, num_active_);
211 }
212 
213 template<typename K, typename V, typename H, typename E, typename A>
214 bool reverse_purge_hash_map<K, V, H, E, A>::is_active(uint32_t index) const {
215  return states_[index] > 0;
216 }
217 
218 template<typename K, typename V, typename H, typename E, typename A>
219 void reverse_purge_hash_map<K, V, H, E, A>::subtract_and_keep_positive_only(V amount) {
220  // starting from the back, find the first empty cell,
221  // which establishes the high end of a cluster.
222  uint32_t first_probe = (1 << lg_cur_size_) - 1;
223  while (is_active(first_probe)) first_probe--;
224  // when we find the next non-empty cell, we know we are at the high end of a cluster
225  // work towards the front, delete any non-positive entries.
226  for (uint32_t probe = first_probe; probe-- > 0;) {
227  if (is_active(probe)) {
228  if (values_[probe] <= amount) {
229  hash_delete(probe); // does the work of deletion and moving higher items towards the front
230  num_active_--;
231  } else {
232  values_[probe] -= amount;
233  }
234  }
235  }
236  // now work on the first cluster that was skipped
237  for (uint32_t probe = (1 << lg_cur_size_); probe-- > first_probe;) {
238  if (is_active(probe)) {
239  if (values_[probe] <= amount) {
240  hash_delete(probe);
241  num_active_--;
242  } else {
243  values_[probe] -= amount;
244  }
245  }
246  }
247 }
248 
249 template<typename K, typename V, typename H, typename E, typename A>
250 void reverse_purge_hash_map<K, V, H, E, A>::hash_delete(uint32_t delete_index) {
251  // Looks ahead in the table to search for another
252  // item to move to this location
253  // if none are found, the status is changed
254  states_[delete_index] = 0; // mark as empty
255  keys_[delete_index].~K();
256  uint16_t drift = 1;
257  const uint32_t mask = (1 << lg_cur_size_) - 1;
258  uint32_t probe = (delete_index + drift) & mask; // map length must be a power of 2
259  // advance until we find a free location replacing locations as needed
260  while (is_active(probe)) {
261  if (states_[probe] > drift) {
262  // move current element
263  new (&keys_[delete_index]) K(std::move(keys_[probe]));
264  values_[delete_index] = values_[probe];
265  states_[delete_index] = states_[probe] - drift;
266  states_[probe] = 0; // mark as empty
267  keys_[probe].~K();
268  drift = 0;
269  delete_index = probe;
270  }
271  probe = (probe + 1) & mask;
272  drift++;
273  // only used for theoretical analysis
274  if (drift >= DRIFT_LIMIT) throw std::logic_error("drift: " + std::to_string(drift) + " >= DRIFT_LIMIT");
275  }
276 }
277 
278 template<typename K, typename V, typename H, typename E, typename A>
279 uint32_t reverse_purge_hash_map<K, V, H, E, A>::internal_adjust_or_insert(const K& key, V value) {
280  const uint32_t mask = (1 << lg_cur_size_) - 1;
281  uint32_t index = fmix64(H()(key)) & mask;
282  uint16_t drift = 1;
283  while (is_active(index)) {
284  if (E()(keys_[index], key)) {
285  // adjusting the value of an existing key
286  values_[index] += value;
287  return index;
288  }
289  index = (index + 1) & mask;
290  drift++;
291  // only used for theoretical analysis
292  if (drift >= DRIFT_LIMIT) throw std::logic_error("drift limit reached");
293  }
294  // adding the key and value to the table
295  if (num_active_ > get_capacity()) {
296  throw std::logic_error("num_active " + std::to_string(num_active_) + " > capacity " + std::to_string(get_capacity()));
297  }
298  values_[index] = value;
299  states_[index] = drift;
300  num_active_++;
301  return index;
302 }
303 
304 template<typename K, typename V, typename H, typename E, typename A>
305 V reverse_purge_hash_map<K, V, H, E, A>::resize_or_purge_if_needed() {
306  if (num_active_ > get_capacity()) {
307  if (lg_cur_size_ < lg_max_size_) { // can grow
308  resize(lg_cur_size_ + 1);
309  } else { // at target size, must purge
310  const V offset = purge();
311  if (num_active_ > get_capacity()) {
312  throw std::logic_error("purge did not reduce number of active items");
313  }
314  return offset;
315  }
316  }
317  return 0;
318 }
319 
320 template<typename K, typename V, typename H, typename E, typename A>
321 void reverse_purge_hash_map<K, V, H, E, A>::resize(uint8_t lg_new_size) {
322  const uint32_t old_size = 1 << lg_cur_size_;
323  K* old_keys = keys_;
324  V* old_values = values_;
325  uint16_t* old_states = states_;
326  const uint32_t new_size = 1 << lg_new_size;
327  keys_ = allocator_.allocate(new_size);
328  AllocV av(allocator_);
329  values_ = av.allocate(new_size);
330  AllocU16 au16(allocator_);
331  states_ = au16.allocate(new_size);
332  std::fill(states_, states_ + new_size, static_cast<uint16_t>(0));
333  num_active_ = 0;
334  lg_cur_size_ = lg_new_size;
335  for (uint32_t i = 0; i < old_size; i++) {
336  if (old_states[i] > 0) {
337  adjust_or_insert(std::move(old_keys[i]), old_values[i]);
338  old_keys[i].~K();
339  }
340  }
341  allocator_.deallocate(old_keys, old_size);
342  av.deallocate(old_values, old_size);
343  au16.deallocate(old_states, old_size);
344 }
345 
346 template<typename K, typename V, typename H, typename E, typename A>
347 V reverse_purge_hash_map<K, V, H, E, A>::purge() {
348  const uint32_t limit = std::min(MAX_SAMPLE_SIZE, num_active_);
349  uint32_t num_samples = 0;
350  uint32_t i = 0;
351  AllocV av(allocator_);
352  V* samples = av.allocate(limit);
353  while (num_samples < limit) {
354  if (is_active(i)) {
355  samples[num_samples++] = values_[i];
356  }
357  i++;
358  }
359  std::nth_element(samples, samples+ (num_samples / 2), samples + num_samples);
360  const V median = samples[num_samples / 2];
361  av.deallocate(samples, limit);
362  subtract_and_keep_positive_only(median);
363  return median;
364 }
365 
366 } /* namespace datasketches */
367 
368 # endif
DataSketches namespace.
Definition: binomial_bounds.hpp:38