datasketches-cpp
Loading...
Searching...
No Matches
count_min_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef COUNT_MIN_IMPL_HPP_
21#define COUNT_MIN_IMPL_HPP_
22
23#include <algorithm>
24#include <iomanip>
25#include <random>
26#include <sstream>
27
28#include "MurmurHash3.h"
29#include "count_min.hpp"
30#include "memory_operations.hpp"
31
32namespace datasketches {
33
34template<typename W, typename A>
35count_min_sketch<W,A>::count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed, const A& allocator):
36_allocator(allocator),
37_num_hashes(num_hashes),
38_num_buckets(num_buckets),
39_sketch_array((num_hashes*num_buckets < 1<<30) ? num_hashes*num_buckets : 0, 0, _allocator),
40_seed(seed),
41_total_weight(0) {
42 if (num_buckets < 3) {
43 throw std::invalid_argument("Using fewer than 3 buckets incurs relative error greater than 1.");
44 }
45
46 // This check is to ensure later compatibility with a Java implementation whose maximum size can only
47 // be 2^31-1. We check only against 2^30 for simplicity.
48 if (num_buckets * num_hashes >= 1 << 30) {
49 throw std::invalid_argument("These parameters generate a sketch that exceeds 2^30 elements."
50 "Try reducing either the number of buckets or the number of hash functions.");
51 }
52
53 std::default_random_engine rng(_seed);
54 std::uniform_int_distribution<uint64_t> extra_hash_seeds(0, std::numeric_limits<uint64_t>::max());
55 hash_seeds.reserve(num_hashes);
56
57 for (uint64_t i=0; i < num_hashes; ++i) {
58 hash_seeds.push_back(extra_hash_seeds(rng) + _seed); // Adds the global seed to all hash functions.
59 }
60}
61
62template<typename W, typename A>
64 return _num_hashes;
65}
66
67template<typename W, typename A>
69 return _num_buckets;
70}
71
72template<typename W, typename A>
74 return _seed;
75}
76
77template<typename W, typename A>
79 return exp(1.0) / static_cast<double>(_num_buckets);
80}
81
82template<typename W, typename A>
84 return _total_weight;
85}
86
87template<typename W, typename A>
88uint32_t count_min_sketch<W,A>::suggest_num_buckets(double relative_error) {
89 /*
90 * Function to help users select a number of buckets for a given error.
91 * TODO: Change this when we use only power of 2 buckets.
92 */
93 if (relative_error < 0.) {
94 throw std::invalid_argument("Relative error must be at least 0.");
95 }
96 return static_cast<uint32_t>(ceil(exp(1.0) / relative_error));
97}
98
99template<typename W, typename A>
101 /*
102 * Function to help users select a number of hashes for a given confidence
103 * e.g. confidence = 1 - failure probability
104 * failure probability == delta in the literature.
105 */
106 if (confidence < 0. || confidence > 1.0) {
107 throw std::invalid_argument("Confidence must be between 0 and 1.0 (inclusive).");
108 }
109 return std::min<uint8_t>(ceil(log(1.0 / (1.0 - confidence))), UINT8_MAX);
110}
111
112template<typename W, typename A>
113std::vector<uint64_t> count_min_sketch<W,A>::get_hashes(const void* item, size_t size) const {
114 /*
115 * Returns the hash locations for the input item using the original hashing
116 * scheme from [1].
117 * Generate _num_hashes separate hashes from calls to murmurmhash.
118 * This could be optimized by keeping both of the 64bit parts of the hash
119 * function, rather than generating a new one for every level.
120 *
121 *
122 * Postscript.
123 * Note that a tradeoff can be achieved over the update time and space
124 * complexity of the sketch by using a combinatorial hashing scheme from
125 * https://github.com/Claudenw/BloomFilter/wiki/Bloom-Filters----An-overview
126 * https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
127 */
128 uint64_t bucket_index;
129 std::vector<uint64_t> sketch_update_locations;
130 sketch_update_locations.reserve(_num_hashes);
131
132 uint64_t hash_seed_index = 0;
133 for (const auto &it: hash_seeds) {
134 HashState hashes;
135 MurmurHash3_x64_128(item, size, it, hashes); // ? BEWARE OVERFLOW.
136 uint64_t hash = hashes.h1;
137 bucket_index = hash % _num_buckets;
138 sketch_update_locations.push_back((hash_seed_index * _num_buckets) + bucket_index);
139 hash_seed_index += 1;
140 }
141 return sketch_update_locations;
142}
143
144template<typename W, typename A>
145W count_min_sketch<W,A>::get_estimate(uint64_t item) const {return get_estimate(&item, sizeof(item));}
146
147template<typename W, typename A>
148W count_min_sketch<W,A>::get_estimate(int64_t item) const {return get_estimate(&item, sizeof(item));}
149
150template<typename W, typename A>
151W count_min_sketch<W,A>::get_estimate(const std::string& item) const {
152 if (item.empty()) { return 0; } // Empty strings are not inserted into the sketch.
153 return get_estimate(item.c_str(), item.length());
154}
155
156template<typename W, typename A>
157W count_min_sketch<W,A>::get_estimate(const void* item, size_t size) const {
158 /*
159 * Returns the estimated frequency of the item
160 */
161 std::vector<uint64_t> hash_locations = get_hashes(item, size);
162 std::vector<W> estimates;
163 for (const auto h: hash_locations) {
164 estimates.push_back(_sketch_array[h]);
165 }
166 return *std::min_element(estimates.begin(), estimates.end());
167}
168
169template<typename W, typename A>
170void count_min_sketch<W,A>::update(uint64_t item, W weight) {
171 update(&item, sizeof(item), weight);
172}
173
174template<typename W, typename A>
175void count_min_sketch<W,A>::update(int64_t item, W weight) {
176 update(&item, sizeof(item), weight);
177}
178
179template<typename W, typename A>
180void count_min_sketch<W,A>::update(const std::string& item, W weight) {
181 if (item.empty()) { return; }
182 update(item.c_str(), item.length(), weight);
183}
184
185template<typename W, typename A>
186void count_min_sketch<W,A>::update(const void* item, size_t size, W weight) {
187 /*
188 * Gets the item's hash locations and then increments the sketch in those
189 * locations by the weight.
190 */
191 _total_weight += weight >= 0 ? weight : -weight;
192 std::vector<uint64_t> hash_locations = get_hashes(item, size);
193 for (const auto h: hash_locations) {
194 _sketch_array[h] += weight;
195 }
196}
197
198template<typename W, typename A>
199W count_min_sketch<W,A>::get_upper_bound(uint64_t item) const {return get_upper_bound(&item, sizeof(item));}
200
201template<typename W, typename A>
202W count_min_sketch<W,A>::get_upper_bound(int64_t item) const {return get_upper_bound(&item, sizeof(item));}
203
204template<typename W, typename A>
205W count_min_sketch<W,A>::get_upper_bound(const std::string& item) const {
206 if (item.empty()) { return 0; } // Empty strings are not inserted into the sketch.
207 return get_upper_bound(item.c_str(), item.length());
208}
209
210template<typename W, typename A>
211W count_min_sketch<W,A>::get_upper_bound(const void* item, size_t size) const {
212 return static_cast<W>(get_estimate(item, size) + get_relative_error() * get_total_weight());
213}
214
215template<typename W, typename A>
216W count_min_sketch<W,A>::get_lower_bound(uint64_t item) const {return get_lower_bound(&item, sizeof(item));}
217
218template<typename W, typename A>
219W count_min_sketch<W,A>::get_lower_bound(int64_t item) const {return get_lower_bound(&item, sizeof(item));}
220
221template<typename W, typename A>
222W count_min_sketch<W,A>::get_lower_bound(const std::string& item) const {
223 if (item.empty()) { return 0; } // Empty strings are not inserted into the sketch.
224 return get_lower_bound(item.c_str(), item.length());
225}
226
227template<typename W, typename A>
228W count_min_sketch<W,A>::get_lower_bound(const void* item, size_t size) const {
229 return get_estimate(item, size);
230}
231
232template<typename W, typename A>
234 /*
235 * Merges this sketch into other_sketch sketch by elementwise summing of buckets
236 */
237 if (this == &other_sketch) { throw std::invalid_argument( "Cannot merge a sketch with itself." ); }
238
239 bool acceptable_config =
240 (get_num_hashes() == other_sketch.get_num_hashes()) &&
241 (get_num_buckets() == other_sketch.get_num_buckets()) &&
242 (get_seed() == other_sketch.get_seed());
243 if (!acceptable_config) { throw std::invalid_argument( "Incompatible sketch configuration." ); }
244
245 // Merge step - iterate over the other vector and add the weights to this sketch
246 auto it = _sketch_array.begin(); // This is a std::vector iterator.
247 auto other_it = other_sketch.begin(); //This is a const iterator over the other sketch.
248 while (it != _sketch_array.end()) {
249 *it += *other_it;
250 ++it;
251 ++other_it;
252 }
253 _total_weight += other_sketch.get_total_weight();
254}
255
256// Iterators
257template<typename W, typename A>
258typename count_min_sketch<W,A>::const_iterator count_min_sketch<W,A>::begin() const {
259 return _sketch_array.begin();
260}
261
262template<typename W, typename A>
263typename count_min_sketch<W,A>::const_iterator count_min_sketch<W,A>::end() const {
264return _sketch_array.end();
265}
266
267template<typename W, typename A>
268void count_min_sketch<W,A>::serialize(std::ostream& os) const {
269 // Long 0
270 //const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_SHORT : PREAMBLE_LONGS_FULL;
271 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
272 const uint8_t ser_ver = SERIAL_VERSION_1;
273 const uint8_t family_id = FAMILY_ID;
274 const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
275 const uint32_t unused32 = NULL_32;
276 write(os, preamble_longs);
277 write(os, ser_ver);
278 write(os, family_id);
279 write(os, flags_byte);
280 write(os, unused32);
281
282 // Long 1
283 const uint32_t nbuckets = _num_buckets;
284 const uint8_t nhashes = _num_hashes;
285 const uint16_t seed_hash(compute_seed_hash(_seed));
286 const uint8_t unused8 = NULL_8;
287 write(os, nbuckets);
288 write(os, nhashes);
289 write(os, seed_hash);
290 write(os, unused8);
291 if (is_empty()) { return; } // sketch is empty, no need to write further bytes.
292
293 // Long 2
294 write(os, _total_weight);
295
296 // Long 3 onwards: remaining bytes are consumed by writing the weight and the array values.
297 auto it = _sketch_array.begin();
298 while (it != _sketch_array.end()) {
299 write(os, *it);
300 ++it;
301 }
302}
303
304template<typename W, typename A>
305auto count_min_sketch<W,A>::deserialize(std::istream& is, uint64_t seed, const A& allocator) -> count_min_sketch {
306
307 // First 8 bytes are 4 bytes of preamble and 4 unused bytes.
308 const auto preamble_longs = read<uint8_t>(is);
309 const auto serial_version = read<uint8_t>(is);
310 const auto family_id = read<uint8_t>(is);
311 const auto flags_byte = read<uint8_t>(is);
312 read<uint32_t>(is); // 4 unused bytes
313
314 check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
315
316 // Sketch parameters
317 const auto nbuckets = read<uint32_t>(is);
318 const auto nhashes = read<uint8_t>(is);
319 const auto seed_hash = read<uint16_t>(is);
320 read<uint8_t>(is); // 1 unused byte
321
322 if (seed_hash != compute_seed_hash(seed)) {
323 throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
324 + std::to_string(compute_seed_hash(seed)));
325 }
326 count_min_sketch c(nhashes, nbuckets, seed, allocator);
327 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
328 if (is_empty == 1) { return c; } // sketch is empty, no need to read further.
329
330 // Set the sketch weight and read in the sketch values
331 const auto weight = read<W>(is);
332 c._total_weight += weight;
333 read(is, c._sketch_array.data(), sizeof(W) * c._sketch_array.size());
334
335 return c;
336}
337
338template<typename W, typename A>
340 // The header is always 2 longs, whether empty or full
341 const size_t preamble_longs = PREAMBLE_LONGS_SHORT;
342
343 // If the sketch is empty, we're done. Otherwise, we need the total weight
344 // held by the sketch as well as a data table of size (num_buckets * num_hashes)
345 return (preamble_longs * sizeof(uint64_t)) + (is_empty() ? 0 : sizeof(W) * (1 + _num_buckets * _num_hashes));
346}
347
348template<typename W, typename A>
349auto count_min_sketch<W,A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
350 vector_bytes bytes(header_size_bytes + get_serialized_size_bytes(), 0, _allocator);
351 uint8_t *ptr = bytes.data() + header_size_bytes;
352
353 // Long 0
354 const uint8_t preamble_longs = PREAMBLE_LONGS_SHORT;
355 ptr += copy_to_mem(preamble_longs, ptr);
356 const uint8_t ser_ver = SERIAL_VERSION_1;
357 ptr += copy_to_mem(ser_ver, ptr);
358 const uint8_t family_id = FAMILY_ID;
359 ptr += copy_to_mem(family_id, ptr);
360 const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
361 ptr += copy_to_mem(flags_byte, ptr);
362 const uint32_t unused32 = NULL_32;
363 ptr += copy_to_mem(unused32, ptr);
364
365 // Long 1
366 const uint32_t nbuckets = _num_buckets;
367 const uint8_t nhashes = _num_hashes;
368 const uint16_t seed_hash(compute_seed_hash(_seed));
369 const uint8_t null_characters_8 = NULL_8;
370 ptr += copy_to_mem(nbuckets, ptr);
371 ptr += copy_to_mem(nhashes, ptr);
372 ptr += copy_to_mem(seed_hash, ptr);
373 ptr += copy_to_mem(null_characters_8, ptr);
374 if (is_empty()) { return bytes; } // sketch is empty, no need to write further bytes.
375
376 // Long 2
377 const W t_weight = _total_weight;
378 ptr += copy_to_mem(t_weight, ptr);
379
380 // Long 3 onwards: remaining bytes are consumed by writing the weight and the array values.
381 auto it = _sketch_array.begin();
382 while (it != _sketch_array.end()) {
383 ptr += copy_to_mem(*it, ptr);
384 ++it;
385 }
386
387 return bytes;
388}
389
390template<typename W, typename A>
391auto count_min_sketch<W,A>::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) -> count_min_sketch {
392 ensure_minimum_memory(size, PREAMBLE_LONGS_SHORT * sizeof(uint64_t));
393
394 const char* ptr = static_cast<const char*>(bytes);
395
396 // First 8 bytes are 4 bytes of preamble and 4 unused bytes.
397 uint8_t preamble_longs;
398 ptr += copy_from_mem(ptr, preamble_longs);
399 uint8_t serial_version;
400 ptr += copy_from_mem(ptr, serial_version);
401 uint8_t family_id;
402 ptr += copy_from_mem(ptr, family_id);
403 uint8_t flags_byte;
404 ptr += copy_from_mem(ptr, flags_byte);
405 ptr += sizeof(uint32_t);
406
407 check_header_validity(preamble_longs, serial_version, family_id, flags_byte);
408
409 // Second 8 bytes are the sketch parameters with a final, unused byte.
410 uint32_t nbuckets;
411 uint8_t nhashes;
412 uint16_t seed_hash;
413 ptr += copy_from_mem(ptr, nbuckets);
414 ptr += copy_from_mem(ptr, nhashes);
415 ptr += copy_from_mem(ptr, seed_hash);
416 ptr += sizeof(uint8_t);
417
418 if (seed_hash != compute_seed_hash(seed)) {
419 throw std::invalid_argument("Incompatible seed hashes: " + std::to_string(seed_hash) + ", "
420 + std::to_string(compute_seed_hash(seed)));
421 }
422 count_min_sketch c(nhashes, nbuckets, seed, allocator);
423 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
424 if (is_empty) { return c; } // sketch is empty, no need to read further.
425
426 ensure_minimum_memory(size, sizeof(W) * (1 + nbuckets * nhashes));
427
428 // Long 2 is the weight.
429 W weight;
430 ptr += copy_from_mem(ptr, weight);
431 c._total_weight += weight;
432
433 // All remaining bytes are the sketch table entries.
434 for (size_t i = 0; i<c._num_buckets*c._num_hashes; ++i) {
435 ptr += copy_from_mem(ptr, c._sketch_array[i]);
436 }
437 return c;
438}
439
440template<typename W, typename A>
442 return _total_weight == 0;
443}
444
445template<typename W, typename A>
447 // count the number of used entries in the sketch
448 uint64_t num_nonzero = 0;
449 for (const auto entry: _sketch_array) {
450 if (entry != static_cast<W>(0.0)) { ++num_nonzero; }
451 }
452
453 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
454 // The stream does not support passing an allocator instance, and alternatives are complicated.
455 std::ostringstream os;
456 os << "### Count Min sketch summary:" << std::endl;
457 os << " num hashes : " << static_cast<uint32_t>(_num_hashes) << std::endl;
458 os << " num buckets : " << _num_buckets << std::endl;
459 os << " capacity bins : " << _sketch_array.size() << std::endl;
460 os << " filled bins : " << num_nonzero << std::endl;
461 os << " pct filled : " << std::setprecision(3) << (num_nonzero * 100.0) / _sketch_array.size() << "%" << std::endl;
462 os << "### End sketch summary" << std::endl;
463
464 return string<A>(os.str().c_str(), _allocator);
465}
466
467template<typename W, typename A>
468void count_min_sketch<W,A>::check_header_validity(uint8_t preamble_longs, uint8_t serial_version, uint8_t family_id, uint8_t flags_byte) {
469 const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
470
471 const uint8_t sw = (empty ? 1 : 0) + (2 * serial_version) + (4 * family_id) + (32 * (preamble_longs & 0x3F));
472 bool valid = true;
473
474 switch (sw) { // exhaustive list and description of all valid cases
475 case 138 : break; // !empty, ser_ver==1, family==18, preLongs=2;
476 case 139 : break; // empty, ser_ver==1, family==18, preLongs=2;
477 //case 170 : break; // !empty, ser_ver==1, family==18, preLongs=3;
478 default : // all other case values are invalid
479 valid = false;
480 }
481
482 if (!valid) {
483 std::ostringstream os;
484 os << "Possible sketch corruption. Inconsistent state: "
485 << "preamble_longs = " << static_cast<uint32_t>(preamble_longs)
486 << ", empty = " << (empty ? "true" : "false")
487 << ", serialization_version = " << static_cast<uint32_t>(serial_version);
488 throw std::invalid_argument(os.str());
489 }
490}
491
492} /* namespace datasketches */
493
494#endif
C++ implementation of the CountMin sketch data structure of Cormode and Muthukrishnan.
Definition count_min.hpp:37
static count_min_sketch deserialize(std::istream &is, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
This method deserializes a sketch from a given stream.
void serialize(std::ostream &os) const
This method serializes the sketch into a given stream in a binary form.
Definition count_min_impl.hpp:268
static uint32_t suggest_num_buckets(double relative_error)
Suggests the number of buckets required to achieve the given relative error.
Definition count_min_impl.hpp:88
const_iterator end() const
Iterator pointing to the past-the-end item in the sketch.
Definition count_min_impl.hpp:263
static uint8_t suggest_num_hashes(double confidence)
Suggests the number of hash functions required to achieve the given confidence.
Definition count_min_impl.hpp:100
double get_relative_error() const
Definition count_min_impl.hpp:78
uint32_t get_num_buckets() const
Definition count_min_impl.hpp:68
bool is_empty() const
Returns true if this sketch is empty.
Definition count_min_impl.hpp:441
W get_total_weight() const
Definition count_min_impl.hpp:83
uint8_t get_num_hashes() const
Definition count_min_impl.hpp:63
W get_upper_bound(const void *item, size_t size) const
Query the sketch for the upper bound of a given item.
Definition count_min_impl.hpp:211
W get_lower_bound(const void *item, size_t size) const
Query the sketch for the lower bound of a given item.
Definition count_min_impl.hpp:228
W get_estimate(uint64_t item) const
Specific get_estimate function for uint64_t type see generic get_estimate function.
Definition count_min_impl.hpp:145
count_min_sketch(uint8_t num_hashes, uint32_t num_buckets, uint64_t seed=DEFAULT_SEED, const Allocator &allocator=Allocator())
Creates an instance of the sketch given parameters _num_hashes, _num_buckets and hash seed,...
Definition count_min_impl.hpp:35
uint64_t get_seed() const
Definition count_min_impl.hpp:73
void merge(const count_min_sketch &other_sketch)
Merges another count_min_sketch into this count_min_sketch.
Definition count_min_impl.hpp:233
void update(const void *item, size_t size, W weight)
Update this sketch with given data of any type.
Definition count_min_impl.hpp:186
string< Allocator > to_string() const
Returns a string describing the sketch.
Definition count_min_impl.hpp:446
size_t get_serialized_size_bytes() const
Computes size needed to serialize the current state of the sketch.
Definition count_min_impl.hpp:339
const_iterator begin() const
Iterator pointing to the first item in the sketch.
Definition count_min_impl.hpp:258
DataSketches namespace.
Definition binomial_bounds.hpp:38