20#ifndef COUNT_MIN_IMPL_HPP_
21#define COUNT_MIN_IMPL_HPP_
28#include "MurmurHash3.h"
29#include "count_min.hpp"
30#include "memory_operations.hpp"
34template<
typename W,
typename A>
37_num_hashes(num_hashes),
38_num_buckets(num_buckets),
39_sketch_array((num_hashes*num_buckets < 1<<30) ? num_hashes*num_buckets : 0, 0, _allocator),
42 if (num_buckets < 3) {
43 throw std::invalid_argument(
"Using fewer than 3 buckets incurs relative error greater than 1.");
48 if (num_buckets * num_hashes >= 1 << 30) {
49 throw std::invalid_argument(
"These parameters generate a sketch that exceeds 2^30 elements."
50 "Try reducing either the number of buckets or the number of hash functions.");
53 std::default_random_engine rng(_seed);
54 std::uniform_int_distribution<uint64_t> extra_hash_seeds(0, std::numeric_limits<uint64_t>::max());
55 hash_seeds.reserve(num_hashes);
57 for (uint64_t i=0; i < num_hashes; ++i) {
58 hash_seeds.push_back(extra_hash_seeds(rng) + _seed);
62template<
typename W,
typename A>
67template<
typename W,
typename A>
72template<
typename W,
typename A>
77template<
typename W,
typename A>
79 return exp(1.0) /
static_cast<double>(_num_buckets);
82template<
typename W,
typename A>
87template<
typename W,
typename A>
93 if (relative_error < 0.) {
94 throw std::invalid_argument(
"Relative error must be at least 0.");
96 return static_cast<uint32_t
>(ceil(exp(1.0) / relative_error));
99template<
typename W,
typename A>
106 if (confidence < 0. || confidence > 1.0) {
107 throw std::invalid_argument(
"Confidence must be between 0 and 1.0 (inclusive).");
109 return std::min<uint8_t>(ceil(log(1.0 / (1.0 - confidence))), UINT8_MAX);
112template<
typename W,
typename A>
128 uint64_t bucket_index;
129 std::vector<uint64_t> sketch_update_locations;
130 sketch_update_locations.reserve(_num_hashes);
132 uint64_t hash_seed_index = 0;
133 for (
const auto &it: hash_seeds) {
135 MurmurHash3_x64_128(item, size, it, hashes);
136 uint64_t hash = hashes.h1;
137 bucket_index = hash % _num_buckets;
138 sketch_update_locations.push_back((hash_seed_index * _num_buckets) + bucket_index);
139 hash_seed_index += 1;
141 return sketch_update_locations;
144template<
typename W,
typename A>
147template<
typename W,
typename A>
150template<
typename W,
typename A>
152 if (item.empty()) {
return 0; }
153 return get_estimate(item.c_str(), item.length());
156template<
typename W,
typename A>
161 std::vector<uint64_t> hash_locations = get_hashes(item, size);
162 std::vector<W> estimates;
163 for (
const auto h: hash_locations) {
164 estimates.push_back(_sketch_array[h]);
166 return *std::min_element(estimates.begin(), estimates.end());
169template<
typename W,
typename A>
171 update(&item,
sizeof(item), weight);
174template<
typename W,
typename A>
176 update(&item,
sizeof(item), weight);
179template<
typename W,
typename A>
181 if (item.empty()) {
return; }
182 update(item.c_str(), item.length(), weight);
185template<
typename W,
typename A>
191 _total_weight += weight >= 0 ? weight : -weight;
192 std::vector<uint64_t> hash_locations = get_hashes(item, size);
193 for (
const auto h: hash_locations) {
194 _sketch_array[h] += weight;
198template<
typename W,
typename A>
201template<
typename W,
typename A>
204template<
typename W,
typename A>
206 if (item.empty()) {
return 0; }
207 return get_upper_bound(item.c_str(), item.length());
210template<
typename W,
typename A>
212 return static_cast<W
>(get_estimate(item, size) + get_relative_error() * get_total_weight());
215template<
typename W,
typename A>
218template<
typename W,
typename A>
221template<
typename W,
typename A>
223 if (item.empty()) {
return 0; }
224 return get_lower_bound(item.c_str(), item.length());
227template<
typename W,
typename A>
229 return get_estimate(item, size);
232template<
typename W,
typename A>
237 if (
this == &other_sketch) {
throw std::invalid_argument(
"Cannot merge a sketch with itself." ); }
239 bool acceptable_config =
242 (get_seed() == other_sketch.
get_seed());
243 if (!acceptable_config) {
throw std::invalid_argument(
"Incompatible sketch configuration." ); }
246 auto it = _sketch_array.begin();
247 auto other_it = other_sketch.
begin();
248 while (it != _sketch_array.end()) {
257template<
typename W,
typename A>
259 return _sketch_array.begin();
262template<
typename W,
typename A>
264return _sketch_array.end();
267template<
typename W,
typename A>
271 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
272 const uint8_t ser_ver = SERIAL_VERSION_1;
273 const uint8_t family_id = FAMILY_ID;
274 const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
275 const uint32_t unused32 = NULL_32;
276 write(os, preamble_longs);
278 write(os, family_id);
279 write(os, flags_byte);
283 const uint32_t nbuckets = _num_buckets;
284 const uint8_t nhashes = _num_hashes;
285 const uint16_t seed_hash(compute_seed_hash(_seed));
286 const uint8_t unused8 = NULL_8;
289 write(os, seed_hash);
291 if (is_empty()) {
return; }
294 write(os, _total_weight);
297 auto it = _sketch_array.begin();
298 while (it != _sketch_array.end()) {
304template<
typename W,
typename A>
308 const auto preamble_longs = read<uint8_t>(is);
309 const auto serial_version = read<uint8_t>(is);
310 const auto family_id = read<uint8_t>(is);
311 const auto flags_byte = read<uint8_t>(is);
314 check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
317 const auto nbuckets = read<uint32_t>(is);
318 const auto nhashes = read<uint8_t>(is);
319 const auto seed_hash = read<uint16_t>(is);
322 if (seed_hash != compute_seed_hash(seed)) {
323 throw std::invalid_argument(
"Incompatible seed hashes: " + std::to_string(seed_hash) +
", "
324 + std::to_string(compute_seed_hash(seed)));
326 count_min_sketch c(nhashes, nbuckets, seed, allocator);
327 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
328 if (is_empty == 1) {
return c; }
331 const auto weight = read<W>(is);
332 c._total_weight += weight;
333 read(is, c._sketch_array.data(),
sizeof(W) * c._sketch_array.size());
338template<
typename W,
typename A>
341 const size_t preamble_longs = PREAMBLE_LONGS_SHORT;
345 return (preamble_longs *
sizeof(uint64_t)) + (is_empty() ? 0 :
sizeof(W) * (1 + _num_buckets * _num_hashes));
348template<
typename W,
typename A>
350 vector_bytes bytes(header_size_bytes + get_serialized_size_bytes(), 0, _allocator);
351 uint8_t *ptr = bytes.data() + header_size_bytes;
354 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
355 ptr += copy_to_mem(preamble_longs, ptr);
356 const uint8_t ser_ver = SERIAL_VERSION_1;
357 ptr += copy_to_mem(ser_ver, ptr);
358 const uint8_t family_id = FAMILY_ID;
359 ptr += copy_to_mem(family_id, ptr);
360 const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
361 ptr += copy_to_mem(flags_byte, ptr);
362 const uint32_t unused32 = NULL_32;
363 ptr += copy_to_mem(unused32, ptr);
366 const uint32_t nbuckets = _num_buckets;
367 const uint8_t nhashes = _num_hashes;
368 const uint16_t seed_hash(compute_seed_hash(_seed));
369 const uint8_t null_characters_8 = NULL_8;
370 ptr += copy_to_mem(nbuckets, ptr);
371 ptr += copy_to_mem(nhashes, ptr);
372 ptr += copy_to_mem(seed_hash, ptr);
373 ptr += copy_to_mem(null_characters_8, ptr);
374 if (is_empty()) {
return bytes; }
377 const W t_weight = _total_weight;
378 ptr += copy_to_mem(t_weight, ptr);
381 auto it = _sketch_array.begin();
382 while (it != _sketch_array.end()) {
383 ptr += copy_to_mem(*it, ptr);
390template<
typename W,
typename A>
392 ensure_minimum_memory(size, PREAMBLE_LONGS_SHORT *
sizeof(uint64_t));
394 const char* ptr =
static_cast<const char*
>(bytes);
397 uint8_t preamble_longs;
398 ptr += copy_from_mem(ptr, preamble_longs);
399 uint8_t serial_version;
400 ptr += copy_from_mem(ptr, serial_version);
402 ptr += copy_from_mem(ptr, family_id);
404 ptr += copy_from_mem(ptr, flags_byte);
405 ptr +=
sizeof(uint32_t);
407 check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
413 ptr += copy_from_mem(ptr, nbuckets);
414 ptr += copy_from_mem(ptr, nhashes);
415 ptr += copy_from_mem(ptr, seed_hash);
416 ptr +=
sizeof(uint8_t);
418 if (seed_hash != compute_seed_hash(seed)) {
419 throw std::invalid_argument(
"Incompatible seed hashes: " + std::to_string(seed_hash) +
", "
420 + std::to_string(compute_seed_hash(seed)));
422 count_min_sketch c(nhashes, nbuckets, seed, allocator);
423 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
424 if (is_empty) {
return c; }
426 ensure_minimum_memory(size,
sizeof(W) * (1 + nbuckets * nhashes));
430 ptr += copy_from_mem(ptr, weight);
431 c._total_weight += weight;
434 for (
size_t i = 0; i<c._num_buckets*c._num_hashes; ++i) {
435 ptr += copy_from_mem(ptr, c._sketch_array[i]);
440template<
typename W,
typename A>
442 return _total_weight == 0;
445template<
typename W,
typename A>
448 uint64_t num_nonzero = 0;
449 for (
const auto entry: _sketch_array) {
450 if (entry !=
static_cast<W
>(0.0)) { ++num_nonzero; }
455 std::ostringstream os;
456 os <<
"### Count Min sketch summary:" << std::endl;
457 os <<
" num hashes : " <<
static_cast<uint32_t
>(_num_hashes) << std::endl;
458 os <<
" num buckets : " << _num_buckets << std::endl;
459 os <<
" capacity bins : " << _sketch_array.size() << std::endl;
460 os <<
" filled bins : " << num_nonzero << std::endl;
461 os <<
" pct filled : " << std::setprecision(3) << (num_nonzero * 100.0) / _sketch_array.size() <<
"%" << std::endl;
462 os <<
"### End sketch summary" << std::endl;
464 return string<A>(os.str().c_str(), _allocator);
467template<
typename W,
typename A>
469 const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
471 const uint8_t sw = (empty ? 1 : 0) + (2 * serial_version) + (4 * family_id) + (32 * (preamble_longs & 0x3F));
483 std::ostringstream os;
484 os <<
"Possible sketch corruption. Inconsistent state: "
485 <<
"preamble_longs = " <<
static_cast<uint32_t
>(preamble_longs)
486 <<
", empty = " << (empty ?
"true" :
"false")
487 <<
", serialization_version = " << static_cast<uint32_t>(serial_version);
488 throw std::invalid_argument(os.str());
C++ implementation of the CountMin sketch data structure of Cormode and Muthukrishnan.
Definition count_min.hpp:37
static count_min_sketch deserialize(std::istream &is, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
void serialize(std::ostream &os) const
This method serializes the sketch into a given stream in a binary form.
Definition count_min_impl.hpp:268
static uint32_t suggest_num_buckets(double relative_error)
Suggests the number of buckets required to achieve the given relative error.
Definition count_min_impl.hpp:88
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition count_min_impl.hpp:263
static uint8_t suggest_num_hashes(double confidence)
Suggests the number of hash functions required to achieve the given confidence.
Definition count_min_impl.hpp:100
double get_relative_error() const
Definition count_min_impl.hpp:78
uint32_t get_num_buckets() const
Definition count_min_impl.hpp:68
bool is_empty() const
Returns true if this sketch is empty.
Definition count_min_impl.hpp:441
W get_total_weight() const
Definition count_min_impl.hpp:83
uint8_t get_num_hashes() const
Definition count_min_impl.hpp:63
W get_upper_bound(const void *item, size_t size) const
Query the sketch for the upper bound of a given item.
Definition count_min_impl.hpp:211
W get_lower_bound(const void *item, size_t size) const
Query the sketch for the lower bound of a given item.
Definition count_min_impl.hpp:228
W get_estimate(uint64_t item) const
Specific get_estimate function for uint64_t type see generic get_estimate function.
Definition count_min_impl.hpp:145
count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
Creates an instance of the sketch given parameters _num_hashes, _num_buckets and hash seed,...
Definition count_min_impl.hpp:35
uint64_t get_seed() const
Definition count_min_impl.hpp:73
void merge(const count_min_sketch &other_sketch)
Merges another count_min_sketch into this count_min_sketch.
Definition count_min_impl.hpp:233
void update(const void *item, size_t size, W weight)
Update this sketch with given data of any type.
Definition count_min_impl.hpp:186
string< Allocator > to_string() const
Returns a string describing the sketch.
Definition count_min_impl.hpp:446
size_t get_serialized_size_bytes() const
Computes size needed to serialize the current state of the sketch.
Definition count_min_impl.hpp:339
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition count_min_impl.hpp:258
DataSketches namespace.
Definition binomial_bounds.hpp:38