20 #ifndef CPC_UNION_IMPL_HPP_
21 #define CPC_UNION_IMPL_HPP_
23 #include "count_zeros.hpp"
// Create the union's sketch: storage for exactly one cpc_sketch_alloc<A> is taken from an
// AllocCpc built on `allocator`, and the sketch is placement-constructed in it from
// (lg_k, seed, allocator). The resulting raw pointer is kept in `accumulator`.
39 accumulator =
new (AllocCpc(allocator).allocate(1))
cpc_sketch_alloc<A>(lg_k, seed, allocator);
// Member initializers: start from the other union's accumulator pointer and a copy of its bit matrix.
46 accumulator(other.accumulator),
47 bit_matrix(other.bit_matrix)
// If the source union holds a sketch, replace the copied pointer with a newly allocated copy
// of that sketch so the pointer copied above is not kept.
49 if (accumulator !=
nullptr) {
// `accumulator` still equals other.accumulator while the right-hand side is evaluated, so the
// storage comes from an AllocCpc built on the source sketch's allocator.
50 accumulator =
new (AllocCpc(accumulator->get_allocator()).allocate(1))
cpc_sketch_alloc<A>(*other.accumulator);
// Member initializers: take over the other union's accumulator pointer and move its bit matrix.
58 accumulator(other.accumulator),
59 bit_matrix(std::move(other.bit_matrix))
// Null the source's pointer so that only this object refers to the sketch from now on.
61 other.accumulator =
nullptr;
// Release the sketch held through `accumulator`, if there is one.
66 if (accumulator !=
nullptr) {
// Build the AllocCpc from the sketch's own allocator before the sketch is destroyed.
67 AllocCpc allocator(accumulator->get_allocator());
// The sketch was created with placement new, so it is destroyed explicitly and its
// storage (one object) is handed back to the allocator.
68 accumulator->~cpc_sketch_alloc<A>();
69 allocator.deallocate(accumulator, 1);
// Exchange lg_k and the accumulator pointer with `copy` (presumably a local copy of the
// assigned-from union; its construction is not visible in this listing), then take over
// copy's bit matrix by move.
76 std::swap(lg_k, copy.lg_k);
78 std::swap(accumulator, copy.accumulator);
79 bit_matrix = std::move(copy.bit_matrix);
// Exchange lg_k and the accumulator pointer with `other`, so `other` ends up holding this
// object's previous values, then take over other's bit matrix by move.
85 std::swap(lg_k, other.lg_k);
87 std::swap(accumulator, other.accumulator);
88 bit_matrix = std::move(other.bit_matrix);
// Hands the given sketch on to internal_update.
94 internal_update(sketch);
// Merges `sketch` into the union. The steps visible here: verify seed compatibility, ignore
// an EMPTY source, lower the union's lg_k if the source's is smaller, then dispatch on the
// source flavor and on whether the union currently holds a sketch (`accumulator`) or a
// bit matrix.
// Throws std::invalid_argument on a seed-hash mismatch and std::logic_error when one of the
// state checks below fails.
//
// Seed check: compare the hash of the union's seed with the hash of the sketch's seed.
105 const uint16_t seed_hash_union = compute_seed_hash(seed);
106 const uint16_t seed_hash_sketch = compute_seed_hash(sketch.seed);
107 if (seed_hash_union != seed_hash_sketch) {
108 throw std::invalid_argument(
"Incompatible seed hashes: " + std::to_string(seed_hash_union) +
", "
109 + std::to_string(seed_hash_sketch));
// An EMPTY source sketch leaves the union untouched.
111 const auto src_flavor = sketch.determine_flavor();
112 if (cpc_sketch_alloc<A>::flavor::EMPTY == src_flavor)
return;
// A source with a smaller lg_k makes the union reduce itself to that lg_k; the second
// comparison is a sanity check on the state after that call.
114 if (sketch.get_lg_k() < lg_k) reduce_k(sketch.get_lg_k());
115 if (sketch.get_lg_k() < lg_k)
throw std::logic_error(
"sketch lg_k < union lg_k");
// The union must hold either a sketch or a non-empty bit matrix.
117 if (accumulator ==
nullptr && bit_matrix.size() == 0)
throw std::logic_error(
"both accumulator and bit matrix are absent");
// Case: SPARSE source, union still holds a sketch.
119 if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && accumulator !=
nullptr) {
120 if (bit_matrix.size() > 0)
throw std::logic_error(
"union bit_matrix is not expected");
// The union's sketch itself may only be EMPTY or SPARSE at this point.
121 const auto initial_dest_flavor = accumulator->determine_flavor();
122 if (cpc_sketch_alloc<A>::flavor::EMPTY != initial_dest_flavor &&
123 cpc_sketch_alloc<A>::flavor::SPARSE != initial_dest_flavor)
throw std::logic_error(
"wrong flavor");
// With an EMPTY destination of the same lg_k the source is assigned into the accumulator,
// forwarded with its original value category.
126 if (cpc_sketch_alloc<A>::flavor::EMPTY == initial_dest_flavor && lg_k == sketch.get_lg_k()) {
127 *accumulator = std::forward<S>(sketch);
// Feed the source's surprising-value table into the accumulator entry by entry.
131 walk_table_updating_sketch(sketch.surprising_value_table);
132 const auto final_dst_flavor = accumulator->determine_flavor();
// Once the accumulator is neither EMPTY nor SPARSE, the union changes over to the bit matrix.
134 if (final_dst_flavor != cpc_sketch_alloc<A>::flavor::EMPTY && final_dst_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
135 switch_to_bit_matrix();
// Case: SPARSE source, union already holds a bit matrix; OR the source's table into it.
140 if (cpc_sketch_alloc<A>::flavor::SPARSE == src_flavor && bit_matrix.size() > 0) {
141 if (accumulator !=
nullptr)
throw std::logic_error(
"union accumulator != null");
142 or_table_into_matrix(sketch.surprising_value_table);
// From here on only HYBRID, PINNED and SLIDING sources are accepted.
146 if (cpc_sketch_alloc<A>::flavor::HYBRID != src_flavor && cpc_sketch_alloc<A>::flavor::PINNED != src_flavor
147 && cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor)
throw std::logic_error(
"wrong flavor");
// A union that still holds a sketch (which must be EMPTY or SPARSE) is switched to the
// bit matrix before the source is merged.
150 if (accumulator !=
nullptr) {
151 if (bit_matrix.size() > 0)
throw std::logic_error(
"union bit matrix is not expected");
152 const auto dst_flavor = accumulator->determine_flavor();
153 if (cpc_sketch_alloc<A>::flavor::EMPTY != dst_flavor && cpc_sketch_alloc<A>::flavor::SPARSE != dst_flavor) {
154 throw std::logic_error(
"wrong flavor");
156 switch_to_bit_matrix();
158 if (bit_matrix.size() == 0)
throw std::logic_error(
"union bit_matrix is expected");
// HYBRID / PINNED source: OR in its sliding window (at the source's window offset) and its table.
160 if (cpc_sketch_alloc<A>::flavor::HYBRID == src_flavor || cpc_sketch_alloc<A>::flavor::PINNED == src_flavor) {
161 or_window_into_matrix(sketch.sliding_window, sketch.window_offset, sketch.get_lg_k());
162 or_table_into_matrix(sketch.surprising_value_table);
// SLIDING source: have the source build its own bit matrix and OR that in.
168 if (cpc_sketch_alloc<A>::flavor::SLIDING != src_flavor)
throw std::logic_error(
"wrong flavor");
169 vector_u64 src_matrix = sketch.build_bit_matrix();
170 or_matrix_into_matrix(src_matrix, sketch.get_lg_k());
// Produces the result from whichever representation the union currently holds; the
// consistency checks below throw std::logic_error.
// Union holds a sketch: a non-empty bit matrix alongside it is an error.
175 if (accumulator !=
nullptr) {
176 if (bit_matrix.size() > 0)
throw std::logic_error(
"bit_matrix is not expected");
177 return get_result_from_accumulator();
// Otherwise the result is built from the bit matrix, which must not be empty.
179 if (bit_matrix.size() == 0)
throw std::logic_error(
"bit_matrix is expected");
180 return get_result_from_bit_matrix();
// The sketch held by the union has to be at the union's own lg_k.
185 if (lg_k != accumulator->get_lg_k())
throw std::logic_error(
"lg_k != accumulator->lg_k");
// A sketch with zero coupons has its own branch (the statements of that branch are not
// visible in this listing).
186 if (accumulator->get_num_coupons() == 0) {
// The accumulator is required to be of SPARSE flavor at this point.
189 if (accumulator->determine_flavor() != cpc_sketch_alloc<A>::flavor::SPARSE)
throw std::logic_error(
"wrong flavor");
// Work on a copy of the accumulator and set its was_merged flag; the accumulator itself is
// not modified by these lines.
190 cpc_sketch_alloc<A> copy(*accumulator);
191 copy.was_merged =
true;
196 cpc_sketch_alloc<A> cpc_union_alloc<A>::get_result_from_bit_matrix()
const {
197 const uint32_t k = 1 << lg_k;
198 const uint32_t num_coupons = count_bits_set_in_matrix(bit_matrix.data(), k);
200 const auto flavor = cpc_sketch_alloc<A>::determine_flavor(lg_k, num_coupons);
201 if (flavor != cpc_sketch_alloc<A>::flavor::HYBRID && flavor != cpc_sketch_alloc<A>::flavor::PINNED
202 && flavor != cpc_sketch_alloc<A>::flavor::SLIDING)
throw std::logic_error(
"wrong flavor");
204 const uint8_t offset = cpc_sketch_alloc<A>::determine_correct_offset(lg_k, num_coupons);
206 vector_bytes sliding_window(k, 0, bit_matrix.get_allocator());
210 uint8_t table_lg_size = lg_k - 4;
211 if (table_lg_size < 2) table_lg_size = 2;
212 u32_table<A> table(table_lg_size, 6 + lg_k, bit_matrix.get_allocator());
215 const uint64_t mask_for_clearing_window = (
static_cast<uint64_t
>(0xff) << offset) ^ UINT64_MAX;
216 const uint64_t mask_for_flipping_early_zone = (
static_cast<uint64_t
>(1) << offset) - 1;
217 uint64_t all_surprises_ored = 0;
221 for (uint32_t i = 0; i < k; i++) {
222 uint64_t pattern = bit_matrix[i];
223 sliding_window[i] = (pattern >> offset) & 0xff;
224 pattern &= mask_for_clearing_window;
225 pattern ^= mask_for_flipping_early_zone;
226 all_surprises_ored |= pattern;
227 while (pattern != 0) {
228 const uint8_t col = count_trailing_zeros_in_u64(pattern);
229 pattern = pattern ^ (
static_cast<uint64_t
>(1) << col);
230 const uint32_t row_col = (i << 6) | col;
231 bool is_novel = table.maybe_insert(row_col);
232 if (!is_novel)
throw std::logic_error(
"is_novel != true");
238 uint8_t first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored);
239 if (first_interesting_column > offset) first_interesting_column = offset;
242 return cpc_sketch_alloc<A>(lg_k, num_coupons, first_interesting_column, std::move(table), std::move(sliding_window),
false, 0, 0, seed);
246 void cpc_union_alloc<A>::switch_to_bit_matrix() {
247 bit_matrix = accumulator->build_bit_matrix();
248 AllocCpc allocator(accumulator->get_allocator());
249 accumulator->~cpc_sketch_alloc<A>();
250 allocator.deallocate(accumulator, 1);
251 accumulator =
nullptr;
255 void cpc_union_alloc<A>::walk_table_updating_sketch(
const u32_table<A>& table) {
256 const uint32_t* slots = table.get_slots();
257 const uint32_t num_slots = 1 << table.get_lg_size();
258 const uint64_t dst_mask = (((1 << accumulator->get_lg_k()) - 1) << 6) | 63;
261 const double golden = 0.6180339887498949025;
262 uint32_t stride =
static_cast<uint32_t
>(golden *
static_cast<double>(num_slots));
263 if (stride < 2)
throw std::logic_error(
"stride < 2");
264 if (stride == ((stride >> 1) << 1)) stride += 1;
265 if (stride < 3 || stride >= num_slots)
throw std::out_of_range(
"stride out of range");
267 for (uint32_t i = 0, j = 0; i < num_slots; i++, j += stride) {
269 const uint32_t row_col = slots[j];
270 if (row_col != UINT32_MAX) {
271 accumulator->row_col_update(row_col & dst_mask);
277 void cpc_union_alloc<A>::or_table_into_matrix(
const u32_table<A>& table) {
278 const uint32_t* slots = table.get_slots();
279 const uint32_t num_slots = 1 << table.get_lg_size();
280 const uint64_t dest_mask = (1 << lg_k) - 1;
281 for (uint32_t i = 0; i < num_slots; i++) {
282 const uint32_t row_col = slots[i];
283 if (row_col != UINT32_MAX) {
284 const uint8_t col = row_col & 63;
285 const uint32_t row = row_col >> 6;
286 bit_matrix[row & dest_mask] |=
static_cast<uint64_t
>(1) << col;
292 void cpc_union_alloc<A>::or_window_into_matrix(
const vector_bytes& sliding_window, uint8_t offset, uint8_t src_lg_k) {
293 if (lg_k > src_lg_k)
throw std::logic_error(
"dst LgK > src LgK");
294 const uint64_t dst_mask = (1 << lg_k) - 1;
295 const uint32_t src_k = 1 << src_lg_k;
296 for (uint32_t src_row = 0; src_row < src_k; src_row++) {
297 bit_matrix[src_row & dst_mask] |=
static_cast<uint64_t
>(sliding_window[src_row]) << offset;
302 void cpc_union_alloc<A>::or_matrix_into_matrix(
const vector_u64& src_matrix, uint8_t src_lg_k) {
303 if (lg_k > src_lg_k)
throw std::logic_error(
"dst LgK > src LgK");
304 const uint64_t dst_mask = (1 << lg_k) - 1;
305 const uint32_t src_k = 1 << src_lg_k;
306 for (uint32_t src_row = 0; src_row < src_k; src_row++) {
307 bit_matrix[src_row & dst_mask] |= src_matrix[src_row];
312 void cpc_union_alloc<A>::reduce_k(uint8_t new_lg_k) {
313 if (new_lg_k >= lg_k)
throw std::logic_error(
"new LgK >= union lgK");
314 if (accumulator ==
nullptr && bit_matrix.size() == 0)
throw std::logic_error(
"both accumulator and bit_matrix are absent");
316 if (bit_matrix.size() > 0) {
317 if (accumulator !=
nullptr)
throw std::logic_error(
"accumulator is not null");
318 vector_u64 old_matrix = std::move(bit_matrix);
319 const uint8_t old_lg_k = lg_k;
320 const uint32_t new_k = 1 << new_lg_k;
321 bit_matrix = vector_u64(new_k, 0, old_matrix.get_allocator());
323 or_matrix_into_matrix(old_matrix, old_lg_k);
327 if (accumulator !=
nullptr) {
328 if (bit_matrix.size() > 0)
throw std::logic_error(
"bit_matrix is not expected");
329 if (!accumulator->is_empty()) {
330 cpc_sketch_alloc<A> old_accumulator(*accumulator);
331 *accumulator = cpc_sketch_alloc<A>(new_lg_k, seed, old_accumulator.get_allocator());
332 walk_table_updating_sketch(old_accumulator.surprising_value_table);
336 const auto final_new_flavor = accumulator->determine_flavor();
338 if (final_new_flavor != cpc_sketch_alloc<A>::flavor::EMPTY &&
339 final_new_flavor != cpc_sketch_alloc<A>::flavor::SPARSE) {
340 switch_to_bit_matrix();
345 throw std::logic_error(
"invalid state");
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch.
Definition: cpc_sketch.hpp:64
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Union.
Definition: cpc_union.hpp:40
void update(const cpc_sketch_alloc< A > &sketch)
Updates the union with the given sketch (lvalue overload).
Definition: cpc_union_impl.hpp:93
cpc_sketch_alloc< A > get_result() const
This method produces a copy of the current state of the union as a sketch.
Definition: cpc_union_impl.hpp:174
cpc_union_alloc(uint8_t lg_k=cpc_constants::DEFAULT_LG_K, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
Creates an instance of the union given the lg_k parameter and hash seed.
Definition: cpc_union_impl.hpp:30
cpc_union_alloc< A > & operator=(const cpc_union_alloc< A > &other)
Copy assignment.
Definition: cpc_union_impl.hpp:74
const uint8_t MIN_LG_K
min log2 of K
Definition: cpc_common.hpp:32
const uint8_t MAX_LG_K
max log2 of K
Definition: cpc_common.hpp:34
DataSketches namespace.
Definition: binomial_bounds.hpp:38