diff --git a/src/rdict.rs b/src/rdict.rs index 781b32b..3723655 100644 --- a/src/rdict.rs +++ b/src/rdict.rs @@ -708,24 +708,27 @@ impl Rdict { /// Args: /// wait (bool): whether to wait for the flush to finish. #[pyo3(signature = (wait = true))] - fn flush(&self, wait: bool) -> PyResult<()> { + fn flush(&self, wait: bool, py: Python) -> PyResult<()> { let db = self.get_db()?; - let mut f_opt = FlushOptions::new(); - f_opt.set_wait(wait); - if let Some(cf) = &self.column_family { - db.flush_cf_opt(cf, &f_opt) - } else { - db.flush_opt(&f_opt) - } + + py.allow_threads(|| { + let mut f_opt = FlushOptions::new(); + f_opt.set_wait(wait); + if let Some(cf) = &self.column_family { + db.flush_cf_opt(cf, &f_opt) + } else { + db.flush_opt(&f_opt) + } + }) .map_err(|e| PyException::new_err(e.into_string())) } /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs /// the data to disk. #[pyo3(signature = (sync = true))] - fn flush_wal(&self, sync: bool) -> PyResult<()> { + fn flush_wal(&self, sync: bool, py: Python) -> PyResult<()> { let db = self.get_db()?; - db.flush_wal(sync) + py.allow_threads(|| db.flush_wal(sync)) .map_err(|e| PyException::new_err(e.into_string())) } @@ -977,25 +980,33 @@ impl Rdict { /// alive. `del` or `close` all associated instances mentioned /// above to actually shut down RocksDB. /// - fn close(&mut self) -> PyResult<()> { + fn close(&mut self, py: Python) -> PyResult<()> { // do not flush if readonly if let AccessTypeInner::ReadOnly { .. } | AccessTypeInner::Secondary { .. } = &self.access_type.0 { - drop(self.column_family.take()); - self.db.close(); + py.allow_threads(|| { + drop(self.column_family.take()); + self.db.close(); + }); return Ok(()); } - let f_opt = &self.flush_opt; - let db = self.get_db()?; - let flush_wal_result = db.flush_wal(true); - let flush_result = if let Some(cf) = &self.column_family { - db.flush_cf_opt(cf, &f_opt.into()) - } else { - db.flush_opt(&f_opt.into()) - }; - drop(self.column_family.take()); - self.db.close(); + + let (flush_wal_result, flush_result) = py.allow_threads(|| { + let f_opt = &self.flush_opt; + let db = self.get_db()?; + + let flush_wal_result = db.flush_wal(true); + let flush_result = if let Some(cf) = &self.column_family { + db.flush_cf_opt(cf, &f_opt.into()) + } else { + db.flush_opt(&f_opt.into()) + }; + drop(self.column_family.take()); + self.db.close(); + + Ok::<_, PyErr>((flush_wal_result, flush_result)) + })?; match (flush_result, flush_wal_result) { (Ok(_), Ok(_)) => Ok(()), (Err(e), Ok(_)) => Err(PyException::new_err(e.to_string())), @@ -1114,9 +1125,14 @@ impl Rdict { /// options (rocksdict.Options): Rocksdb options object #[staticmethod] #[pyo3(signature = (path, options = OptionsPy::new(false)))] - fn destroy(path: &str, options: OptionsPy) -> PyResult<()> { - fs::remove_file(config_file(path)).ok(); - DB::destroy(&options.inner_opt, path).map_err(|e| PyException::new_err(e.to_string())) + fn destroy(path: &str, options: OptionsPy, py: Python) -> PyResult<()> { + let inner_opt = options.inner_opt; + + py.allow_threads(|| { + fs::remove_file(config_file(path)).ok(); + DB::destroy(&inner_opt, path) + }) + .map_err(|e| PyException::new_err(e.to_string())) } /// Repair the database. @@ -1126,8 +1142,11 @@ impl Rdict { /// options (rocksdict.Options): Rocksdb options object #[staticmethod] #[pyo3(signature = (path, options = OptionsPy::new(false)))] - fn repair(path: &str, options: OptionsPy) -> PyResult<()> { - DB::repair(&options.inner_opt, path).map_err(|e| PyException::new_err(e.to_string())) + fn repair(path: &str, options: OptionsPy, py: Python) -> PyResult<()> { + let inner_opt = options.inner_opt; + + py.allow_threads(|| DB::repair(&inner_opt, path)) + .map_err(|e| PyException::new_err(e.to_string())) } #[staticmethod]