Skip to content

Commit

Permalink
Handle case when Airbyte config does not contain valid json and add l…
Browse files Browse the repository at this point in the history
…ogs (#801)

- handled case when Airbyte config does not contain valid JSON
- added more logs
  • Loading branch information
Sergey Burykin authored Feb 7, 2022
1 parent 79d1347 commit 79a022c
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 23 deletions.
3 changes: 1 addition & 2 deletions server/airbyte/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ func saveConfig(airbyteSourceConfig interface{}) (string, string, error) {

absoluteFilePath := path.Join(absoluteDirPath, fileName)
//write airbyte config as file path
_, err := parsers.ParseJSONAsFile(absoluteFilePath, airbyteSourceConfig)
if err != nil {
if _, err := parsers.ParseJSONAsFile(absoluteFilePath, airbyteSourceConfig); err != nil {
return "", "", fmt.Errorf("Error writing airbyte config [%v]: %v", airbyteSourceConfig, err)
}

Expand Down
9 changes: 5 additions & 4 deletions server/drivers/airbyte/airbyte.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewAirbyte(ctx context.Context, sourceConfig *base.SourceConfig, collection

//parse airbyte catalog as file path
catalogPath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.CatalogFileName), config.Catalog)
if err != nil {
if err != nil && err != parsers.ErrValueIsNil {
return nil, fmt.Errorf("Error parsing airbyte catalog [%v]: %v", config.Catalog, err)
}

Expand All @@ -93,7 +93,7 @@ func NewAirbyte(ctx context.Context, sourceConfig *base.SourceConfig, collection

//parse airbyte state as file path
statePath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.StateFileName), config.InitialState)
if err != nil {
if err != nil && err != parsers.ErrValueIsNil {
return nil, fmt.Errorf("Error parsing airbyte initial state [%v]: %v", config.InitialState, err)
}

Expand Down Expand Up @@ -151,10 +151,11 @@ func TestAirbyte(sourceConfig *base.SourceConfig) error {
}
base.FillPreconfiguredOauth(config.DockerImage, config.Config)
airbyteRunner := airbyte.NewRunner(config.DockerImage, config.ImageVersion, "")
err := airbyteRunner.Check(config.Config)
if err != nil {

if err := airbyteRunner.Check(config.Config); err != nil {
return err
}

selectedStreamsWithNamespace := selectedStreamsWithNamespace(config)
if len(selectedStreamsWithNamespace) > 0 {
airbyteRunner = airbyte.NewRunner(config.DockerImage, config.ImageVersion, "")
Expand Down
12 changes: 5 additions & 7 deletions server/drivers/singer/singer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,19 @@ func NewSinger(ctx context.Context, sourceConfig *base.SourceConfig, collection
_, err = os.Stat(configPath)
if err != nil {
//parse singer config as file path
_, err := parsers.ParseJSONAsFile(configPath, config.Config)
if err != nil {
if _, err := parsers.ParseJSONAsFile(configPath, config.Config); err != nil {
return nil, fmt.Errorf("Error parsing singer config [%v]: %v", config.Config, err)
}
}
//parse singer catalog as file path
catalogPath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.CatalogFileName), config.Catalog)
if err != nil {
if err != nil && err != parsers.ErrValueIsNil {
return nil, fmt.Errorf("Error parsing singer catalog [%v]: %v", config.Catalog, err)
}

//parse singer properties as file path
propertiesPath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.PropertiesFileName), config.Properties)
if err != nil {
if err != nil && err != parsers.ErrValueIsNil {
return nil, fmt.Errorf("Error parsing singer properties [%v]: %v", config.Properties, err)
}

Expand Down Expand Up @@ -140,7 +139,7 @@ func NewSinger(ctx context.Context, sourceConfig *base.SourceConfig, collection

//parse singer state as file path
statePath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.StateFileName), config.InitialState)
if err != nil {
if err != nil && err != parsers.ErrValueIsNil {
return nil, fmt.Errorf("Error parsing singer initial state [%v]: %v", config.InitialState, err)
}

Expand Down Expand Up @@ -328,8 +327,7 @@ func (s *Singer) Load(config string, state string, taskLogger logging.TaskLogger
}

if config != "" {
_, err = parsers.ParseJSONAsFile(s.GetConfigPath(), config)
if err != nil {
if _, err = parsers.ParseJSONAsFile(s.GetConfigPath(), config); err != nil {
return fmt.Errorf("Failed to write config loaded from meta storage: %v", err)
}
}
Expand Down
26 changes: 19 additions & 7 deletions server/parsers/json_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"strings"
)

var ErrValueIsNil = errors.New("value is nil")

//ParseJSONAsFile parses value and write it to a json file
//returns path to created json file or return value if it is already path to json file
//or empty string if value is nil
func ParseJSONAsFile(newPath string, value interface{}) (string, error) {
if value == nil {
return "", nil
return "", ErrValueIsNil
}

switch value.(type) {
Expand All @@ -26,14 +28,24 @@ func ParseJSONAsFile(newPath string, value interface{}) (string, error) {

return newPath, ioutil.WriteFile(newPath, b, 0644)
case string:
payload := value.(string)
if strings.HasPrefix(payload, "{") {
return newPath, ioutil.WriteFile(newPath, []byte(payload), 0644)
valueString := value.(string)
if strings.HasPrefix(valueString, "{") {
return newPath, ioutil.WriteFile(newPath, []byte(valueString), 0644)
}

//check if it is valid json in the filepath
content, err := ioutil.ReadFile(valueString)
if err == nil && len(content) > 0 && strings.HasPrefix(string(content), "{") {
testObj := map[string]interface{}{}
if err := json.Unmarshal(content, &testObj); err != nil {
return "", fmt.Errorf("value in file %s must contain a valid JSON", value)
}

return valueString, nil
}

//already file
return payload, nil
return "", fmt.Errorf("value must be a path to json file or raw json in ParseJSONAsFile(): %v", value)
default:
return "", errors.New("Unknown type. Value must be path to json file or raw json")
return "", fmt.Errorf("Unknown type. Value must be a path to json file or raw json: %v (%T)", value, value)
}
}
5 changes: 2 additions & 3 deletions server/singer/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (b *Bridge) Discover(tap, singerConfigPath string) (*RawCatalog, error) {
return catalog, nil
}

//saveConfig saves config as file for using
//SaveConfig saves config as file for using
//returns absolute file path to generated file
func SaveConfig(sourceId string, tap string, singerConfig interface{}) (string, error) {
configDir := path.Join(Instance.VenvDir, sourceId, tap)
Expand All @@ -250,8 +250,7 @@ func SaveConfig(sourceId string, tap string, singerConfig interface{}) (string,

absoluteFilePath := path.Join(configDir, base.ConfigFileName)
//write singer config as file path
_, err := parsers.ParseJSONAsFile(absoluteFilePath, singerConfig)
if err != nil {
if _, err := parsers.ParseJSONAsFile(absoluteFilePath, singerConfig); err != nil {
return "", fmt.Errorf("Error writing singer to %s: %v", absoluteFilePath, err)
}

Expand Down

0 comments on commit 79a022c

Please sign in to comment.