datasketches-cpp
cpc_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef CPC_SKETCH_IMPL_HPP_
21 #define CPC_SKETCH_IMPL_HPP_
22 
23 #include <stdexcept>
24 #include <cmath>
25 #include <cstring>
26 #include <sstream>
27 
28 #include "cpc_confidence.hpp"
29 #include "kxp_byte_lookup.hpp"
30 #include "inv_pow2_table.hpp"
31 #include "cpc_util.hpp"
32 #include "icon_estimator.hpp"
33 #include "serde.hpp"
34 #include "count_zeros.hpp"
35 
36 namespace datasketches {
37 
38 template<typename A>
39 void cpc_init() {
40  get_compressor<A>(); // this initializes a global static instance of the compressor on the first use
41 }
42 
43 template<typename A>
44 cpc_sketch_alloc<A>::cpc_sketch_alloc(uint8_t lg_k, uint64_t seed, const A& allocator):
45 lg_k(lg_k),
46 seed(seed),
47 was_merged(false),
48 num_coupons(0),
49 surprising_value_table(2, 6 + lg_k, allocator),
50 sliding_window(allocator),
51 window_offset(0),
52 first_interesting_column(0),
53 kxp(1 << lg_k),
54 hip_est_accum(0)
55 {
56  check_lg_k(lg_k);
57 }
58 
59 template<typename A>
61  return sliding_window.get_allocator();
62 }
63 
64 template<typename A>
66  return lg_k;
67 }
68 
69 template<typename A>
71  return num_coupons == 0;
72 }
73 
74 template<typename A>
76  if (!was_merged) return get_hip_estimate();
77  return get_icon_estimate();
78 }
79 
80 template<typename A>
82  return hip_est_accum;
83 }
84 
85 template<typename A>
86 double cpc_sketch_alloc<A>::get_icon_estimate() const {
87  return compute_icon_estimate(lg_k, num_coupons);
88 }
89 
90 template<typename A>
91 double cpc_sketch_alloc<A>::get_lower_bound(unsigned kappa) const {
92  if (kappa < 1 || kappa > 3) {
93  throw std::invalid_argument("kappa must be 1, 2 or 3");
94  }
95  if (!was_merged) return get_hip_confidence_lb<A>(*this, kappa);
96  return get_icon_confidence_lb<A>(*this, kappa);
97 }
98 
99 template<typename A>
100 double cpc_sketch_alloc<A>::get_upper_bound(unsigned kappa) const {
101  if (kappa < 1 || kappa > 3) {
102  throw std::invalid_argument("kappa must be 1, 2 or 3");
103  }
104  if (!was_merged) return get_hip_confidence_ub<A>(*this, kappa);
105  return get_icon_confidence_ub<A>(*this, kappa);
106 }
107 
108 template<typename A>
109 void cpc_sketch_alloc<A>::update(const std::string& value) {
110  if (value.empty()) return;
111  update(value.c_str(), value.length());
112 }
113 
114 template<typename A>
115 void cpc_sketch_alloc<A>::update(uint64_t value) {
116  update(&value, sizeof(value));
117 }
118 
119 template<typename A>
120 void cpc_sketch_alloc<A>::update(int64_t value) {
121  update(&value, sizeof(value));
122 }
123 
124 template<typename A>
125 void cpc_sketch_alloc<A>::update(uint32_t value) {
126  update(static_cast<int32_t>(value));
127 }
128 
129 template<typename A>
130 void cpc_sketch_alloc<A>::update(int32_t value) {
131  update(static_cast<int64_t>(value));
132 }
133 
134 template<typename A>
135 void cpc_sketch_alloc<A>::update(uint16_t value) {
136  update(static_cast<int16_t>(value));
137 }
138 
139 template<typename A>
140 void cpc_sketch_alloc<A>::update(int16_t value) {
141  update(static_cast<int64_t>(value));
142 }
143 
144 template<typename A>
145 void cpc_sketch_alloc<A>::update(uint8_t value) {
146  update(static_cast<int8_t>(value));
147 }
148 
149 template<typename A>
150 void cpc_sketch_alloc<A>::update(int8_t value) {
151  update(static_cast<int64_t>(value));
152 }
153 
154 template<typename A>
155 void cpc_sketch_alloc<A>::update(double value) {
156  union {
157  int64_t long_value;
158  double double_value;
159  } ldu;
160  if (value == 0.0) {
161  ldu.double_value = 0.0; // canonicalize -0.0 to 0.0
162  } else if (std::isnan(value)) {
163  ldu.long_value = 0x7ff8000000000000L; // canonicalize NaN using value from Java's Double.doubleToLongBits()
164  } else {
165  ldu.double_value = value;
166  }
167  update(&ldu, sizeof(ldu));
168 }
169 
170 template<typename A>
171 void cpc_sketch_alloc<A>::update(float value) {
172  update(static_cast<double>(value));
173 }
174 
175 static inline uint32_t row_col_from_two_hashes(uint64_t hash0, uint64_t hash1, uint8_t lg_k) {
176  if (lg_k > 26) throw std::logic_error("lg_k > 26");
177  const uint32_t k = 1 << lg_k;
178  uint8_t col = count_leading_zeros_in_u64(hash1); // 0 <= col <= 64
179  if (col > 63) col = 63; // clip so that 0 <= col <= 63
180  const uint32_t row = hash0 & (k - 1);
181  uint32_t row_col = (row << 6) | col;
182  // To avoid the hash table's "empty" value, we change the row of the following pair.
183  // This case is extremely unlikely, but we might as well handle it.
184  if (row_col == UINT32_MAX) row_col ^= 1 << 6;
185  return row_col;
186 }
187 
188 template<typename A>
189 void cpc_sketch_alloc<A>::update(const void* value, size_t size) {
190  HashState hashes;
191  MurmurHash3_x64_128(value, size, seed, hashes);
192  row_col_update(row_col_from_two_hashes(hashes.h1, hashes.h2, lg_k));
193 }
194 
195 template<typename A>
196 void cpc_sketch_alloc<A>::row_col_update(uint32_t row_col) {
197  const uint8_t col = row_col & 63;
198  if (col < first_interesting_column) return; // important speed optimization
199  // window size is 0 until sketch is promoted from sparse to windowed
200  if (sliding_window.size() == 0) {
201  update_sparse(row_col);
202  } else {
203  update_windowed(row_col);
204  }
205 }
206 
207 template<typename A>
208 void cpc_sketch_alloc<A>::update_sparse(uint32_t row_col) {
209  const uint32_t k = 1 << lg_k;
210  const uint64_t c32pre = static_cast<uint64_t>(num_coupons) << 5;
211  if (c32pre >= 3 * k) throw std::logic_error("c32pre >= 3 * k"); // C < 3K/32, in other words flavor == SPARSE
212  bool is_novel = surprising_value_table.maybe_insert(row_col);
213  if (is_novel) {
214  num_coupons++;
215  update_hip(row_col);
216  const uint64_t c32post = static_cast<uint64_t>(num_coupons) << 5;
217  if (c32post >= 3 * k) promote_sparse_to_windowed(); // C >= 3K/32
218  }
219 }
220 
221 // the flavor is HYBRID, PINNED, or SLIDING
222 template<typename A>
223 void cpc_sketch_alloc<A>::update_windowed(uint32_t row_col) {
224  if (window_offset > 56) throw std::logic_error("wrong window offset");
225  const uint32_t k = 1 << lg_k;
226  const uint64_t c32pre = static_cast<uint64_t>(num_coupons) << 5;
227  if (c32pre < 3 * k) throw std::logic_error("c32pre < 3 * k"); // C < 3K/32, in other words flavor >= HYBRID
228  const uint64_t c8pre = static_cast<uint64_t>(num_coupons) << 3;
229  const uint64_t w8pre = static_cast<uint64_t>(window_offset) << 3;
230  if (c8pre >= (27 + w8pre) * k) throw std::logic_error("c8pre is wrong"); // C < (K * 27/8) + (K * window_offset)
231 
232  bool is_novel = false;
233  const uint8_t col = row_col & 63;
234 
235  if (col < window_offset) { // track the surprising 0's "before" the window
236  is_novel = surprising_value_table.maybe_delete(row_col); // inverted logic
237  } else if (col < window_offset + 8) { // track the 8 bits inside the window
238  if (col < window_offset) throw std::logic_error("col < window_offset");
239  const uint32_t row = row_col >> 6;
240  const uint8_t old_bits = sliding_window[row];
241  const uint8_t new_bits = old_bits | (1 << (col - window_offset));
242  if (new_bits != old_bits) {
243  sliding_window[row] = new_bits;
244  is_novel = true;
245  }
246  } else { // track the surprising 1's "after" the window
247  if (col < window_offset + 8) throw std::logic_error("col < window_offset + 8");
248  is_novel = surprising_value_table.maybe_insert(row_col); // normal logic
249  }
250 
251  if (is_novel) {
252  num_coupons++;
253  update_hip(row_col);
254  const uint64_t c8post = static_cast<uint64_t>(num_coupons) << 3;
255  if (c8post >= (27 + w8pre) * k) {
256  move_window();
257  if (window_offset < 1 || window_offset > 56) throw std::logic_error("wrong window offset");
258  const uint64_t w8post = static_cast<uint64_t>(window_offset) << 3;
259  if (c8post >= (27 + w8post) * k) throw std::logic_error("c8pre is wrong"); // C < (K * 27/8) + (K * window_offset)
260  }
261  }
262 }
263 
264 // Call this whenever a new coupon has been collected.
265 template<typename A>
266 void cpc_sketch_alloc<A>::update_hip(uint32_t row_col) {
267  const uint32_t k = 1 << lg_k;
268  const uint8_t col = row_col & 63;
269  const double one_over_p = static_cast<double>(k) / kxp;
270  hip_est_accum += one_over_p;
271  kxp -= INVERSE_POWERS_OF_2[col + 1]; // notice the "+1"
272 }
273 
274 // In terms of flavor, this promotes SPARSE to HYBRID
275 template<typename A>
276 void cpc_sketch_alloc<A>::promote_sparse_to_windowed() {
277  const uint32_t k = 1 << lg_k;
278  const uint64_t c32 = static_cast<uint64_t>(num_coupons) << 5;
279  if (!(c32 == 3 * k || (lg_k == 4 && c32 > 3 * k))) throw std::logic_error("wrong c32");
280 
281  sliding_window.resize(k, 0); // zero the memory (because we will be OR'ing into it)
282 
283  u32_table<A> new_table(2, 6 + lg_k, sliding_window.get_allocator());
284 
285  const uint32_t* old_slots = surprising_value_table.get_slots();
286  const uint32_t old_num_slots = 1 << surprising_value_table.get_lg_size();
287 
288  if (window_offset != 0) throw std::logic_error("window_offset != 0");
289 
290  for (uint32_t i = 0; i < old_num_slots; i++) {
291  const uint32_t row_col = old_slots[i];
292  if (row_col != UINT32_MAX) {
293  const uint8_t col = row_col & 63;
294  if (col < 8) {
295  const uint32_t row = row_col >> 6;
296  sliding_window[row] |= 1 << col;
297  } else {
298  // cannot use u32_table::must_insert(), because it doesn't provide for growth
299  const bool is_novel = new_table.maybe_insert(row_col);
300  if (!is_novel) throw std::logic_error("is_novel != true");
301  }
302  }
303  }
304 
305  surprising_value_table = std::move(new_table);
306 }
307 
308 template<typename A>
309 void cpc_sketch_alloc<A>::move_window() {
310  const uint8_t new_offset = window_offset + 1;
311  if (new_offset > 56) throw std::logic_error("new_offset > 56");
312  if (new_offset != determine_correct_offset(lg_k, num_coupons)) throw std::logic_error("new_offset is wrong");
313 
314  if (sliding_window.size() == 0) throw std::logic_error("no sliding window");
315  const uint32_t k = 1 << lg_k;
316 
317  // Construct the full-sized bit matrix that corresponds to the sketch
318  vector_u64 bit_matrix = build_bit_matrix();
319 
320  // refresh the KXP register on every 8th window shift.
321  if ((new_offset & 0x7) == 0) refresh_kxp(bit_matrix.data());
322 
323  surprising_value_table.clear(); // the new number of surprises will be about the same
324 
325  const uint64_t mask_for_clearing_window = (static_cast<uint64_t>(0xff) << new_offset) ^ UINT64_MAX;
326  const uint64_t mask_for_flipping_early_zone = (static_cast<uint64_t>(1) << new_offset) - 1;
327  uint64_t all_surprises_ored = 0;
328 
329  for (uint32_t i = 0; i < k; i++) {
330  uint64_t pattern = bit_matrix[i];
331  sliding_window[i] = (pattern >> new_offset) & 0xff;
332  pattern &= mask_for_clearing_window;
333  // The following line converts surprising 0's to 1's in the "early zone",
334  // (and vice versa, which is essential for this procedure's O(k) time cost).
335  pattern ^= mask_for_flipping_early_zone;
336  all_surprises_ored |= pattern; // a cheap way to recalculate first_interesting_column
337  while (pattern != 0) {
338  const uint8_t col = count_trailing_zeros_in_u64(pattern);
339  pattern = pattern ^ (static_cast<uint64_t>(1) << col); // erase the 1
340  const uint32_t row_col = (i << 6) | col;
341  const bool is_novel = surprising_value_table.maybe_insert(row_col);
342  if (!is_novel) throw std::logic_error("is_novel != true");
343  }
344  }
345 
346  window_offset = new_offset;
347 
348  first_interesting_column = count_trailing_zeros_in_u64(all_surprises_ored);
349  if (first_interesting_column > new_offset) first_interesting_column = new_offset; // corner case
350 }
351 
352 // The KXP register is a double with roughly 50 bits of precision, but
353 // it might need roughly 90 bits to track the value with perfect accuracy.
354 // Therefore we recalculate KXP occasionally from the sketch's full bitmatrix
355 // so that it will reflect changes that were previously outside the mantissa.
356 template<typename A>
357 void cpc_sketch_alloc<A>::refresh_kxp(const uint64_t* bit_matrix) {
358  const uint32_t k = 1 << lg_k;
359 
360  // for improved numerical accuracy, we separately sum the bytes of the U64's
361  double byte_sums[8]; // allocating on the stack
362  std::fill(byte_sums, byte_sums + 8, 0);
363 
364  for (size_t i = 0; i < k; i++) {
365  uint64_t word = bit_matrix[i];
366  for (unsigned j = 0; j < 8; j++) {
367  const uint8_t byte = word & 0xff;
368  byte_sums[j] += KXP_BYTE_TABLE[byte];
369  word >>= 8;
370  }
371  }
372 
373  double total = 0.0;
374  for (int j = 7; j >= 0; j--) { // the reverse order is important
375  const double factor = INVERSE_POWERS_OF_2[8 * j]; // pow (256.0, (-1.0 * ((double) j)));
376  total += factor * byte_sums[j];
377  }
378 
379  kxp = total;
380 }
381 
382 template<typename A>
384  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
385  // The stream does not support passing an allocator instance, and alternatives are complicated.
386  std::ostringstream os;
387  os << "### CPC sketch summary:" << std::endl;
388  os << " lg_k : " << std::to_string(lg_k) << std::endl;
389  os << " seed hash : " << std::hex << compute_seed_hash(seed) << std::dec << std::endl;
390  os << " C : " << num_coupons << std::endl;
391  os << " flavor : " << determine_flavor() << std::endl;
392  os << " merged : " << (was_merged ? "true" : "false") << std::endl;
393  if (!was_merged) {
394  os << " HIP estimate : " << hip_est_accum << std::endl;
395  os << " kxp : " << kxp << std::endl;
396  }
397  os << " interesting col: " << std::to_string(first_interesting_column) << std::endl;
398  os << " table entries : " << surprising_value_table.get_num_items() << std::endl;
399  os << " window : " << (sliding_window.size() == 0 ? "not " : "") << "allocated" << std::endl;
400  if (sliding_window.size() > 0) {
401  os << " window offset : " << std::to_string(window_offset) << std::endl;
402  }
403  os << "### End sketch summary" << std::endl;
404  return string<A>(os.str().c_str(), sliding_window.get_allocator());
405 }
406 
407 template<typename A>
408 void cpc_sketch_alloc<A>::serialize(std::ostream& os) const {
409  compressed_state<A> compressed(A(sliding_window.get_allocator()));
410  compressed.table_data_words = 0;
411  compressed.table_num_entries = 0;
412  compressed.window_data_words = 0;
413  get_compressor<A>().compress(*this, compressed);
414  const bool has_hip = !was_merged;
415  const bool has_table = compressed.table_data.size() > 0;
416  const bool has_window = compressed.window_data.size() > 0;
417  const uint8_t preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
418  write(os, preamble_ints);
419  const uint8_t serial_version = SERIAL_VERSION;
420  write(os, serial_version);
421  const uint8_t family = FAMILY;
422  write(os, family);
423  write(os, lg_k);
424  write(os, first_interesting_column);
425  const uint8_t flags_byte(
426  (1 << flags::IS_COMPRESSED)
427  | (has_hip ? 1 << flags::HAS_HIP : 0)
428  | (has_table ? 1 << flags::HAS_TABLE : 0)
429  | (has_window ? 1 << flags::HAS_WINDOW : 0)
430  );
431  write(os, flags_byte);
432  const uint16_t seed_hash(compute_seed_hash(seed));
433  write(os, seed_hash);
434  if (!is_empty()) {
435  write(os, num_coupons);
436  if (has_table && has_window) {
437  // if there is no window it is the same as number of coupons
438  write(os, compressed.table_num_entries);
439  // HIP values can be in two different places in the sequence of fields
440  // this is the first HIP decision point
441  if (has_hip) write_hip(os);
442  }
443  if (has_table) {
444  write(os, compressed.table_data_words);
445  }
446  if (has_window) {
447  write(os, compressed.window_data_words);
448  }
449  // this is the second HIP decision point
450  if (has_hip && !(has_table && has_window)) write_hip(os);
451  if (has_window) {
452  write(os, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
453  }
454  if (has_table) {
455  write(os, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
456  }
457  }
458 }
459 
460 template<typename A>
461 auto cpc_sketch_alloc<A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
462  compressed_state<A> compressed(sliding_window.get_allocator());
463  compressed.table_data_words = 0;
464  compressed.table_num_entries = 0;
465  compressed.window_data_words = 0;
466  get_compressor<A>().compress(*this, compressed);
467  const bool has_hip = !was_merged;
468  const bool has_table = compressed.table_data.size() > 0;
469  const bool has_window = compressed.window_data.size() > 0;
470  const uint8_t preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
471  const size_t size = header_size_bytes + (preamble_ints + compressed.table_data_words + compressed.window_data_words) * sizeof(uint32_t);
472  vector_bytes bytes(size, 0, sliding_window.get_allocator());
473  uint8_t* ptr = bytes.data() + header_size_bytes;
474  ptr += copy_to_mem(preamble_ints, ptr);
475  const uint8_t serial_version = SERIAL_VERSION;
476  ptr += copy_to_mem(serial_version, ptr);
477  const uint8_t family = FAMILY;
478  ptr += copy_to_mem(family, ptr);
479  ptr += copy_to_mem(lg_k, ptr);
480  ptr += copy_to_mem(first_interesting_column, ptr);
481  const uint8_t flags_byte(
482  (1 << flags::IS_COMPRESSED)
483  | (has_hip ? 1 << flags::HAS_HIP : 0)
484  | (has_table ? 1 << flags::HAS_TABLE : 0)
485  | (has_window ? 1 << flags::HAS_WINDOW : 0)
486  );
487  ptr += copy_to_mem(flags_byte, ptr);
488  const uint16_t seed_hash = compute_seed_hash(seed);
489  ptr += copy_to_mem(seed_hash, ptr);
490  if (!is_empty()) {
491  ptr += copy_to_mem(num_coupons, ptr);
492  if (has_table && has_window) {
493  // if there is no window it is the same as number of coupons
494  ptr += copy_to_mem(compressed.table_num_entries, ptr);
495  // HIP values can be in two different places in the sequence of fields
496  // this is the first HIP decision point
497  if (has_hip) ptr += copy_hip_to_mem(ptr);
498  }
499  if (has_table) {
500  ptr += copy_to_mem(compressed.table_data_words, ptr);
501  }
502  if (has_window) {
503  ptr += copy_to_mem(compressed.window_data_words, ptr);
504  }
505  // this is the second HIP decision point
506  if (has_hip && !(has_table && has_window)) ptr += copy_hip_to_mem(ptr);
507  if (has_window) {
508  ptr += copy_to_mem(compressed.window_data.data(), ptr, compressed.window_data_words * sizeof(uint32_t));
509  }
510  if (has_table) {
511  ptr += copy_to_mem(compressed.table_data.data(), ptr, compressed.table_data_words * sizeof(uint32_t));
512  }
513  }
514  if (ptr != bytes.data() + size) throw std::logic_error("serialized size mismatch");
515  return bytes;
516 }
517 
518 template<typename A>
519 cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(std::istream& is, uint64_t seed, const A& allocator) {
520  const auto preamble_ints = read<uint8_t>(is);
521  const auto serial_version = read<uint8_t>(is);
522  const auto family_id = read<uint8_t>(is);
523  const auto lg_k = read<uint8_t>(is);
524  const auto first_interesting_column = read<uint8_t>(is);
525  const auto flags_byte = read<uint8_t>(is);
526  const auto seed_hash = read<uint16_t>(is);
527  const bool has_hip = flags_byte & (1 << flags::HAS_HIP);
528  const bool has_table = flags_byte & (1 << flags::HAS_TABLE);
529  const bool has_window = flags_byte & (1 << flags::HAS_WINDOW);
530  compressed_state<A> compressed(allocator);
531  compressed.table_data_words = 0;
532  compressed.table_num_entries = 0;
533  compressed.window_data_words = 0;
534  uint32_t num_coupons = 0;
535  double kxp = 0;
536  double hip_est_accum = 0;
537  if (has_table || has_window) {
538  num_coupons = read<uint32_t>(is);
539  if (has_table && has_window) {
540  compressed.table_num_entries = read<uint32_t>(is);
541  if (has_hip) {
542  kxp = read<double>(is);
543  hip_est_accum = read<double>(is);
544  }
545  }
546  if (has_table) {
547  compressed.table_data_words = read<uint32_t>(is);
548  }
549  if (has_window) {
550  compressed.window_data_words = read<uint32_t>(is);
551  }
552  if (has_hip && !(has_table && has_window)) {
553  kxp = read<double>(is);
554  hip_est_accum = read<double>(is);
555  }
556  if (has_window) {
557  compressed.window_data.resize(compressed.window_data_words);
558  read(is, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
559  }
560  if (has_table) {
561  compressed.table_data.resize(compressed.table_data_words);
562  read(is, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
563  }
564  if (!has_window) compressed.table_num_entries = num_coupons;
565  }
566 
567  uint8_t expected_preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
568  if (preamble_ints != expected_preamble_ints) {
569  throw std::invalid_argument("Possible corruption: preamble ints: expected "
570  + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
571  }
572  if (serial_version != SERIAL_VERSION) {
573  throw std::invalid_argument("Possible corruption: serial version: expected "
574  + std::to_string(SERIAL_VERSION) + ", got " + std::to_string(serial_version));
575  }
576  if (family_id != FAMILY) {
577  throw std::invalid_argument("Possible corruption: family: expected "
578  + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
579  }
580  if (seed_hash != compute_seed_hash(seed)) {
581  throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
582  + std::to_string(compute_seed_hash(seed)));
583  }
584  uncompressed_state<A> uncompressed(allocator);
585  get_compressor<A>().uncompress(compressed, uncompressed, lg_k, num_coupons);
586  if (!is.good())
587  throw std::runtime_error("error reading from std::istream");
588  return cpc_sketch_alloc(lg_k, num_coupons, first_interesting_column, std::move(uncompressed.table),
589  std::move(uncompressed.window), has_hip, kxp, hip_est_accum, seed);
590 }
591 
592 template<typename A>
593 cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) {
594  ensure_minimum_memory(size, 8);
595  const char* ptr = static_cast<const char*>(bytes);
596  const char* base = static_cast<const char*>(bytes);
597  uint8_t preamble_ints;
598  ptr += copy_from_mem(ptr, preamble_ints);
599  uint8_t serial_version;
600  ptr += copy_from_mem(ptr, serial_version);
601  uint8_t family_id;
602  ptr += copy_from_mem(ptr, family_id);
603  uint8_t lg_k;
604  ptr += copy_from_mem(ptr, lg_k);
605  uint8_t first_interesting_column;
606  ptr += copy_from_mem(ptr, first_interesting_column);
607  uint8_t flags_byte;
608  ptr += copy_from_mem(ptr, flags_byte);
609  uint16_t seed_hash;
610  ptr += copy_from_mem(ptr, seed_hash);
611  const bool has_hip = flags_byte & (1 << flags::HAS_HIP);
612  const bool has_table = flags_byte & (1 << flags::HAS_TABLE);
613  const bool has_window = flags_byte & (1 << flags::HAS_WINDOW);
614  ensure_minimum_memory(size, preamble_ints << 2);
615  compressed_state<A> compressed(allocator);
616  compressed.table_data_words = 0;
617  compressed.table_num_entries = 0;
618  compressed.window_data_words = 0;
619  uint32_t num_coupons = 0;
620  double kxp = 0;
621  double hip_est_accum = 0;
622  if (has_table || has_window) {
623  check_memory_size(ptr - base + sizeof(num_coupons), size);
624  ptr += copy_from_mem(ptr, num_coupons);
625  if (has_table && has_window) {
626  check_memory_size(ptr - base + sizeof(compressed.table_num_entries), size);
627  ptr += copy_from_mem(ptr, compressed.table_num_entries);
628  if (has_hip) {
629  check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
630  ptr += copy_from_mem(ptr, kxp);
631  ptr += copy_from_mem(ptr, hip_est_accum);
632  }
633  }
634  if (has_table) {
635  check_memory_size(ptr - base + sizeof(compressed.table_data_words), size);
636  ptr += copy_from_mem(ptr, compressed.table_data_words);
637  }
638  if (has_window) {
639  check_memory_size(ptr - base + sizeof(compressed.window_data_words), size);
640  ptr += copy_from_mem(ptr, compressed.window_data_words);
641  }
642  if (has_hip && !(has_table && has_window)) {
643  check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
644  ptr += copy_from_mem(ptr, kxp);
645  ptr += copy_from_mem(ptr, hip_est_accum);
646  }
647  if (has_window) {
648  compressed.window_data.resize(compressed.window_data_words);
649  check_memory_size(ptr - base + (compressed.window_data_words * sizeof(uint32_t)), size);
650  ptr += copy_from_mem(ptr, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
651  }
652  if (has_table) {
653  compressed.table_data.resize(compressed.table_data_words);
654  check_memory_size(ptr - base + (compressed.table_data_words * sizeof(uint32_t)), size);
655  ptr += copy_from_mem(ptr, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
656  }
657  if (!has_window) compressed.table_num_entries = num_coupons;
658  }
659  if (ptr != static_cast<const char*>(bytes) + size) throw std::logic_error("deserialized size mismatch");
660 
661  uint8_t expected_preamble_ints = get_preamble_ints(num_coupons, has_hip, has_table, has_window);
662  if (preamble_ints != expected_preamble_ints) {
663  throw std::invalid_argument("Possible corruption: preamble ints: expected "
664  + std::to_string(expected_preamble_ints) + ", got " + std::to_string(preamble_ints));
665  }
666  if (serial_version != SERIAL_VERSION) {
667  throw std::invalid_argument("Possible corruption: serial version: expected "
668  + std::to_string(SERIAL_VERSION) + ", got " + std::to_string(serial_version));
669  }
670  if (family_id != FAMILY) {
671  throw std::invalid_argument("Possible corruption: family: expected "
672  + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
673  }
674  if (seed_hash != compute_seed_hash(seed)) {
675  throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
676  + std::to_string(compute_seed_hash(seed)));
677  }
678  uncompressed_state<A> uncompressed(allocator);
679  get_compressor<A>().uncompress(compressed, uncompressed, lg_k, num_coupons);
680  return cpc_sketch_alloc(lg_k, num_coupons, first_interesting_column, std::move(uncompressed.table),
681  std::move(uncompressed.window), has_hip, kxp, hip_est_accum, seed);
682 }
683 
684 /*
685  * These empirical values for the 99.9th percentile of size in bytes were measured using 100,000
686  * trials. The value for each trial is the maximum of 5*16=80 measurements that were equally
687  * spaced over values of the quantity C/K between 3.0 and 8.0. This table does not include the
688  * worst-case space for the preamble, which is added by the function.
689  */
690 static const uint8_t CPC_EMPIRICAL_SIZE_MAX_LGK = 19;
691 static const size_t CPC_EMPIRICAL_MAX_SIZE_BYTES[] = {
692  24, // lg_k = 4
693  36, // lg_k = 5
694  56, // lg_k = 6
695  100, // lg_k = 7
696  180, // lg_k = 8
697  344, // lg_k = 9
698  660, // lg_k = 10
699  1292, // lg_k = 11
700  2540, // lg_k = 12
701  5020, // lg_k = 13
702  9968, // lg_k = 14
703  19836, // lg_k = 15
704  39532, // lg_k = 16
705  78880, // lg_k = 17
706  157516, // lg_k = 18
707  314656 // lg_k = 19
708 };
709 static const double CPC_EMPIRICAL_MAX_SIZE_FACTOR = 0.6; // 0.6 = 4.8 / 8.0
710 static const size_t CPC_MAX_PREAMBLE_SIZE_BYTES = 40;
711 
712 template<typename A>
714  check_lg_k(lg_k);
715  if (lg_k <= CPC_EMPIRICAL_SIZE_MAX_LGK) {
716  return CPC_EMPIRICAL_MAX_SIZE_BYTES[lg_k - cpc_constants::MIN_LG_K] + CPC_MAX_PREAMBLE_SIZE_BYTES;
717  }
718  const uint32_t k = 1 << lg_k;
719  return (int) (CPC_EMPIRICAL_MAX_SIZE_FACTOR * k) + CPC_MAX_PREAMBLE_SIZE_BYTES;
720 }
721 
722 template<typename A>
723 void cpc_sketch_alloc<A>::check_lg_k(uint8_t lg_k) {
724  if (lg_k < cpc_constants::MIN_LG_K || lg_k > cpc_constants::MAX_LG_K) {
725  throw std::invalid_argument("lg_k must be >= " + std::to_string(cpc_constants::MIN_LG_K) + " and <= "
726  + std::to_string(cpc_constants::MAX_LG_K) + ": " + std::to_string(lg_k));
727  }
728 }
729 
730 template<typename A>
731 uint32_t cpc_sketch_alloc<A>::get_num_coupons() const {
732  return num_coupons;
733 }
734 
735 template<typename A>
736 bool cpc_sketch_alloc<A>::validate() const {
737  vector_u64 bit_matrix = build_bit_matrix();
738  const uint64_t num_bits_set = count_bits_set_in_matrix(bit_matrix.data(), 1ULL << lg_k);
739  return num_bits_set == num_coupons;
740 }
741 
742 template<typename A>
743 cpc_sketch_alloc<A>::cpc_sketch_alloc(uint8_t lg_k, uint32_t num_coupons, uint8_t first_interesting_column,
744  u32_table<A>&& table, vector_bytes&& window, bool has_hip, double kxp, double hip_est_accum, uint64_t seed):
745 lg_k(lg_k),
746 seed(seed),
747 was_merged(!has_hip),
748 num_coupons(num_coupons),
749 surprising_value_table(std::move(table)),
750 sliding_window(std::move(window)),
751 window_offset(determine_correct_offset(lg_k, num_coupons)),
752 first_interesting_column(first_interesting_column),
753 kxp(kxp),
754 hip_est_accum(hip_est_accum)
755 {}
756 
757 template<typename A>
758 uint8_t cpc_sketch_alloc<A>::get_preamble_ints(uint32_t num_coupons, bool has_hip, bool has_table, bool has_window) {
759  uint8_t preamble_ints = 2;
760  if (num_coupons > 0) {
761  preamble_ints += 1; // number of coupons
762  if (has_hip) {
763  preamble_ints += 4; // HIP
764  }
765  if (has_table) {
766  preamble_ints += 1; // table data length
767  // number of values (if there is no window it is the same as number of coupons)
768  if (has_window) {
769  preamble_ints += 1;
770  }
771  }
772  if (has_window) {
773  preamble_ints += 1; // window length
774  }
775  }
776  return preamble_ints;
777 }
778 
779 template<typename A>
780 typename cpc_sketch_alloc<A>::flavor cpc_sketch_alloc<A>::determine_flavor() const {
781  return determine_flavor(lg_k, num_coupons);
782 }
783 
784 template<typename A>
785 typename cpc_sketch_alloc<A>::flavor cpc_sketch_alloc<A>::determine_flavor(uint8_t lg_k, uint64_t c) {
786  const uint32_t k = 1 << lg_k;
787  const uint64_t c2 = c << 1;
788  const uint64_t c8 = c << 3;
789  const uint64_t c32 = c << 5;
790  if (c == 0) return EMPTY; // 0 == C < 1
791  if (c32 < 3 * k) return SPARSE; // 1 <= C < 3K/32
792  if (c2 < k) return HYBRID; // 3K/32 <= C < K/2
793  if (c8 < 27 * k) return PINNED; // K/2 <= C < 27K/8
794  else return SLIDING; // 27K/8 <= C
795 }
796 
797 template<typename A>
798 uint8_t cpc_sketch_alloc<A>::determine_correct_offset(uint8_t lg_k, uint64_t c) {
799  const uint32_t k = 1 << lg_k;
800  const int64_t tmp = static_cast<int64_t>(c << 3) - static_cast<int64_t>(19 * k); // 8C - 19K
801  if (tmp < 0) return 0;
802  return static_cast<uint8_t>(tmp >> (lg_k + 3)); // tmp / 8K
803 }
804 
805 template<typename A>
806 auto cpc_sketch_alloc<A>::build_bit_matrix() const -> vector_u64 {
807  const uint32_t k = 1 << lg_k;
808  if (window_offset > 56) throw std::logic_error("offset > 56");
809 
810  // Fill the matrix with default rows in which the "early zone" is filled with ones.
811  // This is essential for the routine's O(k) time cost (as opposed to O(C)).
812  const uint64_t default_row = (static_cast<uint64_t>(1) << window_offset) - 1;
813  vector_u64 matrix(k, default_row, sliding_window.get_allocator());
814 
815  if (num_coupons == 0) return matrix;
816 
817  if (sliding_window.size() > 0) { // In other words, we are in window mode, not sparse mode
818  for (size_t i = 0; i < k; i++) { // set the window bits, trusting the sketch's current offset
819  matrix[i] |= static_cast<uint64_t>(sliding_window[i]) << window_offset;
820  }
821  }
822 
823  const uint32_t* slots = surprising_value_table.get_slots();
824  const uint32_t num_slots = 1 << surprising_value_table.get_lg_size();
825  for (size_t i = 0; i < num_slots; i++) {
826  const uint32_t row_col = slots[i];
827  if (row_col != UINT32_MAX) {
828  const uint8_t col = row_col & 63;
829  const uint32_t row = row_col >> 6;
830  // Flip the specified matrix bit from its default value.
831  // In the "early" zone the bit changes from 1 to 0.
832  // In the "late" zone the bit changes from 0 to 1.
833  matrix[row] ^= static_cast<uint64_t>(1) << col;
834  }
835  }
836  return matrix;
837 }
838 
839 template<typename A>
840 void cpc_sketch_alloc<A>::write_hip(std::ostream& os) const {
841  write(os, kxp);
842  write(os, hip_est_accum);
843 }
844 
845 template<typename A>
846 size_t cpc_sketch_alloc<A>::copy_hip_to_mem(void* dst) const {
847  memcpy(dst, &kxp, sizeof(kxp));
848  memcpy(static_cast<char*>(dst) + sizeof(kxp), &hip_est_accum, sizeof(hip_est_accum));
849  return sizeof(kxp) + sizeof(hip_est_accum);
850 }
851 
852 } /* namespace datasketches */
853 
854 #endif
High performance C++ implementation of Compressed Probabilistic Counting (CPC) Sketch.
Definition: cpc_sketch.hpp:64
void serialize(std::ostream &os) const
This method serializes the sketch into a given stream in a binary form.
Definition: cpc_sketch_impl.hpp:408
double get_estimate() const
Definition: cpc_sketch_impl.hpp:75
cpc_sketch_alloc(uint8_t lg_k=cpc_constants::DEFAULT_LG_K, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
Creates an instance of the sketch given the lg_k parameter and hash seed.
Definition: cpc_sketch_impl.hpp:44
static cpc_sketch_alloc< A > deserialize(std::istream &is, uint64_t seed=DEFAULT_SEED, const A &allocator=A())
This method deserializes a sketch from a given stream.
Definition: cpc_sketch_impl.hpp:519
void update(const std::string &value)
Update this sketch with a given string.
Definition: cpc_sketch_impl.hpp:109
bool is_empty() const
Definition: cpc_sketch_impl.hpp:70
static size_t get_max_serialized_size_bytes(uint8_t lg_k)
The actual size of a compressed CPC sketch has a small random variance, but the following empirically...
Definition: cpc_sketch_impl.hpp:713
uint8_t get_lg_k() const
Definition: cpc_sketch_impl.hpp:65
A get_allocator() const
Definition: cpc_sketch_impl.hpp:60
double get_lower_bound(unsigned kappa) const
Returns the approximate lower error bound given a parameter kappa (1, 2 or 3).
Definition: cpc_sketch_impl.hpp:91
double get_upper_bound(unsigned kappa) const
Returns the approximate upper error bound given a parameter kappa (1, 2 or 3).
Definition: cpc_sketch_impl.hpp:100
string< A > to_string() const
Returns a human-readable summary of this sketch.
Definition: cpc_sketch_impl.hpp:383
const uint8_t MIN_LG_K
min log2 of K
Definition: cpc_common.hpp:32
const uint8_t MAX_LG_K
max log2 of K
Definition: cpc_common.hpp:34
DataSketches namespace.
Definition: binomial_bounds.hpp:38
void cpc_init()
Allocation and initialization of global decompression (decoding) tables.
Definition: cpc_sketch_impl.hpp:39