diff --git a/api/swagger.yml b/api/swagger.yml index fd6492ebcf0..36c64b3f62d 100644 --- a/api/swagger.yml +++ b/api/swagger.yml @@ -90,6 +90,15 @@ components: required: false schema: type: string + + IfMatch: + in: header + name: If-Match + description: Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. + example: "2e9ec317e197e02e4264d128c2e7e681" + required: false + schema: + type: string NoTombstone: in: query @@ -4877,6 +4886,7 @@ paths: parameters: - $ref: "#/components/parameters/IfNoneMatch" + - $ref: "#/components/parameters/IfMatch" responses: 200: @@ -4906,6 +4916,8 @@ paths: $ref: "#/components/responses/PreconditionFailed" 429: description: too many requests + 501: + $ref: "#/components/responses/NotImplemented" default: $ref: "#/components/responses/ServerError" @@ -5114,6 +5126,7 @@ paths: parameters: - $ref: "#/components/parameters/IfNoneMatch" + - $ref: "#/components/parameters/IfMatch" - in: query name: storageClass description: Deprecated, this capability will not be supported in future releases. @@ -5146,6 +5159,8 @@ paths: $ref: "#/components/responses/PreconditionFailed" 429: description: too many requests + 501: + $ref: "#/components/responses/NotImplemented" default: $ref: "#/components/responses/ServerError" delete: diff --git a/clients/java/api/openapi.yaml b/clients/java/api/openapi.yaml index 583e79c6ca9..7734920fbe5 100644 --- a/clients/java/api/openapi.yaml +++ b/clients/java/api/openapi.yaml @@ -5086,6 +5086,16 @@ paths: schema: type: string style: simple + - description: Set to the object's ETag to atomically allow operations only + if the object's current ETag matches the provided value. 
+ example: 2e9ec317e197e02e4264d128c2e7e681 + explode: false + in: header + name: If-Match + required: false + schema: + type: string + style: simple requestBody: content: application/json: @@ -5137,6 +5147,12 @@ paths: description: Precondition Failed "429": description: too many requests + "501": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Not Implemented default: content: application/json: @@ -5507,6 +5523,16 @@ paths: schema: type: string style: simple + - description: Set to the object's ETag to atomically allow operations only + if the object's current ETag matches the provided value. + example: 2e9ec317e197e02e4264d128c2e7e681 + explode: false + in: header + name: If-Match + required: false + schema: + type: string + style: simple - deprecated: true description: "Deprecated, this capability will not be supported in future\ \ releases." @@ -5573,6 +5599,12 @@ paths: description: Precondition Failed "429": description: too many requests + "501": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Not Implemented default: content: application/json: @@ -7719,6 +7751,17 @@ components: schema: type: string style: simple + IfMatch: + description: Set to the object's ETag to atomically allow operations only if + the object's current ETag matches the provided value. 
+ example: 2e9ec317e197e02e4264d128c2e7e681 + explode: false + in: header + name: If-Match + required: false + schema: + type: string + style: simple NoTombstone: description: delete entry without tombstone when possible *EXPERIMENTAL* explode: true diff --git a/clients/java/docs/ObjectsApi.md b/clients/java/docs/ObjectsApi.md index 44c1ae28fdd..1fc3db232c4 100644 --- a/clients/java/docs/ObjectsApi.md +++ b/clients/java/docs/ObjectsApi.md @@ -955,7 +955,7 @@ null (empty response body) # **uploadObject** -> ObjectStats uploadObject(repository, branch, path).ifNoneMatch(ifNoneMatch).storageClass(storageClass).force(force).content(content).execute(); +> ObjectStats uploadObject(repository, branch, path).ifNoneMatch(ifNoneMatch).ifMatch(ifMatch).storageClass(storageClass).force(force).content(content).execute(); @@ -1006,12 +1006,14 @@ public class Example { String branch = "branch_example"; // String | String path = "path_example"; // String | relative to the branch String ifNoneMatch = "*"; // String | Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. + String ifMatch = "2e9ec317e197e02e4264d128c2e7e681"; // String | Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. String storageClass = "storageClass_example"; // String | Deprecated, this capability will not be supported in future releases. Boolean force = false; // Boolean | File content = new File("/path/to/file"); // File | Only a single file per upload which must be named \\\"content\\\". 
try { ObjectStats result = apiInstance.uploadObject(repository, branch, path) .ifNoneMatch(ifNoneMatch) + .ifMatch(ifMatch) .storageClass(storageClass) .force(force) .content(content) @@ -1036,6 +1038,7 @@ public class Example { | **branch** | **String**| | | | **path** | **String**| relative to the branch | | | **ifNoneMatch** | **String**| Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. | [optional] | +| **ifMatch** | **String**| Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. | [optional] | | **storageClass** | **String**| Deprecated, this capability will not be supported in future releases. | [optional] | | **force** | **Boolean**| | [optional] [default to false] | | **content** | **File**| Only a single file per upload which must be named \\\"content\\\". | [optional] | @@ -1063,5 +1066,6 @@ public class Example { | **404** | Resource Not Found | - | | **412** | Precondition Failed | - | | **429** | too many requests | - | +| **501** | Not Implemented | - | | **0** | Internal Server Error | - | diff --git a/clients/java/docs/StagingApi.md b/clients/java/docs/StagingApi.md index 837e40a3ec3..7174267c442 100644 --- a/clients/java/docs/StagingApi.md +++ b/clients/java/docs/StagingApi.md @@ -110,7 +110,7 @@ public class Example { # **linkPhysicalAddress** -> ObjectStats linkPhysicalAddress(repository, branch, path, stagingMetadata).ifNoneMatch(ifNoneMatch).execute(); +> ObjectStats linkPhysicalAddress(repository, branch, path, stagingMetadata).ifNoneMatch(ifNoneMatch).ifMatch(ifMatch).execute(); associate staging on this physical address with a path @@ -164,9 +164,11 @@ public class Example { String path = "path_example"; // String | relative to the branch StagingMetadata stagingMetadata = new StagingMetadata(); // StagingMetadata | String ifNoneMatch = "*"; // String | Set to \"*\" to atomically allow the upload only if the key has 
no object yet. Other values are not supported. + String ifMatch = "2e9ec317e197e02e4264d128c2e7e681"; // String | Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. try { ObjectStats result = apiInstance.linkPhysicalAddress(repository, branch, path, stagingMetadata) .ifNoneMatch(ifNoneMatch) + .ifMatch(ifMatch) .execute(); System.out.println(result); } catch (ApiException e) { @@ -189,6 +191,7 @@ public class Example { | **path** | **String**| relative to the branch | | | **stagingMetadata** | [**StagingMetadata**](StagingMetadata.md)| | | | **ifNoneMatch** | **String**| Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. | [optional] | +| **ifMatch** | **String**| Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. | [optional] | ### Return type @@ -214,5 +217,6 @@ public class Example { | **409** | conflict with a commit, try here | - | | **412** | Precondition Failed | - | | **429** | too many requests | - | +| **501** | Not Implemented | - | | **0** | Internal Server Error | - | diff --git a/clients/java/src/main/java/io/lakefs/clients/sdk/ObjectsApi.java b/clients/java/src/main/java/io/lakefs/clients/sdk/ObjectsApi.java index a26e2cc234e..3237414a287 100644 --- a/clients/java/src/main/java/io/lakefs/clients/sdk/ObjectsApi.java +++ b/clients/java/src/main/java/io/lakefs/clients/sdk/ObjectsApi.java @@ -2133,7 +2133,7 @@ public okhttp3.Call executeAsync(final ApiCallback _callback) throws ApiEx public APIupdateObjectUserMetadataRequest updateObjectUserMetadata(String repository, String branch, String path, UpdateObjectUserMetadata updateObjectUserMetadata) { return new APIupdateObjectUserMetadataRequest(repository, branch, path, updateObjectUserMetadata); } - private okhttp3.Call uploadObjectCall(String repository, String branch, String path, String ifNoneMatch, String 
storageClass, Boolean force, File content, final ApiCallback _callback) throws ApiException { + private okhttp3.Call uploadObjectCall(String repository, String branch, String path, String ifNoneMatch, String ifMatch, String storageClass, Boolean force, File content, final ApiCallback _callback) throws ApiException { String basePath = null; // Operation Servers String[] localBasePaths = new String[] { }; @@ -2180,6 +2180,10 @@ private okhttp3.Call uploadObjectCall(String repository, String branch, String p localVarHeaderParams.put("If-None-Match", localVarApiClient.parameterToString(ifNoneMatch)); } + if (ifMatch != null) { + localVarHeaderParams.put("If-Match", localVarApiClient.parameterToString(ifMatch)); + } + final String[] localVarAccepts = { "application/json" }; @@ -2202,7 +2206,7 @@ private okhttp3.Call uploadObjectCall(String repository, String branch, String p } @SuppressWarnings("rawtypes") - private okhttp3.Call uploadObjectValidateBeforeCall(String repository, String branch, String path, String ifNoneMatch, String storageClass, Boolean force, File content, final ApiCallback _callback) throws ApiException { + private okhttp3.Call uploadObjectValidateBeforeCall(String repository, String branch, String path, String ifNoneMatch, String ifMatch, String storageClass, Boolean force, File content, final ApiCallback _callback) throws ApiException { // verify the required parameter 'repository' is set if (repository == null) { throw new ApiException("Missing the required parameter 'repository' when calling uploadObject(Async)"); @@ -2218,20 +2222,20 @@ private okhttp3.Call uploadObjectValidateBeforeCall(String repository, String br throw new ApiException("Missing the required parameter 'path' when calling uploadObject(Async)"); } - return uploadObjectCall(repository, branch, path, ifNoneMatch, storageClass, force, content, _callback); + return uploadObjectCall(repository, branch, path, ifNoneMatch, ifMatch, storageClass, force, content, _callback); } - private 
ApiResponse uploadObjectWithHttpInfo(String repository, String branch, String path, String ifNoneMatch, String storageClass, Boolean force, File content) throws ApiException { - okhttp3.Call localVarCall = uploadObjectValidateBeforeCall(repository, branch, path, ifNoneMatch, storageClass, force, content, null); + private ApiResponse uploadObjectWithHttpInfo(String repository, String branch, String path, String ifNoneMatch, String ifMatch, String storageClass, Boolean force, File content) throws ApiException { + okhttp3.Call localVarCall = uploadObjectValidateBeforeCall(repository, branch, path, ifNoneMatch, ifMatch, storageClass, force, content, null); Type localVarReturnType = new TypeToken(){}.getType(); return localVarApiClient.execute(localVarCall, localVarReturnType); } - private okhttp3.Call uploadObjectAsync(String repository, String branch, String path, String ifNoneMatch, String storageClass, Boolean force, File content, final ApiCallback _callback) throws ApiException { + private okhttp3.Call uploadObjectAsync(String repository, String branch, String path, String ifNoneMatch, String ifMatch, String storageClass, Boolean force, File content, final ApiCallback _callback) throws ApiException { - okhttp3.Call localVarCall = uploadObjectValidateBeforeCall(repository, branch, path, ifNoneMatch, storageClass, force, content, _callback); + okhttp3.Call localVarCall = uploadObjectValidateBeforeCall(repository, branch, path, ifNoneMatch, ifMatch, storageClass, force, content, _callback); Type localVarReturnType = new TypeToken(){}.getType(); localVarApiClient.executeAsync(localVarCall, localVarReturnType, _callback); return localVarCall; @@ -2242,6 +2246,7 @@ public class APIuploadObjectRequest { private final String branch; private final String path; private String ifNoneMatch; + private String ifMatch; private String storageClass; private Boolean force; private File content; @@ -2262,6 +2267,16 @@ public APIuploadObjectRequest ifNoneMatch(String ifNoneMatch) { 
return this; } + /** + * Set ifMatch + * @param ifMatch Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. (optional) + * @return APIuploadObjectRequest + */ + public APIuploadObjectRequest ifMatch(String ifMatch) { + this.ifMatch = ifMatch; + return this; + } + /** * Set storageClass * @param storageClass Deprecated, this capability will not be supported in future releases. (optional) @@ -2307,11 +2322,12 @@ public APIuploadObjectRequest content(File content) { 404 Resource Not Found - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public okhttp3.Call buildCall(final ApiCallback _callback) throws ApiException { - return uploadObjectCall(repository, branch, path, ifNoneMatch, storageClass, force, content, _callback); + return uploadObjectCall(repository, branch, path, ifNoneMatch, ifMatch, storageClass, force, content, _callback); } /** @@ -2328,11 +2344,12 @@ public okhttp3.Call buildCall(final ApiCallback _callback) throws ApiException { 404 Resource Not Found - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public ObjectStats execute() throws ApiException { - ApiResponse localVarResp = uploadObjectWithHttpInfo(repository, branch, path, ifNoneMatch, storageClass, force, content); + ApiResponse localVarResp = uploadObjectWithHttpInfo(repository, branch, path, ifNoneMatch, ifMatch, storageClass, force, content); return localVarResp.getData(); } @@ -2350,11 +2367,12 @@ public ObjectStats execute() throws ApiException { 404 Resource Not Found - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public ApiResponse executeWithHttpInfo() throws ApiException { - return uploadObjectWithHttpInfo(repository, branch, path, ifNoneMatch, storageClass, force, content); + return uploadObjectWithHttpInfo(repository, branch, path, ifNoneMatch, ifMatch, 
storageClass, force, content); } /** @@ -2372,11 +2390,12 @@ public ApiResponse executeWithHttpInfo() throws ApiException { 404 Resource Not Found - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public okhttp3.Call executeAsync(final ApiCallback _callback) throws ApiException { - return uploadObjectAsync(repository, branch, path, ifNoneMatch, storageClass, force, content, _callback); + return uploadObjectAsync(repository, branch, path, ifNoneMatch, ifMatch, storageClass, force, content, _callback); } } @@ -2397,6 +2416,7 @@ public okhttp3.Call executeAsync(final ApiCallback _callback) throw 404 Resource Not Found - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ diff --git a/clients/java/src/main/java/io/lakefs/clients/sdk/StagingApi.java b/clients/java/src/main/java/io/lakefs/clients/sdk/StagingApi.java index f7886e71996..6573a49762b 100644 --- a/clients/java/src/main/java/io/lakefs/clients/sdk/StagingApi.java +++ b/clients/java/src/main/java/io/lakefs/clients/sdk/StagingApi.java @@ -283,7 +283,7 @@ public okhttp3.Call executeAsync(final ApiCallback _callback) t public APIgetPhysicalAddressRequest getPhysicalAddress(String repository, String branch, String path) { return new APIgetPhysicalAddressRequest(repository, branch, path); } - private okhttp3.Call linkPhysicalAddressCall(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, final ApiCallback _callback) throws ApiException { + private okhttp3.Call linkPhysicalAddressCall(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, String ifMatch, final ApiCallback _callback) throws ApiException { String basePath = null; // Operation Servers String[] localBasePaths = new String[] { }; @@ -318,6 +318,10 @@ private okhttp3.Call linkPhysicalAddressCall(String repository, String branch, S 
localVarHeaderParams.put("If-None-Match", localVarApiClient.parameterToString(ifNoneMatch)); } + if (ifMatch != null) { + localVarHeaderParams.put("If-Match", localVarApiClient.parameterToString(ifMatch)); + } + final String[] localVarAccepts = { "application/json" }; @@ -339,7 +343,7 @@ private okhttp3.Call linkPhysicalAddressCall(String repository, String branch, S } @SuppressWarnings("rawtypes") - private okhttp3.Call linkPhysicalAddressValidateBeforeCall(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, final ApiCallback _callback) throws ApiException { + private okhttp3.Call linkPhysicalAddressValidateBeforeCall(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, String ifMatch, final ApiCallback _callback) throws ApiException { // verify the required parameter 'repository' is set if (repository == null) { throw new ApiException("Missing the required parameter 'repository' when calling linkPhysicalAddress(Async)"); @@ -360,20 +364,20 @@ private okhttp3.Call linkPhysicalAddressValidateBeforeCall(String repository, St throw new ApiException("Missing the required parameter 'stagingMetadata' when calling linkPhysicalAddress(Async)"); } - return linkPhysicalAddressCall(repository, branch, path, stagingMetadata, ifNoneMatch, _callback); + return linkPhysicalAddressCall(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch, _callback); } - private ApiResponse linkPhysicalAddressWithHttpInfo(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch) throws ApiException { - okhttp3.Call localVarCall = linkPhysicalAddressValidateBeforeCall(repository, branch, path, stagingMetadata, ifNoneMatch, null); + private ApiResponse linkPhysicalAddressWithHttpInfo(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, String ifMatch) throws ApiException { + okhttp3.Call localVarCall = 
linkPhysicalAddressValidateBeforeCall(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch, null); Type localVarReturnType = new TypeToken(){}.getType(); return localVarApiClient.execute(localVarCall, localVarReturnType); } - private okhttp3.Call linkPhysicalAddressAsync(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, final ApiCallback _callback) throws ApiException { + private okhttp3.Call linkPhysicalAddressAsync(String repository, String branch, String path, StagingMetadata stagingMetadata, String ifNoneMatch, String ifMatch, final ApiCallback _callback) throws ApiException { - okhttp3.Call localVarCall = linkPhysicalAddressValidateBeforeCall(repository, branch, path, stagingMetadata, ifNoneMatch, _callback); + okhttp3.Call localVarCall = linkPhysicalAddressValidateBeforeCall(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch, _callback); Type localVarReturnType = new TypeToken(){}.getType(); localVarApiClient.executeAsync(localVarCall, localVarReturnType, _callback); return localVarCall; @@ -385,6 +389,7 @@ public class APIlinkPhysicalAddressRequest { private final String path; private final StagingMetadata stagingMetadata; private String ifNoneMatch; + private String ifMatch; private APIlinkPhysicalAddressRequest(String repository, String branch, String path, StagingMetadata stagingMetadata) { this.repository = repository; @@ -403,6 +408,16 @@ public APIlinkPhysicalAddressRequest ifNoneMatch(String ifNoneMatch) { return this; } + /** + * Set ifMatch + * @param ifMatch Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. 
(optional) + * @return APIlinkPhysicalAddressRequest + */ + public APIlinkPhysicalAddressRequest ifMatch(String ifMatch) { + this.ifMatch = ifMatch; + return this; + } + /** * Build call for linkPhysicalAddress * @param _callback ApiCallback API callback @@ -419,11 +434,12 @@ public APIlinkPhysicalAddressRequest ifNoneMatch(String ifNoneMatch) { 409 conflict with a commit, try here - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public okhttp3.Call buildCall(final ApiCallback _callback) throws ApiException { - return linkPhysicalAddressCall(repository, branch, path, stagingMetadata, ifNoneMatch, _callback); + return linkPhysicalAddressCall(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch, _callback); } /** @@ -441,11 +457,12 @@ public okhttp3.Call buildCall(final ApiCallback _callback) throws ApiException { 409 conflict with a commit, try here - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public ObjectStats execute() throws ApiException { - ApiResponse localVarResp = linkPhysicalAddressWithHttpInfo(repository, branch, path, stagingMetadata, ifNoneMatch); + ApiResponse localVarResp = linkPhysicalAddressWithHttpInfo(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch); return localVarResp.getData(); } @@ -464,11 +481,12 @@ public ObjectStats execute() throws ApiException { 409 conflict with a commit, try here - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public ApiResponse executeWithHttpInfo() throws ApiException { - return linkPhysicalAddressWithHttpInfo(repository, branch, path, stagingMetadata, ifNoneMatch); + return linkPhysicalAddressWithHttpInfo(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch); } /** @@ -487,11 +505,12 @@ public ApiResponse executeWithHttpInfo() throws ApiException { 409 conflict with a commit, try here - 412 Precondition Failed 
- 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ public okhttp3.Call executeAsync(final ApiCallback _callback) throws ApiException { - return linkPhysicalAddressAsync(repository, branch, path, stagingMetadata, ifNoneMatch, _callback); + return linkPhysicalAddressAsync(repository, branch, path, stagingMetadata, ifNoneMatch, ifMatch, _callback); } } @@ -514,6 +533,7 @@ public okhttp3.Call executeAsync(final ApiCallback _callback) throw 409 conflict with a commit, try here - 412 Precondition Failed - 429 too many requests - + 501 Not Implemented - 0 Internal Server Error - */ diff --git a/clients/java/src/test/java/io/lakefs/clients/sdk/ObjectsApiTest.java b/clients/java/src/test/java/io/lakefs/clients/sdk/ObjectsApiTest.java index d12b473ac78..70c1bf4ea4b 100644 --- a/clients/java/src/test/java/io/lakefs/clients/sdk/ObjectsApiTest.java +++ b/clients/java/src/test/java/io/lakefs/clients/sdk/ObjectsApiTest.java @@ -216,11 +216,13 @@ public void uploadObjectTest() throws ApiException { String branch = null; String path = null; String ifNoneMatch = null; + String ifMatch = null; String storageClass = null; Boolean force = null; File content = null; ObjectStats response = api.uploadObject(repository, branch, path) .ifNoneMatch(ifNoneMatch) + .ifMatch(ifMatch) .storageClass(storageClass) .force(force) .content(content) diff --git a/clients/java/src/test/java/io/lakefs/clients/sdk/StagingApiTest.java b/clients/java/src/test/java/io/lakefs/clients/sdk/StagingApiTest.java index 073a9336b88..b690e22013d 100644 --- a/clients/java/src/test/java/io/lakefs/clients/sdk/StagingApiTest.java +++ b/clients/java/src/test/java/io/lakefs/clients/sdk/StagingApiTest.java @@ -65,8 +65,10 @@ public void linkPhysicalAddressTest() throws ApiException { String path = null; StagingMetadata stagingMetadata = null; String ifNoneMatch = null; + String ifMatch = null; ObjectStats response = api.linkPhysicalAddress(repository, branch, path, stagingMetadata) 
.ifNoneMatch(ifNoneMatch) + .ifMatch(ifMatch) .execute(); // TODO: test validations } diff --git a/clients/python/docs/ObjectsApi.md b/clients/python/docs/ObjectsApi.md index 02034e34fd8..58b34860016 100644 --- a/clients/python/docs/ObjectsApi.md +++ b/clients/python/docs/ObjectsApi.md @@ -1087,7 +1087,7 @@ void (empty response body) [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) # **upload_object** -> ObjectStats upload_object(repository, branch, path, if_none_match=if_none_match, storage_class=storage_class, force=force, content=content) +> ObjectStats upload_object(repository, branch, path, if_none_match=if_none_match, if_match=if_match, storage_class=storage_class, force=force, content=content) @@ -1155,12 +1155,13 @@ with lakefs_sdk.ApiClient(configuration) as api_client: branch = 'branch_example' # str | path = 'path_example' # str | relative to the branch if_none_match = '*' # str | Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. (optional) + if_match = '2e9ec317e197e02e4264d128c2e7e681' # str | Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. (optional) storage_class = 'storage_class_example' # str | Deprecated, this capability will not be supported in future releases. (optional) force = False # bool | (optional) (default to False) content = None # bytearray | Only a single file per upload which must be named \\\"content\\\". 
(optional) try: - api_response = api_instance.upload_object(repository, branch, path, if_none_match=if_none_match, storage_class=storage_class, force=force, content=content) + api_response = api_instance.upload_object(repository, branch, path, if_none_match=if_none_match, if_match=if_match, storage_class=storage_class, force=force, content=content) print("The response of ObjectsApi->upload_object:\n") pprint(api_response) except Exception as e: @@ -1178,6 +1179,7 @@ Name | Type | Description | Notes **branch** | **str**| | **path** | **str**| relative to the branch | **if_none_match** | **str**| Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. | [optional] + **if_match** | **str**| Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. | [optional] **storage_class** | **str**| Deprecated, this capability will not be supported in future releases. | [optional] **force** | **bool**| | [optional] [default to False] **content** | **bytearray**| Only a single file per upload which must be named \\\"content\\\". 
| [optional] @@ -1206,6 +1208,7 @@ Name | Type | Description | Notes **404** | Resource Not Found | - | **412** | Precondition Failed | - | **429** | too many requests | - | +**501** | Not Implemented | - | **0** | Internal Server Error | - | [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) diff --git a/clients/python/docs/StagingApi.md b/clients/python/docs/StagingApi.md index 1660c28b255..a5f50bc740e 100644 --- a/clients/python/docs/StagingApi.md +++ b/clients/python/docs/StagingApi.md @@ -125,7 +125,7 @@ Name | Type | Description | Notes [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) # **link_physical_address** -> ObjectStats link_physical_address(repository, branch, path, staging_metadata, if_none_match=if_none_match) +> ObjectStats link_physical_address(repository, branch, path, staging_metadata, if_none_match=if_none_match, if_match=if_match) associate staging on this physical address with a path @@ -197,10 +197,11 @@ with lakefs_sdk.ApiClient(configuration) as api_client: path = 'path_example' # str | relative to the branch staging_metadata = lakefs_sdk.StagingMetadata() # StagingMetadata | if_none_match = '*' # str | Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. (optional) + if_match = '2e9ec317e197e02e4264d128c2e7e681' # str | Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. 
(optional) try: # associate staging on this physical address with a path - api_response = api_instance.link_physical_address(repository, branch, path, staging_metadata, if_none_match=if_none_match) + api_response = api_instance.link_physical_address(repository, branch, path, staging_metadata, if_none_match=if_none_match, if_match=if_match) print("The response of StagingApi->link_physical_address:\n") pprint(api_response) except Exception as e: @@ -219,6 +220,7 @@ Name | Type | Description | Notes **path** | **str**| relative to the branch | **staging_metadata** | [**StagingMetadata**](StagingMetadata.md)| | **if_none_match** | **str**| Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. | [optional] + **if_match** | **str**| Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. | [optional] ### Return type @@ -245,6 +247,7 @@ Name | Type | Description | Notes **409** | conflict with a commit, try here | - | **412** | Precondition Failed | - | **429** | too many requests | - | +**501** | Not Implemented | - | **0** | Internal Server Error | - | [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) diff --git a/clients/python/lakefs_sdk/api/objects_api.py b/clients/python/lakefs_sdk/api/objects_api.py index 6f9c7f35953..abe34274f46 100644 --- a/clients/python/lakefs_sdk/api/objects_api.py +++ b/clients/python/lakefs_sdk/api/objects_api.py @@ -1621,13 +1621,13 @@ def update_object_user_metadata_with_http_info(self, repository : StrictStr, bra _request_auth=_params.get('_request_auth')) @validate_arguments - def upload_object(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], if_none_match : Annotated[Optional[StrictStr], Field(description="Set to 
\"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported.")] = None, storage_class : Annotated[Optional[StrictStr], Field(description="Deprecated, this capability will not be supported in future releases.")] = None, force : Optional[StrictBool] = None, content : Annotated[Optional[Union[StrictBytes, StrictStr]], Field(description="Only a single file per upload which must be named \\\"content\\\".")] = None, **kwargs) -> ObjectStats: # noqa: E501 + def upload_object(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported.")] = None, if_match : Annotated[Optional[StrictStr], Field(description="Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value.")] = None, storage_class : Annotated[Optional[StrictStr], Field(description="Deprecated, this capability will not be supported in future releases.")] = None, force : Optional[StrictBool] = None, content : Annotated[Optional[Union[StrictBytes, StrictStr]], Field(description="Only a single file per upload which must be named \\\"content\\\".")] = None, **kwargs) -> ObjectStats: # noqa: E501 """upload_object # noqa: E501 This method makes a synchronous HTTP request by default. 
To make an asynchronous HTTP request, please pass async_req=True - >>> thread = api.upload_object(repository, branch, path, if_none_match, storage_class, force, content, async_req=True) + >>> thread = api.upload_object(repository, branch, path, if_none_match, if_match, storage_class, force, content, async_req=True) >>> result = thread.get() :param repository: (required) @@ -1638,6 +1638,8 @@ def upload_object(self, repository : StrictStr, branch : StrictStr, path : Annot :type path: str :param if_none_match: Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. :type if_none_match: str + :param if_match: Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. + :type if_match: str :param storage_class: Deprecated, this capability will not be supported in future releases. :type storage_class: str :param force: @@ -1659,16 +1661,16 @@ def upload_object(self, repository : StrictStr, branch : StrictStr, path : Annot if '_preload_content' in kwargs: message = "Error! Please call the upload_object_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) - return self.upload_object_with_http_info(repository, branch, path, if_none_match, storage_class, force, content, **kwargs) # noqa: E501 + return self.upload_object_with_http_info(repository, branch, path, if_none_match, if_match, storage_class, force, content, **kwargs) # noqa: E501 @validate_arguments - def upload_object_with_http_info(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. 
Other values are not supported.")] = None, storage_class : Annotated[Optional[StrictStr], Field(description="Deprecated, this capability will not be supported in future releases.")] = None, force : Optional[StrictBool] = None, content : Annotated[Optional[Union[StrictBytes, StrictStr]], Field(description="Only a single file per upload which must be named \\\"content\\\".")] = None, **kwargs) -> ApiResponse: # noqa: E501 + def upload_object_with_http_info(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported.")] = None, if_match : Annotated[Optional[StrictStr], Field(description="Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value.")] = None, storage_class : Annotated[Optional[StrictStr], Field(description="Deprecated, this capability will not be supported in future releases.")] = None, force : Optional[StrictBool] = None, content : Annotated[Optional[Union[StrictBytes, StrictStr]], Field(description="Only a single file per upload which must be named \\\"content\\\".")] = None, **kwargs) -> ApiResponse: # noqa: E501 """upload_object # noqa: E501 This method makes a synchronous HTTP request by default. 
To make an asynchronous HTTP request, please pass async_req=True - >>> thread = api.upload_object_with_http_info(repository, branch, path, if_none_match, storage_class, force, content, async_req=True) + >>> thread = api.upload_object_with_http_info(repository, branch, path, if_none_match, if_match, storage_class, force, content, async_req=True) >>> result = thread.get() :param repository: (required) @@ -1679,6 +1681,8 @@ def upload_object_with_http_info(self, repository : StrictStr, branch : StrictSt :type path: str :param if_none_match: Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. :type if_none_match: str + :param if_match: Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. + :type if_match: str :param storage_class: Deprecated, this capability will not be supported in future releases. :type storage_class: str :param force: @@ -1717,6 +1721,7 @@ def upload_object_with_http_info(self, repository : StrictStr, branch : StrictSt 'branch', 'path', 'if_none_match', + 'if_match', 'storage_class', 'force', 'content' @@ -1770,6 +1775,9 @@ def upload_object_with_http_info(self, repository : StrictStr, branch : StrictSt if _params['if_none_match']: _header_params['If-None-Match'] = _params['if_none_match'] + if _params['if_match']: + _header_params['If-Match'] = _params['if_match'] + # process the form parameters _form_params = [] _files = {} @@ -1800,6 +1808,7 @@ def upload_object_with_http_info(self, repository : StrictStr, branch : StrictSt '404': "Error", '412': "Error", '429': None, + '501': "Error", } return self.api_client.call_api( diff --git a/clients/python/lakefs_sdk/api/staging_api.py b/clients/python/lakefs_sdk/api/staging_api.py index 0039821848d..d3e9eca5545 100644 --- a/clients/python/lakefs_sdk/api/staging_api.py +++ b/clients/python/lakefs_sdk/api/staging_api.py @@ -220,14 +220,14 @@ def 
get_physical_address_with_http_info(self, repository : StrictStr, branch : S _request_auth=_params.get('_request_auth')) @validate_arguments - def link_physical_address(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], staging_metadata : StagingMetadata, if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported.")] = None, **kwargs) -> ObjectStats: # noqa: E501 + def link_physical_address(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], staging_metadata : StagingMetadata, if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported.")] = None, if_match : Annotated[Optional[StrictStr], Field(description="Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value.")] = None, **kwargs) -> ObjectStats: # noqa: E501 """associate staging on this physical address with a path # noqa: E501 Link the physical address with the path in lakeFS, creating an uncommitted change. The given address can be one generated by getPhysicalAddress, or an address outside the repository's storage namespace. # noqa: E501 This method makes a synchronous HTTP request by default. 
To make an asynchronous HTTP request, please pass async_req=True - >>> thread = api.link_physical_address(repository, branch, path, staging_metadata, if_none_match, async_req=True) + >>> thread = api.link_physical_address(repository, branch, path, staging_metadata, if_none_match, if_match, async_req=True) >>> result = thread.get() :param repository: (required) @@ -240,6 +240,8 @@ def link_physical_address(self, repository : StrictStr, branch : StrictStr, path :type staging_metadata: StagingMetadata :param if_none_match: Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. :type if_none_match: str + :param if_match: Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. + :type if_match: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. @@ -255,17 +257,17 @@ def link_physical_address(self, repository : StrictStr, branch : StrictStr, path if '_preload_content' in kwargs: message = "Error! Please call the link_physical_address_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) - return self.link_physical_address_with_http_info(repository, branch, path, staging_metadata, if_none_match, **kwargs) # noqa: E501 + return self.link_physical_address_with_http_info(repository, branch, path, staging_metadata, if_none_match, if_match, **kwargs) # noqa: E501 @validate_arguments - def link_physical_address_with_http_info(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], staging_metadata : StagingMetadata, if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. 
Other values are not supported.")] = None, **kwargs) -> ApiResponse: # noqa: E501 + def link_physical_address_with_http_info(self, repository : StrictStr, branch : StrictStr, path : Annotated[StrictStr, Field(..., description="relative to the branch")], staging_metadata : StagingMetadata, if_none_match : Annotated[Optional[StrictStr], Field(description="Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported.")] = None, if_match : Annotated[Optional[StrictStr], Field(description="Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """associate staging on this physical address with a path # noqa: E501 Link the physical address with the path in lakeFS, creating an uncommitted change. The given address can be one generated by getPhysicalAddress, or an address outside the repository's storage namespace. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True - >>> thread = api.link_physical_address_with_http_info(repository, branch, path, staging_metadata, if_none_match, async_req=True) + >>> thread = api.link_physical_address_with_http_info(repository, branch, path, staging_metadata, if_none_match, if_match, async_req=True) >>> result = thread.get() :param repository: (required) @@ -278,6 +280,8 @@ def link_physical_address_with_http_info(self, repository : StrictStr, branch : :type staging_metadata: StagingMetadata :param if_none_match: Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. :type if_none_match: str + :param if_match: Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. + :type if_match: str :param async_req: Whether to execute the request asynchronously. 
:type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will @@ -310,7 +314,8 @@ def link_physical_address_with_http_info(self, repository : StrictStr, branch : 'branch', 'path', 'staging_metadata', - 'if_none_match' + 'if_none_match', + 'if_match' ] _all_params.extend( [ @@ -355,6 +360,9 @@ def link_physical_address_with_http_info(self, repository : StrictStr, branch : if _params['if_none_match']: _header_params['If-None-Match'] = _params['if_none_match'] + if _params['if_match']: + _header_params['If-Match'] = _params['if_match'] + # process the form parameters _form_params = [] _files = {} @@ -386,6 +394,7 @@ def link_physical_address_with_http_info(self, repository : StrictStr, branch : '409': "StagingLocation", '412': "Error", '429': None, + '501': "Error", } return self.api_client.call_api( diff --git a/clients/rust/docs/ObjectsApi.md b/clients/rust/docs/ObjectsApi.md index 05336e7031b..3b67e7313b2 100644 --- a/clients/rust/docs/ObjectsApi.md +++ b/clients/rust/docs/ObjectsApi.md @@ -306,7 +306,7 @@ Name | Type | Description | Required | Notes ## upload_object -> models::ObjectStats upload_object(repository, branch, path, if_none_match, storage_class, force, content) +> models::ObjectStats upload_object(repository, branch, path, if_none_match, if_match, storage_class, force, content) ### Parameters @@ -318,6 +318,7 @@ Name | Type | Description | Required | Notes **branch** | **String** | | [required] | **path** | **String** | relative to the branch | [required] | **if_none_match** | Option<**String**> | Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. | | +**if_match** | Option<**String**> | Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. | | **storage_class** | Option<**String**> | Deprecated, this capability will not be supported in future releases. 
| | **force** | Option<**bool**> | | |[default to false] **content** | Option<**std::path::PathBuf**> | Only a single file per upload which must be named \\\"content\\\". | | diff --git a/clients/rust/docs/StagingApi.md b/clients/rust/docs/StagingApi.md index f92f6d2e05e..77c4c988ac1 100644 --- a/clients/rust/docs/StagingApi.md +++ b/clients/rust/docs/StagingApi.md @@ -42,7 +42,7 @@ Name | Type | Description | Required | Notes ## link_physical_address -> models::ObjectStats link_physical_address(repository, branch, path, staging_metadata, if_none_match) +> models::ObjectStats link_physical_address(repository, branch, path, staging_metadata, if_none_match, if_match) associate staging on this physical address with a path Link the physical address with the path in lakeFS, creating an uncommitted change. The given address can be one generated by getPhysicalAddress, or an address outside the repository's storage namespace. @@ -57,6 +57,7 @@ Name | Type | Description | Required | Notes **path** | **String** | relative to the branch | [required] | **staging_metadata** | [**StagingMetadata**](StagingMetadata.md) | | [required] | **if_none_match** | Option<**String**> | Set to \"*\" to atomically allow the upload only if the key has no object yet. Other values are not supported. | | +**if_match** | Option<**String**> | Set to the object's ETag to atomically allow operations only if the object's current ETag matches the provided value. 
| | ### Return type diff --git a/clients/rust/src/apis/objects_api.rs b/clients/rust/src/apis/objects_api.rs index 9a0738cc06e..61488c23edc 100644 --- a/clients/rust/src/apis/objects_api.rs +++ b/clients/rust/src/apis/objects_api.rs @@ -137,6 +137,7 @@ pub enum UploadObjectError { Status404(models::Error), Status412(models::Error), Status429(), + Status501(models::Error), DefaultResponse(models::Error), UnknownValue(serde_json::Value), } @@ -497,7 +498,7 @@ pub async fn update_object_user_metadata(configuration: &configuration::Configur } } -pub async fn upload_object(configuration: &configuration::Configuration, repository: &str, branch: &str, path: &str, if_none_match: Option<&str>, storage_class: Option<&str>, force: Option, content: Option) -> Result> { +pub async fn upload_object(configuration: &configuration::Configuration, repository: &str, branch: &str, path: &str, if_none_match: Option<&str>, if_match: Option<&str>, storage_class: Option<&str>, force: Option, content: Option) -> Result> { let local_var_configuration = configuration; let local_var_client = &local_var_configuration.client; @@ -518,6 +519,9 @@ pub async fn upload_object(configuration: &configuration::Configuration, reposit if let Some(local_var_param_value) = if_none_match { local_var_req_builder = local_var_req_builder.header("If-None-Match", local_var_param_value.to_string()); } + if let Some(local_var_param_value) = if_match { + local_var_req_builder = local_var_req_builder.header("If-Match", local_var_param_value.to_string()); + } if let Some(ref local_var_auth_conf) = local_var_configuration.basic_auth { local_var_req_builder = local_var_req_builder.basic_auth(local_var_auth_conf.0.to_owned(), local_var_auth_conf.1.to_owned()); }; diff --git a/clients/rust/src/apis/staging_api.rs b/clients/rust/src/apis/staging_api.rs index ea55d721b4d..7780afb0d1c 100644 --- a/clients/rust/src/apis/staging_api.rs +++ b/clients/rust/src/apis/staging_api.rs @@ -37,6 +37,7 @@ pub enum 
LinkPhysicalAddressError { Status409(models::StagingLocation), Status412(models::Error), Status429(), + Status501(models::Error), DefaultResponse(models::Error), UnknownValue(serde_json::Value), } @@ -80,7 +81,7 @@ pub async fn get_physical_address(configuration: &configuration::Configuration, } /// Link the physical address with the path in lakeFS, creating an uncommitted change. The given address can be one generated by getPhysicalAddress, or an address outside the repository's storage namespace. -pub async fn link_physical_address(configuration: &configuration::Configuration, repository: &str, branch: &str, path: &str, staging_metadata: models::StagingMetadata, if_none_match: Option<&str>) -> Result> { +pub async fn link_physical_address(configuration: &configuration::Configuration, repository: &str, branch: &str, path: &str, staging_metadata: models::StagingMetadata, if_none_match: Option<&str>, if_match: Option<&str>) -> Result> { let local_var_configuration = configuration; let local_var_client = &local_var_configuration.client; @@ -95,6 +96,9 @@ pub async fn link_physical_address(configuration: &configuration::Configuration, if let Some(local_var_param_value) = if_none_match { local_var_req_builder = local_var_req_builder.header("If-None-Match", local_var_param_value.to_string()); } + if let Some(local_var_param_value) = if_match { + local_var_req_builder = local_var_req_builder.header("If-Match", local_var_param_value.to_string()); + } if let Some(ref local_var_auth_conf) = local_var_configuration.basic_auth { local_var_req_builder = local_var_req_builder.basic_auth(local_var_auth_conf.0.to_owned(), local_var_auth_conf.1.to_owned()); }; diff --git a/modules/api/factory/build.go b/modules/api/factory/build.go index 305c0f3d9cd..a71817f171e 100644 --- a/modules/api/factory/build.go +++ b/modules/api/factory/build.go @@ -2,13 +2,16 @@ package factory import ( "context" + "fmt" "net/http" "github.com/go-chi/chi/v5" "github.com/treeverse/lakefs/pkg/auth" 
"github.com/treeverse/lakefs/pkg/authentication" "github.com/treeverse/lakefs/pkg/block" + "github.com/treeverse/lakefs/pkg/catalog" "github.com/treeverse/lakefs/pkg/config" + "github.com/treeverse/lakefs/pkg/graveler" "github.com/treeverse/lakefs/pkg/license" "github.com/treeverse/lakefs/pkg/logging" "github.com/treeverse/lakefs/pkg/stats" @@ -38,3 +41,28 @@ func RegisterServices(ctx context.Context, sd ServiceDependencies, router *chi.M func NotImplementedIcebergCatalogHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "Iceberg REST Catalog Not Implemented", http.StatusNotImplemented) } + +// BuildConditionFromParams creates a graveler.ConditionFunc from upload params. +// Returns nil if no precondition is specified in the params. +// Handles IfNoneMatch (must be "*") and IfMatch (ETag validation). +func BuildConditionFromParams(ifMatch, ifNoneMatch *string) (*graveler.ConditionFunc, error) { + var condition graveler.ConditionFunc + switch { + case ifMatch != nil && ifNoneMatch != nil: + return nil, fmt.Errorf("cannot specify both If-Match and If-None-Match: %w", catalog.ErrNotImplemented) + case ifMatch != nil: + // Handle IfMatch: not supported + return nil, catalog.ErrNotImplemented + case ifNoneMatch != nil && *ifNoneMatch != "*": + // If-None-Match only supports "*" + return nil, fmt.Errorf("If-None-Match only supports '*': %w", catalog.ErrNotImplemented) + case ifNoneMatch != nil: + condition = func(currentValue *graveler.Value) error { + if currentValue != nil { + return graveler.ErrPreconditionFailed + } + return nil + } + } + return &condition, nil +} diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 3bb83e2b82d..61b0dacc7e5 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -25,6 +25,7 @@ import ( "github.com/go-openapi/swag" "github.com/gorilla/sessions" authacl "github.com/treeverse/lakefs/contrib/auth/acl" + apifactory "github.com/treeverse/lakefs/modules/api/factory" 
"github.com/treeverse/lakefs/pkg/actions" "github.com/treeverse/lakefs/pkg/api/apigen" "github.com/treeverse/lakefs/pkg/api/apiutil" @@ -936,15 +937,6 @@ func (c *Controller) LinkPhysicalAddress(w http.ResponseWriter, r *http.Request, return } - ifAbsent := false - if params.IfNoneMatch != nil { - if swag.StringValue((*string)(params.IfNoneMatch)) != "*" { - writeError(w, r, http.StatusBadRequest, "Unsupported value for If-None-Match - Only \"*\" is supported") - return - } - ifAbsent = true - } - storage := c.Config.StorageConfig().GetStorageByID(repo.StorageID) if storage == nil { c.handleAPIError(ctx, w, r, fmt.Errorf("no storage config found for id: %s: %w", repo.StorageID, block.ErrInvalidAddress)) @@ -998,8 +990,18 @@ func (c *Controller) LinkPhysicalAddress(w http.ResponseWriter, r *http.Request, if body.UserMetadata != nil { entryBuilder.Metadata(body.UserMetadata.AdditionalProperties) } + condition, err := apifactory.BuildConditionFromParams((*string)(params.IfMatch), (*string)(params.IfNoneMatch)) + if c.handleAPIError(ctx, w, r, err) { + return + } + opts := []graveler.SetOptionsFunc{ + graveler.WithForce(swag.BoolValue(body.Force)), + } + if condition != nil { + opts = append(opts, graveler.WithCondition(*condition)) + } entry := entryBuilder.Build() - err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.WithForce(swag.BoolValue(body.Force)), graveler.WithIfAbsent(ifAbsent)) + err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, opts...) 
if c.handleAPIError(ctx, w, r, err) { return } @@ -3019,7 +3021,8 @@ func (c *Controller) handleAPIErrorCallback(ctx context.Context, w http.Response cb(w, r, http.StatusPreconditionFailed, "Precondition failed") case errors.Is(err, authentication.ErrNotImplemented), errors.Is(err, auth.ErrNotImplemented), - errors.Is(err, license.ErrNotImplemented): + errors.Is(err, license.ErrNotImplemented), + errors.Is(err, catalog.ErrNotImplemented): cb(w, r, http.StatusNotImplemented, "Not implemented") case errors.Is(err, authentication.ErrInsufficientPermissions): c.Logger.WithContext(ctx).WithError(err).Info("User verification failed - insufficient permissions") @@ -3432,16 +3435,7 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi return } - // before writing body, ensure preconditions - this means we essentially check for object existence twice: - // once before uploading the body to save resources and time, - // and then graveler will check again when passed a SetOptions. 
- allowOverwrite := true - if params.IfNoneMatch != nil { - if swag.StringValue((*string)(params.IfNoneMatch)) != "*" { - writeError(w, r, http.StatusBadRequest, "Unsupported value for If-None-Match - Only \"*\" is supported") - return - } - // check if exists + if params.IfNoneMatch != nil && *params.IfNoneMatch == "*" { _, err := c.Catalog.GetEntry(ctx, repo.Name, branch, params.Path, catalog.GetEntryParams{}) if err == nil { writeError(w, r, http.StatusPreconditionFailed, "path already exists") @@ -3451,7 +3445,15 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi writeError(w, r, http.StatusInternalServerError, err) return } - allowOverwrite = false + } + var setOpts []graveler.SetOptionsFunc + // Handle preconditions + condition, err := apifactory.BuildConditionFromParams((*string)(params.IfMatch), (*string)(params.IfNoneMatch)) + if c.handleAPIError(ctx, w, r, err) { + return + } + if condition != nil { + setOpts = append(setOpts, graveler.WithCondition(*condition)) } // read request body parse multipart for "content" and upload the data @@ -3542,7 +3544,8 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi } entry := entryBuilder.Build() - err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.WithIfAbsent(!allowOverwrite), graveler.WithForce(swag.BoolValue(params.Force))) + setOpts = append(setOpts, graveler.WithForce(swag.BoolValue(params.Force))) + err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, setOpts...) 
if errors.Is(err, graveler.ErrPreconditionFailed) { writeError(w, r, http.StatusPreconditionFailed, "path already exists") return diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 024c26e39f1..885d8f203d8 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -2216,6 +2216,41 @@ func TestController_UploadObjectHandler(t *testing.T) { } }) + t.Run("disable overwrite with if-none-match (tombstone)", func(t *testing.T) { + _, err := deps.catalog.CreateBranch(ctx, "my-new-repo", "tombstone-branch", "main") + testutil.Must(t, err) + + // write first + contentType, buf := writeMultipart("content", "baz5", "hello world!") + b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "tombstone-branch", &apigen.UploadObjectParams{ + Path: "foo/baz5", + }, contentType, buf) + testutil.Must(t, err) + if b.StatusCode() != http.StatusCreated { + t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) + } + + // commit + _, err = deps.catalog.Commit(ctx, "my-new-repo", "tombstone-branch", "commit object", "user1", nil, nil, nil, false) + testutil.Must(t, err) + + // delete the object in staging (creating a tombstone) + err = deps.catalog.DeleteEntry(ctx, "my-new-repo", "tombstone-branch", "foo/baz5") + testutil.Must(t, err) + + // try to upload with if-none-match - should succeed since object is deleted (tombstone) + ifNoneMatch := apigen.IfNoneMatch("*") + contentType, buf = writeMultipart("content", "baz5", "new content!") + resp, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "tombstone-branch", &apigen.UploadObjectParams{ + Path: "foo/baz5", + IfNoneMatch: &ifNoneMatch, + }, contentType, buf) + testutil.Must(t, err) + if resp.StatusCode() != http.StatusCreated { + t.Fatalf("expected 201 for UploadObject with tombstone, got %d", resp.StatusCode()) + } + }) + t.Run("disable overwrite with if-none-match (no entry)", func(t *testing.T) { ifNoneMatch := apigen.IfNoneMatch("*") contentType, buf := 
writeMultipart("content", "baz4", "something else!") @@ -2241,8 +2276,8 @@ func TestController_UploadObjectHandler(t *testing.T) { if err != nil { t.Fatalf("UploadObject err=%s, expected no error", err) } - if resp.JSON400 == nil { - t.Fatalf("UploadObject status code=%d, expected 400", resp.StatusCode()) + if resp.JSON501 == nil { + t.Fatalf("UploadObject status code=%d, expected 501", resp.StatusCode()) } }) diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 1918d951c46..b8c5b372cbb 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -1149,6 +1149,24 @@ func addressTypeToCatalog(t Entry_AddressType) AddressType { } } +// EntryCondition adapts an Entry-level condition function to a graveler.ConditionFunc. +// It converts graveler Values to Entries before applying the condition, enabling Entry-based +// validation logic (e.g., Object metadata checks) to work with graveler's conditional operations. +func EntryCondition(condition func(*Entry) error) graveler.ConditionFunc { + return func(currentValue *graveler.Value) error { + if currentValue == nil { + return condition(nil) + } + + currentEntry, err := ValueToEntry(currentValue) + if err != nil { + return err + } + + return condition(currentEntry) + } +} + func (c *Catalog) CreateEntry(ctx context.Context, repositoryID string, branch string, entry DBEntry, opts ...graveler.SetOptionsFunc) error { branchID := graveler.BranchID(branch) ent := newEntryFromCatalogEntry(entry) diff --git a/pkg/catalog/catalog_test.go b/pkg/catalog/catalog_test.go index 40a69f59122..975ef324951 100644 --- a/pkg/catalog/catalog_test.go +++ b/pkg/catalog/catalog_test.go @@ -3,6 +3,7 @@ package catalog_test import ( "bytes" "context" + "errors" "fmt" "io" "net/url" @@ -942,3 +943,135 @@ func readPhysicalAddressesFromParquetObject(t *testing.T, repositoryID string, c } return records } + +func TestEntryCondition(t *testing.T) { + // Helper to create a graveler.Value from an Entry + createValueFromEntry := 
func(entry *catalog.Entry) *graveler.Value { + value, err := catalog.EntryToValue(entry) + require.NoError(t, err) + return value + } + + tests := []struct { + name string + entry *catalog.Entry + conditionFunc func(*catalog.Entry) error + expectedErr error + }{ + { + name: "condition passes with valid entry", + entry: &catalog.Entry{ + Address: "s3://bucket/key", + Size: 100, + ETag: "abc123", + }, + conditionFunc: func(e *catalog.Entry) error { + if e != nil && e.Size == 100 { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: nil, + }, + { + name: "condition fails validation", + entry: &catalog.Entry{ + Address: "s3://bucket/key", + Size: 100, + ETag: "abc123", + }, + conditionFunc: func(e *catalog.Entry) error { + if e != nil && e.Size == 200 { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + { + name: "condition with nil entry - expect nil", + entry: nil, + conditionFunc: func(e *catalog.Entry) error { + if e == nil { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: nil, + }, + { + name: "condition with nil entry - expect non-nil", + entry: nil, + conditionFunc: func(e *catalog.Entry) error { + if e != nil { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + { + name: "condition validates etag", + entry: &catalog.Entry{ + Address: "s3://bucket/key", + Size: 100, + ETag: "expected-etag", + }, + conditionFunc: func(e *catalog.Entry) error { + if e == nil { + return graveler.ErrPreconditionFailed + } + if e.ETag != "expected-etag" { + return graveler.ErrPreconditionFailed + } + return nil + }, + expectedErr: nil, + }, + { + name: "condition validates etag mismatch", + entry: &catalog.Entry{ + Address: "s3://bucket/key", + Size: 100, + ETag: "actual-etag", + }, + conditionFunc: func(e *catalog.Entry) error { + if e == nil { + return graveler.ErrPreconditionFailed + } + if e.ETag != 
"expected-etag" { + return graveler.ErrPreconditionFailed + } + return nil + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create the ConditionFunc using WithEntryCondition + conditionFunc := catalog.EntryCondition(tt.conditionFunc) + + // Apply it to SetOptions using graveler.WithCondition + opts := &graveler.SetOptions{} + graveler.WithCondition(conditionFunc)(opts) + + // Verify condition was set + require.NotNil(t, opts.Condition) + + // Convert entry to value (or nil) + var currentValue *graveler.Value + if tt.entry != nil { + currentValue = createValueFromEntry(tt.entry) + } + + // Execute the condition + err := opts.Condition(currentValue) + // Verify the result + if !errors.Is(err, tt.expectedErr) { + t.Errorf("WithEntryCondition() error = %v, expectedErr %v", err, tt.expectedErr) + } + }) + } +} diff --git a/pkg/catalog/errors.go b/pkg/catalog/errors.go index e8deeec49de..f032ae085c5 100644 --- a/pkg/catalog/errors.go +++ b/pkg/catalog/errors.go @@ -17,6 +17,6 @@ var ( // ErrItClosed is used to determine the reason for the end of the walk ErrItClosed = errors.New("iterator closed") - ErrFeatureNotSupported = errors.New("feature not supported") - ErrNonEmptyRepository = errors.New("non empty repository") + ErrNotImplemented = errors.New("functionality not implemented") + ErrNonEmptyRepository = errors.New("non empty repository") ) diff --git a/pkg/catalog/walk_entry_iterator.go b/pkg/catalog/walk_entry_iterator.go index 2ddeb9bf0ed..54dbbe068f9 100644 --- a/pkg/catalog/walk_entry_iterator.go +++ b/pkg/catalog/walk_entry_iterator.go @@ -108,7 +108,7 @@ func (it *walkEntryIterator) Next() bool { } func (it *walkEntryIterator) SeekGE(Path) { - it.err.Store(ErrFeatureNotSupported) + panic("Not supported") } func (it *walkEntryIterator) Value() *EntryRecord { diff --git a/pkg/gateway/operations/operation_utils.go b/pkg/gateway/operations/operation_utils.go index 
93d8e700db0..aaef68a6bd9 100644 --- a/pkg/gateway/operations/operation_utils.go +++ b/pkg/gateway/operations/operation_utils.go @@ -41,7 +41,7 @@ func shouldReplaceMetadata(req *http.Request) bool { return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE" } -func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string, allowOverwrite bool) error { +func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string, opts ...graveler.SetOptionsFunc) error { var writeTime time.Time if mTime == nil { writeTime = time.Now() @@ -60,7 +60,7 @@ func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checks ContentType(contentType). Build() - err := o.Catalog.CreateEntry(req.Context(), o.Repository.Name, o.Reference, entry, graveler.WithIfAbsent(!allowOverwrite)) + err := o.Catalog.CreateEntry(req.Context(), o.Repository.Name, o.Reference, entry, opts...) 
if err != nil { o.Log(req).WithError(err).Error("could not update metadata") return err diff --git a/pkg/gateway/operations/postobject.go b/pkg/gateway/operations/postobject.go index 07038a4d4fc..709124e7896 100644 --- a/pkg/gateway/operations/postobject.go +++ b/pkg/gateway/operations/postobject.go @@ -9,6 +9,7 @@ import ( "strings" "time" + apifactory "github.com/treeverse/lakefs/modules/api/factory" "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/catalog" gatewayErrors "github.com/treeverse/lakefs/pkg/gateway/errors" @@ -97,15 +98,11 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite return } // check and validate whether if-none-match header provided - allowOverwrite, err := o.checkIfAbsent(req) - if err != nil { - _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented)) - return - } + ifNoneMatch := o.ifNoneMatchHeader(req) // before writing body, ensure preconditions - this means we essentially check for object existence twice: // once here, before uploading the body to save resources and time, // and then graveler will check again when passed a SetOptions. 
- if !allowOverwrite { + if ifNoneMatch != nil && *ifNoneMatch == "*" { _, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{}) if err == nil { // In case object exists in catalog, no error returns @@ -113,6 +110,22 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite return } } + ifMatch := o.ifMatchHeader(req) + condition, err := apifactory.BuildConditionFromParams(ifMatch, ifNoneMatch) + if err != nil { + if errors.Is(err, graveler.ErrInvalidValue) { + _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrBadRequest)) + return + } + if errors.Is(err, catalog.ErrNotImplemented) { + _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented)) + return + } + _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrInternalError)) + return + } + opts := graveler.WithCondition(*condition) + objName := multiPart.PhysicalAddress req = req.WithContext(logging.AddFields(req.Context(), logging.Fields{logging.PhysicalAddressFieldKey: objName})) xmlMultipartComplete, err := io.ReadAll(req.Body) @@ -144,7 +157,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite return } checksum := strings.Split(resp.ETag, "-")[0] - err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType, allowOverwrite) + err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType, opts) if errors.Is(err, graveler.ErrPreconditionFailed) { _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed)) return diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index 57c580db821..79e430785ac 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -7,6 +7,7 @@ import ( "strconv" 
"time" + apifactory "github.com/treeverse/lakefs/modules/api/factory" "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/catalog" gatewayErrors "github.com/treeverse/lakefs/pkg/gateway/errors" @@ -20,6 +21,7 @@ import ( ) const ( + IfMatchHeader = "If-Match" IfNoneMatchHeader = "If-None-Match" CopySourceHeader = "x-amz-copy-source" CopySourceRangeHeader = "x-amz-copy-source-range" @@ -316,16 +318,11 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) { o.Incr("put_object", o.Principal, o.Repository.Name, o.Reference) storageClass := StorageClassFromHeader(req.Header) opts := block.PutOpts{StorageClass: storageClass} - // check and validate whether if-none-match header provided - allowOverwrite, err := o.checkIfAbsent(req) - if err != nil { - _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented)) - return - } + ifNoneMatch := o.ifNoneMatchHeader(req) // before writing body, ensure preconditions - this means we essentially check for object existence twice: // once here, before uploading the body to save resources and time, // and then graveler will check again when passed a SetOptions. 
- if !allowOverwrite { + if ifNoneMatch != nil && *ifNoneMatch == "*" { _, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{}) if err == nil { // In case object exists in catalog, no error returns @@ -333,6 +330,22 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) { return } } + ifMatch := o.ifMatchHeader(req) + condition, err := apifactory.BuildConditionFromParams(ifMatch, ifNoneMatch) + if err != nil { + if errors.Is(err, graveler.ErrInvalidValue) { + _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrBadRequest)) + return + } + if errors.Is(err, catalog.ErrNotImplemented) { + _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented)) + return + } + _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrInternalError)) + return + } + gravelerOpts := graveler.WithCondition(*condition) + objectPointer := block.ObjectPointer{ StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, @@ -349,7 +362,7 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) { // write metadata metadata := amzMetaAsMetadata(req) contentType := req.Header.Get("Content-Type") - err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType, allowOverwrite) + err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType, gravelerOpts) if errors.Is(err, graveler.ErrPreconditionFailed) { _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed)) return @@ -370,13 +383,18 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) { w.WriteHeader(http.StatusOK) } -func (o *PathOperation) checkIfAbsent(req *http.Request) (bool, error) { - headerValue := req.Header.Get(IfNoneMatchHeader) - if headerValue == "" { 
- return true, nil +func (o *PathOperation) ifNoneMatchHeader(req *http.Request) *string { + h := req.Header.Get(IfNoneMatchHeader) + if h == "" { + return nil } - if headerValue == "*" { - return false, nil + return &h +} + +func (o *PathOperation) ifMatchHeader(req *http.Request) *string { + h := req.Header.Get(IfMatchHeader) + if h == "" { + return nil } - return false, gatewayErrors.ErrNotImplemented + return &h } diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go index 93e7e81a52a..4ce23c1f708 100644 --- a/pkg/graveler/graveler.go +++ b/pkg/graveler/graveler.go @@ -198,8 +198,8 @@ func WithStageOnly(v bool) GetOptionsFunc { } } +type ConditionFunc func(currentValue *Value) error type SetOptions struct { - IfAbsent bool // MaxTries set number of times we try to perform the operation before we fail with BranchWriteMaxTries. // By default, 0 - we try BranchWriteMaxTries MaxTries int @@ -214,6 +214,10 @@ type SetOptions struct { SquashMerge bool // NoTombstone will try to remove entry without setting a tombstone in KV NoTombstone bool + // Condition is a function that validates the current value before performing the Set. + // If the condition returns an error, the Set operation fails with that error. + // If the condition succeeds, the Set is performed using SetIf with the current value. 
+ Condition ConditionFunc } type SetOptionsFunc func(opts *SetOptions) @@ -226,12 +230,6 @@ func NewSetOptions(opts []SetOptionsFunc) *SetOptions { return options } -func WithIfAbsent(v bool) SetOptionsFunc { - return func(opts *SetOptions) { - opts.IfAbsent = v - } -} - func WithForce(v bool) SetOptionsFunc { return func(opts *SetOptions) { opts.Force = v @@ -262,6 +260,12 @@ func WithNoTombstone(v bool) SetOptionsFunc { } } +func WithCondition(condition ConditionFunc) SetOptionsFunc { + return func(opts *SetOptions) { + opts.Condition = condition + } +} + // ListOptions controls list request defaults type ListOptions struct { // Shows entities marked as hidden @@ -546,7 +550,8 @@ type BranchRecord struct { type BranchUpdateFunc func(*Branch) (*Branch, error) // ValueUpdateFunc Used to pass validation call back to staging manager for UpdateValue flow -type ValueUpdateFunc func(*Value) (*Value, error) +// val - current value, nil if not found, val.Identity == nil indicates tombstone +type ValueUpdateFunc func(val *Value) (*Value, error) // TagRecord holds TagID with the associated Tag data type TagRecord struct { @@ -1843,28 +1848,51 @@ func (g *Graveler) Set(ctx context.Context, repository *RepositoryRecord, branch log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "set"}) err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { - if !options.IfAbsent { + if options.Condition == nil { return g.StagingManager.Set(ctx, branch.StagingToken, key, &value, false) } - // verify the key not found - _, err := g.Get(ctx, repository, Ref(branchID), key) - if err == nil { // Entry found, return precondition failed - return ErrPreconditionFailed + // setFunc is an update function that sets the value regardless of the current value + setFunc := func(_ *Value) (*Value, error) { + return &value, nil } - if !errors.Is(err, ErrNotFound) { - return err + return g.handleUpdate(ctx, 
repository, branchID, branch, key, setFunc, options.Condition) + }, "set") + return err +} + +// handleUpdate applies the provided condition and update callback functions to the current value of the given key, +// considering both committed and staging values. The condition is checked before applying the update. +func (g *Graveler) handleUpdate(ctx context.Context, repository *RepositoryRecord, branchID BranchID, branch *Branch, key Key, updateFunc ValueUpdateFunc, condition ConditionFunc) error { + // Get current value considering committed and sealed tokens + curValue, err := g.Get(ctx, repository, Ref(branchID), key) + if err != nil && !errors.Is(err, ErrNotFound) { + return err + } + + return g.StagingManager.Update(ctx, branch.StagingToken, key, func(stagingValue *Value) (*Value, error) { + var latestValue *Value + switch { + case stagingValue != nil && stagingValue.Identity == nil: + // tombstone in staging + latestValue = nil + case stagingValue != nil: + latestValue = stagingValue + default: + latestValue = curValue } - // update stage with new value only if key not found or tombstone - return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { - if currentValue == nil || currentValue.Identity == nil { - return &value, nil + if condition != nil { + if err := condition(latestValue); err != nil { + return nil, err } - return nil, ErrSkipValueUpdate - }) - }, "set") - return err + } + + if updateFunc != nil { + return updateFunc(latestValue) + } + return curValue, nil + }) } // safeBranchWrite repeatedly attempts to perform stagingOperation, retrying @@ -1931,27 +1959,8 @@ func (g *Graveler) Update(ctx context.Context, repository *RepositoryRecord, bra log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "update_user_metadata"}) - // committedValue, if non-nil is a value read from either uncommitted or committed. Usually - // it is read from committed. 
If there is a value on staging, that entry will be modified - // and committedValue will never be read. - var committedValue *Value - err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{MaxTries: options.MaxTries}, func(branch *Branch) error { - return g.StagingManager.Update(ctx, branch.StagingToken, key, func(currentValue *Value) (*Value, error) { - if currentValue == nil { - // Object not on staging: need to update committed value. - if committedValue == nil { - committedValue, err = g.Get(ctx, repository, Ref(branchID), key) - if err != nil { - // (Includes ErrNotFound) - return nil, fmt.Errorf("read from committed: %w", err) - } - } - // Get always returns a non-nil value or an error. - currentValue = committedValue - } - return update(currentValue) - }) + return g.handleUpdate(ctx, repository, branchID, branch, key, update, options.Condition) }, "update_metadata") return err } diff --git a/pkg/graveler/graveler_test.go b/pkg/graveler/graveler_test.go index dfe16b44e88..eb68960d39c 100644 --- a/pkg/graveler/graveler_test.go +++ b/pkg/graveler/graveler_test.go @@ -32,8 +32,6 @@ type Hooks struct { TagID graveler.TagID } -var ErrGravelerUpdate = errors.New("test update error") - func (h *Hooks) PrepareCommitHook(_ context.Context, record graveler.HookRecord) error { h.Called = append(h.Called, "PrepareCommitHook") h.RepositoryID = record.Repository.RepositoryID @@ -383,10 +381,8 @@ func TestGraveler_Get(t *testing.T) { func TestGraveler_Set(t *testing.T) { newSetVal := &graveler.ValueRecord{Key: []byte("key"), Value: &graveler.Value{Data: []byte("newValue"), Identity: []byte("newIdentity")}} - sampleVal := &graveler.Value{Identity: []byte("sampleIdentity"), Data: []byte("sampleValue")} tests := []struct { name string - ifAbsent bool expectedValueResult *graveler.ValueRecord expectedErr error committedMgr *testutil.CommittedFake @@ -407,46 +403,12 @@ func TestGraveler_Set(t *testing.T) { refMgr: &testutil.RefsFake{Branch: 
&graveler.Branch{CommitID: "commit1"}}, expectedValueResult: newSetVal, }, - { - name: "overwrite no prior value", - committedMgr: &testutil.CommittedFake{Err: graveler.ErrNotFound}, - stagingMgr: &testutil.StagingFake{}, - refMgr: &testutil.RefsFake{Branch: &graveler.Branch{CommitID: "bla"}, Commits: map[graveler.CommitID]*graveler.Commit{"": {}}}, - expectedValueResult: newSetVal, - ifAbsent: true, - }, - { - name: "overwrite with prior committed value", - committedMgr: &testutil.CommittedFake{}, - stagingMgr: &testutil.StagingFake{}, - refMgr: &testutil.RefsFake{Branch: &graveler.Branch{CommitID: "bla"}, Commits: map[graveler.CommitID]*graveler.Commit{"": {}}}, - expectedValueResult: nil, - expectedErr: graveler.ErrPreconditionFailed, - ifAbsent: true, - }, - { - name: "overwrite with prior staging value", - committedMgr: &testutil.CommittedFake{}, - stagingMgr: &testutil.StagingFake{Values: map[string]map[string]*graveler.Value{"st": {"key": sampleVal}}, LastSetValueRecord: &graveler.ValueRecord{Key: []byte("key"), Value: sampleVal}}, - refMgr: &testutil.RefsFake{Branch: &graveler.Branch{CommitID: "bla", StagingToken: "st"}, Commits: map[graveler.CommitID]*graveler.Commit{"": {}}}, - expectedValueResult: &graveler.ValueRecord{Key: []byte("key"), Value: sampleVal}, - expectedErr: graveler.ErrPreconditionFailed, - ifAbsent: true, - }, - { - name: "overwrite with prior staging tombstone", - committedMgr: &testutil.CommittedFake{Err: graveler.ErrNotFound}, - stagingMgr: &testutil.StagingFake{Values: map[string]map[string]*graveler.Value{"st1": {"key": nil}, "st2": {"key": sampleVal}}, LastSetValueRecord: &graveler.ValueRecord{Key: []byte("key"), Value: sampleVal}}, - refMgr: &testutil.RefsFake{Branch: &graveler.Branch{CommitID: "bla", StagingToken: "st1", SealedTokens: []graveler.StagingToken{"st2"}}, Commits: map[graveler.CommitID]*graveler.Commit{"": {}}}, - expectedValueResult: newSetVal, - ifAbsent: true, - }, } ctx := context.Background() for _, tt := range 
tests { t.Run(tt.name, func(t *testing.T) { store := newGraveler(t, tt.committedMgr, tt.stagingMgr, tt.refMgr, nil, testutil.NewProtectedBranchesManagerFake()) - err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithIfAbsent(tt.ifAbsent)) + err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value) if !errors.Is(err, tt.expectedErr) { t.Fatalf("Set() - error: %v, expected: %v", err, tt.expectedErr) } @@ -467,36 +429,29 @@ func TestGravelerSet_Advanced(t *testing.T) { newSetVal := &graveler.ValueRecord{Key: []byte("key"), Value: &graveler.Value{Data: []byte("newValue"), Identity: []byte("newIdentity")}} // RefManager mock base setup refMgr := mock.NewMockRefManager(ctrl) - refExpect := refMgr.EXPECT() - refExpectCommitNotFound := func() { - refExpect.ParseRef(gomock.Any()).Times(1).Return(graveler.RawRef{BaseRef: ""}, nil) - refExpect.ResolveRawRef(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.ResolvedRef{ + + t.Run("branch deleted after update", func(t *testing.T) { + branch := &graveler.Branch{CommitID: "commit1", StagingToken: "st"} + refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(branch, nil) + refMgr.EXPECT().ParseRef(gomock.Any()).Times(1).Return(graveler.RawRef{BaseRef: "branch-1"}, nil) + refMgr.EXPECT().ResolveRawRef(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.ResolvedRef{ + Type: graveler.ReferenceTypeBranch, BranchRecord: graveler.BranchRecord{ - Branch: &graveler.Branch{}, + BranchID: "branch-1", + Branch: branch, }, }, nil) - refExpect.GetCommit(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil, graveler.ErrCommitNotFound) - } - - t.Run("update failure", func(t *testing.T) { - stagingMgr := &testutil.StagingFake{ - UpdateErr: ErrGravelerUpdate, - } - refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{}, nil) - refExpectCommitNotFound() - store := 
newGraveler(t, committedMgr, stagingMgr, refMgr, nil, testutil.NewProtectedBranchesManagerFake()) - err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithIfAbsent(true)) - require.ErrorIs(t, err, ErrGravelerUpdate) - require.Nil(t, stagingMgr.LastSetValueRecord) - }) - - t.Run("branch deleted after update", func(t *testing.T) { - refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{}, nil) - refExpectCommitNotFound() refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil, graveler.ErrNotFound) stagingMgr := &testutil.StagingFake{} store := newGraveler(t, committedMgr, stagingMgr, refMgr, nil, testutil.NewProtectedBranchesManagerFake()) - err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithIfAbsent(true)) + // condition equivalent to "if-absent" + condition := func(currentValue *graveler.Value) error { + if currentValue != nil { + return graveler.ErrPreconditionFailed + } + return nil + } + err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithCondition(condition)) require.ErrorIs(t, err, graveler.ErrNotFound) require.Equal(t, newSetVal, stagingMgr.LastSetValueRecord) }) @@ -504,17 +459,15 @@ func TestGravelerSet_Advanced(t *testing.T) { t.Run("branch token changed after update - one retry", func(t *testing.T) { // Test safeBranchWrite retries when token changed after update a single time and then succeeds refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{}, nil) - refExpectCommitNotFound() refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(&graveler.Branch{ StagingToken: "new_token", }, nil) - refExpectCommitNotFound() refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: "new_token", }, nil) stagingMgr := 
&testutil.StagingFake{} store := newGraveler(t, committedMgr, stagingMgr, refMgr, nil, testutil.NewProtectedBranchesManagerFake()) - err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithIfAbsent(true)) + err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value) require.Nil(t, err) require.Equal(t, newSetVal, stagingMgr.LastSetValueRecord) }) @@ -525,7 +478,6 @@ func TestGravelerSet_Advanced(t *testing.T) { refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: graveler.StagingToken("new_token_" + strconv.Itoa(i)), }, nil) - refExpectCommitNotFound() refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: graveler.StagingToken("new_token_" + strconv.Itoa(i+1)), }, nil) @@ -533,13 +485,12 @@ func TestGravelerSet_Advanced(t *testing.T) { refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: graveler.StagingToken("new_token_" + strconv.Itoa(graveler.BranchWriteMaxTries)), }, nil) - refExpectCommitNotFound() refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: graveler.StagingToken("new_token_" + strconv.Itoa(graveler.BranchWriteMaxTries)), }, nil) stagingMgr := &testutil.StagingFake{} store := newGraveler(t, committedMgr, stagingMgr, refMgr, nil, testutil.NewProtectedBranchesManagerFake()) - err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithIfAbsent(true)) + err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value) require.Nil(t, err) require.Equal(t, newSetVal, stagingMgr.LastSetValueRecord) }) @@ -550,19 +501,183 @@ func TestGravelerSet_Advanced(t *testing.T) { refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: 
graveler.StagingToken("new_token_" + strconv.Itoa(i)), }, nil) - refExpectCommitNotFound() refMgr.EXPECT().GetBranch(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(&graveler.Branch{ StagingToken: graveler.StagingToken("new_token_" + strconv.Itoa(i+1)), }, nil) } stagingMgr := &testutil.StagingFake{} store := newGraveler(t, committedMgr, stagingMgr, refMgr, nil, testutil.NewProtectedBranchesManagerFake()) - err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, graveler.WithIfAbsent(true)) + err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value) require.ErrorIs(t, err, graveler.ErrTooManyTries) require.Equal(t, newSetVal, stagingMgr.LastSetValueRecord) }) } +func TestGraveler_SetWithCondition(t *testing.T) { + ctx := context.Background() + newSetVal := &graveler.ValueRecord{Key: []byte("key"), Value: &graveler.Value{Data: []byte("newValue"), Identity: []byte("newIdentity")}} + existingVal := &graveler.Value{Identity: []byte("existingIdentity"), Data: []byte("existingValue")} + + tests := []struct { + name string + existingValue *graveler.Value + existingInCommitted bool // If true, put value in committed instead of staging + isTombstone bool // If true, set an explicit nil in staging (tombstone) + condition func(*graveler.Value) error + expectedErr error + expectedValueResult *graveler.ValueRecord + }{ + { + name: "condition passes with existing value in staging", + existingValue: existingVal, + condition: func(v *graveler.Value) error { + if v != nil && string(v.Identity) == "existingIdentity" { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedValueResult: newSetVal, + }, + { + name: "condition fails with existing value in staging", + existingValue: existingVal, + condition: func(v *graveler.Value) error { + if v != nil && string(v.Identity) == "wrongIdentity" { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + { + 
name: "condition passes with nil value", + existingValue: nil, + condition: func(v *graveler.Value) error { + if v == nil { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedValueResult: newSetVal, + }, + { + name: "condition fails with nil value", + existingValue: nil, + condition: func(v *graveler.Value) error { + if v != nil { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + { + name: "condition passes with existing value in committed", + existingValue: existingVal, + existingInCommitted: true, + condition: func(v *graveler.Value) error { + if v != nil && string(v.Identity) == "existingIdentity" { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedValueResult: newSetVal, + }, + { + name: "condition fails with existing value in committed", + existingValue: existingVal, + existingInCommitted: true, + condition: func(v *graveler.Value) error { + if v != nil && string(v.Identity) == "wrongIdentity" { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + { + name: "condition passes with tombstone in staging", + existingValue: existingVal, + existingInCommitted: true, + isTombstone: true, + condition: func(v *graveler.Value) error { + if v == nil { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedValueResult: newSetVal, + }, + { + name: "condition fails with tombstone in staging", + existingValue: existingVal, + existingInCommitted: true, + isTombstone: true, + condition: func(v *graveler.Value) error { + if v != nil { + return nil + } + return graveler.ErrPreconditionFailed + }, + expectedErr: graveler.ErrPreconditionFailed, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + committedMgr := &testutil.CommittedFake{} + stagingMgr := &testutil.StagingFake{} + + if tt.existingValue != nil { + if tt.existingInCommitted { + // Populate committed 
storage + committedMgr.ValuesByKey = map[string]*graveler.Value{ + string(newSetVal.Key): tt.existingValue, + } + } else { + // Populate staging storage + stagingMgr.Values = map[string]map[string]*graveler.Value{ + "st": {string(newSetVal.Key): tt.existingValue}, + } + } + } + + // Set up tombstone if requested + if tt.isTombstone { + stagingMgr.Values = map[string]map[string]*graveler.Value{ + "st": {string(newSetVal.Key): nil}, + } + } + // RefManager mock setup - returns a random commit for the branch just to get to the committed manager and get the existing value + randomCommit := &graveler.Commit{} + refMgr := &testutil.RefsFake{ + Branch: &graveler.Branch{ + CommitID: "commit1", + StagingToken: "st", + }, + CommitID: "commit1", + Commits: map[graveler.CommitID]*graveler.Commit{"commit1": randomCommit}, + } + + store := newGraveler(t, committedMgr, stagingMgr, refMgr, nil, testutil.NewProtectedBranchesManagerFake()) + + // Create SetOptionsFunc with condition + withCondition := func(opts *graveler.SetOptions) { + opts.Condition = tt.condition + } + + err := store.Set(ctx, repository, "branch-1", newSetVal.Key, *newSetVal.Value, withCondition) + if !errors.Is(err, tt.expectedErr) { + t.Fatalf("Set() with condition - error: %v, expected: %v", err, tt.expectedErr) + } + + if err == nil { + require.Equal(t, tt.expectedValueResult, stagingMgr.LastSetValueRecord) + } + }) + } +} + func TestGravelerGet_Advanced(t *testing.T) { tests := []struct { name string diff --git a/pkg/graveler/staging/manager.go b/pkg/graveler/staging/manager.go index 688dcd38da1..d0e350f7137 100644 --- a/pkg/graveler/staging/manager.go +++ b/pkg/graveler/staging/manager.go @@ -116,15 +116,12 @@ func (m *Manager) Set(ctx context.Context, st graveler.StagingToken, key gravele func (m *Manager) Update(ctx context.Context, st graveler.StagingToken, key graveler.Key, updateFunc graveler.ValueUpdateFunc) error { oldValueProto := &graveler.StagedEntryData{} - var oldValue *graveler.Value pred, err 
:= kv.GetMsg(ctx, m.kvStore, graveler.StagingTokenPartition(st), key, oldValueProto) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - oldValue = nil - } else { - return err - } - } else { + if err != nil && !errors.Is(err, kv.ErrNotFound) { + return err + } + var oldValue *graveler.Value + if !errors.Is(err, kv.ErrNotFound) { oldValue = graveler.StagedEntryFromProto(oldValueProto) } updatedValue, err := updateFunc(oldValue) diff --git a/pkg/graveler/testutil/fakes.go b/pkg/graveler/testutil/fakes.go index 6f32b7261e3..2b9c0ecae3d 100644 --- a/pkg/graveler/testutil/fakes.go +++ b/pkg/graveler/testutil/fakes.go @@ -186,8 +186,13 @@ func (s *StagingFake) Update(_ context.Context, st graveler.StagingToken, key gr if s.UpdateErr != nil { return s.UpdateErr } - v := s.Values[st.String()][key.String()] - + v, exists := s.Values[st.String()][key.String()] + if !exists { + v = nil + } else if v == nil { + // Tombstone + v = new(graveler.Value) + } val, err := updateFunc(v) if err != nil { return err