Skip to content

Commit

Permalink
Fix store deletion and listing of keys with store-id prefix.
Browse files Browse the repository at this point in the history
  • Loading branch information
whilo committed Dec 2, 2023
1 parent d98bdf5 commit 3e15429
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
49 changes: 33 additions & 16 deletions src/konserve_s3/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[konserve.impl.storage-layout :refer [PBackingStore PBackingBlob PBackingLock -delete-store header-size]]
[konserve.utils :refer [async+sync *default-sync-translation*]]
[superv.async :refer [go-try-]]
[taoensso.timbre :refer [info]])
[taoensso.timbre :refer [info trace]])
(:import [java.io ByteArrayInputStream ByteArrayOutputStream]
[java.util Arrays]
;; AWS API
Expand All @@ -26,13 +26,15 @@
GetObjectRequest GetObjectResponse
PutObjectRequest PutObjectRequest
CopyObjectRequest Delete DeleteObjectRequest DeleteObjectsRequest HeadObjectRequest
NoSuchBucketException NoSuchKeyException]
NoSuchBucketException NoSuchKeyException
ObjectIdentifier]
[software.amazon.awssdk.core.sync RequestBody]))

#_(set! *warn-on-reflection* 1)

(def ^:const default-bucket "konserve")
(def ^:const output-stream-buffer-size (* 1024 1024))
(def ^:const deletion-batch-size 1000)

(def regions (into {} (map (fn [r] [(.toString r) r]) (Region/regions))))

Expand Down Expand Up @@ -108,7 +110,7 @@
(let [request (-> (ListObjectsRequest/builder)
(.bucket bucket)
(.build))]
(map #(.key %) (.contents (.listObjects client request)))))
(doall (map #(.key %) (.contents (.listObjects client request))))))

(defn copy [client bucket source-key destination-key]
(.copyObject client (-> (CopyObjectRequest/builder)
Expand All @@ -125,12 +127,16 @@
(.build))))

(defn delete-keys [client bucket keys]
(.deleteObject client (-> (DeleteObjectsRequest/builder)
(.bucket bucket)
(.delete (-> (Delete/builder)
(.objects keys)
(.build)))
(.build))))
(let [keys-ids (map (fn [key] (-> (ObjectIdentifier/builder)
(.key key)
(.build)))
keys)]
(.deleteObjects client (-> (DeleteObjectsRequest/builder)
(.bucket bucket)
(.delete (-> (Delete/builder)
(.objects keys-ids)
(.build)))
(.build)))))

(extend-protocol PBackingLock
Boolean
Expand Down Expand Up @@ -235,17 +241,28 @@
(async+sync (:sync? env) *default-sync-translation*
(go-try- (when (bucket-exists? client bucket)
(info "This will delete all konserve files, but won't delete the bucket. You can use konserve-s3.core/delete-bucket if you intend to delete the bucket as well.")
(doseq [keys (partition 1000 (list-objects client bucket))]
(doseq [keys (->> (list-objects client bucket)
(filter (fn [^String key]
(and (.startsWith key store-id)
(or (.endsWith key ".ksv")
(.endsWith key ".ksv.new")
(.endsWith key ".ksv.backup")))))
(partition deletion-batch-size deletion-batch-size []))]
(trace "deleting keys: " keys)
(delete-keys client bucket keys))
(.close client)))))
(-keys [_ env]
(async+sync (:sync? env) *default-sync-translation*
(go-try- (filter (fn [^String key]
(and (.startsWith key store-id)
(or (.endsWith key ".ksv")
(.endsWith key ".ksv.new")
(.endsWith key ".ksv.backup"))))
(list-objects client bucket))))))
(go-try-
(let [keys (list-objects client bucket)]
(->> (filter (fn [^String key]
(and (.startsWith key store-id)
(or (.endsWith key ".ksv")
(.endsWith key ".ksv.new")
(.endsWith key ".ksv.backup"))))
keys)
;; remove store-id prefix
(map #(subs % (inc (count store-id))))))))))

(defn connect-store [s3-spec & {:keys [opts]
:as params}]
Expand Down
7 changes: 5 additions & 2 deletions test/konserve_s3/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
(:require [clojure.test :refer [deftest testing]]
[clojure.core.async :refer [<!!]]
[konserve.compliance-test :refer [compliance-test]]
[konserve-s3.core :refer [connect-store release delete-store]]))
[konserve-s3.core :refer [connect-store release delete-store]]
[konserve.core :as k]))

(def s3-spec {:region "us-west-1"
:store-id "test-store"})
:store-id "test-store"
:access-key (System/getenv "AWS_ACCESS_KEY_ID")
:secret (System/getenv "AWS_SECRET_ACCESS_KEY")})

(deftest s3-compliance-sync-test
(let [s3-spec (assoc s3-spec :bucket "konserve-s3-sync-test")
Expand Down

0 comments on commit 3e15429

Please sign in to comment.