Skip to content

Commit

Permalink
feat(worker): add/set/delete repo worker tags (#2236)
Browse files Browse the repository at this point in the history
implements repo changes for add/set/delete worker tag operations
  • Loading branch information
A-440Hz authored Jul 5, 2022
1 parent fd5d15a commit 83bf38f
Show file tree
Hide file tree
Showing 4 changed files with 760 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ message WorkerTag {
string value = 30;

// source is the source of the tag. Either 'configuration' or 'api'.
// @inject_tag: `gorm:"default:not_null"`
// @inject_tag: `gorm:"primary_key"`
string source = 40;
}
155 changes: 154 additions & 1 deletion internal/server/repository_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func setWorkerTags(ctx context.Context, w db.Writer, id string, ts TagSource, ta
}
_, err := w.Exec(ctx, deleteTagsByWorkerIdSql, []interface{}{ts.String(), id})
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("couldn't delete exist tags for worker %q", id)))
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("couldn't delete existing tags for worker %q", id)))
}

// If tags were cleared out entirely, then we'll have nothing
Expand Down Expand Up @@ -558,3 +558,156 @@ func (r *Repository) CreateWorker(ctx context.Context, worker *Worker, opt ...Op
}
return returnedWorker, nil
}

// AddWorkerTags adds specified api tags to the repo worker and returns its new tags.
// No options are currently supported.
func (r *Repository) AddWorkerTags(ctx context.Context, workerId string, workerVersion uint32, tags []*Tag, _ ...Option) ([]*Tag, error) {
const op = "server.(Repository).AddWorkerTags"
switch {
case workerId == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker public id is empty")
case workerVersion == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing version")
case len(tags) == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "no tags provided")
}

worker, err := lookupWorker(ctx, r.reader, workerId)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
if worker == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("no worker found with public id %s", workerId))
}

newTags := worker.apiTags
for _, t := range tags {
newTags = append(newTags, t)
}
_, err = r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(reader db.Reader, w db.Writer) error {
worker := worker.clone()
worker.PublicId = workerId
worker.Version = workerVersion + 1
rowsUpdated, err := w.Update(ctx, worker, []string{"Version"}, nil, db.WithVersion(&workerVersion))
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to update worker version"))
}
if rowsUpdated != 1 {
return errors.New(ctx, errors.MultipleRecords, op, fmt.Sprintf("updated worker version and %d rows updated", rowsUpdated))
}
err = setWorkerTags(ctx, w, workerId, ApiTagSource, newTags)
if err != nil {
return errors.Wrap(ctx, err, op)
}
return nil
})
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return newTags, nil
}

// SetWorkerTags clears the current repo worker's api tags and sets them from the input parameters.
// Returns the current repo worker tags. No options are currently supported.
func (r *Repository) SetWorkerTags(ctx context.Context, workerId string, workerVersion uint32, tags []*Tag, _ ...Option) ([]*Tag, error) {
const op = "server.(Repository).SetWorkerTags"
switch {
case workerId == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker public id is empty")
case workerVersion == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing version")
}

worker, err := lookupWorker(ctx, r.reader, workerId)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
if worker == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("no worker found with public id %s", workerId))
}

_, err = r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(reader db.Reader, w db.Writer) error {
worker := worker.clone()
worker.PublicId = workerId
worker.Version = workerVersion + 1
rowsUpdated, err := w.Update(ctx, worker, []string{"Version"}, nil, db.WithVersion(&workerVersion))
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to update worker version"))
}
if rowsUpdated != 1 {
return errors.New(ctx, errors.MultipleRecords, op, fmt.Sprintf("updated worker version and %d rows updated", rowsUpdated))
}
err = setWorkerTags(ctx, w, workerId, ApiTagSource, tags)
if err != nil {
return errors.Wrap(ctx, err, op)
}
return nil
})
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
return tags, nil
}

// DeleteWorkerTags deletes specified api worker tags from the repo. Returns the number of rows deleted.
// No options are currently supported.
func (r *Repository) DeleteWorkerTags(ctx context.Context, workerId string, workerVersion uint32, tags []*Tag, _ ...Option) (int, error) {
const op = "server.(Repository).DeleteWorkerTags"
switch {
case workerId == "":
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "worker public id is empty")
case workerVersion == 0:
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "missing version")
case len(tags) == 0:
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "no tags provided")
}

worker, err := lookupWorker(ctx, r.reader, workerId)
if err != nil {
return db.NoRowsAffected, errors.Wrap(ctx, err, op)
}
if worker == nil {
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("no worker found with public id %s", workerId))
}

rowsDeleted := 0
deleteTags := make([]interface{}, 0, len(tags))
for _, t := range tags {
if t == nil {
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "found nil tag value in input")
}
deleteTags = append(deleteTags, &store.WorkerTag{
WorkerId: workerId,
Key: t.Key,
Value: t.Value,
Source: ApiTagSource.String(),
})
}

_, err = r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(reader db.Reader, w db.Writer) error {
worker := worker.clone()
worker.PublicId = workerId
worker.Version = workerVersion + 1
rowsUpdated, err := w.Update(ctx, worker, []string{"Version"}, nil, db.WithVersion(&workerVersion))
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to update worker version"))
}
if rowsUpdated != 1 {
return errors.New(ctx, errors.MultipleRecords, op, fmt.Sprintf("updated worker version and %d rows updated", rowsUpdated))
}

rowsDeleted, err = w.DeleteItems(ctx, deleteTags)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to delete worker tags"))
}
if rowsDeleted != len(deleteTags) {
return errors.New(ctx, errors.MultipleRecords, op, fmt.Sprintf("tags deleted %d did not match request for %d", rowsDeleted, len(tags)))
}
return nil
})

if err != nil {
return db.NoRowsAffected, errors.Wrap(ctx, err, op)
}
return rowsDeleted, nil
}
4 changes: 2 additions & 2 deletions internal/server/store/worker.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 83bf38f

Please sign in to comment.