datasketches-cpp
density_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef DENSITY_SKETCH_IMPL_HPP_
21 #define DENSITY_SKETCH_IMPL_HPP_
22 
23 #include <algorithm>
24 #include <sstream>
25 
26 #include "memory_operations.hpp"
27 #include "conditional_forward.hpp"
28 
29 namespace datasketches {
30 
31 template<typename T, typename K, typename A>
32 density_sketch<T, K, A>::density_sketch(uint16_t k, uint32_t dim, const K& kernel, const A& allocator):
33 kernel_(kernel),
34 k_(k),
35 dim_(dim),
36 num_retained_(0),
37 n_(0),
38 levels_(1, Level(allocator), allocator)
39 {
40  check_k(k);
41 }
42 
43 template<typename T, typename K, typename A>
44 density_sketch<T, K, A>::density_sketch(uint16_t k, uint32_t dim, uint32_t num_retained, uint64_t n,
45  Levels&& levels, const K& kernel):
46 kernel_(kernel),
47 k_(k),
48 dim_(dim),
49 num_retained_(num_retained),
50 n_(n),
51 levels_(std::move(levels))
52 {
53  check_k(k);
54 }
55 
56 template<typename T, typename K, typename A>
58  return k_;
59 }
60 
61 template<typename T, typename K, typename A>
63  return dim_;
64 }
65 
66 template<typename T, typename K, typename A>
68  return num_retained_ == 0;
69 }
70 
71 template<typename T, typename K, typename A>
73  return n_;
74 }
75 
76 template<typename T, typename K, typename A>
78  return num_retained_;
79 }
80 
81 template<typename T, typename K, typename A>
83  return levels_.size() > 1;
84 }
85 
86 template<typename T, typename K, typename A>
87 template<typename FwdVector>
88 void density_sketch<T, K, A>::update(FwdVector&& point) {
89  if (point.size() != dim_) throw std::invalid_argument("dimension mismatch");
90  while (num_retained_ >= k_ * levels_.size()) compact();
91  levels_[0].push_back(std::forward<FwdVector>(point));
92  ++num_retained_;
93  ++n_;
94 }
95 
96 template<typename T, typename K, typename A>
97 template<typename FwdSketch>
98 void density_sketch<T, K, A>::merge(FwdSketch&& other) {
99  if (other.is_empty()) return;
100  if (other.dim_ != dim_) throw std::invalid_argument("dimension mismatch");
101  while (levels_.size() < other.levels_.size()) levels_.push_back(Level(levels_.get_allocator()));
102  for (unsigned height = 0; height < other.levels_.size(); ++height) {
103  std::copy(
104  forward_begin(conditional_forward<FwdSketch>(other.levels_[height])),
105  forward_end(conditional_forward<FwdSketch>(other.levels_[height])),
106  back_inserter(levels_[height])
107  );
108  }
109  num_retained_ += other.num_retained_;
110  n_ += other.n_;
111  while (num_retained_ >= k_ * levels_.size()) compact();
112 }
113 
114 template<typename T, typename K, typename A>
115 T density_sketch<T, K, A>::get_estimate(const std::vector<T>& point) const {
116  if (is_empty()) throw std::runtime_error("operation is undefined for an empty sketch");
117  T density = 0;
118  for (unsigned height = 0; height < levels_.size(); ++height) {
119  for (const auto& p: levels_[height]) {
120  density += (1 << height) * kernel_(p, point) / n_;
121  }
122  }
123  return density;
124 }
125 
126 template<typename T, typename K, typename A>
128  return levels_.get_allocator();
129 }
130 
131 template<typename T, typename K, typename A>
133  for (unsigned height = 0; height < levels_.size(); ++height) {
134  if (levels_[height].size() >= k_) {
135  if (height + 1 >= levels_.size()) levels_.push_back(Level(levels_.get_allocator()));
136  compact_level(height);
137  break;
138  }
139  }
140 }
141 
142 template<typename T, typename K, typename A>
143 void density_sketch<T, K, A>::compact_level(unsigned height) {
144  auto& level = levels_[height];
145  std::vector<bool> bits(level.size());
146  bits[0] = random_utils::random_bit();
147  std::shuffle(level.begin(), level.end(), random_utils::rand);
148  for (unsigned i = 1; i < level.size(); ++i) {
149  T delta = 0;
150  for (unsigned j = 0; j < i; ++j) {
151  delta += (bits[j] ? 1 : -1) * kernel_(level[i], level[j]);
152  }
153  bits[i] = delta < 0;
154  }
155  for (unsigned i = 0; i < level.size(); ++i) {
156  if (bits[i]) {
157  levels_[height + 1].push_back(std::move(level[i]));
158  } else {
159  --num_retained_;
160  }
161  }
162  level.clear();
163 }
164 
165 /* Serialized sketch layout:
166  * Int || Start Byte Addr:
167  * Addr:
168  * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
169  * 0 || Preamble_Ints | SerVer | FamID | Flags |------- K -------|---- unused -----|
170  *
171  * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
172  * 2 ||------------- Num Dimensions --------------|------ Num Retained Items ---------|
173  *
174  * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 |
175  * 4 ||---------------------------Items Seen Count (N)--------------------------------|
176  *
177  * Ints 2 and 3 are omitted when the sketch is empty, meaning Num Dimensions is stored at
178  * offset 8 in that case. Otherwise, Int 5 is the start of level data, consisting of the
179  * size of the level (as a uint32 value) followed by that number of points, with
180  * Num Dimensions per point.
181  */
182 
183 template<typename T, typename K, typename A>
184 void density_sketch<T, K, A>::serialize(std::ostream& os) const {
185  const uint8_t preamble_ints = is_empty() ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_LONG;
186  write(os, preamble_ints);
187  const uint8_t ser_ver = SERIAL_VERSION;
188  write(os, ser_ver);
189  const uint8_t family = FAMILY_ID;
190  write(os, family);
191 
192  // only empty is a valid flag
193  const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
194  write(os, flags_byte);
195  write(os, k_);
196  const uint16_t unused = 0;
197  write(os, unused);
198  write(os, dim_);
199 
200  if (is_empty())
201  return;
202 
203  write(os, num_retained_);
204  write(os, n_);
205 
206  // levels array -- uint32_t since a single level may be larger than k
207  size_t pt_size = sizeof(T) * dim_;
208  for (const Level& lvl : levels_) {
209  const uint32_t level_size = static_cast<uint32_t>(lvl.size());
210  write(os, level_size);
211  for (const Vector& pt : lvl) {
212  write(os, pt.data(), pt_size);
213  }
214  }
215 }
216 
217 template<typename T, typename K, typename A>
218 auto density_sketch<T, K, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
219  const uint8_t preamble_ints = (is_empty() ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_LONG);
220 
221  // pre-compute size
222  size_t size = header_size_bytes + preamble_ints * sizeof(uint32_t);
223  if (!is_empty())
224  for (const Level& lvl : levels_)
225  size += sizeof(uint32_t) + (lvl.size() * dim_ * sizeof(T));
226 
227  vector_bytes bytes(size, 0, levels_.get_allocator());
228  uint8_t* ptr = bytes.data() + header_size_bytes;
229  const uint8_t* end_ptr = ptr + size;
230 
231  ptr += copy_to_mem(preamble_ints, ptr);
232  const uint8_t ser_ver = SERIAL_VERSION;
233  ptr += copy_to_mem(ser_ver, ptr);
234  const uint8_t family = FAMILY_ID;
235  ptr += copy_to_mem(family, ptr);
236 
237  // empty is the only valid flat
238  const uint8_t flags_byte = (is_empty() ? 1 << flags::IS_EMPTY : 0);
239  ptr += copy_to_mem(flags_byte, ptr);
240  ptr += copy_to_mem(k_, ptr);
241  ptr += sizeof(uint16_t); // 2 unused bytes
242  ptr += copy_to_mem(dim_, ptr);
243 
244  if (is_empty())
245  return bytes;
246 
247  ptr += copy_to_mem(num_retained_, ptr);
248  ptr += copy_to_mem(n_, ptr);
249 
250  // levels array -- uint32_t since a single level may be larger than k
251  size_t pt_size = sizeof(T) * dim_;
252  for (const Level& lvl : levels_) {
253  ptr += copy_to_mem(static_cast<uint32_t>(lvl.size()), ptr);
254  for (const Vector& pt : lvl) {
255  ptr += copy_to_mem(pt.data(), ptr, pt_size);
256  }
257  }
258 
259  if (ptr != end_ptr)
260  throw std::runtime_error("Actual output size does not equal expected output size");
261 
262  return bytes;
263 }
264 
265 template<typename T, typename K, typename A>
266 density_sketch<T, K, A> density_sketch<T, K, A>::deserialize(std::istream& is, const K& kernel, const A& allocator) {
267  const auto preamble_ints = read<uint8_t>(is);
268  const auto serial_version = read<uint8_t>(is);
269  const auto family_id = read<uint8_t>(is);
270  const auto flags_byte = read<uint8_t>(is);
271  const auto k = read<uint16_t>(is);
272  read<uint16_t>(is); // unused
273  const auto dim = read<uint32_t>(is);
274 
275  check_k(k); // do we have constraints?
276  check_serial_version(serial_version); // a little redundant with the header check
277  check_family_id(family_id);
278  check_header_validity(preamble_ints, flags_byte, serial_version);
279 
280  if (!is.good()) throw std::runtime_error("error reading from std::istream");
281  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
282  if (is_empty) {
283  return density_sketch(k, dim, kernel, allocator);
284  }
285 
286  const auto num_retained = read<uint32_t>(is);
287  const auto n = read<uint64_t>(is);
288 
289  // levels arrays
290  size_t pt_size = sizeof(T) * dim;
291  Levels levels(allocator);
292  int64_t num_to_read = num_retained; // num_retrained is uint32_t so this allows error checking
293  while (num_to_read > 0) {
294  const auto level_size = read<uint32_t>(is);
295  Level lvl(allocator);
296  lvl.reserve(level_size);
297  for (uint32_t i = 0; i < level_size; ++i) {
298  Vector pt(dim, 0, allocator);
299  read(is, pt.data(), pt_size);
300  lvl.push_back(pt);
301  }
302  levels.push_back(lvl);
303  num_to_read -= lvl.size();
304  }
305 
306  if (num_to_read != 0)
307  throw std::runtime_error("Error deserializing sketch: Incorrect number of items read");
308  if (!is.good()) throw std::runtime_error("error reading from std::istream");
309 
310  return density_sketch(k, dim, num_retained, n, std::move(levels), kernel);
311 }
312 
313 template<typename T, typename K, typename A>
314 density_sketch<T, K, A> density_sketch<T, K, A>::deserialize(const void* bytes, size_t size, const K& kernel, const A& allocator) {
315  ensure_minimum_memory(size, PREAMBLE_INTS_SHORT * sizeof(uint32_t));
316  const char* ptr = static_cast<const char*>(bytes);
317  const char* end_ptr = static_cast<const char*>(bytes) + size;
318  uint8_t preamble_ints;
319  ptr += copy_from_mem(ptr, preamble_ints);
320  uint8_t serial_version;
321  ptr += copy_from_mem(ptr, serial_version);
322  uint8_t family_id;
323  ptr += copy_from_mem(ptr, family_id);
324  uint8_t flags_byte;
325  ptr += copy_from_mem(ptr, flags_byte);
326  uint16_t k;
327  ptr += copy_from_mem(ptr, k);
328  uint16_t unused;
329  ptr += copy_from_mem(ptr, unused);
330  uint32_t dim;
331  ptr += copy_from_mem(ptr, dim);
332 
333  check_k(k);
334  check_serial_version(serial_version); // a little redundant with the header check
335  check_family_id(family_id);
336  check_header_validity(preamble_ints, flags_byte, serial_version);
337 
338  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
339  if (is_empty) {
340  return density_sketch(k, dim, kernel, allocator);
341  }
342 
343  ensure_minimum_memory(size, PREAMBLE_INTS_LONG * sizeof(uint32_t));
344  uint32_t num_retained;
345  ptr += copy_from_mem(ptr, num_retained);
346  uint64_t n;
347  ptr += copy_from_mem(ptr, n);
348 
349  // Predicting the number of levels seems hard so determining the exact remaining
350  // size is also hard. But we need at least num_retained * dim * sizeof(T)
351  // bytes for the points so we can check that.
352  size_t pt_size = sizeof(T) * dim;
353  ensure_minimum_memory(end_ptr - ptr, num_retained * pt_size);
354 
355  // levels arrays
356  Levels levels(allocator);
357  int64_t num_to_read = num_retained; // num_retained is uint32_t so this allows error checking
358  while (num_to_read > 0) {
359  uint32_t level_size;
360  ptr += copy_from_mem(ptr, level_size);
361  ensure_minimum_memory(end_ptr - ptr, level_size * pt_size);
362  Level lvl(allocator);
363  lvl.reserve(level_size);
364  for (uint32_t i = 0; i < level_size; ++i) {
365  Vector pt(dim, 0, allocator);
366  ptr += copy_from_mem(ptr, pt.data(), pt_size);
367  lvl.push_back(pt);
368  }
369  levels.push_back(lvl);
370  num_to_read -= lvl.size();
371  }
372 
373  if (num_to_read != 0)
374  throw std::runtime_error("Error deserializing sketch: Incorrect number of items read");
375  if (ptr > end_ptr) throw std::runtime_error("Error deserializing sketch: Read beyond provided memory");
376 
377  return density_sketch(k, dim, num_retained, n, std::move(levels), kernel);
378 }
379 
380 template<typename T, typename K, typename A>
381 void density_sketch<T, K, A>::check_k(uint16_t k) {
382  if (k < 2)
383  throw std::invalid_argument("k must be > 1. Found: " + std::to_string(k));
384 }
385 
386 template<typename T, typename K, typename A>
387 void density_sketch<T, K, A>::check_serial_version(uint8_t serial_version) {
388  if (serial_version == SERIAL_VERSION)
389  return;
390  else
391  throw std::invalid_argument("Possible corruption. Unrecognized serialization version: " + std::to_string(serial_version));
392 }
393 
394 template<typename T, typename K, typename A>
395 void density_sketch<T, K, A>::check_family_id(uint8_t family_id) {
396  if (family_id == FAMILY_ID)
397  return;
398  else
399  throw std::invalid_argument("Possible corruption. Family id does not indicate density sketch: " + std::to_string(family_id));
400 }
401 
402 template<typename T, typename K, typename A>
403 void density_sketch<T, K, A>::check_header_validity(uint8_t preamble_ints, uint8_t flags_byte, uint8_t serial_version) {
404  const bool empty = (flags_byte & (1 << flags::IS_EMPTY)) > 0;
405 
406  if ((empty && preamble_ints == PREAMBLE_INTS_SHORT)
407  || (!empty && preamble_ints == PREAMBLE_INTS_LONG))
408  return;
409  else {
410  std::ostringstream os;
411  os << "Possible sketch corruption. Inconsistent state: "
412  << "preamble_ints = " << preamble_ints
413  << ", empty = " << (empty ? "true" : "false")
414  << ", serialization_version = " << serial_version;
415  throw std::invalid_argument(os.str());
416  }
417 }
418 
419 template<typename T, typename K, typename A>
420 string<A> density_sketch<T, K, A>::to_string(bool print_levels, bool print_items) const {
421  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
422  // The stream does not support passing an allocator instance, and alternatives are complicated.
423  std::ostringstream os;
424  os << "### Density sketch summary:" << std::endl;
425  os << " K : " << k_ << std::endl;
426  os << " Dim : " << dim_ << std::endl;
427  os << " Empty : " << (is_empty() ? "true" : "false") << std::endl;
428  os << " N : " << n_ << std::endl;
429  os << " Retained items : " << num_retained_ << std::endl;
430  os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
431  os << " Levels : " << levels_.size() << std::endl;
432  os << "### End sketch summary" << std::endl;
433 
434  if (print_levels) {
435  os << "### Density sketch levels:" << std::endl;
436  os << " height: size" << std::endl;
437  for (unsigned height = 0; height < levels_.size(); ++height) {
438  os << " " << height << ": "
439  << levels_[height].size() << std::endl;
440  }
441  os << "### End sketch levels" << std::endl;
442  }
443 
444  if (print_items) {
445  os << "### Density sketch data:" << std::endl;
446  for (unsigned height = 0; height < levels_.size(); ++height) {
447  os << " level " << height << ": " << std::endl;
448  for (const auto& point: levels_[height]) {
449  os << " [";
450  bool first = true;
451  for (auto value: point) {
452  if (first) {
453  first = false;
454  } else {
455  os << ", ";
456  }
457  os << value;
458  }
459  os << "]" << std::endl;
460  }
461  }
462  os << "### End sketch data" << std::endl;
463  }
464  return string<A>(os.str().c_str(), levels_.get_allocator());
465 }
466 
467 template<typename T, typename K, typename A>
468 auto density_sketch<T, K, A>::begin() const -> const_iterator {
469  return const_iterator(levels_.begin(), levels_.end());
470 }
471 
472 template<typename T, typename K, typename A>
473 auto density_sketch<T, K, A>::end() const -> const_iterator {
474  return const_iterator(levels_.end(), levels_.end());
475 }
476 
477 // iterator
478 
479 template<typename T, typename K, typename A>
480 density_sketch<T, K, A>::const_iterator::const_iterator(LevelsIterator begin, LevelsIterator end):
481 levels_it_(begin),
482 levels_end_(end),
483 level_it_(),
484 height_(0)
485 {
486  // skip empty levels
487  while (levels_it_ != levels_end_) {
488  level_it_ = levels_it_->begin();
489  if (level_it_ != levels_it_->end()) break;
490  ++levels_it_;
491  ++height_;
492  }
493 }
494 
495 template<typename T, typename K, typename A>
496 auto density_sketch<T, K, A>::const_iterator::operator++() -> const_iterator& {
497  ++level_it_;
498  if (level_it_ == levels_it_->end()) {
499  ++levels_it_;
500  ++height_;
501  // skip empty levels
502  while (levels_it_ != levels_end_) {
503  level_it_ = levels_it_->begin();
504  if (level_it_ != levels_it_->end()) break;
505  ++levels_it_;
506  ++height_;
507  }
508  }
509  return *this;
510 }
511 
512 template<typename T, typename K, typename A>
513 auto density_sketch<T, K, A>::const_iterator::operator++(int) -> const_iterator& {
514  const_iterator tmp(*this);
515  operator++();
516  return tmp;
517 }
518 
519 template<typename T, typename K, typename A>
520 bool density_sketch<T, K, A>::const_iterator::operator==(const const_iterator& other) const {
521  if (levels_it_ != other.levels_it_) return false;
522  if (levels_it_ == levels_end_) return true;
523  return level_it_ == other.level_it_;
524 }
525 
526 template<typename T, typename K, typename A>
527 bool density_sketch<T, K, A>::const_iterator::operator!=(const const_iterator& other) const {
528  return !operator==(other);
529 }
530 
531 template<typename T, typename K, typename A>
532 auto density_sketch<T, K, A>::const_iterator::operator*() const -> const value_type {
533  return value_type(*level_it_, 1ULL << height_);
534 }
535 
536 template<typename T, typename K, typename A>
537 auto density_sketch<T, K, A>::const_iterator::operator->() const -> const return_value_holder<value_type> {
538  return **this;
539 }
540 
541 } /* namespace datasketches */
542 
543 #endif
Density sketch.
Definition: density_sketch.hpp:57
density_sketch(uint16_t k, uint32_t dim, const Kernel &kernel=Kernel(), const Allocator &allocator=Allocator())
Constructor.
Allocator get_allocator() const
Returns an instance of the allocator for this sketch.
Definition: density_sketch_impl.hpp:127
static density_sketch deserialize(std::istream &is, const K &kernel=K(), const A &allocator=A())
This method deserializes a sketch from a given stream.
DataSketches namespace.
Definition: binomial_bounds.hpp:38