Skip to content

Commit

Permalink
move examples from aws to http
Browse files Browse the repository at this point in the history
  • Loading branch information
CarlKCarlK committed Jan 20, 2024
1 parent 543c26d commit 204c162
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 28 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ From the cloud: open a file and read data for one SNP (variant)
at index position 2.

```python
>>> from bed_reader import sample_url
>>> url = sample_url("small.bed")
>>> print(f"{url}") # Example output: "file:///.../small.bed"
file:///.../small.bed
>>> with open_bed(url, cloud_options={}) as bed:
>>> with open_bed("https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/small.bed") as bed:
... val = bed.read(index=np.s_[:,2], dtype="float64")
... print(val)
[[nan]
Expand Down
30 changes: 21 additions & 9 deletions bed_reader/tests/test_open_bed_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,14 +1038,9 @@ def test_url_errors(shared_datadir):


def test_readme_example():
from bed_reader import sample_url

url = sample_url("small.bed")
print(
f"{url}"
) # For example, "file:///C:/Users/carlk/AppData/Local/bed_reader/bed_reader/Cache/small.bed"
# file:///.../small.bed
with open_bed(url, cloud_options={}) as bed:
with open_bed(
"https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/small.bed",
) as bed:
val = bed.read(index=np.s_[:, 2], dtype="float64")
print(val)
# [[nan]
Expand All @@ -1055,7 +1050,7 @@ def test_readme_example():

def test_http_one():
with open_bed(
"https://raw.githubusercontent.com/fastlmm/bed-reader/rustybed/bed_reader/tests/data/some_missing.bed",
"https://raw.githubusercontent.com/fastlmm/bed-sample-files/main/some_missing.bed",
cloud_options={},
) as bed:
print(bed.iid[:5])
Expand Down Expand Up @@ -1085,6 +1080,23 @@ def test_http_two():
assert val.shape == (10, 10) or val.shape == (10, 11)


def test_http_two_slow():
    """Open a remote .bed file over HTTPS and read a small, strided sample.

    The file is opened with a 100-second timeout and with the format check
    skipped. Basic metadata is printed, then the first 10 individuals are read
    at every ``sid_count // 10``-th SNP and the resulting shape is checked.
    """
    from bed_reader import open_bed

    bed_url = "https://www.ebi.ac.uk/biostudies/files/S-BSST936/genotypes/synthetic_v1_chr-10.bed"
    timeout_options = {"timeout": "100s"}

    with open_bed(bed_url, cloud_options=timeout_options, skip_format_check=True) as bed:
        print(f"iid_count={bed.iid_count:_}, sid_count={bed.sid_count:_}")
        print(f"iid={bed.iid[:5]}...")
        print(f"sid={bed.sid[:5]}...")
        print(f"unique chromosomes = {np.unique(bed.chromosome)}")

        # A stride of sid_count // 10 yields 10 columns, or 11 when sid_count
        # is not an exact multiple of the stride.
        stride = bed.sid_count // 10
        sample = bed.read(index=np.s_[:10, ::stride])
        print(f"val={sample}")
        assert sample.shape in ((10, 10), (10, 11))


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)

Expand Down
85 changes: 74 additions & 11 deletions src/bed_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use itertools::Itertools;
use nd::ShapeBuilder;
use ndarray as nd;
use object_store::delimited::newline_delimited_stream;
use object_store::http::HttpBuilder;
use object_store::local::LocalFileSystem;
use object_store::path::Path as StorePath;
use object_store::ObjectStore;
use object_store::{GetOptions, GetRange, GetResult};
use object_store::{ObjectMeta, ObjectStore};
use std::cmp::max;
use std::collections::HashSet;
use std::path::PathBuf;
Expand Down Expand Up @@ -367,10 +368,10 @@ async fn open_and_check<TObjectStore>(
where
TObjectStore: ObjectStore,
{
let object_store = object_path.object_store.clone();
let path: &StorePath = &object_path.path;
let object_meta: ObjectMeta = object_store.head(path).await?;
let size: usize = object_meta.size;
// let object_store = object_path.object_store.clone();
// let path: &StorePath = &object_path.path;
// let object_meta: ObjectMeta = object_store.head(path).await?;
// let size: usize = object_meta.size;

let get_options = GetOptions {
range: Some(GetRange::Bounded(0..CB_HEADER_U64 as usize)),
Expand All @@ -379,6 +380,7 @@ where
let object_store = object_path.object_store.clone();
let path: &StorePath = &object_path.path;
let get_result = object_store.get_opts(path, get_options).await?;
let size: usize = get_result.meta.size;
let bytes = get_result.bytes().await?;

if (BED_FILE_MAGIC1 != bytes[0]) || (BED_FILE_MAGIC2 != bytes[1]) {
Expand Down Expand Up @@ -2380,9 +2382,10 @@ pub fn sample_urls(path_list: AnyIter<AnyPath>) -> Result<Vec<String>, Box<BedEr
#[derive(Debug)]
/// The location of a file in the cloud.
///
/// The location is made up of two parts, an `Arc`-wrapped [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) and an [`object_store::path::Path as StorePath`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html).
/// The [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) is a cloud service, for example, AWS S3, Azure, the local file system, etc.
/// The `StorePath` is the path to the file on the cloud service.
/// The location is made up of two parts, an `Arc`-wrapped [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html)
/// and an [`object_store::path::Path as StorePath`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html).
/// The [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) is a cloud service, for example, Http, AWS S3, Azure,
/// the local file system, etc. The `StorePath` is the path to the file on the cloud service.
///
/// See ["Cloud URLs and `ObjectPath` Examples"](supplemental_document_cloud_urls/index.html) for details specifying a file.
///
Expand All @@ -2408,9 +2411,11 @@ pub struct ObjectPath<TObjectStore>
where
TObjectStore: ObjectStore,
{
/// An `Arc`-wrapped [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) cloud service, for example, AWS S3, Azure, the local file system, etc.
/// An `Arc`-wrapped [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html) cloud service, for example, Http, AWS S3,
/// Azure, the local file system, etc.
pub object_store: Arc<TObjectStore>,
/// A [`object_store::path::Path as StorePath`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html) that points to a file on the [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html)
/// A [`object_store::path::Path as StorePath`](https://docs.rs/object_store/latest/object_store/path/struct.Path.html) that points to a file on
/// the [`ObjectStore`](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html)
/// that gives the path to the file on the cloud service.
pub path: StorePath,
}
Expand Down Expand Up @@ -2464,12 +2469,70 @@ impl ObjectPath<Box<dyn ObjectStore>> {
.map_err(|e| BedError::CannotParseUrl(location.to_string(), e.to_string()))?;

let (object_store, store_path): (Box<dyn ObjectStore>, StorePath) =
object_store::parse_url_opts(&url, options)?;
parse_url_opts_work_around(&url, options)?;
let object_path = ObjectPath::new(Arc::new(object_store), store_path);
Ok(object_path)
}
}

/// Classify `url` and extract the store path from it.
///
/// Returns `(is_http, path)`:
///
/// * `is_http` is `true` for any `http` URL with a host, and for any `https` URL whose
///   host does not end in one of the provider suffixes checked below
///   (`*.core.windows.net`, `*.fabric.microsoft.com`, `amazonaws.com`,
///   `r2.cloudflarestorage.com`). It is `false` for everything else.
/// * `path` is the URL path converted with `StorePath::from_url_path`. For
///   `s3*.…amazonaws.com` and `r2.cloudflarestorage.com` hosts the first path
///   segment is dropped first (empty path if there is no second segment).
///
/// # Errors
///
/// Returns the error from `StorePath::from_url_path` if the path cannot be converted.
fn parse_work_around(url: &Url) -> Result<(bool, StorePath), object_store::Error> {
    // Host suffixes for which the full URL path is kept and the URL is not treated as plain HTTP.
    const FULL_PATH_HOST_SUFFIXES: [&str; 4] = [
        "dfs.core.windows.net",
        "blob.core.windows.net",
        "dfs.fabric.microsoft.com",
        "blob.fabric.microsoft.com",
    ];

    // The URL path with its leading `/<first-segment>/` removed, or "" when that is not possible.
    let path_without_bucket = || {
        url.path()
            .strip_prefix('/')
            .and_then(|rest| rest.split_once('/'))
            .map_or("", |(_bucket, key)| key)
    };

    let full_path = url.path();
    let (is_http, raw_path) = match (url.scheme(), url.host_str()) {
        ("http", Some(_)) => (true, full_path),
        ("https", Some(host)) => {
            if FULL_PATH_HOST_SUFFIXES
                .iter()
                .any(|suffix| host.ends_with(suffix))
            {
                (false, full_path)
            } else if host.ends_with("amazonaws.com") {
                if host.starts_with("s3") {
                    (false, path_without_bucket())
                } else {
                    (false, full_path)
                }
            } else if host.ends_with("r2.cloudflarestorage.com") {
                (false, path_without_bucket())
            } else {
                (true, full_path)
            }
        }
        _ => (false, full_path),
    };

    let store_path = StorePath::from_url_path(raw_path)?;
    Ok((is_http, store_path))
}

// LATER when https://github.com/apache/arrow-rs/issues/5310 gets fixed, can remove work around
pub fn parse_url_opts_work_around<I, K, V>(
url: &Url,
options: I,
) -> Result<(Box<dyn ObjectStore>, StorePath), object_store::Error>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let (is_http, path) = parse_work_around(url)?;
if is_http {
let url = &url[..url::Position::BeforePath];
let path = StorePath::parse(path)?;
let builder = options.into_iter().fold(
<HttpBuilder>::new().with_url(url),
|builder, (key, value)| match key.as_ref().parse() {
Ok(k) => builder.with_config(k, value),
Err(_) => builder,
},
);
let store = Box::new(builder.build()?) as _;
Ok((store, path))
} else {
object_store::parse_url_opts(url, options)
}
}

impl<TObjectStore> ObjectPath<TObjectStore>
where
TObjectStore: ObjectStore,
Expand Down
1 change: 1 addition & 0 deletions src/supplemental_documents/cloud_urls_etc.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ To specify a file in the cloud, you must specify either
* a URL string plus options, or
* an [`ObjectPath`](../struct.ObjectPath.html)

<!-- cmk: and http -->
Let's look at how to do this [for a local file](#local-file) and [for AWS S3](#aws-s3).

## Local File
Expand Down
1 change: 1 addition & 0 deletions src/supplemental_documents/options_etc.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ When specifying a file in the cloud via a URL, we use methods [`BedCloud::new(url

The cloud providers forbid putting some needed information in the URL. Instead, that information must
go into `options`.
<!-- cmk: and http -->
For example, AWS S3 requires that information about `"aws_region"`, `"aws_access_key_id"`, and `"aws_secret_access_key"` be placed in the options.

Here is an AWS example:
Expand Down
6 changes: 3 additions & 3 deletions tests/tests_api_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2429,7 +2429,6 @@ async fn http_one() -> Result<(), Box<BedErrorPlus>> {
async fn http_two() -> Result<(), Box<BedErrorPlus>> {
let local_fam_file = sample_file("synthetic_v1_chr-10.fam")?;
let local_bim_file = sample_file("synthetic_v1_chr-10.bim")?;
// cmk make this a const
let empty_skip_set = HashSet::<MetadataFields>::new();
let metadata = Metadata::new()
.read_fam(local_fam_file, &empty_skip_set)?
Expand All @@ -2440,7 +2439,7 @@ async fn http_two() -> Result<(), Box<BedErrorPlus>> {
// Open the bed file with a URL and any needed cloud options, then use as before.
let mut bed_cloud = BedCloud::builder(
"https://www.ebi.ac.uk/biostudies/files/S-BSST936/genotypes/synthetic_v1_chr-10.bed",
[("timeout", "100")], // cmk must figure this out
[("timeout", "100s")],
)?
.metadata(&metadata)
.skip_early_check()
Expand Down Expand Up @@ -2487,6 +2486,7 @@ async fn http_object_path() -> Result<(), Box<BedErrorPlus>> {
.skip_early_check()
.build()
.await?;
println!("{:?}", bed_cloud);
println!("{:?}", bed_cloud.iid().await?.slice(s![..5]));
println!("{:?}", bed_cloud.sid().await?.slice(s![..5]));
Ok(())
Expand All @@ -2498,7 +2498,7 @@ async fn http_long_url() -> Result<(), Box<BedErrorPlus>> {
// Open the bed file with a URL and any needed cloud options, then use as before.
let mut bed_cloud = BedCloud::builder(
"https://www.ebi.ac.uk/biostudies/files/S-BSST936/example/synthetic_small_v1_chr-10.bed",
[("timeout", "1000")], // cmk must figure this out
[("timeout", "1000s")],
)?
.skip_early_check()
.build()
Expand Down

0 comments on commit 204c162

Please sign in to comment.