Skip to content

Commit

Permalink
Add contract negotation termination. (#94)
Browse files Browse the repository at this point in the history
* Add contract negotation termination.

This makes it possible for a contract service to terminate a negotation
via the control interface.

* Bump go to fix vulnerabilities

* Fix tests.
  • Loading branch information
ainmosni authored Feb 4, 2025
1 parent d9da8c2 commit cfb0c84
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/library/golang:1.23.3
FROM docker.io/library/golang:1.23.5

RUN apt-get update && \
# Go tools:
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test-and-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.23.3
- 1.23.5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -39,7 +39,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.23.3
- 1.23.5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -56,7 +56,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.23.3
- 1.23.5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -77,7 +77,7 @@ jobs:
strategy:
matrix:
go-version:
- 1.23.3
- 1.23.5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM docker.io/library/golang:1.23.3 AS builder
FROM docker.io/library/golang:1.23.5 AS builder
WORKDIR /app
COPY . ./
RUN make build
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.debug
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
###########################################
# DO NOT USE ANYWHERE NEAR SENSITIVE DATA #
###########################################
FROM docker.io/library/golang:1.23.3 AS builder
FROM docker.io/library/golang:1.23.5 AS builder
WORKDIR /app
COPY . ./
RUN make debug
Expand Down
22 changes: 22 additions & 0 deletions dsp/contract/negotiation.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ func (cn *Negotiation) GetCallback() *url.URL { return cn.callback }
func (cn *Negotiation) GetSelf() *url.URL { return cn.self }
func (cn *Negotiation) GetContract() *Negotiation { return cn }

func (cn *Negotiation) GetLocalPID() uuid.UUID {
switch cn.role {
case constants.DataspaceConsumer:
return cn.GetConsumerPID()
case constants.DataspaceProvider:
return cn.GetProviderPID()
default:
panic("not a valid role")
}
}

func (cn *Negotiation) GetRemotePID() uuid.UUID {
switch cn.role {
case constants.DataspaceConsumer:
return cn.GetProviderPID()
case constants.DataspaceProvider:
return cn.GetConsumerPID()
default:
panic("not a valid role")
}
}

// Negotiation setters, these will panic when the negotiation is RO.
func (cn *Negotiation) SetProviderPID(u uuid.UUID) {
cn.panicRO()
Expand Down
59 changes: 56 additions & 3 deletions dsp/control/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package control
import (
"context"
"encoding/json"
"path"
"slices"

"github.com/go-dataspace/run-dsp/dsp/constants"
Expand Down Expand Up @@ -255,8 +256,60 @@ func (s *Server) ContractFinalize(

// ContractTerminate sends a ContractTerminationMessage.
func (s *Server) ContractTerminate(
_ context.Context,
_ *dspcontrol.ContractTerminateRequest,
ctx context.Context,
req *dspcontrol.ContractTerminateRequest,
) (*dspcontrol.ContractTerminateResponse, error) {
panic("not implemented") // TODO: Implement
ctx, logger := logging.InjectLabels(ctx, "method", "ContractTerminate")
logger.Info("Called")

pid, err := uuid.Parse(req.GetPid())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "could not parse PID: %s", err)
}

var negotiation *contract.Negotiation
for _, role := range []constants.DataspaceRole{constants.DataspaceConsumer, constants.DataspaceProvider} {
negotiation, err = s.store.GetContractR(ctx, pid, role)
if err == nil {
logger.Info("Contract found", "pid", negotiation.GetLocalPID().String(), "role", role)
break
}
}
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "could not find contract with PID %s: %s", pid.String(), err)
}
reasons := make([]shared.Multilanguage, 0)
for _, reason := range req.Reason {
reasons = append(reasons, shared.Multilanguage{
Language: "en", // hardcoded for now, going i18n will be its own PR.
Value: reason,
})
}
negotiationTermination := shared.ContractNegotiationTerminationMessage{
Context: shared.GetDSPContext(),
Type: "dspace:ContractNegotiationTerminationMessage",
ProviderPID: negotiation.GetProviderPID().URN(),
ConsumerPID: negotiation.GetConsumerPID().URN(),
Code: req.GetCode(),
Reason: reasons,
}

reqBody, err := shared.ValidateAndMarshal(ctx, negotiationTermination)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not encode termination message: %s", err)
}

cu := shared.MustParseURL(negotiation.GetCallback().String())
cu.Path = path.Join(cu.Path, "negotiations", negotiation.GetRemotePID().String(), "termination")
s.reconciler.Add(statemachine.ReconciliationEntry{
EntityID: negotiation.GetLocalPID(),
Type: statemachine.ReconciliationContract,
Role: negotiation.GetRole(),
TargetState: contract.States.TERMINATED.String(),
Method: "POST",
URL: cu,
Body: reqBody,
Context: ctx,
})
return &dspcontrol.ContractTerminateResponse{}, nil
}
51 changes: 51 additions & 0 deletions dsp/control/contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,54 @@ func TestContractFinalize(t *testing.T) {
assert.Equal(t, staticProviderPID.URN(), reqPayload.ProviderPID)
assert.Equal(t, contract.States.FINALIZED.String(), reqPayload.EventType)
}

func TestContractTerminate(t *testing.T) {
t.Parallel()

for _, role := range []constants.DataspaceRole{constants.DataspaceProvider, constants.DataspaceConsumer} {
for _, state := range []contract.State{
contract.States.REQUESTED,
contract.States.OFFERED,
contract.States.ACCEPTED,
contract.States.AGREED,
contract.States.VERIFIED,
} {
ctx, cancel, env := setupEnvironment(t)
createNegotiation(ctx, t, env.store, state, role)

curPID := staticProviderPID
remPID := staticConsumerPID
if role == constants.DataspaceConsumer {
curPID = staticConsumerPID
remPID = staticProviderPID
}
_, err := env.server.ContractTerminate(ctx, &provider.ContractTerminateRequest{
Pid: curPID.String(),
Code: "test",
Reason: []string{"test"},
})

assert.Nil(t, err)

assert.NotNil(t, env.reconciler.e)
assert.Equal(t, statemachine.ReconciliationContract, env.reconciler.e.Type)
assert.Equal(t, role, env.reconciler.e.Role)
assert.Equal(t, contract.States.TERMINATED.String(), env.reconciler.e.TargetState)
assert.Equal(t, http.MethodPost, env.reconciler.e.Method)
assert.Equal(
t,
mkRequestUrl(callBack, "negotiations", remPID.String(), "termination"),
env.reconciler.e.URL.String(),
)

var reqPayload shared.ContractNegotiationTerminationMessage
err = json.Unmarshal(env.reconciler.e.Body, &reqPayload)
assert.Nil(t, err)
assert.Equal(t, staticConsumerPID.URN(), reqPayload.ConsumerPID)
assert.Equal(t, staticProviderPID.URN(), reqPayload.ProviderPID)
assert.Equal(t, "test", reqPayload.Code)
assert.Equal(t, "test", reqPayload.Reason[0].Value)
cancel()
}
}
}
3 changes: 2 additions & 1 deletion dsp/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func GetDSPRoutes(
mux.Handle("POST /negotiations/{providerPID}/events", WrapHandlerWithError(ch.providerContractEventHandler))
mux.Handle("POST /negotiations/{providerPID}/agreement/verification",
WrapHandlerWithError(ch.providerContractVerificationHandler))
mux.Handle("POST /negotiations/{PID}/termination", WrapHandlerWithError(ch.contractTerminationHandler))

// Contract negotiation consumer callbacks)
mux.Handle("POST /negotiations/offers", WrapHandlerWithError(ch.consumerContractOfferHandler))
Expand All @@ -73,7 +74,7 @@ func GetDSPRoutes(
WrapHandlerWithError(ch.consumerContractAgreementHandler))
mux.Handle("POST /callback/negotiations/{consumerPID}/events", WrapHandlerWithError(ch.consumerContractEventHandler))

mux.Handle("POST /negotiations/{PID}/termination", WrapHandlerWithError(ch.contractTerminationHandler))
mux.Handle("POST /callback/negotiations/{PID}/termination", WrapHandlerWithError(ch.contractTerminationHandler))

// Transfer process endpoints
mux.Handle("GET /transfers/{providerPID}", WrapHandlerWithError(ch.providerTransferProcessHandler))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/go-dataspace/run-dsp

go 1.23.3
go 1.23.5

require (
github.com/alecthomas/chroma/v2 v2.14.0
Expand Down

0 comments on commit cfb0c84

Please sign in to comment.