Bulk insert support (#142)
* Notable efficiency improvement for regular inserts
* Bulk insert operation
grigi authored Jun 15, 2019
1 parent 97b96cb commit c984426
Showing 12 changed files with 286 additions and 79 deletions.
28 changes: 24 additions & 4 deletions CHANGELOG.rst
@@ -3,6 +3,28 @@
Changelog
=========

0.12.1
------
* Notable efficiency improvement for regular inserts
* Bulk insert operation:

  .. note::
     The bulk insert operation will do the minimum to ensure that the object
     created in the DB has all the defaults and generated fields set,
     but may be an incomplete reference in Python.

     e.g. ``IntField`` primary keys will not be populated.

     This is recommended only for throw-away inserts where you want to ensure
     optimal insert performance.

     .. code-block:: python3

        await User.bulk_create([
            User(name="...", email="..."),
            User(name="...", email="...")
        ])
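
  A consequence of the note above (a sketch; the model and field names are
  illustrative): generated ``IntField`` primary keys are not written back, so
  query the rows again when you need usable references:

  .. code-block:: python3

     await User.bulk_create([
         User(name="a", email="a@example.com"),
         User(name="b", email="b@example.com"),
     ])
     # instance.id is still None here for autonumber PKs
     created = await User.filter(email__in=["a@example.com", "b@example.com"])
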
0.12.0
------
* Tortoise ORM now supports non-autonumber primary keys.
@@ -36,8 +58,6 @@ Changelog
guid = fields.UUIDField(pk=True)
For more info, please have a look at :ref:`init_app`

0.11.13
-------
@@ -69,12 +89,12 @@ Changelog

0.11.7
------
- Fixed 'unique_together' for foreign keys (#114)
- Fixed ``unique_together`` for foreign keys (#114)
- Fixed Field.to_db_value method to handle Enum (#113 #115 #116)

0.11.6
------
- Added ability to use "unique_together" meta Model option
- Added ability to use ``unique_together`` meta Model option

0.11.5
------
32 changes: 29 additions & 3 deletions docs/models.rst
@@ -99,6 +99,33 @@ Any of these are valid primary key definitions in a Model:
guid = fields.UUIDField(pk=True)
The ``Meta`` class
------------------

.. autoclass:: tortoise.models.Model.Meta

    .. attribute:: abstract
        :annotation: = False

        Set to ``True`` to indicate this is an abstract class

    .. attribute:: table
        :annotation: = ""

        Set this to configure a manual table name, instead of a generated one

    .. attribute:: unique_together
        :annotation: = None

        Specify ``unique_together`` to set up compound unique indexes for sets of columns.

        It should be a tuple of tuples (lists are fine) in the format of:

        .. code-block:: python3

            unique_together=("field_a", "field_b")
            unique_together=(("field_a", "field_b"),)
            unique_together=(("field_a", "field_b"), ("field_c", "field_d", "field_e"))
``ForeignKeyField``
-------------------
@@ -195,7 +222,6 @@ The reverse lookup of ``team.event_team`` works exactly the same way.
Reference
=========

.. autoclass:: tortoise.models.Model
    :members:
.. automodule:: tortoise.models
    :members: Model
    :undoc-members:

2 changes: 1 addition & 1 deletion tortoise/__init__.py
@@ -401,4 +401,4 @@ async def do_stuff():
loop.run_until_complete(Tortoise.close_connections())


__version__ = "0.12.0"
__version__ = "0.12.1"
8 changes: 8 additions & 0 deletions tortoise/backends/asyncpg/client.py
@@ -158,6 +158,14 @@ async def execute_insert(self, query: str, values: list) -> Optional[asyncpg.Record]:
stmt = await connection.prepare(query)
return await stmt.fetchrow(*values)

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Consider using copy_records_to_table instead
await connection.executemany(query, values)

@translate_exceptions
@retry_connection
async def execute_query(self, query: str) -> List[dict]:
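A sketch of how this new low-level method is driven (the table, columns, and
values below are illustrative; in practice ``BaseExecutor.execute_bulk_insert()``
builds the query and parameter rows):

.. code-block:: python3

    # asyncpg-style $n placeholders, one parameter row per instance
    await client.execute_many(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        [["a", "a@example.com"], ["b", "b@example.com"]],
    )
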
4 changes: 2 additions & 2 deletions tortoise/backends/base/config_generator.py
@@ -34,8 +34,8 @@
"engine": "tortoise.backends.sqlite",
"skip_first_char": False,
"vmap": {"path": "file_path"},
"defaults": {},
"cast": {},
"defaults": {"journal_mode": "WAL", "journal_size_limit": 16384},
"cast": {"journal_size_limit": int},
},
"mysql": {
"engine": "tortoise.backends.mysql",
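These new defaults mean SQLite connections run with WAL journaling out of the
box. A sketch of overriding them through the DB URL query string (the path and
values are illustrative; per the ``cast`` map above, ``journal_size_limit`` is
cast to ``int``):

.. code-block:: python3

    from tortoise import Tortoise

    await Tortoise.init(
        db_url="sqlite:///tmp/app.db?journal_mode=WAL&journal_size_limit=32768",
        modules={"models": ["app.models"]},
    )
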
56 changes: 36 additions & 20 deletions tortoise/backends/base/executor.py
@@ -1,3 +1,4 @@
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Type # noqa

from pypika import JoinType, Table
@@ -9,7 +10,7 @@
if TYPE_CHECKING: # pragma: nocoverage
from tortoise.models import Model

INSERT_CACHE = {} # type: Dict[str, Tuple[list, list, str]]
INSERT_CACHE = {} # type: Dict[str, Tuple[list, str, Dict[str, Callable]]]


class BaseExecutor:
@@ -23,6 +24,24 @@ def __init__(self, model, db=None, prefetch_map=None, prefetch_queries=None):
self.prefetch_map = prefetch_map if prefetch_map else {}
self._prefetch_queries = prefetch_queries if prefetch_queries else {}

key = "{}:{}".format(self.db.connection_name, self.model._meta.table)
if key not in INSERT_CACHE:
self.regular_columns, columns = self._prepare_insert_columns()
self.query = self._prepare_insert_statement(columns)

self.column_map = {} # type: Dict[str, Callable]
for column in self.regular_columns:
field_object = self.model._meta.fields_map[column]
if field_object.__class__ in self.TO_DB_OVERRIDE:
func = partial(self.TO_DB_OVERRIDE[field_object.__class__], field_object)
else:
func = field_object.to_db_value
self.column_map[column] = func

INSERT_CACHE[key] = self.regular_columns, self.query, self.column_map
else:
self.regular_columns, self.query, self.column_map = INSERT_CACHE[key]

async def execute_explain(self, query) -> Any:
sql = " ".join(((self.EXPLAIN_PREFIX, query.get_sql())))
return await self.db.execute_query(sql)
@@ -54,14 +73,6 @@ def _field_to_db(cls, field_object: fields.Field, attr: Any, instance) -> Any:
return cls.TO_DB_OVERRIDE[field_object.__class__](field_object, attr, instance)
return field_object.to_db_value(attr, instance)

def _prepare_insert_values(self, instance, regular_columns: List[str]) -> list:
return [
self._field_to_db(
self.model._meta.fields_map[column], getattr(instance, column), instance
)
for column in regular_columns
]

def _prepare_insert_statement(self, columns: List[str]) -> str:
# Insert should implement returning new id to saved object
# Each db has it's own methods for it, so each implementation should
@@ -72,27 +83,32 @@ async def _process_insert_result(self, instance: "Model", results: Any):
raise NotImplementedError() # pragma: nocoverage

async def execute_insert(self, instance):
key = "{}:{}".format(self.db.connection_name, self.model._meta.table)
if key not in INSERT_CACHE:
regular_columns, columns = self._prepare_insert_columns()
query = self._prepare_insert_statement(columns)
INSERT_CACHE[key] = regular_columns, columns, query
else:
regular_columns, columns, query = INSERT_CACHE[key]

values = self._prepare_insert_values(instance=instance, regular_columns=regular_columns)
insert_result = await self.db.execute_insert(query, values)
values = [
self.column_map[column](getattr(instance, column), instance)
for column in self.regular_columns
]
insert_result = await self.db.execute_insert(self.query, values)
await self._process_insert_result(instance, insert_result)
return instance

async def execute_bulk_insert(self, instances):
values_lists = [
[
self.column_map[column](getattr(instance, column), instance)
for column in self.regular_columns
]
for instance in instances
]
await self.db.execute_many(self.query, values_lists)

async def execute_update(self, instance):
table = Table(self.model._meta.table)
query = self.db.query_class.update(table)
for field, db_field in self.model._meta.fields_db_projection.items():
field_object = self.model._meta.fields_map[field]
if not field_object.generated:
query = query.set(
db_field, self._field_to_db(field_object, getattr(instance, field), instance)
db_field, self.column_map[field](getattr(instance, field), instance)
)
query = query.where(
getattr(table, self.model._meta.db_pk_field)
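The executor now builds ``(regular_columns, query, column_map)`` once per
``connection:table`` pair and caches it in ``INSERT_CACHE``, so single and bulk
inserts share the same prepared statement shape. A sketch of the user-facing
call this enables (the model and fields are illustrative):

.. code-block:: python3

    users = [
        User(name="User {}".format(i), email="u{}@example.com".format(i))
        for i in range(1000)
    ]
    await User.bulk_create(users)  # one executemany call instead of 1000 INSERTs
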
9 changes: 8 additions & 1 deletion tortoise/backends/mysql/client.py
@@ -159,10 +159,17 @@ async def execute_insert(self, query: str, values: list) -> int:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor() as cursor:
# TODO: Use prepared statement, and cache it
await cursor.execute(query, values)
return cursor.lastrowid # return auto-generated id

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor() as cursor:
await cursor.executemany(query, values)

@translate_exceptions
@retry_connection
async def execute_query(self, query: str) -> List[aiomysql.DictCursor]:
25 changes: 23 additions & 2 deletions tortoise/backends/sqlite/client.py
@@ -40,6 +40,11 @@ class SqliteClient(BaseDBAsyncClient):
def __init__(self, file_path: str, **kwargs) -> None:
super().__init__(**kwargs)
self.filename = file_path

self.pragmas = kwargs.copy()
self.pragmas.pop("connection_name", None)
self.pragmas.pop("fetch_inserted", None)

self._transaction_class = type(
"TransactionWrapper", (TransactionWrapper, self.__class__), {}
)
@@ -52,15 +57,24 @@ async def create_connection(self, with_db: bool) -> None:
self._connection.start()
await self._connection._connect()
self._connection._conn.row_factory = sqlite3.Row
for pragma, val in self.pragmas.items():
cursor = await self._connection.execute("PRAGMA {}={}".format(pragma, val))
await cursor.close()
self.log.debug(
"Created connection %s with params: filename=%s", self._connection, self.filename
"Created connection %s with params: filename=%s %s",
self._connection,
self.filename,
" ".join(["{}={}".format(k, v) for k, v in self.pragmas.items()]),
)

async def close(self) -> None:
if self._connection:
await self._connection.close()
self.log.debug(
"Closed connection %s with params: filename=%s", self._connection, self.filename
"Closed connection %s with params: filename=%s %s",
self._connection,
self.filename,
" ".join(["{}={}".format(k, v) for k, v in self.pragmas.items()]),
)
self._connection = None

@@ -91,6 +105,13 @@ async def execute_insert(self, query: str, values: list) -> int:
self.log.debug("%s: %s", query, values)
return (await connection.execute_insert(query, values))[0]

@translate_exceptions
async def execute_many(self, query: str, values: List[list]) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Ensure that this is wrapped by a transaction, will provide a big speedup
await connection.executemany(query, values)

@translate_exceptions
async def execute_query(self, query: str) -> List[dict]:
async with self.acquire_connection() as connection:
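Per the TODO in ``execute_many`` above, wrapping a bulk insert in a transaction
lets SQLite batch its journal writes. A sketch using Tortoise's
``in_transaction`` helper from ``tortoise.transactions`` (the model is
illustrative):

.. code-block:: python3

    from tortoise.transactions import in_transaction

    async with in_transaction():
        await User.bulk_create(users)  # users: list of unsaved User instances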