datasketches-cpp
cpc_union_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef CPC_UNION_IMPL_HPP_
21 #define CPC_UNION_IMPL_HPP_
22 
23 #include "count_zeros.hpp"
24 
25 #include <stdexcept>
26 
27 namespace datasketches {
28 
29 template<typename A>
30 cpc_union_alloc<A>::cpc_union_alloc(uint8_t lg_k, uint64_t seed, const A& allocator):
31 lg_k(lg_k),
32 seed(seed),
33 accumulator(nullptr),
34 bit_matrix(allocator)
35 {
36  if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) {
37  throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= " + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k));
38  }
39  accumulator = new (AllocCpc(allocator).allocate(1)) cpc_sketch_alloc<A>(lg_k, seed, allocator);
40 }
41 
// Copy constructor: deep-copies `other`.
// NOTE(review): the signature line (original line 43) is missing from this extracted
// listing; presumably `cpc_union_alloc<A>::cpc_union_alloc(const cpc_union_alloc<A>& other):`
// -- confirm against cpc_union.hpp.
// lg_k, seed and bit_matrix are copied member-wise. The accumulator pointer is first copied
// shallowly and then, if non-null, replaced with a newly allocated copy of *other.accumulator,
// so the two unions never share an accumulator.
// NOTE(review): if the cpc_sketch_alloc copy constructor throws, the storage returned by
// allocate(1) is never handed back to the allocator.
template<typename A>
44 lg_k(other.lg_k),
45 seed(other.seed),
46 accumulator(other.accumulator), // shallow for now; deep-copied in the body
47 bit_matrix(other.bit_matrix)
48 {
49  if (accumulator != nullptr) {
// accumulator still equals other.accumulator here, so that sketch's allocator is reused
50  accumulator = new (AllocCpc(accumulator->get_allocator()).allocate(1)) cpc_sketch_alloc<A>(*other.accumulator);
51  }
52 }
53 
// Move constructor: takes over other's accumulator pointer and bit matrix.
// NOTE(review): the signature line (original line 55) is missing from this extracted
// listing; presumably `cpc_union_alloc<A>::cpc_union_alloc(cpc_union_alloc<A>&& other)`
// (possibly noexcept) -- confirm against cpc_union.hpp.
template<typename A>
56 lg_k(other.lg_k),
57 seed(other.seed),
58 accumulator(other.accumulator),
59 bit_matrix(std::move(other.bit_matrix))
60 {
// other no longer owns the sketch, so other's destructor will not release it
61  other.accumulator = nullptr;
62 }
63 
// Destructor: in accumulator mode, destroys the accumulator sketch and returns its storage
// through an AllocCpc built from the sketch's own allocator. bit_matrix is a member and is
// released by its own destructor.
// NOTE(review): the signature line (original line 65) is missing from this extracted
// listing; presumably `cpc_union_alloc<A>::~cpc_union_alloc() {` -- confirm.
template<typename A>
66  if (accumulator != nullptr) {
67  AllocCpc allocator(accumulator->get_allocator());
68  accumulator->~cpc_sketch_alloc<A>(); // explicit destructor call: the sketch was created with placement new
69  allocator.deallocate(accumulator, 1);
70  }
71 }
72 
// Copy assignment (copy-and-swap).
// Signature (original line 74, missing from this extracted listing), per the generated docs:
// `cpc_union_alloc<A>& operator=(const cpc_union_alloc<A>& other)`.
// A full copy of `other` is built before *this is modified, which also makes
// self-assignment safe. Its state is then swapped/moved into *this; the previous
// accumulator ends up in `copy` and is released by copy's destructor.
template<typename A>
75  cpc_union_alloc<A> copy(other);
76  std::swap(lg_k, copy.lg_k);
77  seed = copy.seed;
78  std::swap(accumulator, copy.accumulator); // copy now owns (and will free) the old accumulator
79  bit_matrix = std::move(copy.bit_matrix);
80  return *this;
81 }
82 
// Move assignment.
// NOTE(review): the signature line (original line 84) is missing from this extracted listing;
// presumably `cpc_union_alloc<A>& cpc_union_alloc<A>::operator=(cpc_union_alloc<A>&& other)`
// -- confirm against cpc_union.hpp.
// lg_k and the accumulator pointer are swapped, so `other` takes ownership of this union's
// previous accumulator and releases it in its destructor; seed is copied and bit_matrix is
// move-assigned.
// NOTE(review): there is no self-assignment guard; on self-move,
// `bit_matrix = std::move(other.bit_matrix)` move-assigns the vector to itself, which (assuming
// vector_u64 is a std::vector) leaves it in a valid but unspecified state.
template<typename A>
85  std::swap(lg_k, other.lg_k);
86  seed = other.seed;
87  std::swap(accumulator, other.accumulator);
88  bit_matrix = std::move(other.bit_matrix);
89  return *this;
90 }
91 
// Merges a sketch (lvalue) into the union by delegating to internal_update(). The sketch is
// only read; in the path where it is adopted wholesale it is copy-assigned into the accumulator.
// Signature (original line 93, missing from this extracted listing), per the generated docs:
// `void update(const cpc_sketch_alloc<A>& sketch)`.
template<typename A>
94  internal_update(sketch);
95 }
96 
// Merges a sketch (rvalue) into the union by delegating to internal_update().
// std::forward<cpc_sketch_alloc<A>> yields an rvalue, so in the path where internal_update()
// adopts the source wholesale (EMPTY accumulator, equal lg_k) it is move-assigned, not copied.
// NOTE(review): the signature line (original line 98) is missing from this extracted listing;
// presumably `void cpc_union_alloc<A>::update(cpc_sketch_alloc<A>&& sketch) {` -- confirm.
template<typename A>
99  internal_update(std::forward<cpc_sketch_alloc<A>>(sketch));
100 }
101 
// Merges `sketch` into the union; shared implementation behind both update() overloads.
// The visible callers pass either a const lvalue or an rvalue via std::forward, so S is
// deduced accordingly and the source is moved only in the single path where it is
// adopted wholesale (Case A with an EMPTY accumulator and equal lg_k).
//
// Outline:
//   - seed hashes must match (std::invalid_argument otherwise); EMPTY sources are ignored;
//   - a source with a smaller lg_k first shrinks the union via reduce_k();
//   - Case A: SPARSE source, accumulator mode -> replay the source table into the
//     accumulator (or adopt the source when the accumulator is EMPTY and lg_k matches),
//     switching to a bit matrix if the accumulator grows past SPARSE;
//   - Case B: SPARSE source, bit-matrix mode -> OR the source table into the matrix;
//   - Case C: HYBRID/PINNED source -> ensure bit-matrix mode, OR in window and table;
//   - Case D: SLIDING source -> ensure bit-matrix mode, OR in the source's full bit matrix.
// Internal-consistency violations throw std::logic_error.
template<typename A>
103 template<typename S>
104 void cpc_union_alloc<A>::internal_update(S&& sketch) {
105  const uint16_t seed_hash_union = compute_seed_hash(seed);
106  const uint16_t seed_hash_sketch = compute_seed_hash(sketch.seed);
107  if (seed_hash_union != seed_hash_sketch) {
108  throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash_union) + ", "
109  + std::to_string(seed_hash_sketch));
110  }
111  const auto src_flavor = sketch.determine_flavor();
112  if (cpc_sketch_alloc<A>::flavor::EMPTY == src_flavor) return; // nothing to merge
113
// After this, the union's lg_k is <= the source's lg_k; a larger source is downsampled
// by the row masks in the helpers below.
114  if (sketch.get_lg_k() < lg_k) reduce_k(sketch.get_lg_k());
115  if (sketch.get_lg_k() < lg_k) throw std::logic_error("sketch lg_k < union lg_k");
116
117  if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit matrix are absent");
118
// Case A: SPARSE source merged into the accumulator (union in accumulator mode)
119  if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && accumulator != nullptr) { // Case A
120  if (bit_matrix.size() > 0) throw std::logic_error("union bit_matrix is not expected");
121  const auto initial_dest_flavor = accumulator->determine_flavor();
122  if (cpc_sketch_alloc<A>::flavor::EMPTY != initial_dest_flavor &&
123  cpc_sketch_alloc<A>::flavor::SPARSE != initial_dest_flavor) throw std::logic_error("wrong flavor");
124
125  // The following partially fixes the snowplow problem provided that the K's are equal.
126  if (cpc_sketch_alloc<A>::flavor::EMPTY == initial_dest_flavor && lg_k == sketch.get_lg_k()) {
127  *accumulator = std::forward<S>(sketch); // copy- or move-assignment depending on S
128  return;
129  }
130
// replay the source's table; rows are masked to the accumulator's lg_k
131  walk_table_updating_sketch(sketch.surprising_value_table);
132  const auto final_dst_flavor = accumulator->determine_flavor();
133  // if the accumulator has graduated beyond sparse, switch to a bit matrix representation
134  if (final_dst_flavor != cpc_sketch_alloc<A>::flavor::EMPTY && final_dst_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
135  switch_to_bit_matrix();
136  }
137  return;
138  }
139
// Case B: SPARSE source, union already in bit-matrix mode
140  if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && bit_matrix.size() > 0) { // Case B
141  if (accumulator != nullptr) throw std::logic_error("union accumulator != null");
142  or_table_into_matrix(sketch.surprising_value_table);
143  return;
144  }
145
// From here on the source must be HYBRID, PINNED or SLIDING.
146  if (cpc_sketch_alloc<A>::flavor::HYBRID != src_flavor && cpc_sketch_alloc<A>::flavor::PINNED != src_flavor
147  && cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor");
148
149  // source is past SPARSE mode, so make sure that dest is a bit matrix
150  if (accumulator != nullptr) {
151  if (bit_matrix.size() > 0) throw std::logic_error("union bit matrix is not expected");
152  const auto dst_flavor = accumulator->determine_flavor();
153  if (cpc_sketch_alloc<A>::flavor::EMPTY != dst_flavor && cpc_sketch_alloc<A>::flavor::SPARSE != dst_flavor) {
154  throw std::logic_error("wrong flavor");
155  }
156  switch_to_bit_matrix();
157  }
158  if (bit_matrix.size() == 0) throw std::logic_error("union bit_matrix is expected");
159
// Case C: OR in each row's window byte (at the source's offset), then the surprising values
160  if (cpc_sketch_alloc<A>::flavor::HYBRID == src_flavor || cpc_sketch_alloc<A>::flavor::PINNED == src_flavor) { // Case C
161  or_window_into_matrix(sketch.sliding_window, sketch.window_offset, sketch.get_lg_k());
162  or_table_into_matrix(sketch.surprising_value_table);
163  return;
164  }
165
166  // SLIDING mode involves inverted logic, so we can't just walk the source sketch.
167  // Instead, we convert it to a bitMatrix that can be OR'ed into the destination.
168  if (cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // Case D
169  vector_u64 src_matrix = sketch.build_bit_matrix();
170  or_matrix_into_matrix(src_matrix, sketch.get_lg_k());
171 }
172 
// Returns the union's current state as a new sketch.
// Signature (original line 174, missing from this extracted listing), per the generated docs:
// `cpc_sketch_alloc<A> get_result() const`.
// Exactly one representation must be present: accumulator mode delegates to
// get_result_from_accumulator(), bit-matrix mode to get_result_from_bit_matrix();
// any other state throws std::logic_error.
template<typename A>
175  if (accumulator != nullptr) {
176  if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected");
177  return get_result_from_accumulator();
178  }
179  if (bit_matrix.size() == 0) throw std::logic_error("bit_matrix is expected");
180  return get_result_from_bit_matrix();
181 }
182 
// Builds the result sketch from the accumulator (accumulator mode only).
// NOTE(review): the signature line (original line 184) is missing from this extracted listing;
// presumably `cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_accumulator() const {`
// (it is called from the const get_result()) -- confirm.
// - The accumulator's lg_k must equal the union's lg_k (std::logic_error otherwise).
// - With no coupons, a fresh empty sketch with the union's lg_k and seed is returned.
// - Otherwise the accumulator must be SPARSE; a copy of it is returned with was_merged set.
template<typename A>
185  if (lg_k != accumulator->get_lg_k()) throw std::logic_error("lg_k != accumulator->lg_k");
186  if (accumulator->get_num_coupons() == 0) {
187  return cpc_sketch_alloc<A>(lg_k, seed, accumulator->get_allocator());
188  }
189  if (accumulator->determine_flavor() != cpc_sketch_alloc<A>::flavor::SPARSE) throw std::logic_error("wrong flavor");
190  cpc_sketch_alloc<A> copy(*accumulator);
191  copy.was_merged = true; // NOTE(review): presumably tells the estimator that HIP state is not usable -- confirm in cpc_sketch.hpp
192  return copy;
193 }
194 
195 template<typename A>
196 cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_bit_matrix() const {
197  const uint32_t k = 1 << lg_k;
198  const uint32_t num_coupons = count_bits_set_in_matrix(bit_matrix.data(), k);
199 
200  const auto flavor = cpc_sketch_alloc<A>::determine_flavor(lg_k, num_coupons);
201  if (flavor != cpc_sketch_alloc<A>::flavor::HYBRID && flavor != cpc_sketch_alloc<A>::flavor::PINNED
202  && flavor != cpc_sketch_alloc<A>::flavor::SLIDING) throw std::logic_error("wrong flavor");
203 
204  const uint8_t offset = cpc_sketch_alloc<A>::determine_correct_offset(lg_k, num_coupons);
205 
206  vector_bytes sliding_window(k, 0, bit_matrix.get_allocator());
207  // don't need to zero the window's memory
208 
209  // dynamically growing caused snowplow effect
210  uint8_t table_lg_size = lg_k - 4; // K/16; in some cases this will end up being oversized
211  if (table_lg_size < 2) table_lg_size = 2;
212  u32_table<A> table(table_lg_size, 6 + lg_k, bit_matrix.get_allocator());
213 
214  // the following should work even when the offset is zero
215  const uint64_t mask_for_clearing_window = (static_cast<uint64_t>(0xff) << offset) ^ UINT64_MAX;
216  const uint64_t mask_for_flipping_early_zone = (static_cast<uint64_t>(1) << offset) - 1;
217  uint64_t all_surprises_ored = 0;
218 
219  // The snowplow effect was caused by processing the rows in order,
220  // but we have fixed it by using a sufficiently large hash table.
221  for (uint32_t i = 0; i < k; i++) {
222  uint64_t pattern = bit_matrix[i];
223  sliding_window[i] = (pattern >> offset) & 0xff;
224  pattern &= mask_for_clearing_window;
225  pattern ^= mask_for_flipping_early_zone; // this flipping converts surprising 0's to 1's
226  all_surprises_ored |= pattern;
227  while (pattern != 0) {
228  const uint8_t col = count_trailing_zeros_in_u64(pattern);
229  pattern = pattern ^ (static_cast<uint64_t>(1) << col); // erase the 1
230  const uint32_t row_col = (i << 6) | col;
231  bool is_novel = table.maybe_insert(row_col);
232  if (!is_novel) throw std::logic_error("is_novel != true");
233  }
234  }
235 
236  // at this point we could shrink an oversized hash table, but the relative waste isn't very big
237 
238  uint8_t first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored);
239  if (first_interesting_column > offset) first_interesting_column = offset; // corner case
240 
241  // HIP-related fields will contain zeros, and that is okay
242  return cpc_sketch_alloc<A>(lg_k, num_coupons, first_interesting_column, std::move(table), std::move(sliding_window), false, 0, 0, seed);
243 }
244 
245 template<typename A>
246 void cpc_union_alloc<A>::switch_to_bit_matrix() {
247  bit_matrix = accumulator->build_bit_matrix();
248  AllocCpc allocator(accumulator->get_allocator());
249  accumulator->~cpc_sketch_alloc<A>();
250  allocator.deallocate(accumulator, 1);
251  accumulator = nullptr;
252 }
253 
254 template<typename A>
255 void cpc_union_alloc<A>::walk_table_updating_sketch(const u32_table<A>& table) {
256  const uint32_t* slots = table.get_slots();
257  const uint32_t num_slots = 1 << table.get_lg_size();
258  const uint64_t dst_mask = (((1 << accumulator->get_lg_k()) - 1) << 6) | 63; // downsamples when dst lgK < src LgK
259 
260  // Using a golden ratio stride fixes the snowplow effect.
261  const double golden = 0.6180339887498949025;
262  uint32_t stride = static_cast<uint32_t>(golden * static_cast<double>(num_slots));
263  if (stride < 2) throw std::logic_error("stride < 2");
264  if (stride == ((stride >> 1) << 1)) stride += 1; // force the stride to be odd
265  if (stride < 3 || stride >= num_slots) throw std::out_of_range("stride out of range");
266 
267  for (uint32_t i = 0, j = 0; i < num_slots; i++, j += stride) {
268  j &= num_slots - 1;
269  const uint32_t row_col = slots[j];
270  if (row_col != UINT32_MAX) {
271  accumulator->row_col_update(row_col & dst_mask);
272  }
273  }
274 }
275 
276 template<typename A>
277 void cpc_union_alloc<A>::or_table_into_matrix(const u32_table<A>& table) {
278  const uint32_t* slots = table.get_slots();
279  const uint32_t num_slots = 1 << table.get_lg_size();
280  const uint64_t dest_mask = (1 << lg_k) - 1; // downsamples when dst lgK < sr LgK
281  for (uint32_t i = 0; i < num_slots; i++) {
282  const uint32_t row_col = slots[i];
283  if (row_col != UINT32_MAX) {
284  const uint8_t col = row_col & 63;
285  const uint32_t row = row_col >> 6;
286  bit_matrix[row & dest_mask] |= static_cast<uint64_t>(1) << col; // set the bit
287  }
288  }
289 }
290 
291 template<typename A>
292 void cpc_union_alloc<A>::or_window_into_matrix(const vector_bytes& sliding_window, uint8_t offset, uint8_t src_lg_k) {
293  if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK");
294  const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK
295  const uint32_t src_k = 1 << src_lg_k;
296  for (uint32_t src_row = 0; src_row < src_k; src_row++) {
297  bit_matrix[src_row & dst_mask] |= static_cast<uint64_t>(sliding_window[src_row]) << offset;
298  }
299 }
300 
301 template<typename A>
302 void cpc_union_alloc<A>::or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k) {
303  if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK");
304  const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK
305  const uint32_t src_k = 1 << src_lg_k;
306  for (uint32_t src_row = 0; src_row < src_k; src_row++) {
307  bit_matrix[src_row & dst_mask] |= src_matrix[src_row];
308  }
309 }
310 
311 template<typename A>
312 void cpc_union_alloc<A>::reduce_k(uint8_t new_lg_k) {
313  if (new_lg_k >= lg_k) throw std::logic_error("new LgK >= union lgK");
314  if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit_matrix are absent");
315 
316  if (bit_matrix.size() > 0) { // downsample the unioner's bit matrix
317  if (accumulator != nullptr) throw std::logic_error("accumulator is not null");
318  vector_u64 old_matrix = std::move(bit_matrix);
319  const uint8_t old_lg_k = lg_k;
320  const uint32_t new_k = 1 << new_lg_k;
321  bit_matrix = vector_u64(new_k, 0, old_matrix.get_allocator());
322  lg_k = new_lg_k;
323  or_matrix_into_matrix(old_matrix, old_lg_k);
324  return;
325  }
326 
327  if (accumulator != nullptr) { // downsample the unioner's sketch
328  if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected");
329  if (!accumulator->is_empty()) {
330  cpc_sketch_alloc<A> old_accumulator(*accumulator);
331  *accumulator = cpc_sketch_alloc<A>(new_lg_k, seed, old_accumulator.get_allocator());
332  walk_table_updating_sketch(old_accumulator.surprising_value_table);
333  }
334  lg_k = new_lg_k;
335 
336  const auto final_new_flavor = accumulator->determine_flavor();
337  // if the new sketch has graduated beyond sparse, convert to bit_matrix
338  if (final_new_flavor != cpc_sketch_alloc<A>::flavor::EMPTY &&
339  final_new_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
340  switch_to_bit_matrix();
341  }
342  return;
343  }
344 
345  throw std::logic_error("invalid state");
346 }
347 
348 } /* namespace datasketches */
349 
350 #endif
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch.
Definition: cpc_sketch.hpp:64
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Union.
Definition: cpc_union.hpp:40
void update(const cpc_sketch_alloc< A > &sketch)
Updates the union with the given sketch (lvalue).
Definition: cpc_union_impl.hpp:93
cpc_sketch_alloc< A > get_result() const
This method produces a copy of the current state of the union as a sketch.
Definition: cpc_union_impl.hpp:174
cpc_union_alloc(uint8_t lg_k=cpc_constants::DEFAULT_LG_K, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
Creates an instance of the union given the lg_k parameter and hash seed.
Definition: cpc_union_impl.hpp:30
cpc_union_alloc< A > & operator=(const cpc_union_alloc< A > &other)
Copy assignment.
Definition: cpc_union_impl.hpp:74
const uint8_t MIN_LG_K
min log2 of K
Definition: cpc_common.hpp:32
const uint8_t MAX_LG_K
max log2 of K
Definition: cpc_common.hpp:34
DataSketches namespace.
Definition: binomial_bounds.hpp:38