Skip to content

Commit c4409d1

Browse files
committed
[feat] added support for offload policies in namespaces and topics
1 parent ffba2a8 commit c4409d1

File tree

5 files changed

+308
-3
lines changed

5 files changed

+308
-3
lines changed

pulsaradmin/pkg/admin/namespace.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,15 @@ type Namespaces interface {
295295
// RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace,
296296
// defaulting to broker settings
297297
RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error
298+
299+
// GetOffload returns the offload configuration for a namespace
300+
GetOffload(namespace utils.NameSpaceName) (*utils.OffloadPolicies, error)
301+
302+
// SetOffload sets the offload configuration on a namespace
303+
SetOffload(namespace utils.NameSpaceName, policy *utils.OffloadPolicies) error
304+
305+
// DeleteOffload removes the offload configuration from a namespace
306+
DeleteOffload(namespace utils.NameSpaceName) error
298307
}
299308

300309
type namespaces struct {
@@ -940,3 +949,32 @@ func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceN
940949
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime")
941950
return n.pulsar.Client.Delete(endpoint)
942951
}
952+
953+
func (n *namespaces) SetOffload(namespace utils.NameSpaceName, policy *utils.OffloadPolicies) error {
954+
nsName, err := utils.GetNamespaceName(namespace.String())
955+
if err != nil {
956+
return err
957+
}
958+
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "offloadPolicies")
959+
return n.pulsar.Client.Post(endpoint, policy)
960+
}
961+
962+
func (n *namespaces) GetOffload(namespace utils.NameSpaceName) (*utils.OffloadPolicies, error) {
963+
var policy utils.OffloadPolicies
964+
nsName, err := utils.GetNamespaceName(namespace.String())
965+
if err != nil {
966+
return nil, err
967+
}
968+
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "offloadPolicies")
969+
err = n.pulsar.Client.Get(endpoint, &policy)
970+
return &policy, err
971+
}
972+
973+
func (n *namespaces) DeleteOffload(namespace utils.NameSpaceName) error {
974+
nsName, err := utils.GetNamespaceName(namespace.String())
975+
if err != nil {
976+
return err
977+
}
978+
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "removeOffloadPolicies")
979+
return n.pulsar.Client.Delete(endpoint)
980+
}

pulsaradmin/pkg/admin/namespace_test.go

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ package admin
2020
import (
2121
"testing"
2222

23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
2326
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
2427
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
2528
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
26-
"github.com/stretchr/testify/assert"
27-
"github.com/stretchr/testify/require"
2829
)
2930

3031
func ptr(n int) *int {
@@ -341,3 +342,82 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {
341342
expected := int64(60)
342343
assert.Equal(t, expected, offloadThresholdInSeconds)
343344
}
345+
346+
func TestNamespaces_SetOffloadPolicy(t *testing.T) {
347+
config := &config.Config{}
348+
admin, err := New(config)
349+
require.NoError(t, err)
350+
require.NotNil(t, admin)
351+
352+
namespace, _ := utils.GetNamespaceName("public/default")
353+
354+
tests := []struct {
355+
name string
356+
errReason string
357+
policy *utils.OffloadPolicies
358+
}{
359+
{
360+
name: "Set invalid empty offload policy",
361+
errReason: "The driver is not supported, support value: S3,aws-s3," +
362+
"google-cloud-storage,filesystem,azureblob,aliyun-oss",
363+
policy: &utils.OffloadPolicies{},
364+
},
365+
{
366+
name: "Set invalid S3 offload policy",
367+
errReason: "The bucket must be specified for namespace offload.",
368+
policy: &utils.OffloadPolicies{
369+
ManagedLedgerOffloadDriver: "S3",
370+
},
371+
},
372+
{
373+
name: "Set valid filesystem offload policy",
374+
errReason: "",
375+
policy: &utils.OffloadPolicies{
376+
ManagedLedgerOffloadDriver: "filesystem",
377+
OffloadersDirectory: "/tmp",
378+
ManagedLedgerOffloadedReadPriority: "BOOKKEEPER_FIRST",
379+
},
380+
},
381+
}
382+
for _, tt := range tests {
383+
t.Run(tt.name, func(t *testing.T) {
384+
err := admin.Namespaces().SetOffload(*namespace, tt.policy)
385+
if tt.errReason == "" {
386+
assert.Equal(t, nil, err)
387+
}
388+
if err != nil {
389+
restError := err.(rest.Error)
390+
assert.Equal(t, tt.errReason, restError.Reason)
391+
}
392+
})
393+
}
394+
}
395+
396+
func TestNamespaces_GetAndDeleteOffloadPolicy(t *testing.T) {
397+
config := &config.Config{}
398+
admin, err := New(config)
399+
require.NoError(t, err)
400+
require.NotNil(t, admin)
401+
402+
namespace, _ := utils.GetNamespaceName("public/default")
403+
404+
// set simple filesystem offload policy and get it
405+
err = admin.Namespaces().SetOffload(*namespace, &utils.OffloadPolicies{
406+
ManagedLedgerOffloadDriver: "filesystem",
407+
OffloadersDirectory: "/var/tmp",
408+
ManagedLedgerOffloadedReadPriority: "TIERED_STORAGE_FIRST",
409+
})
410+
assert.Equal(t, nil, err)
411+
offload, err := admin.Namespaces().GetOffload(*namespace)
412+
assert.Equal(t, nil, err)
413+
assert.Equal(t, "filesystem", offload.ManagedLedgerOffloadDriver)
414+
assert.Equal(t, "/var/tmp", offload.OffloadersDirectory)
415+
assert.Equal(t, "TIERED_STORAGE_FIRST", offload.ManagedLedgerOffloadedReadPriority)
416+
417+
// delete previously set filesystem offload policy
418+
err = admin.Namespaces().DeleteOffload(*namespace)
419+
assert.Equal(t, nil, err)
420+
offload, err = admin.Namespaces().GetOffload(*namespace)
421+
assert.Equal(t, nil, err)
422+
assert.Equal(t, "", offload.ManagedLedgerOffloadDriver)
423+
}

pulsaradmin/pkg/admin/topic.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,30 @@ type Topics interface {
383383
// @param data
384384
// list of replication cluster id
385385
SetReplicationClusters(topic utils.TopicName, data []string) error
386+
387+
// GetOffload returns the offload configuration for a topic
388+
//
389+
// @param topic
390+
// topicName struct
391+
// @param applied
392+
// when set to true, function will try to find policy applied to this topic
393+
// in namespace level, if no policy set in topic level
394+
GetOffload(topic utils.TopicName, applied bool) (*utils.OffloadPolicies, error)
395+
396+
// SetOffload sets the offload policy for a topic
397+
//
398+
// @param topic
399+
// topicName struct
400+
// @param policy
401+
// Pointer to the OffloadPolicies struct with fields set according to the used
402+
// tiered storage configuration
403+
SetOffload(topic utils.TopicName, policy *utils.OffloadPolicies) error
404+
405+
// DeleteOffload removes the offload configuration on a topic
406+
//
407+
// @param topic
408+
// topicName struct
409+
DeleteOffload(topic utils.TopicName) error
386410
}
387411

388412
type topics struct {
@@ -917,3 +941,22 @@ func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error)
917941
err := t.pulsar.Client.Get(endpoint, &data)
918942
return data, err
919943
}
944+
945+
func (t *topics) GetOffload(topic utils.TopicName, applied bool) (*utils.OffloadPolicies, error) {
946+
var policy utils.OffloadPolicies
947+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
948+
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &policy, map[string]string{
949+
"applied": strconv.FormatBool(applied),
950+
}, true)
951+
return &policy, err
952+
}
953+
954+
func (t *topics) SetOffload(topic utils.TopicName, policy *utils.OffloadPolicies) error {
955+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
956+
return t.pulsar.Client.Post(endpoint, policy)
957+
}
958+
959+
func (t *topics) DeleteOffload(topic utils.TopicName) error {
960+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
961+
return t.pulsar.Client.Delete(endpoint)
962+
}

pulsaradmin/pkg/admin/topic_test.go

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ import (
2424
"testing"
2525
"time"
2626

27-
"github.com/apache/pulsar-client-go/pulsar"
2827
"github.com/stretchr/testify/assert"
2928

29+
"github.com/apache/pulsar-client-go/pulsar"
30+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
31+
3032
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
3133
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
3234
)
@@ -523,3 +525,91 @@ func TestRetention(t *testing.T) {
523525
100*time.Millisecond,
524526
)
525527
}
528+
529+
func TestSetOffloadPolicy(t *testing.T) {
530+
randomName := newTopicName()
531+
topic := "persistent://public/default/" + randomName
532+
cfg := &config.Config{}
533+
admin, err := New(cfg)
534+
assert.NoError(t, err)
535+
assert.NotNil(t, admin)
536+
topicName, err := utils.GetTopicName(topic)
537+
assert.NoError(t, err)
538+
err = admin.Topics().Create(*topicName, 4)
539+
assert.NoError(t, err)
540+
541+
tests := []struct {
542+
name string
543+
errReason string
544+
policy *utils.OffloadPolicies
545+
}{
546+
{
547+
name: "Set invalid empty offload policy",
548+
errReason: "The driver is not supported, support value: S3,aws-s3," +
549+
"google-cloud-storage,filesystem,azureblob,aliyun-oss",
550+
policy: &utils.OffloadPolicies{},
551+
},
552+
{
553+
name: "Set invalid S3 offload policy",
554+
errReason: "The bucket must be specified for namespace offload.",
555+
policy: &utils.OffloadPolicies{
556+
ManagedLedgerOffloadDriver: "S3",
557+
},
558+
},
559+
{
560+
name: "Set valid filesystem offload policy",
561+
errReason: "",
562+
policy: &utils.OffloadPolicies{
563+
ManagedLedgerOffloadDriver: "filesystem",
564+
OffloadersDirectory: "/tmp",
565+
ManagedLedgerOffloadedReadPriority: "BOOKKEEPER_FIRST",
566+
},
567+
},
568+
}
569+
for _, tt := range tests {
570+
t.Run(tt.name, func(t *testing.T) {
571+
err := admin.Topics().SetOffload(*topicName, tt.policy)
572+
if tt.errReason == "" {
573+
assert.Equal(t, nil, err)
574+
}
575+
if err != nil {
576+
restError := err.(rest.Error)
577+
assert.Equal(t, tt.errReason, restError.Reason)
578+
}
579+
})
580+
}
581+
}
582+
583+
func TestGetAndDeleteOffloadPolicy(t *testing.T) {
584+
randomName := newTopicName()
585+
topic := "persistent://public/default/" + randomName
586+
cfg := &config.Config{}
587+
admin, err := New(cfg)
588+
assert.NoError(t, err)
589+
assert.NotNil(t, admin)
590+
topicName, err := utils.GetTopicName(topic)
591+
assert.NoError(t, err)
592+
err = admin.Topics().Create(*topicName, 4)
593+
assert.NoError(t, err)
594+
595+
// set simple filesystem offload policy and get it
596+
err = admin.Topics().SetOffload(*topicName, &utils.OffloadPolicies{
597+
ManagedLedgerOffloadDriver: "filesystem",
598+
OffloadersDirectory: "/var/tmp",
599+
ManagedLedgerOffloadedReadPriority: "TIERED_STORAGE_FIRST",
600+
})
601+
assert.Equal(t, nil, err)
602+
offload, err := admin.Topics().GetOffload(*topicName, false)
603+
assert.Equal(t, nil, err)
604+
assert.Equal(t, "filesystem", offload.ManagedLedgerOffloadDriver)
605+
assert.Equal(t, "/var/tmp", offload.OffloadersDirectory)
606+
assert.Equal(t, "TIERED_STORAGE_FIRST", offload.ManagedLedgerOffloadedReadPriority)
607+
608+
// delete previously set filesystem offload policy
609+
err = admin.Topics().DeleteOffload(*topicName)
610+
assert.Equal(t, nil, err)
611+
offload, err = admin.Topics().GetOffload(*topicName, false)
612+
assert.Equal(t, nil, err)
613+
assert.Equal(t, "", offload.ManagedLedgerOffloadDriver)
614+
615+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package utils
19+
20+
type OffloadPolicies struct {
21+
FileSystemDriver bool `json:"fileSystemDriver"`
22+
FileSystemProfilePath string `json:"fileSystemProfilePath"`
23+
FileSystemURI string `json:"fileSystemURI"`
24+
GcsDriver bool `json:"gcsDriver"`
25+
GcsManagedLedgerOffloadBucket string `json:"gcsManagedLedgerOffloadBucket"`
26+
GcsManagedLedgerOffloadMaxBlockSizeInBytes int `json:"gcsManagedLedgerOffloadMaxBlockSizeInBytes"`
27+
GcsManagedLedgerOffloadReadBufferSizeInBytes int `json:"gcsManagedLedgerOffloadReadBufferSizeInBytes"`
28+
GcsManagedLedgerOffloadRegion string `json:"gcsManagedLedgerOffloadRegion"`
29+
GcsManagedLedgerOffloadServiceAccountKeyFile string `json:"gcsManagedLedgerOffloadServiceAccountKeyFile"`
30+
ManagedLedgerExtraConfigurations map[string]string `json:"managedLedgerExtraConfigurations"`
31+
ManagedLedgerOffloadBucket string `json:"managedLedgerOffloadBucket"`
32+
ManagedLedgerOffloadDeletionLagInMillis int `json:"managedLedgerOffloadDeletionLagInMillis"`
33+
ManagedLedgerOffloadDriver string `json:"managedLedgerOffloadDriver"`
34+
ManagedLedgerOffloadMaxBlockSizeInBytes int `json:"managedLedgerOffloadMaxBlockSizeInBytes"`
35+
ManagedLedgerOffloadMaxThreads int `json:"managedLedgerOffloadMaxThreads"`
36+
ManagedLedgerOffloadPrefetchRounds int `json:"managedLedgerOffloadPrefetchRounds"`
37+
ManagedLedgerOffloadReadBufferSizeInBytes int `json:"managedLedgerOffloadReadBufferSizeInBytes"`
38+
ManagedLedgerOffloadRegion string `json:"managedLedgerOffloadRegion"`
39+
ManagedLedgerOffloadServiceEndpoint string `json:"managedLedgerOffloadServiceEndpoint"`
40+
ManagedLedgerOffloadThresholdInBytes int `json:"managedLedgerOffloadThresholdInBytes"`
41+
ManagedLedgerOffloadThresholdInSeconds int `json:"managedLedgerOffloadThresholdInSeconds"`
42+
ManagedLedgerOffloadedReadPriority string `json:"managedLedgerOffloadedReadPriority"`
43+
OffloadersDirectory string `json:"offloadersDirectory"`
44+
S3Driver bool `json:"s3Driver"`
45+
S3ManagedLedgerOffloadBucket string `json:"s3ManagedLedgerOffloadBucket"`
46+
S3ManagedLedgerOffloadCredentialID string `json:"s3ManagedLedgerOffloadCredentialId"`
47+
S3ManagedLedgerOffloadCredentialSecret string `json:"s3ManagedLedgerOffloadCredentialSecret"`
48+
S3ManagedLedgerOffloadMaxBlockSizeInBytes int `json:"s3ManagedLedgerOffloadMaxBlockSizeInBytes"`
49+
S3ManagedLedgerOffloadReadBufferSizeInBytes int `json:"s3ManagedLedgerOffloadReadBufferSizeInBytes"`
50+
S3ManagedLedgerOffloadRegion string `json:"s3ManagedLedgerOffloadRegion"`
51+
S3ManagedLedgerOffloadRole string `json:"s3ManagedLedgerOffloadRole"`
52+
S3ManagedLedgerOffloadRoleSessionName string `json:"s3ManagedLedgerOffloadRoleSessionName"`
53+
S3ManagedLedgerOffloadServiceEndpoint string `json:"s3ManagedLedgerOffloadServiceEndpoint"`
54+
}

0 commit comments

Comments
 (0)