20 #ifndef FREQUENT_ITEMS_SKETCH_IMPL_HPP_
21 #define FREQUENT_ITEMS_SKETCH_IMPL_HPP_
28 #include "memory_operations.hpp"
33 template<
typename T,
typename W,
typename H,
typename E,
typename A>
34 const uint8_t frequent_items_sketch<T, W, H, E, A>::LG_MIN_MAP_SIZE;
36 template<
typename T,
typename W,
typename H,
typename E,
typename A>
38 const E& equal,
const A& allocator):
48 if (lg_start_map_size > lg_max_map_size)
throw std::invalid_argument(
"starting size must not be greater than maximum size");
51 template<
typename T,
typename W,
typename H,
typename E,
typename A>
54 if (weight == 0)
return;
55 total_weight += weight;
56 offset += map.adjust_or_insert(item, weight);
59 template<
typename T,
typename W,
typename H,
typename E,
typename A>
62 if (weight == 0)
return;
63 total_weight += weight;
64 offset += map.adjust_or_insert(std::move(item), weight);
67 template<
typename T,
typename W,
typename H,
typename E,
typename A>
71 for (
auto it: other.map) {
72 update(it.first, it.second);
74 offset += other.offset;
75 total_weight = merged_total_weight;
78 template<
typename T,
typename W,
typename H,
typename E,
typename A>
80 if (other.is_empty())
return;
81 const W merged_total_weight = total_weight + other.get_total_weight();
82 for (
auto it: other.map) {
83 update(std::move(it.first), it.second);
85 offset += other.offset;
86 total_weight = merged_total_weight;
89 template<
typename T,
typename W,
typename H,
typename E,
typename A>
91 return map.get_num_active() == 0;
94 template<
typename T,
typename W,
typename H,
typename E,
typename A>
96 return map.get_num_active();
99 template<
typename T,
typename W,
typename H,
typename E,
typename A>
104 template<
typename T,
typename W,
typename H,
typename E,
typename A>
107 const W weight = map.get(item);
108 if (weight > 0)
return weight + offset;
112 template<
typename T,
typename W,
typename H,
typename E,
typename A>
114 return map.get(item);
117 template<
typename T,
typename W,
typename H,
typename E,
typename A>
119 return map.get(item) + offset;
122 template<
typename T,
typename W,
typename H,
typename E,
typename A>
127 template<
typename T,
typename W,
typename H,
typename E,
typename A>
129 return EPSILON_FACTOR / (1 << map.get_lg_max_size());
132 template<
typename T,
typename W,
typename H,
typename E,
typename A>
134 return EPSILON_FACTOR / (1 << lg_max_map_size);
137 template<
typename T,
typename W,
typename H,
typename E,
typename A>
139 return get_epsilon(lg_max_map_size) * estimated_total_weight;
143 template<
typename T,
typename W,
typename H,
typename E,
typename A>
145 return get_frequent_items(err_type, get_maximum_error());
148 template<
typename T,
typename W,
typename H,
typename E,
typename A>
150 vector_row items(map.get_allocator());
152 const W lb = it.second;
153 const W ub = it.second + offset;
155 items.push_back(
row(&it.first, it.second, offset));
159 std::sort(items.begin(), items.end(), [](
row a,
row b){ return a.get_estimate() > b.get_estimate(); });
163 template<
typename T,
typename W,
typename H,
typename E,
typename A>
164 template<
typename SerDe>
166 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
167 write(os, preamble_longs);
168 const uint8_t serial_version = SERIAL_VERSION;
169 write(os, serial_version);
170 const uint8_t family = FAMILY_ID;
172 const uint8_t lg_max_size = map.get_lg_max_size();
173 write(os, lg_max_size);
174 const uint8_t lg_cur_size = map.get_lg_cur_size();
175 write(os, lg_cur_size);
176 const uint8_t flags_byte(
177 (is_empty() ? 1 << flags::IS_EMPTY_1 : 0)
178 | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0)
180 write(os, flags_byte);
181 const uint16_t unused16 = 0;
184 const uint32_t num_items = map.get_num_active();
185 write(os, num_items);
186 const uint32_t unused32 = 0;
188 write(os, total_weight);
192 using AllocW =
typename std::allocator_traits<A>::template rebind_alloc<W>;
193 AllocW aw(map.get_allocator());
194 W* weights = aw.allocate(num_items);
195 A alloc(map.get_allocator());
196 T* items = alloc.allocate(num_items);
199 new (&items[i]) T(it.first);
200 weights[i++] = it.second;
202 write(os, weights,
sizeof(W) * num_items);
203 aw.deallocate(weights, num_items);
204 sd.serialize(os, items, num_items);
205 for (i = 0; i < num_items; i++) items[i].~T();
206 alloc.deallocate(items, num_items);
210 template<
typename T,
typename W,
typename H,
typename E,
typename A>
211 template<
typename SerDe>
213 if (is_empty())
return PREAMBLE_LONGS_EMPTY *
sizeof(uint64_t);
214 size_t size = PREAMBLE_LONGS_NONEMPTY *
sizeof(uint64_t) + map.get_num_active() *
sizeof(W);
215 for (
auto it: map) size += sd.size_of_item(it.first);
219 template<
typename T,
typename W,
typename H,
typename E,
typename A>
220 template<
typename SerDe>
222 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
223 vector_bytes bytes(size, 0, map.get_allocator());
224 uint8_t* ptr = bytes.data() + header_size_bytes;
225 uint8_t* end_ptr = ptr + size;
227 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
228 ptr += copy_to_mem(preamble_longs, ptr);
229 const uint8_t serial_version = SERIAL_VERSION;
230 ptr += copy_to_mem(serial_version, ptr);
231 const uint8_t family = FAMILY_ID;
232 ptr += copy_to_mem(family, ptr);
233 const uint8_t lg_max_size = map.get_lg_max_size();
234 ptr += copy_to_mem(lg_max_size, ptr);
235 const uint8_t lg_cur_size = map.get_lg_cur_size();
236 ptr += copy_to_mem(lg_cur_size, ptr);
237 const uint8_t flags_byte(
238 (is_empty() ? 1 << flags::IS_EMPTY_1 : 0)
239 | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0)
241 ptr += copy_to_mem(flags_byte, ptr);
242 ptr +=
sizeof(uint16_t);
244 const uint32_t num_items = map.get_num_active();
245 ptr += copy_to_mem(num_items, ptr);
246 ptr +=
sizeof(uint32_t);
247 ptr += copy_to_mem(total_weight, ptr);
248 ptr += copy_to_mem(offset, ptr);
251 using AllocW =
typename std::allocator_traits<A>::template rebind_alloc<W>;
252 AllocW aw(map.get_allocator());
253 W* weights = aw.allocate(num_items);
254 A alloc(map.get_allocator());
255 T* items = alloc.allocate(num_items);
258 new (&items[i]) T(it.first);
259 weights[i++] = it.second;
261 ptr += copy_to_mem(weights, ptr,
sizeof(W) * num_items);
262 aw.deallocate(weights, num_items);
263 const size_t bytes_remaining = end_ptr - ptr;
264 ptr += sd.serialize(ptr, bytes_remaining, items, num_items);
265 for (i = 0; i < num_items; i++) items[i].~T();
266 alloc.deallocate(items, num_items);
271 template<
typename T,
typename W,
typename H,
typename E,
typename A>
272 class frequent_items_sketch<T, W, H, E, A>::items_deleter {
274 items_deleter(uint32_t num,
bool destroy,
const A& allocator):
275 allocator_(allocator), num_(num), destroy_(destroy) {}
276 void set_destroy(
bool destroy) { destroy_ = destroy; }
277 void operator() (T* ptr) {
278 if (ptr !=
nullptr) {
280 for (uint32_t i = 0; i < num_; ++i) ptr[i].~T();
282 allocator_.deallocate(ptr, num_);
291 template<
typename T,
typename W,
typename H,
typename E,
typename A>
292 template<
typename SerDe>
294 const SerDe& sd,
const E& equal,
const A& allocator) {
295 const auto preamble_longs = read<uint8_t>(is);
296 const auto serial_version = read<uint8_t>(is);
297 const auto family_id = read<uint8_t>(is);
298 const auto lg_max_size = read<uint8_t>(is);
299 const auto lg_cur_size = read<uint8_t>(is);
300 const auto flags_byte = read<uint8_t>(is);
303 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2));
305 check_preamble_longs(preamble_longs, is_empty);
306 check_serial_version(serial_version);
307 check_family_id(family_id);
308 check_size(lg_cur_size, lg_max_size);
310 frequent_items_sketch sketch(lg_max_size, lg_cur_size, equal, allocator);
312 const auto num_items = read<uint32_t>(is);
314 const auto total_weight = read<W>(is);
315 const auto offset = read<W>(is);
318 using AllocW =
typename std::allocator_traits<A>::template rebind_alloc<W>;
319 std::vector<W, AllocW> weights(num_items, 0, allocator);
320 read(is, weights.data(),
sizeof(W) * num_items);
322 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items,
false, alloc));
323 sd.deserialize(is, items.get(), num_items);
324 items.get_deleter().set_destroy(
true);
325 for (uint32_t i = 0; i < num_items; i++) {
326 sketch.update(std::move(items.get()[i]), weights[i]);
328 sketch.total_weight = total_weight;
329 sketch.offset = offset;
332 throw std::runtime_error(
"error reading from std::istream");
336 template<
typename T,
typename W,
typename H,
typename E,
typename A>
337 template<
typename SerDe>
338 frequent_items_sketch<T, W, H, E, A>
frequent_items_sketch<T, W, H, E, A>::deserialize(
const void* bytes,
size_t size,
339 const SerDe& sd,
const E& equal,
const A& allocator) {
340 ensure_minimum_memory(size, 8);
341 const char* ptr =
static_cast<const char*
>(bytes);
342 const char* base =
static_cast<const char*
>(bytes);
343 uint8_t preamble_longs;
344 ptr += copy_from_mem(ptr, preamble_longs);
345 uint8_t serial_version;
346 ptr += copy_from_mem(ptr, serial_version);
348 ptr += copy_from_mem(ptr, family_id);
350 ptr += copy_from_mem(ptr, lg_max_size);
352 ptr += copy_from_mem(ptr, lg_cur_size);
354 ptr += copy_from_mem(ptr, flags_byte);
355 ptr +=
sizeof(uint16_t);
357 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2));
359 check_preamble_longs(preamble_longs, is_empty);
360 check_serial_version(serial_version);
361 check_family_id(family_id);
362 check_size(lg_cur_size, lg_max_size);
363 ensure_minimum_memory(size, preamble_longs *
sizeof(uint64_t));
365 frequent_items_sketch sketch(lg_max_size, lg_cur_size, equal, allocator);
368 ptr += copy_from_mem(ptr, num_items);
369 ptr +=
sizeof(uint32_t);
371 ptr += copy_from_mem(ptr, total_weight);
373 ptr += copy_from_mem(ptr, offset);
375 ensure_minimum_memory(size, ptr - base + (
sizeof(W) * num_items));
377 using AllocW =
typename std::allocator_traits<A>::template rebind_alloc<W>;
378 std::vector<W, AllocW> weights(num_items, 0, allocator);
379 ptr += copy_from_mem(ptr, weights.data(),
sizeof(W) * num_items);
381 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items,
false, alloc));
382 const size_t bytes_remaining = size - (ptr - base);
383 ptr += sd.deserialize(ptr, bytes_remaining, items.get(), num_items);
384 items.get_deleter().set_destroy(
true);
385 for (uint32_t i = 0; i < num_items; i++) {
386 sketch.update(std::move(items.get()[i]), weights[i]);
389 sketch.total_weight = total_weight;
390 sketch.offset = offset;
395 template<
typename T,
typename W,
typename H,
typename E,
typename A>
396 void frequent_items_sketch<T, W, H, E, A>::check_preamble_longs(uint8_t preamble_longs,
bool is_empty) {
398 if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
399 throw std::invalid_argument(
"Possible corruption: preamble longs of an empty sketch must be " + std::to_string(PREAMBLE_LONGS_EMPTY) +
": " + std::to_string(preamble_longs));
402 if (preamble_longs != PREAMBLE_LONGS_NONEMPTY) {
403 throw std::invalid_argument(
"Possible corruption: preamble longs of an non-empty sketch must be " + std::to_string(PREAMBLE_LONGS_NONEMPTY) +
": " + std::to_string(preamble_longs));
408 template<
typename T,
typename W,
typename H,
typename E,
typename A>
409 void frequent_items_sketch<T, W, H, E, A>::check_serial_version(uint8_t serial_version) {
410 if (serial_version != SERIAL_VERSION) {
411 throw std::invalid_argument(
"Possible corruption: serial version must be " + std::to_string(SERIAL_VERSION) +
": " + std::to_string(serial_version));
415 template<
typename T,
typename W,
typename H,
typename E,
typename A>
416 void frequent_items_sketch<T, W, H, E, A>::check_family_id(uint8_t family_id) {
417 if (family_id != FAMILY_ID) {
418 throw std::invalid_argument(
"Possible corruption: family ID must be " + std::to_string(FAMILY_ID) +
": " + std::to_string(family_id));
422 template<
typename T,
typename W,
typename H,
typename E,
typename A>
423 void frequent_items_sketch<T, W, H, E, A>::check_size(uint8_t lg_cur_size, uint8_t lg_max_size) {
424 if (lg_cur_size > lg_max_size) {
425 throw std::invalid_argument(
"Possible corruption: expected lg_cur_size <= lg_max_size: " + std::to_string(lg_cur_size) +
" <= " + std::to_string(lg_max_size));
427 if (lg_cur_size < LG_MIN_MAP_SIZE) {
428 throw std::invalid_argument(
"Possible corruption: lg_cur_size must not be less than " + std::to_string(LG_MIN_MAP_SIZE) +
": " + std::to_string(lg_cur_size));
432 template<
typename T,
typename W,
typename H,
typename E,
typename A>
436 std::ostringstream os;
437 os <<
"### Frequent items sketch summary:" << std::endl;
438 os <<
" lg cur map size : " << (int) map.get_lg_cur_size() << std::endl;
439 os <<
" lg max map size : " << (int) map.get_lg_max_size() << std::endl;
440 os <<
" num active items : " << get_num_active_items() << std::endl;
441 os <<
" total weight : " << get_total_weight() << std::endl;
442 os <<
" max error : " << get_maximum_error() << std::endl;
443 os <<
"### End sketch summary" << std::endl;
447 items.push_back(
row(&it.first, it.second, offset));
450 std::sort(items.begin(), items.end(), [](
row a,
row b){ return a.get_estimate() > b.get_estimate(); });
451 os <<
"### Items in descending order by estimate" << std::endl;
452 os <<
" item, estimate, lower bound, upper bound" << std::endl;
453 for (
auto it: items) {
454 os <<
" " << it.get_item() <<
", " << it.get_estimate() <<
", "
455 << it.get_lower_bound() <<
", " << it.get_upper_bound() << std::endl;
457 os <<
"### End items" << std::endl;
459 return string<A>(os.str().c_str(), map.get_allocator());
463 template<
typename T,
typename W,
typename H,
typename E,
typename A>
464 template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_signed<WW>::value,
int>::type>
467 throw std::invalid_argument(
"weight must be non-negative");
472 template<
typename T,
typename W,
typename H,
typename E,
typename A>
473 template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_unsigned<WW>::value,
int>::type>
474 void frequent_items_sketch<T, W, H, E, A>::check_weight(WW) {}
477 template<
typename T,
typename W,
typename H,
typename E,
typename A>
478 template<typename WW, typename std::enable_if<std::is_floating_point<WW>::value,
int>::type>
479 void frequent_items_sketch<T, W, H, E, A>::check_weight(WW weight) {
481 throw std::invalid_argument(
"weight must be non-negative");
483 if (std::isnan(weight)) {
484 throw std::invalid_argument(
"weight must be a valid number");
486 if (std::isinf(weight)) {
487 throw std::invalid_argument(
"weight must be finite");
Row in the output from get_frequent_items.
Definition: frequent_items_sketch.hpp:330
Frequent Items sketch.
Definition: frequent_items_sketch.hpp:55
frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size=LG_MIN_MAP_SIZE, const E &equal=E(), const A &allocator=A())
Construct this sketch with parameters lg_max_map_size and lg_start_map_size.
Definition: frequent_items_sketch_impl.hpp:37
W get_upper_bound(const T &item) const
Returns the guaranteed upper bound weight (frequency) of the given item.
Definition: frequent_items_sketch_impl.hpp:118
void merge(const frequent_items_sketch &other)
This function merges the other sketch into this one.
Definition: frequent_items_sketch_impl.hpp:68
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition: frequent_items_sketch_impl.hpp:165
vector_row get_frequent_items(frequent_items_error_type err_type) const
Returns an array of rows that include frequent items, estimates, and upper and lower bounds, given an error type and using get_maximum_error() as the threshold.
Definition: frequent_items_sketch_impl.hpp:144
bool is_empty() const
Definition: frequent_items_sketch_impl.hpp:90
string< A > to_string(bool print_items=false) const
Returns a human-readable summary of this sketch.
Definition: frequent_items_sketch_impl.hpp:433
void update(const T &item, W weight=1)
Update this sketch with an item and a positive weight (frequency count).
Definition: frequent_items_sketch_impl.hpp:52
static double get_apriori_error(uint8_t lg_max_map_size, W estimated_total_weight)
Returns the estimated a priori error, given the log2 of the maximum map size of the sketch (lg_max_map_size) and the estimated total weight of the stream (estimated_total_weight).
Definition: frequent_items_sketch_impl.hpp:138
W get_total_weight() const
Returns the sum of the weights (frequencies) in the stream seen so far by the sketch.
Definition: frequent_items_sketch_impl.hpp:100
double get_epsilon() const
Returns epsilon value of this sketch.
Definition: frequent_items_sketch_impl.hpp:128
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition: frequent_items_sketch_impl.hpp:212
W get_maximum_error() const
Definition: frequent_items_sketch_impl.hpp:123
W get_estimate(const T &item) const
Returns the estimate of the weight (frequency) of the given item.
Definition: frequent_items_sketch_impl.hpp:105
static frequent_items_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const E &equal=E(), const A &allocator=A())
This method deserializes a sketch from a given stream.
W get_lower_bound(const T &item) const
Returns the guaranteed lower bound weight (frequency) of the given item.
Definition: frequent_items_sketch_impl.hpp:113
uint32_t get_num_active_items() const
Definition: frequent_items_sketch_impl.hpp:95
DataSketches namespace.
Definition: binomial_bounds.hpp:38
frequent_items_error_type
Frequent items error type.
Definition: frequent_items_sketch.hpp:36
@ NO_FALSE_NEGATIVES
include an item in the result list if get_upper_bound(item) > threshold
Definition: frequent_items_sketch.hpp:38
@ NO_FALSE_POSITIVES
include an item in the result list if get_lower_bound(item) > threshold
Definition: frequent_items_sketch.hpp:37