Skip to content

Commit 960f4c2

Browse files
authored
feat(vdp): upload raw inputs for run log (#904)
Because - currently, the run log inputs are stored as raw data rather than in blob storage - we want the inputs to be download URLs rather than data URIs This commit - moves the upload of pipeline run inputs from the worker to the service - removes the worker's upload-inputs function - adds the service's upload-inputs function
1 parent b9298e5 commit 960f4c2

File tree

15 files changed

+527
-281
lines changed

15 files changed

+527
-281
lines changed

cmd/main/main.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -298,14 +298,16 @@ func main() {
298298
InstillCoreHost: config.Config.Server.InstillCoreHost,
299299
ComponentStore: compStore,
300300
}),
301-
MgmtPublicServiceClient: mgmtPublicServiceClient,
302-
MgmtPrivateServiceClient: mgmtPrivateServiceClient,
303-
MinioClient: minioClient,
304-
ComponentStore: compStore,
305-
Memory: ms,
306-
WorkerUID: workerUID,
307-
RetentionHandler: nil,
308-
BinaryFetcher: binaryFetcher,
301+
MgmtPublicServiceClient: mgmtPublicServiceClient,
302+
MgmtPrivateServiceClient: mgmtPrivateServiceClient,
303+
MinioClient: minioClient,
304+
ComponentStore: compStore,
305+
Memory: ms,
306+
WorkerUID: workerUID,
307+
RetentionHandler: nil,
308+
BinaryFetcher: binaryFetcher,
309+
ArtifactPublicServiceClient: artifactPublicServiceClient,
310+
ArtifactPrivateServiceClient: artifactPrivateServiceClient,
309311
},
310312
)
311313

@@ -511,7 +513,6 @@ func main() {
511513
lw.RegisterActivity(cw.UpdatePipelineRunActivity)
512514
lw.RegisterActivity(cw.UpsertComponentRunActivity)
513515

514-
mw.RegisterActivity(cw.UploadInputsToMinioActivity)
515516
mw.RegisterActivity(cw.UploadOutputsToMinioActivity)
516517
mw.RegisterActivity(cw.UploadRecipeToMinioActivity)
517518
mw.RegisterActivity(cw.UploadComponentInputsActivity)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ require (
4040
github.com/influxdata/influxdb-client-go/v2 v2.12.3
4141
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498
4242
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
43-
github.com/instill-ai/x v0.5.0-alpha.0.20241119141833-e4a78ca87792
43+
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8
4444
github.com/itchyny/gojq v0.12.14
4545
github.com/jackc/pgx/v5 v5.5.5
4646
github.com/jmoiron/sqlx v1.4.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,8 +1285,8 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1:
12851285
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
12861286
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
12871287
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
1288-
github.com/instill-ai/x v0.5.0-alpha.0.20241119141833-e4a78ca87792 h1:b4lhXcFJ/kGGC1RErtItoI57paf9WXBCVpaPIAApldY=
1289-
github.com/instill-ai/x v0.5.0-alpha.0.20241119141833-e4a78ca87792/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8=
1288+
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8 h1:w2F6sI6VbzIXUIh6HrSrV4k43pM/brj1jv6HT994+so=
1289+
github.com/instill-ai/x v0.5.0-alpha.0.20241203110942-cee5c110cba8/go.mod h1:jkVtaq9T2zAFA5N46tlV4K5EEVE7FcOVNbqY4wFWYz8=
12901290
github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ=
12911291
github.com/itchyny/gojq v0.12.14 h1:6k8vVtsrhQSYgSGg827AD+PVVaB1NLXEdX+dda2oZCc=
12921292
github.com/itchyny/gojq v0.12.14/go.mod h1:y1G7oO7XkcR1LPZO59KyoCRy08T3j9vDYRV0GgYSS+s=

pkg/component/internal/util/helper.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,24 @@ func GetFileTypeByFilename(filename string) (string, error) {
138138

139139
func GetContentTypeFromBase64(base64String string) (string, error) {
140140
// Remove the "data:" prefix and split at the first semicolon
141-
contentType := strings.TrimPrefix(base64String, "data:")
141+
if hasDataPrefix(base64String) {
142+
contentType := strings.TrimPrefix(base64String, "data:")
142143

143-
parts := strings.SplitN(contentType, ";", 2)
144-
if len(parts) != 2 {
145-
return "", fmt.Errorf("invalid format")
144+
parts := strings.SplitN(contentType, ";", 2)
145+
if len(parts) != 2 {
146+
return "", fmt.Errorf("invalid format")
147+
}
148+
149+
// The first part is the content type
150+
return parts[0], nil
146151
}
147152

148-
// The first part is the content type
149-
return parts[0], nil
153+
b, err := base64.StdEncoding.DecodeString(base64String)
154+
if err != nil {
155+
return "", fmt.Errorf("decode base64 string: %w", err)
156+
}
157+
mimeType := strings.Split(mimetype.Detect(b).String(), ";")[0]
158+
return mimeType, nil
150159
}
151160

152161
func GetFileBase64Content(base64String string) string {

pkg/service/blobstorage.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"fmt"
7+
"mime"
8+
"strings"
9+
"time"
10+
11+
"github.com/gabriel-vasile/mimetype"
12+
"google.golang.org/grpc/metadata"
13+
14+
"github.com/instill-ai/pipeline-backend/pkg/recipe"
15+
"github.com/instill-ai/pipeline-backend/pkg/resource"
16+
"github.com/instill-ai/pipeline-backend/pkg/utils"
17+
18+
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
19+
resourcex "github.com/instill-ai/x/resource"
20+
)
21+
22+
func (s *service) uploadBlobAndGetDownloadURL(ctx context.Context, ns resource.Namespace, data string) (string, error) {
23+
mimeType, err := getMimeType(data)
24+
if err != nil {
25+
return "", fmt.Errorf("get mime type: %w", err)
26+
}
27+
artifactClient := s.artifactPublicServiceClient
28+
requesterUID, _ := resourcex.GetRequesterUIDAndUserUID(ctx)
29+
30+
vars, err := recipe.GenerateSystemVariables(ctx, recipe.SystemVariables{})
31+
32+
if err != nil {
33+
return "", fmt.Errorf("generate system variables: %w", err)
34+
}
35+
36+
ctx = metadata.NewOutgoingContext(ctx, utils.GetRequestMetadata(vars))
37+
38+
timestamp := time.Now().Format(time.RFC3339)
39+
objectName := fmt.Sprintf("%s-%s%s", requesterUID.String(), timestamp, getFileExtension(mimeType))
40+
41+
// TODO: We will need to add the expiry days for the blob data.
42+
// This will be addressed in ins-6857
43+
resp, err := artifactClient.GetObjectUploadURL(ctx, &artifactpb.GetObjectUploadURLRequest{
44+
NamespaceId: ns.NsID,
45+
ObjectName: objectName,
46+
ObjectExpireDays: 0,
47+
})
48+
49+
if err != nil {
50+
return "", fmt.Errorf("get upload url: %w", err)
51+
}
52+
53+
uploadURL := resp.GetUploadUrl()
54+
data = removePrefix(data)
55+
b, err := base64.StdEncoding.DecodeString(data)
56+
if err != nil {
57+
return "", fmt.Errorf("decode base64 string: %w", err)
58+
}
59+
60+
err = utils.UploadBlobData(ctx, uploadURL, mimeType, b, s.log)
61+
if err != nil {
62+
return "", fmt.Errorf("upload blob data: %w", err)
63+
}
64+
65+
respDownloadURL, err := artifactClient.GetObjectDownloadURL(ctx, &artifactpb.GetObjectDownloadURLRequest{
66+
NamespaceId: ns.NsID,
67+
ObjectUid: resp.GetObject().GetUid(),
68+
})
69+
if err != nil {
70+
return "", fmt.Errorf("get object download url: %w", err)
71+
}
72+
73+
return respDownloadURL.GetDownloadUrl(), nil
74+
}
75+
76+
func getMimeType(data string) (string, error) {
77+
var mimeType string
78+
if strings.HasPrefix(data, "data:") {
79+
contentType := strings.TrimPrefix(data, "data:")
80+
parts := strings.SplitN(contentType, ";", 2)
81+
if len(parts) == 0 {
82+
return "", fmt.Errorf("invalid data url")
83+
}
84+
mimeType = parts[0]
85+
} else {
86+
b, err := base64.StdEncoding.DecodeString(data)
87+
if err != nil {
88+
return "", fmt.Errorf("decode base64 string: %w", err)
89+
}
90+
mimeType = strings.Split(mimetype.Detect(b).String(), ";")[0]
91+
92+
}
93+
return mimeType, nil
94+
}
95+
96+
// getFileExtension maps a MIME type to a file extension, including the
// leading dot. It returns the first extension reported by
// mime.ExtensionsByType, or the empty string when the MIME type cannot be
// parsed or no extension is registered for it.
func getFileExtension(mimeType string) string {
	candidates, lookupErr := mime.ExtensionsByType(mimeType)
	switch {
	case lookupErr != nil:
		return ""
	case len(candidates) == 0:
		return ""
	default:
		return candidates[0]
	}
}
106+
107+
// removePrefix strips the header of a data URI and returns only the payload,
// i.e. everything after the first comma. Input that does not start with
// "data:" is returned unchanged.
//
// A malformed data URI that has no comma yields the empty string rather than
// panicking on an out-of-range index.
func removePrefix(data string) string {
	if !strings.HasPrefix(data, "data:") {
		return data
	}
	_, payload, found := strings.Cut(data, ",")
	if !found {
		return ""
	}
	return payload
}

pkg/service/main.go

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/instill-ai/pipeline-backend/pkg/resource"
2121

2222
componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
23+
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
2324
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
2425
pb "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
2526
miniox "github.com/instill-ai/x/minio"
@@ -98,37 +99,41 @@ type TriggerResult struct {
9899
// However, we keep it here for now because we may need it in the future.
99100
// service is the implementation of the Service interface
100101
type service struct {
101-
repository repository.Repository
102-
redisClient *redis.Client
103-
temporalClient client.Client
104-
component *componentstore.Store
105-
mgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient
106-
mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient
107-
aclClient acl.ACLClientInterface
108-
converter Converter
109-
minioClient miniox.MinioI
110-
memory memory.MemoryStore
111-
log *zap.Logger
112-
workerUID uuid.UUID
113-
retentionHandler MetadataRetentionHandler
114-
binaryFetcher external.BinaryFetcher
102+
repository repository.Repository
103+
redisClient *redis.Client
104+
temporalClient client.Client
105+
component *componentstore.Store
106+
mgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient
107+
mgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient
108+
aclClient acl.ACLClientInterface
109+
converter Converter
110+
minioClient miniox.MinioI
111+
memory memory.MemoryStore
112+
log *zap.Logger
113+
workerUID uuid.UUID
114+
retentionHandler MetadataRetentionHandler
115+
binaryFetcher external.BinaryFetcher
116+
artifactPublicServiceClient artifactpb.ArtifactPublicServiceClient
117+
artifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient
115118
}
116119

117120
// ServiceConfig is the configuration for the service
118121
type ServiceConfig struct {
119-
Repository repository.Repository
120-
RedisClient *redis.Client
121-
TemporalClient client.Client
122-
ACLClient acl.ACLClientInterface
123-
Converter Converter
124-
MgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient
125-
MgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient
126-
MinioClient miniox.MinioI
127-
ComponentStore *componentstore.Store
128-
Memory memory.MemoryStore
129-
WorkerUID uuid.UUID
130-
RetentionHandler MetadataRetentionHandler
131-
BinaryFetcher external.BinaryFetcher
122+
Repository repository.Repository
123+
RedisClient *redis.Client
124+
TemporalClient client.Client
125+
ACLClient acl.ACLClientInterface
126+
Converter Converter
127+
MgmtPublicServiceClient mgmtpb.MgmtPublicServiceClient
128+
MgmtPrivateServiceClient mgmtpb.MgmtPrivateServiceClient
129+
MinioClient miniox.MinioI
130+
ComponentStore *componentstore.Store
131+
Memory memory.MemoryStore
132+
WorkerUID uuid.UUID
133+
RetentionHandler MetadataRetentionHandler
134+
BinaryFetcher external.BinaryFetcher
135+
ArtifactPublicServiceClient artifactpb.ArtifactPublicServiceClient
136+
ArtifactPrivateServiceClient artifactpb.ArtifactPrivateServiceClient
132137
}
133138

134139
// NewService initiates a service instance
@@ -141,19 +146,21 @@ func NewService(
141146
}
142147

143148
return &service{
144-
repository: cfg.Repository,
145-
redisClient: cfg.RedisClient,
146-
temporalClient: cfg.TemporalClient,
147-
mgmtPublicServiceClient: cfg.MgmtPublicServiceClient,
148-
mgmtPrivateServiceClient: cfg.MgmtPrivateServiceClient,
149-
component: cfg.ComponentStore,
150-
aclClient: cfg.ACLClient,
151-
converter: cfg.Converter,
152-
minioClient: cfg.MinioClient,
153-
memory: cfg.Memory,
154-
log: zapLogger,
155-
workerUID: cfg.WorkerUID,
156-
retentionHandler: cfg.RetentionHandler,
157-
binaryFetcher: cfg.BinaryFetcher,
149+
repository: cfg.Repository,
150+
redisClient: cfg.RedisClient,
151+
temporalClient: cfg.TemporalClient,
152+
mgmtPublicServiceClient: cfg.MgmtPublicServiceClient,
153+
mgmtPrivateServiceClient: cfg.MgmtPrivateServiceClient,
154+
component: cfg.ComponentStore,
155+
aclClient: cfg.ACLClient,
156+
converter: cfg.Converter,
157+
minioClient: cfg.MinioClient,
158+
memory: cfg.Memory,
159+
log: zapLogger,
160+
workerUID: cfg.WorkerUID,
161+
retentionHandler: cfg.RetentionHandler,
162+
binaryFetcher: cfg.BinaryFetcher,
163+
artifactPublicServiceClient: cfg.ArtifactPublicServiceClient,
164+
artifactPrivateServiceClient: cfg.ArtifactPrivateServiceClient,
158165
}
159166
}

0 commit comments

Comments
 (0)