Skip to content

Commit

Permalink
Adding support for downloading offloaded literal in copilot
Browse files Browse the repository at this point in the history
Signed-off-by: pmahindrakar-oss <prafulla.mahindrakar@gmail.com>
  • Loading branch information
pmahindrakar-oss committed Nov 26, 2024
1 parent 25c89ee commit aa97b4d
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 0 deletions.
18 changes: 18 additions & 0 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa
Collection: c2,
}}, nil
case *core.Literal_Map:
err := os.MkdirAll(filePath, os.ModePerm)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath)
}

Check warning on line 372 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L371-L372

Added lines #L371 - L372 were not covered by tests
v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, writeToFile)
if err != nil {
return nil, nil, err
Expand All @@ -387,6 +391,10 @@ func (d Downloader) handleCollection(ctx context.Context, c *core.LiteralCollect
litCollection := &core.LiteralCollection{}
for i, lit := range c.GetLiterals() {
filePath := path.Join(dir, strconv.Itoa(i))
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", dir)
}

Check warning on line 397 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L396-L397

Added lines #L396 - L397 were not covered by tests
v, lit, err := d.handleLiteral(ctx, lit, filePath, writePrimitiveToFile)
if err != nil {
return nil, nil, err
Expand All @@ -410,6 +418,16 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM
}
f := make(FutureMap, len(inputs.GetLiterals()))
for variable, literal := range inputs.GetLiterals() {
if literal.GetOffloadedMetadata() != nil {
offloadedMetadataURI := literal.GetOffloadedMetadata().GetUri()
// literal will be overwritten with the contents of the offloaded data which contains the actual large literal.
if err := d.store.ReadProtobuf(ctx, storage.DataReference(offloadedMetadataURI), literal); err != nil {
errString := fmt.Sprintf("Failed to read the object at location [%s] with error [%s]", offloadedMetadataURI, err)
logger.Error(ctx, errString)
return nil, nil, fmt.Errorf(errString)
}

Check warning on line 428 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L425-L428

Added lines #L425 - L428 were not covered by tests
logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI)
}
varPath := path.Join(dir, variable)
lit := literal
f[variable] = futures.NewAsyncFuture(childCtx, func(ctx2 context.Context) (interface{}, error) {
Expand Down
147 changes: 147 additions & 0 deletions flytecopilot/data/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,150 @@ func TestHandleBlobHTTP(t *testing.T) {
t.Errorf("expected file %s to exist", toPath)
}
}

func TestRecursiveDownload(t *testing.T) {
t.Run("OffloadedMetadataContainsCollectionOfStrings", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

d := Downloader{store: s}

offloadedLiteral := &core.Literal{
Value: &core.Literal_OffloadedMetadata{
OffloadedMetadata: &core.LiteralOffloadedMetadata{
Uri: "s3://container/offloaded",
},
},
}

inputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"input1": offloadedLiteral,
},
}

// Mock reading the offloaded metadata
err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{
Value: &core.Literal_Collection{
Collection: &core.LiteralCollection{
Literals: []*core.Literal{
{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "string1",
},
},
},
},
},
},
{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "string2",
},
},
},
},
},
},
},
},
},
})
assert.NoError(t, err)

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true)
assert.NoError(t, err)
assert.NotNil(t, varMap)
assert.NotNil(t, lMap)
assert.Equal(t, []interface{}{"string1", "string2"}, varMap["input1"])
})

t.Run("OffloadedMetadataContainsMapOfStringString", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

d := Downloader{store: s}

offloadedLiteral := &core.Literal{
Value: &core.Literal_OffloadedMetadata{
OffloadedMetadata: &core.LiteralOffloadedMetadata{
Uri: "s3://container/offloaded",
},
},
}

inputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"input1": offloadedLiteral,
},
}

// Mock reading the offloaded metadata
err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{
Value: &core.Literal_Map{
Map: &core.LiteralMap{
Literals: map[string]*core.Literal{
"key1": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "value1",
},
},
},
},
},
},
"key2": {
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_StringValue{
StringValue: "value2",
},
},
},
},
},
},
},
},
},
})
assert.NoError(t, err)

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true)
assert.NoError(t, err)
assert.NotNil(t, varMap)
assert.NotNil(t, lMap)
assert.Equal(t, "value1", varMap["input1"].(VarMap)["key1"])
assert.Equal(t, "value2", varMap["input1"].(VarMap)["key2"])
})
}

0 comments on commit aa97b4d

Please sign in to comment.