Skip to content

Commit c01968e

Browse files
authored
Cancel all background work on last reference drop (#98)
This cancels all background work before dropping the last reference to prevent background work from keeping the database open.
1 parent 4496dd8 commit c01968e

File tree

5 files changed

+105
-49
lines changed

5 files changed

+105
-49
lines changed

src/db_reference.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use rocksdb::DB;
2+
use std::cell::RefCell;
3+
use std::sync::Arc;
4+
5+
/// The type of a reference to a [rocksdb::DB] that is passed around the library.
6+
pub(crate) type DbReference = Arc<RefCell<DB>>;
7+
8+
/// A wrapper around [DbReference] that cancels all background work when dropped.
9+
///
10+
/// All users of [rocksdb::DB] should use this wrapper instead to avoid keeping background threads
11+
/// alive after the database is dropped.
12+
#[derive(Clone)]
13+
pub(crate) struct DbReferenceHolder {
14+
inner: Option<DbReference>,
15+
}
16+
17+
impl DbReferenceHolder {
18+
pub fn new(db: DB) -> Self {
19+
Self {
20+
inner: Some(Arc::new(RefCell::new(db))),
21+
}
22+
}
23+
24+
pub fn get(&self) -> Option<&DbReference> {
25+
self.inner.as_ref()
26+
}
27+
28+
pub fn close(&mut self) {
29+
if let Some(db) = self.inner.take().and_then(Arc::into_inner) {
30+
db.borrow_mut().cancel_all_background_work(true);
31+
}
32+
}
33+
}
34+
35+
impl Drop for DbReferenceHolder {
36+
fn drop(&mut self) {
37+
self.close();
38+
}
39+
}

src/iter.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
1+
use crate::db_reference::DbReferenceHolder;
12
use crate::encoder::{decode_value, encode_key};
3+
use crate::exceptions::DbClosedError;
24
use crate::util::error_message;
35
use crate::{ReadOpt, ReadOptionsPy};
46
use core::slice;
57
use libc::{c_char, c_uchar, size_t};
68
use pyo3::exceptions::PyException;
79
use pyo3::prelude::*;
8-
use rocksdb::{AsColumnFamilyRef, ColumnFamily, DB};
9-
use std::cell::RefCell;
10+
use rocksdb::{AsColumnFamilyRef, ColumnFamily};
1011
use std::ptr::null_mut;
1112
use std::sync::Arc;
1213

1314
#[pyclass]
1415
#[allow(dead_code)]
1516
pub(crate) struct RdictIter {
1617
/// iterator must keep a reference count of DB to keep DB alive.
17-
pub(crate) db: Arc<RefCell<DB>>,
18+
pub(crate) db: DbReferenceHolder,
1819

1920
pub(crate) inner: *mut librocksdb_sys::rocksdb_iterator_t,
2021

@@ -49,26 +50,29 @@ pub(crate) struct RdictValues {
4950

5051
impl RdictIter {
5152
pub(crate) fn new(
52-
db: &Arc<RefCell<DB>>,
53+
db: &DbReferenceHolder,
5354
cf: &Option<Arc<ColumnFamily>>,
5455
readopts: ReadOptionsPy,
5556
pickle_loads: &PyObject,
5657
raw_mode: bool,
5758
py: Python,
5859
) -> PyResult<Self> {
5960
let readopts = readopts.to_read_opt(raw_mode, py)?;
61+
62+
let db_inner = db
63+
.get()
64+
.ok_or_else(|| DbClosedError::new_err("DB instance already closed"))?
65+
.borrow()
66+
.inner();
67+
6068
Ok(RdictIter {
6169
db: db.clone(),
6270
inner: unsafe {
6371
match cf {
64-
None => {
65-
librocksdb_sys::rocksdb_create_iterator(db.borrow().inner(), readopts.0)
72+
None => librocksdb_sys::rocksdb_create_iterator(db_inner, readopts.0),
73+
Some(cf) => {
74+
librocksdb_sys::rocksdb_create_iterator_cf(db_inner, readopts.0, cf.inner())
6675
}
67-
Some(cf) => librocksdb_sys::rocksdb_create_iterator_cf(
68-
db.borrow().inner(),
69-
readopts.0,
70-
cf.inner(),
71-
),
7276
}
7377
},
7478
readopts,

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// #![feature(core_intrinsics)]
2+
mod db_reference;
23
mod encoder;
34
mod exceptions;
45
mod iter;

src/rdict.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::db_reference::{DbReference, DbReferenceHolder};
12
use crate::encoder::{decode_value, encode_key, encode_value};
23
use crate::exceptions::DbClosedError;
34
use crate::iter::{RdictItems, RdictKeys, RdictValues};
@@ -71,7 +72,7 @@ pub(crate) struct Rdict {
7172
pub(crate) access_type: AccessType,
7273
pub(crate) slice_transforms: Arc<RwLock<HashMap<String, SliceTransformType>>>,
7374
// drop DB last
74-
pub(crate) db: Option<Arc<RefCell<DB>>>,
75+
pub(crate) db: DbReferenceHolder,
7576
}
7677

7778
/// Define DB Access Types.
@@ -136,9 +137,9 @@ impl Rdict {
136137
.save(config_path)
137138
}
138139

139-
fn get_db(&self) -> PyResult<&Arc<RefCell<DB>>> {
140+
fn get_db(&self) -> PyResult<&DbReference> {
140141
self.db
141-
.as_ref()
142+
.get()
142143
.ok_or_else(|| DbClosedError::new_err("DB instance already closed"))
143144
}
144145
}
@@ -266,7 +267,7 @@ impl Rdict {
266267
// save rocksdict config
267268
rocksdict_config.save(config_path)?;
268269
Ok(Rdict {
269-
db: Some(Arc::new(RefCell::new(db))),
270+
db: DbReferenceHolder::new(db),
270271
write_opt: (&w_opt).into(),
271272
flush_opt: FlushOptionsPy::new(),
272273
read_opt: r_opt.to_read_options(options.raw_mode, py)?,
@@ -640,7 +641,7 @@ impl Rdict {
640641
};
641642

642643
RdictIter::new(
643-
self.get_db()?,
644+
&self.db,
644645
&self.column_family,
645646
read_opt,
646647
&self.loads,
@@ -818,7 +819,7 @@ impl Rdict {
818819
"column name `{name}` does not exist, use `create_cf` to creat it",
819820
))),
820821
Some(cf) => Ok(Self {
821-
db: Some(db.clone()),
822+
db: self.db.clone(),
822823
write_opt: (&self.write_opt_py).into(),
823824
flush_opt: self.flush_opt,
824825
read_opt: self.read_opt_py.to_read_options(self.opt_py.raw_mode, py)?,
@@ -855,7 +856,10 @@ impl Rdict {
855856
None => Err(PyException::new_err(format!(
856857
"column name `{name}` does not exist, use `create_cf` to creat it",
857858
))),
858-
Some(cf) => Ok(ColumnFamilyPy { cf, db: db.clone() }),
859+
Some(cf) => Ok(ColumnFamilyPy {
860+
cf,
861+
db: self.db.clone(),
862+
}),
859863
}
860864
}
861865

@@ -1014,8 +1018,8 @@ impl Rdict {
10141018
/// Other Column Family `Rdict` instances, `ColumnFamily`
10151019
/// (cf handle) instances, iterator instances such as `RdictIter`,
10161020
/// `RdictItems`, `RdictKeys`, `RdictValues` can all keep RocksDB
1017-
/// alive. `del` all associated instances mentioned above
1018-
/// to actually shut down RocksDB.
1021+
/// alive. `del` or `close` all associated instances mentioned
1022+
/// above to actually shut down RocksDB.
10191023
///
10201024
fn close(&mut self) -> PyResult<()> {
10211025
let f_opt = &self.flush_opt;
@@ -1024,7 +1028,7 @@ impl Rdict {
10241028
AccessTypeInner::ReadOnly { .. } | AccessTypeInner::Secondary { .. } => {
10251029
drop(db);
10261030
drop(self.column_family.take());
1027-
drop(self.db.take());
1031+
self.db.close();
10281032
return Ok(());
10291033
}
10301034
_ => (),
@@ -1037,7 +1041,7 @@ impl Rdict {
10371041
};
10381042
drop(db);
10391043
drop(self.column_family.take());
1040-
drop(self.db.take());
1044+
self.db.close();
10411045
match (flush_result, flush_wal_result) {
10421046
(Ok(_), Ok(_)) => Ok(()),
10431047
(Err(e), Ok(_)) => Err(PyException::new_err(e.to_string())),
@@ -1257,7 +1261,7 @@ fn get_batch_inner<'a>(
12571261
impl Drop for Rdict {
12581262
// flush
12591263
fn drop(&mut self) {
1260-
if let Some(db) = &self.db {
1264+
if let Some(db) = self.db.get() {
12611265
let f_opt = &self.flush_opt;
12621266
let db = db.borrow();
12631267
let _ = if let Some(cf) = &self.column_family {
@@ -1269,7 +1273,7 @@ impl Drop for Rdict {
12691273
// important, always drop column families first
12701274
// to ensure that CF handles have a shorter lifetime than the DB.
12711275
drop(self.column_family.take());
1272-
drop(self.db.take());
1276+
self.db.close();
12731277
}
12741278
}
12751279

@@ -1283,7 +1287,7 @@ pub(crate) struct ColumnFamilyPy {
12831287
// must follow this drop order
12841288
pub(crate) cf: Arc<ColumnFamily>,
12851289
// must keep db alive
1286-
db: Arc<RefCell<DB>>,
1290+
db: DbReferenceHolder,
12871291
}
12881292

12891293
unsafe impl Send for ColumnFamilyPy {}

src/snapshot.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use crate::db_reference::{DbReference, DbReferenceHolder};
12
use crate::encoder::{decode_value, encode_key};
3+
use crate::exceptions::DbClosedError;
24
use crate::{Rdict, RdictItems, RdictIter, RdictKeys, RdictValues, ReadOptionsPy};
35
use pyo3::exceptions::PyException;
46
use pyo3::prelude::*;
5-
use rocksdb::{ColumnFamily, ReadOptions, DB};
6-
use std::cell::RefCell;
7+
use rocksdb::{ColumnFamily, ReadOptions};
78
use std::ops::Deref;
89
use std::sync::Arc;
910

@@ -43,7 +44,7 @@ pub struct Snapshot {
4344
pub(crate) pickle_loads: PyObject,
4445
pub(crate) read_opt: ReadOptions,
4546
// decrease db Rc last
46-
pub(crate) db: Arc<RefCell<DB>>,
47+
pub(crate) db: DbReferenceHolder,
4748
pub(crate) raw_mode: bool,
4849
}
4950

@@ -133,7 +134,7 @@ impl Snapshot {
133134

134135
/// read from snapshot
135136
fn __getitem__(&self, key: &PyAny, py: Python) -> PyResult<PyObject> {
136-
let db = self.db.borrow();
137+
let db = self.get_db().borrow();
137138
let key = encode_key(key, self.raw_mode)?;
138139
let value_result = if let Some(cf) = &self.column_family {
139140
db.get_pinned_cf_opt(cf.deref(), &key[..], &self.read_opt)
@@ -152,33 +153,40 @@ impl Snapshot {
152153

153154
impl Snapshot {
154155
pub(crate) fn new(rdict: &Rdict, py: Python) -> PyResult<Self> {
155-
if let Some(db) = &rdict.db {
156-
let db_borrow = db.borrow();
157-
let snapshot = unsafe { librocksdb_sys::rocksdb_create_snapshot(db_borrow.inner()) };
158-
let r_opt: ReadOptions = rdict
159-
.read_opt_py
160-
.to_read_options(rdict.opt_py.raw_mode, py)?;
161-
unsafe {
162-
set_snapshot(r_opt.inner(), snapshot);
163-
}
164-
Ok(Snapshot {
165-
inner: snapshot,
166-
column_family: rdict.column_family.clone(),
167-
pickle_loads: rdict.loads.clone(),
168-
read_opt: r_opt,
169-
db: db.clone(),
170-
raw_mode: rdict.opt_py.raw_mode,
171-
})
172-
} else {
173-
Err(PyException::new_err("DB already closed"))
156+
let db_inner = rdict
157+
.db
158+
.get()
159+
.ok_or_else(|| DbClosedError::new_err("DB instance already closed"))?
160+
.borrow()
161+
.inner();
162+
let snapshot = unsafe { librocksdb_sys::rocksdb_create_snapshot(db_inner) };
163+
let r_opt: ReadOptions = rdict
164+
.read_opt_py
165+
.to_read_options(rdict.opt_py.raw_mode, py)?;
166+
unsafe {
167+
set_snapshot(r_opt.inner(), snapshot);
174168
}
169+
Ok(Snapshot {
170+
inner: snapshot,
171+
column_family: rdict.column_family.clone(),
172+
pickle_loads: rdict.loads.clone(),
173+
read_opt: r_opt,
174+
db: rdict.db.clone(),
175+
raw_mode: rdict.opt_py.raw_mode,
176+
})
177+
}
178+
179+
fn get_db(&self) -> &DbReference {
180+
self.db
181+
.get()
182+
.expect("Snapshot should never close its DbReference")
175183
}
176184
}
177185

178186
impl Drop for Snapshot {
179187
fn drop(&mut self) {
180188
unsafe {
181-
librocksdb_sys::rocksdb_release_snapshot(self.db.borrow().inner(), self.inner);
189+
librocksdb_sys::rocksdb_release_snapshot(self.get_db().borrow().inner(), self.inner);
182190
}
183191
}
184192
}

0 commit comments

Comments
 (0)