Skip to content

Commit

Permalink
Merge pull request #739 from supabase-community/storage-fix
Browse files Browse the repository at this point in the history
Add missing option builders for resumable uploads and cache options
  • Loading branch information
jan-tennert authored Sep 26, 2024
2 parents 97eb197 + 105d0ae commit f5b424c
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.github.jan.supabase.storage

import io.github.jan.supabase.storage.resumable.Fingerprint
import io.github.jan.supabase.storage.resumable.ResumableClient
import io.github.jan.supabase.storage.resumable.ResumableUpload
import io.ktor.util.cio.readChannel
import io.ktor.utils.io.discard
import java.io.File
Expand All @@ -14,18 +15,18 @@ import kotlin.io.path.fileSize
* If there is an url in the cache for the given [Fingerprint], the upload will be continued.
* @param file The file to upload
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun ResumableClient.createOrContinueUpload(path: String, file: File, upsert: Boolean = false) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePath, file.length(), path, upsert)
suspend fun ResumableClient.createOrContinueUpload(path: String, file: File, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePath, file.length(), path, options)

/**
* Creates a new resumable upload or continues an existing one.
* If there is an url in the cache for the given [Fingerprint], the upload will be continued.
* @param file The file to upload
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun ResumableClient.createOrContinueUpload(path: String, file: Path, upsert: Boolean = false) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePathString(), file.fileSize(), path, upsert)
suspend fun ResumableClient.createOrContinueUpload(path: String, file: Path, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload({ file.readChannel().apply { discard(it) } }, file.absolutePathString(), file.fileSize(), path, options)

/**
* Reads pending uploads from the cache and creates a new [ResumableUpload] for each of them. This done in parallel, so you can start the uploads independently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import io.ktor.utils.io.jvm.javaio.toByteReadChannel
* Creates a new upload or continues an existing one from the given [uri]
* @param path The path to upload the file to
* @param uri The uri of the file to upload (make sure you have access to it)
* @param upsert Whether to overwrite an existing file
* @param options The options for the upload
*/
suspend fun ResumableClient.createOrContinueUpload(path: String, uri: Uri, upsert: Boolean = false) = createOrContinueUpload(uri.createByteReader(), uri.toString(), uri.contentSize, path, upsert)
suspend fun ResumableClient.createOrContinueUpload(path: String, uri: Uri, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload(uri.createByteReader(), uri.toString(), uri.contentSize, path, options)

@SuppressLint("Recycle")
private suspend fun Uri.createByteReader(): suspend (Long) -> ByteReadChannel = { offset: Long ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ sealed interface BucketApi {
* Creates a signed url to upload without authentication.
* These urls are valid for 2 hours.
* @param path The path to create an url for
* @param upsert Whether to upsert the file if it already exists
*/
suspend fun createSignedUploadUrl(path: String): UploadSignedUrl
suspend fun createSignedUploadUrl(path: String, upsert: Boolean = false): UploadSignedUrl

/**
* Creates a signed url to download without authentication. The url will expire after [expiresIn]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ internal class BucketApiImpl(override val bucketId: String, val storage: Storage
)
}

override suspend fun createSignedUploadUrl(path: String): UploadSignedUrl {
val result = storage.api.post("object/upload/sign/$bucketId/$path")
override suspend fun createSignedUploadUrl(path: String, upsert: Boolean): UploadSignedUrl {
val result = storage.api.post("object/upload/sign/$bucketId/$path") {
header(UPSERT_HEADER, upsert.toString())
}
val urlPath = result.body<JsonObject>()["url"]?.jsonPrimitive?.content?.substring(1)
?: error("Expected a url in create upload signed url response")
val url = Url(storage.resolveUrl(urlPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ import kotlinx.serialization.Serializable
* @param path The storage path
* @param bucketId The bucket id
* @param expiresAt The time the url expires
* @param upsert Whether the entry should be updated if it already exists
* @param contentType The content type of the file
*/
@Serializable
data class ResumableCacheEntry(val url: String, val path: String, val bucketId: String, val expiresAt: Instant)
data class ResumableCacheEntry(
val url: String,
val path: String,
val bucketId: String,
val expiresAt: Instant,
val upsert: Boolean = false, //for compatibility with the old cache
val contentType: String = "application/octet-stream"
)

/**
* A pair of a [Fingerprint] and a [ResumableCacheEntry]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.github.jan.supabase.auth.Auth
import io.github.jan.supabase.logging.d
import io.github.jan.supabase.storage.BucketApi
import io.github.jan.supabase.storage.Storage
import io.github.jan.supabase.storage.UploadOptionBuilder
import io.github.jan.supabase.storage.resumable.ResumableClient.Companion.TUS_VERSION
import io.github.jan.supabase.storage.storage
import io.ktor.client.request.bearerAuth
Expand Down Expand Up @@ -41,18 +42,18 @@ sealed interface ResumableClient {
* @param channel A function that takes the offset of the upload and returns a [ByteReadChannel] that reads the data to upload from the given offset
* @param size The size of the data to upload
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun createOrContinueUpload(channel: suspend (offset: Long) -> ByteReadChannel, source: String, size: Long, path: String, upsert: Boolean = false): ResumableUpload
suspend fun createOrContinueUpload(channel: suspend (offset: Long) -> ByteReadChannel, source: String, size: Long, path: String, options: UploadOptionBuilder.() -> Unit = {}): ResumableUpload

/**
* Creates a new resumable upload or continues an existing one.
* If there is an url in the cache for the given [Fingerprint], the upload will be continued.
* @param data The data to upload as a [ByteArray]
* @param path The path to upload the data to
* @param upsert Whether to overwrite existing files
* @param options The options for the upload
*/
suspend fun createOrContinueUpload(data: ByteArray, source: String, path: String, upsert: Boolean = false) = createOrContinueUpload({ ByteReadChannel(data).apply { discard(it) } }, source, data.size.toLong(), path)
suspend fun createOrContinueUpload(data: ByteArray, source: String, path: String, options: UploadOptionBuilder.() -> Unit = {}) = createOrContinueUpload({ ByteReadChannel(data).apply { discard(it) } }, source, data.size.toLong(), path)

/**
* Reads pending uploads from the cache and creates a new [ResumableUpload] for each of them. This done in parallel, so you can start the downloads independently.
Expand Down Expand Up @@ -94,23 +95,24 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
source: String,
size: Long,
path: String,
upsert: Boolean
options: UploadOptionBuilder.() -> Unit
): ResumableUpload {
val cachedEntry = cache.get(Fingerprint(source, size))
if(cachedEntry != null) {
Storage.logger.d { "Found cached upload for $path" }
return resumeUpload(channel, cachedEntry, source, path, size)
}
return createUpload(channel, source, path, size, upsert)
return createUpload(channel, source, path, size, options)
}

private suspend fun createUpload(channel: suspend (Long) -> ByteReadChannel, source: String, path: String, size: Long, upsert: Boolean): ResumableUploadImpl {
private suspend fun createUpload(channel: suspend (Long) -> ByteReadChannel, source: String, path: String, size: Long, options: UploadOptionBuilder.() -> Unit): ResumableUploadImpl {
val uploadOptions = UploadOptionBuilder(storageApi.supabaseClient.storage.serializer).apply(options)
val response = httpClient.post(url) {
header("Upload-Metadata", encodeMetadata(createMetadata(path)))
header("Upload-Metadata", encodeMetadata(createMetadata(path, uploadOptions.contentType)))
bearerAuth(accessTokenOrApiKey())
header("Upload-Length", size)
header("Tus-Resumable", TUS_VERSION)
header("x-upsert", upsert.toString())
header("x-upsert", uploadOptions.upsert)
}
when(response.status) {
HttpStatusCode.Conflict -> error("Specified path already exists. Consider setting upsert to true")
Expand All @@ -120,7 +122,7 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
}
val uploadUrl = response.headers["Location"] ?: error("No upload url found")
val fingerprint = Fingerprint(source, size)
val cacheEntry = ResumableCacheEntry(uploadUrl, path, storageApi.bucketId, Clock.System.now() + 1.days)
val cacheEntry = ResumableCacheEntry(uploadUrl, path, storageApi.bucketId, Clock.System.now() + 1.days, uploadOptions.upsert, uploadOptions.contentType.toString())
cache.set(fingerprint, cacheEntry)
return ResumableUploadImpl(fingerprint, path, cacheEntry, channel, 0, chunkSize, uploadUrl, httpClient, storageApi, { retrieveServerOffset(uploadUrl, path) }) {
cache.remove(fingerprint)
Expand All @@ -132,7 +134,10 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va
if(Clock.System.now() > entry.expiresAt) {
Storage.logger.d { "Upload url for $path expired. Creating new one" }
cache.remove(fingerprint)
return createUpload(channel, source, path, size, false)
return createUpload(channel, source, path, size) {
upsert = entry.upsert
contentType = ContentType.parse(entry.contentType)
}
}
val offset = retrieveServerOffset(entry.url, path)
if(offset < size) {
Expand All @@ -156,10 +161,10 @@ internal class ResumableClientImpl(private val storageApi: BucketApi, private va

private fun accessTokenOrApiKey() = storageApi.supabaseClient.pluginManager.getPluginOrNull(Auth)?.currentAccessTokenOrNull() ?: storageApi.supabaseClient.supabaseKey

private fun createMetadata(path: String): Map<String, String> = buildMap {
private fun createMetadata(path: String, contentType: ContentType? = null): Map<String, String> = buildMap {
put("bucketName", storageApi.bucketId)
put("objectName", path)
put("contentType", ContentType.defaultForFilePath(path).toString())
put("contentType", contentType?.toString() ?: ContentType.defaultForFilePath(path).toString())
}

@OptIn(ExperimentalEncodingApi::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ class UploadViewModel(
file.dataProducer,
file.path ?: file.name,
file.getSize() ?: error("Invalid file"),
path,
true
)
path
) {
upsert = true
}
uploads[upload.fingerprint] = upload
uploadItems.value = uploadItems.value.map {
if(it.fingerprint == upload.fingerprint) UploadState.Loaded(upload.fingerprint, upload.stateFlow.value) else it
Expand Down

0 comments on commit f5b424c

Please sign in to comment.