Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection Management and Arrow #165

Merged
merged 22 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3e60cf8
Add GitHub links to DESCRIPTION
brancengregory Oct 12, 2023
d1e331a
Editing ojo_tbl
brancengregory Feb 10, 2024
a4c15e3
WIP
brancengregory Feb 10, 2024
f948636
Edit ojo_tbl print method
brancengregory Feb 16, 2024
8525ca4
Replace stringr functions with stringi equivalents
brancengregory Apr 2, 2024
d07de25
Merge branch 'main' into arrow
brancengregory Apr 13, 2024
f0d4307
Merge branch 'main' into devtools-github
brancengregory Apr 13, 2024
5a961af
Remove caching logic
brancengregory Apr 13, 2024
d58f1f9
Fix argument to withr call
brancengregory Apr 14, 2024
cb9db58
Replace tests
brancengregory Apr 15, 2024
490731b
Connection management fixed and tests updated to accurately validate
brancengregory Apr 15, 2024
db47ecd
Merge branch 'arrow' into dev
brancengregory Apr 15, 2024
61a65b3
Adds support and test for arrow
brancengregory Apr 15, 2024
272bfc3
Build docs
brancengregory Apr 15, 2024
188fe0b
Replaces stringr calls with slightly faster stringi calls
brancengregory Apr 15, 2024
9783c19
Merge remote-tracking branch 'origin/devtools-github' into dev
brancengregory Apr 15, 2024
df23a5f
Fix roxygen md docs; add lifecycle experimental badge to .source argu…
brancengregory Apr 15, 2024
97c5b7e
Remove arrow test because it is unstable; remove ojo_tbl print method
brancengregory Apr 15, 2024
b7fbfa6
Accepted new test snapshots
andrewjbe Apr 15, 2024
2bf2f44
Update progress bar to be current and run in caller env
brancengregory Apr 15, 2024
79c02aa
Add internal keyword to documentation for tbl_from functions
brancengregory Apr 15, 2024
b89f922
re-ran tests with github actions credentials so they will pass
andrewjbe May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ Authors@R: c(
))
Description: {ojodb} provides convenient functions to query court data from the Open Justice Oklahoma database.
Imports:
arrow,
cli,
dbplyr,
DBI,
ggplot2,
glue,
lifecycle,
lubridate,
pool,
purrr,
rlang,
RPostgres,
stringi,
stringr,
tibble,
tidyr,
Expand All @@ -53,7 +56,6 @@ Suggests:
janitor,
knitr,
languageserver,
lifecycle,
lintr,
readr,
roxygen2,
Expand All @@ -69,7 +71,10 @@ Depends:
License: GPL (>= 3)
Encoding: UTF-8
LazyData: true
RoxygenNote: 7.2.3
RoxygenNote: 7.3.1
VignetteBuilder: knitr
Config/testthat/edition: 3
Config/testthat/parallel: true
URL: https://github.com/openjusticeok/ojodb
BugReports: https://github.com/openjusticeok/ojodb/issues
Roxygen: list(markdown = TRUE)
8 changes: 4 additions & 4 deletions R/ojo_auth.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ojo_auth <- function(host, port, username, password, ..., .admin = F, .overwrite
clientkey <- fs::path(home, ".postgresql/ojodb/client-key.pem")

# Check if SSL certs are in correct location; if not, throw error
if(!fs::file_exists(rootcert) |
if (!fs::file_exists(rootcert) |
!fs::file_exists(clientcert) |
!fs::file_exists(clientkey)) {
rlang::abort(
Expand Down Expand Up @@ -71,14 +71,14 @@ ojo_auth <- function(host, port, username, password, ..., .admin = F, .overwrite
# Creating the new .Renviron file (not filled out yet, all OJO variables removed)
newenv <- oldenv |>
dplyr::as_tibble() |>
dplyr::filter(!stringr::str_detect(.data$V1, "(OJO_HOST)|(OJO_PORT)|(OJO_DRIVER)|(OJO_SSL)"))
dplyr::filter(!stringi::stri_detect_regex(.data$V1, "(OJO_HOST)|(OJO_PORT)|(OJO_DRIVER)|(OJO_SSL)"))
if (.admin == T) {
newenv <- newenv |>
dplyr::filter(!stringr::str_detect(.data$V1, "(OJO_ADMIN_USER)|(OJO_ADMIN_PASS)")) |>
dplyr::filter(!stringi::stri_detect_regex(.data$V1, "(OJO_ADMIN_USER)|(OJO_ADMIN_PASS)")) |>
as.data.frame()
} else {
newenv <- newenv |>
dplyr::filter(!stringr::str_detect(.data$V1, "(OJO_DEFAULT_USER)|(OJO_DEFAULT_PASS)")) |>
dplyr::filter(!stringi::stri_detect_regex(.data$V1, "(OJO_DEFAULT_USER)|(OJO_DEFAULT_PASS)")) |>
as.data.frame()
}

Expand Down
104 changes: 42 additions & 62 deletions R/ojo_collect.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,22 @@
ojo_collect <- function(.data, ..., .silent = !rlang::is_interactive()) {

# Check class of `.data`
if (!inherits(.data, "tbl_lazy")) {
stop("`.data` must be a lazy tibble. Have you already collected the data?")
if (!inherits(.data, c("tbl_lazy", "tbl_Pool", "tbl_dbi"))) {
rlang::abort("`.data` must be a lazy tibble created with `ojo_connect()`, `pool::dbPool()`, or `DBI::dbConnect()`.")
}

# First check that `.data` is a lazy tibble created with `ojo_connect()`, `pool::dbPool()`, or `DBI::dbConnect()`
if (!inherits(.data, "tbl_Pool") && !inherits(.data, "tbl_dbi")) {
stop("`.data` must be a lazy tibble created with `ojo_connect`, `pool::dbPool`, `DBI::dbConnect`. Have you already collected the data?")
}

# Extract the connection from the lazy tibble
# Extract the database connection
.con <- dbplyr::remote_con(.data)

# If the user is using `pool`, we need to check out a connection from the pool locally
# `pool::localCheckout()` will return the connection to the pool when the function exits
if (inherits(.data, "tbl_Pool")) {
.con <- .con |> pool::localCheckout()
.con <- pool::localCheckout(.con)
}

# Check connection validity
# Ensure the connection is valid
if (!DBI::dbIsValid(.con)) {
stop("The connection to the OJO database is no longer valid. Please reconnect to the database using `ojo_connect()`.")
rlang::abort("The connection to the OJO database is no longer valid. Please reconnect to the database using `ojo_connect()`.")
}

# First UI chunk
# CLI output
if (!.silent) {
con_desc <- dbplyr::db_connection_describe(.con) |>
gsub(pattern = "postgres", replacement = "") |>
Expand All @@ -69,75 +61,63 @@ ojo_collect <- function(.data, ..., .silent = !rlang::is_interactive()) {
msg_failed = "Something went wrong sending your query to the database! Please check your connection.")
}

# Get n rows in request results
t_0 <- Sys.time() # Timer start
# Estimate number of results and warn about potential issues
n_results <- estimate_results(.data, .silent)

# Collect data
res <- collect_data(.con, .data, n_results, .silent)

return(res)
}


estimate_results <- function(.data, .silent) {
if ("n" %in% names(.data)) {
rlang::warn(
"The tbl you are requesting has a variable named `n`. This can cause issues with progress bar rendering and status updates. This message won't be shown again this session.",
.frequency = "once",
.frequency_id = "ojo_collect_n_warning"
)
rlang::warn("The tbl you are requesting has a variable named `n`. This might cause issues with progress bar rendering.",
.frequency = "once", .frequency_id = "ojo_collect_n_warning")
}

n_results <- .data |>
dplyr::ungroup() |>
dplyr::tally() |>
dplyr::pull(var = .data$n)

t_1 <- Sys.time() # Timer end
dplyr::pull(n = n)

# Second UI chunk
if (!.silent) {
cli::cli_progress_step(msg = paste0("Found ", format(n_results, big.mark = ","), " matching results!"),
msg_failed = "Something went wrong downloading your data from the database!")

# Warning if it took more than 20 seconds
if (difftime(t_1, t_0, units = "secs") > 20) {
cli::cli_alert_warning("If the previous step took too long for your query, you can skip these progress updates by setting `.silent = TRUE`")
}
cli::cli_progress_step(msg = paste0("Found ", format(n_results, big.mark = ","), " matching results!"))
}

# Translate query back to SQL
query <- dbplyr::sql_render(.data)
return(n_results)
}

# Make initial db request
collect_data <- function(.con, .data, n_results, .silent) {
query <- dbplyr::sql_render(.data)
req <- DBI::dbSendQuery(.con, query)
withr::defer(DBI::dbClearResult(req))
withr::defer(DBI::dbClearResult(req), envir = parent.frame())

# Downloading from request
res <- NULL
chunk_size <- max(round(n_results / 100, 0), 1000) # Download in chunks of 1% or at least 1000

chunk_size <- round(n_results / 100, 0) # This way we're downloading in chunks of 1% of the total or 1000, whichever is bigger;
if (chunk_size < 1000) { chunk_size <- 1000 } # might help with large queries

cli::cli_progress_bar(
name = "dl_pb",
format = "\u001b[34m{cli::symbol$info}\u001b[0m Downloading data... {cli::pb_bar} {cli::pb_percent} | ~{cli::pb_eta} remaining...",
format_done = "\u001b[32m{cli::symbol$tick}\u001b[0m Data retrieved successfully! {cli::pb_bar} {cli::pb_percent} | {.grayed [{cli::pb_elapsed}]}",
type = "iterator",
total = n_results,
clear = FALSE
)
if (!.silent) {
init_progress_bar(n_results)
}

while (TRUE) {
# Fetch the next chunk of data
while (!DBI::dbHasCompleted(req)) {
chunk <- DBI::dbFetch(req, n = chunk_size)

# Combine the data with previous chunks
res <- rbind(res, tibble::as_tibble(chunk))

# update pb
cli::cli_progress_update(set = nrow(res))

# Break if we've reached the end of the results
if (DBI::dbHasCompleted(req)) {
break
if (!.silent) {
cli::cli_progress_update(set = nrow(res))
}
}

# Terminate pb
cli::cli_progress_done()

return(res)
}

init_progress_bar <- function(n_results) {
cli::cli_progress_bar(
type = "iterator",
total = n_results,
clear = FALSE,
.envir = rlang::caller_env()
)
}
74 changes: 34 additions & 40 deletions R/ojo_connect.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@
#' Opens a connection to the Open Justice Oklahoma database using credentials stored in the .Renviron file.
#' If no credentials exist, prompts for user, password, and host name and provides instructions to store them for future sessions.
#'
#' @param .admin A logical value indicating whether to connect to the database as an administrator
#' @param ... Placeholder
#' @param .global A logical value indicating whether to establish the connection in the global environment or not.
#' @param .env The environment in which you want the connection stored
#' @param .pool A logical value indicating whether to use a connection pool from the `{pool}` package, or not. If `.global = TRUE` and a connection object already exists, this argument is ignored regardless of the connection type.
#' @param .admin A logical value indicating whether to connect to the database as an administrator.
#' @param ... Placeholder.
#' @param .global Deprecated. A connection will always be created in the specified environment, or in the package environment by default.
#' @param .env The environment in which you want the connection stored.
#' @param .pool A logical value indicating whether to use a connection pool from the `{pool}` package, or not.
#'
#' @export
#' @returns A database connection object created with `RPostgres::Postgres()` and either `pool::dbPool` or `DBI::dbConnect`
#' @returns A database connection object created with `RPostgres::Postgres()` and either `pool::dbPool` or `DBI::dbConnect`
#'
#' @examples
#' \dontrun{
#' ojo_connect()
#' }
#' @section Side Effects:
#' If either the `.global` argument or `rlang::is_interactive` are `TRUE`, a connection object (named `ojo_con` or `ojo_pool` depending on the `.pool` argument) is created in the package environment.
#' A connection object (named `ojo_con` or `ojo_pool` depending on the `.pool` argument) is created in the package environment.
#'
#' @seealso ojo_auth()
#'
ojo_connect <- function(..., .admin = FALSE, .global = rlang::is_interactive(), .env = ojo_env(), .pool = FALSE) {
ojo_connect <- function(..., .admin = FALSE, .global = lifecycle::deprecated(), .env = ojo_env(), .pool = FALSE) {

if (lifecycle::is_present(.global)) {
lifecycle::deprecate_warn(
when = "2.8.0",
what = "ojo_connect(.global)"
)
}

user_type <- if (.admin) "ADMIN" else "DEFAULT"

Expand All @@ -35,20 +42,14 @@ ojo_connect <- function(..., .admin = FALSE, .global = rlang::is_interactive(),
)
}

# Check if object with correct connection type and user already exists and is valid
if (.global) {
global_conn <- try(
get_connection_object(env = .env),
silent = TRUE
)
connection_type <- if (.pool) "ojo_pool" else "ojo_con"

if (!inherits(global_conn, "try-error") && !is.null(global_conn)) {
return(global_conn)
}
# Check if a valid connection object already exists in the environment
existing_conn <- get_connection_object(.env)
if (!is.null(existing_conn) && DBI::dbIsValid(existing_conn)) {
return(existing_conn)
}

connection_type <- if (.pool) "ojo_pool" else "ojo_con"

conn_args <- list(
drv = RPostgres::Postgres(),
dbname = "ojodb",
Expand All @@ -71,27 +72,20 @@ ojo_connect <- function(..., .admin = FALSE, .global = rlang::is_interactive(),
ojo_con = DBI::dbConnect
)

conn <- rlang::exec(conn_fn, !!!conn_args)

if (.global) {
assign(connection_type, conn, envir = .env)
withr::defer(
{
if (exists(connection_type, envir = .env)) {
connection_object <- get(connection_type, envir = .env, inherits = FALSE)
if (.pool) {
pool::poolClose(connection_object)
} else {
DBI::dbDisconnect(connection_object)
}
new_conn <- rlang::exec(conn_fn, !!!conn_args)
assign(connection_type, new_conn, envir = .env)

rm(list = connection_type, envir = .env)
}
},
envir = .env
)
invisible()
}
withr::defer({
if (exists(connection_type, envir = .env)) {
connection_object <- get(connection_type, envir = .env, inherits = FALSE)
if (.pool) {
pool::poolClose(connection_object)
} else {
DBI::dbDisconnect(connection_object)
}
rm(list = connection_type, envir = .env)
}
}, envir = .env)

invisible(conn)
return(new_conn)
}
6 changes: 3 additions & 3 deletions R/ojo_county_population.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ ojo_county_population <- function(years) {
d <- d |>
dplyr::mutate(
court = .data$NAME |>
stringr::str_remove(" County.*") |>
stringr::str_to_upper() |>
stringr::str_remove_all(" ")
stringi::stri_replace_all_regex(" County.*", "") |>
stringi::stri_trans_toupper() |>
stringi::stri_replace_all_regex(" ", "")
) |>
dplyr::select(.data$court, .data$year, pop = .data$value)

Expand Down
3 changes: 2 additions & 1 deletion R/ojo_list_vars.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ ojo_list_vars <- function(table, schema = "public", ..., .con = NULL) {
.data$table_name == table
) |>
dplyr::select(column_name) |>
dplyr::arrange(.data$column_name)
collect() |>
dplyr::arrange(column_name)
}
Loading
Loading