Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: support aliyun oss backend. #216

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"net/url"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -32,6 +33,18 @@ type CatalogType string

type AwsProperties map[string]string

// OSSConfig contains configuration for accessing OSS object storage
type OSSConfig struct {
divinerapier marked this conversation as resolved.
Show resolved Hide resolved
// Endpoint specifies the OSS service endpoint URL
Endpoint string
// AccessKey is the access key ID for OSS authentication
AccessKey string
// SecretKey is the secret access key for OSS authentication
SecretKey string
// SignatureVersion is the signature version for OSS authentication
SignatureVersion oss.SignatureVersionType
}

const (
REST CatalogType = "rest"
Hive CatalogType = "hive"
Expand Down Expand Up @@ -122,12 +135,21 @@ func WithPrefix(prefix string) Option[RestCatalog] {
}
}

// WithOSSConfig sets the OSS configuration for the catalog.
func WithOSSConfig(cfg OSSConfig) Option[RestCatalog] {
return func(o *options) {
o.ossConfig = cfg
}
}

type Option[T GlueCatalog | RestCatalog] func(*options)

type options struct {
awsConfig aws.Config
awsProperties AwsProperties

ossConfig OSSConfig

tlsConfig *tls.Config
credential string
oauthToken string
Expand Down
13 changes: 13 additions & 0 deletions catalog/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"maps"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -56,6 +57,11 @@ const (
keyRestSigV4Region = "rest.signing-region"
keyRestSigV4Service = "rest.signing-name"
keyAuthUrl = "rest.authorization-url"

keyOSSAccessKey = "client.oss-access-key"
keyOSSSecretKey = "client.oss-secret-key"
keyOSSEndpoint = "client.oss-endpoint"
keyOSSSignatureVersion = "client.oss-signature-version"
)

var (
Expand Down Expand Up @@ -356,6 +362,12 @@ func toProps(o *options) iceberg.Properties {
if o.authUri != nil {
setIf(keyAuthUrl, o.authUri.String())
}

setIf(keyOSSAccessKey, o.ossConfig.AccessKey)
setIf(keyOSSSecretKey, o.ossConfig.SecretKey)
setIf(keyOSSEndpoint, o.ossConfig.Endpoint)
// Convert OSS signature version from enum to string representation
setIf(keyOSSSignatureVersion, strconv.FormatInt(int64(o.ossConfig.SignatureVersion), 10))
return props
}

Expand Down Expand Up @@ -515,6 +527,7 @@ func (r *RestCatalog) fetchConfig(opts *options) (*options, error) {

o := fromProps(cfg)
o.awsConfig = opts.awsConfig
o.ossConfig = opts.ossConfig
o.tlsConfig = opts.tlsConfig

if uri, ok := cfg["uri"]; ok {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go 1.23
toolchain go1.23.2

require (
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3
github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f
github.com/aws/aws-sdk-go-v2 v1.32.5
github.com/aws/aws-sdk-go-v2/config v1.28.5
Expand Down Expand Up @@ -97,6 +98,7 @@ require (
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.4.0 // indirect
golang.org/x/tools v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwld
github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3 h1:grJyLSdRJtfxKKhCTWSeJhnOQsp2WoLNdK8XA5FE9oo=
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.1.3/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A=
Expand Down Expand Up @@ -243,6 +245,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY=
golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
2 changes: 2 additions & 0 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {
switch parsed.Scheme {
case "s3", "s3a", "s3n":
return createS3FileIO(parsed, props)
case "oss":
return createOSSFileIO(parsed, props)
case "file", "":
return LocalFS{}, nil
default:
Expand Down
173 changes: 173 additions & 0 deletions io/oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package io

import (
"context"
"fmt"
"io"
"io/fs"
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// Constants for OSS configuration options
const (
OSSAccessKey = "client.oss-access-key"
OSSSecretKey = "client.oss-secret-key"
OSSEndpoint = "client.oss-endpoint"
OSSSignatureVersion = "client.oss-signature-version"
)

func createOSSFileIO(parsed *url.URL, props map[string]string) (IO, error) {
endpoint, ok := props[OSSEndpoint]
if !ok {
endpoint = os.Getenv("OSS_ENDPOINT")
}
if endpoint == "" {
return nil, fmt.Errorf("oss endpoint must be specified")
}

accessKey := props[OSSAccessKey]
secretKey := props[OSSSecretKey]

provider := credentials.NewStaticCredentialsProvider(accessKey, secretKey)

cfg := oss.LoadDefaultConfig().
WithRetryMaxAttempts(3).
WithCredentialsProvider(provider).
WithEndpoint(endpoint).
WithSignatureVersion(parseSignatureVersion(props[OSSSignatureVersion])).
WithUsePathStyle(true).
WithConnectTimeout(10 * time.Second).
WithReadWriteTimeout(time.Minute)

client := oss.NewClient(cfg)

ossFS := &ossFS{
client: client,
bucket: parsed.Host,
}

preprocess := func(n string) string {
_, after, found := strings.Cut(n, "://")
if found {
n = after
}
return strings.TrimPrefix(n, parsed.Host)
}

return FSPreProcName(ossFS, preprocess), nil
}

// parseSignatureVersion converts string version to oss.SignatureVersionType
// "0" -> 0 (v1)
// "1" -> 1 (v4)
// defaults to 1 (v4) for unknown values
func parseSignatureVersion(version string) oss.SignatureVersionType {
switch version {
case "0": // v1
return 0
case "1": // v4
return 1
default:
return 1
}
}

type ossFS struct {
client *oss.Client
bucket string
}

// Open implements fs.FS
func (o *ossFS) Open(name string) (fs.File, error) {
if !fs.ValidPath(name) {
return nil, &os.PathError{Op: "open", Path: name, Err: os.ErrInvalid}
}
if name == "." {
return &ossFile{
name: name,
}, nil
}
name = strings.TrimPrefix(name, "/")

file, err := o.client.OpenFile(context.Background(), o.bucket, name)
if err != nil {
return nil, err
}

return &ossFile{
file: file,
name: name,
}, nil
}

type ossFile struct {
mutex sync.Mutex
file *oss.ReadOnlyFile
name string
}

// Read implements io.Reader
func (f *ossFile) Read(p []byte) (int, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Read(p)
}

// Seek implements io.Seeker
func (f *ossFile) Seek(offset int64, whence int) (int64, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Seek(offset, whence)
}

// Close implements io.Closer
func (f *ossFile) Close() error {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Close()
}

// ReadAt implements io.ReaderAt
func (f *ossFile) ReadAt(p []byte, off int64) (n int, err error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if _, err := f.file.Seek(off, io.SeekStart); err != nil {
return 0, err
}
return f.file.Read(p)
}

func (f *ossFile) Stat() (fs.FileInfo, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

return f.file.Stat()
}
Loading