Skip to content

Commit f0383f4

Browse files
committed
chore: use HasKafka in favor of KAdmin
1 parent ba286d3 commit f0383f4

File tree

7 files changed

+42
-132
lines changed

7 files changed

+42
-132
lines changed

hw-kafka-client.cabal

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@ library
5555
build-tool-depends: c2hs:c2hs
5656
if impl(ghc <8.0)
5757
build-depends: semigroups
58-
exposed-modules: Kafka.Admin
59-
Kafka.Admin.AdminProperties
60-
Kafka.Admin.Types
58+
exposed-modules: Kafka.Topic
59+
Kafka.Topic.Types
6160
Kafka.Consumer
6261
Kafka.Consumer.ConsumerProperties
6362
Kafka.Consumer.Subscription

src/Kafka/Admin/AdminProperties.hs

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/Kafka/Internal/RdKafka.chs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,11 +1161,10 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr
11611161
rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool
11621162
rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr
11631163

1164-
-- Admin API
1164+
-- Topics
11651165
{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #}
11661166

1167-
data RdKafkaTopicResultT
1168-
{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT #}
1167+
11691168

11701169
data RdKafkaAdminOptionsT
11711170
{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #}
Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,42 @@
1-
module Kafka.Admin(
1+
module Kafka.Topic(
22
module X
3-
, newKAdmin
43
, createTopic
54
, deleteTopic
6-
, closeKAdmin
75
) where
86

9-
import Control.Monad
10-
import Control.Monad.IO.Class
11-
import Data.Text
12-
import Data.List.NonEmpty
13-
import qualified Data.List.NonEmpty as NEL
14-
import qualified Data.Text as T
7+
import Control.Monad.IO.Class
8+
import Data.List.NonEmpty
9+
import qualified Data.List.NonEmpty as NEL
10+
import Data.Text
11+
import qualified Data.Text as T
1512

1613
import Kafka.Internal.RdKafka
1714
import Kafka.Internal.Setup
1815

19-
import Kafka.Types as X
20-
import Kafka.Admin.AdminProperties as X
21-
import Kafka.Admin.Types as X
16+
import Kafka.Topic.Types as X
17+
import Kafka.Types as X
2218

23-
newKAdmin ::( MonadIO m )
24-
=> AdminProperties
25-
-> m (Either KafkaError KAdmin)
26-
newKAdmin properties = liftIO $ do
27-
kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties)
28-
maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf'
29-
case maybeKafka of
30-
Left err -> pure $ Left $ KafkaError err
31-
Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig
32-
33-
closeKAdmin :: KAdmin
34-
-> IO ()
35-
closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka)
3619
--- CREATE TOPIC ---
37-
createTopic :: KAdmin
38-
-> NewTopic
39-
-> IO (Either KafkaError TopicName)
40-
createTopic kAdmin topic = liftIO $ do
41-
let kafkaPtr = getRdKafka kAdmin
20+
createTopic :: HasKafka k => k -> NewTopic -> IO (Either KafkaError TopicName)
21+
createTopic k topic = do
22+
let kafkaPtr = getRdKafka k
4223
queue <- newRdKafkaQueue kafkaPtr
43-
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
24+
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
4425

4526
topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue
4627
case topicRes of
47-
Left err -> do
28+
Left err -> do
4829
pure $ Left (NEL.head err)
4930
Right _ -> do
5031
pure $ Right $ topicName topic
5132

5233
--- DELETE TOPIC ---
53-
deleteTopic :: KAdmin
34+
deleteTopic :: HasKafka k
35+
=> k
5436
-> TopicName
5537
-> IO (Either KafkaError TopicName)
56-
deleteTopic kAdmin topic = liftIO $ do
57-
let kafkaPtr = getRdKafka kAdmin
38+
deleteTopic k topic = liftIO $ do
39+
let kafkaPtr = getRdKafka k
5840
queue <- newRdKafkaQueue kafkaPtr
5941
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
6042

@@ -65,15 +47,15 @@ deleteTopic kAdmin topic = liftIO $ do
6547
Right _ -> do
6648
pure $ Right topic
6749

68-
withNewTopic :: NewTopic
50+
withNewTopic :: NewTopic
6951
-> (RdKafkaNewTopicTPtr -> IO a)
7052
-> IO (Either (NonEmpty KafkaError) a)
7153
withNewTopic t transform = do
7254
mkNewTopicRes <- mkNewTopic t newTopicPtr
7355
case mkNewTopicRes of
7456
Left err -> do
7557
return $ Left err
76-
Right topic -> do
58+
Right topic -> do
7759
res <- transform topic
7860
return $ Right res
7961

@@ -93,23 +75,23 @@ newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
9375
newTopicPtr topic = do
9476
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
9577
case ptrRes of
96-
Left str -> pure $ Left (KafkaError $ T.pack str)
78+
Left str -> pure $ Left (KafkaError $ T.pack str)
9779
Right ptr -> pure $ Right ptr
9880

9981
oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr)
10082
oldTopicPtr tName = do
10183
res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName
10284
case res of
103-
Left str -> pure $ Left (KafkaError $ T.pack str)
85+
Left str -> pure $ Left (KafkaError $ T.pack str)
10486
Right ptr -> pure $ Right ptr
10587

106-
mkNewTopic :: NewTopic
88+
mkNewTopic :: NewTopic
10789
-> (NewTopic -> IO (Either KafkaError a))
10890
-> IO (Either (NonEmpty KafkaError) a)
10991
mkNewTopic topic create = do
11092
res <- create topic
11193
case res of
112-
Left err -> pure $ Left (NEL.singleton err)
94+
Left err -> pure $ Left (singletonList err)
11395
Right resource -> pure $ Right resource
11496

11597
rmOldTopic :: TopicName
@@ -118,5 +100,8 @@ rmOldTopic :: TopicName
118100
rmOldTopic tName remove = do
119101
res <- remove tName
120102
case res of
121-
Left err -> pure $ Left (NEL.singleton err)
103+
Left err -> pure $ Left (singletonList err)
122104
Right resource -> pure $ Right resource
105+
106+
singletonList :: a -> NonEmpty a
107+
singletonList x = x :| []

src/Kafka/Admin/Types.hs renamed to src/Kafka/Topic/Types.hs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,12 @@
1-
module Kafka.Admin.Types (
2-
KAdmin(..)
3-
, PartitionCount (..)
1+
module Kafka.Topic.Types (
2+
PartitionCount (..)
43
, ReplicationFactor (..)
54
, NewTopic (..)
65
) where
76

87
import Data.Map
98

109
import Kafka.Types
11-
import Kafka.Internal.Setup
12-
13-
data KAdmin = KAdmin {
14-
kaKafkaPtr :: !Kafka
15-
, kaKafkaConf :: !KafkaConf
16-
}
17-
18-
instance HasKafka KAdmin where
19-
getKafka = kaKafkaPtr
20-
{-# INLINE getKafka #-}
21-
22-
instance HasKafkaConf KAdmin where
23-
getKafkaConf = kaKafkaConf
24-
{-# INLINE getKafkaConf #-}
2510

2611
newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq)
2712
newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq)

tests-it/Kafka/IntegrationSpec.hs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import Kafka.Consumer
1818
import qualified Data.Text as T
1919
import Kafka.Metadata
2020
import Kafka.Producer
21-
import Kafka.Admin
21+
import Kafka.Topic
2222
import Kafka.TestEnv
2323
import Test.Hspec
2424

@@ -174,20 +174,19 @@ spec = do
174174
forM_ res $ \rcs ->
175175
forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders)
176176

177-
describe "Kafka.Admin.Spec" $ do
177+
describe "Kafka.Topic.Spec" $ do
178178
let topicName = addRandomChars "admin.topic.created." 5
179179

180180
topicsMVar <- runIO newEmptyMVar
181181

182-
specWithAdmin "Create topic" $ do
183-
184-
it "should create a new topic" $ \(admin :: KAdmin) -> do
182+
specWithConsumer "Read all topics" consumerProps $ do
183+
184+
it "should create a topic" $ \(consumer :: KafkaConsumer) -> do
185185
tName <- topicName
186186
let newTopic = mkNewTopic (TopicName ( T.pack(tName) ))
187-
result <- createTopic admin newTopic
187+
result <- createTopic consumer newTopic
188188
result `shouldSatisfy` isRight
189189

190-
specWithConsumer "Read all topics" consumerProps $ do
191190

192191
it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do
193192
res <- allTopicsMetadata consumer (Timeout 1000)
@@ -205,13 +204,12 @@ spec = do
205204
topicsLen `shouldSatisfy` (>0)
206205
hasTopic `shouldBe` True
207206

208-
specWithAdmin "Remove topics" $ do
209-
210-
it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do
207+
it "should delete all the topics currently existing" $ \(consumer :: KafkaConsumer) -> do
211208
topics <- takeMVar topicsMVar
212209
forM_ topics $ \topic -> do
213-
result <- deleteTopic admin topic
210+
result <- deleteTopic consumer topic
214211
result `shouldSatisfy` isRight
212+
215213
----------------------------------------------------------------------------------------------------------------
216214

217215
data ReadState = Skip | Read

tests-it/Kafka/TestEnv.hs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import qualified System.Random as Rnd
1515
import Control.Concurrent
1616
import Kafka.Consumer as C
1717
import Kafka.Producer as P
18-
import Kafka.Admin as A
1918

2019
import Test.Hspec
2120

@@ -58,9 +57,6 @@ producerProps = P.brokersList [brokerAddress]
5857
<> P.setCallback (logCallback (\l s1 s2 -> print $ "[Producer] " <> show l <> ": " <> s1 <> ", " <> s2))
5958
<> P.setCallback (errorCallback (\e r -> print $ "[Producer] " <> show e <> ": " <> r))
6059

61-
adminProperties :: AdminProperties
62-
adminProperties = A.brokers [brokerAddress]
63-
6460
testSubscription :: TopicName -> Subscription
6561
testSubscription t = topics [t]
6662
<> offsetReset Earliest
@@ -80,9 +76,6 @@ mkConsumerWith props = do
8076
(RebalanceAssign _) -> putMVar var True
8177
_ -> pure ()
8278

83-
mkAdmin :: IO KAdmin
84-
mkAdmin = newKAdmin adminProperties >>= \(Right k) -> pure k
85-
8679
specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec
8780
specWithConsumer s p f =
8881
beforeAll (mkConsumerWith p)
@@ -97,9 +90,3 @@ specWithKafka s p f =
9790
beforeAll ((,) <$> mkConsumerWith p <*> mkProducer)
9891
$ afterAll (\(consumer, producer) -> void $ closeProducer producer >> closeConsumer consumer)
9992
$ describe s f
100-
101-
specWithAdmin :: String -> SpecWith KAdmin -> Spec
102-
specWithAdmin s f =
103-
beforeAll mkAdmin
104-
$ afterAll (void . closeKAdmin)
105-
$ describe s f

0 commit comments

Comments
 (0)