test: Support for Multi-Level Partition Tables #115

Closed
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions Cargo.toml
@@ -60,6 +60,8 @@ testcontainers = "0.16.7"
testcontainers-modules = { version = "0.4.3", features = ["localstack"] }
time = { version = "0.3.36", features = ["serde"] }
geojson = "0.24.1"
rand = { version = "0.8.5" }
approx = "0.5.1"

[[bin]]
name = "pgrx_embed_pg_analytics"
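The two new dependencies sit alongside the other test-only crates (`testcontainers`, `geojson`), so they are presumably used by the test suite for randomized inputs and approximate floating-point assertions. A minimal sketch of that combination (illustrative only; the actual tests in this PR may use them differently):

```rust
use approx::assert_relative_eq;
use rand::Rng;

// Illustrative: a randomized input checked with an approximate
// float comparison instead of exact equality.
let mut rng = rand::thread_rng();
let x: f64 = rng.gen_range(0.0..100.0);
let roundtripped = (x * 3.0) / 3.0;
assert_relative_eq!(x, roundtripped, epsilon = 1e-9);
```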
63 changes: 61 additions & 2 deletions tests/fixtures/mod.rs
@@ -19,16 +19,18 @@ pub mod arrow;
pub mod db;
pub mod tables;

use anyhow::Result;
use anyhow::{Context, Result};
use async_std::task::block_on;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::primitives::ByteStream;
use bytes::Bytes;
use chrono::{DateTime, Duration};
use datafusion::arrow::array::*;
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit::Millisecond;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
arrow::{datatypes::FieldRef, record_batch::RecordBatch},
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
parquet::arrow::ArrowWriter,
};
use futures::future::{BoxFuture, FutureExt};
@@ -124,6 +126,17 @@ impl S3 {
Ok(())
}

/// Uploads a `RecordBatch` to the specified S3 bucket and key.
///
/// # Arguments
///
/// * `bucket` - The name of the S3 bucket.
/// * `key` - The key under which to store the batch.
/// * `batch` - The `RecordBatch` to upload.
///
/// # Errors
///
/// Returns an error if the object cannot be written to S3.
#[allow(unused)]
pub async fn put_batch(&self, bucket: &str, key: &str, batch: &RecordBatch) -> Result<()> {
let mut buf = vec![];
@@ -141,6 +154,52 @@ impl S3 {
Ok(())
}
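The middle of `put_batch` is collapsed in this diff. For reference, a typical in-memory Parquet serialization with `ArrowWriter` followed by an S3 upload (both imports appear at the top of the file) looks roughly like this; it is a sketch of the usual pattern, not necessarily the exact elided code:

```rust
// Serialize the RecordBatch into an in-memory Parquet buffer.
let mut buf = vec![];
let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?;
writer.write(batch)?;
writer.close()?;

// Upload the serialized buffer to S3.
self.client
    .put_object()
    .bucket(bucket)
    .key(key)
    .body(ByteStream::from(buf))
    .send()
    .await?;
```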

/// Fetches a `RecordBatch` from the specified S3 bucket and key.
///
/// # Arguments
///
/// * `bucket` - The name of the S3 bucket.
/// * `key` - The key where the batch is stored.
///
/// # Returns
///
/// Returns the `RecordBatch` if the object is successfully fetched and parsed.
///
/// # Errors
///
/// Returns an error if the object cannot be retrieved from S3 or if it cannot be parsed into a `RecordBatch`.
#[allow(unused)]
pub async fn get_batch(&self, bucket: &str, key: &str) -> Result<RecordBatch> {
// Retrieve the object from S3
let get_object_output = self
.client
.get_object()
.bucket(bucket)
.key(key)
.send()
.await
.context("Failed to get object from S3")?;

// Read the body of the object
let body = get_object_output.body.collect().await?;
let bytes: Bytes = body.into_bytes();

// Create a Parquet reader
let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
.context("Failed to create Parquet reader builder")?;

// Create the reader
let mut reader = builder.build().context("Failed to build Parquet reader")?;

// Read the first batch
let record_batch = reader
.next()
.context("No batches found in Parquet file")?
.context("Failed to read batch")?;

Ok(record_batch)
}
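Together, `put_batch` and `get_batch` give the fixtures a full round trip through S3, which is what the partition-table tests need to verify that written data comes back intact. A hypothetical usage in a test (the `s3` fixture handle, bucket, and key names are illustrative, not part of this diff):

```rust
// Illustrative round trip; assumes an `s3` fixture and an existing bucket.
s3.put_batch("test-bucket", "data/batch.parquet", &batch).await?;
let fetched = s3.get_batch("test-bucket", "data/batch.parquet").await?;

// The fetched batch should match what was uploaded.
assert_eq!(batch.num_rows(), fetched.num_rows());
assert_eq!(batch.schema(), fetched.schema());
```

Note that `get_batch` returns only the first batch in the Parquet file, which is sufficient for fixtures that write a single batch per object.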

#[allow(unused)]
pub async fn put_rows<T: Serialize>(&self, bucket: &str, key: &str, rows: &[T]) -> Result<()> {
let fields = Vec::<FieldRef>::from_type::<NycTripsTable>(TracingOptions::default())?;