Skip to content

Commit

Permalink
Use ADBC for Google BigQuery and update Athena code (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko authored Jan 8, 2025
1 parent ff88a99 commit 62a1989
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 210 deletions.
5 changes: 3 additions & 2 deletions lib/assets/connection_cell/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,9 @@ export function init(ctx, info) {
const reader = new FileReader();

reader.onload = (res) => {
const value = JSON.parse(res.target.result);
ctx.pushEvent("update_field", { field: "credentials", value });
// Reformat the JSON into a compact form
const value = JSON.stringify(JSON.parse(res.target.result));
ctx.pushEvent("update_field", { field: "credentials_json", value });
};

reader.readAsText(file);
Expand Down
126 changes: 45 additions & 81 deletions lib/kino_db/connection_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule KinoDB.ConnectionCell do
"database" => attrs["database"] || "",
"project_id" => attrs["project_id"] || "",
"default_dataset_id" => attrs["default_dataset_id"] || "",
"credentials" => attrs["credentials"] || %{},
"credentials_json" => attrs["credentials_json"] || "",
"access_key_id" => attrs["access_key_id"] || "",
"secret_access_key" => secret_access_key,
"use_secret_access_key_secret" =>
Expand Down Expand Up @@ -128,7 +128,7 @@ defmodule KinoDB.ConnectionCell do
~w|database_path|

"bigquery" ->
~w|project_id default_dataset_id credentials|
~w|project_id default_dataset_id credentials_json|

"athena" ->
if fields["use_secret_access_key_secret"],
Expand Down Expand Up @@ -267,6 +267,37 @@ defmodule KinoDB.ConnectionCell do
end
end

# Generates the source code for a BigQuery connection using the ADBC driver.
# Builds the ADBC option list from the cell attributes and quotes the code
# that downloads the driver and starts the database/connection processes.
defp to_quoted(%{"type" => "bigquery"} = attrs) do
  var = quoted_var(attrs["variable"])

  # Optional dataset: omitted entirely when the field is blank.
  dataset_opts =
    case attrs["default_dataset_id"] do
      "" -> []
      dataset_id -> ["adbc.bigquery.sql.dataset_id": dataset_id]
    end

  # Optional credentials: when present, the JSON is embedded as a ~S heredoc
  # AST node so the generated source keeps it as a verbatim string literal.
  credential_opts =
    case attrs["credentials_json"] do
      "" ->
        []

      credentials_json ->
        [
          "adbc.bigquery.sql.auth_type": "adbc.bigquery.sql.auth_type.json_credential_string",
          "adbc.bigquery.sql.auth_credentials":
            {:sigil_S, [delimiter: ~s["""]], [{:<<>>, [], [credentials_json <> "\n"]}, []]}
        ]
    end

  opts =
    [driver: :bigquery, "adbc.bigquery.sql.project_id": attrs["project_id"]] ++
      dataset_opts ++ credential_opts

  quote do
    :ok = Adbc.download_driver!(:bigquery)
    {:ok, db} = Kino.start_child({Adbc.Database, unquote(opts)})
    {:ok, unquote(var)} = Kino.start_child({Adbc.Connection, database: db})
  end
end

defp to_quoted(%{"type" => "postgres"} = attrs) do
quote do
opts = unquote(trim_opts(shared_options(attrs) ++ postgres_and_mysql_options(attrs)))
Expand All @@ -291,40 +322,18 @@ defmodule KinoDB.ConnectionCell do
end
end

# Generates the source code for a BigQuery connection via Goth + ReqBigQuery.
# The `opts` variable used when starting Goth is bound by the quoted block
# returned from `check_bigquery_credentials/1`; the two blocks are flattened
# into a single block with `join_quoted/1`.
defp to_quoted(%{"type" => "bigquery"} = attrs) do
  goth_opts_block = check_bigquery_credentials(attrs)

  conn_block =
    quote do
      {:ok, _pid} = Kino.start_child({Goth, opts})

      unquote(quoted_var(attrs["variable"])) =
        Req.new(http_errors: :raise)
        |> ReqBigQuery.attach(
          goth: ReqBigQuery.Goth,
          project_id: unquote(attrs["project_id"]),
          default_dataset_id: unquote(attrs["default_dataset_id"])
        )

      # Return :ok so the cell output is not the Req struct itself
      :ok
    end

  join_quoted([goth_opts_block, conn_block])
end

defp to_quoted(%{"type" => "athena"} = attrs) do
quote do
unquote(quoted_var(attrs["variable"])) =
Req.new(http_errors: :raise)
|> ReqAthena.attach(
format: :explorer,
ReqAthena.new(
access_key_id: unquote(attrs["access_key_id"]),
database: unquote(attrs["database"]),
output_location: unquote(attrs["output_location"]),
region: unquote(attrs["region"]),
secret_access_key: unquote(quoted_access_key(attrs)),
token: unquote(attrs["token"]),
workgroup: unquote(attrs["workgroup"])
workgroup: unquote(attrs["workgroup"]),
http_errors: :raise
)

:ok
Expand Down Expand Up @@ -354,37 +363,6 @@ defmodule KinoDB.ConnectionCell do
end
end

# Returns a quoted block that binds `opts` for starting Goth, based on the
# kind of Google credentials stored in the cell attributes.
#
# * "service_account" credentials -> `{:service_account, credentials}` source
# * "authorized_user" credentials -> `{:refresh_token, credentials}` source
# * anything else (e.g. empty map) -> no source; Goth falls back to its
#   default credential discovery
defp check_bigquery_credentials(attrs) do
  case attrs["credentials"] do
    %{"type" => "service_account"} ->
      quote do
        # Macro.escape embeds the credentials map literally in the generated code
        credentials = unquote(Macro.escape(attrs["credentials"]))

        opts = [
          name: ReqBigQuery.Goth,
          http_client: &Req.request/1,
          source: {:service_account, credentials}
        ]
      end

    %{"type" => "authorized_user"} ->
      quote do
        credentials = unquote(Macro.escape(attrs["credentials"]))

        opts = [
          name: ReqBigQuery.Goth,
          http_client: &Req.request/1,
          source: {:refresh_token, credentials}
        ]
      end

    _empty_map ->
      quote do
        opts = [name: ReqBigQuery.Goth, http_client: &Req.request/1]
      end
  end
end

defp shared_options(attrs) do
opts = [
hostname: attrs["hostname"],
Expand Down Expand Up @@ -516,15 +494,20 @@ defmodule KinoDB.ConnectionCell do
Code.ensure_loaded?(Postgrex) -> "postgres"
Code.ensure_loaded?(MyXQL) -> "mysql"
Code.ensure_loaded?(Exqlite) -> "sqlite"
Code.ensure_loaded?(ReqBigQuery) -> "bigquery"
Code.ensure_loaded?(ReqAthena) -> "athena"
Code.ensure_loaded?(ReqCH) -> "clickhouse"
Code.ensure_loaded?(Adbc) -> "duckdb"
Code.ensure_loaded?(Adbc) -> adbc_default_db_type()
Code.ensure_loaded?(Tds) -> "sqlserver"
true -> "postgres"
end
end

# Picks the default ADBC-backed database type from the configured :adbc
# drivers, preferring duckdb, then snowflake, then bigquery. Falls back to
# "duckdb" when none of them is configured.
defp adbc_default_db_type() do
  configured = Application.get_env(:adbc, :drivers, [])

  [:duckdb, :snowflake, :bigquery]
  |> Enum.find(:duckdb, fn driver -> driver in configured end)
  |> Atom.to_string()
end

defp missing_dep(%{"type" => "postgres"}) do
unless Code.ensure_loaded?(Postgrex) do
~s/{:postgrex, "~> 0.18"}/
Expand All @@ -543,20 +526,14 @@ defmodule KinoDB.ConnectionCell do
end
end

# Returns the dependency snippet for the BigQuery driver when ReqBigQuery is
# not available, or nil when it is already loaded.
defp missing_dep(%{"type" => "bigquery"}) do
  if Code.ensure_loaded?(ReqBigQuery) do
    nil
  else
    ~s|{:req_bigquery, "~> 0.1"}|
  end
end

defp missing_dep(%{"type" => "athena"}) do
missing_many_deps([
{ReqAthena, ~s|{:req_athena, "~> 0.1"}|},
{ReqAthena, ~s|{:req_athena, "~> 0.3"}|},
{Explorer, ~s|{:explorer, "~> 0.10"}|}
])
end

defp missing_dep(%{"type" => adbc}) when adbc in ~w[snowflake duckdb] do
defp missing_dep(%{"type" => adbc}) when adbc in ~w[duckdb snowflake bigquery] do
unless Code.ensure_loaded?(Adbc) do
~s|{:adbc, "~> 0.3"}|
end
Expand Down Expand Up @@ -585,19 +562,6 @@ defmodule KinoDB.ConnectionCell do
end
end

# Merges a list of quoted expressions into a single quoted expression,
# splicing the contents of any `:__block__` nodes so the result is flat.
# A single remaining node is returned as-is instead of being wrapped.
defp join_quoted(quoted_blocks) do
  quoted_blocks
  |> Enum.flat_map(fn
    {:__block__, _meta, inner} -> inner
    single -> [single]
  end)
  |> case do
    [only] -> only
    many -> {:__block__, [], many}
  end
end

defp help_box(%{"type" => "bigquery"}) do
if Code.ensure_loaded?(Mint.HTTP) do
if running_on_google_metadata?() do
Expand Down
84 changes: 34 additions & 50 deletions lib/kino_db/sql_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule KinoDB.SQLCell do
result_variable: Kino.SmartCell.prefixed_var_name("result", attrs["result_variable"]),
query: query,
timeout: attrs["timeout"],
cache_query: attrs["cache_query"] || true,
cache_query: Map.get(attrs, "cache_query", true),
data_frame_alias: Explorer.DataFrame,
missing_dep: missing_dep(connection)
)
Expand Down Expand Up @@ -165,7 +165,6 @@ defmodule KinoDB.SQLCell do

defp connection_type(connection) when is_struct(connection, Req.Request) do
cond do
Keyword.has_key?(connection.request_steps, :bigquery_run) -> "bigquery"
Keyword.has_key?(connection.request_steps, :athena_run) -> "athena"
Keyword.has_key?(connection.request_steps, :clickhouse_run) -> "clickhouse"
true -> nil
Expand Down Expand Up @@ -220,55 +219,39 @@ defmodule KinoDB.SQLCell do
to_quoted(attrs, quote(do: Tds), fn n -> "@#{n}" end)
end

# Based on query!/4, which returns a Req response.
# Generates ClickHouse query source via ReqCH. Parameter placeholders use the
# ClickHouse `{name:String}` syntax; when the interpolated expression contains
# characters that are not valid in a parameter name, a positional
# `param_<n>` name is used instead.
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
  to_quoted_query_req(attrs, quote(do: ReqCH), fn index, inner ->
    safe_name? = not String.match?(inner, ~r/[^a-z0-9_]/)
    name = if safe_name?, do: inner, else: "param_#{index}"
    "{#{name}:String}"
  end)
end

# Explorer-based
defp to_quoted(%{"connection" => %{"type" => "snowflake"}} = attrs) do
to_explorer_quoted(attrs, fn n -> "?#{n}" end)
to_quoted_explorer(attrs, fn n -> "?#{n}" end)
end

defp to_quoted(%{"connection" => %{"type" => "duckdb"}} = attrs) do
to_explorer_quoted(attrs, fn n -> "?#{n}" end)
to_quoted_explorer(attrs, fn n -> "?#{n}" end)
end

# Req-based
defp to_quoted(%{"connection" => %{"type" => "bigquery"}} = attrs) do
to_req_quoted(attrs, fn _n -> "?" end, :bigquery)
to_quoted_explorer(attrs, fn _n -> "?" end)
end

# Req-based
defp to_quoted(%{"connection" => %{"type" => "athena"}} = attrs) do
to_req_quoted(attrs, fn _n -> "?" end, :athena)
to_quoted_req_query(attrs, quote(do: ReqAthena), fn _n -> "?" end)
end

defp to_quoted(_ctx) do
quote do
end
end
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
to_quoted_req_query(attrs, quote(do: ReqCH), fn n, inner ->
name =
if String.match?(inner, ~r/[^a-z0-9_]/) do
"param_#{n}"
else
inner
end

defp to_quoted_query_req(attrs, quoted_module, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
opts_args = query_opts_args(attrs)
"{#{name}:String}"
end)
end

defp to_quoted(_ctx) do
quote do
unquote(quoted_var(attrs["result_variable"])) =
unquote(quoted_module).query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(quoted_query(query)),
unquote(params),
unquote_splicing(opts_args)
).body
end
end

Expand All @@ -287,32 +270,32 @@ defmodule KinoDB.SQLCell do
end
end

defp to_req_quoted(attrs, next, req_key) do
defp to_quoted_explorer(attrs, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
query = {quoted_query(query), params}
opts = query_opts_args(attrs)
req_opts = opts |> Enum.at(0, []) |> Keyword.put(req_key, query)
data_frame_alias = attrs["data_frame_alias"]

quote do
unquote(quoted_var(attrs["result_variable"])) =
Req.post!(
unquote(data_frame_alias).from_query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(req_opts)
).body
unquote(quoted_query(query)),
unquote(params)
)
end
end

defp to_explorer_quoted(attrs, next) do
defp to_quoted_req_query(attrs, quoted_module, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
data_frame_alias = attrs["data_frame_alias"]
opts_args = query_opts_args(attrs)

quote do
unquote(quoted_var(attrs["result_variable"])) =
unquote(data_frame_alias).from_query!(
unquote(quoted_module).query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(quoted_query(query)),
unquote(params)
)
unquote(params),
unquote_splicing(opts_args)
).body
end
end

Expand All @@ -333,8 +316,9 @@ defmodule KinoDB.SQLCell do
when timeout != nil and type in @connection_types_with_timeout,
do: [[timeout: timeout * 1000]]

defp query_opts_args(%{"connection" => %{"type" => "athena"}, "cache_query" => cache_query}),
do: [[cache_query: cache_query]]
# Builds the extra argument list for Athena queries: always request Explorer
# output, and pass `cache_query: false` only when caching is disabled (the
# default in ReqAthena is to cache, so the enabled case adds nothing).
defp query_opts_args(%{"connection" => %{"type" => "athena"}} = attrs) do
  cache_opts = if attrs["cache_query"], do: [], else: [cache_query: false]
  [[format: :explorer] ++ cache_opts]
end

defp query_opts_args(%{"connection" => %{"type" => "clickhouse"}}),
do: [[format: :explorer]]
Expand Down Expand Up @@ -418,7 +402,7 @@ defmodule KinoDB.SQLCell do
end
end

defp missing_dep(%{type: adbc}) when adbc in ~w[snowflake duckdb] do
defp missing_dep(%{type: adbc}) when adbc in ~w[snowflake duckdb bigquery] do
unless Code.ensure_loaded?(Explorer) do
~s|{:explorer, "~> 0.10"}|
end
Expand Down
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ defmodule KinoDB.MixProject do
{:explorer, "~> 0.10", optional: true},

# Those dependencies are new, so we use stricter versions
{:req_bigquery, "~> 0.1.0", optional: true},
{:req_athena, "~> 0.2.0", optional: true},
{:req_athena, "~> 0.3.0", optional: true},
{:req_ch, "~> 0.1.0", optional: true},

# Dev only
Expand Down
Loading

0 comments on commit 62a1989

Please sign in to comment.