-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add RPC #97
Conversation
config/config.go
Outdated
@@ -54,6 +54,8 @@ type ( | |||
InternalHttpServer HttpServerConfig `yaml:"internalHttpServer"` | |||
// ClientAddress is the address for API service to call AsyncService's internal API | |||
ClientAddress string `yaml:"clientAddress"` | |||
// Rpc is the config for rpc calls | |||
Rpc RpcConfig `yaml:"rpc"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think RPC should live under API service
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, modified.
service/api/service_impl.go
Outdated
return nil, s.handleUnknownError(err) | ||
} | ||
|
||
if latestPrcExe.NotExists || latestPrcExe.Status != data_models.ProcessExecutionStatusRunning { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in IWF, a read-only RPC is allowed for a closed workflow execution. Do you think we can still support it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I moved the second checking condition later to skip writing if the process is not running after receiving the RPC response.
s.logger.Warn("failed to update global attributes") | ||
return nil, NewErrorWithStatus( | ||
http.StatusFailedDependency, | ||
"Failed to write global attributes, please check the error message for details: "+updateResp.UpdatingGlobalAttributesError.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error handling is critical for building reliable applications. I think we need to return the error code to applications as mentioned in xcherryio/apis#24 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http.StatusFailedDependency means 424, this is the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the error code from the database driver. The global attribute is mapped to database, the error code from database is like this: https://github.com/xdblab/xdb/blob/main/extensions/postgres/error_checker.go#L33
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need this error code --
the user code will make different decisions on this. If column value is conflict, or database schema is invalid, they should return different error code and different UI message in the application.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in async state, we don’t have a good way to return error. We can discuss this in zoom
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I also need to improve the error code in start process too to return error code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the error code from the database driver.
The error code from the database driver should be returned by the updateGlobalAttributesIfNeeded
method first. And here in the implementation for RPC, the UpdatingGlobalAttributesError
is assigned with whatever returned from updateGlobalAttributesIfNeeded
, and it's returned to SDK with NewErrorWithStatus
.
So if you wanna imporve on the error message. you can just update updateGlobalAttributesIfNeeded
itself in another PR. Currently, I will just return whatever the err is from the updateGlobalAttributesIfNeeded
to SDK.
|
||
// if getting next states and there are running state(s), return error | ||
if len(request.StateDecision.GetNextStates()) > 0 && len(sequenceMaps.PendingExecutionMap) > 0 { | ||
return nil, fmt.Errorf("cannot start new states when there are running state(s)") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is a valid assumption. There are use cases that need to start new thread while some states are running
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it.
func (s serviceImpl) Rpc( | ||
ctx context.Context, request xdbapi.ProcessExecutionRpcRequest, | ||
) (response *xdbapi.ProcessExecutionRpcResponse, retErr *ErrorWithStatus) { | ||
latestPrcExe, err := s.store.GetLatestProcessExecution(ctx, data_models.GetLatestProcessExecutionRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's safe outside a transaction. The process execution could get terminated or even deleted right after this query.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.store.GetLatestProcessExecution
relies on session.SelectLatestProcessExecution
, that have been used several times in the server. Do you mean we modify the session.SelectLatestProcessExecution
to tx.SelectLatestProcessExecution
for all the usages? If not, why we only need this ONE place to be in the transaction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And you cannot put it into transaction because later there will be a API call to the worker. This is similar to how the waitUntil/Execute API works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah just found two other places in doStopProcessTx
and doPublishToLocalQueueTx
.
I missed these two in the previous PRs. Sorry. It will have racing conditions in these two APIs as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we need to re-design ALL the transactions that query the table xdb_sys_latest_process_executions
to lock the row in xdb_sys_latest_process_executions
as the first step in each transation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GetLatestProcessExecution before worke RPC call will still be in the session. Once getting the worker RPC response, we will directly use the tx.SelectProcessExecutionForUpdate
to lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And two other places in doStopProcessTx and doPublishToLocalQueueTx can use sessions to SelectLatestProcessExecution, I don't think there will be race condition. Can you just provide a detailed example to prove the opposite side?
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #97 +/- ##
==========================================
- Coverage 65.51% 63.29% -2.22%
==========================================
Files 77 80 +3
Lines 5849 6133 +284
==========================================
+ Hits 3832 3882 +50
- Misses 1775 2008 +233
- Partials 242 243 +1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great job (esp the refactoring)
} | ||
} | ||
|
||
if w.checkResponseAndError(errToCheck, httpResp) { | ||
if utils.CheckHttpResponseAndError(errToCheck, httpResp, w.logger) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: try to avoid creating "utils" package as it will be hard to remember what are inside later. As a result it's hard for reuse.
Our pattern is create small package for specific area under common: https://github.com/xcherryio/xcherry/tree/main/common
E.g. urlautofix
, ptr
for this one, it will be clearer if we create a package httperror
(avoid http
)and use as httperror.CheckReponseAndErrorr(err, httpResp, logger)
And
decision.ValidateDecision(...)
utils/util.go
Outdated
"github.com/xdblab/xdb/common/log/tag" | ||
) | ||
|
||
func CheckDecision(decision xdbapi.StateDecision) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually the name could be improved to ValidateDecision
which usually indicate to return error.
CheckXyz usually returns bool.
UpdatingGlobalAttributesError error | ||
} | ||
|
||
HandleStateDecisionRequest struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems like this could be an a structure inside the implementation package(sql) because the caller of persistence API doesn't need to understand
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will move them into sql/common.go.
@@ -47,12 +47,31 @@ type ( | |||
NotExists bool | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my mistake in the previous code review -- I think it's better to rename this file to request_response.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will move them into sql/common.go.
@@ -277,6 +296,54 @@ type ( | |||
LoadGlobalAttributesResponse struct { | |||
Response xdbapi.LoadGlobalAttributeResponse | |||
} | |||
|
|||
UpdateProcessExecutionFromRpcRequest struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe FromRpc
-> ForRpc
?
// then gracefully complete the process regardless of the thread close type set in this state. | ||
// Otherwise, handle the thread close type set in this state. | ||
|
||
toGracefullyComplete := procExecWaitToComplete && len(sequenceMaps.PendingExecutionMap) == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just naming nit, maybe:
shouldGracefulComplete := procExecGracefulCompleteRequested && len(sequenceMaps.PendingExecutionMap) == 0
related to the issue: #66
GracefulCompleteRequested
should make it more clear that there was a gracefulComplete decision was made before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be better to make another PR for the issue, rather than partially renaming something in this PR to make naming inconsistent.
return nil, err | ||
} | ||
|
||
// skip the writing operations on a closed process |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of silently skip this update, it's better to return an error(maybe conflict error) for this case.
users should be informed that the update is rejected otherwise they won't have the correct expectation for what has happened. (this is the behavior in iWF today without any extra code because Cadence/Temporal will throw error when sending signals to a closed workflow execution)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will return StatusNotFound
} | ||
|
||
if utils.CheckHttpResponseAndError(err, httpResp, s.logger) { | ||
return nil, NewErrorWithStatus( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be more useful info that the caller need to get from the worker execution.
In the API, we have decided a workerErrorResponse, and we should try to extract it from the http body to caller.
See similar in iWF server: https://github.com/indeedeng/iwf/blob/main/service/common/rpc/invoke.go#L59
The reason is that worker may want to return some useful error type for caller to make different decision on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make another PR to handle it. Several places need to modify accordingly, not only here. Created #98.
Why make this pull request?
Add RPC.
Java SDK Integ Test: xcherryio/sdk-java#13
What has changed
[Summarize what components of the repo is updated]
[Link to xdb-apis/xdb-golang-sdk PRs if it's on top of any API changes]
How to test this pull request?
[If writing Integration test in Golang SDK repo, please provide link to the pull request of Golang SDK Repo]
[It's recommended to write integration test in Golang SDK repo, and enabled in this server repo first,
without enabling in the SDK repo. After this PR is merged, enable and merge the integration test in the SDK repo]
[Alternatively if Java/other SDK repo is preferred, then just test locally against server PR.
After the server PR is merged, merge the integration test in the SDK repo]
Checklist before merge
[ ] If applicable, merge the xdb-apis/xdb-golang-sdk PRs to main branch
[ ] If applicable, merge the xdb-apis/xdb-apis PRs to main branch
[ ] Update
go.mod
to use the commitID of the main branches for xdb-apis/xdb-golang-sdk