Skip to content

Commit

Permalink
store -> store!, make db first arg, use storage stored on DB, do not …
Browse files Browse the repository at this point in the history
…re-create adapter, batch addresses, collect-garbage!
  • Loading branch information
tonsky committed Jul 28, 2023
1 parent d673b89 commit 1fd4a7e
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 97 deletions.
217 changes: 130 additions & 87 deletions src/datascript/core_storage.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

(in-ns 'datascript.core)

(require
Expand All @@ -13,6 +12,7 @@
'[java.io BufferedOutputStream File FileOutputStream OutputStream PushbackReader]
'[me.tonsky.persistent_sorted_set ANode Branch Leaf PersistentSortedSet RefType Settings])

;; datascriptstorageroot23718
(def ^:private root-addr
#uuid "d5695966-036d-6740-8541-d80034219c28")

Expand All @@ -36,7 +36,123 @@
; (defprotocol ICache
; (-compute-if-absent [_ key compute-fn]))

(defn output-stream ^OutputStream [^File file]
(def ^:private ^:dynamic *store-buffer*)

(defrecord StorageAdapter [storage ^Settings settings]
me.tonsky.persistent_sorted_set.IStorage
(store [_ ^ANode node]
(let [addr (squuid)
keys (map (fn [^Datom d]
[(.-e d) (.-a d) (.-v d) (.-tx d)])
(.keys node))
data (cond-> {:level (.level node)
:keys keys}
(instance? Branch node)
(assoc :addresses (.addresses ^Branch node)))]
(vswap! *store-buffer* conj! [addr data])
addr))
(restore [_ addr]
(let [{:keys [level keys addresses]} (-restore storage addr)
keys' ^List (map (fn [[e a v tx]] (db/datom e a v tx)) keys)]
(if addresses
(Branch. (int level) keys' ^List addresses settings)
(Leaf. keys' settings)))))

(defn- make-adapter [storage opts]
(let [settings (@#'set/map->settings opts)]
(->StorageAdapter storage settings)))

(defn- adapter ^StorageAdapter [db]
(.-_storage ^PersistentSortedSet (:eavt db)))

(defn storage
"Returns IStorage used by current DB instance"
[db]
(when-some [adapter (adapter db)]
(:storage adapter)))

(defn- store-impl! [db adapter opts]
(binding [*store-buffer* (volatile! (transient []))]
(let [eavt ^PersistentSortedSet (:eavt db)
settings (.-_settings eavt)
meta (merge
{:schema (:schema db)
:max-eid (:max-eid db)
:max-tx (:max-tx db)
:eavt (set/store (:eavt db) adapter)
:aevt (set/store (:aevt db) adapter)
:avet (set/store (:avet db) adapter)}
(@#'set/settings->map settings))]
(vswap! *store-buffer* conj! [root-addr meta])
(-store (:storage adapter) (persistent! @*store-buffer*))
db)))

(defn store!
([db]
(if-some [adapter (adapter db)]
(store-impl! db adapter {})
(throw (ex-info "Database has no associated storage" {}))))
([db storage]
(if-some [adapter (adapter db)]
(let [current-storage (:storage adapter)]
(if (identical? current-storage storage)
(store-impl! db adapter {})
(throw (ex-info "Database is already stored with another IStorage" {:storage current-storage}))))
(let [settings (.-_settings ^PersistentSortedSet (:eavt db))
adapter (StorageAdapter. storage settings)]
(store-impl! db adapter {})))))

(defn restore
([storage]
(restore storage {}))
([storage opts]
(let [root (-restore storage root-addr)
{:keys [schema eavt aevt avet max-eid max-tx]} root
opts (merge root opts)
adapter (make-adapter storage opts)]
(db/restore-db
{:schema schema
:eavt (set/restore-by db/cmp-datoms-eavt eavt adapter opts)
:aevt (set/restore-by db/cmp-datoms-aevt aevt adapter opts)
:avet (set/restore-by db/cmp-datoms-avet avet adapter opts)
:max-eid max-eid
:max-tx max-tx}))))

(defn- addresses-impl [db *set]
(let [visit-fn #(vswap! *set conj! %)]
(visit-fn root-addr)
(.walkAddresses ^PersistentSortedSet (:eavt db) visit-fn)
(.walkAddresses ^PersistentSortedSet (:aevt db) visit-fn)
(.walkAddresses ^PersistentSortedSet (:avet db) visit-fn)))

(defn addresses
"Returns all addresses in use by current db. Anything that is not in
the return set is safe to be deleted"
[& dbs]
(let [*set (volatile! (transient #{}))]
(doseq [db dbs]
(addresses-impl db *set))
(persistent! @*set)))

(defn collect-garbage!
"Deletes all keys from storage that are not referenced by any of the provided dbs.
Careful! If you have a lazy-loaded database and do GC on a newer version of it,
old version might stop working. Make sure to always pass all alive references
to DBs you are using"
[& dbs]
(let [used (apply addresses dbs)]
(doseq [db dbs
:let [storage (storage db)]
:when storage
:let [unused (->> (-list-addresses storage)
(remove used)
(vec))]]
(-delete storage unused))))

(defn- output-stream
"OutputStream that ignores flushes. Workaround for slow transit-clj JSON writer.
See https://github.com/cognitect/transit-clj/issues/43"
^OutputStream [^File file]
(let [os (FileOutputStream. file)]
(proxy [BufferedOutputStream] [os]
(flush [])
Expand All @@ -46,6 +162,18 @@
(proxy-super close))))))

(defn file-storage
"Default implementation that stores data in files in a dir.
Options are:
:freeze-fn :: (data) -> String. A serialization function
:thaw-fn :: (String) -> data. A deserialization function
:write-fn :: (OutputStream data) -> void. Implement your own writer to FileOutputStream
:read-fn :: (InputStream) -> Object. Implement your own reader from FileInputStream
:addr->filename-fn :: (UUID) -> String. Construct file name from address
:filename->addr-fn :: (String) -> UUID. Reconstruct address from file name
All options are optional."
([dir]
(file-storage dir {}))
([dir opts]
Expand Down Expand Up @@ -86,88 +214,3 @@
(-delete [_ addrs-seq]
(doseq [addr addrs-seq]
(.delete (io/file dir (addr->filename-fn addr)))))))))

(deftype StorageAdapter [istorage ^Settings settings *buffer]
me.tonsky.persistent_sorted_set.IStorage
(store [_ ^ANode node]
(let [addr (squuid)
keys (map (fn [^Datom d]
[(.-e d) (.-a d) (.-v d) (.-tx d)])
(.keys node))
data (cond-> {:level (.level node)
:keys keys}
(instance? Branch node)
(assoc :addresses (.addresses ^Branch node)))]
; (-store istorage addr data)
(vswap! *buffer conj! [addr data])
addr))
(restore [_ addr]
(let [{:keys [level keys addresses]} (-restore istorage addr)
keys' ^List (map (fn [[e a v tx]] (db/datom e a v tx)) keys)]
(if addresses
(Branch. (int level) keys' ^List addresses settings)
(Leaf. keys' settings)))))

(defn validate-storage [^PersistentSortedSet set istorage]
(when-some [^StorageAdapter adapter (.-_storage set)]
(let [istorage' (.-istorage adapter)]
(when-not (identical? istorage' istorage)
(throw (ex-info "Database is already stored with another IStorage"
{:istorage istorage'}))))))

(defn store
([istorage db]
(store istorage db {}))
([istorage db opts]
(validate-storage (:eavt db) istorage)
(validate-storage (:aevt db) istorage)
(validate-storage (:avet db) istorage)
(let [eavt ^PersistentSortedSet (:eavt db)
settings (.-_settings eavt)
*buffer (volatile! (transient []))
adapter (StorageAdapter. istorage settings *buffer)
meta (merge
{:schema (:schema db)
:max-eid (:max-eid db)
:max-tx (:max-tx db)
:eavt (set/store (:eavt db) adapter)
:aevt (set/store (:aevt db) adapter)
:avet (set/store (:avet db) adapter)}
(@#'set/settings->map settings))]
(vswap! *buffer conj! [root-addr meta])
(-store istorage (persistent! @*buffer)))))

(defn restore
([istorage]
(restore istorage {}))
([istorage opts]
(let [root (-restore istorage root-addr)
{:keys [schema eavt aevt avet max-eid max-tx]} root
settings (@#'set/map->settings root)
adapter (StorageAdapter. istorage settings nil)]
(db/restore-db
{:schema schema
:eavt (set/restore-by db/cmp-datoms-eavt eavt adapter root)
:aevt (set/restore-by db/cmp-datoms-aevt aevt adapter root)
:avet (set/restore-by db/cmp-datoms-avet avet adapter root)
:max-eid max-eid
:max-tx max-tx}))))

(defn addresses
"Returns all addresses in use by current db. Anything that is not in
the return set is safe to be deleted"
[db]
(let [set (HashSet.)
visit-fn #(.add set %)]
(.add set root-addr)
(.walkAddresses ^PersistentSortedSet (:eavt db) visit-fn)
(.walkAddresses ^PersistentSortedSet (:aevt db) visit-fn)
(.walkAddresses ^PersistentSortedSet (:avet db) visit-fn)
set))

(defn collect-garbage [istorage db]
(let [used (addresses db)
unused (->> (-list-addresses istorage)
(remove #(contains? used %))
(vec))]
(-delete istorage unused)))
6 changes: 3 additions & 3 deletions src/datascript/db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -964,9 +964,9 @@
(map->DB
{:schema schema
:rschema (rschema (merge implicit-schema schema))
:eavt (set/sorted-set-opts (assoc opts :cmp cmp-datoms-eavt))
:aevt (set/sorted-set-opts (assoc opts :cmp cmp-datoms-aevt))
:avet (set/sorted-set-opts (assoc opts :cmp cmp-datoms-avet))
:eavt (set/sorted-set* (assoc opts :cmp cmp-datoms-eavt))
:aevt (set/sorted-set* (assoc opts :cmp cmp-datoms-aevt))
:avet (set/sorted-set* (assoc opts :cmp cmp-datoms-avet))
:max-eid e0
:max-tx tx0
:pull-patterns (lru/cache 100)
Expand Down
14 changes: 7 additions & 7 deletions test_storage/datascript/test_storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@
(def db (d/from-serializable json {:branching-factor 512}))
(count db))

(d/store streaming-edn-storage (d/empty-db))
(d/store! (d/empty-db) streaming-edn-storage)

(d/store streaming-edn-storage db) ;; 10 sec
(d/store inmemory-edn-storage db) ;; 10 sec
(d/store streaming-transit-json-storage db) ;; 7.5 sec
(d/store inmemory-transit-json-storage db) ;; 6.4 sec
(d/store streaming-transit-msgpack-storage db) ;; 6.3 sec
(d/store! db streaming-edn-storage) ;; 10 sec
(d/store! db inmemory-edn-storage) ;; 10 sec
(d/store! db streaming-transit-json-storage) ;; 7.5 sec
(d/store! db inmemory-transit-json-storage) ;; 6.4 sec
(d/store! db streaming-transit-msgpack-storage) ;; 6.3 sec

(def db' (d/restore streaming-edn-storage))

(count (d/addresses db'))
(count (d/-list-addresses streaming-edn-storage))
(d/collect-garbage streaming-edn-storage db')
(d/collect-garbage! db')

(first (:eavt db'))

Expand Down

0 comments on commit 1fd4a7e

Please sign in to comment.