datasketches-cpp
count_min_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef COUNT_MIN_IMPL_HPP_
21 #define COUNT_MIN_IMPL_HPP_
22 
#include <algorithm>
#include <cmath>
#include <iomanip>
#include <limits>
#include <random>
#include <sstream>
#include <stdexcept>
27 
28 #include "MurmurHash3.h"
29 #include "count_min.hpp"
30 #include "memory_operations.hpp"
31 
32 namespace datasketches {
33 
34 template<typename W, typename A>
35 count_min_sketch<W,A>::count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed, const A& allocator):
36 _allocator(allocator),
37 _num_hashes(num_hashes),
38 _num_buckets(num_buckets),
39 _sketch_array((num_hashes*num_buckets < 1<<30) ? num_hashes*num_buckets : 0, 0, _allocator),
40 _seed(seed),
41 _total_weight(0) {
42  if (num_buckets < 3) throw std::invalid_argument("Using fewer than 3 buckets incurs relative error greater than 1.");
43 
44  // This check is to ensure later compatibility with a Java implementation whose maximum size can only
45  // be 2^31-1. We check only against 2^30 for simplicity.
46  if (num_buckets * num_hashes >= 1 << 30) {
47  throw std::invalid_argument("These parameters generate a sketch that exceeds 2^30 elements."
48  "Try reducing either the number of buckets or the number of hash functions.");
49  }
50 
51  std::default_random_engine rng(_seed);
52  std::uniform_int_distribution<uint64_t> extra_hash_seeds(0, std::numeric_limits<uint64_t>::max());
53  hash_seeds.reserve(num_hashes);
54 
55  for (uint64_t i=0; i < num_hashes; ++i) {
56  hash_seeds.push_back(extra_hash_seeds(rng) + _seed); // Adds the global seed to all hash functions.
57  }
58 }
59 
60 template<typename W, typename A>
62  return _num_hashes;
63 }
64 
65 template<typename W, typename A>
67  return _num_buckets;
68 }
69 
70 template<typename W, typename A>
72  return _seed;
73 }
74 
75 template<typename W, typename A>
77  return exp(1.0) / double(_num_buckets);
78 }
79 
80 template<typename W, typename A>
82  return _total_weight;
83 }
84 
85 template<typename W, typename A>
86 uint32_t count_min_sketch<W,A>::suggest_num_buckets(double relative_error) {
87  /*
88  * Function to help users select a number of buckets for a given error.
89  * TODO: Change this when we use only power of 2 buckets.
90  */
91  if (relative_error < 0.) {
92  throw std::invalid_argument("Relative error must be at least 0.");
93  }
94  return static_cast<uint32_t>(ceil(exp(1.0) / relative_error));
95 }
96 
97 template<typename W, typename A>
98 uint8_t count_min_sketch<W,A>::suggest_num_hashes(double confidence) {
99  /*
100  * Function to help users select a number of hashes for a given confidence
101  * e.g. confidence = 1 - failure probability
102  * failure probability == delta in the literature.
103  */
104  if (confidence < 0. || confidence > 1.0) {
105  throw std::invalid_argument("Confidence must be between 0 and 1.0 (inclusive).");
106  }
107  return std::min<uint8_t>(ceil(log(1.0 / (1.0 - confidence))), UINT8_MAX);
108 }
109 
110 template<typename W, typename A>
111 std::vector<uint64_t> count_min_sketch<W,A>::get_hashes(const void* item, size_t size) const {
112  /*
113  * Returns the hash locations for the input item using the original hashing
114  * scheme from [1].
115  * Generate _num_hashes separate hashes from calls to murmurmhash.
116  * This could be optimized by keeping both of the 64bit parts of the hash
117  * function, rather than generating a new one for every level.
118  *
119  *
120  * Postscript.
121  * Note that a tradeoff can be achieved over the update time and space
122  * complexity of the sketch by using a combinatorial hashing scheme from
123  * https://github.com/Claudenw/BloomFilter/wiki/Bloom-Filters----An-overview
124  * https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
125  */
126  uint64_t bucket_index;
127  std::vector<uint64_t> sketch_update_locations;
128  sketch_update_locations.reserve(_num_hashes);
129 
130  uint64_t hash_seed_index = 0;
131  for (const auto &it: hash_seeds) {
132  HashState hashes;
133  MurmurHash3_x64_128(item, size, it, hashes); // ? BEWARE OVERFLOW.
134  uint64_t hash = hashes.h1;
135  bucket_index = hash % _num_buckets;
136  sketch_update_locations.push_back((hash_seed_index * _num_buckets) + bucket_index);
137  hash_seed_index += 1;
138  }
139  return sketch_update_locations;
140 }
141 
142 template<typename W, typename A>
143 W count_min_sketch<W,A>::get_estimate(uint64_t item) const {return get_estimate(&item, sizeof(item));}
144 
145 template<typename W, typename A>
146 W count_min_sketch<W,A>::get_estimate(int64_t item) const {return get_estimate(&item, sizeof(item));}
147 
148 template<typename W, typename A>
149 W count_min_sketch<W,A>::get_estimate(const std::string& item) const {
150  if (item.empty()) return 0; // Empty strings are not inserted into the sketch.
151  return get_estimate(item.c_str(), item.length());
152 }
153 
154 template<typename W, typename A>
155 W count_min_sketch<W,A>::get_estimate(const void* item, size_t size) const {
156  /*
157  * Returns the estimated frequency of the item
158  */
159  std::vector<uint64_t> hash_locations = get_hashes(item, size);
160  std::vector<W> estimates;
161  for (const auto h: hash_locations) {
162  estimates.push_back(_sketch_array[h]);
163  }
164  return *std::min_element(estimates.begin(), estimates.end());
165 }
166 
167 template<typename W, typename A>
168 void count_min_sketch<W,A>::update(uint64_t item, W weight) {
169  update(&item, sizeof(item), weight);
170 }
171 
172 template<typename W, typename A>
173 void count_min_sketch<W,A>::update(int64_t item, W weight) {
174  update(&item, sizeof(item), weight);
175 }
176 
177 template<typename W, typename A>
178 void count_min_sketch<W,A>::update(const std::string& item, W weight) {
179  if (item.empty()) return;
180  update(item.c_str(), item.length(), weight);
181 }
182 
183 template<typename W, typename A>
184 void count_min_sketch<W,A>::update(const void* item, size_t size, W weight) {
185  /*
186  * Gets the item's hash locations and then increments the sketch in those
187  * locations by the weight.
188  */
189  _total_weight += weight >= 0 ? weight : -weight;
190  std::vector<uint64_t> hash_locations = get_hashes(item, size);
191  for (const auto h: hash_locations) {
192  _sketch_array[h] += weight;
193  }
194 }
195 
196 template<typename W, typename A>
197 W count_min_sketch<W,A>::get_upper_bound(uint64_t item) const {return get_upper_bound(&item, sizeof(item));}
198 
199 template<typename W, typename A>
200 W count_min_sketch<W,A>::get_upper_bound(int64_t item) const {return get_upper_bound(&item, sizeof(item));}
201 
202 template<typename W, typename A>
203 W count_min_sketch<W,A>::get_upper_bound(const std::string& item) const {
204  if (item.empty()) return 0; // Empty strings are not inserted into the sketch.
205  return get_upper_bound(item.c_str(), item.length());
206 }
207 
208 template<typename W, typename A>
209 W count_min_sketch<W,A>::get_upper_bound(const void* item, size_t size) const {
210  return static_cast<W>(get_estimate(item, size) + get_relative_error() * get_total_weight());
211 }
212 
213 template<typename W, typename A>
214 W count_min_sketch<W,A>::get_lower_bound(uint64_t item) const {return get_lower_bound(&item, sizeof(item));}
215 
216 template<typename W, typename A>
217 W count_min_sketch<W,A>::get_lower_bound(int64_t item) const {return get_lower_bound(&item, sizeof(item));}
218 
219 template<typename W, typename A>
220 W count_min_sketch<W,A>::get_lower_bound(const std::string& item) const {
221  if (item.empty()) return 0; // Empty strings are not inserted into the sketch.
222  return get_lower_bound(item.c_str(), item.length());
223 }
224 
225 template<typename W, typename A>
226 W count_min_sketch<W,A>::get_lower_bound(const void* item, size_t size) const {
227  return get_estimate(item, size);
228 }
229 
230 template<typename W, typename A>
232  /*
233  * Merges this sketch into other_sketch sketch by elementwise summing of buckets
234  */
235  if (this == &other_sketch) {
236  throw std::invalid_argument( "Cannot merge a sketch with itself." );
237  }
238 
239  bool acceptable_config =
240  (get_num_hashes() == other_sketch.get_num_hashes()) &&
241  (get_num_buckets() == other_sketch.get_num_buckets()) &&
242  (get_seed() == other_sketch.get_seed());
243  if (!acceptable_config) {
244  throw std::invalid_argument( "Incompatible sketch configuration." );
245  }
246 
247  // Merge step - iterate over the other vector and add the weights to this sketch
248  auto it = _sketch_array.begin(); // This is a std::vector iterator.
249  auto other_it = other_sketch.begin(); //This is a const iterator over the other sketch.
250  while (it != _sketch_array.end()) {
251  *it += *other_it;
252  ++it;
253  ++other_it;
254  }
255  _total_weight += other_sketch.get_total_weight();
256 }
257 
258 // Iterators
259 template<typename W, typename A>
260 typename count_min_sketch<W,A>::const_iterator count_min_sketch<W,A>::begin() const {
261  return _sketch_array.begin();
262 }
263 
264 template<typename W, typename A>
265 typename count_min_sketch<W,A>::const_iterator count_min_sketch<W,A>::end() const {
266 return _sketch_array.end();
267 }
268 
269 template<typename W, typename A>
270 void count_min_sketch<W,A>::serialize(std::ostream& os) const {
271  // Long 0
272  //const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
273  const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
274  const uint8_t ser_ver = SERIAL_VERSION_1;
275  const uint8_t family_id = FAMILY_ID;
276  const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
277  const uint32_t unused32 = NULL_32;
278  write(os, preamble_longs);
279  write(os, ser_ver);
280  write(os, family_id);
281  write(os, flags_byte);
282  write(os, unused32);
283 
284  // Long 1
285  const uint32_t nbuckets = _num_buckets;
286  const uint8_t nhashes = _num_hashes;
287  const uint16_t seed_hash(compute_seed_hash(_seed));
288  const uint8_t unused8 = NULL_8;
289  write(os, nbuckets);
290  write(os, nhashes);
291  write(os, seed_hash);
292  write(os, unused8);
293  if (is_empty()) return; // sketch is empty, no need to write further bytes.
294 
295  // Long 2
296  write(os, _total_weight);
297 
298  // Long 3 onwards: remaining bytes are consumed by writing the weight and the array values.
299  auto it = _sketch_array.begin();
300  while (it != _sketch_array.end()) {
301  write(os, *it);
302  ++it;
303  }
304 }
305 
306 template<typename W, typename A>
307 auto count_min_sketch<W,A>::deserialize(std::istream& is, uint64_t seed, const A& allocator) -> count_min_sketch {
308 
309  // First 8 bytes are 4 bytes of preamble and 4 unused bytes.
310  const auto preamble_longs = read<uint8_t>(is);
311  const auto serial_version = read<uint8_t>(is);
312  const auto family_id = read<uint8_t>(is);
313  const auto flags_byte = read<uint8_t>(is);
314  read<uint32_t>(is); // 4 unused bytes
315 
316  check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
317 
318  // Sketch parameters
319  const auto nbuckets = read<uint32_t>(is);
320  const auto nhashes = read<uint8_t>(is);
321  const auto seed_hash = read<uint16_t>(is);
322  read<uint8_t>(is); // 1 unused byte
323 
324  if (seed_hash != compute_seed_hash(seed)) {
325  throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
326  + std::to_string(compute_seed_hash(seed)));
327  }
328  count_min_sketch c(nhashes, nbuckets, seed, allocator);
329  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
330  if (is_empty == 1) return c; // sketch is empty, no need to read further.
331 
332  // Set the sketch weight and read in the sketch values
333  const auto weight = read<W>(is);
334  c._total_weight += weight;
335  read(is, c._sketch_array.data(), sizeof(W) * c._sketch_array.size());
336 
337  return c;
338 }
339 
340 template<typename W, typename A>
342  // The header is always 2 longs, whether empty or full
343  const size_t preamble_longs = PREAMBLE_LONGS_SHORT;
344 
345  // If the sketch is empty, we're done. Otherwise, we need the total weight
346  // held by the sketch as well as a data table of size (num_buckets * num_hashes)
347  return (preamble_longs * sizeof(uint64_t)) + (is_empty() ? 0 : sizeof(W) * (1 + _num_buckets * _num_hashes));
348 }
349 
350 template<typename W, typename A>
351 auto count_min_sketch<W,A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
352  vector_bytes bytes(header_size_bytes + get_serialized_size_bytes(), 0, _allocator);
353  uint8_t *ptr = bytes.data() + header_size_bytes;
354 
355  // Long 0
356  const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
357  ptr += copy_to_mem(preamble_longs, ptr);
358  const uint8_t ser_ver = SERIAL_VERSION_1;
359  ptr += copy_to_mem(ser_ver, ptr);
360  const uint8_t family_id = FAMILY_ID;
361  ptr += copy_to_mem(family_id, ptr);
362  const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
363  ptr += copy_to_mem(flags_byte, ptr);
364  const uint32_t unused32 = NULL_32;
365  ptr += copy_to_mem(unused32, ptr);
366 
367  // Long 1
368  const uint32_t nbuckets = _num_buckets;
369  const uint8_t nhashes = _num_hashes;
370  const uint16_t seed_hash(compute_seed_hash(_seed));
371  const uint8_t null_characters_8 = NULL_8;
372  ptr += copy_to_mem(nbuckets, ptr);
373  ptr += copy_to_mem(nhashes, ptr);
374  ptr += copy_to_mem(seed_hash, ptr);
375  ptr += copy_to_mem(null_characters_8, ptr);
376  if (is_empty()) return bytes; // sketch is empty, no need to write further bytes.
377 
378  // Long 2
379  const W t_weight = _total_weight;
380  ptr += copy_to_mem(t_weight, ptr);
381 
382  // Long 3 onwards: remaining bytes are consumed by writing the weight and the array values.
383  auto it = _sketch_array.begin();
384  while (it != _sketch_array.end()) {
385  ptr += copy_to_mem(*it, ptr);
386  ++it;
387  }
388 
389  return bytes;
390 }
391 
392 template<typename W, typename A>
393 auto count_min_sketch<W,A>::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) -> count_min_sketch {
394  ensure_minimum_memory(size, PREAMBLE_LONGS_SHORT * sizeof(uint64_t));
395 
396  const char* ptr = static_cast<const char*>(bytes);
397 
398  // First 8 bytes are 4 bytes of preamble and 4 unused bytes.
399  uint8_t preamble_longs;
400  ptr += copy_from_mem(ptr, preamble_longs);
401  uint8_t serial_version;
402  ptr += copy_from_mem(ptr, serial_version);
403  uint8_t family_id;
404  ptr += copy_from_mem(ptr, family_id);
405  uint8_t flags_byte;
406  ptr += copy_from_mem(ptr, flags_byte);
407  ptr += sizeof(uint32_t);
408 
409  check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
410 
411  // Second 8 bytes are the sketch parameters with a final, unused byte.
412  uint32_t nbuckets;
413  uint8_t nhashes;
414  uint16_t seed_hash;
415  ptr += copy_from_mem(ptr, nbuckets);
416  ptr += copy_from_mem(ptr, nhashes);
417  ptr += copy_from_mem(ptr, seed_hash);
418  ptr += sizeof(uint8_t);
419 
420  if (seed_hash != compute_seed_hash(seed)) {
421  throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
422  + std::to_string(compute_seed_hash(seed)));
423  }
424  count_min_sketch c(nhashes, nbuckets, seed, allocator);
425  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
426  if (is_empty) return c; // sketch is empty, no need to read further.
427 
428  ensure_minimum_memory(size, sizeof(W) * (1 + nbuckets * nhashes));
429 
430  // Long 2 is the weight.
431  W weight;
432  ptr += copy_from_mem(ptr, weight);
433  c._total_weight += weight;
434 
435  // All remaining bytes are the sketch table entries.
436  for (size_t i = 0; i<c._num_buckets*c._num_hashes; ++i) {
437  ptr += copy_from_mem(ptr, c._sketch_array[i]);
438  }
439  return c;
440 }
441 
442 template<typename W, typename A>
444  return _total_weight == 0;
445 }
446 
447 template<typename W, typename A>
449  // count the number of used entries in the sketch
450  uint64_t num_nonzero = 0;
451  for (const auto entry: _sketch_array) {
452  if (entry != static_cast<W>(0.0))
453  ++num_nonzero;
454  }
455 
456  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
457  // The stream does not support passing an allocator instance, and alternatives are complicated.
458  std::ostringstream os;
459  os << "### Count Min sketch summary:" << std::endl;
460  os << " num hashes : " << static_cast<uint32_t>(_num_hashes) << std::endl;
461  os << " num buckets : " << _num_buckets << std::endl;
462  os << " capacity bins : " << _sketch_array.size() << std::endl;
463  os << " filled bins : " << num_nonzero << std::endl;
464  os << " pct filled : " << std::setprecision(3) << (num_nonzero * 100.0) / _sketch_array.size() << "%" << std::endl;
465  os << "### End sketch summary" << std::endl;
466 
467  return string<A>(os.str().c_str(), _allocator);
468 }
469 
470 template<typename W, typename A>
471 void count_min_sketch<W,A>::check_header_validity(uint8_t preamble_longs, uint8_t serial_version, uint8_t family_id, uint8_t flags_byte) {
472  const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
473 
474  const uint8_t sw = (empty ? 1 : 0) + (2 * serial_version) + (4 * family_id) + (32 * (preamble_longs & 0x3F));
475  bool valid = true;
476 
477  switch (sw) { // exhaustive list and description of all valid cases
478  case 138 : break; // !empty, ser_ver==1, family==18, preLongs=2;
479  case 139 : break; // empty, ser_ver==1, family==18, preLongs=2;
480  //case 170 : break; // !empty, ser_ver==1, family==18, preLongs=3;
481  default : // all other case values are invalid
482  valid = false;
483  }
484 
485  if (!valid) {
486  std::ostringstream os;
487  os << "Possible sketch corruption. Inconsistent state: "
488  << "preamble_longs = " << static_cast<uint32_t>(preamble_longs)
489  << ", empty = " << (empty ? "true" : "false")
490  << ", serialization_version = " << static_cast<uint32_t>(serial_version);
491  throw std::invalid_argument(os.str());
492  }
493 }
494 
495 } /* namespace datasketches */
496 
497 #endif
C++ implementation of the CountMin sketch data structure of Cormode and Muthukrishnan.
Definition: count_min.hpp:37
static count_min_sketch deserialize(std::istream &is, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
void serialize(std::ostream &os) const
This method serializes the sketch into a given stream in a binary form.
Definition: count_min_impl.hpp:270
static uint32_t suggest_num_buckets(double relative_error)
Suggests the number of buckets required to achieve the given relative error.
Definition: count_min_impl.hpp:86
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition: count_min_impl.hpp:265
static uint8_t suggest_num_hashes(double confidence)
Suggests the number of hash functions required to achieve the given confidence.
Definition: count_min_impl.hpp:98
double get_relative_error() const
Definition: count_min_impl.hpp:76
uint32_t get_num_buckets() const
Definition: count_min_impl.hpp:66
bool is_empty() const
Returns true if this sketch is empty.
Definition: count_min_impl.hpp:443
W get_total_weight() const
Definition: count_min_impl.hpp:81
uint8_t get_num_hashes() const
Definition: count_min_impl.hpp:61
W get_upper_bound(const void *item, size_t size) const
Query the sketch for the upper bound of a given item.
Definition: count_min_impl.hpp:209
W get_lower_bound(const void *item, size_t size) const
Query the sketch for the lower bound of a given item.
Definition: count_min_impl.hpp:226
W get_estimate(uint64_t item) const
Specific get_estimate function for uint64_t type see generic get_estimate function.
Definition: count_min_impl.hpp:143
count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
Creates an instance of the sketch given parameters _num_hashes, _num_buckets and hash seed,...
Definition: count_min_impl.hpp:35
uint64_t get_seed() const
Definition: count_min_impl.hpp:71
void merge(const count_min_sketch &other_sketch)
Merges another count_min_sketch into this count_min_sketch.
Definition: count_min_impl.hpp:231
void update(const void *item, size_t size, W weight)
Update this sketch with given data of any type.
Definition: count_min_impl.hpp:184
string< Allocator > to_string() const
Returns a string describing the sketch.
Definition: count_min_impl.hpp:448
size_t get_serialized_size_bytes() const
Computes size needed to serialize the current state of the sketch.
Definition: count_min_impl.hpp:341
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition: count_min_impl.hpp:260
DataSketches namespace.
Definition: binomial_bounds.hpp:38