Skip to content

Commit

Permalink
Connection Management and Arrow (#165)
Browse files Browse the repository at this point in the history
* Add GitHub links to DESCRIPTION

* Editing ojo_tbl

* WIP

* Edit ojo_tbl print method

* Replace stringr functions with stringi equivalents

* Remove caching logic

* Fix argument to withr call

* Replace tests

* Fix connection management and update tests to validate it accurately

* Adds support and test for arrow

* Build docs

* Fix roxygen md docs; add lifecycle experimental badge to .source argument on ojo_tbl

* Remove arrow test because it is unstable; remove ojo_tbl print method

* Accepted new test snapshots

* Update progress bar to be current and run in caller env

* Add internal keyword to documentation for tbl_from functions

* Re-ran tests with GitHub Actions credentials so they will pass

---------

Co-authored-by: andrewjbe <56839927+andrewjbe@users.noreply.github.com>
  • Loading branch information
brancengregory and andrewjbe authored May 21, 2024
1 parent 930a392 commit 8775b8b
Show file tree
Hide file tree
Showing 31 changed files with 423 additions and 4,230 deletions.
9 changes: 7 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ Authors@R: c(
))
Description: {ojodb} provides convenient functions to query court data from the Open Justice Oklahoma database.
Imports:
arrow,
cli,
dbplyr,
DBI,
ggplot2,
glue,
lifecycle,
lubridate,
pool,
purrr,
rlang,
RPostgres,
stringi,
stringr,
tibble,
tidyr,
Expand All @@ -53,7 +56,6 @@ Suggests:
janitor,
knitr,
languageserver,
lifecycle,
lintr,
readr,
roxygen2,
Expand All @@ -69,7 +71,10 @@ Depends:
License: GPL (>= 3)
Encoding: UTF-8
LazyData: true
RoxygenNote: 7.2.3
RoxygenNote: 7.3.1
VignetteBuilder: knitr
Config/testthat/edition: 3
Config/testthat/parallel: true
URL: https://github.com/openjusticeok/ojodb
BugReports: https://github.com/openjusticeok/ojodb/issues
Roxygen: list(markdown = TRUE)
8 changes: 4 additions & 4 deletions R/ojo_auth.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ojo_auth <- function(host, port, username, password, ..., .admin = F, .overwrite
clientkey <- fs::path(home, ".postgresql/ojodb/client-key.pem")

# Check if SSL certs are in correct location; if not, throw error
if(!fs::file_exists(rootcert) |
if (!fs::file_exists(rootcert) |
!fs::file_exists(clientcert) |
!fs::file_exists(clientkey)) {
rlang::abort(
Expand Down Expand Up @@ -71,14 +71,14 @@ ojo_auth <- function(host, port, username, password, ..., .admin = F, .overwrite
# Creating the new .Renviron file (not filled out yet, all OJO variables removed)
newenv <- oldenv |>
dplyr::as_tibble() |>
dplyr::filter(!stringr::str_detect(.data$V1, "(OJO_HOST)|(OJO_PORT)|(OJO_DRIVER)|(OJO_SSL)"))
dplyr::filter(!stringi::stri_detect_regex(.data$V1, "(OJO_HOST)|(OJO_PORT)|(OJO_DRIVER)|(OJO_SSL)"))
if (.admin == T) {
newenv <- newenv |>
dplyr::filter(!stringr::str_detect(.data$V1, "(OJO_ADMIN_USER)|(OJO_ADMIN_PASS)")) |>
dplyr::filter(!stringi::stri_detect_regex(.data$V1, "(OJO_ADMIN_USER)|(OJO_ADMIN_PASS)")) |>
as.data.frame()
} else {
newenv <- newenv |>
dplyr::filter(!stringr::str_detect(.data$V1, "(OJO_DEFAULT_USER)|(OJO_DEFAULT_PASS)")) |>
dplyr::filter(!stringi::stri_detect_regex(.data$V1, "(OJO_DEFAULT_USER)|(OJO_DEFAULT_PASS)")) |>
as.data.frame()
}

Expand Down
104 changes: 42 additions & 62 deletions R/ojo_collect.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,22 @@
ojo_collect <- function(.data, ..., .silent = !rlang::is_interactive()) {

# Check class of `.data`
if (!inherits(.data, "tbl_lazy")) {
stop("`.data` must be a lazy tibble. Have you already collected the data?")
if (!inherits(.data, c("tbl_lazy", "tbl_Pool", "tbl_dbi"))) {
rlang::abort("`.data` must be a lazy tibble created with `ojo_connect()`, `pool::dbPool()`, or `DBI::dbConnect()`.")
}

# First check that `.data` is a lazy tibble created with `ojo_connect()`, `pool::dbPool()`, or `DBI::dbConnect()`
if (!inherits(.data, "tbl_Pool") && !inherits(.data, "tbl_dbi")) {
stop("`.data` must be a lazy tibble created with `ojo_connect`, `pool::dbPool`, `DBI::dbConnect`. Have you already collected the data?")
}

# Extract the connection from the lazy tibble
# Extract the database connection
.con <- dbplyr::remote_con(.data)

# If the user is using `pool`, we need to check out a connection from the pool locally
# `pool::localCheckout()` will return the connection to the pool when the function exits
if (inherits(.data, "tbl_Pool")) {
.con <- .con |> pool::localCheckout()
.con <- pool::localCheckout(.con)
}

# Check connection validity
# Ensure the connection is valid
if (!DBI::dbIsValid(.con)) {
stop("The connection to the OJO database is no longer valid. Please reconnect to the database using `ojo_connect()`.")
rlang::abort("The connection to the OJO database is no longer valid. Please reconnect to the database using `ojo_connect()`.")
}

# First UI chunk
# CLI output
if (!.silent) {
con_desc <- dbplyr::db_connection_describe(.con) |>
gsub(pattern = "postgres", replacement = "") |>
Expand All @@ -69,75 +61,63 @@ ojo_collect <- function(.data, ..., .silent = !rlang::is_interactive()) {
msg_failed = "Something went wrong sending your query to the database! Please check your connection.")
}

# Get n rows in request results
t_0 <- Sys.time() # Timer start
# Estimate number of results and warn about potential issues
n_results <- estimate_results(.data, .silent)

# Collect data
res <- collect_data(.con, .data, n_results, .silent)

return(res)
}


estimate_results <- function(.data, .silent) {
if ("n" %in% names(.data)) {
rlang::warn(
"The tbl you are requesting has a variable named `n`. This can cause issues with progress bar rendering and status updates. This message won't be shown again this session.",
.frequency = "once",
.frequency_id = "ojo_collect_n_warning"
)
rlang::warn("The tbl you are requesting has a variable named `n`. This might cause issues with progress bar rendering.",
.frequency = "once", .frequency_id = "ojo_collect_n_warning")
}

n_results <- .data |>
dplyr::ungroup() |>
dplyr::tally() |>
dplyr::pull(var = .data$n)

t_1 <- Sys.time() # Timer end
dplyr::pull(n = n)

# Second UI chunk
if (!.silent) {
cli::cli_progress_step(msg = paste0("Found ", format(n_results, big.mark = ","), " matching results!"),
msg_failed = "Something went wrong downloading your data from the database!")

# Warning if it took more than 20 seconds
if (difftime(t_1, t_0, units = "secs") > 20) {
cli::cli_alert_warning("If the previous step took too long for your query, you can skip these progress updates by setting `.silent = TRUE`")
}
cli::cli_progress_step(msg = paste0("Found ", format(n_results, big.mark = ","), " matching results!"))
}

# Translate query back to SQL
query <- dbplyr::sql_render(.data)
return(n_results)
}

# Make initial db request
collect_data <- function(.con, .data, n_results, .silent) {
query <- dbplyr::sql_render(.data)
req <- DBI::dbSendQuery(.con, query)
withr::defer(DBI::dbClearResult(req))
withr::defer(DBI::dbClearResult(req), envir = parent.frame())

# Downloading from request
res <- NULL
chunk_size <- max(round(n_results / 100, 0), 1000) # Download in chunks of 1% or at least 1000

chunk_size <- round(n_results / 100, 0) # This way we're downloading in chunks of 1% of the total or 1000, whichever is bigger;
if (chunk_size < 1000) { chunk_size <- 1000 } # might help with large queries

cli::cli_progress_bar(
name = "dl_pb",
format = "\u001b[34m{cli::symbol$info}\u001b[0m Downloading data... {cli::pb_bar} {cli::pb_percent} | ~{cli::pb_eta} remaining...",
format_done = "\u001b[32m{cli::symbol$tick}\u001b[0m Data retrieved successfully! {cli::pb_bar} {cli::pb_percent} | {.grayed [{cli::pb_elapsed}]}",
type = "iterator",
total = n_results,
clear = FALSE
)
if (!.silent) {
init_progress_bar(n_results)
}

while (TRUE) {
# Fetch the next chunk of data
while (!DBI::dbHasCompleted(req)) {
chunk <- DBI::dbFetch(req, n = chunk_size)

# Combine the data with previous chunks
res <- rbind(res, tibble::as_tibble(chunk))

# update pb
cli::cli_progress_update(set = nrow(res))

# Break if we've reached the end of the results
if (DBI::dbHasCompleted(req)) {
break
if (!.silent) {
cli::cli_progress_update(set = nrow(res))
}
}

# Terminate pb
cli::cli_progress_done()

return(res)
}

# Start a cli progress bar for a download of `n_results` rows.
#
# `n_results` is handed to `cli::cli_progress_bar()` as the bar's `total`.
# The bar is created with `.envir` set to the environment of whichever
# function called this helper, not this helper's own frame, so follow-up
# progress calls made from that caller refer to the same bar.
#
# Returns whatever `cli::cli_progress_bar()` returns.
init_progress_bar <- function(n_results) {
  # Resolve the caller's environment up front and pass it on explicitly.
  bar_env <- rlang::caller_env()

  cli::cli_progress_bar(
    total = n_results,
    type = "iterator",
    clear = FALSE,
    .envir = bar_env
  )
}
74 changes: 34 additions & 40 deletions R/ojo_connect.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@
#' Opens a connection to the Open Justice Oklahoma database using credentials stored in the .Renviron file.
#' If no credentials exist, prompts for user, password, and host name and provides instructions to store them for future sessions.
#'
#' @param .admin A logical value indicating whether to connect to the database as an administrator
#' @param ... Placeholder
#' @param .global A logical value indicating whether to establish the connection in the global environment or not.
#' @param .env The environment in which you want the connection stored
#' @param .pool A logical value indicating whether to use a connection pool from the `{pool}` package, or not. If `.global = TRUE` and a connection object already exists, this argument is ignored regardless of the connection type.
#' @param .admin A logical value indicating whether to connect to the database as an administrator.
#' @param ... Placeholder.
#' @param .global Deprecated. A connection will always be created in the specified environment, or in the package environment by default.
#' @param .env The environment in which you want the connection stored.
#' @param .pool A logical value indicating whether to use a connection pool from the `{pool}` package, or not.
#'
#' @export
#' @returns A database connection object created with `RPostgres::Postgres()` and either `pool::dbPool` or `DBI::dbConnect`
#' @returns A database connection object created with `RPostgres::Postgres()` and either `pool::dbPool` or `DBI::dbConnect`
#'
#' @examples
#' \dontrun{
#' ojo_connect()
#' }
#' @section Side Effects:
#' If either the `.global` argument or `rlang::is_interactive` are `TRUE`, a connection object (named `ojo_con` or `ojo_pool` depending on the `.pool` argument) is created in the package environment.
#' A connection object (named `ojo_con` or `ojo_pool` depending on the `.pool` argument) is created in the package environment.
#'
#' @seealso ojo_auth()
#'
ojo_connect <- function(..., .admin = FALSE, .global = rlang::is_interactive(), .env = ojo_env(), .pool = FALSE) {
ojo_connect <- function(..., .admin = FALSE, .global = lifecycle::deprecated(), .env = ojo_env(), .pool = FALSE) {

if (lifecycle::is_present(.global)) {
lifecycle::deprecate_warn(
when = "2.8.0",
what = "ojo_connect(.global)"
)
}

user_type <- if (.admin) "ADMIN" else "DEFAULT"

Expand All @@ -35,20 +42,14 @@ ojo_connect <- function(..., .admin = FALSE, .global = rlang::is_interactive(),
)
}

# Check if object with correct connection type and user already exists and is valid
if (.global) {
global_conn <- try(
get_connection_object(env = .env),
silent = TRUE
)
connection_type <- if (.pool) "ojo_pool" else "ojo_con"

if (!inherits(global_conn, "try-error") && !is.null(global_conn)) {
return(global_conn)
}
# Check if a valid connection object already exists in the environment
existing_conn <- get_connection_object(.env)
if (!is.null(existing_conn) && DBI::dbIsValid(existing_conn)) {
return(existing_conn)
}

connection_type <- if (.pool) "ojo_pool" else "ojo_con"

conn_args <- list(
drv = RPostgres::Postgres(),
dbname = "ojodb",
Expand All @@ -71,27 +72,20 @@ ojo_connect <- function(..., .admin = FALSE, .global = rlang::is_interactive(),
ojo_con = DBI::dbConnect
)

conn <- rlang::exec(conn_fn, !!!conn_args)

if (.global) {
assign(connection_type, conn, envir = .env)
withr::defer(
{
if (exists(connection_type, envir = .env)) {
connection_object <- get(connection_type, envir = .env, inherits = FALSE)
if (.pool) {
pool::poolClose(connection_object)
} else {
DBI::dbDisconnect(connection_object)
}
new_conn <- rlang::exec(conn_fn, !!!conn_args)
assign(connection_type, new_conn, envir = .env)

rm(list = connection_type, envir = .env)
}
},
envir = .env
)
invisible()
}
withr::defer({
if (exists(connection_type, envir = .env)) {
connection_object <- get(connection_type, envir = .env, inherits = FALSE)
if (.pool) {
pool::poolClose(connection_object)
} else {
DBI::dbDisconnect(connection_object)
}
rm(list = connection_type, envir = .env)
}
}, envir = .env)

invisible(conn)
return(new_conn)
}
6 changes: 3 additions & 3 deletions R/ojo_county_population.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ ojo_county_population <- function(years) {
d <- d |>
dplyr::mutate(
court = .data$NAME |>
stringr::str_remove(" County.*") |>
stringr::str_to_upper() |>
stringr::str_remove_all(" ")
stringi::stri_replace_all_regex(" County.*", "") |>
stringi::stri_trans_toupper() |>
stringi::stri_replace_all_regex(" ", "")
) |>
dplyr::select(.data$court, .data$year, pop = .data$value)

Expand Down
3 changes: 2 additions & 1 deletion R/ojo_list_vars.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ ojo_list_vars <- function(table, schema = "public", ..., .con = NULL) {
.data$table_name == table
) |>
dplyr::select(column_name) |>
dplyr::arrange(.data$column_name)
collect() |>
dplyr::arrange(column_name)
}
Loading

0 comments on commit 8775b8b

Please sign in to comment.