Skip to content

Commit

Permalink
Fix ReScanForeignScan API to make the parameterized query work
Browse files Browse the repository at this point in the history
correctly.

Sub-select or correlated queries that use a parameterized plan
uses ReScanForeignScan API.  However, this API is not correctly
rescanning the data.  In the case of parameters, we need to
recreate the Mongo cursor that fetches the data again.  The
patch does the same.

In passing, refactor some code in this area so that Mongo cursor
is created from MongoIterateForeignScan() that is called after
Begin and ReScan APIs.  However, create a Mongo cursor only if
it was not created already.

FDW-103, Vaibhav Dalvi, reviewed by Suraj Kharage.
  • Loading branch information
jeevanchalke committed Sep 27, 2020
1 parent 72ac0c0 commit 3571072
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 46 deletions.
64 changes: 63 additions & 1 deletion expected/select.out
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ SELECT mongo_fdw_version();
(1 row)

-- Create foreign tables
CREATE FOREIGN TABLE f_mongo_test (_id name, a int, b varchar)
CREATE FOREIGN TABLE f_mongo_test (_id name, a int, b text)
SERVER mongo_server OPTIONS (database 'mongo_fdw_regress', collection 'mongo_test');
CREATE FOREIGN TABLE f_test_tbl1 (_id NAME, c1 INTEGER, c2 VARCHAR(10), c3 CHAR(9),c4 INTEGER, c5 pg_catalog.Date, c6 DECIMAL, c7 INTEGER, c8 INTEGER)
SERVER mongo_server OPTIONS (database 'mongo_fdw_regress', collection 'test_tbl1');
Expand Down Expand Up @@ -945,10 +945,72 @@ NOTICE: Found number Nine

(1 row)

-- FDW-103: Parameter expression should work correctly with WHERE clause.
SELECT a, b FROM f_mongo_test WHERE a = (SELECT 2) ORDER BY a;
a | b
---+-----
2 | Two
(1 row)

SELECT a, b FROM f_mongo_test WHERE b = (SELECT 'Seven'::text) ORDER BY a;
a | b
---+-------
7 | Seven
(1 row)

-- Create local table and load data into it.
CREATE TABLE l_mongo_test AS SELECT a, b FROM f_mongo_test;
-- Check correlated query.
SELECT a, b FROM l_mongo_test lt
WHERE lt.b = (SELECT b FROM f_mongo_test ft WHERE lt.b = ft.b)
ORDER BY a;
a | b
----+-----------------------
0 | mongo_test collection
1 | One
2 | Two
3 | Three
4 | Four
5 | Five
6 | Six
7 | Seven
8 | Eight
9 | Nine
10 | Ten
(11 rows)

SELECT a, b FROM l_mongo_test lt
WHERE lt.a = (SELECT a FROM f_mongo_test ft WHERE lt.a = ft.a)
ORDER BY a;
a | b
----+-----------------------
0 | mongo_test collection
1 | One
2 | Two
3 | Three
4 | Four
5 | Five
6 | Six
7 | Seven
8 | Eight
9 | Nine
10 | Ten
(11 rows)

SELECT c1, c8 FROM f_test_tbl1 ft1
WHERE ft1.c8 = (SELECT c1 FROM f_test_tbl2 ft2 WHERE ft1.c8 = ft2.c1)
ORDER BY c1 LIMIT 2;
c1 | c8
-----+----
100 | 20
200 | 30
(2 rows)

-- Cleanup
DELETE FROM f_mongo_test WHERE a != 0;
DROP TABLE l_test_tbl1;
DROP TABLE l_test_tbl2;
DROP TABLE l_mongo_test;
DROP VIEW smpl_vw;
DROP VIEW comp_vw;
DROP VIEW temp_vw;
Expand Down
64 changes: 63 additions & 1 deletion expected/select_1.out
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ SELECT mongo_fdw_version();
(1 row)

-- Create foreign tables
CREATE FOREIGN TABLE f_mongo_test (_id name, a int, b varchar)
CREATE FOREIGN TABLE f_mongo_test (_id name, a int, b text)
SERVER mongo_server OPTIONS (database 'mongo_fdw_regress', collection 'mongo_test');
CREATE FOREIGN TABLE f_test_tbl1 (_id NAME, c1 INTEGER, c2 VARCHAR(10), c3 CHAR(9),c4 INTEGER, c5 pg_catalog.Date, c6 DECIMAL, c7 INTEGER, c8 INTEGER)
SERVER mongo_server OPTIONS (database 'mongo_fdw_regress', collection 'test_tbl1');
Expand Down Expand Up @@ -909,10 +909,72 @@ NOTICE: Found number Nine

(1 row)

-- FDW-103: Parameter expression should work correctly with WHERE clause.
SELECT a, b FROM f_mongo_test WHERE a = (SELECT 2) ORDER BY a;
a | b
---+-----
2 | Two
(1 row)

SELECT a, b FROM f_mongo_test WHERE b = (SELECT 'Seven'::text) ORDER BY a;
a | b
---+-------
7 | Seven
(1 row)

-- Create local table and load data into it.
CREATE TABLE l_mongo_test AS SELECT a, b FROM f_mongo_test;
-- Check correlated query.
SELECT a, b FROM l_mongo_test lt
WHERE lt.b = (SELECT b FROM f_mongo_test ft WHERE lt.b = ft.b)
ORDER BY a;
a | b
----+-----------------------
0 | mongo_test collection
1 | One
2 | Two
3 | Three
4 | Four
5 | Five
6 | Six
7 | Seven
8 | Eight
9 | Nine
10 | Ten
(11 rows)

SELECT a, b FROM l_mongo_test lt
WHERE lt.a = (SELECT a FROM f_mongo_test ft WHERE lt.a = ft.a)
ORDER BY a;
a | b
----+-----------------------
0 | mongo_test collection
1 | One
2 | Two
3 | Three
4 | Four
5 | Five
6 | Six
7 | Seven
8 | Eight
9 | Nine
10 | Ten
(11 rows)

SELECT c1, c8 FROM f_test_tbl1 ft1
WHERE ft1.c8 = (SELECT c1 FROM f_test_tbl2 ft2 WHERE ft1.c8 = ft2.c1)
ORDER BY c1 LIMIT 2;
c1 | c8
-----+----
100 | 20
200 | 30
(2 rows)

-- Cleanup
DELETE FROM f_mongo_test WHERE a != 0;
DROP TABLE l_test_tbl1;
DROP TABLE l_test_tbl2;
DROP TABLE l_mongo_test;
DROP VIEW smpl_vw;
DROP VIEW comp_vw;
DROP VIEW temp_vw;
Expand Down
89 changes: 46 additions & 43 deletions mongo_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ MongoGetForeignPlan(PlannerInfo *root,
ForeignScan *foreignScan;
List *foreignPrivateList;
List *opExpressionList;
BSON *queryDocument;
List *columnList;

/*
Expand All @@ -371,24 +370,17 @@ MongoGetForeignPlan(PlannerInfo *root,
restrictionClauses = extract_actual_clauses(restrictionClauses, false);

/*
* We construct the query document to have MongoDB filter its rows. We
* could also construct a column name document here to retrieve only the
* needed columns. However, we found this optimization to degrade
* performance on the MongoDB server-side, so we instead filter out
* columns on our side.
* Find the foreign relation's clauses, which can be transformed to
* equivalent MongoDB queries.
*/
opExpressionList = ApplicableOpExpressionList(foreignrel);
queryDocument = QueryDocument(foreigntableid, opExpressionList, NULL);

/* We don't need to serialize column list as lists are copiable */
columnList = ColumnList(foreignrel);

/* Construct foreign plan with query document and column list */
foreignPrivateList = list_make2(columnList, opExpressionList);

/* Only clean up the query struct */
BsonDestroy(queryDocument);

/* Create the foreign scan node */
foreignScan = make_foreignscan(targetList, restrictionClauses,
scanRangeTableIndex,
Expand Down Expand Up @@ -464,16 +456,12 @@ static void
MongoBeginForeignScan(ForeignScanState *node, int eflags)
{
MONGO_CONN *mongoConnection;
MONGO_CURSOR *mongoCursor;
Oid foreignTableId;
List *columnList;
HTAB *columnMappingHash;
ForeignScan *foreignScan;
List *foreignPrivateList;
BSON *queryDocument;
MongoFdwOptions *options;
MongoFdwModifyState *fmstate;
List *opExpressionList;
RangeTblEntry *rte;
EState *estate = node->ss.ps.state;
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
Expand Down Expand Up @@ -510,36 +498,28 @@ MongoBeginForeignScan(ForeignScanState *node, int eflags)
*/
mongoConnection = mongo_get_connection(server, user, options);

foreignScan = (ForeignScan *) node->ss.ps.plan;
foreignPrivateList = foreignScan->fdw_private;
foreignPrivateList = fsplan->fdw_private;
Assert(list_length(foreignPrivateList) == 2);

columnList = list_nth(foreignPrivateList, 0);
opExpressionList = list_nth(foreignPrivateList, 1);

queryDocument = QueryDocument(foreignTableId, opExpressionList, node);

columnMappingHash = ColumnMappingHash(foreignTableId, columnList);

/* Create cursor for collection name and set query */
mongoCursor = MongoCursorCreate(mongoConnection, options->svr_database,
options->collectionName, queryDocument);

/* Create and set foreign execution state */
fmstate->columnMappingHash = columnMappingHash;
fmstate->mongoConnection = mongoConnection;
fmstate->mongoCursor = mongoCursor;
fmstate->queryDocument = queryDocument;
fmstate->options = options;

node->fdw_state = (void *) fmstate;
}

/*
* MongoIterateForeignScan
* Reads the next document from MongoDB, converts it to a PostgreSQL tuple
* and stores the converted tuple into the ScanTupleSlot as a virtual
* tuple.
* Opens a Mongo cursor that uses the database name, collection name, and
* the remote query to send to the server.
*
* Reads the next document from MongoDB, converts it to a PostgreSQL tuple,
* and stores the converted tuple into the ScanTupleSlot as a virtual tuple.
*/
static TupleTableSlot *
MongoIterateForeignScan(ForeignScanState *node)
Expand All @@ -548,11 +528,44 @@ MongoIterateForeignScan(ForeignScanState *node)
TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot;
MONGO_CURSOR *mongoCursor = fmstate->mongoCursor;
HTAB *columnMappingHash = fmstate->columnMappingHash;
ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor;
Datum *columnValues = tupleSlot->tts_values;
bool *columnNulls = tupleSlot->tts_isnull;
int32 columnCount = tupleDescriptor->natts;

/* Create cursor for collection name and set query */
if (mongoCursor == NULL)
{
Oid foreignTableId;
List *foreignPrivateList;
List *opExpressionList;
BSON *queryDocument;

foreignPrivateList = foreignScan->fdw_private;
Assert(list_length(foreignPrivateList) == 2);

opExpressionList = list_nth(foreignPrivateList, 1);
foreignTableId = RelationGetRelid(node->ss.ss_currentRelation);

/*
* We construct the query document to have MongoDB filter its rows. We
* could also construct a column name document here to retrieve only
* the needed columns. However, we found this optimization to degrade
* performance on the MongoDB server-side, so we instead filter out
* columns on our side.
*/
queryDocument = QueryDocument(foreignTableId, opExpressionList, node);

mongoCursor = MongoCursorCreate(fmstate->mongoConnection,
fmstate->options->svr_database,
fmstate->options->collectionName,
queryDocument);

/* Save mongoCursor */
fmstate->mongoCursor = mongoCursor;
}

/*
* We execute the protocol to load a virtual tuple into a slot. We first
* call ExecClearTuple, then fill in values / isnull arrays, and last call
Expand Down Expand Up @@ -614,23 +627,13 @@ static void
MongoReScanForeignScan(ForeignScanState *node)
{
MongoFdwModifyState *fmstate = (MongoFdwModifyState *) node->fdw_state;
MONGO_CONN *mongoConnection = fmstate->mongoConnection;
MongoFdwOptions *options;
Oid foreignTableId;

/* Close down the old cursor */
MongoCursorDestroy(fmstate->mongoCursor);

/* Reconstruct full collection name */
foreignTableId = RelationGetRelid(node->ss.ss_currentRelation);
options = mongo_get_options(foreignTableId);

/* Reconstruct cursor for collection name and set query */
fmstate->mongoCursor = MongoCursorCreate(mongoConnection,
fmstate->options->svr_database,
fmstate->options->collectionName,
fmstate->queryDocument);
mongo_free_options(options);
if (fmstate->mongoCursor)
{
MongoCursorDestroy(fmstate->mongoCursor);
fmstate->mongoCursor = NULL;
}
}

static List *
Expand Down
19 changes: 18 additions & 1 deletion sql/select.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CREATE USER MAPPING FOR public SERVER mongo_server;
SELECT mongo_fdw_version();

-- Create foreign tables
CREATE FOREIGN TABLE f_mongo_test (_id name, a int, b varchar)
CREATE FOREIGN TABLE f_mongo_test (_id name, a int, b text)
SERVER mongo_server OPTIONS (database 'mongo_fdw_regress', collection 'mongo_test');
CREATE FOREIGN TABLE f_test_tbl1 (_id NAME, c1 INTEGER, c2 VARCHAR(10), c3 CHAR(9),c4 INTEGER, c5 pg_catalog.Date, c6 DECIMAL, c7 INTEGER, c8 INTEGER)
SERVER mongo_server OPTIONS (database 'mongo_fdw_regress', collection 'test_tbl1');
Expand Down Expand Up @@ -222,10 +222,27 @@ $$ LANGUAGE plpgsql;

SELECT test_param_where();

-- FDW-103: Parameter expression should work correctly with WHERE clause.
SELECT a, b FROM f_mongo_test WHERE a = (SELECT 2) ORDER BY a;
SELECT a, b FROM f_mongo_test WHERE b = (SELECT 'Seven'::text) ORDER BY a;
-- Create local table and load data into it.
CREATE TABLE l_mongo_test AS SELECT a, b FROM f_mongo_test;
-- Check correlated query.
SELECT a, b FROM l_mongo_test lt
WHERE lt.b = (SELECT b FROM f_mongo_test ft WHERE lt.b = ft.b)
ORDER BY a;
SELECT a, b FROM l_mongo_test lt
WHERE lt.a = (SELECT a FROM f_mongo_test ft WHERE lt.a = ft.a)
ORDER BY a;
SELECT c1, c8 FROM f_test_tbl1 ft1
WHERE ft1.c8 = (SELECT c1 FROM f_test_tbl2 ft2 WHERE ft1.c8 = ft2.c1)
ORDER BY c1 LIMIT 2;

-- Cleanup
DELETE FROM f_mongo_test WHERE a != 0;
DROP TABLE l_test_tbl1;
DROP TABLE l_test_tbl2;
DROP TABLE l_mongo_test;
DROP VIEW smpl_vw;
DROP VIEW comp_vw;
DROP VIEW temp_vw;
Expand Down

0 comments on commit 3571072

Please sign in to comment.