Skip to content

Commit

Permalink
feat(r): Add infrastructure methods for dealing with chunked values (#75
Browse files Browse the repository at this point in the history
)

Not polished yet! I had intended to start on the actual vctr bit a few
weeks ago but didn't get far past some very boring details.
  • Loading branch information
paleolimbot authored Nov 26, 2023
1 parent c574a6a commit 7ae1089
Show file tree
Hide file tree
Showing 7 changed files with 529 additions and 16 deletions.
11 changes: 11 additions & 0 deletions r/geoarrow/NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Generated by roxygen2: do not edit by hand

S3method("[",geoarrow_vctr)
S3method(as_geoarrow_array,character)
S3method(as_geoarrow_array,default)
S3method(as_geoarrow_array,nanoarrow_array)
Expand All @@ -8,15 +9,21 @@ S3method(as_geoarrow_array,wk_wkb)
S3method(as_geoarrow_array,wk_wkt)
S3method(as_geoarrow_array,wk_xy)
S3method(as_geoarrow_array_stream,default)
S3method(as_geoarrow_array_stream,geoarrow_vctr)
S3method(as_geoarrow_array_stream,nanoarrow_array_stream)
S3method(as_nanoarrow_array,sfc)
S3method(as_nanoarrow_array_stream,geoarrow_vctr)
S3method(infer_geoarrow_schema,default)
S3method(infer_geoarrow_schema,nanoarrow_array)
S3method(infer_geoarrow_schema,nanoarrow_array_stream)
S3method(infer_nanoarrow_schema,geoarrow_vctr)
S3method(infer_nanoarrow_schema,sfc)
S3method(infer_nanoarrow_schema,wk_wkb)
S3method(infer_nanoarrow_schema,wk_wkt)
S3method(infer_nanoarrow_schema,wk_xy)
S3method(wk_crs,geoarrow_vctr)
S3method(wk_handle,geoarrow_vctr)
S3method(wk_is_geodesic,geoarrow_vctr)
export(as_geoarrow_array)
export(as_geoarrow_array_stream)
export(geoarrow_handle)
Expand All @@ -29,5 +36,9 @@ export(na_extension_large_wkt)
export(na_extension_wkb)
export(na_extension_wkt)
importFrom(nanoarrow,as_nanoarrow_array)
importFrom(nanoarrow,as_nanoarrow_array_stream)
importFrom(nanoarrow,infer_nanoarrow_schema)
importFrom(wk,wk_crs)
importFrom(wk,wk_handle)
importFrom(wk,wk_is_geodesic)
useDynLib(geoarrow, .registration = TRUE)
148 changes: 148 additions & 0 deletions r/geoarrow/R/vctr.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@

as_geoarrow_vctr <- function(x, ..., schema = NULL) {
if (inherits(x, "geoarrow_vctr") && is.null(schema)) {
return(x)
}

stream <- as_geoarrow_array_stream(x, ..., schema = schema)
chunks <- nanoarrow::collect_array_stream(stream, validate = FALSE)
new_geoarrow_vctr(chunks, stream$get_schema())
}

new_geoarrow_vctr <- function(chunks, schema, indices = NULL) {
offsets <- .Call(geoarrow_c_vctr_chunk_offsets, chunks)
if (is.null(indices)) {
indices <- seq_len(offsets[length(offsets)])
}

structure(
indices,
schema = schema,
chunks = chunks,
offsets = offsets,
class = c("geoarrow_vctr", "wk_vctr")
)
}

#' @export
`[.geoarrow_vctr` <- function(x, i) {
attrs <- attributes(x)
x <- NextMethod()
# Assert slice?
attributes(x) <- attrs
x
}

#' @export
infer_nanoarrow_schema.geoarrow_vctr <- function(x, ...) {
attr(x, "schema")
}

#' @export
as_geoarrow_array_stream.geoarrow_vctr <- function(x, ..., schema = NULL) {
as_nanoarrow_array_stream.geoarrow_vctr(x, ..., schema = NULL)
}

#' @importFrom nanoarrow as_nanoarrow_array_stream
#' @export
as_nanoarrow_array_stream.geoarrow_vctr <- function(x, ..., schema = NULL) {
slice <- vctr_as_slice(x)
if (is.null(slice)) {
stop("Can't resolve non-slice geoarrow_vctr to nanoarrow_array_stream")
}

x_schema <- attr(x, "schema")

# Zero-size slice can be an array stream with zero batches
if (slice[2] == 0) {
return(nanoarrow::basic_array_stream(list(), schema = x_schema))
}

# Full slice doesn't need slicing logic
offsets <- attr(x, "offsets")
batches <- attr(x, "chunks")
if (slice[1] == 1 && slice[2] == max(offsets)) {
return(
nanoarrow::basic_array_stream(
batches,
schema = x_schema,
validate = FALSE
)
)
}

# Calculate first and last slice information
first_index <- slice[1] - 1L
end_index <- first_index + slice[2]
last_index <- end_index - 1L
first_chunk_index <- vctr_resolve_chunk(first_index, offsets)
last_chunk_index <- vctr_resolve_chunk(last_index, offsets)

first_chunk_offset <- first_index - offsets[first_chunk_index + 1L]
first_chunk_length <- offsets[first_chunk_index + 2L] - first_index
last_chunk_offset <- 0L
last_chunk_length <- end_index - offsets[last_chunk_index + 1L]

# Calculate first and last slices
if (first_chunk_index == last_chunk_index) {
batch <- vctr_array_slice(
batches[[first_chunk_index + 1L]],
first_chunk_offset,
last_chunk_length - first_chunk_offset
)

return(
nanoarrow::basic_array_stream(
list(batch),
schema = x_schema,
validate = FALSE
)
)
}

batch1 <- vctr_array_slice(
batches[[first_chunk_index + 1L]],
first_chunk_offset,
first_chunk_length
)

batchn <- vctr_array_slice(
batches[[last_chunk_index + 1L]],
last_chunk_offset,
last_chunk_length
)

seq_mid <- seq_len(last_chunk_index - first_chunk_index - 1)
batch_mid <- batches[first_chunk_index + seq_mid]

nanoarrow::basic_array_stream(
c(
list(batch1),
batch_mid,
list(batchn)
),
schema = x_schema,
validate = FALSE
)
}


# Utilities for vctr methods

vctr_resolve_chunk <- function(x, offsets) {
.Call(geoarrow_c_vctr_chunk_resolve, x, offsets)
}

vctr_as_slice <- function(x) {
.Call(geoarrow_c_vctr_as_slice, x)
}

vctr_array_slice <- function(x, offset, length) {
new_offset <- x$offset + offset
new_length <- length
nanoarrow::nanoarrow_array_modify(
x,
list(offset = new_offset, length = new_length),
validate = FALSE
)
}
24 changes: 24 additions & 0 deletions r/geoarrow/R/wk-compat.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@

#' @importFrom wk wk_handle
#' @export
wk_handle.geoarrow_vctr <- function(handleable, handler, ...) {
geoarrow_handle(handleable, handler, size = length(handleable))
}

#' @importFrom wk wk_crs
#' @export
wk_crs.geoarrow_vctr <- function(x) {
parsed <- geoarrow_schema_parse(attr(x, "schema"))
if (parsed$crs_type == enum$CrsType$NONE) {
NULL
} else {
parsed$crs
}
}

#' @importFrom wk wk_is_geodesic
#' @export
wk_is_geodesic.geoarrow_vctr <- function(x) {
parsed <- geoarrow_schema_parse(attr(x, "schema"))
parsed$edge_type == enum$EdgeType$SPHERICAL
}

#' @export
as_geoarrow_array.wk_wkt <- function(x, ..., schema = NULL) {
if (!is.null(schema)) {
Expand Down
41 changes: 25 additions & 16 deletions r/geoarrow/src/r-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,38 @@
#include <R_ext/Rdynload.h>
#include <Rinternals.h>

SEXP geoarrow_c_handle_stream(SEXP data, SEXP handler_xptr);
SEXP geoarrow_c_make_type(SEXP geometry_type_sexp, SEXP dimensions_sexp,
SEXP coord_type_sexp);
SEXP geoarrow_c_schema_init_extension(SEXP schema_xptr, SEXP type_sexp);
SEXP geoarrow_c_writer_new(SEXP schema_xptr, SEXP array_out_xptr);
SEXP geoarrow_c_schema_parse(SEXP schema_xptr, SEXP extension_name_sexp);
SEXP geoarrow_c_kernel(SEXP kernel_name_sexp, SEXP arg_types_sexp, SEXP options_sexp,
SEXP schema_out_xptr);
SEXP geoarrow_c_kernel_push(SEXP kernel_xptr, SEXP args_sexp, SEXP array_out_xptr);
SEXP geoarrow_c_kernel_finish(SEXP kernel_xptr, SEXP array_out_xptr);
SEXP geoarrow_c_as_nanoarrow_array_sfc(SEXP sfc, SEXP schema_xptr, SEXP array_xptr);
/* generated by tools/make-callentries.R */
extern SEXP geoarrow_c_kernel(SEXP kernel_name_sexp, SEXP arg_types_sexp,
SEXP options_sexp, SEXP schema_out_xptr);
extern SEXP geoarrow_c_kernel_push(SEXP kernel_xptr, SEXP args_sexp, SEXP array_out_xptr);
extern SEXP geoarrow_c_kernel_finish(SEXP kernel_xptr, SEXP array_out_xptr);
extern SEXP geoarrow_c_as_nanoarrow_array_sfc(SEXP sfc, SEXP schema_xptr,
SEXP array_xptr);
extern SEXP geoarrow_c_make_type(SEXP geometry_type_sexp, SEXP dimensions_sexp,
SEXP coord_type_sexp);
extern SEXP geoarrow_c_schema_init_extension(SEXP schema_xptr, SEXP type_sexp);
extern SEXP geoarrow_c_schema_parse(SEXP schema_xptr, SEXP extension_name_sexp);
extern SEXP geoarrow_c_vctr_chunk_offsets(SEXP array_list);
extern SEXP geoarrow_c_vctr_chunk_resolve(SEXP indices_sexp, SEXP offsets_sexp);
extern SEXP geoarrow_c_vctr_as_slice(SEXP indices_sexp);
extern SEXP geoarrow_c_handle_stream(SEXP data, SEXP handler_xptr);
extern SEXP geoarrow_c_writer_new(SEXP schema_xptr, SEXP array_out_xptr);

static const R_CallMethodDef CallEntries[] = {
{"geoarrow_c_handle_stream", (DL_FUNC)&geoarrow_c_handle_stream, 2},
{"geoarrow_c_make_type", (DL_FUNC)&geoarrow_c_make_type, 3},
{"geoarrow_c_schema_init_extension", (DL_FUNC)&geoarrow_c_schema_init_extension, 2},
{"geoarrow_c_writer_new", (DL_FUNC)&geoarrow_c_writer_new, 2},
{"geoarrow_c_schema_parse", (DL_FUNC)&geoarrow_c_schema_parse, 2},
{"geoarrow_c_kernel", (DL_FUNC)&geoarrow_c_kernel, 4},
{"geoarrow_c_kernel_push", (DL_FUNC)&geoarrow_c_kernel_push, 3},
{"geoarrow_c_kernel_finish", (DL_FUNC)&geoarrow_c_kernel_finish, 2},
{"geoarrow_c_as_nanoarrow_array_sfc", (DL_FUNC)&geoarrow_c_as_nanoarrow_array_sfc, 3},
{"geoarrow_c_make_type", (DL_FUNC)&geoarrow_c_make_type, 3},
{"geoarrow_c_schema_init_extension", (DL_FUNC)&geoarrow_c_schema_init_extension, 2},
{"geoarrow_c_schema_parse", (DL_FUNC)&geoarrow_c_schema_parse, 2},
{"geoarrow_c_vctr_chunk_offsets", (DL_FUNC)&geoarrow_c_vctr_chunk_offsets, 1},
{"geoarrow_c_vctr_chunk_resolve", (DL_FUNC)&geoarrow_c_vctr_chunk_resolve, 2},
{"geoarrow_c_vctr_as_slice", (DL_FUNC)&geoarrow_c_vctr_as_slice, 1},
{"geoarrow_c_handle_stream", (DL_FUNC)&geoarrow_c_handle_stream, 2},
{"geoarrow_c_writer_new", (DL_FUNC)&geoarrow_c_writer_new, 2},
{NULL, NULL, 0}};
/* end generated by tools/make-callentries.R */

void R_init_geoarrow(DllInfo* dll) {
R_registerRoutines(dll, NULL, CallEntries, NULL, NULL);
Expand Down
115 changes: 115 additions & 0 deletions r/geoarrow/src/r-vctr.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@

#define R_NO_REMAP
#include <R.h>
#include <Rinternals.h>

#include "geoarrow.h"

SEXP geoarrow_c_vctr_chunk_offsets(SEXP array_list) {
int num_chunks = Rf_length(array_list);
SEXP offsets_sexp = PROTECT(Rf_allocVector(INTSXP, num_chunks + 1));
int* offsets = INTEGER(offsets_sexp);
offsets[0] = 0;
int64_t cumulative_offset = 0;

struct ArrowArray* array;
for (int i = 0; i < num_chunks; i++) {
array = (struct ArrowArray*)R_ExternalPtrAddr(VECTOR_ELT(array_list, i));
cumulative_offset += array->length;
if (cumulative_offset > INT_MAX) {
Rf_error("Can't build geoarrow_vctr with length > INT_MAX");
}

offsets[i + 1] = cumulative_offset;
}

UNPROTECT(1);
return offsets_sexp;
}

static int resolve_chunk(int* sorted_offsets, int index, int start_offset_i,
int end_offset_i) {
if (start_offset_i >= (end_offset_i - 1)) {
return start_offset_i;
}

int mid_offset_i = start_offset_i + (end_offset_i - start_offset_i) / 2;
int mid_index = sorted_offsets[mid_offset_i];
if (index < mid_index) {
return resolve_chunk(sorted_offsets, index, start_offset_i, mid_offset_i);
} else {
return resolve_chunk(sorted_offsets, index, mid_offset_i, end_offset_i);
}
}

SEXP geoarrow_c_vctr_chunk_resolve(SEXP indices_sexp, SEXP offsets_sexp) {
int* offsets = INTEGER(offsets_sexp);
int end_offset_i = Rf_length(offsets_sexp) - 1;
int last_offset = offsets[end_offset_i];

int n = Rf_length(indices_sexp);
SEXP chunk_indices_sexp = PROTECT(Rf_allocVector(INTSXP, n));
int* chunk_indices = INTEGER(chunk_indices_sexp);

int buf[1024];
for (int i = 0; i < n; i++) {
if (i % 1024 == 0) {
INTEGER_GET_REGION(indices_sexp, i, 1024, buf);
}
int index0 = buf[i % 1024];

if (index0 < 0 || index0 > last_offset) {
chunk_indices[i] = NA_INTEGER;
} else {
chunk_indices[i] = resolve_chunk(offsets, index0, 0, end_offset_i);
}
}

UNPROTECT(1);
return chunk_indices_sexp;
}

SEXP geoarrow_c_vctr_as_slice(SEXP indices_sexp) {
if (TYPEOF(indices_sexp) != INTSXP) {
return R_NilValue;
}
SEXP slice_sexp = PROTECT(Rf_allocVector(INTSXP, 2));
int* slice = INTEGER(slice_sexp);

int n = Rf_length(indices_sexp);
slice[1] = n;

if (n == 1) {
slice[0] = INTEGER_ELT(indices_sexp, 0);
UNPROTECT(1);
return slice_sexp;
} else if (n == 0) {
slice[0] = NA_INTEGER;
UNPROTECT(1);
return slice_sexp;
}

int buf[1024];
INTEGER_GET_REGION(indices_sexp, 0, 1024, buf);
slice[0] = buf[0];

int last_value = buf[0];
int this_value = 0;

for (int i = 1; i < n; i++) {
if (i % 1024 == 0) {
INTEGER_GET_REGION(indices_sexp, i, 1024, buf);
}

this_value = buf[i % 1024];
if ((this_value - last_value) != 1) {
UNPROTECT(1);
return R_NilValue;
}

last_value = this_value;
}

UNPROTECT(1);
return slice_sexp;
}
Loading

0 comments on commit 7ae1089

Please sign in to comment.