datasketches-cpp
Loading...
Searching...
No Matches
cpc_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef CPC_SKETCH_IMPL_HPP_
21#define CPC_SKETCH_IMPL_HPP_
22
23#include <stdexcept>
24#include <cmath>
25#include <cstring>
26#include <sstream>
27
28#include "cpc_confidence.hpp"
29#include "kxp_byte_lookup.hpp"
30#include "inv_pow2_table.hpp"
31#include "cpc_util.hpp"
32#include "icon_estimator.hpp"
33#include "serde.hpp"
34#include "count_zeros.hpp"
35
36namespace datasketches {
37
38template<typename A>
39void cpc_init() {
40 get_compressor<A>(); // this initializes a global static instance of the compressor on the first use
41}
42
43template<typename A>
44cpc_sketch_alloc<A>::cpc_sketch_alloc(uint8_t lg_k, uint64_t seed, const A& allocator):
45lg_k(lg_k),
46seed(seed),
47was_merged(false),
48num_coupons(0),
49surprising_value_table(2, 6 + lg_k, allocator),
50sliding_window(allocator),
51window_offset(0),
52first_interesting_column(0),
53kxp(1 << lg_k),
54hip_est_accum(0)
55{
56 check_lg_k(lg_k);
57}
58
59template<typename A>
61 return sliding_window.get_allocator();
62}
63
64template<typename A>
66 return lg_k;
67}
68
69template<typename A>
71 return num_coupons == 0;
72}
73
74template<typename A>
76 if (!was_merged) return get_hip_estimate();
77 return get_icon_estimate();
78}
79
80template<typename A>
82 return hip_est_accum;
83}
84
85template<typename A>
86double cpc_sketch_alloc<A>::get_icon_estimate() const {
87 return compute_icon_estimate(lg_k, num_coupons);
88}
89
90template<typename A>
91double cpc_sketch_alloc<A>::get_lower_bound(unsigned kappa) const {
92 if (kappa < 1 || kappa > 3) {
93 throw std::invalid_argument("kappa must be 1, 2 or 3");
94 }
95 if (!was_merged) return get_hip_confidence_lb<A>(*this, kappa);
96 return get_icon_confidence_lb<A>(*this, kappa);
97}
98
99template<typename A>
100double cpc_sketch_alloc<A>::get_upper_bound(unsigned kappa) const {
101 if (kappa < 1 || kappa > 3) {
102 throw std::invalid_argument("kappa must be 1, 2 or 3");
103 }
104 if (!was_merged) return get_hip_confidence_ub<A>(*this, kappa);
105 return get_icon_confidence_ub<A>(*this, kappa);
106}
107
108template<typename A>
109void cpc_sketch_alloc<A>::update(const std::string& value) {
110 if (value.empty()) return;
111 update(value.c_str(), value.length());
112}
113
114template<typename A>
115void cpc_sketch_alloc<A>::update(uint64_t value) {
116 update(&value, sizeof(value));
117}
118
119template<typename A>
120void cpc_sketch_alloc<A>::update(int64_t value) {
121 update(&value, sizeof(value));
122}
123
124template<typename A>
125void cpc_sketch_alloc<A>::update(uint32_t value) {
126 update(static_cast<int32_t>(value));
127}
128
129template<typename A>
130void cpc_sketch_alloc<A>::update(int32_t value) {
131 update(static_cast<int64_t>(value));
132}
133
134template<typename A>
135void cpc_sketch_alloc<A>::update(uint16_t value) {
136 update(static_cast<int16_t>(value));
137}
138
139template<typename A>
140void cpc_sketch_alloc<A>::update(int16_t value) {
141 update(static_cast<int64_t>(value));
142}
143
144template<typename A>
145void cpc_sketch_alloc<A>::update(uint8_t value) {
146 update(static_cast<int8_t>(value));
147}
148
149template<typename A>
150void cpc_sketch_alloc<A>::update(int8_t value) {
151 update(static_cast<int64_t>(value));
152}
153
154template<typename A>
155void cpc_sketch_alloc<A>::update(double value) {
156 union {
157 int64_t long_value;
158 double double_value;
159 } ldu;
160 if (value == 0.0) {
161 ldu.double_value = 0.0; // canonicalize -0.0 to 0.0
162 } else if (std::isnan(value)) {
163 ldu.long_value = 0x7ff8000000000000L; // canonicalize NaN using value from Java's Double.doubleToLongBits()
164 } else {
165 ldu.double_value = value;
166 }
167 update(&ldu, sizeof(ldu));
168}
169
170template<typename A>
172 update(static_cast<double>(value));
173}
174
175static inline uint32_t row_col_from_two_hashes(uint64_t hash0, uint64_t hash1, uint8_t lg_k) {
176 if (lg_k > 26) throw std::logic_error("lg_k > 26");
177 const uint32_t k = 1 << lg_k;
178 uint8_t col = count_leading_zeros_in_u64(hash1); // 0 <= col <= 64
179 if (col > 63) col = 63; // clip so that 0 <= col <= 63
180 const uint32_t row = hash0 & (k - 1);
181 uint32_t row_col = (row << 6) | col;
182 // To avoid the hash table's "empty" value, we change the row of the following pair.
183 // This case is extremely unlikely, but we might as well handle it.
184 if (row_col == UINT32_MAX) row_col ^= 1 << 6;
185 return row_col;
186}
187
188template<typename A>
189void cpc_sketch_alloc<A>::update(const void* value, size_t size) {
190 HashState hashes;
191 MurmurHash3_x64_128(value, size, seed, hashes);
192 row_col_update(row_col_from_two_hashes(hashes.h1, hashes.h2, lg_k));
193}
194
195template<typename A>
196void cpc_sketch_alloc<A>::row_col_update(uint32_t row_col) {
197 const uint8_t col = row_col & 63;
198 if (col < first_interesting_column) return; // important speed optimization
199 // window size is 0 until sketch is promoted from sparse to windowed
200 if (sliding_window.size() == 0) {
201 update_sparse(row_col);
202 } else {
203 update_windowed(row_col);
204 }
205}
206
207template<typename A>
208void cpc_sketch_alloc<A>::update_sparse(uint32_t row_col) {
209 const uint32_t k = 1 << lg_k;
210 const uint64_t c32pre = static_cast<uint64_t>(num_coupons) << 5;
211 if (c32pre >= 3 * k) throw std::logic_error("c32pre >= 3 * k"); // C < 3K/32, in other words flavor == SPARSE
212 bool is_novel = surprising_value_table.maybe_insert(row_col);
213 if (is_novel) {
214 num_coupons++;
215 update_hip(row_col);
216 const uint64_t c32post = static_cast<uint64_t>(num_coupons) << 5;
217 if (c32post >= 3 * k) promote_sparse_to_windowed(); // C >= 3K/32
218 }
219}
220
221// the flavor is HYBRID, PINNED, or SLIDING
222template<typename A>
223void cpc_sketch_alloc<A>::update_windowed(uint32_t row_col) {
224 if (window_offset > 56) throw std::logic_error("wrong window offset");
225 const uint32_t k = 1 << lg_k;
226 const uint64_t c32pre = static_cast<uint64_t>(num_coupons) << 5;
227 if (c32pre < 3 * k) throw std::logic_error("c32pre < 3 * k"); // C < 3K/32, in other words flavor >= HYBRID
228 const uint64_t c8pre = static_cast<uint64_t>(num_coupons) << 3;
229 const uint64_t w8pre = static_cast<uint64_t>(window_offset) << 3;
230 if (c8pre >= (27 + w8pre) * k) throw std::logic_error("c8pre is wrong"); // C < (K * 27/8) + (K * window_offset)
231
232 bool is_novel = false;
233 const uint8_t col = row_col & 63;
234
235 if (col < window_offset) { // track the surprising 0's "before" the window
236 is_novel = surprising_value_table.maybe_delete(row_col); // inverted logic
237 } else if (col < window_offset + 8) { // track the 8 bits inside the window
238 if (col < window_offset) throw std::logic_error("col < window_offset");
239 const uint32_t row = row_col >> 6;
240 const uint8_t old_bits = sliding_window[row];
241 const uint8_t new_bits = old_bits | (1 << (col - window_offset));
242 if (new_bits != old_bits) {
243 sliding_window[row] = new_bits;
244 is_novel = true;
245 }
246 } else { // track the surprising 1's "after" the window
247 if (col < window_offset + 8) throw std::logic_error("col < window_offset + 8");
248 is_novel = surprising_value_table.maybe_insert(row_col); // normal logic
249 }
250
251 if (is_novel) {
252 num_coupons++;
253 update_hip(row_col);
254 const uint64_t c8post = static_cast<uint64_t>(num_coupons) << 3;
255 if (c8post >= (27 + w8pre) * k) {
256 move_window();
257 if (window_offset < 1 || window_offset > 56) throw std::logic_error("wrong window offset");
258 const uint64_t w8post = static_cast<uint64_t>(window_offset) << 3;
259 if (c8post >= (27 + w8post) * k) throw std::logic_error("c8pre is wrong"); // C < (K * 27/8) + (K * window_offset)
260 }
261 }
262}
263
264// Call this whenever a new coupon has been collected.
265template<typename A>
266void cpc_sketch_alloc<A>::update_hip(uint32_t row_col) {
267 const uint32_t k = 1 << lg_k;
268 const uint8_t col = row_col & 63;
269 const double one_over_p = static_cast<double>(k) / kxp;
270 hip_est_accum += one_over_p;
271 kxp -= INVERSE_POWERS_OF_2[col + 1]; // notice the "+1"
272}
273
274// In terms of flavor, this promotes SPARSE to HYBRID
275template<typename A>
276void cpc_sketch_alloc<A>::promote_sparse_to_windowed() {
277 const uint32_t k = 1 << lg_k;
278 const uint64_t c32 = static_cast<uint64_t>(num_coupons) << 5;
279 if (!(c32 == 3 * k || (lg_k == 4 && c32 > 3 * k))) throw std::logic_error("wrong c32");
280
281 sliding_window.resize(k, 0); // zero the memory (because we will be OR'ing into it)
282
283 u32_table<A> new_table(2, 6 + lg_k, sliding_window.get_allocator());
284
285 const uint32_t* old_slots = surprising_value_table.get_slots();
286 const uint32_t old_num_slots = 1 << surprising_value_table.get_lg_size();
287
288 if (window_offset != 0) throw std::logic_error("window_offset != 0");
289
290 for (uint32_t i = 0; i < old_num_slots; i++) {
291 const uint32_t row_col = old_slots[i];
292 if (row_col != UINT32_MAX) {
293 const uint8_t col = row_col & 63;
294 if (col < 8) {
295 const uint32_t row = row_col >> 6;
296 sliding_window[row] |= 1 << col;
297 } else {
298 // cannot use u32_table::must_insert(), because it doesn't provide for growth
299 const bool is_novel = new_table.maybe_insert(row_col);
300 if (!is_novel) throw std::logic_error("is_novel != true");
301 }
302 }
303 }
304
305 surprising_value_table = std::move(new_table);
306}
307
308template<typename A>
309void cpc_sketch_alloc<A>::move_window() {
310 const uint8_t new_offset = window_offset + 1;
311 if (new_offset > 56) throw std::logic_error("new_offset > 56");
312 if (new_offset != determine_correct_offset(lg_k, num_coupons)) throw std::logic_error("new_offset is wrong");
313
314 if (sliding_window.size() == 0) throw std::logic_error("no sliding window");
315 const uint32_t k = 1 << lg_k;
316
317 // Construct the full-sized bit matrix that corresponds to the sketch
318 vector_u64 bit_matrix = build_bit_matrix();
319
320 // refresh the KXP register on every 8th window shift.
321 if ((new_offset & 0x7) == 0) refresh_kxp(bit_matrix.data());
322
323 surprising_value_table.clear(); // the new number of surprises will be about the same
324
325 const uint64_t mask_for_clearing_window = (static_cast<uint64_t>(0xff) << new_offset) ^ UINT64_MAX;
326 const uint64_t mask_for_flipping_early_zone = (static_cast<uint64_t>(1) << new_offset) - 1;
327 uint64_t all_surprises_ored = 0;
328
329 for (uint32_t i = 0; i < k; i++) {
330 uint64_t pattern = bit_matrix[i];
331 sliding_window[i] = (pattern >> new_offset) & 0xff;
332 pattern &= mask_for_clearing_window;
333 // The following line converts surprising 0's to 1's in the "early zone",
334 // (and vice versa, which is essential for this procedure's O(k) time cost).
335 pattern ^= mask_for_flipping_early_zone;
336 all_surprises_ored |= pattern; // a cheap way to recalculate first_interesting_column
337 while (pattern != 0) {
338 const uint8_t col = count_trailing_zeros_in_u64(pattern);
339 pattern = pattern ^ (static_cast<uint64_t>(1) << col); // erase the 1
340 const uint32_t row_col = (i << 6) | col;
341 const bool is_novel = surprising_value_table.maybe_insert(row_col);
342 if (!is_novel) throw std::logic_error("is_novel != true");
343 }
344 }
345
346 window_offset = new_offset;
347
348 first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored);
349 if (first_interesting_column > new_offset) first_interesting_column = new_offset; // corner case
350}
351
352// The KXP register is a double with roughly 50 bits of precision, but
353// it might need roughly 90 bits to track the value with perfect accuracy.
354// Therefore we recalculate KXP occasionally from the sketch's full bitmatrix
355// so that it will reflect changes that were previously outside the mantissa.
356template<typename A>
357void cpc_sketch_alloc<A>::refresh_kxp(const uint64_t* bit_matrix) {
358 const uint32_t k = 1 << lg_k;
359
360 // for improved numerical accuracy, we separately sum the bytes of the U64's
361 double byte_sums[8]; // allocating on the stack
362 std::fill(byte_sums, byte_sums + 8, 0);
363
364 for (size_t i = 0; i < k; i++) {
365 uint64_t word = bit_matrix[i];
366 for (unsigned j = 0; j < 8; j++) {
367 const uint8_t byte = word & 0xff;
368 byte_sums[j] += KXP_BYTE_TABLE[byte];
369 word >>= 8;
370 }
371 }
372
373 double total = 0.0;
374 for (int j = 7; j >= 0; j--) { // the reverse order is important
375 const double factor = INVERSE_POWERS_OF_2[8 * j]; // pow (256.0, (-1.0 * ((double) j)));
376 total += factor * byte_sums[j];
377 }
378
379 kxp = total;
380}
381
382template<typename A>
384 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
385 // The stream does not support passing an allocator instance, and alternatives are complicated.
386 std::ostringstream os;
387 os << "### CPC sketch summary:" << std::endl;
388 os << " lg_k : " << std::to_string(lg_k) << std::endl;
389 os << " seed hash : " << std::hex << compute_seed_hash(seed) << std::dec << std::endl;
390 os << " C : " << num_coupons << std::endl;
391 os << " flavor : " << determine_flavor() << std::endl;
392 os << " merged : " << (was_merged ? "true" : "false") << std::endl;
393 if (!was_merged) {
394 os << " HIP estimate : " << hip_est_accum << std::endl;
395 os << " kxp : " << kxp << std::endl;
396 }
397 os << " interesting col: " << std::to_string(first_interesting_column) << std::endl;
398 os << " table entries : " << surprising_value_table.get_num_items() << std::endl;
399 os << " window : " << (sliding_window.size() == 0 ? "not " : "") << "allocated" << std::endl;
400 if (sliding_window.size() > 0) {
401 os << " window offset : " << std::to_string(window_offset) << std::endl;
402 }
403 os << "### End sketch summary" << std::endl;
404 return string<A>(os.str().c_str(), sliding_window.get_allocator());
405}
406
407template<typename A>
408void cpc_sketch_alloc<A>::serialize(std::ostream& os) const {
409 compressed_state<A> compressed(A(sliding_window.get_allocator()));
410 compressed.table_data_words = 0;
411 compressed.table_num_entries = 0;
412 compressed.window_data_words = 0;
413 get_compressor<A>().compress(*this, compressed);
414 const bool has_hip = !was_merged;
415 const bool has_table = compressed.table_data.size() > 0;
416 const bool has_window = compressed.window_data.size() > 0;
417 const uint8_t preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
418 write(os, preamble_ints);
419 const uint8_t serial_version = SERIAL_VERSION;
420 write(os, serial_version);
421 const uint8_t family = FAMILY;
422 write(os, family);
423 write(os, lg_k);
424 write(os, first_interesting_column);
425 const uint8_t flags_byte(
426 (1 << flags::IS_COMPRESSED)
427 | (has_hip ? 1 << flags::HAS_HIP : 0)
428 | (has_table ? 1 << flags::HAS_TABLE : 0)
429 | (has_window ? 1 << flags::HAS_WINDOW : 0)
430 );
431 write(os, flags_byte);
432 const uint16_t seed_hash(compute_seed_hash(seed));
433 write(os, seed_hash);
434 if (!is_empty()) {
435 write(os, num_coupons);
436 if (has_table && has_window) {
437 // if there is no window it is the same as number of coupons
438 write(os, compressed.table_num_entries);
439 // HIP values can be in two different places in the sequence of fields
440 // this is the first HIP decision point
441 if (has_hip) write_hip(os);
442 }
443 if (has_table) {
444 write(os, compressed.table_data_words);
445 }
446 if (has_window) {
447 write(os, compressed.window_data_words);
448 }
449 // this is the second HIP decision point
450 if (has_hip && !(has_table && has_window)) write_hip(os);
451 if (has_window) {
452 write(os, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
453 }
454 if (has_table) {
455 write(os, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
456 }
457 }
458}
459
460template<typename A>
461auto cpc_sketch_alloc<A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
462 compressed_state<A> compressed(sliding_window.get_allocator());
463 compressed.table_data_words = 0;
464 compressed.table_num_entries = 0;
465 compressed.window_data_words = 0;
466 get_compressor<A>().compress(*this, compressed);
467 const bool has_hip = !was_merged;
468 const bool has_table = compressed.table_data.size() > 0;
469 const bool has_window = compressed.window_data.size() > 0;
470 const uint8_t preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
471 const size_t size = header_size_bytes + (preamble_ints + compressed.table_data_words + compressed.window_data_words) * sizeof(uint32_t);
472 vector_bytes bytes(size, 0, sliding_window.get_allocator());
473 uint8_t* ptr = bytes.data() + header_size_bytes;
474 ptr += copy_to_mem(preamble_ints, ptr);
475 const uint8_t serial_version = SERIAL_VERSION;
476 ptr += copy_to_mem(serial_version, ptr);
477 const uint8_t family = FAMILY;
478 ptr += copy_to_mem(family, ptr);
479 ptr += copy_to_mem(lg_k, ptr);
480 ptr += copy_to_mem(first_interesting_column, ptr);
481 const uint8_t flags_byte(
482 (1 << flags::IS_COMPRESSED)
483 | (has_hip ? 1 << flags::HAS_HIP : 0)
484 | (has_table ? 1 << flags::HAS_TABLE : 0)
485 | (has_window ? 1 << flags::HAS_WINDOW : 0)
486 );
487 ptr += copy_to_mem(flags_byte, ptr);
488 const uint16_t seed_hash = compute_seed_hash(seed);
489 ptr += copy_to_mem(seed_hash, ptr);
490 if (!is_empty()) {
491 ptr += copy_to_mem(num_coupons, ptr);
492 if (has_table && has_window) {
493 // if there is no window it is the same as number of coupons
494 ptr += copy_to_mem(compressed.table_num_entries, ptr);
495 // HIP values can be in two different places in the sequence of fields
496 // this is the first HIP decision point
497 if (has_hip) ptr += copy_hip_to_mem(ptr);
498 }
499 if (has_table) {
500 ptr += copy_to_mem(compressed.table_data_words, ptr);
501 }
502 if (has_window) {
503 ptr += copy_to_mem(compressed.window_data_words, ptr);
504 }
505 // this is the second HIP decision point
506 if (has_hip && !(has_table && has_window)) ptr += copy_hip_to_mem(ptr);
507 if (has_window) {
508 ptr += copy_to_mem(compressed.window_data.data(), ptr, compressed.window_data_words * sizeof(uint32_t));
509 }
510 if (has_table) {
511 ptr += copy_to_mem(compressed.table_data.data(), ptr, compressed.table_data_words * sizeof(uint32_t));
512 }
513 }
514 if (ptr != bytes.data() + size) throw std::logic_error("serialized size mismatch");
515 return bytes;
516}
517
518template<typename A>
519cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(std::istream& is, uint64_t seed, const A& allocator) {
520 const auto preamble_ints = read<uint8_t>(is);
521 const auto serial_version = read<uint8_t>(is);
522 const auto family_id = read<uint8_t>(is);
523 const auto lg_k = read<uint8_t>(is);
524 const auto first_interesting_column = read<uint8_t>(is);
525 const auto flags_byte = read<uint8_t>(is);
526 const auto seed_hash = read<uint16_t>(is);
527 const bool has_hip = flags_byte & (1 << flags::HAS_HIP);
528 const bool has_table = flags_byte & (1 << flags::HAS_TABLE);
529 const bool has_window = flags_byte & (1 << flags::HAS_WINDOW);
530 compressed_state<A> compressed(allocator);
531 compressed.table_data_words = 0;
532 compressed.table_num_entries = 0;
533 compressed.window_data_words = 0;
534 uint32_t num_coupons = 0;
535 double kxp = 0;
536 double hip_est_accum = 0;
537 if (has_table || has_window) {
538 num_coupons = read<uint32_t>(is);
539 if (has_table && has_window) {
540 compressed.table_num_entries = read<uint32_t>(is);
541 if (has_hip) {
542 kxp = read<double>(is);
543 hip_est_accum = read<double>(is);
544 }
545 }
546 if (has_table) {
547 compressed.table_data_words = read<uint32_t>(is);
548 }
549 if (has_window) {
550 compressed.window_data_words = read<uint32_t>(is);
551 }
552 if (has_hip && !(has_table && has_window)) {
553 kxp = read<double>(is);
554 hip_est_accum = read<double>(is);
555 }
556 if (has_window) {
557 compressed.window_data.resize(compressed.window_data_words);
558 read(is, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
559 }
560 if (has_table) {
561 compressed.table_data.resize(compressed.table_data_words);
562 read(is, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
563 }
564 if (!has_window) compressed.table_num_entries = num_coupons;
565 }
566
567 uint8_t expected_preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
568 if (preamble_ints != expected_preamble_ints) {
569 throw std::invalid_argument("Possible corruption: preamble ints: expected "
570 + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
571 }
572 if (serial_version != SERIAL_VERSION) {
573 throw std::invalid_argument("Possible corruption: serial version: expected "
574 + std::to_string(SERIAL_VERSION) + ", got " + std::to_string(serial_version));
575 }
576 if (family_id != FAMILY) {
577 throw std::invalid_argument("Possible corruption: family: expected "
578 + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
579 }
580 if (seed_hash != compute_seed_hash(seed)) {
581 throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
582 + std::to_string(compute_seed_hash(seed)));
583 }
584 uncompressed_state<A> uncompressed(allocator);
585 get_compressor<A>().uncompress(compressed, uncompressed, lg_k, num_coupons);
586 if (!is.good())
587 throw std::runtime_error("error reading from std::istream");
588 return cpc_sketch_alloc(lg_k, num_coupons, first_interesting_column, std::move(uncompressed.table),
589 std::move(uncompressed.window), has_hip, kxp, hip_est_accum, seed);
590}
591
592template<typename A>
593cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) {
594 ensure_minimum_memory(size, 8);
595 const char* ptr = static_cast<const char*>(bytes);
596 const char* base = static_cast<const char*>(bytes);
597 uint8_t preamble_ints;
598 ptr += copy_from_mem(ptr, preamble_ints);
599 uint8_t serial_version;
600 ptr += copy_from_mem(ptr, serial_version);
601 uint8_t family_id;
602 ptr += copy_from_mem(ptr, family_id);
603 uint8_t lg_k;
604 ptr += copy_from_mem(ptr, lg_k);
605 uint8_t first_interesting_column;
606 ptr += copy_from_mem(ptr, first_interesting_column);
607 uint8_t flags_byte;
608 ptr += copy_from_mem(ptr, flags_byte);
609 uint16_t seed_hash;
610 ptr += copy_from_mem(ptr, seed_hash);
611 const bool has_hip = flags_byte & (1 << flags::HAS_HIP);
612 const bool has_table = flags_byte & (1 << flags::HAS_TABLE);
613 const bool has_window = flags_byte & (1 << flags::HAS_WINDOW);
614 ensure_minimum_memory(size, preamble_ints << 2);
615 compressed_state<A> compressed(allocator);
616 compressed.table_data_words = 0;
617 compressed.table_num_entries = 0;
618 compressed.window_data_words = 0;
619 uint32_t num_coupons = 0;
620 double kxp = 0;
621 double hip_est_accum = 0;
622 if (has_table || has_window) {
623 check_memory_size(ptr - base + sizeof(num_coupons), size);
624 ptr += copy_from_mem(ptr, num_coupons);
625 if (has_table && has_window) {
626 check_memory_size(ptr - base + sizeof(compressed.table_num_entries), size);
627 ptr += copy_from_mem(ptr, compressed.table_num_entries);
628 if (has_hip) {
629 check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
630 ptr += copy_from_mem(ptr, kxp);
631 ptr += copy_from_mem(ptr, hip_est_accum);
632 }
633 }
634 if (has_table) {
635 check_memory_size(ptr - base + sizeof(compressed.table_data_words), size);
636 ptr += copy_from_mem(ptr, compressed.table_data_words);
637 }
638 if (has_window) {
639 check_memory_size(ptr - base + sizeof(compressed.window_data_words), size);
640 ptr += copy_from_mem(ptr, compressed.window_data_words);
641 }
642 if (has_hip && !(has_table && has_window)) {
643 check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
644 ptr += copy_from_mem(ptr, kxp);
645 ptr += copy_from_mem(ptr, hip_est_accum);
646 }
647 if (has_window) {
648 compressed.window_data.resize(compressed.window_data_words);
649 check_memory_size(ptr - base + (compressed.window_data_words * sizeof(uint32_t)), size);
650 ptr += copy_from_mem(ptr, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
651 }
652 if (has_table) {
653 compressed.table_data.resize(compressed.table_data_words);
654 check_memory_size(ptr - base + (compressed.table_data_words * sizeof(uint32_t)), size);
655 ptr += copy_from_mem(ptr, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
656 }
657 if (!has_window) compressed.table_num_entries = num_coupons;
658 }
659 if (ptr != static_cast<const char*>(bytes) + size) throw std::logic_error("deserialized size mismatch");
660
661 uint8_t expected_preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
662 if (preamble_ints != expected_preamble_ints) {
663 throw std::invalid_argument("Possible corruption: preamble ints: expected "
664 + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
665 }
666 if (serial_version != SERIAL_VERSION) {
667 throw std::invalid_argument("Possible corruption: serial version: expected "
668 + std::to_string(SERIAL_VERSION) + ", got " + std::to_string(serial_version));
669 }
670 if (family_id != FAMILY) {
671 throw std::invalid_argument("Possible corruption: family: expected "
672 + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
673 }
674 if (seed_hash != compute_seed_hash(seed)) {
675 throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
676 + std::to_string(compute_seed_hash(seed)));
677 }
678 uncompressed_state<A> uncompressed(allocator);
679 get_compressor<A>().uncompress(compressed, uncompressed, lg_k, num_coupons);
680 return cpc_sketch_alloc(lg_k, num_coupons, first_interesting_column, std::move(uncompressed.table),
681 std::move(uncompressed.window), has_hip, kxp, hip_est_accum, seed);
682}
683
684/*
685 * These empirical values for the 99.9th percentile of size in bytes were measured using 100,000
686 * trials. The value for each trial is the maximum of 5*16=80 measurements that were equally
687 * spaced over values of the quantity C/K between 3.0 and 8.0. This table does not include the
688 * worst-case space for the preamble, which is added by the function.
689 */
690static const uint8_t CPC_EMPIRICAL_SIZE_MAX_LGK = 19;
691static const size_t CPC_EMPIRICAL_MAX_SIZE_BYTES[] = {
692 24, // lg_k = 4
693 36, // lg_k = 5
694 56, // lg_k = 6
695 100, // lg_k = 7
696 180, // lg_k = 8
697 344, // lg_k = 9
698 660, // lg_k = 10
699 1292, // lg_k = 11
700 2540, // lg_k = 12
701 5020, // lg_k = 13
702 9968, // lg_k = 14
703 19836, // lg_k = 15
704 39532, // lg_k = 16
705 78880, // lg_k = 17
706 157516, // lg_k = 18
707 314656 // lg_k = 19
708};
709static const double CPC_EMPIRICAL_MAX_SIZE_FACTOR = 0.6; // 0.6 = 4.8 / 8.0
710static const size_t CPC_MAX_PREAMBLE_SIZE_BYTES = 40;
711
712template<typename A>
714 check_lg_k(lg_k);
715 if (lg_k <= CPC_EMPIRICAL_SIZE_MAX_LGK) {
716 return CPC_EMPIRICAL_MAX_SIZE_BYTES[lg_k - cpc_constants::MIN_LG_K] + CPC_MAX_PREAMBLE_SIZE_BYTES;
717 }
718 const uint32_t k = 1 << lg_k;
719 return (int) (CPC_EMPIRICAL_MAX_SIZE_FACTOR * k) + CPC_MAX_PREAMBLE_SIZE_BYTES;
720}
721
722template<typename A>
723void cpc_sketch_alloc<A>::check_lg_k(uint8_t lg_k) {
724 if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) {
725 throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= "
726 + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k));
727 }
728}
729
730template<typename A>
731uint32_t cpc_sketch_alloc<A>::get_num_coupons() const {
732 return num_coupons;
733}
734
735template<typename A>
736bool cpc_sketch_alloc<A>::validate() const {
737 vector_u64 bit_matrix = build_bit_matrix();
738 const uint64_t num_bits_set = count_bits_set_in_matrix(bit_matrix.data(), 1ULL << lg_k);
739 return num_bits_set == num_coupons;
740}
741
742template<typename A>
743cpc_sketch_alloc<A>::cpc_sketch_alloc(uint8_t lg_k, uint32_t num_coupons, uint8_t first_interesting_column,
744 u32_table<A>&& table, vector_bytes&& window, bool has_hip, double kxp, double hip_est_accum, uint64_t seed):
745lg_k(lg_k),
746seed(seed),
747was_merged(!has_hip),
748num_coupons(num_coupons),
749surprising_value_table(std::move(table)),
750sliding_window(std::move(window)),
751window_offset(determine_correct_offset(lg_k, num_coupons)),
752first_interesting_column(first_interesting_column),
753kxp(kxp),
754hip_est_accum(hip_est_accum)
755{}
756
757template<typename A>
758uint8_t cpc_sketch_alloc<A>::get_preamble_ints(uint32_t num_coupons, bool has_hip, bool has_table, bool has_window) {
759 uint8_t preamble_ints = 2;
760 if (num_coupons > 0) {
761 preamble_ints += 1; // number of coupons
762 if (has_hip) {
763 preamble_ints += 4; // HIP
764 }
765 if (has_table) {
766 preamble_ints += 1; // table data length
767 // number of values (if there is no window it is the same as number of coupons)
768 if (has_window) {
769 preamble_ints += 1;
770 }
771 }
772 if (has_window) {
773 preamble_ints += 1; // window length
774 }
775 }
776 return preamble_ints;
777}
778
779template<typename A>
780typename cpc_sketch_alloc<A>::flavor cpc_sketch_alloc<A>::determine_flavor() const {
781 return determine_flavor(lg_k, num_coupons);
782}
783
784template<typename A>
785typename cpc_sketch_alloc<A>::flavor cpc_sketch_alloc<A>::determine_flavor(uint8_t lg_k, uint64_t c) {
786 const uint32_t k = 1 << lg_k;
787 const uint64_t c2 = c << 1;
788 const uint64_t c8 = c << 3;
789 const uint64_t c32 = c << 5;
790 if (c == 0) return EMPTY; // 0 == C < 1
791 if (c32 < 3 * k) return SPARSE; // 1 <= C < 3K/32
792 if (c2 < k) return HYBRID; // 3K/32 <= C < K/2
793 if (c8 < 27 * k) return PINNED; // K/2 <= C < 27K/8
794 else return SLIDING; // 27K/8 <= C
795}
796
797template<typename A>
798uint8_t cpc_sketch_alloc<A>::determine_correct_offset(uint8_t lg_k, uint64_t c) {
799 const uint32_t k = 1 << lg_k;
800 const int64_t tmp = static_cast<int64_t>(c << 3) - static_cast<int64_t>(19 * k); // 8C - 19K
801 if (tmp < 0) return 0;
802 return static_cast<uint8_t>(tmp >> (lg_k + 3)); // tmp / 8K
803}
804
805template<typename A>
806auto cpc_sketch_alloc<A>::build_bit_matrix() const -> vector_u64 {
807 const uint32_t k = 1 << lg_k;
808 if (window_offset > 56) throw std::logic_error("offset > 56");
809
810 // Fill the matrix with default rows in which the "early zone" is filled with ones.
811 // This is essential for the routine's O(k) time cost (as opposed to O(C)).
812 const uint64_t default_row = (static_cast<uint64_t>(1) << window_offset) - 1;
813 vector_u64 matrix(k, default_row, sliding_window.get_allocator());
814
815 if (num_coupons == 0) return matrix;
816
817 if (sliding_window.size() > 0) { // In other words, we are in window mode, not sparse mode
818 for (size_t i = 0; i < k; i++) { // set the window bits, trusting the sketch's current offset
819 matrix[i] |= static_cast<uint64_t>(sliding_window[i]) << window_offset;
820 }
821 }
822
823 const uint32_t* slots = surprising_value_table.get_slots();
824 const uint32_t num_slots = 1 << surprising_value_table.get_lg_size();
825 for (size_t i = 0; i < num_slots; i++) {
826 const uint32_t row_col = slots[i];
827 if (row_col != UINT32_MAX) {
828 const uint8_t col = row_col & 63;
829 const uint32_t row = row_col >> 6;
830 // Flip the specified matrix bit from its default value.
831 // In the "early" zone the bit changes from 1 to 0.
832 // In the "late" zone the bit changes from 0 to 1.
833 matrix[row] ^= static_cast<uint64_t>(1) << col;
834 }
835 }
836 return matrix;
837}
838
839template<typename A>
840void cpc_sketch_alloc<A>::write_hip(std::ostream& os) const {
841 write(os, kxp);
842 write(os, hip_est_accum);
843}
844
845template<typename A>
846size_t cpc_sketch_alloc<A>::copy_hip_to_mem(void* dst) const {
847 memcpy(dst, &kxp, sizeof(kxp));
848 memcpy(static_cast<char*>(dst) + sizeof(kxp), &hip_est_accum, sizeof(hip_est_accum));
849 return sizeof(kxp) + sizeof(hip_est_accum);
850}
851
852} /* namespace datasketches */
853
854#endif
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch.
Definition cpc_sketch.hpp:64
void serialize(std::ostream &os) const
This method serializes the sketch into a given stream in a binary form.
Definition cpc_sketch_impl.hpp:408
double get_estimate() const
Definition cpc_sketch_impl.hpp:75
cpc_sketch_alloc(uint8_t lg_k=cpc_constants::DEFAULT_LG_K, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
Creates an instance of the sketch given the lg_k parameter and hash seed.
Definition cpc_sketch_impl.hpp:44
static cpc_sketch_alloc< A > deserialize(std::istream &is, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
This method deserializes a sketch from a given stream.
Definition cpc_sketch_impl.hpp:519
void update(const std::string &value)
Update this sketch with a given string.
Definition cpc_sketch_impl.hpp:109
bool is_empty() const
Definition cpc_sketch_impl.hpp:70
static size_t get_max_serialized_size_bytes(uint8_t lg_k)
The actual size of a compressed CPC sketch has a small random variance, but the following empirically...
Definition cpc_sketch_impl.hpp:713
uint8_t get_lg_k() const
Definition cpc_sketch_impl.hpp:65
A get_allocator() const
Definition cpc_sketch_impl.hpp:60
double get_lower_bound(unsigned kappa) const
Returns the approximate lower error bound given a parameter kappa (1, 2 or 3).
Definition cpc_sketch_impl.hpp:91
double get_upper_bound(unsigned kappa) const
Returns the approximate upper error bound given a parameter kappa (1, 2 or 3).
Definition cpc_sketch_impl.hpp:100
string< A > to_string() const
Returns a human-readable summary of this sketch.
Definition cpc_sketch_impl.hpp:383
const uint8_t MIN_LG_K
min log2 of K
Definition cpc_common.hpp:32
const uint8_t MAX_LG_K
max log2 of K
Definition cpc_common.hpp:34
DataSketches namespace.
Definition binomial_bounds.hpp:38
void cpc_init()
Allocation and initialization of global decompression (decoding) tables.
Definition cpc_sketch_impl.hpp:39