Skip to content

Commit 24eb0e2

Browse files
authored
🏦 Azure Support! (#2)
* azure: initial implementation * get row query counts before streaming thanks jared! * finish azure docs, cli, types, and small other quality of life tweaks
1 parent b12700e commit 24eb0e2

15 files changed

+1132
-21
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ logs/*
77
!logs/.gitkeep
88
tmp/*
99
!tmp/.gitkeep
10-
notes.txt
10+
notes.txt
11+
notes/*

.vscode/settings.json

+5
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515
"flinksql",
1616
"gcloud",
1717
"gtoken",
18+
"msnodesqlv",
1819
"mydnd",
1920
"papaparse",
21+
"recordset",
2022
"retl",
23+
"ROWCOUNT",
24+
"rowsaffected",
2125
"schemaless",
26+
"sqlify",
2227
"tldr",
2328
"tnpx",
2429
"transactsql",

README.md

+54
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Supported Data Warehouses:
99
- [Google BigQuery](#bq)
1010
- [AWS Athena](#athena)
1111
- [Snowflake](#snowflake)
12+
- [Microsoft Azure SQL](#azure)
1213

1314
<div id="tldr"></div>
1415

@@ -524,6 +525,59 @@ most AWS accounts can be [setup for programmatic access](https://docs.aws.amazon
524525
}
525526
```
526527

528+
<div id="azure"></div>
529+
530+
##### Azure
531+
532+
Azure SQL (managed + on-premises) Servers use usernames and passwords for authentication; there are no special permissions required... the user you authenticate as should have permissions to run the query.
533+
534+
there are two common patterns for entering credentials, **connection strings** and **structured objects** ... they are essentially the same thing in different formats:
535+
536+
- **connection string**
537+
a connection string is a long string which contains username, password, database, port and some other options to establish a secure connection with your database. they look like this:
538+
539+
```
540+
Server=tcp:my-sql-server.database.windows.net,1433;Database=database;User Id=username;Password=password;Encrypt=true
541+
Driver=msnodesqlv8;Server=(local)\INSTANCE;Database=database;UID=DOMAIN\username;PWD=password;Encrypt=true
542+
```
543+
544+
you can input your connection string into the `auth` object with the key `connection_string`:
545+
```javascript
546+
{
547+
dwh: "azure",
548+
auth: {
549+
connection_string: "my-sql-server.database.windows.net,1433; etc..."
550+
}
551+
}
552+
```
553+
554+
if your database is hosted in Azure Cloud, you can find your connection strings in the Azure SQL UI; this module uses the **ADO.NET** syntax:
555+
556+
<img src="https://aktunes.neocities.org/dwh-mixpanel/azureStrings.png" alt="azure cloud screenshot" width=420/>
557+
558+
make sure to choose the right connection string version that is supported by your database.
559+
(hint: not all Azure DBs are setup with Active Directory)
560+
561+
- **JSON**
562+
563+
if you wish, you may also pass your credentials as JSON; the parameters are very similar and look like this:
564+
565+
```javascript
566+
{
567+
dwh: "azure",
568+
auth: {
569+
user: "",
570+
password: "",
571+
server: "",
572+
port: 1433, //default
573+
domain: "",
574+
database: ""
575+
}
576+
}
577+
```
578+
you can also pass other pool configuration options to the `auth` object... [see the full list of params](https://github.com/tediousjs/node-mssql#general-same-for-all-drivers)
579+
580+
527581
<div id="env"></div>
528582

529583
### 💾 environment variables

components/cli.js

+18-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ function dwhType() {
104104
choices: [
105105
{ name: "\tGoogle BigQuery", value: "bigquery" },
106106
{ name: "\tSnowflake", value: "snowflake" },
107-
{ name: "\tAWS Athena", value: "athena" }
107+
{ name: "\tAWS Athena", value: "athena" },
108+
{ name: "\tAzure SQL", value: "azure" }
108109
]
109110
}
110111
];
@@ -121,11 +122,12 @@ function dwhAuth(dwh, env) {
121122
case "snowflake":
122123
questions = snowflakeAuth(env);
123124
break;
124-
125125
case "athena":
126126
questions = athenaAuth(env);
127127
break;
128-
128+
case "azure":
129+
questions = azureAuth(env);
130+
break;
129131
default:
130132
break;
131133
}
@@ -207,6 +209,19 @@ function bigqueryAuth(env) {
207209
];
208210
}
209211

212+
function azureAuth(env) {
213+
return [
214+
{
215+
message: "what's your SQL connection string?",
216+
name: "connection_string",
217+
type: "input",
218+
suffix: "\n",
219+
default: env?.auth?.connection_string,
220+
validate: passesNotEmpty
221+
}
222+
];
223+
}
224+
210225

211226
function snowflakeAuth(env) {
212227
/**

components/config.js

+35-2
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,29 @@ export default class dwhConfig {
8282

8383
progress(createOrUpdate, type = 'dwh') {
8484
if (this.verbose) {
85+
//make labels the same padding
86+
let dwhLabel = this.dwh;
87+
let mixpanelLabel = "mixpanel";
88+
while (dwhLabel.length !== mixpanelLabel.length) {
89+
if (dwhLabel.length > mixpanelLabel.length) {
90+
mixpanelLabel += " ";
91+
}
92+
else {
93+
dwhLabel += " ";
94+
}
95+
}
96+
8597
if (typeof createOrUpdate === 'object') {
8698
const { total, startValue } = createOrUpdate;
8799
if (type === 'dwh') {
88100
this.dwhProgress = this.multiBar.create(total, startValue, {}, {
89-
format: `${this.dwh} |` + colors.cyan('{bar}') + `| {value}/{total} ${this.type}s ` + colors.green('{percentage}%') + ` {duration_formatted} ETA: {eta_formatted}`,
101+
format: `${dwhLabel} |` + colors.cyan('{bar}') + `| {value}/{total} ${this.type}s ` + colors.green('{percentage}%') + ` {duration_formatted} ETA: {eta_formatted}`,
90102

91103
});
92104
}
93105
if (type === 'mp') {
94106
this.mpProgress = this.multiBar.create(total, startValue, {}, {
95-
format: `mixpanel |` + colors.magenta('{bar}') + `| {value}/{total} ${this.type}s ` + colors.green('{percentage}%') + ` {duration_formatted} ETA: {eta_formatted}`
107+
format: `${mixpanelLabel} |` + colors.magenta('{bar}') + `| {value}/{total} ${this.type}s ` + colors.green('{percentage}%') + ` {duration_formatted} ETA: {eta_formatted}`
96108

97109
});
98110
};
@@ -258,6 +270,27 @@ export default class dwhConfig {
258270
query: this.sql
259271
};
260272
}
273+
274+
if (this.dwh === 'azure') {
275+
return {
276+
query: this.sql,
277+
connectionString: this.auth.connection_string,
278+
user: this.auth.user,
279+
password: this.auth.password,
280+
server: this.auth.server,
281+
port: this.auth.port,
282+
domain: this.auth.domain,
283+
database: this.auth.database
284+
285+
};
286+
}
287+
288+
else {
289+
return {
290+
query: this.sql,
291+
...this.auth
292+
};
293+
}
261294
}
262295

263296
//todo improve validation

index.js

+17-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import createStream from "./middleware/mixpanel.js";
2121
import bigQuery from './middleware/bigquery.js';
2222
import snowflake from './middleware/snowflake.js';
2323
import athena from './middleware/athena.js';
24+
import azure from "./middleware/azure.js";
2425

2526
/*
2627
----
@@ -110,6 +111,9 @@ async function main(params) {
110111
case 'athena':
111112
dwh = await athena(config, mpStream);
112113
break;
114+
case 'azure':
115+
dwh = await azure(config, mpStream);
116+
break;
113117
default:
114118
if (config.verbose) u.cLog(`i do not know how to access ${config.warehouse}... sorry`);
115119
mpStream.destroy();
@@ -197,7 +201,7 @@ emitter.once('dwh stream start', (config) => {
197201
config.streamTime.start();
198202
if (config.verbose) {
199203
// u.cLog(`\n${config.dwh} stream start`);
200-
u.cLog(c.magenta(`\nstreaming started! (${u.comma(config.dwhStore.rows)} ${config.type}s)\n`));
204+
u.cLog(c.magenta(`\nstreaming started! (${config.dwhStore.rows > 0 ? u.comma(config.dwhStore.rows) : "unknown number of"} ${config.type}s)\n`));
201205
config.progress({ total: config.dwhStore.rows, startValue: 0 });
202206
}
203207
});
@@ -240,13 +244,23 @@ emitter.once('mp import end', (config) => {
240244

241245
emitter.on('dwh batch', (config) => {
242246
if (config.verbose) {
243-
config.progress(1, 'dwh');
247+
try {
248+
config.progress(1, 'dwh');
249+
}
250+
catch (e) {
251+
//noop
252+
}
244253
}
245254
});
246255

247256
emitter.on('mp batch', (config, numImported) => {
248257
if (config.verbose) {
249-
config.progress(numImported, 'mp');
258+
try {
259+
config.progress(numImported, 'mp');
260+
}
261+
catch(e) {
262+
//noop
263+
}
250264
}
251265
});
252266

middleware/athena.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export default async function athena(config, outStream) {
3030
const sqlParse = new sql.Parser();
3131
let tableList, columnList, ast;
3232
try {
33-
({ tableList, columnList, ast } = sqlParse.parse(query, { database: 'MySQL', }));
33+
({ tableList, columnList, ast } = sqlParse.parse(query, { database: 'MySQL' }));
3434
config.store({ sqlAnalysis: { tableList, columnList, ast } });
3535
} catch (e) {
3636
if (config.verbose) u.cLog("\ncould not parse SQL query to AST...\n\tthat's ok though!!!\n");

middleware/azure.js

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import transformer from '../components/transformer.js';
2+
import emitter from '../components/emitter.js';
3+
import csvMaker from '../components/csv.js';
4+
import u from 'ak-tools';
5+
import mssql from 'mssql';
6+
import sql from 'node-sql-parser';
7+
import dayjs from "dayjs";
8+
9+
10+
export default async function azure(config, outStream) {
11+
12+
const { query, ...dwhAuth } = config.dwhAuth();
13+
const sqlParse = new sql.Parser();
14+
let tableList, columnList, ast;
15+
try {
16+
({ tableList, columnList, ast } = sqlParse.parse(query, { database: 'transactsql' }));
17+
config.store({ sqlAnalysis: { tableList, columnList, ast } });
18+
} catch (e) {
19+
if (config.verbose) u.cLog("\ncould not parse SQL query to AST...\n\tthat's ok though!!!\n");
20+
}
21+
22+
// * AUTH
23+
const auth = dwhAuth.connectionString ? dwhAuth.connectionString : dwhAuth;
24+
const pool = await mssql.connect(auth); //todo support things other than connection string
25+
pool.on('error', (e) => { throw e; });
26+
config.store({ job: { ...pool.config, password: `******` } });
27+
28+
// * MODELING
29+
if (config.type === "event") {
30+
//events get unix epoch
31+
config.timeTransform = (time) => { return dayjs(time).valueOf(); };
32+
}
33+
else {
34+
//all others get ISO
35+
config.timeTransform = (time) => { return dayjs(time).format('YYYY-MM-DDTHH:mm:ss'); };
36+
}
37+
let mpModel; //not available until "readable"
38+
39+
// * LOOKUP TABLES
40+
if (config.type === 'table') {
41+
emitter.emit('dwh query start', config);
42+
const { recordset, rowsAffected } = await (new mssql.Request(pool)).query(query);
43+
emitter.emit('dwh query end', config);
44+
config.store({ rows: rowsAffected[0] });
45+
config.store({ schema: recordset.columns });
46+
mpModel = transformer(config, []);
47+
const transformedRows = recordset.map(mpModel);
48+
const csv = csvMaker(transformedRows);
49+
return csv;
50+
}
51+
52+
// * METADATA + ROW COUNTS
53+
emitter.emit('dwh query start', config);
54+
try {
55+
const getRowCount = await (new mssql.Request(pool)).query(`WITH count_query AS (${query}) SELECT COUNT(*) as rows FROM count_query;`);
56+
config.store({ rows: getRowCount.recordset[0].rows });
57+
}
58+
catch (e) {
59+
try {
60+
// trailing semicolons mess up the row count query
61+
if (query.endsWith(';')) {
62+
const rowCountQuery = query.substring(0, query.length - 1);
63+
const getRowCount = await (new mssql.Request(pool)).query(`WITH count_query AS (${rowCountQuery}) SELECT COUNT(*) as rows FROM count_query;`);
64+
config.store({ rows: getRowCount.recordset[0].rows });
65+
}
66+
67+
// try to re-serialize query from the ast
68+
else if (ast) {
69+
const rowCountQuery = sqlParse.sqlify(ast);
70+
const getRowCount = await (new mssql.Request(pool)).query(`WITH count_query AS (${rowCountQuery}) SELECT COUNT(*) as rows FROM count_query;`);
71+
config.store({ rows: getRowCount.recordset[0].rows });
72+
}
73+
74+
// todo add a @@ROWCOUNT attempt to get rows
75+
// ? https://www.sqlshack.com/working-with-sql-server-rowcount/
76+
77+
else {
78+
throw e;
79+
}
80+
81+
}
82+
catch (e) {
83+
if (config.verbose) u.cLog('error getting row counts', e.message, 'ERROR');
84+
config.store({ rows: 0 });
85+
}
86+
}
87+
88+
// * STREAMING QUERY
89+
const job = new mssql.Request(pool);
90+
job.stream = true;
91+
job.query(`${query}`);
92+
93+
return new Promise((resolve, reject) => {
94+
job.on('recordset', (columns) => {
95+
emitter.emit('dwh query end', config);
96+
emitter.emit('dwh stream start', config);
97+
config.store({ schema: columns });
98+
const schemaDateFields = Object.keys(u.objFilter(columns, (col) => col?.type?.declaration === 'datetime'));
99+
const dateFields = new Set([config.mappings.time_col, ...schemaDateFields]);
100+
mpModel = transformer(config, dateFields);
101+
});
102+
103+
job.on('row', (row) => {
104+
config.got();
105+
outStream.push(mpModel(row));
106+
});
107+
108+
// job.on('rowsaffected', (rowCount) => {
109+
// // ! seems to fire last
110+
// config.store({ rows: rowCount });
111+
// });
112+
113+
// job.on('info', (message) => {
114+
// // ! dunno what this is
115+
// debugger;
116+
// });
117+
118+
job.on('error', (err) => {
119+
reject(err);
120+
});
121+
122+
job.on('done', () => {
123+
emitter.emit('dwh stream end', config);
124+
outStream.push(null);
125+
pool.close();
126+
resolve(config);
127+
});
128+
});
129+
}

middleware/bigquery.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export default async function bigquery(config, outStream) {
1313
const sqlParse = new sql.Parser();
1414
let tableList, columnList, ast;
1515
try {
16-
({ tableList, columnList, ast } = sqlParse.parse(query, { database: 'BigQuery', }));
16+
({ tableList, columnList, ast } = sqlParse.parse(query, { database: 'BigQuery' }));
1717
config.store({ sqlAnalysis: { tableList, columnList, ast } });
1818
} catch (e) {
1919
if (config.verbose) u.cLog("\ncould not parse SQL query to AST...\n\tthat's ok though!!!\n");

0 commit comments

Comments
 (0)