Skip to content

Commit

Permalink
Merge pull request #59 from rolobio/feature/table-contains
Browse files Browse the repository at this point in the history
You can check if a Dict belongs to a Table by using in
  • Loading branch information
rolobio authored May 31, 2019
2 parents f8f22c1 + c67d746 commit 2b74b9c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 34 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,17 @@ bob['manager_id'] = None
[aly,]
```

### Check if a Dict belongs to a Table
```python
>>> bob = Person(name='bob').flush()
>>> bob.table
Person
>>> bob in Person
True
>>> bob in Car
False
```

### Advanced query'ing
```python
# DictORM supports many simple expressions. It is by no means exhaustive, but
Expand Down
61 changes: 40 additions & 21 deletions dictorm/dictorm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""What if you could insert a Python dictionary into the database? DictORM allows you to select/insert/update rows of a database as if they were Python Dictionaries."""
from typing import Union, Optional, List

__version__ = '4.0'
__version__ = '4.1'

from contextlib import contextmanager
from itertools import chain
Expand Down Expand Up @@ -107,7 +107,7 @@ class Dict(dict):
"""

def __init__(self, table, *a, **kw):
self._table: Table = table
self.table: Table = table
self._in_db = False
self._curs: CursorHint = table.db.curs
super(Dict, self).__init__(*a, **kw)
Expand All @@ -123,37 +123,37 @@ def flush(self):
All original column/values will bet inserted/updated by this method.
All references will be flushed as well.
"""
if self._table.refs:
if self.table.refs:
for i in self.values():
if isinstance(i, Dict):
i.flush()

# This will be sent to the DB, don't convert dicts to json unless
# the table has json columns.
items = self.no_refs()
if self._table.has_json:
if self.table.has_json:
items = _json_dicts(items)

# Insert/Update only with columns present on the table, this allows custom
# instances of Dicts to be inserted even if they have columns not on the table
items = {k: v for k, v in items.items() if k in self._table.column_names}
items = {k: v for k, v in items.items() if k in self.table.column_names}

if not self._in_db:
# Insert this Dict into it's respective table, interpolating
# my values into the query
query = self._table.db.insert(self._table.name, **items
).returning('*')
query = self.table.db.insert(self.table.name, **items
).returning('*')
d = self.__execute_query(query)
self._in_db = True
else:
# Update this dictionary's row
if not self._table.pks:
if not self.table.pks:
raise NoPrimaryKey(
'Cannot update to {0}, no primary keys defined.'.format(
self._table))
self.table))
# Update without references, "wheres" are the primary values
query = self._table.db.update(self._table.name, **items
).where(self._old_pk_and or self.pk_and()).returning('*')
query = self.table.db.update(self.table.name, **items
).where(self._old_pk_and or self.pk_and()).returning('*')
d = self.__execute_query(query)

if d:
Expand All @@ -166,7 +166,7 @@ def delete(self):
Delete this row from it's table in the database. Requires primary keys
to be specified.
"""
query = self._table.db.delete(self._table.name).where(
query = self.table.db.delete(self.table.name).where(
self._old_pk_and or self.pk_and())
return self.__execute_query(query)

Expand All @@ -188,37 +188,37 @@ def pk_and(self):
Return an And() of all this Dict's primary key and values. i.e.
And(id=1, other_primary=4)
"""
return And(*[self._table[k] == v for k, v in self.items() if k in \
self._table.pks])
return And(*[self.table[k] == v for k, v in self.items() if k in \
self.table.pks])

def no_pks(self):
"""
Return a dictionary without the primary keys that are associated with
this Dict in the Database. This should be used when doing an update of
another Dict.
"""
return {k: v for k, v in self.items() if k not in self._table.pks}
return {k: v for k, v in self.items() if k not in self.table.pks}

def no_refs(self):
"""
Return a dictionary without the key/value(s) added by a reference. They
should never be sent in the query to the Database.
"""
return {k: v for k, v in self.items() if k not in self._table.refs}
return {k: v for k, v in self.items() if k not in self.table.refs}

def references(self):
"""
Return a dictionary of only the referenced rows.
"""
return {k: v for k, v in self.items() if k in self._table.refs}
return {k: v for k, v in self.items() if k in self.table.refs}

def __getitem__(self, key):
"""
Get the provided "key" from this Dict instance. If the key refers to a
referenced row, get that row first. Will only get a referenced row
once, until the referenced row's foreign key is changed.
"""
ref = self._table.refs.get(key)
ref = self.table.refs.get(key)
if not ref and key not in self:
raise KeyError(str(key))
# Only get the referenced row once, if it has a value, the reference's
Expand Down Expand Up @@ -252,7 +252,7 @@ def __setitem__(self, key, value):
Set self[key] to value. If key is a reference's matching foreign key,
set the reference to None.
"""
ref = self._table.fks.get(key)
ref = self.table.fks.get(key)
if ref:
super(Dict, self).__setitem__(ref, None)
return super(Dict, self).__setitem__(key, value)
Expand Down Expand Up @@ -548,12 +548,16 @@ def get_where(self, *a, **kw) -> ResultsGenerator:
You cannot use this method without primary keys, unless you specify the
column you are matching.
>>> get_where(some_column=83)
>>> your_table.get_where(some_column=83)
ResultsGenerator()
>>> get_where(4) # no primary keys defined!
>>> your_table.get_where(4) # no primary keys defined!
NoPrimaryKey()
Check if a Dict belongs to this Table:
>>> bob in Person
True
"""
# All args/kwargs are combined in an SQL And comparison
operator_group = args_to_comp(And(), self, *a, **kw)
Expand Down Expand Up @@ -672,6 +676,21 @@ def __getitem__(self, ref_name) -> Union[Column, SqliteColumn]:
return self.refs[ref_name]
return self.db.column(self, ref_name)

def __contains__(self, item: Dict):
"""
Compare a row's table to myself. If the tables match, the row is a member of this
table.
Example:
>>> bob in Person
True
>>> bob in Car
False
"""
if isinstance(item, Dict):
return item.table == self
raise ValueError('Cannot check if item is in this Table because it is not a Dict.')


class DictDB(dict):
"""
Expand Down
40 changes: 27 additions & 13 deletions dictorm/test/test_dictorm.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,13 @@ def test_reexecute(self):
alice = Person(name='Alice', manager_id=bob['id']).flush()
self.assertEqual(alice['manager'], bob)

original_get_where = alice._table.get_where
alice._table.get_where = error
original_get_where = alice.table.get_where
alice.table.get_where = error
self.assertEqual(alice['manager'], bob)

steve = Person(name='Steve').flush()

alice._table.get_where = original_get_where
alice.table.get_where = original_get_where
alice['manager_id'] = steve['id']
alice.flush()
self.assertEqualNoRefs(alice['manager'], steve)
Expand Down Expand Up @@ -647,8 +647,8 @@ def test_table_equal(self):

will = Person(name='Will').flush()
bob = Person(name='Bob').flush()
self.assertEqual(will._table, bob._table)
self.assertIs(will._table, bob._table)
self.assertEqual(will.table, bob.table)
self.assertIs(will.table, bob.table)

Car = self.db['car']
self.assertNotEqual(Person, Car)
Expand All @@ -659,10 +659,10 @@ def test_table_equal(self):
will.flush()
will['car']['license_plate'] = 'foo'

self.assertEqual(stratus._table, Car)
self.assertIs(stratus._table, Car)
self.assertEqual(will['car']._table, Car)
self.assertIs(will['car']._table, Car)
self.assertEqual(stratus.table, Car)
self.assertIs(stratus.table, Car)
self.assertEqual(will['car'].table, Car)
self.assertIs(will['car'].table, Car)

def test_real(self):
"""
Expand Down Expand Up @@ -836,16 +836,16 @@ def test_onetoone_cache(self):
bob['manager_id'] = bill['id']

self.assertEqual(bob['manager'], bill)
old_get_one = bob._table.get_one
bob._table.get_one = error
old_get_one = bob.table.get_one
bob.table.get_one = error
# Error fuction shouldn't be called, since manager is cached
self.assertEqual(bob['manager'], bill)

Car = self.db['car']
Person['car'] = Person['car_id'] == Car['id']
Person['manager_car'] = Person['manager'].substratum('car')

bob._table.get_one = old_get_one
bob.table.get_one = old_get_one
self.assertEqual(bob['manager_car'], None)
bill_car = Car(name='Prius').flush()
bill['car_id'] = bill_car['id']
Expand Down Expand Up @@ -875,7 +875,7 @@ def test_results_cache(self):
for sub in subordinates:
assert isinstance(sub, dictorm.Dict)
# Error would be raised if subordinates isn't cached
bob._table.get_where = error
bob.table.get_where = error
for sub in subordinates:
assert isinstance(sub, dictorm.Dict)

Expand Down Expand Up @@ -1322,6 +1322,20 @@ def test_any(self):
list(Person.get_where(Person['id'].Any([2, 3])))
)

def test_contains(self):
"""
A Dict can be matched to the Table it originates from.
"""
Person, Car = self.db['person'], self.db['car']
steve = Person(name='Steve').flush()
steve_car = Car(name='Stratus', person_id=steve['id']).flush()
self.assertTrue(steve in Person)
self.assertFalse(steve in Car)
self.assertFalse(steve_car in Person)
self.assertTrue(steve_car in Car)

self.assertRaises(ValueError, Person.__contains__, 'foo')


class SqliteTestBase(object):

Expand Down

0 comments on commit 2b74b9c

Please sign in to comment.