diff --git a/.gitmodules b/.gitmodules index 8da9629..53af7f8 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "mongo-c-driver"] path = mongo-c-driver url = ../../mongodb/mongo-c-driver.git +[submodule "json-c"] + path = json-c + url = https://github.com/json-c/json-c.git diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index df9d264..e490f03 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -11,7 +11,7 @@ Using Issues `mongo_fdw`'s maintainers prefer that bug reports, feature requests, and pull requests are submitted as [GitHub Issues][1]. If you think you require personal -assistance, please **do not** open an issue: email `engage` `@` `citusdata.com` +assistance, please **do not** open an issue: email `mongo_fdw` `@` `enterprisedb.com` instead. @@ -81,5 +81,5 @@ permitted by law. Finally, you confirm that you own said copyright, have the legal authority to grant said license, and in doing so are not violating any grant of rights you have made to third parties, including your employer. -[1]: https://github.com/citusdata/mongo_fdw/issues +[1]: https://github.com/EnterpriseDB/mongo_fdw/issues [2]: LICENSE diff --git a/Makefile b/Makefile index 466e31b..bb9d3d0 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ # mongo_fdw/Makefile # -# Copyright (c) 2012-2014 Citus Data, Inc. +# Portions Copyright © 2004-2014, EnterpriseDB Corporation. +# +# Portions Copyright © 2012–2014 Citus Data, Inc. # MODULE_big = mongo_fdw @@ -10,14 +12,16 @@ MODULE_big = mongo_fdw # on another platform, change env_posix.os in MONGO_OBJS with the appropriate # environment object file. # - MONGO_DRIVER = mongo-c-driver MONGO_PATH = $(MONGO_DRIVER)/src MONGO_OBJS = $(MONGO_PATH)/bson.os $(MONGO_PATH)/encoding.os $(MONGO_PATH)/md5.os \ $(MONGO_PATH)/mongo.os $(MONGO_PATH)/numbers.os $(MONGO_PATH)/env.os - -PG_CPPFLAGS = --std=c99 -I$(MONGO_PATH) -OBJS = mongo_fdw.o mongo_query.o $(MONGO_OBJS) +LIBJSON = json-c +LIBJSON_OBJS = $(LIBJSON)/json_util.o $(LIBJSON)/json_object.o $(LIBJSON)/json_tokener.o \ + $(LIBJSON)/json_object_iterator.o $(LIBJSON)/printbuf.o $(LIBJSON)/linkhash.o \ + $(LIBJSON)/arraylist.o $(LIBJSON)/random_seed.o $(LIBJSON)/debug.o +PG_CPPFLAGS = --std=c99 -I$(MONGO_PATH) -I$(LIBJSON) +OBJS = connection.o option.o mongo_wrapper.o mongo_fdw.o mongo_query.o $(MONGO_OBJS) $(LIBJSON_OBJS) EXTENSION = mongo_fdw DATA = mongo_fdw--1.0.sql @@ -28,6 +32,8 @@ REGRESS_OPTS = --inputdir=test --outputdir=test \ $(MONGO_DRIVER)/%.os: $(MAKE) -C $(MONGO_DRIVER) $*.os +#$(LIBJSON)/json.o: +# $(MAKE) -C $(LIBJSON) # # Users need to specify their Postgres installation path through pg_config. For @@ -37,3 +43,11 @@ $(MONGO_DRIVER)/%.os: PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) include $(PGXS) + +ifndef MAJORVERSION + MAJORVERSION := $(basename $(VERSION)) +endif + +ifeq (,$(findstring $(MAJORVERSION), 9.3 9.4 9.5)) + $(error PostgreSQL 9.3, 9.4 or 9.5 is required to compile this extension) +endif diff --git a/Makefile.meta b/Makefile.meta new file mode 100644 index 0000000..52a527a --- /dev/null +++ b/Makefile.meta @@ -0,0 +1,46 @@ +# mongo_fdw/Makefile.meta +# +# Portions Copyright © 2004-2014, EnterpriseDB Corporation. +# +# Portions Copyright © 2012–2014 Citus Data, Inc. +# + +MODULE_big = mongo_fdw + +# +# We assume we are running on a POSIX compliant system (Linux, OSX). If you are +# on another platform, change env_posix.os in MONGO_OBJS with the appropriate +# environment object file. +# + +MONGO_DRIVER = mongo-c-meta-driver +MONGO_PATH = $(MONGO_DRIVER)/src/mongoc +MONGO_INCLUDE = -I$(MONGO_DRIVER)/src/libbson/src/bson/ -I$(MONGO_PATH) +PG_CPPFLAGS = --std=c99 $(MONGO_INCLUDE) +SHLIB_LINK = -L$(MONGO_DRIVER)/.libs -lmongoc-1.0 +SHLIB_LINK += -L$(MONGO_DRIVER)/src/libbson/.libs -lbson-1.0 + +OBJS = connection.o option.o mongo_wrapper_meta.o mongo_fdw.o mongo_query.o + +EXTENSION = mongo_fdw +DATA = mongo_fdw--1.0.sql + +$(MONGO_DRIVER)/%.os: + $(MAKE) -C $(MONGO_DRIVER) $*.os + +# +# Users need to specify their Postgres installation path through pg_config. For +# example: /usr/local/pgsql/bin/pg_config or /usr/lib/postgresql/9.1/bin/pg_config +# + +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) + +ifndef MAJORVERSION + MAJORVERSION := $(basename $(VERSION)) +endif + +ifeq (,$(findstring $(MAJORVERSION), 9.3 9.4 9.5)) + $(error PostgreSQL 9.3, 9.4 or 9.5 is required to compile this extension) +endif diff --git a/README.md b/README.md index 027778d..cec383d 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,12 @@ -MongoDB FDW for PostgreSQL -========================== +# MongoDB Foreign Data Wrapper for PostgreSQL -This PostgreSQL extension implements a Foreign Data Wrapper (FDW) for -[MongoDB][1]. For an example demonstrating this wrapper's use, see [our blog -post][2]. Please also note that this version of `mongo_fdw` only works with -PostgreSQL 9.2 or 9.3. +This [MongoDB][1] extension implements the PostgreSQL's Foreign Data Wrapper. +Please note that this version of `mongo_fdw` only works with +PostgreSQL Version **9.3** and greater. Installation ------------ - The MongoDB FDW depends on the official MongoDB C Driver version 0.8 and includes it as a git submodule. If you are cloning this repository for the first time, be sure to pass the --recursive option to git clone in order to @@ -22,33 +19,47 @@ not up-to-date, run git submodule update --init. When you type `make`, the C driver's source code also gets automatically compiled and linked. -To build on POSIX-compliant systems (like Linux and OS X), you need to ensure -the `pg_config` executable is in your path when you run `make`. This executable -is typically in your PostgreSQL installation's `bin` directory. For example: - -```sh -PATH=/usr/local/pgsql/bin/:$PATH make -sudo PATH=/usr/local/pgsql/bin/:$PATH make install -``` +Note: Make sure you have permission to "/usr/local" (default installation location) folder. -Note that we have tested the `mongo_fdw` extension only on Fedora and Ubuntu -systems. If you run into issues on other systems, please [let us know][3]. +Note that we have verified the `mongo_fdw` extension only on MacOS X, +Fedora and Ubuntu systems. If you run into issues on other systems, please [let us know][3] +Enhancements +----------- +The following enhancements are added to the latest version of mongo_fdw + +Write-able FDW +-------------- +The previous version was only read-only, the latest version provides the write capability. +The user can now issue insert/update and delete statements for the foreign tables using the mongo_fdw. + +Connection Pooling +------------------ +The latest version comes with a connection pooler that utilises the same mongo +database connection for all the queries in the same session. The previous version +would open a new mongodb connection for every query. +This is a performance enhancement. + +New MongoDB C Driver Support +---------------------------- +The third enhancement is to add a new [MongoDB][1]' C driver. The current implementation is +based on the legacy driver of MongoDB. But [MongoDB][1] is provided completely new library +for driver called MongoDB's Meta Driver. So I have added support of that driver. +Now compile time option is available to use legacy and Meta driver. I am sure there +are many other benefits of the new Mongo-C-driver that we are not leveraging but we +will adopt those as we learn more about the new C driver. Usage ----- - The following parameters can be set on a MongoDB foreign server object: - * `address`: the address or hostname of the MongoDB server. - Defaults to `127.0.0.1` - * `port`: the port number of the MongoDB server. Defaults to `27017` + * **`address`**: the address or hostname of the MongoDB server Defaults to `127.0.0.1` + * **`port`**: the port number of the MongoDB server. Defaults to `27017` The following parameters can be set on a MongoDB foreign table object: - * `database`: the name of the MongoDB database to query. Defaults to `test` - * `collection`: the name of the MongoDB collection to query. Defaults to - the foreign table name used in the relevant `CREATE` command + * **`database`**: the name of the MongoDB database to query. Defaults to `test` + * **`collection`**: the name of the MongoDB collection to query. Defaults to the foreign table name used in the relevant `CREATE` command As an example, the following commands demonstrate loading the `mongo_fdw` wrapper, creating a server, and then creating a foreign table associated with @@ -60,46 +71,104 @@ default value mentioned above. estimating costs for the query execution plan. To see selected execution plans for a query, just run `EXPLAIN`. -We also currently use the internal PostgreSQL `NAME` type to represent the BSON -object identifier type (the `_id` field). +Examples with [MongoDB][1]'s equivalent statments. ```sql + -- load extension first time after install CREATE EXTENSION mongo_fdw; -- create server object -CREATE SERVER mongo_server FOREIGN DATA WRAPPER mongo_fdw -OPTIONS (address '127.0.0.1', port '27017'); +CREATE SERVER mongo_server + FOREIGN DATA WRAPPER mongo_fdw + OPTIONS (address '127.0.0.1', port '27017'); + +-- create user mapping +CREATE USER MAPPING FOR postgres + SERVER mongo_server + OPTIONS (username 'mongo_user', password 'mongo_pass'); -- create foreign table -CREATE FOREIGN TABLE customer_reviews +CREATE FOREIGN TABLE warehouse( + _id NAME, + warehouse_id int, + warehouse_name text, + warehouse_created timestamptz) +SERVER mongo_server + OPTIONS (database 'db', collection 'warehouse'); + +-- Note: first column of the table must be "_id" of type "NAME". + +-- select from table +SELECT * FROM warehouse WHERE warehouse_id = 1; + + _id | warehouse_id | warehouse_name | warehouse_created +------------------------+----------------+--------------------------- +53720b1904864dc1f5a571a0| 1 | UPS | 12-DEC-14 12:12:10 +05:00 + + +db.warehouse.find({"warehouse_id" : 1}).pretty() +{ + "_id" : ObjectId("53720b1904864dc1f5a571a0"), + "warehouse_id" : 1, + "warehouse_name" : "UPS", + "warehouse_created" : ISODate("2014-12-12T07:12:10Z") +} + + +-- insert row in table +INSERT INTO warehouse values (0, 1, 'UPS', to_date('2014-12-12T07:12:10Z')); + +db.warehouse.insert +( + { + "warehouse_id" : NumberInt(1), + "warehouse_name" : "UPS", + "warehouse_created" : ISODate("2014-12-12T07:12:10Z") + } +); + +-- delete row from table +DELETE FROM warehouse where warehouse_id = 3; + +> db.warehouse.remove({"warehouse_id" : 2}) + + +-- update a row of table +UPDATE warehouse set warehouse_name = 'UPS_NEW' where warehouse_id = 1; + +db.warehouse.update ( - _id NAME, - customer_id TEXT, - review_date TIMESTAMP, - review_rating INTEGER, - product_id CHAR(10), - product_title TEXT, - product_group TEXT, - product_category TEXT, - similar_product_ids CHAR(10)[] + { + "warehouse_id" : 1 + }, + { + "warehouse_id" : 1, + "warehouse_name" : "UPS_NEW" + } ) -SERVER mongo_server -OPTIONS (database 'test', collection 'customer_reviews'); --- collect data distribution statistics -ANALYZE customer_reviews; -``` +-- explain a table +EXPLAIN SELECT * FROM warehouse WHERE warehouse_id = 1; + QUERY PLAN + ----------------------------------------------------------------- + Foreign Scan on warehouse (cost=0.00..0.00 rows=1000 width=44) + Filter: (warehouse_id = 1) + Foreign Namespace: db.warehouse + Planning time: 0.671 ms +(4 rows) +-- collect data distribution statistics` +ANALYZE warehouse; + +``` Limitations ----------- * If the BSON document key contains uppercase letters or occurs within a nested document, `mongo_fdw` requires the corresponding column names to be - declared in double quotes. For example, a nested field such as `"review": { - "Votes": 19 }` should be declared as `"review.Votes" INTEGER` in the `CREATE - TABLE` statement. + declared in double quotes. * Note that PostgreSQL limits column names to 63 characters by default. If you need column names that are longer, you can increase the `NAMEDATALEN` @@ -108,25 +177,25 @@ Limitations Contributing ------------ - Have a fix for a bug or an idea for a great new feature? Great! Check out the contribution guidelines [here][4]. For all other types of questions or comments -about the wrapper please contact us at `engage` `@` `citusdata.com`. +about the wrapper please contact us at `mongo_fdw` `@` `enterprisedb.com`. Support ------- - This project will be modified to maintain compatibility with new PostgreSQL releases. The project owners set aside a day every month to look over open issues and support emails, but are not engaged in active feature development. Reported bugs will be addressed by apparent severity. +As with many open source projects, you may be able to obtain support via the public mailing list (`mongo_fdw` `@` `enterprisedb.com`). If you need commercial support, please contact the EnterpriseDB sales team, or check whether your existing PostgreSQL support provider can also support mongo_fdw. License ------- +Portions Copyright © 2004-2014, EnterpriseDB Corporation. -Copyright © 2012–2014 Citus Data, Inc. +Portions Copyright © 2012–2014 Citus Data, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free @@ -137,6 +206,8 @@ See the [`LICENSE`][5] file for full details. [1]: http://www.mongodb.com [2]: http://www.citusdata.com/blog/51-run-sql-on-mongodb -[3]: https://github.com/citusdata/mongo_fdw/issues/new +[3]: https://github.com/enterprisedb/mongo_fdw/issues/new [4]: CONTRIBUTING.md [5]: LICENSE +[6]: https://github.com/mongodb/mongo-c-driver-legacy +[7]: https://github.com/mongodb/mongo-c-driver diff --git a/config.h b/config.h new file mode 100644 index 0000000..ee6ce48 --- /dev/null +++ b/config.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * config.h + * Foreign-data wrapper for remote MongoDB servers + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * config.h + * + *------------------------------------------------------------------------- + */ + +/* + * Define if you want to compile the MongoFDW with Meta C Driver, otherwise + * it will compile using MongoDB legacy C Driver +*/ +/* #define META_DRIVER */ diff --git a/connection.c b/connection.c new file mode 100644 index 0000000..9276420 --- /dev/null +++ b/connection.c @@ -0,0 +1,145 @@ +/*------------------------------------------------------------------------- + * + * connection.c + * Foreign-data wrapper for remote MongoDB servers + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * connection.c + * + *------------------------------------------------------------------------- + */ +#include + + +#include "postgres.h" +#include "mongo_wrapper.h" +#include "mongo_fdw.h" + +#include "access/xact.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* Length of host */ +#define HOST_LEN 256 + +/* + * Connection cache hash table entry + * + * The lookup key in this hash table is the foreign server OID plus the user + * mapping OID. (We use just one connection per user per foreign server, + * so that we can ensure all scans use the same snapshot during a query.) + */ +typedef struct ConnCacheKey +{ + Oid serverid; /* OID of foreign server */ + Oid userid; /* OID of local user whose mapping we use */ +} ConnCacheKey; + +typedef struct ConnCacheEntry +{ + ConnCacheKey key; /* hash key (must be first) */ + MONGO_CONN *conn; /* connection to foreign server, or NULL */ +} ConnCacheEntry; + +/* + * Connection cache (initialized on first use) + */ +static HTAB *ConnectionHash = NULL; + +/* + * mongo_get_connection: + * Get a mong connection which can be used to execute queries on + * the remote Mongo server with the user's authorization. A new connection + * is established if we don't already have a suitable one. + */ +MONGO_CONN* +mongo_get_connection(ForeignServer *server, UserMapping *user, MongoFdwOptions *opt) +{ + bool found; + ConnCacheEntry *entry; + ConnCacheKey key; + + /* First time through, initialize connection cache hashtable */ + if (ConnectionHash == NULL) + { + HASHCTL ctl; + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(ConnCacheKey); + ctl.entrysize = sizeof(ConnCacheEntry); + ctl.hash = tag_hash; + /* allocate ConnectionHash in the cache context */ + ctl.hcxt = CacheMemoryContext; + ConnectionHash = hash_create("mongo_fdw connections", 8, + &ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + } + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key.serverid = server->serverid; + key.userid = user->userid; + + /* + * Find or create cached entry for requested connection. + */ + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + /* initialize new hashtable entry (key is already filled in) */ + entry->conn = NULL; + } + if (entry->conn == NULL) + { + entry->conn = MongoConnect(opt->svr_address, opt->svr_port, opt->svr_database, opt->svr_username, opt->svr_password); + elog(DEBUG3, "new mongo_fdw connection %p for server \"%s:%d\"", + entry->conn, opt->svr_address, opt->svr_port); + } + + return entry->conn; +} + +/* + * mongo_cleanup_connection: + * Delete all the cache entries on backend exists. + */ +void +mongo_cleanup_connection() +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + if (ConnectionHash == NULL) + return; + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == NULL) + continue; + + elog(DEBUG3, "disconnecting mongo_fdw connection %p", entry->conn); + MongoDisconnect(entry->conn); + entry->conn = NULL; + } +} + +/* + * Release connection created by calling mongo_get_connection. + */ +void +mongo_release_connection(MONGO_CONN *conn) +{ + /* + * We don't close the connection indvisually here, will do all connection + * cleanup on the backend exit. + */ +} + diff --git a/mongo_fdw--1.0.sql b/mongo_fdw--1.0.sql index af83adf..b495ced 100644 --- a/mongo_fdw--1.0.sql +++ b/mongo_fdw--1.0.sql @@ -1,6 +1,7 @@ /* mongo_fdw/mongo_fdw--1.0.sql */ --- Copyright (c) 2012-2014 Citus Data, Inc. +-- Portions Copyright © 2004-2014, EnterpriseDB Corporation. +-- Portions Copyright © 2012–2014 Citus Data, Inc. -- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "CREATE EXTENSION mongo_fdw" to load this file. \quit diff --git a/mongo_fdw.c b/mongo_fdw.c index d18eb6e..07eb334 100644 --- a/mongo_fdw.c +++ b/mongo_fdw.c @@ -1,17 +1,25 @@ /*------------------------------------------------------------------------- * * mongo_fdw.c + * Foreign-data wrapper for remote MongoDB servers * - * Function definitions for MongoDB foreign data wrapper. These functions access - * data stored in MongoDB through the official C driver. + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group * - * Copyright (c) 2012-2014 Citus Data, Inc. + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_fdw.c * *------------------------------------------------------------------------- */ #include "postgres.h" +#include "bson.h" +#include "mongo_wrapper.h" #include "mongo_fdw.h" +#include "mongo_query.h" #include "access/reloptions.h" #include "catalog/pg_type.h" @@ -26,6 +34,7 @@ #include "optimizer/plancat.h" #include "optimizer/planmain.h" #include "optimizer/restrictinfo.h" +#include "storage/ipc.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/date.h" @@ -33,49 +42,105 @@ #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/memutils.h" -//#include "utils/json.h" +#include "access/sysattr.h" +#include "commands/defrem.h" +#include "commands/explain.h" +#include "commands/vacuum.h" +#include "foreign/fdwapi.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/planmain.h" +#include "optimizer/prep.h" +#include "optimizer/restrictinfo.h" +#include "optimizer/var.h" +#include "parser/parsetree.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/jsonapi.h" - #if PG_VERSION_NUM >= 90300 #include "access/htup_details.h" #endif /* Local functions forward declarations */ -static StringInfo OptionNamesString(Oid currentContextId); static void MongoGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, - Oid foreignTableId); + Oid foreignTableId); static void MongoGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, - Oid foreignTableId); + Oid foreignTableId); static ForeignScan * MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, - Oid foreignTableId, ForeignPath *bestPath, - List *targetList, List *restrictionClauses); + Oid foreignTableId, ForeignPath *bestPath, + List *targetList, List *restrictionClauses); static void MongoExplainForeignScan(ForeignScanState *scanState, - ExplainState *explainState); + ExplainState *explainState); static void MongoBeginForeignScan(ForeignScanState *scanState, int executorFlags); static TupleTableSlot * MongoIterateForeignScan(ForeignScanState *scanState); static void MongoEndForeignScan(ForeignScanState *scanState); static void MongoReScanForeignScan(ForeignScanState *scanState); -static Const * SerializeDocument(bson *document); -static bson * DeserializeDocument(Const *constant); + +static TupleTableSlot *MongoExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot *MongoExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static void MongoEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo); + +static void MongoAddForeignUpdateTargets(Query *parsetree, + RangeTblEntry *target_rte, + Relation target_relation); + +static void MongoBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags); + +static TupleTableSlot *MongoExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); + +static List *MongoPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); + +static void +MongoExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, List *fdw_private, + int subplan_index, ExplainState *es); + +/* local functions */ static double ForeignTableDocumentCount(Oid foreignTableId); -static MongoFdwOptions * MongoGetOptions(Oid foreignTableId); -static char * MongoGetOptionValue(Oid foreignTableId, const char *optionName); static HTAB * ColumnMappingHash(Oid foreignTableId, List *columnList); -static void FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, - HTAB *columnMappingHash, Datum *columnValues, - bool *columnNulls); -static bool ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId); -static Datum ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId); -static Datum ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, +static void FillTupleSlot(const BSON *bsonDocument, const char *bsonDocumentKey, + HTAB *columnMappingHash, Datum *columnValues, + bool *columnNulls); +static bool ColumnTypesCompatible(BSON_TYPE bsonType, Oid columnTypeId); +static Datum ColumnValueArray(BSON_ITERATOR *bsonIterator, Oid valueTypeId); +static Datum ColumnValue(BSON_ITERATOR *bsonIterator, Oid columnTypeId, int32 columnTypeMod); -static void MongoFreeScanState(MongoFdwExecState *executionState); +static void MongoFreeScanState(MongoFdwModifyState *fmstate); static bool MongoAnalyzeForeignTable(Relation relation, - AcquireSampleRowsFunc *acquireSampleRowsFunc, - BlockNumber *totalPageCount); + AcquireSampleRowsFunc *acquireSampleRowsFunc, + BlockNumber *totalPageCount); static int MongoAcquireSampleRows(Relation relation, int errorLevel, - HeapTuple *sampleRows, int targetRowCount, - double *totalRowCount, double *totalDeadRowCount); + HeapTuple *sampleRows, int targetRowCount, + double *totalRowCount, double *totalDeadRowCount); +static void mongo_fdw_exit(int code, Datum arg); + +extern PGDLLEXPORT void _PG_init(void); + const char * EscapeJsonString(const char *string); void DumpJson(StringInfo buffer, const char *bsonData, bool isArray); @@ -86,13 +151,20 @@ static JsonSemAction nullSemAction = NULL, NULL, NULL, NULL, NULL }; - /* declarations for dynamic loading */ PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(mongo_fdw_handler); -PG_FUNCTION_INFO_V1(mongo_fdw_validator); +/* + * Library load-time initalization, sets on_proc_exit() callback for + * backend shutdown. + */ +void +_PG_init(void) +{ + on_proc_exit(&mongo_fdw_exit, PointerGetDatum(NULL)); +} /* * mongo_fdw_handler creates and returns a struct with pointers to foreign table @@ -102,106 +174,41 @@ Datum mongo_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine *fdwRoutine = makeNode(FdwRoutine); - fdwRoutine->GetForeignRelSize = MongoGetForeignRelSize; fdwRoutine->GetForeignPaths = MongoGetForeignPaths; fdwRoutine->GetForeignPlan = MongoGetForeignPlan; - fdwRoutine->ExplainForeignScan = MongoExplainForeignScan; fdwRoutine->BeginForeignScan = MongoBeginForeignScan; fdwRoutine->IterateForeignScan = MongoIterateForeignScan; fdwRoutine->ReScanForeignScan = MongoReScanForeignScan; fdwRoutine->EndForeignScan = MongoEndForeignScan; fdwRoutine->AnalyzeForeignTable = MongoAnalyzeForeignTable; - PG_RETURN_POINTER(fdwRoutine); -} - - -/* - * mongo_fdw_validator validates options given to one of the following commands: - * foreign data wrapper, server, user mapping, or foreign table. This function - * errors out if the given option name or its value is considered invalid. - */ -Datum -mongo_fdw_validator(PG_FUNCTION_ARGS) -{ - Datum optionArray = PG_GETARG_DATUM(0); - Oid optionContextId = PG_GETARG_OID(1); - List *optionList = untransformRelOptions(optionArray); - ListCell *optionCell = NULL; - - foreach(optionCell, optionList) - { - DefElem *optionDef = (DefElem *) lfirst(optionCell); - char *optionName = optionDef->defname; - bool optionValid = false; - - int32 optionIndex = 0; - for (optionIndex = 0; optionIndex < ValidOptionCount; optionIndex++) - { - const MongoValidOption *validOption = &(ValidOptionArray[optionIndex]); - - if ((optionContextId == validOption->optionContextId) && - (strncmp(optionName, validOption->optionName, NAMEDATALEN) == 0)) - { - optionValid = true; - break; - } - } - - /* if invalid option, display an informative error message */ - if (!optionValid) - { - StringInfo optionNamesString = OptionNamesString(optionContextId); + /* support for insert / update / delete */ + fdwRoutine->ExecForeignInsert = MongoExecForeignInsert; + fdwRoutine->BeginForeignModify = MongoBeginForeignModify; + fdwRoutine->PlanForeignModify = MongoPlanForeignModify; + fdwRoutine->AddForeignUpdateTargets = MongoAddForeignUpdateTargets; + fdwRoutine->ExecForeignUpdate = MongoExecForeignUpdate; + fdwRoutine->ExecForeignDelete = MongoExecForeignDelete; + fdwRoutine->EndForeignModify = MongoEndForeignModify; - ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), - errmsg("invalid option \"%s\"", optionName), - errhint("Valid options in this context are: %s", - optionNamesString->data))); - } + /* support for EXPLAIN */ + fdwRoutine->ExplainForeignScan = MongoExplainForeignScan; + fdwRoutine->ExplainForeignModify = MongoExplainForeignModify; - /* if port option is given, error out if its value isn't an integer */ - if (strncmp(optionName, OPTION_NAME_PORT, NAMEDATALEN) == 0) - { - char *optionValue = defGetString(optionDef); - int32 portNumber = pg_atoi(optionValue, sizeof(int32), 0); - (void) portNumber; - } - } + /* support for ANALYSE */ + fdwRoutine->AnalyzeForeignTable = MongoAnalyzeForeignTable; - PG_RETURN_VOID(); + PG_RETURN_POINTER(fdwRoutine); } - /* - * OptionNamesString finds all options that are valid for the current context, - * and concatenates these option names in a comma separated string. + * Exit callback function. */ -static StringInfo -OptionNamesString(Oid currentContextId) +static void +mongo_fdw_exit(int code, Datum arg) { - StringInfo optionNamesString = makeStringInfo(); - bool firstOptionPrinted = false; - - int32 optionIndex = 0; - for (optionIndex = 0; optionIndex < ValidOptionCount; optionIndex++) - { - const MongoValidOption *validOption = &(ValidOptionArray[optionIndex]); - - /* if option belongs to current context, append option name */ - if (currentContextId == validOption->optionContextId) - { - if (firstOptionPrinted) - { - appendStringInfoString(optionNamesString, ", "); - } - - appendStringInfoString(optionNamesString, validOption->optionName); - firstOptionPrinted = true; - } - } - - return optionNamesString; + mongo_cleanup_connection(); } @@ -243,22 +250,22 @@ MongoGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableI static void MongoGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) { - double tupleFilterCost = baserel->baserestrictcost.per_tuple; - double inputRowCount = 0.0; - double documentSelectivity = 0.0; - double foreignTableSize = 0; - int32 documentWidth = 0; - BlockNumber pageCount = 0; - double totalDiskAccessCost = 0.0; - double cpuCostPerDoc = 0.0; - double cpuCostPerRow = 0.0; - double totalCpuCost = 0.0; - double connectionCost = 0.0; - double documentCount = 0.0; - List *opExpressionList = NIL; - Cost startupCost = 0.0; - Cost totalCost = 0.0; - Path *foreignPath = NULL; + double tupleFilterCost = baserel->baserestrictcost.per_tuple; + double inputRowCount = 0.0; + double documentSelectivity = 0.0; + double foreignTableSize = 0; + int32 documentWidth = 0; + BlockNumber pageCount = 0; + double totalDiskAccessCost = 0.0; + double cpuCostPerDoc = 0.0; + double cpuCostPerRow = 0.0; + double totalCpuCost = 0.0; + double connectionCost = 0.0; + double documentCount = 0.0; + List *opExpressionList = NIL; + Cost startupCost = 0.0; + Cost totalCost = 0.0; + Path *foreignPath = NULL; documentCount = ForeignTableDocumentCount(foreignTableId); if (documentCount > 0.0) @@ -309,7 +316,7 @@ MongoGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId) NULL, /* no outer rel either */ NIL); /* no fdw_private data */ - /* add foreign path as the only possible path */ + /* add foreign path as the only possible path */ add_path(baserel, foreignPath); } @@ -323,13 +330,12 @@ static ForeignScan * MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, ForeignPath *bestPath, List *targetList, List *restrictionClauses) { - Index scanRangeTableIndex = baserel->relid; - ForeignScan *foreignScan = NULL; - List *foreignPrivateList = NIL; - List *opExpressionList = NIL; - bson *queryDocument = NULL; - Const *queryBuffer = NULL; - List *columnList = NIL; + Index scanRangeTableIndex = baserel->relid; + ForeignScan *foreignScan = NULL; + List *foreignPrivateList = NIL; + List *opExpressionList = NIL; + BSON *queryDocument = NULL; + List *columnList = NIL; /* * We push down applicable restriction clauses to MongoDB, but for simplicity @@ -348,22 +354,22 @@ MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, */ opExpressionList = ApplicableOpExpressionList(baserel); queryDocument = QueryDocument(foreignTableId, opExpressionList); - queryBuffer = SerializeDocument(queryDocument); - - /* only clean up the query struct, but not its data */ - bson_dealloc(queryDocument); /* we don't need to serialize column list as lists are copiable */ columnList = ColumnList(baserel); /* construct foreign plan with query document and column list */ - foreignPrivateList = list_make2(queryBuffer, columnList); + foreignPrivateList = list_make2(columnList, opExpressionList); + + /* only clean up the query struct */ + BsonDestroy(queryDocument); /* create the foreign scan node */ foreignScan = make_foreignscan(targetList, restrictionClauses, scanRangeTableIndex, NIL, /* no expressions to evaluate */ foreignPrivateList); + return foreignScan; } @@ -374,21 +380,46 @@ MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, static void MongoExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState) { - MongoFdwOptions *mongoFdwOptions = NULL; - StringInfo namespaceName = NULL; - Oid foreignTableId = InvalidOid; + MongoFdwOptions *options = NULL; + StringInfo namespaceName = NULL; + Oid foreignTableId = InvalidOid; foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); - mongoFdwOptions = MongoGetOptions(foreignTableId); + options = mongo_get_options(foreignTableId); /* construct fully qualified collection name */ namespaceName = makeStringInfo(); - appendStringInfo(namespaceName, "%s.%s", mongoFdwOptions->databaseName, - mongoFdwOptions->collectionName); + appendStringInfo(namespaceName, "%s.%s", options->svr_database, + options->collectionName); + + mongo_free_options(options); ExplainPropertyText("Foreign Namespace", namespaceName->data, explainState); } +static void +MongoExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es) +{ + MongoFdwOptions *options = NULL; + StringInfo namespaceName = NULL; + Oid foreignTableId = InvalidOid; + + foreignTableId = RelationGetRelid(rinfo->ri_RelationDesc); + options = mongo_get_options(foreignTableId); + + /* construct fully qualified collection name */ + namespaceName = makeStringInfo(); + appendStringInfo(namespaceName, "%s.%s", options->svr_database, + options->collectionName); + + mongo_free_options(options); + ExplainPropertyText("Foreign Namespace", namespaceName->data, es); +} + /* * MongoBeginForeignScan connects to the MongoDB server, and opens a cursor that @@ -399,79 +430,75 @@ MongoExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState) static void MongoBeginForeignScan(ForeignScanState *scanState, int executorFlags) { - mongo *mongoConnection = NULL; - mongo_cursor *mongoCursor = NULL; - int32 connectStatus = MONGO_ERROR; - Oid foreignTableId = InvalidOid; - List *columnList = NIL; - HTAB *columnMappingHash = NULL; - char *addressName = NULL; - int32 portNumber = 0; - int32 errorCode = 0; - StringInfo namespaceName = NULL; - ForeignScan *foreignScan = NULL; - List *foreignPrivateList = NIL; - Const *queryBuffer = NULL; - bson *queryDocument = NULL; - MongoFdwOptions *mongoFdwOptions = NULL; - MongoFdwExecState *executionState = NULL; + MONGO_CONN *mongoConnection = NULL; + MONGO_CURSOR *mongoCursor = NULL; + Oid foreignTableId = InvalidOid; + List *columnList = NIL; + HTAB *columnMappingHash = NULL; + ForeignScan *foreignScan = NULL; + List *foreignPrivateList = NIL; + BSON *queryDocument = NULL; + MongoFdwOptions *options = NULL; + MongoFdwModifyState *fmstate = NULL; + List *opExpressionList = NIL; + RangeTblEntry *rte; + EState *estate = scanState->ss.ps.state; + ForeignScan *fsplan = (ForeignScan *) scanState->ss.ps.plan; + Oid userid; + ForeignServer *server; + UserMapping *user; + ForeignTable *table; + /* if Explain with no Analyze, do nothing */ if (executorFlags & EXEC_FLAG_EXPLAIN_ONLY) - { return; - } foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); - mongoFdwOptions = MongoGetOptions(foreignTableId); - - /* resolve hostname and port number; and connect to mongo server */ - addressName = mongoFdwOptions->addressName; - portNumber = mongoFdwOptions->portNumber; - - mongoConnection = mongo_alloc(); - mongo_init(mongoConnection); + options = mongo_get_options(foreignTableId); - connectStatus = mongo_connect(mongoConnection, addressName, portNumber); - if (connectStatus != MONGO_OK) - { - errorCode = (int32) mongoConnection->err; + fmstate = (MongoFdwModifyState *) palloc0(sizeof(MongoFdwModifyState)); + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); - mongo_destroy(mongoConnection); - mongo_dealloc(mongoConnection); + /* Get info about foreign table. */ + fmstate->rel = scanState->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(fmstate->rel)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); - ereport(ERROR, (errmsg("could not connect to %s:%d", addressName, portNumber), - errhint("Mongo driver connection error: %d", errorCode))); - } + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + mongoConnection = mongo_get_connection(server, user, options); - /* deserialize query document; and create column info hash */ foreignScan = (ForeignScan *) scanState->ss.ps.plan; foreignPrivateList = foreignScan->fdw_private; Assert(list_length(foreignPrivateList) == 2); - queryBuffer = (Const *) linitial(foreignPrivateList); - queryDocument = DeserializeDocument(queryBuffer); + columnList = list_nth(foreignPrivateList, 0); + opExpressionList = list_nth(foreignPrivateList, 1); - columnList = (List *) lsecond(foreignPrivateList); - columnMappingHash = ColumnMappingHash(foreignTableId, columnList); + queryDocument = QueryDocument(foreignTableId, opExpressionList); - namespaceName = makeStringInfo(); - appendStringInfo(namespaceName, "%s.%s", mongoFdwOptions->databaseName, - mongoFdwOptions->collectionName); + columnMappingHash = ColumnMappingHash(foreignTableId, columnList); /* create cursor for collection name and set query */ - mongoCursor = mongo_cursor_alloc(); - mongo_cursor_init(mongoCursor, mongoConnection, namespaceName->data); - mongo_cursor_set_query(mongoCursor, queryDocument); + mongoCursor = MongoCursorCreate(mongoConnection, options->svr_database, options->collectionName, queryDocument); /* create and set foreign execution state */ - executionState = (MongoFdwExecState *) palloc0(sizeof(MongoFdwExecState)); - executionState->columnMappingHash = columnMappingHash; - executionState->mongoConnection = mongoConnection; - executionState->mongoCursor = mongoCursor; - executionState->queryDocument = queryDocument; + fmstate->columnMappingHash = columnMappingHash; + fmstate->mongoConnection = mongoConnection; + fmstate->mongoCursor = mongoCursor; + fmstate->queryDocument = queryDocument; + fmstate->options = options; - scanState->fdw_state = (void *) executionState; + scanState->fdw_state = (void *) fmstate; } @@ -483,16 +510,14 @@ MongoBeginForeignScan(ForeignScanState *scanState, int executorFlags) static TupleTableSlot * MongoIterateForeignScan(ForeignScanState *scanState) { - MongoFdwExecState *executionState = (MongoFdwExecState *) scanState->fdw_state; - TupleTableSlot *tupleSlot = scanState->ss.ss_ScanTupleSlot; - mongo_cursor *mongoCursor = executionState->mongoCursor; - HTAB *columnMappingHash = executionState->columnMappingHash; - int32 cursorStatus = MONGO_ERROR; - - TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor; - Datum *columnValues = tupleSlot->tts_values; - bool *columnNulls = tupleSlot->tts_isnull; - int32 columnCount = tupleDescriptor->natts; + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) scanState->fdw_state; + TupleTableSlot *tupleSlot = scanState->ss.ss_ScanTupleSlot; + MONGO_CURSOR *mongoCursor = fmstate->mongoCursor; + HTAB *columnMappingHash = fmstate->columnMappingHash; + TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor; + Datum *columnValues = tupleSlot->tts_values; + bool *columnNulls = tupleSlot->tts_isnull; + int32 columnCount = tupleDescriptor->natts; /* * We execute the protocol to load a virtual tuple into a slot. We first @@ -506,10 +531,9 @@ MongoIterateForeignScan(ForeignScanState *scanState) memset(columnValues, 0, columnCount * sizeof(Datum)); memset(columnNulls, true, columnCount * sizeof(bool)); - cursorStatus = mongo_cursor_next(mongoCursor); - if (cursorStatus == MONGO_OK) + if (MongoCursorNext(mongoCursor, NULL)) { - const bson *bsonDocument = mongo_cursor_bson(mongoCursor); + const BSON *bsonDocument = MongoCursorBson(mongoCursor); const char *bsonDocumentKey = NULL; /* top level document */ FillTupleSlot(bsonDocument, bsonDocumentKey, @@ -519,19 +543,23 @@ MongoIterateForeignScan(ForeignScanState *scanState) } else { - /* - * The following is a courtesy check. In practice when Mongo shuts down, - * mongo_cursor_next() could possibly crash. This function first frees - * cursor->reply, and then references reply in mongo_cursor_destroy(). - */ + #ifdef META_DRIVER + bson_error_t error; + if (mongoc_cursor_error (mongoCursor, &error)) + { + MongoFreeScanState(fmstate); + ereport(ERROR, (errmsg("could not iterate over mongo collection"), + errhint("Mongo driver error: %s", error.message))); + } + #else mongo_cursor_error_t errorCode = mongoCursor->err; if (errorCode != MONGO_CURSOR_EXHAUSTED) { - MongoFreeScanState(executionState); - + MongoFreeScanState(fmstate); ereport(ERROR, (errmsg("could not iterate over mongo collection"), - errhint("Mongo driver cursor error code: %d", errorCode))); + errhint("Mongo driver cursor error code: %d", errorCode))); } + #endif } return tupleSlot; @@ -545,12 +573,17 @@ MongoIterateForeignScan(ForeignScanState *scanState) static void MongoEndForeignScan(ForeignScanState *scanState) { - MongoFdwExecState *executionState = (MongoFdwExecState *) scanState->fdw_state; + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) scanState->fdw_state; /* if we executed a query, reclaim mongo related resources */ - if (executionState != NULL) + if (fmstate != NULL) { - MongoFreeScanState(executionState); + if (fmstate->options) + { + mongo_free_options(fmstate->options); + fmstate->options = NULL; + } + MongoFreeScanState(fmstate); } } @@ -563,208 +596,495 @@ MongoEndForeignScan(ForeignScanState *scanState) static void MongoReScanForeignScan(ForeignScanState *scanState) { - MongoFdwExecState *executionState = (MongoFdwExecState *) scanState->fdw_state; - mongo *mongoConnection = executionState->mongoConnection; - MongoFdwOptions *mongoFdwOptions = NULL; - mongo_cursor *mongoCursor = NULL; - StringInfo namespaceName = NULL; - Oid foreignTableId = InvalidOid; + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) scanState->fdw_state; + MONGO_CONN *mongoConnection = fmstate->mongoConnection; + MongoFdwOptions *options = NULL; + Oid foreignTableId = InvalidOid; /* close down the old cursor */ - mongo_cursor_destroy(executionState->mongoCursor); - mongo_cursor_dealloc(executionState->mongoCursor); + MongoCursorDestroy(fmstate->mongoCursor); /* reconstruct full collection name */ foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); - mongoFdwOptions = MongoGetOptions(foreignTableId); - - namespaceName = makeStringInfo(); - appendStringInfo(namespaceName, "%s.%s", mongoFdwOptions->databaseName, - mongoFdwOptions->collectionName); + options = mongo_get_options(foreignTableId); /* reconstruct cursor for collection name and set query */ - mongoCursor = mongo_cursor_alloc(); - mongo_cursor_init(mongoCursor, mongoConnection, namespaceName->data); - mongo_cursor_set_query(mongoCursor, executionState->queryDocument); + fmstate->mongoCursor = MongoCursorCreate(mongoConnection, + fmstate->options->svr_database, + fmstate->options->collectionName, + fmstate->queryDocument); + mongo_free_options(options); +} + +static List * +MongoPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) +{ + CmdType operation = plan->operation; + RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); + Relation rel; + List *targetAttrs = NIL; + + /* + * Core code already has some lock on each rel being planned, so we can + * use NoLock here. + */ + rel = heap_open(rte->relid, NoLock); + + if (operation == CMD_INSERT) + { + TupleDesc tupdesc = RelationGetDescr(rel); + int attnum; + + for (attnum = 1; attnum <= tupdesc->natts; attnum++) + { + Form_pg_attribute attr = tupdesc->attrs[attnum - 1]; + + if (!attr->attisdropped) + targetAttrs = lappend_int(targetAttrs, attnum); + } + } + else if (operation == CMD_UPDATE) + { + Bitmapset *tmpset = bms_copy(rte->modifiedCols); + AttrNumber col; + + while ((col = bms_first_member(tmpset)) >= 0) + { + col += FirstLowInvalidHeapAttributeNumber; + if (col <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + /* + * We also disallow updates to the first column which + * happens to be the row identifier in MongoDb (_id) + */ + if (col == 1) /* shouldn't happen */ + elog(ERROR, "row identifier column update is not supported"); - executionState->mongoCursor = mongoCursor; + targetAttrs = lappend_int(targetAttrs, col); + } + /* We also want the rowid column to be available for the update */ + targetAttrs = lcons_int(1, targetAttrs); + } + else + { + targetAttrs = lcons_int(1, targetAttrs); + } + /* + * RETURNING list not supported + * #truongsinh: allow RETURNING for now + if (plan->returningLists) + elog(ERROR, "RETURNING is not supported by this FDW"); + */ + + heap_close(rel, NoLock); + + return list_make1(targetAttrs); } /* - * SerializeDocument serializes the document's data to a constant, as advised in - * foreign/fdwapi.h. Note that this function shallow-copies the document's data; - * and the caller should therefore not free it. + * Begin an insert/update/delete operation on a foreign table */ -static Const * -SerializeDocument(bson *document) +static void +MongoBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags) { - Const *serializedDocument = NULL; - Datum documentDatum = 0; + MongoFdwModifyState *fmstate = NULL; + Relation rel = resultRelInfo->ri_RelationDesc; + AttrNumber n_params = 0; + Oid typefnoid = InvalidOid; + bool isvarlena = false; + ListCell *lc = NULL; + Oid foreignTableId = InvalidOid; /* - * We access document data and wrap a datum around it. Note that even when - * we have an empty document, the document size can't be zero according to - * bson apis. + * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState + * stays NULL. */ - const char *documentData = bson_data(document); - int32 documentSize = bson_buffer_size(document); - Assert(documentSize != 0); + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + foreignTableId = RelationGetRelid(rel); + + /* Begin constructing MongoFdwModifyState. */ + fmstate = (MongoFdwModifyState *) palloc0(sizeof(MongoFdwModifyState)); + + fmstate->rel = rel; + fmstate->options = mongo_get_options(foreignTableId); + + fmstate->target_attrs = (List *) list_nth(fdw_private, 0); + + n_params = list_length(fmstate->target_attrs) + 1; + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1]; - documentDatum = CStringGetDatum(documentData); - serializedDocument = makeConst(CSTRINGOID, -1, InvalidOid, documentSize, - documentDatum, false, false); + Assert(!attr->attisdropped); - return serializedDocument; + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + Assert(fmstate->p_nums <= n_params); + + resultRelInfo->ri_FdwState = fmstate; } /* - * DeserializeDocument deserializes the constant to a bson document. For this, - * the function creates a document, and explicitly sets the document's data. + * Insert one row into a foreign table. */ -static bson * -DeserializeDocument(Const *constant) +static TupleTableSlot * +MongoExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) { - bson *document = NULL; - Datum documentDatum = constant->constvalue; - char *documentData = DatumGetCString(documentDatum); + MongoFdwOptions *options = NULL; + MONGO_CONN *mongoConnection = NULL; + Oid foreignTableId = InvalidOid; + BSON *b = NULL; + Oid typoid; + Datum value; + bool isnull = false; + Oid userid; + ForeignServer *server; + UserMapping *user; + ForeignTable *table; + + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; + + foreignTableId = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + userid = GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(fmstate->rel)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + options = fmstate->options; + mongoConnection = mongo_get_connection(server, user, options); + + b = BsonCreate(); + + typoid = get_atttype(foreignTableId, 1); + + /* get following parameters from slot */ + if (slot != NULL && fmstate->target_attrs != NIL) + { + ListCell *lc; + + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + value = slot_getattr(slot, attnum, &isnull); + + /* first column of MongoDB's foreign table must be _id */ + if (strcmp(slot->tts_tupleDescriptor->attrs[0]->attname.data, "_id") != 0) + elog(ERROR, "first column of MongoDB's foreign table must be \"_id\""); + + if (attnum == 1 && isnull) + { + /* + * Ignore the value of first column which is row identifier in MongoDb (_id) + * and let MongoDB to insert the unique value for that column + * only if it is null + */ + } + else + { + AppenMongoValue(b, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, + isnull, slot->tts_tupleDescriptor->attrs[attnum -1]->atttypid); + } + } + } + BsonFinish(b); - Assert(constant->constlen > 0); - Assert(constant->constisnull == false); + /* Now we are ready to insert tuple / document into MongoDB */ + MongoInsert(mongoConnection, options->svr_database, options->collectionName, b); - document = bson_alloc(); - bson_init_size(document, 0); - bson_init_finished_data_with_copy(document, documentData); + BsonDestroy(b); - return document; + return slot; } /* - * ForeignTableDocumentCount connects to the MongoDB server, and queries it for - * the number of documents in the foreign collection. On success, the function - * returns the document count. On failure, the function returns -1.0. + * Add column(s) needed for update/delete on a foreign table, we are using + * first column as row identification column, so we are adding that into target + * list. */ -static double -ForeignTableDocumentCount(Oid foreignTableId) +static void +MongoAddForeignUpdateTargets(Query *parsetree, + RangeTblEntry *target_rte, + Relation target_relation) { - MongoFdwOptions *options = NULL; - mongo *mongoConnection = NULL; - const bson *emptyQuery = NULL; - int32 status = MONGO_ERROR; - double documentCount = 0.0; + Var *var = NULL; + const char *attrname = NULL; + TargetEntry *tle = NULL; + + /* + * What we need is the rowid which is the first column + */ + Form_pg_attribute attr = + RelationGetDescr(target_relation)->attrs[0]; + + /* Make a Var representing the desired value */ + var = makeVar(parsetree->resultRelation, + 1, + attr->atttypid, + attr->atttypmod, + InvalidOid, + 0); + + /* Wrap it in a TLE with the right name ... */ + attrname = NameStr(attr->attname); + + tle = makeTargetEntry((Expr *) var, + list_length(parsetree->targetList) + 1, + pstrdup(attrname), + true); + + /* ... and add it to the query's targetlist */ + parsetree->targetList = lappend(parsetree->targetList, tle); +} + + +static TupleTableSlot * +MongoExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + MongoFdwOptions *options = NULL; + MONGO_CONN *mongoConnection = NULL; + Datum datum = 0; + bool isNull = false; + Oid foreignTableId = InvalidOid; + char *columnName = NULL; + Oid typoid = InvalidOid; + BSON *b = NULL; + BSON *op = NULL; + BSON set; + Oid userid = GetUserId(); + ForeignServer *server; + UserMapping *user; + ForeignTable *table; + + + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; + + foreignTableId = RelationGetRelid(resultRelInfo->ri_RelationDesc); /* resolve foreign table options; and connect to mongo server */ - options = MongoGetOptions(foreignTableId); + options = fmstate->options; + + /* Get info about foreign table. */ + table = GetForeignTable(foreignTableId); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + mongoConnection = mongo_get_connection(server, user, options); + + /* Get the id that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, 1, &isNull); - mongoConnection = mongo_alloc(); - mongo_init(mongoConnection); + columnName = get_relid_attribute_name(foreignTableId, 1); - status = mongo_connect(mongoConnection, options->addressName, options->portNumber); - if (status == MONGO_OK) + typoid = get_atttype(foreignTableId, 1); + + b = BsonCreate(); + BsonAppendStartObject(b, "$set", &set); + + /* get following parameters from slot */ + if (slot != NULL && fmstate->target_attrs != NIL) { - documentCount = mongo_count(mongoConnection, options->databaseName, - options->collectionName, emptyQuery); + ListCell *lc; + + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; + + if (strcmp("_id", slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data) == 0) + continue; + + value = slot_getattr(slot, attnum, &isnull); +#ifdef META_DRIVER + AppenMongoValue(&set, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, + isnull ? true : false, slot->tts_tupleDescriptor->attrs[attnum - 1]->atttypid); +#else + AppenMongoValue(b, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, + isnull ? true : false, slot->tts_tupleDescriptor->attrs[attnum - 1]->atttypid); +#endif + } } - else + BsonAppendFinishObject(b, &set); + BsonFinish(b); + + op = BsonCreate(); + if (!AppenMongoValue(op, columnName, datum, false, typoid)) { - documentCount = -1.0; + BsonDestroy(b); + return NULL; } + BsonFinish(op); - mongo_destroy(mongoConnection); - mongo_dealloc(mongoConnection); + /* We are ready to update the row into MongoDB */ + MongoUpdate(mongoConnection, options->svr_database, options->collectionName, op, b); - return documentCount; -} + BsonDestroy(op); + BsonDestroy(b); + /* Return NULL if nothing was updated on the remote end */ + return slot; +} /* - * MongoGetOptions returns the option values to be used when connecting to and - * querying MongoDB. To resolve these values, the function checks the foreign - * table's options, and if not present, falls back to default values. + * MongoExecForeignDelete + * Delete one row from a foreign table */ -static MongoFdwOptions * -MongoGetOptions(Oid foreignTableId) +static TupleTableSlot * +MongoExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) { - MongoFdwOptions *mongoFdwOptions = NULL; - char *addressName = NULL; - char *portName = NULL; - int32 portNumber = 0; - char *databaseName = NULL; - char *collectionName = NULL; - - addressName = MongoGetOptionValue(foreignTableId, OPTION_NAME_ADDRESS); - if (addressName == NULL) - { - addressName = pstrdup(DEFAULT_IP_ADDRESS); - } + MongoFdwOptions *options = NULL; + MONGO_CONN *mongoConnection = NULL; + Datum datum = 0; + bool isNull = false; + Oid foreignTableId = InvalidOid; + char *columnName = NULL; + Oid typoid = InvalidOid; + BSON *b = NULL; + Oid userid = GetUserId(); + ForeignServer *server; + UserMapping *user; + ForeignTable *table; - portName = MongoGetOptionValue(foreignTableId, OPTION_NAME_PORT); - if (portName == NULL) - { - portNumber = DEFAULT_PORT_NUMBER; - } - else - { - portNumber = pg_atoi(portName, sizeof(int32), 0); - } - databaseName = MongoGetOptionValue(foreignTableId, OPTION_NAME_DATABASE); - if (databaseName == NULL) - { - databaseName = pstrdup(DEFAULT_DATABASE_NAME); - } + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; + + foreignTableId = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* resolve foreign table options; and connect to mongo server */ + options = fmstate->options; - collectionName = MongoGetOptionValue(foreignTableId, OPTION_NAME_COLLECTION); - if (collectionName == NULL) + /* Get info about foreign table. */ + table = GetForeignTable(foreignTableId); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + mongoConnection = mongo_get_connection(server, user, options); + + /* Get the id that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, 1, &isNull); + + columnName = get_relid_attribute_name(foreignTableId, 1); + + typoid = get_atttype(foreignTableId, 1); + + b = BsonCreate(); + if (!AppenMongoValue(b,columnName, datum, false, typoid)) { - collectionName = get_rel_name(foreignTableId); + BsonDestroy(b); + return NULL; } + BsonFinish(b); - mongoFdwOptions = (MongoFdwOptions *) palloc0(sizeof(MongoFdwOptions)); - mongoFdwOptions->addressName = addressName; - mongoFdwOptions->portNumber = portNumber; - mongoFdwOptions->databaseName = databaseName; - mongoFdwOptions->collectionName = collectionName; + /* Now we are ready to delete a single document from MongoDB */ + MongoDelete(mongoConnection, options->svr_database, options->collectionName, b); - return mongoFdwOptions; + BsonDestroy(b); + + /* Return NULL if nothing was updated on the remote end */ + return slot; } +/* + * MongoEndForeignModify + * Finish an insert/update/delete operation on a foreign table + */ +static void +MongoEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo) +{ + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; + if (fmstate) + { + if (fmstate->options) + { + mongo_free_options(fmstate->options); + fmstate->options = NULL; + } + MongoFreeScanState(fmstate); + pfree(fmstate); + } +} /* - * MongoGetOptionValue walks over foreign table and foreign server options, and - * looks for the option with the given name. If found, the function returns the - * option's value. + * ForeignTableDocumentCount connects to the MongoDB server, and queries it for + * the number of documents in the foreign collection. On success, the function + * returns the document count. On failure, the function returns -1.0. */ -static char * -MongoGetOptionValue(Oid foreignTableId, const char *optionName) +static double +ForeignTableDocumentCount(Oid foreignTableId) { - ForeignTable *foreignTable = NULL; - ForeignServer *foreignServer = NULL; - List *optionList = NIL; - ListCell *optionCell = NULL; - char *optionValue = NULL; + MongoFdwOptions *options = NULL; + MONGO_CONN *mongoConnection = NULL; + const BSON *emptyQuery = NULL; + double documentCount = 0.0; + Oid userid = GetUserId(); + ForeignServer *server; + UserMapping *user; + ForeignTable *table; - foreignTable = GetForeignTable(foreignTableId); - foreignServer = GetForeignServer(foreignTable->serverid); - optionList = list_concat(optionList, foreignTable->options); - optionList = list_concat(optionList, foreignServer->options); + /* Get info about foreign table. */ + table = GetForeignTable(foreignTableId); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); - foreach(optionCell, optionList) - { - DefElem *optionDef = (DefElem *) lfirst(optionCell); - char *optionDefName = optionDef->defname; + /* resolve foreign table options; and connect to mongo server */ + options = mongo_get_options(foreignTableId); - if (strncmp(optionDefName, optionName, NAMEDATALEN) == 0) - { - optionValue = defGetString(optionDef); - break; - } - } + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + mongoConnection = mongo_get_connection(server, user, options); + + + documentCount = MongoAggregateCount(mongoConnection, options->svr_database, options->collectionName, emptyQuery); - return optionValue; + mongo_free_options(options); + + return documentCount; } @@ -776,9 +1096,9 @@ MongoGetOptionValue(Oid foreignTableId, const char *optionName) static HTAB * ColumnMappingHash(Oid foreignTableId, List *columnList) { - ListCell *columnCell = NULL; - const long hashTableSize = 2048; - HTAB *columnMappingHash = NULL; + ListCell *columnCell = NULL; + const long hashTableSize = 2048; + HTAB *columnMappingHash = NULL; /* create hash table */ HASHCTL hashInfo; @@ -828,16 +1148,16 @@ ColumnMappingHash(Oid foreignTableId, List *columnList) * passed as NULL. */ static void -FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, +FillTupleSlot(const BSON *bsonDocument, const char *bsonDocumentKey, HTAB *columnMappingHash, Datum *columnValues, bool *columnNulls) { - bson_iterator bsonIterator = { NULL, 0 }; - bson_iterator_init(&bsonIterator, bsonDocument); + BSON_ITERATOR bsonIterator = { NULL, 0 }; + BsonIterInit(&bsonIterator, (BSON*)bsonDocument); - while (bson_iterator_next(&bsonIterator)) + while (BsonIterNext(&bsonIterator)) { - const char *bsonKey = bson_iterator_key(&bsonIterator); - bson_type bsonType = bson_iterator_type(&bsonIterator); + const char *bsonKey = BsonIterKey(&bsonIterator); + BSON_TYPE bsonType = BsonIterType(&bsonIterator); ColumnMapping *columnMapping = NULL; Oid columnTypeId = InvalidOid; @@ -872,23 +1192,24 @@ FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, } /* recurse into nested objects */ - if (bsonType == BSON_OBJECT && columnTypeId != JSONOID) + if (bsonType == BSON_TYPE_DOCUMENT && columnTypeId != JSONOID) { - bson subObject; - bson_iterator_subobject_init(&bsonIterator, &subObject, false); + BSON subObject; + BsonIterSubObject(&bsonIterator, &subObject); FillTupleSlot(&subObject, bsonFullKey, columnMappingHash, columnValues, columnNulls); continue; } - /* if no corresponding column or null bson value, continue */ - if (columnMapping == NULL || bsonType == BSON_NULL) + /* if no corresponding column or null BSON value, continue */ + if (columnMapping == NULL || bsonType == BSON_TYPE_NULL) { continue; } /* check if columns have compatible types */ - if (OidIsValid(columnArrayTypeId) && bsonType == BSON_ARRAY) + + if (OidIsValid(columnArrayTypeId) && bsonType == BSON_TYPE_ARRAY) { compatibleTypes = true; } @@ -931,7 +1252,7 @@ FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, * internal conversions applied by BSON APIs. */ static bool -ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) +ColumnTypesCompatible(BSON_TYPE bsonType, Oid columnTypeId) { bool compatibleTypes = false; @@ -942,8 +1263,8 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) case INT8OID: case FLOAT4OID: case FLOAT8OID: case NUMERICOID: { - if (bsonType == BSON_INT || bsonType == BSON_LONG || - bsonType == BSON_DOUBLE) + if (bsonType == BSON_TYPE_INT32 || bsonType == BSON_TYPE_INT64 || + bsonType == BSON_TYPE_DOUBLE) { compatibleTypes = true; } @@ -951,8 +1272,8 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) } case BOOLOID: { - if (bsonType == BSON_INT || bsonType == BSON_LONG || - bsonType == BSON_DOUBLE || bsonType == BSON_BOOL) + if (bsonType == BSON_TYPE_INT32 || bsonType == BSON_TYPE_INT64 || + bsonType == BSON_TYPE_DOUBLE || bsonType == BSON_TYPE_BOOL) { compatibleTypes = true; } @@ -962,7 +1283,15 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) case VARCHAROID: case TEXTOID: { - if (bsonType == BSON_STRING) + if (bsonType == BSON_TYPE_UTF8) + { + compatibleTypes = true; + } + break; + } + case BYTEAOID: + { + if (bsonType == BSON_TYPE_BINDATA) { compatibleTypes = true; } @@ -975,7 +1304,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) * object identifier. We can safely overload this 64-byte data type * since it's reserved for internal use in PostgreSQL. */ - if (bsonType == BSON_OID) + if (bsonType == BSON_TYPE_OID) { compatibleTypes = true; } @@ -985,12 +1314,18 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) case TIMESTAMPOID: case TIMESTAMPTZOID: { - if (bsonType == BSON_DATE) + if (bsonType == BSON_TYPE_DATE_TIME) { compatibleTypes = true; } break; } + case NUMERICARRAY_OID: + { + if (bsonType == BSON_TYPE_ARRAY) + compatibleTypes = true; + break; + } case JSONOID: { if (bsonType == BSON_OBJECT || bsonType == BSON_ARRAY) @@ -1007,7 +1342,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) * such as money or inet, do not have equivalents in MongoDB. */ ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), - errmsg("cannot convert bson type to column type"), + errmsg("cannot convert BSON type to column type"), errhint("Column type: %u", (uint32) columnTypeId))); break; } @@ -1024,29 +1359,28 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) * datum from element datums, and returns the array datum. */ static Datum -ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId) +ColumnValueArray(BSON_ITERATOR *bsonIterator, Oid valueTypeId) { - Datum *columnValueArray = palloc0(INITIAL_ARRAY_CAPACITY * sizeof(Datum)); - uint32 arrayCapacity = INITIAL_ARRAY_CAPACITY; - uint32 arrayGrowthFactor = 2; - uint32 arrayIndex = 0; - - ArrayType *columnValueObject = NULL; - Datum columnValueDatum = 0; - bool typeByValue = false; - char typeAlignment = 0; - int16 typeLength = 0; - - bson_iterator bsonSubIterator = { NULL, 0 }; - bson_iterator_subiterator(bsonIterator, &bsonSubIterator); - - while (bson_iterator_next(&bsonSubIterator)) + Datum *columnValueArray = palloc0(INITIAL_ARRAY_CAPACITY * sizeof(Datum)); + uint32 arrayCapacity = INITIAL_ARRAY_CAPACITY; + uint32 arrayGrowthFactor = 2; + uint32 arrayIndex = 0; + + ArrayType *columnValueObject = NULL; + Datum columnValueDatum = 0; + bool typeByValue = false; + char typeAlignment = 0; + int16 typeLength = 0; + + BSON_ITERATOR bsonSubIterator = { NULL, 0 }; + BsonIterSubIter(bsonIterator, &bsonSubIterator); + while (BsonIterNext(&bsonSubIterator)) { - bson_type bsonType = bson_iterator_type(&bsonSubIterator); + BSON_TYPE bsonType = BsonIterType(&bsonSubIterator); bool compatibleTypes = false; compatibleTypes = ColumnTypesCompatible(bsonType, valueTypeId); - if (bsonType == BSON_NULL || !compatibleTypes) + if (bsonType == BSON_TYPE_NULL || !compatibleTypes) { continue; } @@ -1077,7 +1411,7 @@ ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId) * datum. The function then returns this datum. */ static Datum -ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) +ColumnValue(BSON_ITERATOR *bsonIterator, Oid columnTypeId, int32 columnTypeMod) { Datum columnValue = 0; @@ -1085,37 +1419,37 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) { case INT2OID: { - int16 value = (int16) bson_iterator_int(bsonIterator); + int16 value = (int16) BsonIterInt32(bsonIterator); columnValue = Int16GetDatum(value); break; } case INT4OID: { - int32 value = bson_iterator_int(bsonIterator); + int32 value = BsonIterInt32(bsonIterator); columnValue = Int32GetDatum(value); break; } case INT8OID: { - int64 value = bson_iterator_long(bsonIterator); + int64 value = BsonIterInt64(bsonIterator); columnValue = Int64GetDatum(value); break; } case FLOAT4OID: { - float4 value = (float4) bson_iterator_double(bsonIterator); + float4 value = (float4) BsonIterDouble(bsonIterator); columnValue = Float4GetDatum(value); break; } case FLOAT8OID: { - float8 value = bson_iterator_double(bsonIterator); + float8 value = BsonIterDouble(bsonIterator); columnValue = Float8GetDatum(value); break; } case NUMERICOID: { - float8 value = bson_iterator_double(bsonIterator); + float8 value = BsonIterDouble(bsonIterator); Datum valueDatum = Float8GetDatum(value); /* overlook type modifiers for numeric */ @@ -1124,13 +1458,13 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case BOOLOID: { - bool value = bson_iterator_bool(bsonIterator); + bool value = BsonIterBool(bsonIterator); columnValue = BoolGetDatum(value); break; } case BPCHAROID: { - const char *value = bson_iterator_string(bsonIterator); + const char *value = BsonIterString(bsonIterator); Datum valueDatum = CStringGetDatum(value); columnValue = DirectFunctionCall3(bpcharin, valueDatum, @@ -1140,7 +1474,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case VARCHAROID: { - const char *value = bson_iterator_string(bsonIterator); + const char *value = BsonIterString(bsonIterator); Datum valueDatum = CStringGetDatum(value); columnValue = DirectFunctionCall3(varcharin, valueDatum, @@ -1150,7 +1484,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case TEXTOID: { - const char *value = bson_iterator_string(bsonIterator); + const char *value = BsonIterString(bsonIterator); columnValue = CStringGetTextDatum(value); break; } @@ -1159,7 +1493,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) char value[NAMEDATALEN]; Datum valueDatum = 0; - bson_oid_t *bsonObjectId = bson_iterator_oid(bsonIterator); + bson_oid_t *bsonObjectId = (bson_oid_t*) BsonIterOid(bsonIterator); bson_oid_to_string(bsonObjectId, value); valueDatum = CStringGetDatum(value); @@ -1168,9 +1502,19 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) Int32GetDatum(columnTypeMod)); break; } + case BYTEAOID: + { + int value_len = BsonIterBinLen(bsonIterator); + const char *value = BsonIterBinData(bsonIterator); + bytea *result = (bytea *)palloc(value_len + VARHDRSZ); + memcpy(VARDATA(result), value, value_len); + SET_VARSIZE(result, value_len + VARHDRSZ); + columnValue = PointerGetDatum(result); + break; + } case DATEOID: { - int64 valueMillis = bson_iterator_date(bsonIterator); + int64 valueMillis = BsonIterDate(bsonIterator); int64 timestamp = (valueMillis * 1000L) - POSTGRES_TO_UNIX_EPOCH_USECS; Datum timestampDatum = TimestampGetDatum(timestamp); @@ -1180,7 +1524,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) case TIMESTAMPOID: case TIMESTAMPTZOID: { - int64 valueMillis = bson_iterator_date(bsonIterator); + int64 valueMillis = BsonIterDate(bsonIterator); int64 timestamp = (valueMillis * 1000L) - POSTGRES_TO_UNIX_EPOCH_USECS; /* overlook type modifiers for timestamp */ @@ -1214,7 +1558,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) default: { ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), - errmsg("cannot convert bson type to column type"), + errmsg("cannot convert BSON type to column type"), errhint("Column type: %u", (uint32) columnTypeId))); break; } @@ -1233,12 +1577,15 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) * [1] http://docs.mongodb.org/manual/reference/mongodb-extended-json/ */ void -DumpJson(StringInfo output, const char *bsonData, bool isArray) { - bson_iterator i; - const char *key; - bool isFirstElement; - char beginSymbol, endSymbol; - bson_type t; +DumpJson(StringInfo output, const char *bsonData, bool isArray) +{ + bson_iterator i; + const char *key; + bool isFirstElement; + char beginSymbol, endSymbol; + bson_type t; + + bson_iterator_from_buffer(&i, bsonData); if (isArray) @@ -1290,7 +1637,7 @@ DumpJson(StringInfo output, const char *bsonData, bool isArray) { { char oidhex[25]; bson_oid_to_string(bson_iterator_oid(&i), oidhex); - appendStringInfo(output, "\"%s\"", oidhex); + appendStringInfo(output, "{\"$oid\":\"%s\"}", oidhex); break; } case BSON_BOOL: @@ -1336,7 +1683,7 @@ DumpJson(StringInfo output, const char *bsonData, bool isArray) { appendStringInfo(output, "%d", bson_iterator_int(&i)); break; case BSON_LONG: - appendStringInfo(output, "%ld", (uint64_t)bson_iterator_long(&i)); + appendStringInfo(output, "%lld", (uint64_t)bson_iterator_long(&i)); break; case BSON_TIMESTAMP: ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), @@ -1363,10 +1710,12 @@ DumpJson(StringInfo output, const char *bsonData, bool isArray) { * EscapeJsonString escapes a string for safe inclusion in JSON. */ const char * -EscapeJsonString(const char *string) { - StringInfo buffer; - const char *ptr; - int i, segmentStartIdx, len; +EscapeJsonString(const char *string) +{ + StringInfo buffer; + const char *ptr; + int i, segmentStartIdx, len; + bool needsEscaping = false; for (ptr = string; *ptr; ++ptr) { if (*ptr == '"' || *ptr == '\r' || *ptr == '\n' || *ptr == '\t'\ @@ -1411,22 +1760,25 @@ EscapeJsonString(const char *string) { * all Mongo related resources allocated for the foreign scan. */ static void -MongoFreeScanState(MongoFdwExecState *executionState) +MongoFreeScanState(MongoFdwModifyState *fmstate) { - if (executionState == NULL) - { + if (fmstate == NULL) return; - } - bson_destroy(executionState->queryDocument); - bson_dealloc(executionState->queryDocument); + if (fmstate->queryDocument) + { + BsonDestroy(fmstate->queryDocument); + fmstate->queryDocument = NULL; + } - mongo_cursor_destroy(executionState->mongoCursor); - mongo_cursor_dealloc(executionState->mongoCursor); + if (fmstate->mongoCursor) + { + MongoCursorDestroy(fmstate->mongoCursor); + fmstate->mongoCursor = NULL; + } - /* also close the connection to mongo server */ - mongo_destroy(executionState->mongoConnection); - mongo_dealloc(executionState->mongoConnection); + /* Release remote connection */ + mongo_release_connection(fmstate->mongoConnection); } @@ -1438,15 +1790,16 @@ MongoAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *acquireSampleRowsFunc, BlockNumber *totalPageCount) { - BlockNumber pageCount = 0; - int attributeCount = 0; - int32 *attributeWidths = NULL; - Oid foreignTableId = InvalidOid; - int32 documentWidth = 0; - double documentCount = 0.0; - double foreignTableSize = 0; + BlockNumber pageCount = 0; + int attributeCount = 0; + int32 *attributeWidths = NULL; + Oid foreignTableId = InvalidOid; + int32 documentWidth = 0; + double documentCount = 0.0; + double foreignTableSize = 0; foreignTableId = RelationGetRelid(relation); + documentCount = ForeignTableDocumentCount(foreignTableId); if (documentCount > 0.0) @@ -1497,30 +1850,29 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, HeapTuple *sampleRows, int targetRowCount, double *totalRowCount, double *totalDeadRowCount) { - int sampleRowCount = 0; - double rowCount = 0; - double rowCountToSkip = -1; /* -1 means not set yet */ - double randomState = 0; - Datum *columnValues = NULL; - bool *columnNulls = NULL; - Oid foreignTableId = InvalidOid; - TupleDesc tupleDescriptor = NULL; - Form_pg_attribute *attributesPtr = NULL; - AttrNumber columnCount = 0; - AttrNumber columnId = 0; - HTAB *columnMappingHash = NULL; - mongo_cursor *mongoCursor = NULL; - bson *queryDocument = NULL; - Const *queryBuffer = NULL; - List *columnList = NIL; - ForeignScanState *scanState = NULL; - List *foreignPrivateList = NIL; - ForeignScan *foreignScan = NULL; - MongoFdwExecState *executionState = NULL; - char *relationName = NULL; - int executorFlags = 0; - MemoryContext oldContext = CurrentMemoryContext; - MemoryContext tupleContext = NULL; + int sampleRowCount = 0; + double rowCount = 0; + double rowCountToSkip = -1; /* -1 means not set yet */ + double randomState = 0; + Datum *columnValues = NULL; + bool *columnNulls = NULL; + Oid foreignTableId = InvalidOid; + TupleDesc tupleDescriptor = NULL; + Form_pg_attribute *attributesPtr = NULL; + AttrNumber columnCount = 0; + AttrNumber columnId = 0; + HTAB *columnMappingHash = NULL; + MONGO_CURSOR *mongoCursor = NULL; + BSON *queryDocument = NULL; + List *columnList = NIL; + ForeignScanState *scanState = NULL; + List *foreignPrivateList = NIL; + ForeignScan *foreignScan = NULL; + MongoFdwModifyState *fmstate = NULL; + char *relationName = NULL; + int executorFlags = 0; + MemoryContext oldContext = CurrentMemoryContext; + MemoryContext tupleContext = NULL; /* create list of columns in the relation */ tupleDescriptor = RelationGetDescr(relation); @@ -1545,13 +1897,10 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, foreignTableId = RelationGetRelid(relation); queryDocument = QueryDocument(foreignTableId, NIL); - queryBuffer = SerializeDocument(queryDocument); + foreignPrivateList = list_make2(columnList, NULL); /* only clean up the query struct, but not its data */ - bson_dealloc(queryDocument); - - /* construct foreign plan with query document and column list */ - foreignPrivateList = list_make2(queryBuffer, columnList); + BsonDestroy(queryDocument); foreignScan = makeNode(ForeignScan); foreignScan->fdw_private = foreignPrivateList; @@ -1560,9 +1909,9 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, MongoBeginForeignScan(scanState, executorFlags); - executionState = (MongoFdwExecState *) scanState->fdw_state; - mongoCursor = executionState->mongoCursor; - columnMappingHash = executionState->columnMappingHash; + fmstate = (MongoFdwModifyState *) scanState->fdw_state; + mongoCursor = fmstate->mongoCursor; + columnMappingHash = fmstate->columnMappingHash; /* * Use per-tuple memory context to prevent leak of memory used to read @@ -1582,8 +1931,6 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, for (;;) { - int32 cursorStatus = MONGO_ERROR; - /* check for user-requested abort or sleep */ vacuum_delay_point(); @@ -1591,10 +1938,9 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, memset(columnValues, 0, columnCount * sizeof(Datum)); memset(columnNulls, true, columnCount * sizeof(bool)); - cursorStatus = mongo_cursor_next(mongoCursor); - if (cursorStatus == MONGO_OK) + if(MongoCursorNext(mongoCursor, NULL)) { - const bson *bsonDocument = mongo_cursor_bson(mongoCursor); + const BSON *bsonDocument = MongoCursorBson(mongoCursor); const char *bsonDocumentKey = NULL; /* top level document */ /* fetch next tuple */ @@ -1608,19 +1954,23 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, } else { - /* - * The following is a courtesy check. In practice when Mongo shuts down, - * mongo_cursor_next() could possibly crash. - */ - mongo_cursor_error_t errorCode = mongoCursor->err; - if (errorCode != MONGO_CURSOR_EXHAUSTED) + #ifdef META_DRIVER + bson_error_t error; + if (mongoc_cursor_error (mongoCursor, &error)) { - MongoFreeScanState(executionState); + MongoFreeScanState(fmstate); ereport(ERROR, (errmsg("could not iterate over mongo collection"), - errhint("Mongo driver cursor error code: %d", - errorCode))); + errhint("Mongo driver error: %s", error.message))); } - + #else + mongo_cursor_error_t errorCode = mongoCursor->err; + if (errorCode != MONGO_CURSOR_EXHAUSTED) + { + MongoFreeScanState(fmstate); + ereport(ERROR, (errmsg("could not iterate over mongo collection"), + errhint("Mongo driver cursor error code: %d", errorCode))); + } + #endif break; } @@ -1673,14 +2023,14 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, /* clean up */ MemoryContextDelete(tupleContext); - MongoFreeScanState(executionState); + MongoFreeScanState(fmstate); pfree(columnValues); pfree(columnNulls); /* emit some interesting relation info */ relationName = RelationGetRelationName(relation); - ereport(errorLevel, (errmsg("\"%s\": collection contains %.0f rows; %d rows in sample", + ereport(errorLevel, (errmsg("\"%s\": collection contains %.0f rows; %d rows in sample", relationName, rowCount, sampleRowCount))); (*totalRowCount) = rowCount; diff --git a/mongo_fdw.control b/mongo_fdw.control index c0fbe86..a457792 100644 --- a/mongo_fdw.control +++ b/mongo_fdw.control @@ -1,6 +1,7 @@ # mongo_fdw extension # -# Copyright (c) 2012-2014 Citus Data, Inc. +# Portions Copyright © 2004-2014, EnterpriseDB Corporation. +# Portions Copyright © 2012–2014 Citus Data, Inc. # comment = 'foreign data wrapper for MongoDB access' default_version = '1.0' diff --git a/mongo_fdw.h b/mongo_fdw.h index 60947a2..b2874cd 100644 --- a/mongo_fdw.h +++ b/mongo_fdw.h @@ -1,10 +1,16 @@ /*------------------------------------------------------------------------- * * mongo_fdw.h + * Foreign-data wrapper for remote MongoDB servers * - * Type and function declarations for MongoDB foreign data wrapper. + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group * - * Copyright (c) 2012-2014 Citus Data, Inc. + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_fdw.h * *------------------------------------------------------------------------- */ @@ -12,8 +18,15 @@ #ifndef MONGO_FDW_H #define MONGO_FDW_H +#include "config.h" +#include "mongo_wrapper.h" #include "bson.h" -#include "mongo.h" + +#ifdef META_DRIVER + #include "mongoc.h" +#else + #include "mongo.h" +#endif #include "fmgr.h" #include "catalog/pg_foreign_server.h" @@ -22,13 +35,60 @@ #include "nodes/pg_list.h" #include "nodes/relation.h" #include "utils/timestamp.h" - +#include "access/reloptions.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "commands/explain.h" +#include "commands/vacuum.h" +#include "foreign/fdwapi.h" +#include "foreign/foreign.h" +#include "nodes/makefuncs.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/plancat.h" +#include "optimizer/planmain.h" +#include "optimizer/restrictinfo.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/date.h" +#include "utils/hsearch.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/memutils.h" +#include "catalog/pg_user_mapping.h" + +#ifdef META_DRIVER + #define BSON bson_t + #define BSON_TYPE bson_type_t + #define BSON_ITERATOR bson_iter_t + #define MONGO_CONN mongoc_client_t + #define MONGO_CURSOR mongoc_cursor_t +#else + #define BSON bson + #define BSON_TYPE bson_type + #define BSON_ITERATOR bson_iterator + #define MONGO_CONN mongo + #define MONGO_CURSOR mongo_cursor + #define BSON_TYPE_DOCUMENT BSON_OBJECT + #define BSON_TYPE_NULL BSON_NULL + #define BSON_TYPE_ARRAY BSON_ARRAY + #define BSON_TYPE_INT32 BSON_INT + #define BSON_TYPE_INT64 BSON_LONG + #define BSON_TYPE_DOUBLE BSON_DOUBLE + #define BSON_TYPE_BINDATA BSON_BINDATA + #define BSON_TYPE_BOOL BSON_BOOL + #define BSON_TYPE_UTF8 BSON_STRING + #define BSON_TYPE_OID BSON_OID + #define BSON_TYPE_DATE_TIME BSON_DATE +#endif /* Defines for valid option names */ #define OPTION_NAME_ADDRESS "address" #define OPTION_NAME_PORT "port" #define OPTION_NAME_DATABASE "database" #define OPTION_NAME_COLLECTION "collection" +#define OPTION_NAME_USERNAME "username" +#define OPTION_NAME_PASSWORD "password" /* Default values for option parameters */ #define DEFAULT_IP_ADDRESS "127.0.0.1" @@ -58,16 +118,21 @@ typedef struct MongoValidOption /* Array of options that are valid for mongo_fdw */ -static const uint32 ValidOptionCount = 4; +static const uint32 ValidOptionCount = 6; static const MongoValidOption ValidOptionArray[] = { /* foreign server options */ { OPTION_NAME_ADDRESS, ForeignServerRelationId }, - { OPTION_NAME_PORT, ForeignServerRelationId }, + { OPTION_NAME_PORT, ForeignServerRelationId }, /* foreign table options */ { OPTION_NAME_DATABASE, ForeignTableRelationId }, - { OPTION_NAME_COLLECTION, ForeignTableRelationId } + { OPTION_NAME_COLLECTION, ForeignTableRelationId }, + + /* User mapping options */ + { OPTION_NAME_USERNAME, UserMappingRelationId }, + { OPTION_NAME_PASSWORD, UserMappingRelationId } + }; @@ -78,11 +143,12 @@ static const MongoValidOption ValidOptionArray[] = */ typedef struct MongoFdwOptions { - char *addressName; - int32 portNumber; - char *databaseName; + char *svr_address; + int32 svr_port; + char *svr_database; char *collectionName; - + char *svr_username; + char *svr_password; } MongoFdwOptions; @@ -90,14 +156,29 @@ typedef struct MongoFdwOptions * MongoFdwExecState keeps foreign data wrapper specific execution state that we * create and hold onto when executing the query. */ -typedef struct MongoFdwExecState +/* + * Execution state of a foreign insert/update/delete operation. + */ +typedef struct MongoFdwModifyState { - struct HTAB *columnMappingHash; - mongo *mongoConnection; - mongo_cursor *mongoCursor; - bson *queryDocument; + Relation rel; /* relcache entry for the foreign table */ + List *target_attrs; /* list of target attribute numbers */ + + /* info about parameters for prepared statement */ + int p_nums; /* number of parameters to transmit */ + FmgrInfo *p_flinfo; /* output conversion functions for them */ + + struct HTAB *columnMappingHash; + + MONGO_CONN *mongoConnection; /* MongoDB connection */ + MONGO_CURSOR *mongoCursor; /* MongoDB cursor */ + BSON *queryDocument; /* Bson Document */ -} MongoFdwExecState; + MongoFdwOptions *options; + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ +} MongoFdwModifyState; /* @@ -113,13 +194,21 @@ typedef struct ColumnMapping Oid columnTypeId; int32 columnTypeMod; Oid columnArrayTypeId; - } ColumnMapping; +/* options.c */ +extern MongoFdwOptions * mongo_get_options(Oid foreignTableId); +extern void mongo_free_options(MongoFdwOptions *options); +extern StringInfo mongo_option_names_string(Oid currentContextId); + +/* connection.c */ +MONGO_CONN* mongo_get_connection(ForeignServer *server, UserMapping *user, MongoFdwOptions *opt); +extern void mongo_cleanup_connection(void); +extern void mongo_release_connection(MONGO_CONN* conn); /* Function declarations related to creating the mongo query */ extern List * ApplicableOpExpressionList(RelOptInfo *baserel); -extern bson * QueryDocument(Oid relationId, List *opExpressionList); +extern BSON * QueryDocument(Oid relationId, List *opExpressionList); extern List * ColumnList(RelOptInfo *baserel); /* Function declarations for foreign data wrapper */ diff --git a/mongo_query.c b/mongo_query.c index c818f6c..87200c0 100644 --- a/mongo_query.c +++ b/mongo_query.c @@ -1,18 +1,30 @@ /*------------------------------------------------------------------------- * * mongo_query.c + * Foreign-data wrapper for remote MongoDB servers * - * Function definitions for sending queries to MongoDB. These functions assume - * that queries are sent through the official MongoDB C driver, and apply query - * optimizations to reduce the amount of data fetched from the driver. + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group * - * Copyright (c) 2012-2014 Citus Data, Inc. + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_query.c * *------------------------------------------------------------------------- */ #include "postgres.h" +#include "mongo_wrapper.h" + +#ifdef META_DRIVER + #include "mongoc.h" +#else + #include "mongo.h" +#endif #include "mongo_fdw.h" +#include "mongo_query.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" @@ -24,7 +36,9 @@ #include "utils/lsyscache.h" #include "utils/numeric.h" #include "utils/timestamp.h" - +#include "bson.h" +#include "json.h" +#include "bits.h" /* Local functions forward declarations */ static Expr * FindArgumentOfType(List *argumentList, NodeTag argumentType); @@ -32,10 +46,74 @@ static char * MongoOperatorName(const char *operatorName); static List * EqualityOperatorList(List *operatorList); static List * UniqueColumnList(List *operatorList); static List * ColumnOperatorList(Var *column, List *operatorList); -static void AppendConstantValue(bson *queryDocument, const char *keyName, +static void AppendConstantValue(BSON *queryDocument, const char *keyName, Const *constant); +bool json_to_bson_append_element( bson *bb , const char *k , struct json_object *v ); + +bool json_to_bson_append_element( bson *bb , const char *k , struct json_object *v ) { + bool status; + status = true; + if ( ! v ) { + bson_append_null( bb , k ); + return status; + } + + switch ( json_object_get_type( v ) ) { + case json_type_int: + bson_append_int( bb , k , json_object_get_int( v ) ); + break; + case json_type_boolean: + bson_append_bool( bb , k , json_object_get_boolean( v ) ); + break; + case json_type_double: + bson_append_double( bb , k , json_object_get_double( v ) ); + break; + case json_type_string: + bson_append_string( bb , k , json_object_get_string( v ) ); + break; + case json_type_object: + { + struct json_object *joj = NULL; + joj = json_object_object_get( v, "$oid" ); + if (joj != NULL) { + bson_oid_t bsonObjectId; + memset(bsonObjectId.bytes, 0, sizeof(bsonObjectId.bytes)); + BsonOidFromString(&bsonObjectId, json_object_get_string(joj) ); + status = BsonAppendOid( bb, k , &bsonObjectId); + break; + } + joj = json_object_object_get( v, "$date" ); + if (joj != NULL) { + status = BsonAppendDate( bb, k , json_object_get_int64(joj)); + break; + } + + bson_append_start_object( bb , k ); + json_object_object_foreach( v, kk, vv ) { + json_to_bson_append_element( bb , kk , vv ); + } + bson_append_finish_object( bb ); + break; + } + case json_type_array: + bson_append_start_array( bb , k ); + int i; + char buf[10]; + for ( i=0; i= date '1994-01-01' AND l_shipdate < date '1995-01-01'" become * "l_shipdate: { $gte: new Date(757382400000), $lt: new Date(788918400000) }". */ -bson * +BSON * QueryDocument(Oid relationId, List *opExpressionList) { List *equalityOperatorList = NIL; @@ -161,12 +239,9 @@ QueryDocument(Oid relationId, List *opExpressionList) List *columnList = NIL; ListCell *equalityOperatorCell = NULL; ListCell *columnCell = NULL; - bson *queryDocument = NULL; - int documentStatus = BSON_OK; - - queryDocument = bson_alloc(); - bson_init(queryDocument); + BSON *queryDocument = NULL; + queryDocument = BsonCreate(); /* * We distinguish between equality expressions and others since we need to * insert the latter (<, >, <=, >=, <>) as separate sub-documents into the @@ -209,6 +284,7 @@ QueryDocument(Oid relationId, List *opExpressionList) char *columnName = NULL; List *columnOperatorList = NIL; ListCell *columnOperatorCell = NULL; + BSON r; columnId = column->varattno; columnName = get_relid_attribute_name(relationId, columnId); @@ -217,7 +293,7 @@ QueryDocument(Oid relationId, List *opExpressionList) columnOperatorList = ColumnOperatorList(column, comparisonOperatorList); /* for comparison expressions, start a sub-document */ - bson_append_start_object(queryDocument, columnName); + BsonAppendStartObject(queryDocument, columnName, &r); foreach(columnOperatorCell, columnOperatorList) { @@ -230,15 +306,16 @@ QueryDocument(Oid relationId, List *opExpressionList) operatorName = get_opname(columnOperator->opno); mongoOperatorName = MongoOperatorName(operatorName); - +#ifdef META_DRIVER + AppendConstantValue(&r, mongoOperatorName, constant); +#else AppendConstantValue(queryDocument, mongoOperatorName, constant); +#endif } - - bson_append_finish_object(queryDocument); + BsonAppendFinishObject(queryDocument, &r); } - documentStatus = bson_finish(queryDocument); - if (documentStatus != BSON_OK) + if (!BsonFinish(queryDocument)) { ereport(ERROR, (errmsg("could not create document for query"), errhint("BSON error: %d", queryDocument->err))); @@ -361,61 +438,69 @@ ColumnOperatorList(Var *column, List *operatorList) * its MongoDB equivalent. */ static void -AppendConstantValue(bson *queryDocument, const char *keyName, Const *constant) +AppendConstantValue(BSON *queryDocument, const char *keyName, Const *constant) { - Datum constantValue = constant->constvalue; - Oid constantTypeId = constant->consttype; - - bool constantNull = constant->constisnull; - if (constantNull) + if (constant->constisnull) { - bson_append_null(queryDocument, keyName); + BsonAppendNull(queryDocument, keyName); return; } + AppenMongoValue(queryDocument, keyName, constant->constvalue, false, constant->consttype); +} + +bool +AppenMongoValue(BSON *queryDocument, const char *keyName, Datum value, bool isnull, Oid id) +{ + bool status; + if (isnull) + { + status = BsonAppendNull(queryDocument, keyName); + return status; + } - switch(constantTypeId) + switch(id) { case INT2OID: { - int16 value = DatumGetInt16(constantValue); - bson_append_int(queryDocument, keyName, (int) value); + int16 valueInt = DatumGetInt16(value); + status = BsonAppendInt32(queryDocument, keyName, (int) valueInt); break; } case INT4OID: { - int32 value = DatumGetInt32(constantValue); - bson_append_int(queryDocument, keyName, value); + int32 valueInt = DatumGetInt32(value); + status = BsonAppendInt32(queryDocument, keyName, valueInt); break; } case INT8OID: { - int64 value = DatumGetInt64(constantValue); - bson_append_long(queryDocument, keyName, value); + int64 valueLong = DatumGetInt64(value); + status = BsonAppendInt64(queryDocument, keyName, valueLong); break; } case FLOAT4OID: { - float4 value = DatumGetFloat4(constantValue); - bson_append_double(queryDocument, keyName, (double) value); + float4 valueFloat = DatumGetFloat4(value); + status = BsonAppendDouble(queryDocument, keyName, (double) valueFloat); break; } case FLOAT8OID: { - float8 value = DatumGetFloat8(constantValue); - bson_append_double(queryDocument, keyName, value); + float8 valueFloat = DatumGetFloat8(value); + status = BsonAppendDouble(queryDocument, keyName, valueFloat); break; } case NUMERICOID: { - Datum valueDatum = DirectFunctionCall1(numeric_float8, constantValue); - float8 value = DatumGetFloat8(valueDatum); - bson_append_double(queryDocument, keyName, value); + Datum valueDatum = DirectFunctionCall1(numeric_float8, value); + float8 valueFloat = DatumGetFloat8(valueDatum); + status = BsonAppendDouble(queryDocument, keyName, valueFloat); break; } case BOOLOID: { - bool value = DatumGetBool(constantValue); - bson_append_int(queryDocument, keyName, (int) value); + bool valueBool = DatumGetBool(value); + status = BsonAppendBool(queryDocument, keyName, valueBool); break; } case BPCHAROID: @@ -425,46 +510,196 @@ AppendConstantValue(bson *queryDocument, const char *keyName, Const *constant) char *outputString = NULL; Oid outputFunctionId = InvalidOid; bool typeVarLength = false; - - getTypeOutputInfo(constantTypeId, &outputFunctionId, &typeVarLength); - outputString = OidOutputFunctionCall(outputFunctionId, constantValue); - - bson_append_string(queryDocument, keyName, outputString); + getTypeOutputInfo(id, &outputFunctionId, &typeVarLength); + outputString = OidOutputFunctionCall(outputFunctionId, value); + status = BsonAppendUTF8(queryDocument, keyName, outputString); break; } - case NAMEOID: + case BYTEAOID: + { + int len; + char *data; + char *result = DatumGetPointer(value); + if (VARATT_IS_1B(result)) { + len = VARSIZE_1B(result) - VARHDRSZ_SHORT; + data = VARDATA_1B(result); + } else { + len = VARSIZE_4B(result) - VARHDRSZ; + data = VARDATA_4B(result); + } + status = BsonAppendBinary(queryDocument, keyName, data, len); + break; + } + case NAMEOID: { char *outputString = NULL; Oid outputFunctionId = InvalidOid; bool typeVarLength = false; bson_oid_t bsonObjectId; memset(bsonObjectId.bytes, 0, sizeof(bsonObjectId.bytes)); - - getTypeOutputInfo(constantTypeId, &outputFunctionId, &typeVarLength); - outputString = OidOutputFunctionCall(outputFunctionId, constantValue); - bson_oid_from_string(&bsonObjectId, outputString); - - bson_append_oid(queryDocument, keyName, &bsonObjectId); + getTypeOutputInfo(id, &outputFunctionId, &typeVarLength); + outputString = OidOutputFunctionCall(outputFunctionId, value); + BsonOidFromString(&bsonObjectId, outputString); + status = BsonAppendOid(queryDocument, keyName, &bsonObjectId); break; } case DATEOID: { - Datum valueDatum = DirectFunctionCall1(date_timestamp, constantValue); + Datum valueDatum = DirectFunctionCall1(date_timestamp, value); Timestamp valueTimestamp = DatumGetTimestamp(valueDatum); int64 valueMicroSecs = valueTimestamp + POSTGRES_TO_UNIX_EPOCH_USECS; int64 valueMilliSecs = valueMicroSecs / 1000; - bson_append_date(queryDocument, keyName, valueMilliSecs); + status = BsonAppendDate(queryDocument, keyName, valueMilliSecs); break; } case TIMESTAMPOID: case TIMESTAMPTZOID: { - Timestamp valueTimestamp = DatumGetTimestamp(constantValue); + Timestamp valueTimestamp = DatumGetTimestamp(value); int64 valueMicroSecs = valueTimestamp + POSTGRES_TO_UNIX_EPOCH_USECS; int64 valueMilliSecs = valueMicroSecs / 1000; - bson_append_date(queryDocument, keyName, valueMilliSecs); + status = BsonAppendDate(queryDocument, keyName, valueMilliSecs); + break; + } + case NUMERICARRAY_OID: + { + ArrayType *array; + Oid elmtype; + int16 elmlen; + bool elmbyval; + char elmalign; + int num_elems; + Datum *elem_values; + bool *elem_nulls; + int i; + BSON t; + + array = DatumGetArrayTypeP(value); + elmtype = ARR_ELEMTYPE(array); + get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign); + + deconstruct_array(array, elmtype, elmlen, elmbyval, elmalign, &elem_values, &elem_nulls, &num_elems); + + BsonAppendStartArray(queryDocument, keyName, &t); + for (i = 0; i < num_elems; i++) + { + Datum valueDatum; + float8 valueFloat; + if (elem_nulls[i]) + continue; + + valueDatum = DirectFunctionCall1(numeric_float8, elem_values[i]); + valueFloat = DatumGetFloat8(valueDatum); + char *index = malloc(snprintf(0, 0, "%d", i)+1); + sprintf(index, "%d", i); +#ifdef META_DRIVER + status = BsonAppendDouble(&t, index, valueFloat); +#else + status = BsonAppendDouble(queryDocument, index, valueFloat); +#endif + } + BsonAppendFinishArray(queryDocument, &t); + pfree(elem_values); + pfree(elem_nulls); + break; + } + case TEXTARRAYOID: + { + ArrayType *array; + Oid elmtype; + int16 elmlen; + bool elmbyval; + char elmalign; + int num_elems; + Datum *elem_values; + bool *elem_nulls; + int i; + BSON t; + + array = DatumGetArrayTypeP(value); + elmtype = ARR_ELEMTYPE(array); + get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign); + + deconstruct_array(array, elmtype, elmlen, elmbyval, elmalign, &elem_values, &elem_nulls, &num_elems); + + BsonAppendStartArray(queryDocument, keyName, &t); + for (i = 0; i < num_elems; i++) + { + char *valueString = NULL; + Oid outputFunctionId = InvalidOid; + bool typeVarLength = false; + if (elem_nulls[i]) + continue; + getTypeOutputInfo(TEXTOID, &outputFunctionId, &typeVarLength); + valueString = OidOutputFunctionCall(outputFunctionId, elem_values[i]); + char *index = malloc(snprintf(0, 0, "%d", i)+1); + sprintf(index, "%d", i); + status = BsonAppendUTF8(queryDocument, index, valueString); + } + BsonAppendFinishArray(queryDocument, &t); + pfree(elem_values); + pfree(elem_nulls); + break; + } + case 1003: // NAMEARRAYOID + { + ArrayType *array; + Oid elmtype; + int16 elmlen; + bool elmbyval; + char elmalign; + int num_elems; + Datum *elem_values; + bool *elem_nulls; + int i; + BSON t; + + array = DatumGetArrayTypeP(value); + elmtype = ARR_ELEMTYPE(array); + get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign); + + deconstruct_array(array, elmtype, elmlen, elmbyval, elmalign, &elem_values, &elem_nulls, &num_elems); + + BsonAppendStartArray(queryDocument, keyName, &t); + for (i = 0; i < num_elems; i++) + { + if (elem_nulls[i]) + continue; + char *valueString = NULL; + Oid outputFunctionId = InvalidOid; + bool typeVarLength = false; + bson_oid_t bsonObjectId; + memset(bsonObjectId.bytes, 0, sizeof(bsonObjectId.bytes)); + getTypeOutputInfo(NAMEOID, &outputFunctionId, &typeVarLength); + valueString = OidOutputFunctionCall(outputFunctionId, elem_values[i]); + BsonOidFromString(&bsonObjectId, valueString); + char *index = malloc(snprintf(0, 0, "%d", i)+1); + sprintf(index, "%d", i); + status = BsonAppendOid(queryDocument, index, &bsonObjectId); + } + BsonAppendFinishArray(queryDocument, &t); + pfree(elem_values); + pfree(elem_nulls); + break; + } + case JSONOID: + { + char *outputString = NULL; + Oid outputFunctionId = InvalidOid; + bool typeVarLength = false; + getTypeOutputInfo(id, &outputFunctionId, &typeVarLength); + outputString = OidOutputFunctionCall(outputFunctionId, value); + struct json_object *o = json_tokener_parse( outputString ); + + if ( is_error( o ) ) { + fprintf( stderr , "\t ERROR PARSING\n" ); + status = 0; + break; + } + + status = json_to_bson_append_element( queryDocument, keyName, o ); break; } default: @@ -474,12 +709,13 @@ AppendConstantValue(bson *queryDocument, const char *keyName, Const *constant) * byte arrays are easy to add, but they need testing. Other types * such as money or inet, do not have equivalents in MongoDB. */ - ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), - errmsg("cannot convert constant value to BSON value"), - errhint("Constant value data type: %u", constantTypeId))); + ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), + errmsg("cannot convert constant value to BSON value"), + errhint("Constant value data type: %u", id))); break; } } + return status; } diff --git a/mongo_query.h b/mongo_query.h index 0a75e98..9ba8508 100644 --- a/mongo_query.h +++ b/mongo_query.h @@ -1,10 +1,16 @@ /*------------------------------------------------------------------------- * * mongo_query.h + * Foreign-data wrapper for remote MongoDB servers * - * Type and function declarations for constructing queries to send to MongoDB. + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group * - * Copyright (c) 2012-2014 Citus Data, Inc. + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_query.h * *------------------------------------------------------------------------- */ @@ -13,6 +19,8 @@ #define MONGO_QUERY_H +#define NUMERICARRAY_OID 1231 +bool AppenMongoValue(BSON *queryDocument, const char *keyName, Datum value, bool isnull, Oid id); -#endif /* MONGO_QUERY_H */ +#endif /* MONGO_QUERY_H */ diff --git a/mongo_wrapper.c b/mongo_wrapper.c new file mode 100644 index 0000000..0b9e1d1 --- /dev/null +++ b/mongo_wrapper.c @@ -0,0 +1,359 @@ +/*------------------------------------------------------------------------- + * + * mongo_wrapper.c + * Foreign-data wrapper for remote MongoDB servers + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_wrapper.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "mongo_wrapper.h" + +#ifdef META_DRIVER + #include "mongoc.h" +#else + #include "mongo.h" +#endif + +#include "mongo_fdw.h" + +#define QUAL_STRING_LEN 512 + +MONGO_CONN* +MongoConnect(const char* host, const unsigned short port, char* databaseName, char *user, char *password) +{ + MONGO_CONN *conn; + conn = mongo_alloc(); + mongo_init(conn); + + if (mongo_connect(conn, host, port) != MONGO_OK) + { + int err = conn->err; + mongo_destroy(conn); + mongo_dealloc(conn); + ereport(ERROR, (errmsg("could not connect to %s:%d", host, port), + errhint("Mongo driver connection error: %d", err))); + } + if (user && password) + { + if (mongo_cmd_authenticate(conn, databaseName, user, password) != MONGO_OK) + { + char *str = pstrdup(conn->errstr); + mongo_destroy(conn); + mongo_dealloc(conn); + ereport(ERROR, (errmsg("could not connect to %s:%d", host, port), + errhint("Mongo driver connection error: %s", str))); + } + } + return conn; +} + +void +MongoDisconnect(MONGO_CONN* conn) +{ + mongo_destroy(conn); + mongo_dealloc(conn); +} + +bool +MongoInsert(MONGO_CONN* conn, char* database, char *collection, bson* b) +{ + char qual[QUAL_STRING_LEN]; + + snprintf (qual, QUAL_STRING_LEN, "%s.%s", database, collection); + if (mongo_insert(conn, qual, b, NULL) != MONGO_OK) + ereport(ERROR, (errmsg("failed to insert row"), + errhint("Mongo driver insert error: %d", conn->err))); + return true; +} + + +bool +MongoUpdate(MONGO_CONN* conn, char* database, char *collection, BSON* b, BSON* op) +{ + char qual[QUAL_STRING_LEN]; + + snprintf (qual, QUAL_STRING_LEN, "%s.%s", database, collection); + if (mongo_update(conn, qual, b, op, MONGO_UPDATE_BASIC, 0) != MONGO_OK) + ereport(ERROR, (errmsg("failed to update row"), + errhint("Mongo driver update error: %d", conn->err))); + return true; +} + + +bool +MongoDelete(MONGO_CONN* conn, char* database, char *collection, BSON* b) +{ + char qual[QUAL_STRING_LEN]; + + snprintf (qual, QUAL_STRING_LEN, "%s.%s", database, collection); + if (mongo_remove(conn, qual, b , NULL) != MONGO_OK) + ereport(ERROR, (errmsg("failed to delete row"), + errhint("Mongo driver delete error: %d", conn->err))); + + return true; +} + + +MONGO_CURSOR* +MongoCursorCreate(MONGO_CONN* conn, char* database, char *collection, BSON* q) +{ + MONGO_CURSOR* c; + char qual[QUAL_STRING_LEN]; + + snprintf (qual, QUAL_STRING_LEN, "%s.%s", database, collection); + c = mongo_cursor_alloc(); + mongo_cursor_init(c, conn , qual); + mongo_cursor_set_query(c, q); + return c; +} + + +const bson* +MongoCursorBson(MONGO_CURSOR* c) +{ + return mongo_cursor_bson(c); +} + + +bool +MongoCursorNext(MONGO_CURSOR* c, BSON* b) +{ + return (mongo_cursor_next(c) == MONGO_OK); +} + +void +MongoCursorDestroy(MONGO_CURSOR* c) +{ + mongo_cursor_destroy(c); + mongo_cursor_dealloc(c); +} + +BSON* +BsonCreate() +{ + BSON *b = NULL; + b = bson_alloc(); + bson_init(b); + return b; +} + +void +BsonDestroy(BSON *b) +{ + bson_destroy(b); + bson_dealloc(b); +} + +bool +BsonIterInit(BSON_ITERATOR *it, BSON *b) +{ + bson_iterator_init(it, b); + return true; +} + +bool +BsonIterSubObject(BSON_ITERATOR *it, BSON *b) +{ + bson_iterator_subobject_init(it, b, 0); + return true; +} + +int32_t +BsonIterInt32(BSON_ITERATOR *it) +{ + return bson_iterator_int(it); +} + + +int64_t +BsonIterInt64(BSON_ITERATOR *it) +{ + return bson_iterator_long(it); +} + + +double +BsonIterDouble(BSON_ITERATOR *it) +{ + return bson_iterator_double(it); +} + + +bool +BsonIterBool(BSON_ITERATOR *it) +{ + return bson_iterator_bool(it); +} + + +const char* +BsonIterString(BSON_ITERATOR *it) +{ + return bson_iterator_string(it); +} + +const char* BsonIterBinData(BSON_ITERATOR *it) +{ + return bson_iterator_bin_data(it); +} + +int BsonIterBinLen(BSON_ITERATOR *it) +{ + return bson_iterator_bin_len(it); +} +const bson_oid_t * +BsonIterOid(BSON_ITERATOR *it) +{ + return bson_iterator_oid(it); +} + + +time_t +BsonIterDate(BSON_ITERATOR *it) +{ + return bson_iterator_date(it); +} + + +const char* +BsonIterKey(BSON_ITERATOR *it) +{ + return bson_iterator_key(it); +} + +int +BsonIterType(BSON_ITERATOR *it) +{ + return bson_iterator_type(it); +} + +int +BsonIterNext(BSON_ITERATOR *it) +{ + return bson_iterator_next(it); +} + + +bool +BsonIterSubIter(BSON_ITERATOR *it, BSON_ITERATOR* sub) +{ + bson_iterator_subiterator(it, sub); + return true; +} + + +void +BsonOidFromString(bson_oid_t *o, char* str) +{ + bson_oid_from_string(o, str); +} + + +bool +BsonAppendOid(BSON *b, const char* key, bson_oid_t *v) +{ + return (bson_append_oid(b, key, v) == MONGO_OK); +} + +bool +BsonAppendBool(BSON *b, const char* key, bool v) +{ + return (bson_append_bool(b, key, v) == MONGO_OK); +} + +bool +BsonAppendNull(BSON *b, const char* key) +{ + return (bson_append_null(b, key) == MONGO_OK); +} + + +bool +BsonAppendInt32(BSON *b, const char* key, int v) +{ + return (bson_append_int(b, key, v) == MONGO_OK); +} + + +bool +BsonAppendInt64(BSON *b, const char* key, int64_t v) +{ + return (bson_append_long(b, key, v) == MONGO_OK); +} + +bool +BsonAppendDouble(BSON *b, const char* key, double v) +{ + return (bson_append_double(b, key, v) == MONGO_OK); +} + +bool +BsonAppendUTF8(BSON *b, const char* key, char *v) +{ + return (bson_append_string(b, key, v) == MONGO_OK); +} + +bool BsonAppendBinary(BSON *b, const char* key, char *v, size_t len) +{ + return (bson_append_binary(b, key, BSON_BIN_BINARY, v, len) == MONGO_OK); +} +bool +BsonAppendDate(BSON *b, const char* key, time_t v) +{ + return (bson_append_date(b, key, v) == MONGO_OK); +} + + +bool BsonAppendStartArray(BSON *b, const char* key, BSON* c) +{ + return (bson_append_start_array(b, key) == MONGO_OK); +} + + +bool BsonAppendFinishArray(BSON *b, BSON *c) +{ + return (bson_append_finish_array(b) == MONGO_OK); +} + +bool +BsonAppendStartObject(BSON* b, char *key, BSON* r) +{ + return (bson_append_start_object(b, key) == MONGO_OK); +} + +bool +BsonAppendFinishObject(BSON* b, BSON* r) +{ + return (bson_append_finish_object(b) == MONGO_OK); +} + + +bool +BsonAppendBson(BSON* b, char *key, BSON* c) +{ + return (bson_append_bson(b, key, c) == MONGO_OK); +} + + +bool +BsonFinish(BSON* b) +{ + return (bson_finish(b) == MONGO_OK); +} + +double +MongoAggregateCount(MONGO_CONN* conn, const char* database, const char* collection, const BSON* b) +{ + return mongo_count(conn, database, collection, b); +} + diff --git a/mongo_wrapper.h b/mongo_wrapper.h new file mode 100644 index 0000000..b08640e --- /dev/null +++ b/mongo_wrapper.h @@ -0,0 +1,76 @@ +/*------------------------------------------------------------------------- + * + * mongo_wrapper.h + * Foreign-data wrapper for remote MongoDB servers + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_wrapper.h + * + *------------------------------------------------------------------------- + */ +#ifndef MONGO_WRAPPER_H +#define MONGO_WRAPPER_H + +#include "mongo_fdw.h" +#include "bson.h" + +#ifdef META_DRIVER + #include "mongoc.h" +#else + #include "mongo.h" +#endif + +MONGO_CONN* MongoConnect(const char* host, const unsigned short port, char *databaseName, char *user, char *password); +void MongoDisconnect(MONGO_CONN* conn); +bool MongoInsert(MONGO_CONN* conn, char* database, char *collection, BSON* b); +bool MongoUpdate(MONGO_CONN* conn, char* database, char *collection, BSON* b, BSON* op); +bool MongoDelete(MONGO_CONN* conn, char* database, char *collection, BSON* b); +MONGO_CURSOR* MongoCursorCreate(MONGO_CONN* conn, char* database, char *collection, BSON* q); +const BSON* MongoCursorBson(MONGO_CURSOR* c); +bool MongoCursorNext(MONGO_CURSOR* c, BSON* b); +void MongoCursorDestroy(MONGO_CURSOR* c); +double MongoAggregateCount(MONGO_CONN* conn, const char* database, const char* collection, const BSON* b); + +BSON* BsonCreate(void); +void BsonDestroy(BSON *b); + +bool BsonIterInit(BSON_ITERATOR *it, BSON *b); +bool BsonIterSubObject(BSON_ITERATOR *it, BSON *b); +int32_t BsonIterInt32(BSON_ITERATOR *it); +int64_t BsonIterInt64(BSON_ITERATOR *it); +double BsonIterDouble(BSON_ITERATOR *it); +bool BsonIterBool(BSON_ITERATOR *it); +const char* BsonIterString(BSON_ITERATOR *it); +const char* BsonIterBinData(BSON_ITERATOR *it); +int BsonIterBinLen(BSON_ITERATOR *it); +const bson_oid_t * BsonIterOid(BSON_ITERATOR *it); +time_t BsonIterDate(BSON_ITERATOR *it); +const char* BsonIterKey(BSON_ITERATOR *it); +int BsonIterType(BSON_ITERATOR *it); +int BsonIterNext(BSON_ITERATOR *it); +bool BsonIterSubIter(BSON_ITERATOR *it, BSON_ITERATOR* sub); +void BsonOidFromString(bson_oid_t *o, char* str); + +BSON *BsonCreate(); +bool BsonAppendOid(BSON *b, const char* key, bson_oid_t *v); +bool BsonAppendBool(BSON *b, const char* key, bool v); +bool BsonAppendNull(BSON *b, const char* key); +bool BsonAppendInt32(BSON *b, const char* key, int v); +bool BsonAppendInt64(BSON *b, const char* key, int64_t v); +bool BsonAppendDouble(BSON *b, const char* key, double v); +bool BsonAppendUTF8(BSON *b, const char* key, char *v); +bool BsonAppendBinary(BSON *b, const char* key, char *v, size_t len); +bool BsonAppendDate(BSON *b, const char* key, time_t v); +bool BsonAppendStartArray(BSON *b, const char* key, BSON* c); +bool BsonAppendFinishArray(BSON *b, BSON *c); +bool BsonAppendStartObject(BSON* b, char *key, BSON *r); +bool BsonAppendFinishObject(BSON* b, BSON* r); +bool BsonAppendBson(BSON* b, char *key, BSON* c); +bool BsonFinish(BSON* b); +#endif diff --git a/mongo_wrapper_meta.c b/mongo_wrapper_meta.c new file mode 100644 index 0000000..400bd12 --- /dev/null +++ b/mongo_wrapper_meta.c @@ -0,0 +1,428 @@ +/*------------------------------------------------------------------------- + * + * mongo_wrapper_meta.c + * Foreign-data wrapper for remote MongoDB servers + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * mongo_wrapper_meta.c + * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" +#include + +#include "mongo_wrapper.h" + +/* + * Connect to MongoDB server using Host/ip and Port number. + */ +MONGO_CONN* +MongoConnect(const char* host, const unsigned short port, char* databaseName, char *user, char *password) +{ + MONGO_CONN *client = NULL; + char* uri = NULL; + + if (user && password) + uri = bson_strdup_printf ("mongodb://%s:%s@%s:%hu/", user, password, host, port); + else + uri = bson_strdup_printf ("mongodb://%s:%hu/", host, port); + + client = mongoc_client_new(uri); + + bson_free(uri); + + if (client == NULL) + ereport(ERROR, (errmsg("could not connect to %s:%d", host, port), + errhint("Mongo driver connection error"))); + return client; +} + +/* + * Disconnect from MongoDB server. + */ +void +MongoDisconnect(MONGO_CONN* conn) +{ + if (conn) + mongoc_client_destroy(conn); +} + + +/* + * Insert a document 'b' into MongoDB. + */ +bool +MongoInsert(MONGO_CONN* conn, char *database, char* collection, BSON* b) +{ + mongoc_collection_t *c = NULL; + bson_error_t error; + bool r = false; + + c = mongoc_client_get_collection(conn, database, collection); + + r = mongoc_collection_insert(c, MONGOC_INSERT_NONE, b, NULL, &error); + mongoc_collection_destroy(c); + if (!r) + ereport(ERROR, (errmsg("failed to insert row"), + errhint("Mongo error: \"%s\"", error.message))); + return true; +} + + +/* + * Update a document 'b' into MongoDB. + */ +bool +MongoUpdate(MONGO_CONN* conn, char* database, char *collection, BSON* b, BSON* op) +{ + mongoc_collection_t *c = NULL; + bson_error_t error; + bool r = false; + + c = mongoc_client_get_collection (conn, database, collection); + + r = mongoc_collection_update(c, MONGOC_UPDATE_NONE, b, op, NULL, &error); + mongoc_collection_destroy(c); + if (!r) + ereport(ERROR, (errmsg("failed to update row"), + errhint("Mongo error: \"%s\"", error.message))); + return true; +} + + +/* + * Delete MongoDB's document. + */ +bool +MongoDelete(MONGO_CONN* conn, char* database, char *collection, BSON* b) +{ + mongoc_collection_t *c = NULL; + bson_error_t error; + bool r = false; + + c = mongoc_client_get_collection (conn, database, collection); + + r = mongoc_collection_delete(c, MONGOC_DELETE_SINGLE_REMOVE, b, NULL, &error); + mongoc_collection_destroy(c); + if (!r) + ereport(ERROR, (errmsg("failed to delete row"), + errhint("Mongo error: \"%s\"", error.message))); + return true; +} + +/* + * Performs a query against the configured MongoDB server and return + * cursor which can be destroyed by calling mongoc_cursor_current. + */ +MONGO_CURSOR* +MongoCursorCreate(MONGO_CONN* conn, char* database, char *collection, BSON* q) +{ + mongoc_collection_t *c = NULL; + MONGO_CURSOR *cur = NULL; + bson_error_t error; + + c = mongoc_client_get_collection (conn, database, collection); + cur = mongoc_collection_find(c, MONGOC_QUERY_NONE, 0, 0, 0, q, NULL, NULL); + mongoc_cursor_error(cur, &error); + if (!cur) + ereport(ERROR, (errmsg("failed to create cursor"), + errhint("Mongo error: \"%s\"", error.message))); + + mongoc_collection_destroy(c); + return cur; +} + + +/* + * Destroy cursor created by calling MongoCursorCreate function. + */ +void +MongoCursorDestroy(MONGO_CURSOR* c) +{ + mongoc_cursor_destroy(c); +} + + +/* + * Get the current document from cursor. + */ +const BSON* +MongoCursorBson(MONGO_CURSOR* c) +{ + return mongoc_cursor_current(c); +} + +/* + * Get the next document from the cursor. + */ +bool +MongoCursorNext(MONGO_CURSOR* c, BSON *b) +{ + return mongoc_cursor_next(c, (const BSON**) &b); +} + + +/* + * Allocates a new bson_t structure, and also initialize the bson + * object. After that point objects can be appended to that bson + * object and can be iterated. A newly allocated bson_t that should + * be freed with bson_destroy(). + */ +BSON* +BsonCreate(void) +{ + BSON *b = NULL; + b = bson_new(); + bson_init(b); + return b; +} + +/* + * Destroy Bson objected created by BsonCreate function. + */ +void +BsonDestroy(BSON *b) +{ + bson_destroy(b); +} + + +/* + * Initialize the bson Iterator. + */ +bool +BsonIterInit(BSON_ITERATOR *it, BSON *b) +{ + return bson_iter_init(it, b); +} + + +bool +BsonIterSubObject(BSON_ITERATOR *it, BSON *b) +{ + /* TODO: Need to see the Meta Driver equalient for "bson_iterator_subobject" */ + return true; +} + +int32_t +BsonIterInt32(BSON_ITERATOR *it) +{ + return bson_iter_int32(it); +} + + +int64_t +BsonIterInt64(BSON_ITERATOR *it) +{ + return bson_iter_int64(it); +} + + +double +BsonIterDouble(BSON_ITERATOR *it) +{ + return bson_iter_double(it); +} + + +bool +BsonIterBool(BSON_ITERATOR *it) +{ + return bson_iter_bool(it); +} + + +const char* +BsonIterString(BSON_ITERATOR *it) +{ + uint32_t len = 0; + return bson_iter_utf8(it, &len); +} + + +const bson_oid_t * +BsonIterOid(BSON_ITERATOR *it) +{ + return bson_iter_oid(it); +} + + +time_t +BsonIterDate(BSON_ITERATOR *it) +{ + return bson_iter_date_time(it); +} + + +const char* +BsonIterKey(BSON_ITERATOR *it) +{ + return bson_iter_key(it); +} + +int +BsonIterType(BSON_ITERATOR *it) +{ + return bson_iter_type(it); +} + +int +BsonIterNext(BSON_ITERATOR *it) +{ + return bson_iter_next(it); +} + + +bool +BsonIterSubIter(BSON_ITERATOR *it, BSON_ITERATOR* sub) +{ + return bson_iter_recurse(it, sub); +} + + +void +BsonOidFromString(bson_oid_t *o, char* str) +{ + bson_oid_init_from_string(o, str); +} + + +bool +BsonAppendOid(BSON *b, const char* key, bson_oid_t *v) +{ + return bson_append_oid(b, key, strlen(key), v); +} + +bool +BsonAppendBool(BSON *b, const char* key, bool v) +{ + return bson_append_bool(b, key, -1, v); +} + +bool +BsonAppendStartObject(BSON* b, char *key, BSON* r) +{ + return bson_append_document_begin(b, key, strlen(key), r); +} + + +bool +BsonAppendFinishObject(BSON* b, BSON* r) +{ + return bson_append_document_end(b, r); +} + + +bool +BsonAppendNull(BSON *b, const char* key) +{ + return bson_append_null(b, key, strlen(key)); +} + + +bool +BsonAppendInt32(BSON *b, const char* key, int v) +{ + return bson_append_int32(b, key, strlen(key), v); +} + + +bool +BsonAppendInt64(BSON *b, const char* key, int64_t v) +{ + return bson_append_int64(b, key, strlen(key), v); +} + +bool +BsonAppendDouble(BSON *b, const char* key, double v) +{ + return bson_append_double(b, key, strlen(key), v); +} + +bool +BsonAppendUTF8(BSON *b, const char* key, char *v) +{ + + return bson_append_utf8(b, key, strlen(key), v, strlen(v)); +} + + +bool +BsonAppendDate(BSON *b, const char* key, time_t v) +{ + return bson_append_date_time(b, key, strlen(key), v); +} + + +bool +BsonAppendBson(BSON* b, char *key, BSON* c) +{ + return bson_append_document(b, key, strlen(key), c); +} + +bool BsonAppendStartArray(BSON *b, const char* key, BSON* c) +{ + return bson_append_array_begin(b, key, -1, c); +} + + +bool BsonAppendFinishArray(BSON *b, BSON* c) +{ + return bson_append_array_end(b, c); +} + + +bool +BsonFinish(BSON* b) +{ + /* + * There is no need for bson_finish in Meta Driver. + * We are doing nothing, just because of compatiblity with legacy + * driver. + */ + return true; +} + +/* + * Count the number of documents. + */ +double +MongoAggregateCount(MONGO_CONN* conn, const char* database, const char* collection, const BSON* b) +{ + BSON *cmd = NULL; + BSON *out; + double count = -1; + bool r = false; + bson_error_t error; + mongoc_collection_t *c = NULL; + + c = mongoc_client_get_collection (conn, database, collection); + + cmd = BsonCreate(); + out = BsonCreate(); + BsonAppendUTF8(cmd, "count", (char*)collection); + if (b) /* not empty */ + BsonAppendBson(cmd, "query", (BSON*)b); + + BsonFinish(cmd); + r = mongoc_collection_command_simple(c, cmd, NULL, out, &error); + if (r) + { + bson_iter_t it; + if (bson_iter_init_find(&it, out, "n")) + count = BsonIterDouble(&it); + } + mongoc_collection_destroy(c); + + BsonDestroy(out); + BsonDestroy(cmd); + return count; +} diff --git a/option.c b/option.c new file mode 100644 index 0000000..51230bb --- /dev/null +++ b/option.c @@ -0,0 +1,237 @@ +/*------------------------------------------------------------------------- + * + * option.c + * Foreign-data wrapper for remote MongoDB servers + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * Portions Copyright (c) 2004-2014, EnterpriseDB Corporation. + * + * Portions Copyright (c) 2012–2014 Citus Data, Inc. + * + * IDENTIFICATION + * option.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "mongo_wrapper.h" +#include "mongo_fdw.h" + +#include "access/reloptions.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "commands/explain.h" +#include "commands/vacuum.h" +#include "foreign/fdwapi.h" +#include "foreign/foreign.h" +#include "nodes/makefuncs.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/plancat.h" +#include "optimizer/planmain.h" +#include "optimizer/restrictinfo.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/date.h" +#include "utils/hsearch.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/memutils.h" +#include "miscadmin.h" + +static char * mongo_get_option_value(Oid foreignTableId, const char *optionName); + +/* + * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, + * USER MAPPING or FOREIGN TABLE that uses postgres_fdw. + * + * Raise an ERROR if the option or its value is considered invalid. + */ +extern Datum mongo_fdw_validator(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(mongo_fdw_validator); + +/* + * mongo_fdw_validator validates options given to one of the following commands: + * foreign data wrapper, server, user mapping, or foreign table. This function + * errors out if the given option name or its value is considered invalid. + */ +Datum +mongo_fdw_validator(PG_FUNCTION_ARGS) +{ + Datum optionArray = PG_GETARG_DATUM(0); + Oid optionContextId = PG_GETARG_OID(1); + List *optionList = untransformRelOptions(optionArray); + ListCell *optionCell = NULL; + + foreach(optionCell, optionList) + { + DefElem *optionDef = (DefElem *) lfirst(optionCell); + char *optionName = optionDef->defname; + bool optionValid = false; + + int32 optionIndex = 0; + for (optionIndex = 0; optionIndex < ValidOptionCount; optionIndex++) + { + const MongoValidOption *validOption = &(ValidOptionArray[optionIndex]); + + if ((optionContextId == validOption->optionContextId) && + (strncmp(optionName, validOption->optionName, NAMEDATALEN) == 0)) + { + optionValid = true; + break; + } + } + + /* if invalid option, display an informative error message */ + if (!optionValid) + { + StringInfo optionNamesString = mongo_option_names_string(optionContextId); + + ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("invalid option \"%s\"", optionName), + errhint("Valid options in this context are: %s", + optionNamesString->data))); + } + + /* if port option is given, error out if its value isn't an integer */ + if (strncmp(optionName, OPTION_NAME_PORT, NAMEDATALEN) == 0) + { + char *optionValue = defGetString(optionDef); + int32 portNumber = pg_atoi(optionValue, sizeof(int32), 0); + (void) portNumber; + } + } + PG_RETURN_VOID(); +} + +/* + * mongo_option_names_string finds all options that are valid for the current context, + * and concatenates these option names in a comma separated string. + */ +StringInfo +mongo_option_names_string(Oid currentContextId) +{ + StringInfo optionNamesString = makeStringInfo(); + bool firstOptionPrinted = false; + + int32 optionIndex = 0; + for (optionIndex = 0; optionIndex < ValidOptionCount; optionIndex++) + { + const MongoValidOption *validOption = &(ValidOptionArray[optionIndex]); + + /* if option belongs to current context, append option name */ + if (currentContextId == validOption->optionContextId) + { + if (firstOptionPrinted) + appendStringInfoString(optionNamesString, ", "); + + appendStringInfoString(optionNamesString, validOption->optionName); + firstOptionPrinted = true; + } + } + return optionNamesString; +} + + +/* + * mongo_get_options returns the option values to be used when connecting to and + * querying MongoDB. To resolve these values, the function checks the foreign + * table's options, and if not present, falls back to default values. + */ +MongoFdwOptions * +mongo_get_options(Oid foreignTableId) +{ + MongoFdwOptions *options = NULL; + char *addressName = NULL; + char *portName = NULL; + int32 portNumber = 0; + char *svr_database = NULL; + char *collectionName = NULL; + char *svr_username= NULL; + char *svr_password= NULL; + + addressName = mongo_get_option_value(foreignTableId, OPTION_NAME_ADDRESS); + if (addressName == NULL) + addressName = pstrdup(DEFAULT_IP_ADDRESS); + + portName = mongo_get_option_value(foreignTableId, OPTION_NAME_PORT); + if (portName == NULL) + portNumber = DEFAULT_PORT_NUMBER; + else + portNumber = pg_atoi(portName, sizeof(int32), 0); + + svr_database = mongo_get_option_value(foreignTableId, OPTION_NAME_DATABASE); + if (svr_database == NULL) + svr_database = pstrdup(DEFAULT_DATABASE_NAME); + + collectionName = mongo_get_option_value(foreignTableId, OPTION_NAME_COLLECTION); + if (collectionName == NULL) + collectionName = get_rel_name(foreignTableId); + + svr_username = mongo_get_option_value(foreignTableId, OPTION_NAME_USERNAME); + svr_password = mongo_get_option_value(foreignTableId, OPTION_NAME_PASSWORD); + + options = (MongoFdwOptions *) palloc0(sizeof(MongoFdwOptions)); + + options->svr_address = addressName; + options->svr_port = portNumber; + options->svr_database = svr_database; + options->collectionName = collectionName; + options->svr_username = svr_username; + options->svr_password = svr_password; + + return options; +} + + +void +mongo_free_options(MongoFdwOptions *options) +{ + if (options) + { + pfree(options->svr_address); + pfree(options->svr_database); + pfree(options); + } +} + +/* + * mongo_get_option_value walks over foreign table and foreign server options, and + * looks for the option with the given name. If found, the function returns the + * option's value. + */ +static char * +mongo_get_option_value(Oid foreignTableId, const char *optionName) +{ + ForeignTable *foreignTable = NULL; + ForeignServer *foreignServer = NULL; + List *optionList = NIL; + ListCell *optionCell = NULL; + UserMapping *mapping= NULL; + char *optionValue = NULL; + + foreignTable = GetForeignTable(foreignTableId); + foreignServer = GetForeignServer(foreignTable->serverid); + mapping = GetUserMapping(GetUserId(), foreignTable->serverid); + + optionList = list_concat(optionList, foreignTable->options); + optionList = list_concat(optionList, foreignServer->options); + optionList = list_concat(optionList, mapping->options); + + foreach(optionCell, optionList) + { + DefElem *optionDef = (DefElem *) lfirst(optionCell); + char *optionDefName = optionDef->defname; + + if (strncmp(optionDefName, optionName, NAMEDATALEN) == 0) + { + optionValue = defGetString(optionDef); + break; + } + } + return optionValue; +} +