diff --git a/server/airbyte/runner.go b/server/airbyte/runner.go index 7823054d5..04c9c1db6 100644 --- a/server/airbyte/runner.go +++ b/server/airbyte/runner.go @@ -324,8 +324,7 @@ func saveConfig(airbyteSourceConfig interface{}) (string, string, error) { absoluteFilePath := path.Join(absoluteDirPath, fileName) //write airbyte config as file path - _, err := parsers.ParseJSONAsFile(absoluteFilePath, airbyteSourceConfig) - if err != nil { + if _, err := parsers.ParseJSONAsFile(absoluteFilePath, airbyteSourceConfig); err != nil { return "", "", fmt.Errorf("Error writing airbyte config [%v]: %v", airbyteSourceConfig, err) } diff --git a/server/drivers/airbyte/airbyte.go b/server/drivers/airbyte/airbyte.go index 21169c241..3da4f75d2 100644 --- a/server/drivers/airbyte/airbyte.go +++ b/server/drivers/airbyte/airbyte.go @@ -81,7 +81,7 @@ func NewAirbyte(ctx context.Context, sourceConfig *base.SourceConfig, collection //parse airbyte catalog as file path catalogPath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.CatalogFileName), config.Catalog) - if err != nil { + if err != nil && err != parsers.ErrValueIsNil { return nil, fmt.Errorf("Error parsing airbyte catalog [%v]: %v", config.Catalog, err) } @@ -93,7 +93,7 @@ func NewAirbyte(ctx context.Context, sourceConfig *base.SourceConfig, collection //parse airbyte state as file path statePath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.StateFileName), config.InitialState) - if err != nil { + if err != nil && err != parsers.ErrValueIsNil { return nil, fmt.Errorf("Error parsing airbyte initial state [%v]: %v", config.InitialState, err) } @@ -151,10 +151,11 @@ func TestAirbyte(sourceConfig *base.SourceConfig) error { } base.FillPreconfiguredOauth(config.DockerImage, config.Config) airbyteRunner := airbyte.NewRunner(config.DockerImage, config.ImageVersion, "") - err := airbyteRunner.Check(config.Config) - if err != nil { + + if err := airbyteRunner.Check(config.Config); err != nil { return err } + selectedStreamsWithNamespace := selectedStreamsWithNamespace(config) if len(selectedStreamsWithNamespace) > 0 { airbyteRunner = airbyte.NewRunner(config.DockerImage, config.ImageVersion, "") diff --git a/server/drivers/singer/singer.go b/server/drivers/singer/singer.go index 2c0251a01..5245a544c 100644 --- a/server/drivers/singer/singer.go +++ b/server/drivers/singer/singer.go @@ -86,20 +86,19 @@ func NewSinger(ctx context.Context, sourceConfig *base.SourceConfig, collection _, err = os.Stat(configPath) if err != nil { //parse singer config as file path - _, err := parsers.ParseJSONAsFile(configPath, config.Config) - if err != nil { + if _, err := parsers.ParseJSONAsFile(configPath, config.Config); err != nil { return nil, fmt.Errorf("Error parsing singer config [%v]: %v", config.Config, err) } } //parse singer catalog as file path catalogPath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.CatalogFileName), config.Catalog) - if err != nil { + if err != nil && err != parsers.ErrValueIsNil { return nil, fmt.Errorf("Error parsing singer catalog [%v]: %v", config.Catalog, err) } //parse singer properties as file path propertiesPath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.PropertiesFileName), config.Properties) - if err != nil { + if err != nil && err != parsers.ErrValueIsNil { return nil, fmt.Errorf("Error parsing singer properties [%v]: %v", config.Properties, err) } @@ -140,7 +139,7 @@ func NewSinger(ctx context.Context, sourceConfig *base.SourceConfig, collection //parse singer state as file path statePath, err := parsers.ParseJSONAsFile(path.Join(pathToConfigs, base.StateFileName), config.InitialState) - if err != nil { + if err != nil && err != parsers.ErrValueIsNil { return nil, fmt.Errorf("Error parsing singer initial state [%v]: %v", config.InitialState, err) } @@ -328,8 +327,7 @@ func (s *Singer) Load(config string, state string, taskLogger logging.TaskLogger } if config != "" { - _, err = parsers.ParseJSONAsFile(s.GetConfigPath(), config) - if err != nil { + if _, err = parsers.ParseJSONAsFile(s.GetConfigPath(), config); err != nil { return fmt.Errorf("Failed to write config loaded from meta storage: %v", err) } } diff --git a/server/parsers/json_files.go b/server/parsers/json_files.go index 60e5caadb..4b93a1840 100644 --- a/server/parsers/json_files.go +++ b/server/parsers/json_files.go @@ -8,12 +8,14 @@ import ( "strings" ) +var ErrValueIsNil = errors.New("value is nil") + //ParseJSONAsFile parses value and write it to a json file //returns path to created json file or return value if it is already path to json file //or empty string if value is nil func ParseJSONAsFile(newPath string, value interface{}) (string, error) { if value == nil { - return "", nil + return "", ErrValueIsNil } switch value.(type) { @@ -26,14 +28,24 @@ func ParseJSONAsFile(newPath string, value interface{}) (string, error) { return newPath, ioutil.WriteFile(newPath, b, 0644) case string: - payload := value.(string) - if strings.HasPrefix(payload, "{") { - return newPath, ioutil.WriteFile(newPath, []byte(payload), 0644) + valueString := value.(string) + if strings.HasPrefix(valueString, "{") { + return newPath, ioutil.WriteFile(newPath, []byte(valueString), 0644) + } + + //check if it is valid json in the filepath + content, err := ioutil.ReadFile(valueString) + if err == nil && len(content) > 0 && strings.HasPrefix(string(content), "{") { + testObj := map[string]interface{}{} + if err := json.Unmarshal(content, &testObj); err != nil { + return "", fmt.Errorf("value in file %s must contain a valid JSON", value) + } + + return valueString, nil } - //already file - return payload, nil + return "", fmt.Errorf("value must be a path to json file or raw json in ParseJSONAsFile(): %v", value) default: - return "", errors.New("Unknown type. Value must be path to json file or raw json") + return "", fmt.Errorf("Unknown type. Value must be a path to json file or raw json: %v (%T)", value, value) } } diff --git a/server/singer/bridge.go b/server/singer/bridge.go index 0b31ce295..9395de9fd 100644 --- a/server/singer/bridge.go +++ b/server/singer/bridge.go @@ -239,7 +239,7 @@ func (b *Bridge) Discover(tap, singerConfigPath string) (*RawCatalog, error) { return catalog, nil } -//saveConfig saves config as file for using +//SaveConfig saves config as file for using //returns absolute file path to generated file func SaveConfig(sourceId string, tap string, singerConfig interface{}) (string, error) { configDir := path.Join(Instance.VenvDir, sourceId, tap) @@ -250,8 +250,7 @@ func SaveConfig(sourceId string, tap string, singerConfig interface{}) (string, absoluteFilePath := path.Join(configDir, base.ConfigFileName) //write singer config as file path - _, err := parsers.ParseJSONAsFile(absoluteFilePath, singerConfig) - if err != nil { + if _, err := parsers.ParseJSONAsFile(absoluteFilePath, singerConfig); err != nil { return "", fmt.Errorf("Error writing singer to %s: %v", absoluteFilePath, err) }