20#ifndef COUNT_MIN_IMPL_HPP_
21#define COUNT_MIN_IMPL_HPP_
28#include "MurmurHash3.h"
29#include "count_min.hpp"
30#include "memory_operations.hpp"
34template<
typename W,
typename A>
37_num_hashes(num_hashes),
38_num_buckets(num_buckets),
39_sketch_array((num_hashes*num_buckets < 1<<30) ? num_hashes*num_buckets : 0, 0, _allocator),
42 if (num_buckets < 3)
throw std::invalid_argument(
"Using fewer than 3 buckets incurs relative error greater than 1.");
46 if (num_buckets * num_hashes >= 1 << 30) {
47 throw std::invalid_argument(
"These parameters generate a sketch that exceeds 2^30 elements."
48 "Try reducing either the number of buckets or the number of hash functions.");
51 std::default_random_engine rng(_seed);
52 std::uniform_int_distribution<uint64_t> extra_hash_seeds(0, std::numeric_limits<uint64_t>::max());
53 hash_seeds.reserve(num_hashes);
55 for (uint64_t i=0; i < num_hashes; ++i) {
56 hash_seeds.push_back(extra_hash_seeds(rng) + _seed);
60template<
typename W,
typename A>
65template<
typename W,
typename A>
70template<
typename W,
typename A>
75template<
typename W,
typename A>
77 return exp(1.0) / double(_num_buckets);
80template<
typename W,
typename A>
85template<
typename W,
typename A>
91 if (relative_error < 0.) {
92 throw std::invalid_argument(
"Relative error must be at least 0.");
94 return static_cast<uint32_t
>(ceil(exp(1.0) / relative_error));
97template<
typename W,
typename A>
104 if (confidence < 0. || confidence > 1.0) {
105 throw std::invalid_argument(
"Confidence must be between 0 and 1.0 (inclusive).");
107 return std::min<uint8_t>(ceil(log(1.0 / (1.0 - confidence))), UINT8_MAX);
110template<
typename W,
typename A>
126 uint64_t bucket_index;
127 std::vector<uint64_t> sketch_update_locations;
128 sketch_update_locations.reserve(_num_hashes);
130 uint64_t hash_seed_index = 0;
131 for (
const auto &it: hash_seeds) {
133 MurmurHash3_x64_128(item, size, it, hashes);
134 uint64_t hash = hashes.h1;
135 bucket_index = hash % _num_buckets;
136 sketch_update_locations.push_back((hash_seed_index * _num_buckets) + bucket_index);
137 hash_seed_index += 1;
139 return sketch_update_locations;
142template<
typename W,
typename A>
145template<
typename W,
typename A>
148template<
typename W,
typename A>
150 if (item.empty())
return 0;
151 return get_estimate(item.c_str(), item.length());
154template<
typename W,
typename A>
159 std::vector<uint64_t> hash_locations = get_hashes(item, size);
160 std::vector<W> estimates;
161 for (
const auto h: hash_locations) {
162 estimates.push_back(_sketch_array[h]);
164 return *std::min_element(estimates.begin(), estimates.end());
167template<
typename W,
typename A>
169 update(&item,
sizeof(item), weight);
172template<
typename W,
typename A>
174 update(&item,
sizeof(item), weight);
177template<
typename W,
typename A>
179 if (item.empty())
return;
180 update(item.c_str(), item.length(), weight);
183template<
typename W,
typename A>
189 _total_weight += weight >= 0 ? weight : -weight;
190 std::vector<uint64_t> hash_locations = get_hashes(item, size);
191 for (
const auto h: hash_locations) {
192 _sketch_array[h] += weight;
196template<
typename W,
typename A>
199template<
typename W,
typename A>
202template<
typename W,
typename A>
204 if (item.empty())
return 0;
205 return get_upper_bound(item.c_str(), item.length());
208template<
typename W,
typename A>
210 return static_cast<W
>(get_estimate(item, size) + get_relative_error() * get_total_weight());
213template<
typename W,
typename A>
216template<
typename W,
typename A>
219template<
typename W,
typename A>
221 if (item.empty())
return 0;
222 return get_lower_bound(item.c_str(), item.length());
225template<
typename W,
typename A>
227 return get_estimate(item, size);
230template<
typename W,
typename A>
235 if (
this == &other_sketch) {
236 throw std::invalid_argument(
"Cannot merge a sketch with itself." );
239 bool acceptable_config =
242 (get_seed() == other_sketch.
get_seed());
243 if (!acceptable_config) {
244 throw std::invalid_argument(
"Incompatible sketch configuration." );
248 auto it = _sketch_array.begin();
249 auto other_it = other_sketch.
begin();
250 while (it != _sketch_array.end()) {
259template<
typename W,
typename A>
261 return _sketch_array.begin();
264template<
typename W,
typename A>
266return _sketch_array.end();
269template<
typename W,
typename A>
273 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
274 const uint8_t ser_ver = SERIAL_VERSION_1;
275 const uint8_t family_id = FAMILY_ID;
276 const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
277 const uint32_t unused32 = NULL_32;
278 write(os, preamble_longs);
280 write(os, family_id);
281 write(os, flags_byte);
285 const uint32_t nbuckets = _num_buckets;
286 const uint8_t nhashes = _num_hashes;
287 const uint16_t seed_hash(compute_seed_hash(_seed));
288 const uint8_t unused8 = NULL_8;
291 write(os, seed_hash);
293 if (is_empty())
return;
296 write(os, _total_weight);
299 auto it = _sketch_array.begin();
300 while (it != _sketch_array.end()) {
306template<
typename W,
typename A>
310 const auto preamble_longs = read<uint8_t>(is);
311 const auto serial_version = read<uint8_t>(is);
312 const auto family_id = read<uint8_t>(is);
313 const auto flags_byte = read<uint8_t>(is);
316 check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
319 const auto nbuckets = read<uint32_t>(is);
320 const auto nhashes = read<uint8_t>(is);
321 const auto seed_hash = read<uint16_t>(is);
324 if (seed_hash != compute_seed_hash(seed)) {
325 throw std::invalid_argument(
"Incompatible seed hashes: " + std::to_string(seed_hash) +
", "
326 + std::to_string(compute_seed_hash(seed)));
328 count_min_sketch c(nhashes, nbuckets, seed, allocator);
329 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
330 if (is_empty == 1)
return c;
333 const auto weight = read<W>(is);
334 c._total_weight += weight;
335 read(is, c._sketch_array.data(),
sizeof(W) * c._sketch_array.size());
340template<
typename W,
typename A>
343 const size_t preamble_longs = PREAMBLE_LONGS_SHORT;
347 return (preamble_longs *
sizeof(uint64_t)) + (is_empty() ? 0 :
sizeof(W) * (1 + _num_buckets * _num_hashes));
350template<
typename W,
typename A>
352 vector_bytes bytes(header_size_bytes + get_serialized_size_bytes(), 0, _allocator);
353 uint8_t *ptr = bytes.data() + header_size_bytes;
356 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
357 ptr += copy_to_mem(preamble_longs, ptr);
358 const uint8_t ser_ver = SERIAL_VERSION_1;
359 ptr += copy_to_mem(ser_ver, ptr);
360 const uint8_t family_id = FAMILY_ID;
361 ptr += copy_to_mem(family_id, ptr);
362 const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
363 ptr += copy_to_mem(flags_byte, ptr);
364 const uint32_t unused32 = NULL_32;
365 ptr += copy_to_mem(unused32, ptr);
368 const uint32_t nbuckets = _num_buckets;
369 const uint8_t nhashes = _num_hashes;
370 const uint16_t seed_hash(compute_seed_hash(_seed));
371 const uint8_t null_characters_8 = NULL_8;
372 ptr += copy_to_mem(nbuckets, ptr);
373 ptr += copy_to_mem(nhashes, ptr);
374 ptr += copy_to_mem(seed_hash, ptr);
375 ptr += copy_to_mem(null_characters_8, ptr);
376 if (is_empty())
return bytes;
379 const W t_weight = _total_weight;
380 ptr += copy_to_mem(t_weight, ptr);
383 auto it = _sketch_array.begin();
384 while (it != _sketch_array.end()) {
385 ptr += copy_to_mem(*it, ptr);
392template<
typename W,
typename A>
394 ensure_minimum_memory(size, PREAMBLE_LONGS_SHORT *
sizeof(uint64_t));
396 const char* ptr =
static_cast<const char*
>(bytes);
399 uint8_t preamble_longs;
400 ptr += copy_from_mem(ptr, preamble_longs);
401 uint8_t serial_version;
402 ptr += copy_from_mem(ptr, serial_version);
404 ptr += copy_from_mem(ptr, family_id);
406 ptr += copy_from_mem(ptr, flags_byte);
407 ptr +=
sizeof(uint32_t);
409 check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
415 ptr += copy_from_mem(ptr, nbuckets);
416 ptr += copy_from_mem(ptr, nhashes);
417 ptr += copy_from_mem(ptr, seed_hash);
418 ptr +=
sizeof(uint8_t);
420 if (seed_hash != compute_seed_hash(seed)) {
421 throw std::invalid_argument(
"Incompatible seed hashes: " + std::to_string(seed_hash) +
", "
422 + std::to_string(compute_seed_hash(seed)));
424 count_min_sketch c(nhashes, nbuckets, seed, allocator);
425 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
426 if (is_empty)
return c;
428 ensure_minimum_memory(size,
sizeof(W) * (1 + nbuckets * nhashes));
432 ptr += copy_from_mem(ptr, weight);
433 c._total_weight += weight;
436 for (
size_t i = 0; i<c._num_buckets*c._num_hashes; ++i) {
437 ptr += copy_from_mem(ptr, c._sketch_array[i]);
442template<
typename W,
typename A>
444 return _total_weight == 0;
447template<
typename W,
typename A>
450 uint64_t num_nonzero = 0;
451 for (
const auto entry: _sketch_array) {
452 if (entry !=
static_cast<W
>(0.0))
458 std::ostringstream os;
459 os <<
"### Count Min sketch summary:" << std::endl;
460 os <<
" num hashes : " <<
static_cast<uint32_t
>(_num_hashes) << std::endl;
461 os <<
" num buckets : " << _num_buckets << std::endl;
462 os <<
" capacity bins : " << _sketch_array.size() << std::endl;
463 os <<
" filled bins : " << num_nonzero << std::endl;
464 os <<
" pct filled : " << std::setprecision(3) << (num_nonzero * 100.0) / _sketch_array.size() <<
"%" << std::endl;
465 os <<
"### End sketch summary" << std::endl;
467 return string<A>(os.str().c_str(), _allocator);
470template<
typename W,
typename A>
472 const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
474 const uint8_t sw = (empty ? 1 : 0) + (2 * serial_version) + (4 * family_id) + (32 * (preamble_longs & 0x3F));
486 std::ostringstream os;
487 os <<
"Possible sketch corruption. Inconsistent state: "
488 <<
"preamble_longs = " <<
static_cast<uint32_t
>(preamble_longs)
489 <<
", empty = " << (empty ?
"true" :
"false")
490 <<
", serialization_version = " << static_cast<uint32_t>(serial_version);
491 throw std::invalid_argument(os.str());
C++ implementation of the CountMin sketch data structure of Cormode and Muthukrishnan.
Definition count_min.hpp:37
static count_min_sketch deserialize(std::istream &is, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
void serialize(std::ostream &os) const
This method serializes the sketch into a given stream in a binary form.
Definition count_min_impl.hpp:270
static uint32_t suggest_num_buckets(double relative_error)
Suggests the number of buckets required to achieve the given relative error.
Definition count_min_impl.hpp:86
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition count_min_impl.hpp:265
static uint8_t suggest_num_hashes(double confidence)
Suggests the number of hash functions required to achieve the given confidence.
Definition count_min_impl.hpp:98
double get_relative_error() const
Definition count_min_impl.hpp:76
uint32_t get_num_buckets() const
Definition count_min_impl.hpp:66
bool is_empty() const
Returns true if this sketch is empty.
Definition count_min_impl.hpp:443
W get_total_weight() const
Definition count_min_impl.hpp:81
uint8_t get_num_hashes() const
Definition count_min_impl.hpp:61
W get_upper_bound(const void *item, size_t size) const
Query the sketch for the upper bound of a given item.
Definition count_min_impl.hpp:209
W get_lower_bound(const void *item, size_t size) const
Query the sketch for the lower bound of a given item.
Definition count_min_impl.hpp:226
W get_estimate(uint64_t item) const
Specific get_estimate function for uint64_t type see generic get_estimate function.
Definition count_min_impl.hpp:143
count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
Creates an instance of the sketch given parameters _num_hashes, _num_buckets and hash seed,...
Definition count_min_impl.hpp:35
uint64_t get_seed() const
Definition count_min_impl.hpp:71
void merge(const count_min_sketch &other_sketch)
Merges another count_min_sketch into this count_min_sketch.
Definition count_min_impl.hpp:231
void update(const void *item, size_t size, W weight)
Update this sketch with given data of any type.
Definition count_min_impl.hpp:184
string< Allocator > to_string() const
Returns a string describing the sketch.
Definition count_min_impl.hpp:448
size_t get_serialized_size_bytes() const
Computes size needed to serialize the current state of the sketch.
Definition count_min_impl.hpp:341
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition count_min_impl.hpp:260
DataSketches namespace.
Definition binomial_bounds.hpp:38