datasketches-cpp
Loading...
Searching...
No Matches
cpc_union_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef CPC_UNION_IMPL_HPP_
21#define CPC_UNION_IMPL_HPP_
22
23#include "count_zeros.hpp"
24
25#include <stdexcept>
26
27namespace datasketches {
28
29template<typename A>
30cpc_union_alloc<A>::cpc_union_alloc(uint8_t lg_k, uint64_t seed, const A& allocator):
31lg_k(lg_k),
32seed(seed),
33accumulator(nullptr),
34bit_matrix(allocator)
35{
36 if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) {
37 throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= " + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k));
38 }
39 accumulator = new (AllocCpc(allocator).allocate(1)) cpc_sketch_alloc<A>(lg_k, seed, allocator);
40}
41
42template<typename A>
44lg_k(other.lg_k),
45seed(other.seed),
46accumulator(other.accumulator),
47bit_matrix(other.bit_matrix)
48{
49 if (accumulator != nullptr) {
50 accumulator = new (AllocCpc(accumulator->get_allocator()).allocate(1)) cpc_sketch_alloc<A>(*other.accumulator);
51 }
52}
53
54template<typename A>
56lg_k(other.lg_k),
57seed(other.seed),
58accumulator(other.accumulator),
59bit_matrix(std::move(other.bit_matrix))
60{
61 other.accumulator = nullptr;
62}
63
64template<typename A>
66 if (accumulator != nullptr) {
67 AllocCpc allocator(accumulator->get_allocator());
68 accumulator->~cpc_sketch_alloc<A>();
69 allocator.deallocate(accumulator, 1);
70 }
71}
72
73template<typename A>
75 cpc_union_alloc<A> copy(other);
76 std::swap(lg_k, copy.lg_k);
77 seed = copy.seed;
78 std::swap(accumulator, copy.accumulator);
79 bit_matrix = std::move(copy.bit_matrix);
80 return *this;
81}
82
83template<typename A>
85 std::swap(lg_k, other.lg_k);
86 seed = other.seed;
87 std::swap(accumulator, other.accumulator);
88 bit_matrix = std::move(other.bit_matrix);
89 return *this;
90}
91
92template<typename A>
94 internal_update(sketch);
95}
96
97template<typename A>
99 internal_update(std::forward<cpc_sketch_alloc<A>>(sketch));
100}
101
102template<typename A>
103template<typename S>
105 const uint16_t seed_hash_union = compute_seed_hash(seed);
106 const uint16_t seed_hash_sketch = compute_seed_hash(sketch.seed);
107 if (seed_hash_union != seed_hash_sketch) {
108 throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash_union) + ", "
109 + std::to_string(seed_hash_sketch));
110 }
111 const auto src_flavor = sketch.determine_flavor();
112 if (cpc_sketch_alloc<A>::flavor::EMPTY == src_flavor) return;
113
114 if (sketch.get_lg_k() < lg_k) reduce_k(sketch.get_lg_k());
115 if (sketch.get_lg_k() < lg_k) throw std::logic_error("sketch lg_k < union lg_k");
116
117 if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit matrix are absent");
118
119 if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && accumulator != nullptr) { // Case A
120 if (bit_matrix.size() > 0) throw std::logic_error("union bit_matrix is not expected");
121 const auto initial_dest_flavor = accumulator->determine_flavor();
122 if (cpc_sketch_alloc<A>::flavor::EMPTY != initial_dest_flavor &&
123 cpc_sketch_alloc<A>::flavor::SPARSE != initial_dest_flavor) throw std::logic_error("wrong flavor");
124
125 // The following partially fixes the snowplow problem provided that the K's are equal.
126 if (cpc_sketch_alloc<A>::flavor::EMPTY == initial_dest_flavor && lg_k == sketch.get_lg_k()) {
127 *accumulator = std::forward<S>(sketch);
128 return;
129 }
130
131 walk_table_updating_sketch(sketch.surprising_value_table);
132 const auto final_dst_flavor = accumulator->determine_flavor();
133 // if the accumulator has graduated beyond sparse, switch to a bit matrix representation
134 if (final_dst_flavor != cpc_sketch_alloc<A>::flavor::EMPTY && final_dst_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
135 switch_to_bit_matrix();
136 }
137 return;
138 }
139
140 if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && bit_matrix.size() > 0) { // Case B
141 if (accumulator != nullptr) throw std::logic_error("union accumulator != null");
142 or_table_into_matrix(sketch.surprising_value_table);
143 return;
144 }
145
146 if (cpc_sketch_alloc<A>::flavor::HYBRID != src_flavor && cpc_sketch_alloc<A>::flavor::PINNED != src_flavor
147 && cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor");
148
149 // source is past SPARSE mode, so make sure that dest is a bit matrix
150 if (accumulator != nullptr) {
151 if (bit_matrix.size() > 0) throw std::logic_error("union bit matrix is not expected");
152 const auto dst_flavor = accumulator->determine_flavor();
153 if (cpc_sketch_alloc<A>::flavor::EMPTY != dst_flavor && cpc_sketch_alloc<A>::flavor::SPARSE != dst_flavor) {
154 throw std::logic_error("wrong flavor");
155 }
156 switch_to_bit_matrix();
157 }
158 if (bit_matrix.size() == 0) throw std::logic_error("union bit_matrix is expected");
159
160 if (cpc_sketch_alloc<A>::flavor::HYBRID == src_flavor || cpc_sketch_alloc<A>::flavor::PINNED == src_flavor) { // Case C
161 or_window_into_matrix(sketch.sliding_window, sketch.window_offset, sketch.get_lg_k());
162 or_table_into_matrix(sketch.surprising_value_table);
163 return;
164 }
165
166 // SLIDING mode involves inverted logic, so we can't just walk the source sketch.
167 // Instead, we convert it to a bitMatrix that can be OR'ed into the destination.
168 if (cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor) throw std::logic_error("wrong flavor"); // Case D
169 vector_u64 src_matrix = sketch.build_bit_matrix();
170 or_matrix_into_matrix(src_matrix, sketch.get_lg_k());
171}
172
173template<typename A>
175 if (accumulator != nullptr) {
176 if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected");
177 return get_result_from_accumulator();
178 }
179 if (bit_matrix.size() == 0) throw std::logic_error("bit_matrix is expected");
180 return get_result_from_bit_matrix();
181}
182
183template<typename A>
185 if (lg_k != accumulator->get_lg_k()) throw std::logic_error("lg_k != accumulator->lg_k");
186 if (accumulator->get_num_coupons() == 0) {
187 return cpc_sketch_alloc<A>(lg_k, seed, accumulator->get_allocator());
188 }
189 if (accumulator->determine_flavor() != cpc_sketch_alloc<A>::flavor::SPARSE) throw std::logic_error("wrong flavor");
190 cpc_sketch_alloc<A> copy(*accumulator);
191 copy.was_merged = true;
192 return copy;
193}
194
195template<typename A>
196cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_bit_matrix() const {
197 const uint32_t k = 1 << lg_k;
198 const uint32_t num_coupons = count_bits_set_in_matrix(bit_matrix.data(), k);
199
200 const auto flavor = cpc_sketch_alloc<A>::determine_flavor(lg_k, num_coupons);
201 if (flavor != cpc_sketch_alloc<A>::flavor::HYBRID && flavor != cpc_sketch_alloc<A>::flavor::PINNED
202 && flavor != cpc_sketch_alloc<A>::flavor::SLIDING) throw std::logic_error("wrong flavor");
203
204 const uint8_t offset = cpc_sketch_alloc<A>::determine_correct_offset(lg_k, num_coupons);
205
206 vector_bytes sliding_window(k, 0, bit_matrix.get_allocator());
207 // don't need to zero the window's memory
208
209 // dynamically growing caused snowplow effect
210 uint8_t table_lg_size = lg_k - 4; // K/16; in some cases this will end up being oversized
211 if (table_lg_size < 2) table_lg_size = 2;
212 u32_table<A> table(table_lg_size, 6 + lg_k, bit_matrix.get_allocator());
213
214 // the following should work even when the offset is zero
215 const uint64_t mask_for_clearing_window = (static_cast<uint64_t>(0xff) << offset) ^ UINT64_MAX;
216 const uint64_t mask_for_flipping_early_zone = (static_cast<uint64_t>(1) << offset) - 1;
217 uint64_t all_surprises_ored = 0;
218
219 // The snowplow effect was caused by processing the rows in order,
220 // but we have fixed it by using a sufficiently large hash table.
221 for (uint32_t i = 0; i < k; i++) {
222 uint64_t pattern = bit_matrix[i];
223 sliding_window[i] = (pattern >> offset) & 0xff;
224 pattern &= mask_for_clearing_window;
225 pattern ^= mask_for_flipping_early_zone; // this flipping converts surprising 0's to 1's
226 all_surprises_ored |= pattern;
227 while (pattern != 0) {
228 const uint8_t col = count_trailing_zeros_in_u64(pattern);
229 pattern = pattern ^ (static_cast<uint64_t>(1) << col); // erase the 1
230 const uint32_t row_col = (i << 6) | col;
231 bool is_novel = table.maybe_insert(row_col);
232 if (!is_novel) throw std::logic_error("is_novel != true");
233 }
234 }
235
236 // at this point we could shrink an oversized hash table, but the relative waste isn't very big
237
238 uint8_t first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored);
239 if (first_interesting_column > offset) first_interesting_column = offset; // corner case
240
241 // HIP-related fields will contain zeros, and that is okay
242 return cpc_sketch_alloc<A>(lg_k, num_coupons, first_interesting_column, std::move(table), std::move(sliding_window), false, 0, 0, seed);
243}
244
245template<typename A>
246void cpc_union_alloc<A>::switch_to_bit_matrix() {
247 bit_matrix = accumulator->build_bit_matrix();
248 AllocCpc allocator(accumulator->get_allocator());
249 accumulator->~cpc_sketch_alloc<A>();
250 allocator.deallocate(accumulator, 1);
251 accumulator = nullptr;
252}
253
254template<typename A>
255void cpc_union_alloc<A>::walk_table_updating_sketch(const u32_table<A>& table) {
256 const uint32_t* slots = table.get_slots();
257 const uint32_t num_slots = 1 << table.get_lg_size();
258 const uint64_t dst_mask = (((1 << accumulator->get_lg_k()) - 1) << 6) | 63; // downsamples when dst lgK < src LgK
259
260 // Using a golden ratio stride fixes the snowplow effect.
261 const double golden = 0.6180339887498949025;
262 uint32_t stride = static_cast<uint32_t>(golden * static_cast<double>(num_slots));
263 if (stride < 2) throw std::logic_error("stride < 2");
264 if (stride == ((stride >> 1) << 1)) stride += 1; // force the stride to be odd
265 if (stride < 3 || stride >= num_slots) throw std::out_of_range("stride out of range");
266
267 for (uint32_t i = 0, j = 0; i < num_slots; i++, j += stride) {
268 j &= num_slots - 1;
269 const uint32_t row_col = slots[j];
270 if (row_col != UINT32_MAX) {
271 accumulator->row_col_update(row_col & dst_mask);
272 }
273 }
274}
275
276template<typename A>
277void cpc_union_alloc<A>::or_table_into_matrix(const u32_table<A>& table) {
278 const uint32_t* slots = table.get_slots();
279 const uint32_t num_slots = 1 << table.get_lg_size();
280 const uint64_t dest_mask = (1 << lg_k) - 1; // downsamples when dst lgK < sr LgK
281 for (uint32_t i = 0; i < num_slots; i++) {
282 const uint32_t row_col = slots[i];
283 if (row_col != UINT32_MAX) {
284 const uint8_t col = row_col & 63;
285 const uint32_t row = row_col >> 6;
286 bit_matrix[row & dest_mask] |= static_cast<uint64_t>(1) << col; // set the bit
287 }
288 }
289}
290
291template<typename A>
292void cpc_union_alloc<A>::or_window_into_matrix(const vector_bytes& sliding_window, uint8_t offset, uint8_t src_lg_k) {
293 if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK");
294 const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK
295 const uint32_t src_k = 1 << src_lg_k;
296 for (uint32_t src_row = 0; src_row < src_k; src_row++) {
297 bit_matrix[src_row & dst_mask] |= static_cast<uint64_t>(sliding_window[src_row]) << offset;
298 }
299}
300
301template<typename A>
302void cpc_union_alloc<A>::or_matrix_into_matrix(const vector_u64& src_matrix, uint8_t src_lg_k) {
303 if (lg_k > src_lg_k) throw std::logic_error("dst LgK > src LgK");
304 const uint64_t dst_mask = (1 << lg_k) - 1; // downsamples when dst lgK < src LgK
305 const uint32_t src_k = 1 << src_lg_k;
306 for (uint32_t src_row = 0; src_row < src_k; src_row++) {
307 bit_matrix[src_row & dst_mask] |= src_matrix[src_row];
308 }
309}
310
311template<typename A>
312void cpc_union_alloc<A>::reduce_k(uint8_t new_lg_k) {
313 if (new_lg_k >= lg_k) throw std::logic_error("new LgK >= union lgK");
314 if (accumulator == nullptr && bit_matrix.size() == 0) throw std::logic_error("both accumulator and bit_matrix are absent");
315
316 if (bit_matrix.size() > 0) { // downsample the unioner's bit matrix
317 if (accumulator != nullptr) throw std::logic_error("accumulator is not null");
318 vector_u64 old_matrix = std::move(bit_matrix);
319 const uint8_t old_lg_k = lg_k;
320 const uint32_t new_k = 1 << new_lg_k;
321 bit_matrix = vector_u64(new_k, 0, old_matrix.get_allocator());
322 lg_k = new_lg_k;
323 or_matrix_into_matrix(old_matrix, old_lg_k);
324 return;
325 }
326
327 if (accumulator != nullptr) { // downsample the unioner's sketch
328 if (bit_matrix.size() > 0) throw std::logic_error("bit_matrix is not expected");
329 if (!accumulator->is_empty()) {
330 cpc_sketch_alloc<A> old_accumulator(*accumulator);
331 *accumulator = cpc_sketch_alloc<A>(new_lg_k, seed, old_accumulator.get_allocator());
332 walk_table_updating_sketch(old_accumulator.surprising_value_table);
333 }
334 lg_k = new_lg_k;
335
336 const auto final_new_flavor = accumulator->determine_flavor();
337 // if the new sketch has graduated beyond sparse, convert to bit_matrix
338 if (final_new_flavor != cpc_sketch_alloc<A>::flavor::EMPTY &&
339 final_new_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
340 switch_to_bit_matrix();
341 }
342 return;
343 }
344
345 throw std::logic_error("invalid state");
346}
347
348} /* namespace datasketches */
349
350#endif
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch.
Definition cpc_sketch.hpp:64
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Union.
Definition cpc_union.hpp:40
void update(const cpc_sketch_alloc< A > &sketch)
This method is to update the union with a given sketch (lvalue)
Definition cpc_union_impl.hpp:93
cpc_sketch_alloc< A > get_result() const
This method produces a copy of the current state of the union as a sketch.
Definition cpc_union_impl.hpp:174
cpc_union_alloc(uint8_t lg_k=cpc_constants::DEFAULT_LG_K, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
Creates an instance of the union given the lg_k parameter and hash seed.
Definition cpc_union_impl.hpp:30
cpc_union_alloc< A > & operator=(const cpc_union_alloc< A > &other)
Copy assignment.
Definition cpc_union_impl.hpp:74
const uint8_t MIN_LG_K
min log2 of K
Definition cpc_common.hpp:32
const uint8_t MAX_LG_K
max log2 of K
Definition cpc_common.hpp:34
DataSketches namespace.
Definition binomial_bounds.hpp:38