diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 73d6e3be53..8176989a3c 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -366,6 +366,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa Collection: c2, }}, nil case *core.Literal_Map: + err := os.MkdirAll(filePath, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath) + } v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, writeToFile) if err != nil { return nil, nil, err @@ -387,6 +391,10 @@ func (d Downloader) handleCollection(ctx context.Context, c *core.LiteralCollect litCollection := &core.LiteralCollection{} for i, lit := range c.GetLiterals() { filePath := path.Join(dir, strconv.Itoa(i)) + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", dir) + } v, lit, err := d.handleLiteral(ctx, lit, filePath, writePrimitiveToFile) if err != nil { return nil, nil, err @@ -410,6 +418,16 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM } f := make(FutureMap, len(inputs.GetLiterals())) for variable, literal := range inputs.GetLiterals() { + if literal.GetOffloadedMetadata() != nil { + offloadedMetadataURI := literal.GetOffloadedMetadata().GetUri() + // literal will be overwritten with the contents of the offloaded data which contains the actual large literal. + if err := d.store.ReadProtobuf(ctx, storage.DataReference(offloadedMetadataURI), literal); err != nil { + errString := fmt.Sprintf("Failed to read the object at location [%s] with error [%s]", offloadedMetadataURI, err) + logger.Error(ctx, errString) + return nil, nil, fmt.Errorf(errString) + } + logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI) + } varPath := path.Join(dir, variable) lit := literal f[variable] = futures.NewAsyncFuture(childCtx, func(ctx2 context.Context) (interface{}, error) { diff --git a/flytecopilot/data/download_test.go b/flytecopilot/data/download_test.go index b4bee54fc5..03003f9a16 100644 --- a/flytecopilot/data/download_test.go +++ b/flytecopilot/data/download_test.go @@ -152,3 +152,150 @@ func TestHandleBlobHTTP(t *testing.T) { t.Errorf("expected file %s to exist", toPath) } } + +func TestRecursiveDownload(t *testing.T) { + t.Run("OffloadedMetadataContainsCollectionOfStrings", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + d := Downloader{store: s} + + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "s3://container/offloaded", + }, + }, + } + + inputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": offloadedLiteral, + }, + } + + // Mock reading the offloaded metadata + err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "string1", + }, + }, + }, + }, + }, + }, + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "string2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true) + assert.NoError(t, err) + assert.NotNil(t, varMap) + assert.NotNil(t, lMap) + assert.Equal(t, []interface{}{"string1", "string2"}, varMap["input1"]) + }) + + t.Run("OffloadedMetadataContainsMapOfStringString", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + d := Downloader{store: s} + + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "s3://container/offloaded", + }, + }, + } + + inputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": offloadedLiteral, + }, + } + + // Mock reading the offloaded metadata + err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "key1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "value1", + }, + }, + }, + }, + }, + }, + "key2": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "value2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true) + assert.NoError(t, err) + assert.NotNil(t, varMap) + assert.NotNil(t, lMap) + assert.Equal(t, "value1", varMap["input1"].(VarMap)["key1"]) + assert.Equal(t, "value2", varMap["input1"].(VarMap)["key2"]) + }) +}