diff --git a/influxdb_iox/src/commands/write.rs b/influxdb_iox/src/commands/write.rs index 8c3fb352e68..1f0f678bcb3 100644 --- a/influxdb_iox/src/commands/write.rs +++ b/influxdb_iox/src/commands/write.rs @@ -1,5 +1,8 @@ use futures::{stream::BoxStream, StreamExt}; -use influxdb_iox_client::{connection::Connection, write}; +use influxdb_iox_client::{ + connection::Connection, + write::{self, DatabaseName}, +}; use observability_deps::tracing::{debug, info}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{ @@ -150,8 +153,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { .with_max_concurrent_uploads(max_concurrent_uploads) .with_max_request_payload_size_bytes(Some(max_request_payload_size_bytes)); + let database = DatabaseName::split_org_db(namespace).context(ClientSnafu)?; let total_bytes = client - .write_lp_stream(namespace, lp_stream) + .write_lp_stream(database, lp_stream) .await .context(ClientSnafu)?; diff --git a/influxdb_iox_client/src/client/error.rs b/influxdb_iox_client/src/client/error.rs index ffb4577ba9c..fff676e20e5 100644 --- a/influxdb_iox_client/src/client/error.rs +++ b/influxdb_iox_client/src/client/error.rs @@ -68,16 +68,16 @@ pub enum Error { Unknown(ServerError<()>), #[error("Client specified an invalid argument: {0}")] - InvalidArgument(ServerError), + InvalidArgument(Box>), #[error("Deadline expired before operation could complete: {0}")] DeadlineExceeded(ServerError<()>), #[error("{0}")] - NotFound(ServerError), + NotFound(Box>), #[error("Some entity that we attempted to create already exists: {0}")] - AlreadyExists(ServerError), + AlreadyExists(Box>), #[error("The caller does not have permission to execute the specified operation: {0}")] PermissionDenied(ServerError<()>), @@ -86,7 +86,7 @@ pub enum Error { ResourceExhausted(ServerError<()>), #[error("The system is not in a state required for the operation's execution: {0}")] - FailedPrecondition(ServerError), + FailedPrecondition(Box>), #[error("The operation was aborted: {0}")] Aborted(ServerError<()>), @@ -122,13 +122,13 @@ impl From for Error { Code::Ok => Self::Client("status is not an error".into()), Code::Cancelled => Self::Cancelled(parse_status(s)), Code::Unknown => Self::Unknown(parse_status(s)), - Code::InvalidArgument => Self::InvalidArgument(parse_status(s)), + Code::InvalidArgument => Self::InvalidArgument(Box::new(parse_status(s))), Code::DeadlineExceeded => Self::DeadlineExceeded(parse_status(s)), - Code::NotFound => Self::NotFound(parse_status(s)), - Code::AlreadyExists => Self::AlreadyExists(parse_status(s)), + Code::NotFound => Self::NotFound(Box::new(parse_status(s))), + Code::AlreadyExists => Self::AlreadyExists(Box::new(parse_status(s))), Code::PermissionDenied => Self::PermissionDenied(parse_status(s)), Code::ResourceExhausted => Self::ResourceExhausted(parse_status(s)), - Code::FailedPrecondition => Self::FailedPrecondition(parse_status(s)), + Code::FailedPrecondition => Self::FailedPrecondition(Box::new(parse_status(s))), Code::Aborted => Self::Aborted(parse_status(s)), Code::OutOfRange => Self::OutOfRange(parse_status(s)), Code::Unimplemented => Self::Unimplemented(parse_status(s)), @@ -170,13 +170,13 @@ impl Error { let field_name = field_name.into(); let description = description.into(); - Self::InvalidArgument(ServerError { + Self::InvalidArgument(Box::new(ServerError { message: format!("Invalid argument for '{field_name}': {description}"), details: Some(FieldViolation { field: field_name, description, }), - }) + })) } } diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs index 36a67d9c0cf..b9e471c9a37 100644 --- a/influxdb_iox_client/src/client/write.rs +++ b/influxdb_iox_client/src/client/write.rs @@ -12,6 +12,62 @@ use reqwest::{Body, Method}; /// The default value for the maximum size of each request, in bytes pub const DEFAULT_MAX_REQUEST_PAYLOAD_SIZE_BYTES: Option = Some(1024 * 1024); +/// Name of a database. +#[derive(Debug, Clone)] +pub struct DatabaseName { + /// The database name. + database: String, + /// Optionally, an Org ID. This is used only by multi-tenant instances of InfluxDB (such as InfluxDB serverless) + /// and must be None if writing to a single-tenant instance (such as InfluxDB OSS or Influxdb Clustered) + org: Option, +} + +impl DatabaseName { + /// Create a single tenant database name from a string. + /// + /// You can also just pass a &str or a String (or anything that has a `AsRef` impl) directly + /// all places that expect a DatabaseName, so generally you don't need to call this function. + pub fn from_db(database: &str) -> Self { + database.into() + } + + /// Create a MultiTenant DatabaseName. + pub fn from_org_db(org: impl AsRef, database: impl AsRef) -> Self { + Self { + org: Some(org.as_ref().to_owned()), + database: database.as_ref().to_owned(), + } + } + + /// Create a DatabaseName by splitting a single string formatted as `org_bucket`. + /// This format is useful when porting legacy code that used IOx internal org_db -> namespace encoding. + pub fn split_org_db(namespace: impl AsRef) -> Result { + let (org, database) = split_namespace(namespace.as_ref()).map_err(|e| { + Error::invalid_argument( + "namespace", + format!("Could not find valid org_id and bucket_id: {e}"), + ) + })?; + Ok(Self::from_org_db(org, database)) + } + + /// Internally, we speak the v2 protocol which has an "org" parameter. Single tenant instances of InfluxDB + /// will tolerate the presence of an "org" parameter provided it's an empty string. + fn get_org_bucket(&self) -> (String, String) { + let name = self.clone(); + (name.org.unwrap_or_default(), name.database) + } +} + +impl> From for DatabaseName { + fn from(value: T) -> Self { + Self { + database: value.as_ref().to_owned(), + org: None, + } + } +} + /// An IOx Write API client. /// /// ```no_run @@ -99,24 +155,24 @@ impl Client { } /// Write the [LineProtocol] formatted string in `lp_data` to - /// namespace `namespace`. + /// namespace `database`. /// /// Returns the number of bytes which were written to the namespace. /// /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format pub async fn write_lp( &mut self, - namespace: impl AsRef + Send, + database: impl Into + Send, lp_data: impl Into + Send, ) -> Result { let sources = futures_util::stream::iter([lp_data.into()]); - self.write_lp_stream(namespace, sources).await + self.write_lp_stream(database, sources).await } /// Write the stream of [LineProtocol] formatted strings in - /// `sources` to namespace `namespace`. It is assumed that - /// individual lines (points) do not cross these strings + /// `sources` to database `database`. It is assumed that + /// individual lines (points) do not cross these strings. /// /// Returns the number of bytes, in total, which were written to /// the namespace. @@ -124,19 +180,15 @@ impl Client { /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format pub async fn write_lp_stream( &mut self, - namespace: impl AsRef + Send, + database: impl Into + Send, sources: impl Stream + Send, ) -> Result { - let (org_id, bucket_id) = split_namespace(namespace.as_ref()).map_err(|e| { - Error::invalid_argument( - "namespace", - format!("Could not find valid org_id and bucket_id: {e}"), - ) - })?; - let max_concurrent_uploads: usize = self.max_concurrent_uploads.into(); let max_request_payload_size_bytes = self.max_request_payload_size_bytes; + let database = database.into(); + let (org_id, bucket_id) = database.get_org_bucket(); + // make a stream and process in parallel let results = sources // split each input source in parallel, if possible @@ -306,7 +358,7 @@ mod tests { async fn test() { let mock = Arc::new(MockRequestMaker::new()); - let namespace = "orgname_bucketname"; + let namespace = DatabaseName::from_org_db("orgname", "bucketname"); let data = "m,t=foo f=4"; let expected = vec![MockRequest { @@ -323,11 +375,53 @@ mod tests { assert_eq!(num_bytes, 11); } + #[tokio::test] + async fn test_underscore() { + let mock = Arc::new(MockRequestMaker::new()); + + let namespace = DatabaseName::from_org_db("orgname", "bucket_name"); + let data = "m,t=foo f=4"; + + let expected = vec![MockRequest { + org_id: "orgname".into(), + bucket_id: "bucket_name".into(), + body: data.into(), + }]; + + let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _) + .write_lp(namespace, data) + .await + .unwrap(); + assert_eq!(expected, mock.requests()); + assert_eq!(num_bytes, 11); + } + + #[tokio::test] + async fn test_single_tenant() { + let mock = Arc::new(MockRequestMaker::new()); + + let namespace = "bucket_name"; + let data = "m,t=foo f=4"; + + let expected = vec![MockRequest { + org_id: "".into(), + bucket_id: "bucket_name".into(), + body: data.into(), + }]; + + let num_bytes = Client::new_with_maker(Arc::clone(&mock) as _) + .write_lp(namespace, data) + .await + .unwrap(); + assert_eq!(expected, mock.requests()); + assert_eq!(num_bytes, 11); + } + #[tokio::test] async fn test_max_request_payload_size() { let mock = Arc::new(MockRequestMaker::new()); - let namespace = "orgname_bucketname"; + let namespace = DatabaseName::from_org_db("orgname", "bucketname"); let data = "m,t=foo f=4\n\ m,t=bar f=3\n\ m,t=fooddddddd f=4"; @@ -360,7 +454,7 @@ mod tests { async fn test_write_lp_stream() { let mock = Arc::new(MockRequestMaker::new()); - let namespace = "orgname_bucketname"; + let namespace = DatabaseName::from_org_db("orgname", "bucketname"); let data = futures_util::stream::iter( vec!["m,t=foo f=4", "m,t=bar f=3"] .into_iter()