Skip to content

Commit

Permalink
fix: Make write_lp work for both multitenant and singletenant
Browse files Browse the repository at this point in the history
  • Loading branch information
Marko Mikulicic committed Oct 19, 2023
1 parent 2f0e2dc commit 4bccc0f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 28 deletions.
8 changes: 6 additions & 2 deletions influxdb_iox/src/commands/write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use futures::{stream::BoxStream, StreamExt};
use influxdb_iox_client::{connection::Connection, write};
use influxdb_iox_client::{
connection::Connection,
write::{self, DatabaseName},
};
use observability_deps::tracing::{debug, info};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
Expand Down Expand Up @@ -150,8 +153,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
.with_max_concurrent_uploads(max_concurrent_uploads)
.with_max_request_payload_size_bytes(Some(max_request_payload_size_bytes));

let database = DatabaseName::split_org_db(namespace).context(ClientSnafu)?;
let total_bytes = client
.write_lp_stream(namespace, lp_stream)
.write_lp_stream(database, lp_stream)
.await
.context(ClientSnafu)?;

Expand Down
20 changes: 10 additions & 10 deletions influxdb_iox_client/src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ pub enum Error {
Unknown(ServerError<()>),

#[error("Client specified an invalid argument: {0}")]
InvalidArgument(ServerError<FieldViolation>),
InvalidArgument(Box<ServerError<FieldViolation>>),

#[error("Deadline expired before operation could complete: {0}")]
DeadlineExceeded(ServerError<()>),

#[error("{0}")]
NotFound(ServerError<NotFound>),
NotFound(Box<ServerError<NotFound>>),

#[error("Some entity that we attempted to create already exists: {0}")]
AlreadyExists(ServerError<AlreadyExists>),
AlreadyExists(Box<ServerError<AlreadyExists>>),

#[error("The caller does not have permission to execute the specified operation: {0}")]
PermissionDenied(ServerError<()>),
Expand All @@ -86,7 +86,7 @@ pub enum Error {
ResourceExhausted(ServerError<()>),

#[error("The system is not in a state required for the operation's execution: {0}")]
FailedPrecondition(ServerError<PreconditionViolation>),
FailedPrecondition(Box<ServerError<PreconditionViolation>>),

#[error("The operation was aborted: {0}")]
Aborted(ServerError<()>),
Expand Down Expand Up @@ -122,13 +122,13 @@ impl From<tonic::Status> for Error {
Code::Ok => Self::Client("status is not an error".into()),
Code::Cancelled => Self::Cancelled(parse_status(s)),
Code::Unknown => Self::Unknown(parse_status(s)),
Code::InvalidArgument => Self::InvalidArgument(parse_status(s)),
Code::InvalidArgument => Self::InvalidArgument(Box::new(parse_status(s))),
Code::DeadlineExceeded => Self::DeadlineExceeded(parse_status(s)),
Code::NotFound => Self::NotFound(parse_status(s)),
Code::AlreadyExists => Self::AlreadyExists(parse_status(s)),
Code::NotFound => Self::NotFound(Box::new(parse_status(s))),
Code::AlreadyExists => Self::AlreadyExists(Box::new(parse_status(s))),
Code::PermissionDenied => Self::PermissionDenied(parse_status(s)),
Code::ResourceExhausted => Self::ResourceExhausted(parse_status(s)),
Code::FailedPrecondition => Self::FailedPrecondition(parse_status(s)),
Code::FailedPrecondition => Self::FailedPrecondition(Box::new(parse_status(s))),
Code::Aborted => Self::Aborted(parse_status(s)),
Code::OutOfRange => Self::OutOfRange(parse_status(s)),
Code::Unimplemented => Self::Unimplemented(parse_status(s)),
Expand Down Expand Up @@ -170,13 +170,13 @@ impl Error {
let field_name = field_name.into();
let description = description.into();

Self::InvalidArgument(ServerError {
Self::InvalidArgument(Box::new(ServerError {
message: format!("Invalid argument for '{field_name}': {description}"),
details: Some(FieldViolation {
field: field_name,
description,
}),
})
}))
}
}

Expand Down
126 changes: 110 additions & 16 deletions influxdb_iox_client/src/client/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,62 @@ use reqwest::{Body, Method};
/// The default value for the maximum size of each request, in bytes
pub const DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES: Option<usize> = Some(1024 * 1024);

/// Name of a database.
#[derive(Debug, Clone)]
pub struct DatabaseName {
/// The database name.
database: String,
/// Optionally, an Org ID. This is used only by multi-tenant instances of InfluxDB (such as InfluxDB serverless)
/// and must be None if writing to a single-tenant instance (such as InfluxDB OSS or Influxdb Clustered)
org: Option<String>,
}

impl DatabaseName {
/// Create a single tenant database name from a string.
///
/// You can also just pass a &str or a String (or anything that has a `AsRef<str>` impl) directly
/// all places that expect a DatabaseName, so generally you don't need to call this function.
pub fn from_db(database: &str) -> Self {
database.into()
}

/// Create a MultiTenant DatabaseName.
pub fn from_org_db(org: impl AsRef<str>, database: impl AsRef<str>) -> Self {
Self {
org: Some(org.as_ref().to_owned()),
database: database.as_ref().to_owned(),
}
}

/// Create a DatabaseName by splitting a single string formatted as `org_bucket`.
/// This format is useful when porting legacy code that used IOx internal org_db -> namespace encoding.
pub fn split_org_db(namespace: impl AsRef<str>) -> Result<Self, Error> {
let (org, database) = split_namespace(namespace.as_ref()).map_err(|e| {
Error::invalid_argument(
"namespace",
format!("Could not find valid org_id and bucket_id: {e}"),
)
})?;
Ok(Self::from_org_db(org, database))
}

/// Internally, we speak the v2 protocol which has an "org" parameter. Single tenant instances of InfluxDB
/// will tolerate the presence of an "org" parameter provided it's an empty string.
fn get_org_bucket(&self) -> (String, String) {
let name = self.clone();
(name.org.unwrap_or_default(), name.database)
}
}

impl<T: AsRef<str>> From<T> for DatabaseName {
fn from(value: T) -> Self {
Self {
database: value.as_ref().to_owned(),
org: None,
}
}
}

/// An IOx Write API client.
///
/// ```no_run
Expand Down Expand Up @@ -99,44 +155,40 @@ impl Client {
}

/// Write the [LineProtocol] formatted string in `lp_data` to
/// namespace `namespace`.
/// namespace `database`.
///
/// Returns the number of bytes which were written to the namespace.
///
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
pub async fn write_lp(
&mut self,
namespace: impl AsRef<str> + Send,
database: impl Into<DatabaseName> + Send,
lp_data: impl Into<String> + Send,
) -> Result<usize, Error> {
let sources = futures_util::stream::iter([lp_data.into()]);

self.write_lp_stream(namespace, sources).await
self.write_lp_stream(database, sources).await
}

/// Write the stream of [LineProtocol] formatted strings in
/// `sources` to namespace `namespace`. It is assumed that
/// individual lines (points) do not cross these strings
/// `sources` to database `database`. It is assumed that
/// individual lines (points) do not cross these strings.
///
/// Returns the number of bytes, in total, which were written to
/// the namespace.
///
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
pub async fn write_lp_stream(
&mut self,
namespace: impl AsRef<str> + Send,
database: impl Into<DatabaseName> + Send,
sources: impl Stream<Item = String> + Send,
) -> Result<usize, Error> {
let (org_id, bucket_id) = split_namespace(namespace.as_ref()).map_err(|e| {
Error::invalid_argument(
"namespace",
format!("Could not find valid org_id and bucket_id: {e}"),
)
})?;

let max_concurrent_uploads: usize = self.max_concurrent_uploads.into();
let max_request_payload_size_bytes = self.max_request_payload_size_bytes;

let database = database.into();
let (org_id, bucket_id) = database.get_org_bucket();

// make a stream and process in parallel
let results = sources
// split each input source in parallel, if possible
Expand Down Expand Up @@ -306,7 +358,7 @@ mod tests {
async fn test() {
let mock = Arc::new(MockRequestMaker::new());

let namespace = "orgname_bucketname";
let namespace = DatabaseName::from_org_db("orgname", "bucketname");
let data = "m,t=foo f=4";

let expected = vec![MockRequest {
Expand All @@ -323,11 +375,53 @@ mod tests {
assert_eq!(num_bytes, 11);
}

#[tokio::test]
async fn test_underscore() {
let mock = Arc::new(MockRequestMaker::new());

let namespace = DatabaseName::from_org_db("orgname", "bucket_name");
let data = "m,t=foo f=4";

let expected = vec![MockRequest {
org_id: "orgname".into(),
bucket_id: "bucket_name".into(),
body: data.into(),
}];

let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _)
.write_lp(namespace, data)
.await
.unwrap();
assert_eq!(expected, mock.requests());
assert_eq!(num_bytes, 11);
}

#[tokio::test]
async fn test_single_tenant() {
let mock = Arc::new(MockRequestMaker::new());

let namespace = "bucket_name";
let data = "m,t=foo f=4";

let expected = vec![MockRequest {
org_id: "".into(),
bucket_id: "bucket_name".into(),
body: data.into(),
}];

let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _)
.write_lp(namespace, data)
.await
.unwrap();
assert_eq!(expected, mock.requests());
assert_eq!(num_bytes, 11);
}

#[tokio::test]
async fn test_max_request_payload_size() {
let mock = Arc::new(MockRequestMaker::new());

let namespace = "orgname_bucketname";
let namespace = DatabaseName::from_org_db("orgname", "bucketname");
let data = "m,t=foo f=4\n\
m,t=bar f=3\n\
m,t=fooddddddd f=4";
Expand Down Expand Up @@ -360,7 +454,7 @@ mod tests {
async fn test_write_lp_stream() {
let mock = Arc::new(MockRequestMaker::new());

let namespace = "orgname_bucketname";
let namespace = DatabaseName::from_org_db("orgname", "bucketname");
let data = futures_util::stream::iter(
vec!["m,t=foo f=4", "m,t=bar f=3"]
.into_iter()
Expand Down

0 comments on commit 4bccc0f

Please sign in to comment.