Skip to content

Commit

Permalink
feat: Add ctx for lock and unlock interface (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenxuwan authored Dec 11, 2023
1 parent 45d0793 commit 94fc2ed
Show file tree
Hide file tree
Showing 18 changed files with 83 additions and 83 deletions.
4 changes: 2 additions & 2 deletions components/lock/consul/consul_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func getTTL(expire int32) string {
return strconv.Itoa(int(expire)) + "s"
}

func (c *ConsulLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
func (c *ConsulLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {

// create a session TTL
session, _, err := c.sessionFactory.Create(&api.SessionEntry{
Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *ConsulLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, e
Success: false,
}, nil
}
func (c *ConsulLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
func (c *ConsulLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {

session, ok := c.sMap.Load(req.LockOwner + "-" + req.ResourceId)

Expand Down
14 changes: 7 additions & 7 deletions components/lock/consul/consul_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestConsulLock_TryLock(t *testing.T) {
kv.EXPECT().Release(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
Return(true, nil, nil).Times(1)

tryLock, err := comp.TryLock(&lock.TryLockRequest{
tryLock, err := comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resouseId,
LockOwner: lockOwerA,
Expire: expireTime,
Expand All @@ -84,7 +84,7 @@ func TestConsulLock_TryLock(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, true, tryLock.Success)

unlock, err := comp.Unlock(&lock.UnlockRequest{
unlock, err := comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resouseId,
LockOwner: lockOwerA,
})
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestConsulLock_ALock_BLock(t *testing.T) {
kv.EXPECT().Acquire(&api.KVPair{Key: resouseId, Value: []byte(lockOwerB), Session: "session2"}, nil).
Return(false, nil, nil).Times(1)

tryLock, _ := comp.TryLock(&lock.TryLockRequest{
tryLock, _ := comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resouseId,
LockOwner: lockOwerA,
Expire: expireTime,
Expand All @@ -130,7 +130,7 @@ func TestConsulLock_ALock_BLock(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, true, tryLock.Success)

bLock, _ := comp.TryLock(&lock.TryLockRequest{
bLock, _ := comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resouseId,
LockOwner: lockOwerB,
Expire: expireTime,
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestConsulLock_ALock_BUnlock(t *testing.T) {
kv.EXPECT().Release(&api.KVPair{Key: resouseId, Value: []byte(lockOwerA), Session: "session1"}, nil).
Return(true, nil, nil).Times(1)

tryLock, _ := comp.TryLock(&lock.TryLockRequest{
tryLock, _ := comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resouseId,
LockOwner: lockOwerA,
Expire: expireTime,
Expand All @@ -175,15 +175,15 @@ func TestConsulLock_ALock_BUnlock(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, true, tryLock.Success)

unlock, _ := comp.Unlock(&lock.UnlockRequest{
unlock, _ := comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resouseId,
LockOwner: lockOwerB,
})

assert.NoError(t, err)
assert.Equal(t, lock.LOCK_UNEXIST, unlock.Status)

unlock2, err := comp.Unlock(&lock.UnlockRequest{
unlock2, err := comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resouseId,
LockOwner: lockOwerA,
})
Expand Down
4 changes: 2 additions & 2 deletions components/lock/etcd/etcd_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *EtcdLock) Features() []lock.Feature {
}

// Node tries to acquire a etcd lock
func (e *EtcdLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
func (e *EtcdLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
var leaseId clientv3.LeaseID
//1.Create new lease
lease := clientv3.NewLease(e.client)
Expand Down Expand Up @@ -108,7 +108,7 @@ func (e *EtcdLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, err
}

// Node tries to release a etcd lock
func (e *EtcdLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
func (e *EtcdLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
key := e.getKey(req.ResourceId)

// 1.Create new KV
Expand Down
16 changes: 8 additions & 8 deletions components/lock/etcd/etcd_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestEtcdLock_TryLock(t *testing.T) {
assert.NoError(t, err)

ownerId1 := uuid.New().String()
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId,
LockOwner: ownerId1,
Expire: 10,
Expand All @@ -132,7 +132,7 @@ func TestEtcdLock_TryLock(t *testing.T) {
assert.Equal(t, true, resp.Success)

//repeat
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId,
LockOwner: ownerId1,
Expire: 10,
Expand All @@ -146,7 +146,7 @@ func TestEtcdLock_TryLock(t *testing.T) {
go func() {
//another owner
ownerId2 := uuid.New().String()
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId,
LockOwner: ownerId2,
Expire: 10,
Expand All @@ -159,7 +159,7 @@ func TestEtcdLock_TryLock(t *testing.T) {
wg.Wait()

//another resource
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId2,
LockOwner: ownerId1,
Expire: 10,
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestEtcdLock_UnLock(t *testing.T) {
assert.NoError(t, err)

ownerId1 := uuid.New().String()
lockresp, err = comp.TryLock(&lock.TryLockRequest{
lockresp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId3,
LockOwner: ownerId1,
Expire: 10,
Expand All @@ -203,23 +203,23 @@ func TestEtcdLock_UnLock(t *testing.T) {
assert.Equal(t, true, lockresp.Success)

//error ownerid
resp, err = comp.Unlock(&lock.UnlockRequest{
resp, err = comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resourceId3,
LockOwner: uuid.New().String(),
})
assert.NoError(t, err)
assert.Equal(t, lock.LOCK_BELONG_TO_OTHERS, resp.Status)

//error resourceid
resp, err = comp.Unlock(&lock.UnlockRequest{
resp, err = comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resourceId4,
LockOwner: ownerId1,
})
assert.NoError(t, err)
assert.Equal(t, lock.LOCK_UNEXIST, resp.Status)

//success
resp, err = comp.Unlock(&lock.UnlockRequest{
resp, err = comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resourceId3,
LockOwner: ownerId1,
})
Expand Down
4 changes: 2 additions & 2 deletions components/lock/in-memory/in_memory_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *InMemoryLock) Features() []lock.Feature {
}

// Try to add a lock. Currently this is a non-reentrant lock
func (s *InMemoryLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
func (s *InMemoryLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
s.data.Lock()
defer s.data.Unlock()
// 1. Find the memoryLock for this resourceId
Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *InMemoryLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse,
}, nil
}

func (s *InMemoryLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
func (s *InMemoryLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
s.data.Lock()
defer s.data.Unlock()
// 1. Find the memoryLock for this resourceId
Expand Down
26 changes: 13 additions & 13 deletions components/lock/in-memory/in_memory_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func TestTryLock(t *testing.T) {

var err error
var resp *lock.TryLockResponse
resp, err = s.TryLock(req)
resp, err = s.TryLock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.True(t, resp.Success)

resp, err = s.TryLock(req)
resp, err = s.TryLock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.False(t, resp.Success)
Expand All @@ -76,7 +76,7 @@ func TestTryLock(t *testing.T) {
Expire: 1,
}

resp, err = s.TryLock(req)
resp, err = s.TryLock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.True(t, resp.Success)
Expand All @@ -87,14 +87,14 @@ func TestTryLock(t *testing.T) {
Expire: 1,
}

resp, err = s.TryLock(req)
resp, err = s.TryLock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.False(t, resp.Success)

s.data.locks["key112"].expireTime = time.Now().Add(-2 * time.Second)

resp, err = s.TryLock(req)
resp, err = s.TryLock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.True(t, resp.Success)
Expand All @@ -112,7 +112,7 @@ func TestUnLock(t *testing.T) {

var err error
var resp *lock.UnlockResponse
resp, err = s.Unlock(req)
resp, err = s.Unlock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.Equal(t, lock.LOCK_UNEXIST, resp.Status)
Expand All @@ -124,24 +124,24 @@ func TestUnLock(t *testing.T) {
}

var lockResp *lock.TryLockResponse
lockResp, err = s.TryLock(lockReq)
lockResp, err = s.TryLock(context.TODO(), lockReq)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.True(t, lockResp.Success)

resp, err = s.Unlock(req)
resp, err = s.Unlock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.Equal(t, lock.SUCCESS, resp.Status)

lockResp, err = s.TryLock(lockReq)
lockResp, err = s.TryLock(context.TODO(), lockReq)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.True(t, lockResp.Success)

req.LockOwner = "1"

resp, err = s.Unlock(req)
resp, err = s.Unlock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.Equal(t, lock.LOCK_BELONG_TO_OTHERS, resp.Status)
Expand All @@ -150,17 +150,17 @@ func TestUnLock(t *testing.T) {
lockReq.ResourceId = "11"
req.LockOwner = "own1"
lockReq.LockOwner = "own1"
lockResp, err = s.TryLock(lockReq)
lockResp, err = s.TryLock(context.TODO(), lockReq)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.True(t, lockResp.Success)

resp, err = s.Unlock(req)
resp, err = s.Unlock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.Equal(t, lock.SUCCESS, resp.Status)

resp, err = s.Unlock(req)
resp, err = s.Unlock(context.TODO(), req)
assert.NoError(t, err)
assert.NotNil(t, req)
assert.Equal(t, lock.LOCK_UNEXIST, resp.Status)
Expand Down
4 changes: 2 additions & 2 deletions components/lock/lock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type LockStore interface {
// Get lock's features
Features() []Feature
// Node tries to acquire a lock
TryLock(req *TryLockRequest) (*TryLockResponse, error)
TryLock(ctx context.Context, req *TryLockRequest) (*TryLockResponse, error)
// Node tries to release a lock
Unlock(req *UnlockRequest) (*UnlockResponse, error)
Unlock(ctx context.Context, req *UnlockRequest) (*UnlockResponse, error)
// Node tries to renewal lease
LockKeepAlive(context.Context, *LockKeepAliveRequest) (*LockKeepAliveResponse, error)
}
4 changes: 2 additions & 2 deletions components/lock/mongo/mongo_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (e *MongoLock) LockKeepAlive(ctx context.Context, request *lock.LockKeepAli
return nil, nil
}

func (e *MongoLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
func (e *MongoLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
var err error
// create mongo session
e.session, err = e.client.StartSession()
Expand Down Expand Up @@ -171,7 +171,7 @@ func (e *MongoLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, er
}, nil
}

func (e *MongoLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
func (e *MongoLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
var err error
// create mongo session
e.session, err = e.client.StartSession()
Expand Down
14 changes: 7 additions & 7 deletions components/lock/mongo/mongo_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func TestMongoLock_TryLock(t *testing.T) {
comp.client = &mockMongoClient

ownerId1 := uuid.New().String()
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId,
LockOwner: ownerId1,
Expire: 10,
})
assert.NoError(t, err)
assert.Equal(t, true, resp.Success)

resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId,
LockOwner: ownerId1,
Expire: 10,
Expand All @@ -108,7 +108,7 @@ func TestMongoLock_TryLock(t *testing.T) {

go func() {
ownerId2 := uuid.New().String()
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId,
LockOwner: ownerId2,
Expire: 10,
Expand All @@ -121,7 +121,7 @@ func TestMongoLock_TryLock(t *testing.T) {
wg.Wait()

//another resource
resp, err = comp.TryLock(&lock.TryLockRequest{
resp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId2,
LockOwner: ownerId1,
Expire: 10,
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestMongoLock_Unlock(t *testing.T) {
comp.client = &mockMongoClient

ownerId1 := uuid.New().String()
lockresp, err = comp.TryLock(&lock.TryLockRequest{
lockresp, err = comp.TryLock(context.TODO(), &lock.TryLockRequest{
ResourceId: resourceId3,
LockOwner: ownerId1,
Expire: 10,
Expand All @@ -172,15 +172,15 @@ func TestMongoLock_Unlock(t *testing.T) {
assert.Equal(t, true, lockresp.Success)

//error resourceid
resp, err = comp.Unlock(&lock.UnlockRequest{
resp, err = comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resourceId4,
LockOwner: ownerId1,
})
assert.NoError(t, err)
assert.Equal(t, lock.LOCK_UNEXIST, resp.Status)

//success
resp, err = comp.Unlock(&lock.UnlockRequest{
resp, err = comp.Unlock(context.TODO(), &lock.UnlockRequest{
ResourceId: resourceId3,
LockOwner: ownerId1,
})
Expand Down
4 changes: 2 additions & 2 deletions components/lock/redis/cluster_redis_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *ClusterRedisLock) LockKeepAlive(ctx context.Context, request *lock.Lock
return nil, nil
}

func (c *ClusterRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
func (c *ClusterRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
//try to get lock on all redis nodes
intervalStart := utils.GetMiliTimestamp(time.Now().UnixNano())
//intervalLimit must be 1/10 of expire time to make sure time of lock far less than expire time
Expand Down Expand Up @@ -154,7 +154,7 @@ func (c *ClusterRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockRespo
}, err
}

func (c *ClusterRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
func (c *ClusterRedisLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
wg := sync.WaitGroup{}
//err means there were some internal errors,then the status must be INTERNAL_ERROR
//the LOCK_UNEXIST and LOCK_BELONG_TO_OTHERS status codes can be ignore
Expand Down
Loading

0 comments on commit 94fc2ed

Please sign in to comment.