Skip to content

Commit

Permalink
Allow threads on closes and cleanup functions (#104)
Browse files Browse the repository at this point in the history
These functions can induce a decent amount of I/O and take a while, so
allowing threading on them is reasonable here.
  • Loading branch information
GodTamIt authored Dec 14, 2023
1 parent 3e64dc4 commit 0501998
Showing 1 changed file with 47 additions and 28 deletions.
75 changes: 47 additions & 28 deletions src/rdict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,24 +708,27 @@ impl Rdict {
/// Args:
/// wait (bool): whether to wait for the flush to finish.
#[pyo3(signature = (wait = true))]
fn flush(&self, wait: bool) -> PyResult<()> {
fn flush(&self, wait: bool, py: Python) -> PyResult<()> {
let db = self.get_db()?;
let mut f_opt = FlushOptions::new();
f_opt.set_wait(wait);
if let Some(cf) = &self.column_family {
db.flush_cf_opt(cf, &f_opt)
} else {
db.flush_opt(&f_opt)
}

py.allow_threads(|| {
let mut f_opt = FlushOptions::new();
f_opt.set_wait(wait);
if let Some(cf) = &self.column_family {
db.flush_cf_opt(cf, &f_opt)
} else {
db.flush_opt(&f_opt)
}
})
.map_err(|e| PyException::new_err(e.into_string()))
}

/// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
/// the data to disk.
#[pyo3(signature = (sync = true))]
fn flush_wal(&self, sync: bool) -> PyResult<()> {
fn flush_wal(&self, sync: bool, py: Python) -> PyResult<()> {
let db = self.get_db()?;
db.flush_wal(sync)
py.allow_threads(|| db.flush_wal(sync))
.map_err(|e| PyException::new_err(e.into_string()))
}

Expand Down Expand Up @@ -977,25 +980,33 @@ impl Rdict {
/// alive. `del` or `close` all associated instances mentioned
/// above to actually shut down RocksDB.
///
fn close(&mut self) -> PyResult<()> {
fn close(&mut self, py: Python) -> PyResult<()> {
// do not flush if readonly
if let AccessTypeInner::ReadOnly { .. } | AccessTypeInner::Secondary { .. } =
&self.access_type.0
{
drop(self.column_family.take());
self.db.close();
py.allow_threads(|| {
drop(self.column_family.take());
self.db.close();
});
return Ok(());
}
let f_opt = &self.flush_opt;
let db = self.get_db()?;
let flush_wal_result = db.flush_wal(true);
let flush_result = if let Some(cf) = &self.column_family {
db.flush_cf_opt(cf, &f_opt.into())
} else {
db.flush_opt(&f_opt.into())
};
drop(self.column_family.take());
self.db.close();

let (flush_wal_result, flush_result) = py.allow_threads(|| {
let f_opt = &self.flush_opt;
let db = self.get_db()?;

let flush_wal_result = db.flush_wal(true);
let flush_result = if let Some(cf) = &self.column_family {
db.flush_cf_opt(cf, &f_opt.into())
} else {
db.flush_opt(&f_opt.into())
};
drop(self.column_family.take());
self.db.close();

Ok::<_, PyErr>((flush_wal_result, flush_result))
})?;
match (flush_result, flush_wal_result) {
(Ok(_), Ok(_)) => Ok(()),
(Err(e), Ok(_)) => Err(PyException::new_err(e.to_string())),
Expand Down Expand Up @@ -1114,9 +1125,14 @@ impl Rdict {
/// options (rocksdict.Options): Rocksdb options object
#[staticmethod]
#[pyo3(signature = (path, options = OptionsPy::new(false)))]
fn destroy(path: &str, options: OptionsPy) -> PyResult<()> {
fs::remove_file(config_file(path)).ok();
DB::destroy(&options.inner_opt, path).map_err(|e| PyException::new_err(e.to_string()))
fn destroy(path: &str, options: OptionsPy, py: Python) -> PyResult<()> {
let inner_opt = options.inner_opt;

py.allow_threads(|| {
fs::remove_file(config_file(path)).ok();
DB::destroy(&inner_opt, path)
})
.map_err(|e| PyException::new_err(e.to_string()))
}

/// Repair the database.
Expand All @@ -1126,8 +1142,11 @@ impl Rdict {
/// options (rocksdict.Options): Rocksdb options object
#[staticmethod]
#[pyo3(signature = (path, options = OptionsPy::new(false)))]
fn repair(path: &str, options: OptionsPy) -> PyResult<()> {
DB::repair(&options.inner_opt, path).map_err(|e| PyException::new_err(e.to_string()))
fn repair(path: &str, options: OptionsPy, py: Python) -> PyResult<()> {
let inner_opt = options.inner_opt;

py.allow_threads(|| DB::repair(&inner_opt, path))
.map_err(|e| PyException::new_err(e.to_string()))
}

#[staticmethod]
Expand Down

0 comments on commit 0501998

Please sign in to comment.