Skip to content

Commit

Permalink
fix: create topic configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-grgt committed Jan 24, 2025
1 parent d75e7e4 commit 5bbb8b2
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 26 deletions.
19 changes: 18 additions & 1 deletion kadmin/topic_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,24 @@ type TopicCreationDetails struct {
type TopicCreatedMsg struct {
}

type TopicCreationErrMsg struct {
Err error
}

type TopicCreationStartedMsg struct {
Created chan bool
Err chan error
}

func (msg *TopicCreationStartedMsg) AwaitCompletion() tea.Msg {
select {
case <-msg.Created:
return TopicCreatedMsg{}
case err := <-msg.Err:
return TopicCreationErrMsg{Err: err}
}
}

func (ka *SaramaKafkaAdmin) CreateTopic(tcd TopicCreationDetails) tea.Msg {
created := make(chan bool)
err := make(chan error)
Expand All @@ -36,11 +49,15 @@ func (ka *SaramaKafkaAdmin) CreateTopic(tcd TopicCreationDetails) tea.Msg {
}

func (ka *SaramaKafkaAdmin) doCreateTopic(tcd TopicCreationDetails, created chan bool, errChan chan error) {
properties := make(map[string]*string)
for k, v := range tcd.Properties {
properties[k] = &v
}
err := ka.admin.CreateTopic(tcd.Name, &sarama.TopicDetail{
NumPartitions: int32(tcd.NumPartitions),
ReplicationFactor: 1,
ReplicaAssignment: nil,
ConfigEntries: nil,
ConfigEntries: properties,
}, false)
if err != nil {
errChan <- err
Expand Down
44 changes: 40 additions & 4 deletions kadmin/topic_creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ func TestCreateTopic(t *testing.T) {
t.Run("Create new topic", func(t *testing.T) {
topic := topicName()
// when
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{topic, 2, map[string]string{}}).(TopicCreationStartedMsg)
properties := map[string]string{}
properties["compression.type"] = "lz4"
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{topic, 2, properties}).(TopicCreationStartedMsg)

select {
case <-topicCreatedMsg.Created:
Expand All @@ -20,14 +22,48 @@ func TestCreateTopic(t *testing.T) {

// then
listTopicsMsg := ka.ListTopics().(TopicListingStartedMsg)

var topics []Topic
select {
case topics = <-listTopicsMsg.Topics:
case <-listTopicsMsg.Err:
msg := listTopicsMsg.AwaitCompletion()
switch msg := msg.(type) {
case TopicListedMsg:
topics = msg.Topics
case TopicListedErrorMsg:
assert.Fail(t, "Failed to list topics")
return
}

assert.Contains(t, topics, Topic{topic, 2, 1, 0})

// and
var configs map[string]string
configListingStartedMsg := ka.ListConfigs(topic).(TopicConfigListingStartedMsg)
msg = configListingStartedMsg.AwaitCompletion()
switch msg := msg.(type) {
case TopicConfigsListedMsg:
configs = msg.Configs
case TopicConfigListingErrorMsg:
assert.Fail(t, "Failed to list topic configs")
return
}

assert.Equal(t, "lz4", configs["compression.type"])

t.Run("Creation fails", func(t *testing.T) {
// when
topicCreatedMsg := ka.CreateTopic(TopicCreationDetails{topic, 2, map[string]string{}}).(TopicCreationStartedMsg)

msg = topicCreatedMsg.AwaitCompletion()
switch msg := msg.(type) {
case TopicCreatedMsg:
t.Error("Topic created but expected to fail")
return
case TopicListedErrorMsg:
assert.Equal(t, "kafka server: Topic with this name already exists - Topic 'topic-0' already exists.", msg.Err.Error())
}

})

// clean up
ka.DeleteTopic(topic)
})
Expand Down
7 changes: 0 additions & 7 deletions kadmin/topic_lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ type TopicLister interface {
ListTopics() tea.Msg
}

func (msg *TopicCreationStartedMsg) AwaitCompletion() tea.Msg {
select {
case <-msg.Created:
return TopicCreatedMsg{}
}
}

type TopicListedMsg struct {
Topics []Topic
}
Expand Down
6 changes: 5 additions & 1 deletion ui/components/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ func (m *Model) SpinWithRocketMsg(msg string) tea.Cmd {

func (m *Model) ShowErrorMsg(msg string, error error) tea.Cmd {
m.state = err
m.msg = "🚨 " + styles.FG(styles.ColorRed).Render(msg+": ") +
s := ": "
if msg == "" {
s = ""
}
m.msg = "🚨 " + styles.FG(styles.ColorRed).Render(msg+s) +
styles.FG(styles.ColorWhite).Render(strings.TrimSuffix(error.Error(), "\n"))
return nil
}
Expand Down
23 changes: 13 additions & 10 deletions ui/pages/create_topic_page/create_topic_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
switch msg := msg.(type) {
case kadmin.TopicCreationStartedMsg:
return msg.AwaitCompletion
case kadmin.TopicCreationErrMsg:
m.initForm(initial)
return m.notifier.ShowErrorMsg("Topic creation failure", msg.Err)
case bsp.TickMsg:
cmd := m.notifier.Update(msg)
return cmd
Expand All @@ -87,10 +90,6 @@ func (m *Model) Update(msg tea.Msg) tea.Cmd {
} else {
return propagateMsgToForm(m, msg)
}
//case kadmin.TopicCreationErrorMsg:
// log.Debug("TopicCreationError", msg.Err)
// m.aTopicCreated = false
// return nil
case kadmin.TopicCreatedMsg:
m.notifier.ShowSuccessMsg("Topic created!")
m.formValues.name = ""
Expand Down Expand Up @@ -121,13 +120,17 @@ func propagateMsgToForm(m *Model, msg tea.Msg) tea.Cmd {
m.notifier.SpinWithRocketMsg("Creating topic"),
func() tea.Msg {
numPartitions, _ := strconv.Atoi(m.formValues.numPartitions)
configs := map[string]string{
"cleanup.policy": m.formValues.cleanupPolicy,
}
for _, c := range m.formValues.configs {
configs[c.key] = c.value
}
return m.topicCreator.CreateTopic(
kadmin.TopicCreationDetails{
Name: m.formValues.name,
NumPartitions: numPartitions,
Properties: map[string]string{
"cleanup.policy": m.formValues.cleanupPolicy,
},
Properties: configs,
})
})
} else {
Expand Down Expand Up @@ -156,7 +159,7 @@ func (m *Model) initForm(fs formState) {
Value(&m.formValues.name).
Validate(func(str string) error {
if str == "" {
return errors.New("Topic Name cannot be empty.")
return errors.New("topic Name cannot be empty")
}
return nil
})
Expand All @@ -166,12 +169,12 @@ func (m *Model) initForm(fs formState) {
Value(&m.formValues.numPartitions).
Validate(func(str string) error {
if str == "" {
return errors.New("Number of Partitions cannot be empty.")
return errors.New("number of Partitions cannot be empty")
}
if n, e := strconv.Atoi(str); e != nil {
return errors.New(fmt.Sprintf("'%s' is not a valid numeric partition count value", str))
} else if n <= 0 {
return errors.New("Value must be greater than zero")
return errors.New("value must be greater than zero")
}
return nil
})
Expand Down
30 changes: 27 additions & 3 deletions ui/pages/create_topic_page/create_topic_page_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package create_topic_page

import (
"errors"
"fmt"
tea "github.com/charmbracelet/bubbletea"
"github.com/stretchr/testify/assert"
"ktea/kadmin"
Expand Down Expand Up @@ -113,6 +114,24 @@ func TestCreateTopic(t *testing.T) {
assert.NotContains(t, render, "Custom Topic configurations:")
})

t.Run("creation failed", func(t *testing.T) {
mockCreator := MockTopicCreator{
CreateTopicFunc: func(details kadmin.TopicCreationDetails) tea.Msg {
return nil
},
}
m := New(&mockCreator)

m.Update(kadmin.TopicCreationErrMsg{
Err: fmt.Errorf("Topic with this name already exists - Topic 'topic-0' already exists."),
})

render := m.View(ui.NewTestKontext(), ui.TestRenderer)

assert.Contains(t, render, "Topic creation failure: Topic with this name already exists - Topic 'topic-0' already exists.")

})

t.Run("create topic", func(t *testing.T) {
mockCreator := MockTopicCreator{
CreateTopicFunc: func(details kadmin.TopicCreationDetails) tea.Msg {
Expand All @@ -137,7 +156,11 @@ func TestCreateTopic(t *testing.T) {
m.Update(keys.Key(tea.KeyDown))
cmd = m.Update(keys.Key(tea.KeyEnter))
m.Update(cmd())
// config - submit
keys.UpdateKeys(m, "delete.retention.ms=1029394")
cmd = m.Update(keys.Key(tea.KeyEnter))
keys.Submit(m)

// actual submit
msgs := keys.Submit(m)

var capturedDetails CapturedTopicCreationDetails
Expand All @@ -153,8 +176,9 @@ func TestCreateTopic(t *testing.T) {
TopicCreationDetails: kadmin.TopicCreationDetails{
"topicA",
2,
map[string]string{
"cleanup.policy": "delete-compact",
map[string]*string{

Check failure on line 179 in ui/pages/create_topic_page/create_topic_page_test.go

View workflow job for this annotation

GitHub Actions / build

cannot use map[string]*string{…} (value of type map[string]*string) as map[string]string value in struct literal
"cleanup.policy": "delete-compact",

Check failure on line 180 in ui/pages/create_topic_page/create_topic_page_test.go

View workflow job for this annotation

GitHub Actions / build

cannot use "delete-compact" (untyped string constant) as *string value in map literal
"delete.retention.ms": "1029394",

Check failure on line 181 in ui/pages/create_topic_page/create_topic_page_test.go

View workflow job for this annotation

GitHub Actions / build

cannot use "1029394" (untyped string constant) as *string value in map literal
},
},
}, capturedDetails)
Expand Down

0 comments on commit 5bbb8b2

Please sign in to comment.