datasketches-cpp
Loading...
Searching...
No Matches
frequent_items_sketch_impl.hpp
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef FREQUENT_ITEMS_SKETCH_IMPL_HPP_
21#define FREQUENT_ITEMS_SKETCH_IMPL_HPP_
22
23#include <cstring>
24#include <limits>
25#include <sstream>
26#include <stdexcept>
27
28#include "memory_operations.hpp"
29
30namespace datasketches {
31
32// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
33template<typename T, typename W, typename H, typename E, typename A>
34const uint8_t frequent_items_sketch<T, W, H, E, A>::LG_MIN_MAP_SIZE;
35
36template<typename T, typename W, typename H, typename E, typename A>
37frequent_items_sketch<T, W, H, E, A>::frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size,
38 const E& equal, const A& allocator):
39total_weight(0),
40offset(0),
41map(
42 std::max(lg_start_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE),
43 std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE),
44 equal,
45 allocator
46)
47{
48 if (lg_start_map_size > lg_max_map_size) throw std::invalid_argument("starting size must not be greater than maximum size");
49}
50
51template<typename T, typename W, typename H, typename E, typename A>
52void frequent_items_sketch<T, W, H, E, A>::update(const T& item, W weight) {
53 check_weight(weight);
54 if (weight == 0) return;
55 total_weight += weight;
56 offset += map.adjust_or_insert(item, weight);
57}
58
// Update with an rvalue item. It follows the same steps as update(const T&, W): validate the
// weight, ignore a zero weight, add the weight to total_weight, and add the value returned by
// map.adjust_or_insert() to offset. Here the item is passed to the map with std::move.
// NOTE(review): the signature line (listing line 60) is missing from this copy; presumably it
// is the update(T&& item, W weight) overload — confirm against the header.
59template<typename T, typename W, typename H, typename E, typename A>
61 check_weight(weight);
62 if (weight == 0) return;
63 total_weight += weight;
64 offset += map.adjust_or_insert(std::move(item), weight);
65}
66
// Merges `other` into this sketch. Each item tracked by `other` is re-inserted with its stored
// weight through update(), and then other's offset is added to this offset.
// update() also adds every re-inserted weight to total_weight. That sum covers only the weights
// still tracked by `other`, which need not equal other.get_total_weight(). So total_weight is
// overwritten at the end with the sum of both totals computed before the loop.
// NOTE(review): the signature line (listing line 68) is missing from this copy; per the index
// it is merge(const frequent_items_sketch& other).
67template<typename T, typename W, typename H, typename E, typename A>
69 if (other.is_empty()) return;
70 const W merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end
71 for (auto it: other.map) {
72 update(it.first, it.second);
73 }
74 offset += other.offset;
75 total_weight = merged_total_weight;
76}
77
// Same merge procedure as the const& overload, but it tries to move each item out of `other`.
// NOTE(review): `it` is a by-value copy of each map entry, so std::move(it.first) can at most
// move from that copy. If the entry's key is const or a reference, this resolves to
// update(const T&, W) and copies anyway — confirm the map's iterator value type.
// NOTE(review): the signature line (listing line 79) is missing from this copy; presumably it
// is merge(frequent_items_sketch&& other).
78template<typename T, typename W, typename H, typename E, typename A>
80 if (other.is_empty()) return;
81 const W merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end
82 for (auto it: other.map) {
83 update(std::move(it.first), it.second);
84 }
85 offset += other.offset;
86 total_weight = merged_total_weight;
87}
88
// NOTE(review): in this listing the signature line of each definition below is missing
// (listing lines 90, 95, 100, 105, 113, 118). Per the index they are is_empty(),
// get_num_active_items(), get_total_weight(), get_estimate(const T&),
// get_lower_bound(const T&) and get_upper_bound(const T&).

// is_empty(): true when the map tracks no items.
89template<typename T, typename W, typename H, typename E, typename A>
91 return map.get_num_active() == 0;
92}
93
// get_num_active_items(): number of items currently tracked by the map.
94template<typename T, typename W, typename H, typename E, typename A>
96 return map.get_num_active();
97}
98
// get_total_weight(): the stored total_weight, as accumulated by update() and merge().
99template<typename T, typename W, typename H, typename E, typename A>
101 return total_weight;
102}
103
// get_estimate(item): stored weight + offset when the stored weight is positive, otherwise 0.
// NOTE(review): presumably map.get() returns 0 for untracked items — confirm.
104template<typename T, typename W, typename H, typename E, typename A>
106 // if item is tracked estimate = weight + offset, otherwise 0
107 const W weight = map.get(item);
108 if (weight > 0) return weight + offset;
109 return 0;
110}
111
// get_lower_bound(item): the weight stored in the map for the item.
112template<typename T, typename W, typename H, typename E, typename A>
114 return map.get(item);
115}
116
// get_upper_bound(item): stored weight + offset. Unlike get_estimate(), offset is added even
// when the stored weight is 0.
117template<typename T, typename W, typename H, typename E, typename A>
119 return map.get(item) + offset;
120}
121
// NOTE(review): get_maximum_error() (listing lines 123-125, see the index entry
// "Definition frequent_items_sketch_impl.hpp:123") is missing entirely from this copy; only its
// template header remains below.
122template<typename T, typename W, typename H, typename E, typename A>
126
// get_epsilon(): EPSILON_FACTOR divided by 2^(map's lg max size).
// NOTE(review): signature line 128 is missing from this copy. `1 << n` is an int shift,
// undefined for n >= 31 — confirm lg sizes are bounded below that.
127template<typename T, typename W, typename H, typename E, typename A>
129 return EPSILON_FACTOR / (1 << map.get_lg_max_size());
130}
131
// Presumably the static get_epsilon(uint8_t lg_max_map_size) called by get_apriori_error();
// signature line 133 is missing from this copy. Same formula, using the given lg size.
132template<typename T, typename W, typename H, typename E, typename A>
134 return EPSILON_FACTOR / (1 << lg_max_map_size);
135}
136
137template<typename T, typename W, typename H, typename E, typename A>
138double frequent_items_sketch<T, W, H, E, A>::get_apriori_error(uint8_t lg_max_map_size, W estimated_total_weight) {
139 return get_epsilon(lg_max_map_size) * estimated_total_weight;
140}
141
142
// get_frequent_items(err_type): uses get_maximum_error() as the threshold.
// NOTE(review): signature line 144 is missing from this copy (see the index entry).
143template<typename T, typename W, typename H, typename E, typename A>
145 return get_frequent_items(err_type, get_maximum_error());
146}
147
// Presumably get_frequent_items(err_type, threshold); signature line 149 is missing from this copy.
// Keeps each tracked item whose upper bound (weight + offset) exceeds threshold for
// NO_FALSE_NEGATIVES, or whose lower bound (stored weight) exceeds it for NO_FALSE_POSITIVES.
// Rows are built with the sketch's allocator and sorted by estimate, descending.
// NOTE(review): rows store &it.first, a pointer to the item referenced by the map entry, so they
// presumably stay valid only while the sketch is unchanged — confirm.
148template<typename T, typename W, typename H, typename E, typename A>
150 vector_row items(map.get_allocator());
151 for (auto it: map) {
152 const W lb = it.second;
153 const W ub = it.second + offset;
154 if ((err_type == NO_FALSE_NEGATIVES && ub > threshold) || (err_type == NO_FALSE_POSITIVES && lb > threshold)) {
155 items.push_back(row(&it.first, it.second, offset));
156 }
157 }
158 // sort by estimate in descending order
159 std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); });
160 return items;
161}
162
163template<typename T, typename W, typename H, typename E, typename A>
164template<typename SerDe>
165void frequent_items_sketch<T, W, H, E, A>::serialize(std::ostream& os, const SerDe& sd) const {
166 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
167 write(os, preamble_longs);
168 const uint8_t serial_version = SERIAL_VERSION;
169 write(os, serial_version);
170 const uint8_t family = FAMILY_ID;
171 write(os, family);
172 const uint8_t lg_max_size = map.get_lg_max_size();
173 write(os, lg_max_size);
174 const uint8_t lg_cur_size = map.get_lg_cur_size();
175 write(os, lg_cur_size);
176 const uint8_t flags_byte(
177 (is_empty() ? 1 << flags::IS_EMPTY_1 : 0)
178 | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0)
179 );
180 write(os, flags_byte);
181 const uint16_t unused16 = 0;
182 write(os, unused16);
183 if (!is_empty()) {
184 const uint32_t num_items = map.get_num_active();
185 write(os, num_items);
186 const uint32_t unused32 = 0;
187 write(os, unused32);
188 write(os, total_weight);
189 write(os, offset);
190
191 // copy active items and their weights to use batch serialization
192 using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
193 AllocW aw(map.get_allocator());
194 W* weights = aw.allocate(num_items);
195 A alloc(map.get_allocator());
196 T* items = alloc.allocate(num_items);
197 uint32_t i = 0;
198 for (auto it: map) {
199 new (&items[i]) T(it.first);
200 weights[i++] = it.second;
201 }
202 write(os, weights, sizeof(W) * num_items);
203 aw.deallocate(weights, num_items);
204 sd.serialize(os, items, num_items);
205 for (i = 0; i < num_items; i++) items[i].~T();
206 alloc.deallocate(items, num_items);
207 }
208}
209
// Number of bytes serialize() writes for this sketch. This excludes any header_size_bytes that
// the byte-vector serialize() reserves in front. An empty sketch needs only
// PREAMBLE_LONGS_EMPTY 8-byte words. Otherwise: the non-empty preamble, one W per active item,
// plus sd.size_of_item() for every item.
// NOTE(review): the signature line (listing line 212) is missing from this copy; per the index
// it is get_serialized_size_bytes(const SerDe& sd) const.
210template<typename T, typename W, typename H, typename E, typename A>
211template<typename SerDe>
213 if (is_empty()) return PREAMBLE_LONGS_EMPTY * sizeof(uint64_t);
214 size_t size = PREAMBLE_LONGS_NONEMPTY * sizeof(uint64_t) + map.get_num_active() * sizeof(W);
215 for (auto it: map) size += sd.size_of_item(it.first);
216 return size;
217}
218
219template<typename T, typename W, typename H, typename E, typename A>
220template<typename SerDe>
221auto frequent_items_sketch<T, W, H, E, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
222 const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
223 vector_bytes bytes(size, 0, map.get_allocator());
224 uint8_t* ptr = bytes.data() + header_size_bytes;
225 uint8_t* end_ptr = ptr + size;
226
227 const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
228 ptr += copy_to_mem(preamble_longs, ptr);
229 const uint8_t serial_version = SERIAL_VERSION;
230 ptr += copy_to_mem(serial_version, ptr);
231 const uint8_t family = FAMILY_ID;
232 ptr += copy_to_mem(family, ptr);
233 const uint8_t lg_max_size = map.get_lg_max_size();
234 ptr += copy_to_mem(lg_max_size, ptr);
235 const uint8_t lg_cur_size = map.get_lg_cur_size();
236 ptr += copy_to_mem(lg_cur_size, ptr);
237 const uint8_t flags_byte(
238 (is_empty() ? 1 << flags::IS_EMPTY_1 : 0)
239 | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0)
240 );
241 ptr += copy_to_mem(flags_byte, ptr);
242 ptr += sizeof(uint16_t); // unused
243 if (!is_empty()) {
244 const uint32_t num_items = map.get_num_active();
245 ptr += copy_to_mem(num_items, ptr);
246 ptr += sizeof(uint32_t); // unused
247 ptr += copy_to_mem(total_weight, ptr);
248 ptr += copy_to_mem(offset, ptr);
249
250 // copy active items and their weights to use batch serialization
251 using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
252 AllocW aw(map.get_allocator());
253 W* weights = aw.allocate(num_items);
254 A alloc(map.get_allocator());
255 T* items = alloc.allocate(num_items);
256 uint32_t i = 0;
257 for (auto it: map) {
258 new (&items[i]) T(it.first);
259 weights[i++] = it.second;
260 }
261 ptr += copy_to_mem(weights, ptr, sizeof(W) * num_items);
262 aw.deallocate(weights, num_items);
263 const size_t bytes_remaining = end_ptr - ptr;
264 ptr += sd.serialize(ptr, bytes_remaining, items, num_items);
265 for (i = 0; i < num_items; i++) items[i].~T();
266 alloc.deallocate(items, num_items);
267 }
268 return bytes;
269}
270
271template<typename T, typename W, typename H, typename E, typename A>
272class frequent_items_sketch<T, W, H, E, A>::items_deleter {
273public:
274 items_deleter(uint32_t num, bool destroy, const A& allocator):
275 allocator_(allocator), num_(num), destroy_(destroy) {}
276 void set_destroy(bool destroy) { destroy_ = destroy; }
277 void operator() (T* ptr) {
278 if (ptr != nullptr) {
279 if (destroy_) {
280 for (uint32_t i = 0; i < num_; ++i) ptr[i].~T();
281 }
282 allocator_.deallocate(ptr, num_);
283 }
284 }
285private:
286 A allocator_;
287 uint32_t num_;
288 bool destroy_;
289};
290
291template<typename T, typename W, typename H, typename E, typename A>
292template<typename SerDe>
293frequent_items_sketch<T, W, H, E, A> frequent_items_sketch<T, W, H, E, A>::deserialize(std::istream& is,
294 const SerDe& sd, const E& equal, const A& allocator) {
295 const auto preamble_longs = read<uint8_t>(is);
296 const auto serial_version = read<uint8_t>(is);
297 const auto family_id = read<uint8_t>(is);
298 const auto lg_max_size = read<uint8_t>(is);
299 const auto lg_cur_size = read<uint8_t>(is);
300 const auto flags_byte = read<uint8_t>(is);
301 read<uint16_t>(is); // unused
302
303 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2));
304
305 check_preamble_longs(preamble_longs, is_empty);
306 check_serial_version(serial_version);
307 check_family_id(family_id);
308 check_size(lg_cur_size, lg_max_size);
309
310 frequent_items_sketch sketch(lg_max_size, lg_cur_size, equal, allocator);
311 if (!is_empty) {
312 const auto num_items = read<uint32_t>(is);
313 read<uint32_t>(is); // unused
314 const auto total_weight = read<W>(is);
315 const auto offset = read<W>(is);
316
317 // batch deserialization with intermediate array of items and weights
318 using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
319 std::vector<W, AllocW> weights(num_items, 0, allocator);
320 read(is, weights.data(), sizeof(W) * num_items);
321 A alloc(allocator);
322 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
323 sd.deserialize(is, items.get(), num_items);
324 items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
325 for (uint32_t i = 0; i < num_items; i++) {
326 sketch.update(std::move(items.get()[i]), weights[i]);
327 }
328 sketch.total_weight = total_weight;
329 sketch.offset = offset;
330 }
331 if (!is.good())
332 throw std::runtime_error("error reading from std::istream");
333 return sketch;
334}
335
336template<typename T, typename W, typename H, typename E, typename A>
337template<typename SerDe>
338frequent_items_sketch<T, W, H, E, A> frequent_items_sketch<T, W, H, E, A>::deserialize(const void* bytes, size_t size,
339 const SerDe& sd, const E& equal, const A& allocator) {
340 ensure_minimum_memory(size, 8);
341 const char* ptr = static_cast<const char*>(bytes);
342 const char* base = static_cast<const char*>(bytes);
343 uint8_t preamble_longs;
344 ptr += copy_from_mem(ptr, preamble_longs);
345 uint8_t serial_version;
346 ptr += copy_from_mem(ptr, serial_version);
347 uint8_t family_id;
348 ptr += copy_from_mem(ptr, family_id);
349 uint8_t lg_max_size;
350 ptr += copy_from_mem(ptr, lg_max_size);
351 uint8_t lg_cur_size;
352 ptr += copy_from_mem(ptr, lg_cur_size);
353 uint8_t flags_byte;
354 ptr += copy_from_mem(ptr, flags_byte);
355 ptr += sizeof(uint16_t); // unused
356
357 const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2));
358
359 check_preamble_longs(preamble_longs, is_empty);
360 check_serial_version(serial_version);
361 check_family_id(family_id);
362 check_size(lg_cur_size, lg_max_size);
363 ensure_minimum_memory(size, preamble_longs * sizeof(uint64_t));
364
365 frequent_items_sketch sketch(lg_max_size, lg_cur_size, equal, allocator);
366 if (!is_empty) {
367 uint32_t num_items;
368 ptr += copy_from_mem(ptr, num_items);
369 ptr += sizeof(uint32_t); // unused
370 W total_weight;
371 ptr += copy_from_mem(ptr, total_weight);
372 W offset;
373 ptr += copy_from_mem(ptr, offset);
374
375 ensure_minimum_memory(size, ptr - base + (sizeof(W) * num_items));
376 // batch deserialization with intermediate array of items and weights
377 using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
378 std::vector<W, AllocW> weights(num_items, 0, allocator);
379 ptr += copy_from_mem(ptr, weights.data(), sizeof(W) * num_items);
380 A alloc(allocator);
381 std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
382 const size_t bytes_remaining = size - (ptr - base);
383 ptr += sd.deserialize(ptr, bytes_remaining, items.get(), num_items);
384 items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
385 for (uint32_t i = 0; i < num_items; i++) {
386 sketch.update(std::move(items.get()[i]), weights[i]);
387 }
388
389 sketch.total_weight = total_weight;
390 sketch.offset = offset;
391 }
392 return sketch;
393}
394
395template<typename T, typename W, typename H, typename E, typename A>
396void frequent_items_sketch<T, W, H, E, A>::check_preamble_longs(uint8_t preamble_longs, bool is_empty) {
397 if (is_empty) {
398 if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
399 throw std::invalid_argument("Possible corruption: preamble longs of an empty sketch must be " + std::to_string(PREAMBLE_LONGS_EMPTY) + ": " + std::to_string(preamble_longs));
400 }
401 } else {
402 if (preamble_longs != PREAMBLE_LONGS_NONEMPTY) {
403 throw std::invalid_argument("Possible corruption: preamble longs of an non-empty sketch must be " + std::to_string(PREAMBLE_LONGS_NONEMPTY) + ": " + std::to_string(preamble_longs));
404 }
405 }
406}
407
408template<typename T, typename W, typename H, typename E, typename A>
409void frequent_items_sketch<T, W, H, E, A>::check_serial_version(uint8_t serial_version) {
410 if (serial_version != SERIAL_VERSION) {
411 throw std::invalid_argument("Possible corruption: serial version must be " + std::to_string(SERIAL_VERSION) + ": " + std::to_string(serial_version));
412 }
413}
414
415template<typename T, typename W, typename H, typename E, typename A>
416void frequent_items_sketch<T, W, H, E, A>::check_family_id(uint8_t family_id) {
417 if (family_id != FAMILY_ID) {
418 throw std::invalid_argument("Possible corruption: family ID must be " + std::to_string(FAMILY_ID) + ": " + std::to_string(family_id));
419 }
420}
421
422template<typename T, typename W, typename H, typename E, typename A>
423void frequent_items_sketch<T, W, H, E, A>::check_size(uint8_t lg_cur_size, uint8_t lg_max_size) {
424 if (lg_cur_size > lg_max_size) {
425 throw std::invalid_argument("Possible corruption: expected lg_cur_size <= lg_max_size: " + std::to_string(lg_cur_size) + " <= " + std::to_string(lg_max_size));
426 }
427 if (lg_cur_size < LG_MIN_MAP_SIZE) {
428 throw std::invalid_argument("Possible corruption: lg_cur_size must not be less than " + std::to_string(LG_MIN_MAP_SIZE) + ": " + std::to_string(lg_cur_size));
429 }
430}
431
432template<typename T, typename W, typename H, typename E, typename A>
433string<A> frequent_items_sketch<T, W, H, E, A>::to_string(bool print_items) const {
434 // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
435 // The stream does not support passing an allocator instance, and alternatives are complicated.
436 std::ostringstream os;
437 os << "### Frequent items sketch summary:" << std::endl;
438 os << " lg cur map size : " << (int) map.get_lg_cur_size() << std::endl;
439 os << " lg max map size : " << (int) map.get_lg_max_size() << std::endl;
440 os << " num active items : " << get_num_active_items() << std::endl;
441 os << " total weight : " << get_total_weight() << std::endl;
442 os << " max error : " << get_maximum_error() << std::endl;
443 os << "### End sketch summary" << std::endl;
444 if (print_items) {
445 vector_row items;
446 for (auto it: map) {
447 items.push_back(row(&it.first, it.second, offset));
448 }
449 // sort by estimate in descending order
450 std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); });
451 os << "### Items in descending order by estimate" << std::endl;
452 os << " item, estimate, lower bound, upper bound" << std::endl;
453 for (auto it: items) {
454 os << " " << it.get_item() << ", " << it.get_estimate() << ", "
455 << it.get_lower_bound() << ", " << it.get_upper_bound() << std::endl;
456 }
457 os << "### End items" << std::endl;
458 }
459 return string<A>(os.str().c_str(), map.get_allocator());
460}
461
// Rejects negative weights for a signed integral weight type by throwing std::invalid_argument.
// NOTE(review): the signature line (listing line 465) is missing from this copy; presumably
// it matches the floating-point version: check_weight(WW weight).
462// version for integral signed type
463template<typename T, typename W, typename H, typename E, typename A>
464template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_signed<WW>::value, int>::type>
466 if (weight < 0) {
467 throw std::invalid_argument("weight must be non-negative");
468 }
469}
470
471// version for integral unsigned type - no-op
472template<typename T, typename W, typename H, typename E, typename A>
473template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_unsigned<WW>::value, int>::type>
474void frequent_items_sketch<T, W, H, E, A>::check_weight(WW) {}
475
476// version for floating point type
477template<typename T, typename W, typename H, typename E, typename A>
478template<typename WW, typename std::enable_if<std::is_floating_point<WW>::value, int>::type>
479void frequent_items_sketch<T, W, H, E, A>::check_weight(WW weight) {
480 if (weight < 0) {
481 throw std::invalid_argument("weight must be non-negative");
482 }
483 if (std::isnan(weight)) {
484 throw std::invalid_argument("weight must be a valid number");
485 }
486 if (std::isinf(weight)) {
487 throw std::invalid_argument("weight must be finite");
488 }
489}
490
491}
492
493#endif
Row in the output from get_frequent_items.
Definition frequent_items_sketch.hpp:330
Frequent Items sketch.
Definition frequent_items_sketch.hpp:55
frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size=LG_MIN_MAP_SIZE, const E &equal=E(), const A &allocator=A())
Construct this sketch with parameters lg_max_map_size and lg_start_map_size.
Definition frequent_items_sketch_impl.hpp:37
W get_upper_bound(const T &item) const
Returns the guaranteed upper bound weight (frequency) of the given item.
Definition frequent_items_sketch_impl.hpp:118
void merge(const frequent_items_sketch &other)
This function merges the other sketch into this one.
Definition frequent_items_sketch_impl.hpp:68
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
This method serializes the sketch into a given stream in a binary form.
Definition frequent_items_sketch_impl.hpp:165
vector_row get_frequent_items(frequent_items_error_type err_type) const
Returns an array of rows that include frequent items, estimates, upper and lower bounds given an error type, using get_maximum_error() as the threshold.
Definition frequent_items_sketch_impl.hpp:144
bool is_empty() const
Definition frequent_items_sketch_impl.hpp:90
string< A > to_string(bool print_items=false) const
Returns a human readable summary of this sketch.
Definition frequent_items_sketch_impl.hpp:433
void update(const T &item, W weight=1)
Update this sketch with an item and a non-negative weight (frequency count); a zero weight is ignored.
Definition frequent_items_sketch_impl.hpp:52
static double get_apriori_error(uint8_t lg_max_map_size, W estimated_total_weight)
Returns the estimated a priori error given lg_max_map_size (log2 of the maximum map size) for the sketch and the estimated_total_weight of the stream.
Definition frequent_items_sketch_impl.hpp:138
W get_total_weight() const
Returns the sum of the weights (frequencies) in the stream seen so far by the sketch.
Definition frequent_items_sketch_impl.hpp:100
double get_epsilon() const
Returns epsilon value of this sketch.
Definition frequent_items_sketch_impl.hpp:128
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Computes size needed to serialize the current state of the sketch.
Definition frequent_items_sketch_impl.hpp:212
W get_maximum_error() const
Definition frequent_items_sketch_impl.hpp:123
W get_estimate(const T &item) const
Returns the estimate of the weight (frequency) of the given item.
Definition frequent_items_sketch_impl.hpp:105
static frequent_items_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const E &equal=E(), const A &allocator=A())
This method deserializes a sketch from a given stream.
W get_lower_bound(const T &item) const
Returns the guaranteed lower bound weight (frequency) of the given item.
Definition frequent_items_sketch_impl.hpp:113
uint32_t get_num_active_items() const
Definition frequent_items_sketch_impl.hpp:95
DataSketches namespace.
Definition binomial_bounds.hpp:38
frequent_items_error_type
Frequent items error type.
Definition frequent_items_sketch.hpp:36
@ NO_FALSE_NEGATIVES
include an item in the result list if get_upper_bound(item) > threshold
Definition frequent_items_sketch.hpp:38
@ NO_FALSE_POSITIVES
include an item in the result list if get_lower_bound(item) > threshold
Definition frequent_items_sketch.hpp:37