|
| 1 | +// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +package chrysom |
| 5 | + |
| 6 | +import ( |
| 7 | + "bytes" |
| 8 | + "context" |
| 9 | + "encoding/json" |
| 10 | + "errors" |
| 11 | + "fmt" |
| 12 | + "io" |
| 13 | + "net/http" |
| 14 | + |
| 15 | + "github.com/xmidt-org/ancla/model" |
| 16 | + "github.com/xmidt-org/bascule/acquire" |
| 17 | + "go.uber.org/zap" |
| 18 | +) |
| 19 | + |
| 20 | +// Request and Response Headers. |
| 21 | +const ( |
| 22 | + ItemOwnerHeaderKey = "X-Xmidt-Owner" |
| 23 | + XmidtErrorHeaderKey = "X-Xmidt-Error" |
| 24 | +) |
| 25 | + |
| 26 | +var ( |
| 27 | + ErrNilMeasures = errors.New("measures cannot be nil") |
| 28 | + ErrAddressEmpty = errors.New("argus address is required") |
| 29 | + ErrBucketEmpty = errors.New("bucket name is required") |
| 30 | + ErrItemIDEmpty = errors.New("item ID is required") |
| 31 | + ErrItemDataEmpty = errors.New("data field in item is required") |
| 32 | + ErrUndefinedIntervalTicker = errors.New("interval ticker is nil. Can't listen for updates") |
| 33 | + ErrAuthAcquirerFailure = errors.New("failed acquiring auth token") |
| 34 | + ErrBadRequest = errors.New("argus rejected the request as invalid") |
| 35 | +) |
| 36 | + |
| 37 | +var ( |
| 38 | + errNonSuccessResponse = errors.New("argus responded with a non-success status code") |
| 39 | + errNewRequestFailure = errors.New("failed creating an HTTP request") |
| 40 | + errDoRequestFailure = errors.New("http client failed while sending request") |
| 41 | + errReadingBodyFailure = errors.New("failed while reading http response body") |
| 42 | + errJSONUnmarshal = errors.New("failed unmarshaling JSON response payload") |
| 43 | + errJSONMarshal = errors.New("failed marshaling item as JSON payload") |
| 44 | +) |
| 45 | + |
| 46 | +// BasicClientConfig contains config data for the client that will be used to |
| 47 | +// make requests to the Argus client. |
| 48 | +type BasicClientConfig struct { |
| 49 | + // Address is the Argus URL (i.e. https://example-argus.io:8090) |
| 50 | + Address string |
| 51 | + |
| 52 | + // Bucket partition to be used by this client. |
| 53 | + Bucket string |
| 54 | + |
| 55 | + // HTTPClient refers to the client that will be used to send requests. |
| 56 | + // (Optional) Defaults to http.DefaultClient. |
| 57 | + HTTPClient *http.Client |
| 58 | + |
| 59 | + // Auth provides the mechanism to add auth headers to outgoing requests. |
| 60 | + // (Optional) If not provided, no auth headers are added. |
| 61 | + Auth Auth |
| 62 | +} |
| 63 | + |
| 64 | +// BasicClient is the client used to make requests to Argus. |
| 65 | +type BasicClient struct { |
| 66 | + client *http.Client |
| 67 | + auth acquire.Acquirer |
| 68 | + storeBaseURL string |
| 69 | + bucket string |
| 70 | + getLogger func(context.Context) *zap.Logger |
| 71 | +} |
| 72 | + |
| 73 | +// Auth contains authorization data for requests to Argus. |
| 74 | +type Auth struct { |
| 75 | + JWT acquire.RemoteBearerTokenAcquirerOptions |
| 76 | + Basic string |
| 77 | +} |
| 78 | + |
| 79 | +type response struct { |
| 80 | + Body []byte |
| 81 | + ArgusErrorHeader string |
| 82 | + Code int |
| 83 | +} |
| 84 | + |
| 85 | +const ( |
| 86 | + storeAPIPath = "/api/v1/store" |
| 87 | + errWrappedFmt = "%w: %s" |
| 88 | + errStatusCodeFmt = "%w: received status %v" |
| 89 | + errorHeaderKey = "errorHeader" |
| 90 | +) |
| 91 | + |
| 92 | +// Items is a slice of model.Item(s) . |
| 93 | +type Items []model.Item |
| 94 | + |
| 95 | +// NewBasicClient creates a new BasicClient that can be used to |
| 96 | +// make requests to Argus. |
| 97 | +func NewBasicClient(config BasicClientConfig, |
| 98 | + getLogger func(context.Context) *zap.Logger) (*BasicClient, error) { |
| 99 | + err := validateBasicConfig(&config) |
| 100 | + if err != nil { |
| 101 | + return nil, err |
| 102 | + } |
| 103 | + |
| 104 | + tokenAcquirer, err := buildTokenAcquirer(config.Auth) |
| 105 | + if err != nil { |
| 106 | + return nil, err |
| 107 | + } |
| 108 | + clientStore := &BasicClient{ |
| 109 | + client: config.HTTPClient, |
| 110 | + auth: tokenAcquirer, |
| 111 | + bucket: config.Bucket, |
| 112 | + storeBaseURL: config.Address + storeAPIPath, |
| 113 | + getLogger: getLogger, |
| 114 | + } |
| 115 | + |
| 116 | + return clientStore, nil |
| 117 | +} |
| 118 | + |
| 119 | +// GetItems fetches all items that belong to a given owner. |
| 120 | +func (c *BasicClient) GetItems(ctx context.Context, owner string) (Items, error) { |
| 121 | + response, err := c.sendRequest(ctx, owner, http.MethodGet, fmt.Sprintf("%s/%s", c.storeBaseURL, c.bucket), nil) |
| 122 | + if err != nil { |
| 123 | + return nil, err |
| 124 | + } |
| 125 | + |
| 126 | + if response.Code != http.StatusOK { |
| 127 | + c.getLogger(ctx).Error("Argus responded with non-200 response for GetItems request", |
| 128 | + zap.Int("code", response.Code), zap.String(errorHeaderKey, response.ArgusErrorHeader)) |
| 129 | + return nil, fmt.Errorf(errStatusCodeFmt, translateNonSuccessStatusCode(response.Code), response.Code) |
| 130 | + } |
| 131 | + |
| 132 | + var items Items |
| 133 | + |
| 134 | + err = json.Unmarshal(response.Body, &items) |
| 135 | + if err != nil { |
| 136 | + return nil, fmt.Errorf("GetItems: %w: %s", errJSONUnmarshal, err.Error()) |
| 137 | + } |
| 138 | + |
| 139 | + return items, nil |
| 140 | +} |
| 141 | + |
| 142 | +// PushItem creates a new item if one doesn't already exist. If an item exists |
| 143 | +// and the ownership matches, the item is simply updated. |
| 144 | +func (c *BasicClient) PushItem(ctx context.Context, owner string, item model.Item) (PushResult, error) { |
| 145 | + err := validatePushItemInput(owner, item) |
| 146 | + if err != nil { |
| 147 | + return NilPushResult, err |
| 148 | + } |
| 149 | + |
| 150 | + data, err := json.Marshal(item) |
| 151 | + if err != nil { |
| 152 | + return NilPushResult, fmt.Errorf(errWrappedFmt, errJSONMarshal, err.Error()) |
| 153 | + } |
| 154 | + |
| 155 | + response, err := c.sendRequest(ctx, owner, http.MethodPut, fmt.Sprintf("%s/%s/%s", c.storeBaseURL, c.bucket, item.ID), bytes.NewReader(data)) |
| 156 | + if err != nil { |
| 157 | + return NilPushResult, err |
| 158 | + } |
| 159 | + |
| 160 | + if response.Code == http.StatusCreated { |
| 161 | + return CreatedPushResult, nil |
| 162 | + } |
| 163 | + |
| 164 | + if response.Code == http.StatusOK { |
| 165 | + return UpdatedPushResult, nil |
| 166 | + } |
| 167 | + |
| 168 | + c.getLogger(ctx).Error("Argus responded with a non-successful status code for a PushItem request", |
| 169 | + zap.Int("code", response.Code), zap.String(errorHeaderKey, response.ArgusErrorHeader)) |
| 170 | + |
| 171 | + return NilPushResult, fmt.Errorf(errStatusCodeFmt, translateNonSuccessStatusCode(response.Code), response.Code) |
| 172 | +} |
| 173 | + |
| 174 | +// RemoveItem removes the item if it exists and returns the data associated to it. |
| 175 | +func (c *BasicClient) RemoveItem(ctx context.Context, id, owner string) (model.Item, error) { |
| 176 | + if len(id) < 1 { |
| 177 | + return model.Item{}, ErrItemIDEmpty |
| 178 | + } |
| 179 | + |
| 180 | + resp, err := c.sendRequest(ctx, owner, http.MethodDelete, fmt.Sprintf("%s/%s/%s", c.storeBaseURL, c.bucket, id), nil) |
| 181 | + if err != nil { |
| 182 | + return model.Item{}, err |
| 183 | + } |
| 184 | + |
| 185 | + if resp.Code != http.StatusOK { |
| 186 | + c.getLogger(ctx).Error("Argus responded with a non-successful status code for a RemoveItem request", |
| 187 | + zap.Int("code", resp.Code), zap.String(errorHeaderKey, resp.ArgusErrorHeader)) |
| 188 | + return model.Item{}, fmt.Errorf(errStatusCodeFmt, translateNonSuccessStatusCode(resp.Code), resp.Code) |
| 189 | + } |
| 190 | + |
| 191 | + var item model.Item |
| 192 | + err = json.Unmarshal(resp.Body, &item) |
| 193 | + if err != nil { |
| 194 | + return item, fmt.Errorf("RemoveItem: %w: %s", errJSONUnmarshal, err.Error()) |
| 195 | + } |
| 196 | + return item, nil |
| 197 | +} |
| 198 | + |
| 199 | +func validatePushItemInput(_ string, item model.Item) error { |
| 200 | + if len(item.ID) < 1 { |
| 201 | + return ErrItemIDEmpty |
| 202 | + } |
| 203 | + |
| 204 | + if len(item.Data) < 1 { |
| 205 | + return ErrItemDataEmpty |
| 206 | + } |
| 207 | + |
| 208 | + return nil |
| 209 | +} |
| 210 | + |
| 211 | +func (c *BasicClient) sendRequest(ctx context.Context, owner, method, url string, body io.Reader) (response, error) { |
| 212 | + r, err := http.NewRequestWithContext(ctx, method, url, body) |
| 213 | + if err != nil { |
| 214 | + return response{}, fmt.Errorf(errWrappedFmt, errNewRequestFailure, err.Error()) |
| 215 | + } |
| 216 | + err = acquire.AddAuth(r, c.auth) |
| 217 | + if err != nil { |
| 218 | + return response{}, fmt.Errorf(errWrappedFmt, ErrAuthAcquirerFailure, err.Error()) |
| 219 | + } |
| 220 | + if len(owner) > 0 { |
| 221 | + r.Header.Set(ItemOwnerHeaderKey, owner) |
| 222 | + } |
| 223 | + resp, err := c.client.Do(r) |
| 224 | + if err != nil { |
| 225 | + return response{}, fmt.Errorf(errWrappedFmt, errDoRequestFailure, err.Error()) |
| 226 | + } |
| 227 | + defer resp.Body.Close() |
| 228 | + var sqResp = response{ |
| 229 | + Code: resp.StatusCode, |
| 230 | + ArgusErrorHeader: resp.Header.Get(XmidtErrorHeaderKey), |
| 231 | + } |
| 232 | + bodyBytes, err := io.ReadAll(resp.Body) |
| 233 | + if err != nil { |
| 234 | + return sqResp, fmt.Errorf(errWrappedFmt, errReadingBodyFailure, err.Error()) |
| 235 | + } |
| 236 | + sqResp.Body = bodyBytes |
| 237 | + return sqResp, nil |
| 238 | +} |
| 239 | + |
| 240 | +func isEmpty(options acquire.RemoteBearerTokenAcquirerOptions) bool { |
| 241 | + return len(options.AuthURL) < 1 || options.Buffer == 0 || options.Timeout == 0 |
| 242 | +} |
| 243 | + |
| 244 | +// translateNonSuccessStatusCode returns as specific error |
| 245 | +// for known Argus status codes. |
| 246 | +func translateNonSuccessStatusCode(code int) error { |
| 247 | + switch code { |
| 248 | + case http.StatusBadRequest: |
| 249 | + return ErrBadRequest |
| 250 | + case http.StatusUnauthorized, http.StatusForbidden: |
| 251 | + return ErrFailedAuthentication |
| 252 | + default: |
| 253 | + return errNonSuccessResponse |
| 254 | + } |
| 255 | +} |
| 256 | + |
| 257 | +func buildTokenAcquirer(auth Auth) (acquire.Acquirer, error) { |
| 258 | + if !isEmpty(auth.JWT) { |
| 259 | + return acquire.NewRemoteBearerTokenAcquirer(auth.JWT) |
| 260 | + } else if len(auth.Basic) > 0 { |
| 261 | + return acquire.NewFixedAuthAcquirer(auth.Basic) |
| 262 | + } |
| 263 | + return &acquire.DefaultAcquirer{}, nil |
| 264 | +} |
| 265 | + |
| 266 | +func validateBasicConfig(config *BasicClientConfig) error { |
| 267 | + if config.Address == "" { |
| 268 | + return ErrAddressEmpty |
| 269 | + } |
| 270 | + |
| 271 | + if config.Bucket == "" { |
| 272 | + return ErrBucketEmpty |
| 273 | + } |
| 274 | + |
| 275 | + if config.HTTPClient == nil { |
| 276 | + config.HTTPClient = http.DefaultClient |
| 277 | + } |
| 278 | + |
| 279 | + return nil |
| 280 | +} |
0 commit comments