Skip to content

Commit 017b553

Browse files
committed
Use ADBC for Google BigQuery and update Athena code
1 parent ff88a99 commit 017b553

File tree

7 files changed

+158
-200
lines changed

7 files changed

+158
-200
lines changed

lib/assets/connection_cell/main.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,8 +691,9 @@ export function init(ctx, info) {
691691
const reader = new FileReader();
692692

693693
reader.onload = (res) => {
694-
const value = JSON.parse(res.target.result);
695-
ctx.pushEvent("update_field", { field: "credentials", value });
694+
// Reformat the JSON into a compact form
695+
const value = JSON.stringify(JSON.parse(res.target.result));
696+
ctx.pushEvent("update_field", { field: "credentials_json", value });
696697
};
697698

698699
reader.readAsText(file);

lib/kino_db/connection_cell.ex

Lines changed: 38 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule KinoDB.ConnectionCell do
3333
"database" => attrs["database"] || "",
3434
"project_id" => attrs["project_id"] || "",
3535
"default_dataset_id" => attrs["default_dataset_id"] || "",
36-
"credentials" => attrs["credentials"] || %{},
36+
"credentials_json" => attrs["credentials_json"] || "",
3737
"access_key_id" => attrs["access_key_id"] || "",
3838
"secret_access_key" => secret_access_key,
3939
"use_secret_access_key_secret" =>
@@ -128,7 +128,7 @@ defmodule KinoDB.ConnectionCell do
128128
~w|database_path|
129129

130130
"bigquery" ->
131-
~w|project_id default_dataset_id credentials|
131+
~w|project_id default_dataset_id credentials_json|
132132

133133
"athena" ->
134134
if fields["use_secret_access_key_secret"],
@@ -267,6 +267,37 @@ defmodule KinoDB.ConnectionCell do
267267
end
268268
end
269269

270+
defp to_quoted(%{"type" => "bigquery"} = attrs) do
271+
var = quoted_var(attrs["variable"])
272+
273+
opts =
274+
[
275+
driver: :bigquery,
276+
"adbc.bigquery.sql.project_id": attrs["project_id"]
277+
] ++
278+
case attrs["default_dataset_id"] do
279+
"" -> []
280+
dataset_id -> ["adbc.bigquery.sql.dataset_id": dataset_id]
281+
end ++
282+
case attrs["credentials_json"] do
283+
"" ->
284+
[]
285+
286+
credentials_json ->
287+
[
288+
"adbc.bigquery.sql.auth_type": "adbc.bigquery.sql.auth_type.json_credential_string",
289+
"adbc.bigquery.sql.auth_credentials":
290+
{:sigil_S, [delimiter: ~s["""]], [{:<<>>, [], [credentials_json <> "\n"]}, []]}
291+
]
292+
end
293+
294+
quote do
295+
:ok = Adbc.download_driver!(:bigquery)
296+
{:ok, db} = Kino.start_child({Adbc.Database, unquote(opts)})
297+
{:ok, unquote(var)} = Kino.start_child({Adbc.Connection, database: db})
298+
end
299+
end
300+
270301
defp to_quoted(%{"type" => "postgres"} = attrs) do
271302
quote do
272303
opts = unquote(trim_opts(shared_options(attrs) ++ postgres_and_mysql_options(attrs)))
@@ -291,40 +322,18 @@ defmodule KinoDB.ConnectionCell do
291322
end
292323
end
293324

294-
defp to_quoted(%{"type" => "bigquery"} = attrs) do
295-
goth_opts_block = check_bigquery_credentials(attrs)
296-
297-
conn_block =
298-
quote do
299-
{:ok, _pid} = Kino.start_child({Goth, opts})
300-
301-
unquote(quoted_var(attrs["variable"])) =
302-
Req.new(http_errors: :raise)
303-
|> ReqBigQuery.attach(
304-
goth: ReqBigQuery.Goth,
305-
project_id: unquote(attrs["project_id"]),
306-
default_dataset_id: unquote(attrs["default_dataset_id"])
307-
)
308-
309-
:ok
310-
end
311-
312-
join_quoted([goth_opts_block, conn_block])
313-
end
314-
315325
defp to_quoted(%{"type" => "athena"} = attrs) do
316326
quote do
317327
unquote(quoted_var(attrs["variable"])) =
318-
Req.new(http_errors: :raise)
319-
|> ReqAthena.attach(
320-
format: :explorer,
328+
ReqAthena.new(
321329
access_key_id: unquote(attrs["access_key_id"]),
322330
database: unquote(attrs["database"]),
323331
output_location: unquote(attrs["output_location"]),
324332
region: unquote(attrs["region"]),
325333
secret_access_key: unquote(quoted_access_key(attrs)),
326334
token: unquote(attrs["token"]),
327-
workgroup: unquote(attrs["workgroup"])
335+
workgroup: unquote(attrs["workgroup"]),
336+
http_errors: :raise
328337
)
329338

330339
:ok
@@ -354,37 +363,6 @@ defmodule KinoDB.ConnectionCell do
354363
end
355364
end
356365

357-
defp check_bigquery_credentials(attrs) do
358-
case attrs["credentials"] do
359-
%{"type" => "service_account"} ->
360-
quote do
361-
credentials = unquote(Macro.escape(attrs["credentials"]))
362-
363-
opts = [
364-
name: ReqBigQuery.Goth,
365-
http_client: &Req.request/1,
366-
source: {:service_account, credentials}
367-
]
368-
end
369-
370-
%{"type" => "authorized_user"} ->
371-
quote do
372-
credentials = unquote(Macro.escape(attrs["credentials"]))
373-
374-
opts = [
375-
name: ReqBigQuery.Goth,
376-
http_client: &Req.request/1,
377-
source: {:refresh_token, credentials}
378-
]
379-
end
380-
381-
_empty_map ->
382-
quote do
383-
opts = [name: ReqBigQuery.Goth, http_client: &Req.request/1]
384-
end
385-
end
386-
end
387-
388366
defp shared_options(attrs) do
389367
opts = [
390368
hostname: attrs["hostname"],
@@ -516,7 +494,6 @@ defmodule KinoDB.ConnectionCell do
516494
Code.ensure_loaded?(Postgrex) -> "postgres"
517495
Code.ensure_loaded?(MyXQL) -> "mysql"
518496
Code.ensure_loaded?(Exqlite) -> "sqlite"
519-
Code.ensure_loaded?(ReqBigQuery) -> "bigquery"
520497
Code.ensure_loaded?(ReqAthena) -> "athena"
521498
Code.ensure_loaded?(ReqCH) -> "clickhouse"
522499
Code.ensure_loaded?(Adbc) -> "duckdb"
@@ -543,20 +520,14 @@ defmodule KinoDB.ConnectionCell do
543520
end
544521
end
545522

546-
defp missing_dep(%{"type" => "bigquery"}) do
547-
unless Code.ensure_loaded?(ReqBigQuery) do
548-
~s|{:req_bigquery, "~> 0.1"}|
549-
end
550-
end
551-
552523
defp missing_dep(%{"type" => "athena"}) do
553524
missing_many_deps([
554-
{ReqAthena, ~s|{:req_athena, "~> 0.1"}|},
525+
{ReqAthena, ~s|{:req_athena, "~> 0.3"}|},
555526
{Explorer, ~s|{:explorer, "~> 0.10"}|}
556527
])
557528
end
558529

559-
defp missing_dep(%{"type" => adbc}) when adbc in ~w[snowflake duckdb] do
530+
defp missing_dep(%{"type" => adbc}) when adbc in ~w[snowflake duckdb bigquery] do
560531
unless Code.ensure_loaded?(Adbc) do
561532
~s|{:adbc, "~> 0.3"}|
562533
end
@@ -585,19 +556,6 @@ defmodule KinoDB.ConnectionCell do
585556
end
586557
end
587558

588-
defp join_quoted(quoted_blocks) do
589-
asts =
590-
Enum.flat_map(quoted_blocks, fn
591-
{:__block__, _meta, nodes} -> nodes
592-
node -> [node]
593-
end)
594-
595-
case asts do
596-
[node] -> node
597-
nodes -> {:__block__, [], nodes}
598-
end
599-
end
600-
601559
defp help_box(%{"type" => "bigquery"}) do
602560
if Code.ensure_loaded?(Mint.HTTP) do
603561
if running_on_google_metadata?() do

lib/kino_db/sql_cell.ex

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ defmodule KinoDB.SQLCell do
2323
result_variable: Kino.SmartCell.prefixed_var_name("result", attrs["result_variable"]),
2424
query: query,
2525
timeout: attrs["timeout"],
26-
cache_query: attrs["cache_query"] || true,
26+
cache_query: Map.get(attrs, "cache_query", true),
2727
data_frame_alias: Explorer.DataFrame,
2828
missing_dep: missing_dep(connection)
2929
)
@@ -165,7 +165,6 @@ defmodule KinoDB.SQLCell do
165165

166166
defp connection_type(connection) when is_struct(connection, Req.Request) do
167167
cond do
168-
Keyword.has_key?(connection.request_steps, :bigquery_run) -> "bigquery"
169168
Keyword.has_key?(connection.request_steps, :athena_run) -> "athena"
170169
Keyword.has_key?(connection.request_steps, :clickhouse_run) -> "clickhouse"
171170
true -> nil
@@ -220,55 +219,39 @@ defmodule KinoDB.SQLCell do
220219
to_quoted(attrs, quote(do: Tds), fn n -> "@#{n}" end)
221220
end
222221

223-
# query!/4 based that returns a Req response.
224-
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
225-
to_quoted_query_req(attrs, quote(do: ReqCH), fn n, inner ->
226-
name =
227-
if String.match?(inner, ~r/[^a-z0-9_]/) do
228-
"param_#{n}"
229-
else
230-
inner
231-
end
232-
233-
"{#{name}:String}"
234-
end)
235-
end
236-
237222
# Explorer-based
238223
defp to_quoted(%{"connection" => %{"type" => "snowflake"}} = attrs) do
239-
to_explorer_quoted(attrs, fn n -> "?#{n}" end)
224+
to_quoted_explorer(attrs, fn n -> "?#{n}" end)
240225
end
241226

242227
defp to_quoted(%{"connection" => %{"type" => "duckdb"}} = attrs) do
243-
to_explorer_quoted(attrs, fn n -> "?#{n}" end)
228+
to_quoted_explorer(attrs, fn n -> "?#{n}" end)
244229
end
245230

246-
# Req-based
247231
defp to_quoted(%{"connection" => %{"type" => "bigquery"}} = attrs) do
248-
to_req_quoted(attrs, fn _n -> "?" end, :bigquery)
232+
to_quoted_explorer(attrs, fn _n -> "?" end)
249233
end
250234

235+
# Req-based
251236
defp to_quoted(%{"connection" => %{"type" => "athena"}} = attrs) do
252-
to_req_quoted(attrs, fn _n -> "?" end, :athena)
237+
to_quoted_req_query(attrs, quote(do: ReqAthena), fn _n -> "?" end)
253238
end
254239

255-
defp to_quoted(_ctx) do
256-
quote do
257-
end
258-
end
240+
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
241+
to_quoted_req_query(attrs, quote(do: ReqCH), fn n, inner ->
242+
name =
243+
if String.match?(inner, ~r/[^a-z0-9_]/) do
244+
"param_#{n}"
245+
else
246+
inner
247+
end
259248

260-
defp to_quoted_query_req(attrs, quoted_module, next) do
261-
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
262-
opts_args = query_opts_args(attrs)
249+
"{#{name}:String}"
250+
end)
251+
end
263252

253+
defp to_quoted(_ctx) do
264254
quote do
265-
unquote(quoted_var(attrs["result_variable"])) =
266-
unquote(quoted_module).query!(
267-
unquote(quoted_var(attrs["connection"]["variable"])),
268-
unquote(quoted_query(query)),
269-
unquote(params),
270-
unquote_splicing(opts_args)
271-
).body
272255
end
273256
end
274257

@@ -287,32 +270,32 @@ defmodule KinoDB.SQLCell do
287270
end
288271
end
289272

290-
defp to_req_quoted(attrs, next, req_key) do
273+
defp to_quoted_explorer(attrs, next) do
291274
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
292-
query = {quoted_query(query), params}
293-
opts = query_opts_args(attrs)
294-
req_opts = opts |> Enum.at(0, []) |> Keyword.put(req_key, query)
275+
data_frame_alias = attrs["data_frame_alias"]
295276

296277
quote do
297278
unquote(quoted_var(attrs["result_variable"])) =
298-
Req.post!(
279+
unquote(data_frame_alias).from_query!(
299280
unquote(quoted_var(attrs["connection"]["variable"])),
300-
unquote(req_opts)
301-
).body
281+
unquote(quoted_query(query)),
282+
unquote(params)
283+
)
302284
end
303285
end
304286

305-
defp to_explorer_quoted(attrs, next) do
287+
defp to_quoted_req_query(attrs, quoted_module, next) do
306288
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
307-
data_frame_alias = attrs["data_frame_alias"]
289+
opts_args = query_opts_args(attrs)
308290

309291
quote do
310292
unquote(quoted_var(attrs["result_variable"])) =
311-
unquote(data_frame_alias).from_query!(
293+
unquote(quoted_module).query!(
312294
unquote(quoted_var(attrs["connection"]["variable"])),
313295
unquote(quoted_query(query)),
314-
unquote(params)
315-
)
296+
unquote(params),
297+
unquote_splicing(opts_args)
298+
).body
316299
end
317300
end
318301

@@ -333,8 +316,9 @@ defmodule KinoDB.SQLCell do
333316
when timeout != nil and type in @connection_types_with_timeout,
334317
do: [[timeout: timeout * 1000]]
335318

336-
defp query_opts_args(%{"connection" => %{"type" => "athena"}, "cache_query" => cache_query}),
337-
do: [[cache_query: cache_query]]
319+
defp query_opts_args(%{"connection" => %{"type" => "athena"}} = attrs) do
320+
[[format: :explorer] ++ if(attrs["cache_query"], do: [], else: [cache_query: false])]
321+
end
338322

339323
defp query_opts_args(%{"connection" => %{"type" => "clickhouse"}}),
340324
do: [[format: :explorer]]
@@ -418,7 +402,7 @@ defmodule KinoDB.SQLCell do
418402
end
419403
end
420404

421-
defp missing_dep(%{type: adbc}) when adbc in ~w[snowflake duckdb] do
405+
defp missing_dep(%{type: adbc}) when adbc in ~w[snowflake duckdb bigquery] do
422406
unless Code.ensure_loaded?(Explorer) do
423407
~s|{:explorer, "~> 0.10"}|
424408
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ defmodule KinoDB.MixProject do
3737
{:explorer, "~> 0.10", optional: true},
3838

3939
# Those dependecies are new, so we use stricter versions
40-
{:req_bigquery, "~> 0.1.0", optional: true},
40+
# TODO bump to 0.3.0
4141
{:req_athena, "~> 0.2.0", optional: true},
4242
{:req_ch, "~> 0.1.0", optional: true},
4343

mix.lock

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
"postgrex": {:hex, :postgrex, "0.18.0", "f34664101eaca11ff24481ed4c378492fed2ff416cd9b06c399e90f321867d7e", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "a042989ba1bc1cca7383ebb9e461398e3f89f868c92ce6671feb7ef132a252d1"},
3131
"req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
3232
"req_athena": {:hex, :req_athena, "0.2.0", "a88b1500105f7dfade0c6fdc28174577c72ae6d96f8ab67c5a5cb594f68c8b10", [:mix], [{:aws_credentials, "~> 0.2", [hex: :aws_credentials, repo: "hexpm", optional: true]}, {:aws_signature, "~> 0.3.0", [hex: :aws_signature, repo: "hexpm", optional: false]}, {:explorer, "~> 0.9", [hex: :explorer, repo: "hexpm", optional: true]}, {:req, "~> 0.5.0", [hex: :req, repo: "hexpm", optional: false]}, {:req_s3, "~> 0.2", [hex: :req_s3, repo: "hexpm", optional: false]}], "hexpm", "75376f2f7b6fa924b51cb7d797287a1967589ae7d7d355c50e4170aa7f750d3c"},
33-
"req_bigquery": {:hex, :req_bigquery, "0.1.4", "4e1cbadb773d7272ef6c5ed5930f3f2d5df82bc0c6966f00ee71fa681e43a8c2", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:goth, "~> 1.3", [hex: :goth, repo: "hexpm", optional: false]}, {:req, "~> 0.3.5 or ~> 0.4", [hex: :req, repo: "hexpm", optional: false]}, {:table, "~> 0.1.1", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "9d099fbaca9eb89909d955ce7ff3dcc731f54e75bf68f99c97bbc2dbeef83ff6"},
3433
"req_ch": {:hex, :req_ch, "0.1.0", "a8c8710b2fc51ff6bf43d3842bdaff8269688f62ae94250dcfa92d2d37ca4279", [:mix], [{:explorer, "~> 0.10", [hex: :explorer, repo: "hexpm", optional: true]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "7aad3e3f4492647b431294e1fbb915c7280af8628ec6104506029b376b2b4150"},
3534
"req_s3": {:hex, :req_s3, "0.2.3", "ede5f4c792cf39995379307733ff4593032a876f38da29d9d7ea03881b498b51", [:mix], [{:req, "~> 0.5.6", [hex: :req, repo: "hexpm", optional: false]}], "hexpm", "31b5d52490495c8aeea7e3c5cbcec82f49035e11bdaf41f0e58ab716fefe44ca"},
3635
"rustler_precompiled": {:hex, :rustler_precompiled, "0.8.2", "5f25cbe220a8fac3e7ad62e6f950fcdca5a5a5f8501835d2823e8c74bf4268d5", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "63d1bd5f8e23096d1ff851839923162096364bac8656a4a3c00d1fff8e83ee0a"},

0 commit comments

Comments
 (0)