Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Build
on:
- push
- pull_request


jobs:
debian_x86:
runs-on: ubuntu-22.04
timeout-minutes: 30
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: Swatinem/rust-cache@v2
- name: Run cargo build
run: cargo build
- name: Run cargo test
run: cargo test
macos_x86:
runs-on: macos-12
timeout-minutes: 30
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: Swatinem/rust-cache@v2
- name: Install coreutils
run: brew install coreutils
- name: Run cargo build
run: cargo build
- name: Run cargo test
run: cargo test
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Cargo.lock
# Emacs backups
*~

# Jetbrains products
.idea

# Proptest
proptest-regressions

Expand Down
3 changes: 2 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ fn main() {
bridge
.files(&[
datasketches.join("cpc.cpp"),
datasketches.join("hll.cpp"),
datasketches.join("theta.cpp"),
datasketches.join("hh.cpp"),
])
.include(datasketches.join("common").join("include"))
.flag_if_supported("-std=c++11")
.cpp_link_stdlib("stdc++")
.cpp_link_stdlib(None)
.static_flag(true)
.compile("libdatasketches.a");
}
93 changes: 93 additions & 0 deletions datasketches-cpp/hll.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include <cstdint>
#include <ios>
#include <sstream>
#include <iostream>

#include "rust/cxx.h"
#include "hll/include/hll.hpp"

#include "hll.hpp"

OpaqueHLLSketch::OpaqueHLLSketch(unsigned lg_k, datasketches::target_hll_type tgt_type):
inner_{ datasketches::hll_sketch(lg_k, tgt_type) } {
}

OpaqueHLLSketch::OpaqueHLLSketch(datasketches::hll_sketch&& hll):
inner_{std::move(hll)} {
}

OpaqueHLLSketch::OpaqueHLLSketch(std::istream& is):
inner_{datasketches::hll_sketch::deserialize(is)} {
}

double OpaqueHLLSketch::estimate() const {
return this->inner_.get_estimate();
}

void OpaqueHLLSketch::update(rust::Slice<const uint8_t> buf) {
this->inner_.update(buf.data(), buf.size());
}

void OpaqueHLLSketch::update_u64(uint64_t value) {
this->inner_.update(value);
}

datasketches::target_hll_type OpaqueHLLSketch::get_target_type() const {
return this->inner_.get_target_type();
}

uint8_t OpaqueHLLSketch::get_lg_config_k() const {
return this->inner_.get_lg_config_k();
}

std::unique_ptr<std::vector<uint8_t>> OpaqueHLLSketch::serialize() const {
// TODO: could use a custom streambuf to avoid the
// stream -> vec copy https://stackoverflow.com/a/13059195/1779853
std::stringstream s{};
auto start = s.tellg();
this->inner_.serialize_compact(s);
s.seekg(0, std::ios::end);
auto stop = s.tellg();

std::vector<uint8_t> v(std::size_t(stop-start));
s.seekg(0, std::ios::beg);
s.read(reinterpret_cast<char*>(v.data()), std::streamsize(v.size()));

return std::unique_ptr<std::vector<uint8_t>>(new std::vector<uint8_t>(std::move(v)));
}

std::unique_ptr<OpaqueHLLSketch> new_opaque_hll_sketch(unsigned lg_k, datasketches::target_hll_type tgt_type) {
return std::unique_ptr<OpaqueHLLSketch>(new OpaqueHLLSketch { lg_k, tgt_type });
}

std::unique_ptr<OpaqueHLLSketch> deserialize_opaque_hll_sketch(rust::Slice<const uint8_t> buf) {
// TODO: could use a custom streambuf to avoid the slice -> stream copy
std::stringstream s{};
s.write(const_cast<char*>(reinterpret_cast<const char*>(buf.data())), std::streamsize(buf.size()));
s.seekg(0, std::ios::beg);
return std::unique_ptr<OpaqueHLLSketch>(new OpaqueHLLSketch{s});
}

OpaqueHLLUnion::OpaqueHLLUnion(uint8_t lg_max_k):
inner_{ datasketches::hll_union(lg_max_k) } {
}

std::unique_ptr<OpaqueHLLSketch> OpaqueHLLUnion::sketch(datasketches::target_hll_type tgt_type) const {
return std::unique_ptr<OpaqueHLLSketch>(new OpaqueHLLSketch{this->inner_.get_result(tgt_type)});
}

void OpaqueHLLUnion::merge(std::unique_ptr<OpaqueHLLSketch> to_add) {
this->inner_.update(std::move(to_add->inner_));
}

datasketches::target_hll_type OpaqueHLLUnion::get_target_type() const {
return this->inner_.get_target_type();
}

uint8_t OpaqueHLLUnion::get_lg_config_k() const {
return this->inner_.get_lg_config_k();
}

std::unique_ptr<OpaqueHLLUnion> new_opaque_hll_union(uint8_t lg_max_k) {
return std::unique_ptr<OpaqueHLLUnion>(new OpaqueHLLUnion{ lg_max_k });
}
45 changes: 45 additions & 0 deletions datasketches-cpp/hll.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <cstdint>
#include <iostream>
#include <vector>
#include <memory>

#include "rust/cxx.h"
#include "hll/include/hll.hpp"

// alias
typedef datasketches::target_hll_type target_hll_type;

class OpaqueHLLSketch {
public:
double estimate() const;
void update(rust::Slice<const uint8_t> buf);
void update_u64(uint64_t value);
std::unique_ptr<std::vector<uint8_t>> serialize() const;
friend std::unique_ptr<OpaqueHLLSketch> deserialize_opaque_hll_sketch(rust::Slice<const uint8_t> buf);
OpaqueHLLSketch(unsigned lg_k, datasketches::target_hll_type tgt_type);
datasketches::target_hll_type get_target_type() const;
uint8_t get_lg_config_k() const;
private:
OpaqueHLLSketch(datasketches::hll_sketch&& hll);
OpaqueHLLSketch(std::istream& is);
friend class OpaqueHLLUnion;
datasketches::hll_sketch inner_;
};

std::unique_ptr<OpaqueHLLSketch> new_opaque_hll_sketch(unsigned lg_k, datasketches::target_hll_type tgt_type);
std::unique_ptr<OpaqueHLLSketch> deserialize_opaque_hll_sketch(rust::Slice<const uint8_t> buf);

class OpaqueHLLUnion {
public:
std::unique_ptr<OpaqueHLLSketch> sketch(datasketches::target_hll_type tgt_type) const;
void merge(std::unique_ptr<OpaqueHLLSketch> to_add);
OpaqueHLLUnion(uint8_t lg_max_k);
datasketches::target_hll_type get_target_type() const;
uint8_t get_lg_config_k() const;
private:
datasketches::hll_union inner_;
};

std::unique_ptr<OpaqueHLLUnion> new_opaque_hll_union(uint8_t lg_max_k);
4 changes: 4 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[toolchain]
channel = "1.77.2"
components = ["rustfmt", "rustc-dev", "clippy"]
profile = "minimal"
54 changes: 48 additions & 6 deletions src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@ pub(crate) mod ffi {
unsafe fn remove_from_hashset(hashset_addr: usize, addr: usize);
}

#[derive(Debug, Eq)]
#[repr(i32)]
enum target_hll_type {
HLL_4,
HLL_6,
HLL_8,
}

unsafe extern "C++" {
include!("dsrs/datasketches-cpp/cpc.hpp");

pub(crate) type OpaqueCpcSketch;

pub(crate) fn new_opaque_cpc_sketch() -> UniquePtr<OpaqueCpcSketch>;
pub(crate) fn deserialize_opaque_cpc_sketch(buf: &[u8]) -> UniquePtr<OpaqueCpcSketch>;
pub(crate) fn deserialize_opaque_cpc_sketch(
buf: &[u8],
) -> Result<UniquePtr<OpaqueCpcSketch>>;
pub(crate) fn estimate(self: &OpaqueCpcSketch) -> f64;
pub(crate) fn update(self: Pin<&mut OpaqueCpcSketch>, buf: &[u8]);
pub(crate) fn update_u64(self: Pin<&mut OpaqueCpcSketch>, value: u64);
Expand All @@ -36,6 +46,37 @@ pub(crate) mod ffi {
pub(crate) fn sketch(self: &OpaqueCpcUnion) -> UniquePtr<OpaqueCpcSketch>;
pub(crate) fn merge(self: Pin<&mut OpaqueCpcUnion>, to_add: UniquePtr<OpaqueCpcSketch>);

include!("dsrs/datasketches-cpp/hll.hpp");

type target_hll_type;

pub(crate) type OpaqueHLLSketch;
pub(crate) fn estimate(self: &OpaqueHLLSketch) -> f64;
pub(crate) fn update(self: Pin<&mut OpaqueHLLSketch>, buf: &[u8]);
pub(crate) fn update_u64(self: Pin<&mut OpaqueHLLSketch>, value: u64);
pub(crate) fn serialize(self: &OpaqueHLLSketch) -> UniquePtr<CxxVector<u8>>;
pub(crate) fn get_target_type(self: &OpaqueHLLSketch) -> target_hll_type;
pub(crate) fn get_lg_config_k(self: &OpaqueHLLSketch) -> u8;

pub(crate) fn new_opaque_hll_sketch(
lg_k: u32,
tgt_type: target_hll_type,
) -> UniquePtr<OpaqueHLLSketch>;
pub(crate) fn deserialize_opaque_hll_sketch(
buf: &[u8],
) -> Result<UniquePtr<OpaqueHLLSketch>>;

pub(crate) type OpaqueHLLUnion;

pub(crate) fn new_opaque_hll_union(lg_max_k: u8) -> UniquePtr<OpaqueHLLUnion>;
pub(crate) fn sketch(
self: &OpaqueHLLUnion,
tgt_type: target_hll_type,
) -> UniquePtr<OpaqueHLLSketch>;
pub(crate) fn merge(self: Pin<&mut OpaqueHLLUnion>, to_add: UniquePtr<OpaqueHLLSketch>);
pub(crate) fn get_target_type(self: &OpaqueHLLUnion) -> target_hll_type;
pub(crate) fn get_lg_config_k(self: &OpaqueHLLUnion) -> u8;

include!("dsrs/datasketches-cpp/theta.hpp");

pub(crate) type OpaqueThetaSketch;
Expand All @@ -57,7 +98,7 @@ pub(crate) mod ffi {
pub(crate) fn serialize(self: &OpaqueStaticThetaSketch) -> UniquePtr<CxxVector<u8>>;
pub(crate) fn deserialize_opaque_static_theta_sketch(
buf: &[u8],
) -> UniquePtr<OpaqueStaticThetaSketch>;
) -> Result<UniquePtr<OpaqueStaticThetaSketch>>;

pub(crate) type OpaqueThetaUnion;

Expand All @@ -81,16 +122,17 @@ pub(crate) mod ffi {

pub(crate) type OpaqueHhSketch;

pub(crate) fn new_opaque_hh_sketch(lg2_k: u8, hashset_addr: usize) -> UniquePtr<OpaqueHhSketch>;
pub(crate) fn new_opaque_hh_sketch(
lg2_k: u8,
hashset_addr: usize,
) -> UniquePtr<OpaqueHhSketch>;
pub(crate) fn estimate_no_fp(
self: &OpaqueHhSketch,
) -> UniquePtr<CxxVector<ThinHeavyHitterRow>>;
pub(crate) fn estimate_no_fn(
self: &OpaqueHhSketch,
) -> UniquePtr<CxxVector<ThinHeavyHitterRow>>;
pub(crate) fn state(
self: &OpaqueHhSketch,
) -> UniquePtr<CxxVector<ThinHeavyHitterRow>>;
pub(crate) fn state(self: &OpaqueHhSketch) -> UniquePtr<CxxVector<ThinHeavyHitterRow>>;
pub(crate) fn update(self: Pin<&mut OpaqueHhSketch>, value: usize, weight: u64);
pub(crate) fn set_weights(self: Pin<&mut OpaqueHhSketch>, total_weight: u64, weight: u64);
pub(crate) fn get_total_weight(self: &OpaqueHhSketch) -> u64;
Expand Down
22 changes: 11 additions & 11 deletions src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use base64;
use memchr;

use crate::stream_reducer::LineReducer;
use crate::{CpcSketch, CpcUnion, HhSketch};
use crate::{CpcSketch, CpcUnion, DataSketchesError, HhSketch};

pub struct Counter {
sketch: CpcSketch,
Expand All @@ -32,9 +32,9 @@ impl Counter {
}

/// Deserializes from base64 string with no newlines or `=` padding.
pub fn deserialize(s: &str) -> Result<Self, base64::DecodeError> {
pub fn deserialize(s: &str) -> Result<Self, DataSketchesError> {
let bytes = base64::decode_config(s, base64::STANDARD_NO_PAD)?;
let sketch = CpcSketch::deserialize(bytes.as_ref());
let sketch = CpcSketch::deserialize(bytes.as_ref())?;
Ok(Self { sketch })
}

Expand Down Expand Up @@ -150,29 +150,30 @@ impl KeyedMerger {

pub struct HeavyHitter {
sketch: HhSketch,
k: u64
k: u64,
}

// https://users.rust-lang.org/t/logarithm-of-integers/8506/5

fn log2_floor(x: u64) -> usize {
const fn num_bits<T>() -> usize { std::mem::size_of::<T>() * 8 }
const fn num_bits<T>() -> usize {
std::mem::size_of::<T>() * 8
}
assert!(x > 0);
num_bits::<u64>() - x.leading_zeros() as usize - 1
}

impl HeavyHitter {

/// Creates a new heavy hitter sketch targeting elements in the top-k
/// by reserving O(k) space.
pub fn new( k: u64) -> Self {
pub fn new(k: u64) -> Self {
let lg2_k_with_room = log2_floor(k as u64).max(1) + 2;
Self {
sketch: HhSketch::new(lg2_k_with_room.try_into().unwrap()),
k
k,
}
}

/// Serializes to base64 string with no newlines or `=` padding.
pub fn serialize(&self) -> String {
unimplemented!()
Expand All @@ -187,8 +188,7 @@ impl HeavyHitter {
pub fn estimate(&self) -> impl Iterator<Item = (&[u8], u64)> {
let mut v = self.sketch.estimate_no_fn();
v.sort_by_key(|row| row.ub);
v
.into_iter()
v.into_iter()
.rev()
.take(self.k as usize)
.map(|row| (row.key, row.ub))
Expand Down
32 changes: 32 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::fmt::{Display, Formatter};

#[derive(Debug)]
pub enum DataSketchesError {
CXXError(String),
DecodeError(String),
}

impl Display for DataSketchesError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DataSketchesError::CXXError(err) => f.write_fmt(format_args!("Error: {}", err)),
DataSketchesError::DecodeError(err) => {
f.write_fmt(format_args!("DecodeError: {}", err))
}
}
}
}

impl std::error::Error for DataSketchesError {}

impl From<base64::DecodeError> for DataSketchesError {
fn from(value: base64::DecodeError) -> Self {
Self::DecodeError(format!("{}", value))
}
}

impl From<cxx::Exception> for DataSketchesError {
fn from(value: cxx::Exception) -> Self {
Self::CXXError(format!("{}", value))
}
}
Loading