Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/gcpspanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Client struct {
featureSearchQuery FeatureSearchBaseQuery
missingOneImplQuery MissingOneImplementationQuery
searchCfg searchConfig
notificationCfg notificationConfig
batchWriter
batchSize int
batchWriters int
Expand Down Expand Up @@ -140,10 +141,17 @@ type searchConfig struct {
maxBookmarksPerUser uint32
}

// notificationConfig holds the application configuation for notifications.
type notificationConfig struct {
// Max number of consecutive failures per channel
maxConsecutiveFailuresPerChannel uint32
}

const defaultMaxOwnedSearchesPerUser = 25
const defaultMaxBookmarksPerUser = 25
const defaultBatchSize = 5000
const defaultBatchWriters = 8
const defaultMaxConsecutiveFailuresPerChannel = 5

func combineAndDeduplicate(excluded []string, discouraged []string) []string {
if excluded == nil && discouraged == nil {
Expand Down Expand Up @@ -219,6 +227,9 @@ func NewSpannerClient(projectID string, instanceID string, name string) (*Client
maxOwnedSearchesPerUser: defaultMaxOwnedSearchesPerUser,
maxBookmarksPerUser: defaultMaxBookmarksPerUser,
},
notificationConfig{
maxConsecutiveFailuresPerChannel: defaultMaxConsecutiveFailuresPerChannel,
},
bw,
defaultBatchSize,
defaultBatchWriters,
Expand Down
170 changes: 118 additions & 52 deletions lib/gcpspanner/notification_channel_delivery_attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gcpspanner

import (
"context"
"encoding/json"
"fmt"
"time"

Expand All @@ -25,13 +26,45 @@ import (
const notificationChannelDeliveryAttemptTable = "NotificationChannelDeliveryAttempts"
const maxDeliveryAttemptsToKeep = 10

// NotificationChannelDeliveryAttempt represents a row in the NotificationChannelDeliveryAttempt table.
type NotificationChannelDeliveryAttempt struct {
// spannerNotificationChannelDeliveryAttempt represents a row in the spannerNotificationChannelDeliveryAttempt table.
type spannerNotificationChannelDeliveryAttempt struct {
ID string `spanner:"ID"`
ChannelID string `spanner:"ChannelID"`
AttemptTimestamp time.Time `spanner:"AttemptTimestamp"`
Status NotificationChannelDeliveryAttemptStatus `spanner:"Status"`
Details spanner.NullJSON `spanner:"Details"`
AttemptDetails *AttemptDetails `spanner:"-"`
}

func (s spannerNotificationChannelDeliveryAttempt) toPublic() (*NotificationChannelDeliveryAttempt, error) {
var attemptDetails *AttemptDetails
if s.Details.Valid {
attemptDetails = new(AttemptDetails)
b, err := json.Marshal(s.Details.Value)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &attemptDetails)
if err != nil {
return nil, err
}
}

return &NotificationChannelDeliveryAttempt{
ID: s.ID,
ChannelID: s.ChannelID,
AttemptTimestamp: s.AttemptTimestamp,
Status: s.Status,
AttemptDetails: attemptDetails,
}, nil
}

type NotificationChannelDeliveryAttempt struct {
ID string `spanner:"ID"`
ChannelID string `spanner:"ChannelID"`
AttemptTimestamp time.Time `spanner:"AttemptTimestamp"`
Status NotificationChannelDeliveryAttemptStatus `spanner:"Status"`
AttemptDetails *AttemptDetails `spanner:"AttemptDetails"`
}

type NotificationChannelDeliveryAttemptStatus string
Expand Down Expand Up @@ -71,13 +104,14 @@ func (m notificationChannelDeliveryAttemptMapper) Table() string {

func (m notificationChannelDeliveryAttemptMapper) NewEntity(
id string,
req CreateNotificationChannelDeliveryAttemptRequest) (NotificationChannelDeliveryAttempt, error) {
return NotificationChannelDeliveryAttempt{
req CreateNotificationChannelDeliveryAttemptRequest) (spannerNotificationChannelDeliveryAttempt, error) {
return spannerNotificationChannelDeliveryAttempt{
ID: id,
ChannelID: req.ChannelID,
AttemptTimestamp: req.AttemptTimestamp,
Status: req.Status,
Details: req.Details,
AttemptDetails: nil,
}, nil
}

Expand Down Expand Up @@ -132,7 +166,8 @@ type notificationChannelDeliveryAttemptCursor struct {
}

// EncodePageToken returns the ID of the delivery attempt as a page token.
func (m notificationChannelDeliveryAttemptMapper) EncodePageToken(item NotificationChannelDeliveryAttempt) string {
func (m notificationChannelDeliveryAttemptMapper) EncodePageToken(
item spannerNotificationChannelDeliveryAttempt) string {
return encodeCursor(notificationChannelDeliveryAttemptCursor{
LastID: item.ID,
LastAttemptTimestamp: item.AttemptTimestamp,
Expand All @@ -144,68 +179,80 @@ func (c *Client) CreateNotificationChannelDeliveryAttempt(
ctx context.Context, req CreateNotificationChannelDeliveryAttemptRequest) (*string, error) {
var newID *string
_, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
// 1. Create the new attempt
id, err := newEntityCreator[notificationChannelDeliveryAttemptMapper](c).createWithTransaction(ctx, txn, req)
if err != nil {
return err
}
newID = id
var err error
newID, err = c.createNotificationChannelDeliveryAttemptWithTransaction(ctx, txn, req)

return err
})

return newID, err
}
func (c *Client) createNotificationChannelDeliveryAttemptWithTransaction(
ctx context.Context, txn *spanner.ReadWriteTransaction,
req CreateNotificationChannelDeliveryAttemptRequest) (*string, error) {
var newID *string
// 1. Create the new attempt
id, err := newEntityCreator[notificationChannelDeliveryAttemptMapper](c).createWithTransaction(ctx, txn, req)
if err != nil {
return nil, err
}
newID = id

// 2. Count existing attempts for the channel. Note: This count does not include the new attempt just buffered.
countStmt := spanner.NewStatement(`
// 2. Count existing attempts for the channel. Note: This count does not include the new attempt just buffered.
countStmt := spanner.NewStatement(`
SELECT COUNT(*)
FROM NotificationChannelDeliveryAttempts
WHERE ChannelID = @channelID`)
countStmt.Params["channelID"] = req.ChannelID
var count int64
err = txn.Query(ctx, countStmt).Do(func(r *spanner.Row) error {
return r.Column(0, &count)
})
if err != nil {
return err
}
countStmt.Params["channelID"] = req.ChannelID
var count int64
err = txn.Query(ctx, countStmt).Do(func(r *spanner.Row) error {
return r.Column(0, &count)
})
if err != nil {
return nil, err
}

// 3. If the pre-insert count is at the limit, fetch the oldest attempts to delete.
if count >= maxDeliveryAttemptsToKeep {
// We need to delete enough to make room for the one we are adding.
deleteCount := count - maxDeliveryAttemptsToKeep + 1
deleteStmt := spanner.NewStatement(`
// 3. If the pre-insert count is at the limit, fetch the oldest attempts to delete.
// We need to delete enough to make room for the one we are adding.

if count < maxDeliveryAttemptsToKeep {
return newID, nil
}

deleteCount := count - maxDeliveryAttemptsToKeep + 1
deleteStmt := spanner.NewStatement(`
SELECT ID
FROM NotificationChannelDeliveryAttempts
WHERE ChannelID = @channelID
ORDER BY AttemptTimestamp ASC
LIMIT @deleteCount`)
deleteStmt.Params["channelID"] = req.ChannelID
deleteStmt.Params["deleteCount"] = deleteCount

var mutations []*spanner.Mutation
err := txn.Query(ctx, deleteStmt).Do(func(r *spanner.Row) error {
var attemptID string
if err := r.Column(0, &attemptID); err != nil {
return err
}
mutations = append(mutations,
spanner.Delete(notificationChannelDeliveryAttemptTable,
spanner.Key{attemptID, req.ChannelID}))

return nil
})
if err != nil {
return err
}

// 4. Buffer delete mutations
if len(mutations) > 0 {
return txn.BufferWrite(mutations)
}
deleteStmt.Params["channelID"] = req.ChannelID
deleteStmt.Params["deleteCount"] = deleteCount

var mutations []*spanner.Mutation
err = txn.Query(ctx, deleteStmt).Do(func(r *spanner.Row) error {
var attemptID string
if err := r.Column(0, &attemptID); err != nil {
return err
}
mutations = append(mutations,
spanner.Delete(notificationChannelDeliveryAttemptTable,
spanner.Key{attemptID, req.ChannelID}))

return nil
})
if err != nil {
return nil, err
}

// 4. Buffer delete mutations
if len(mutations) > 0 {
err := txn.BufferWrite(mutations)
if err != nil {
return nil, err
}
}

return newID, nil
}

Expand All @@ -214,14 +261,33 @@ func (c *Client) GetNotificationChannelDeliveryAttempt(
ctx context.Context, attemptID string, channelID string) (*NotificationChannelDeliveryAttempt, error) {
key := deliveryAttemptKey{ID: attemptID, ChannelID: channelID}

return newEntityReader[notificationChannelDeliveryAttemptMapper,
NotificationChannelDeliveryAttempt, deliveryAttemptKey](c).readRowByKey(ctx, key)
attempt, err := newEntityReader[notificationChannelDeliveryAttemptMapper,
spannerNotificationChannelDeliveryAttempt, deliveryAttemptKey](c).readRowByKey(ctx, key)
if err != nil {
return nil, err
}

return attempt.toPublic()
}

// ListNotificationChannelDeliveryAttempts lists all delivery attempts for a channel.
func (c *Client) ListNotificationChannelDeliveryAttempts(
ctx context.Context,
req ListNotificationChannelDeliveryAttemptsRequest,
) ([]NotificationChannelDeliveryAttempt, *string, error) {
return newEntityLister[notificationChannelDeliveryAttemptMapper](c).list(ctx, req)
attempts, nextPageToken, err := newEntityLister[notificationChannelDeliveryAttemptMapper](c).list(ctx, req)
if err != nil {
return nil, nil, err
}

publicAttempts := make([]NotificationChannelDeliveryAttempt, 0, len(attempts))
for _, attempt := range attempts {
publicAttempt, err := attempt.toPublic()
if err != nil {
return nil, nil, err
}
publicAttempts = append(publicAttempts, *publicAttempt)
}

return publicAttempts, nextPageToken, nil
}
Loading
Loading