The KubeMQ SDK for Go enables Go developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.
- KubeMQ Go SDK
- Table of Contents
- Prerequisites
- Installation
- Running Examples
- SDK Overview
- PubSub Events Operations
- PubSub EventsStore Operations
- Commands & Queries – Commands Operations
- Commands & Queries – Queries Operations
- Queues Operations
- Go SDK 1.17 or higher
- KubeMQ server running locally or accessible over the network
go get github.com/kubemq-io/kubemq-go
The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.
The SDK implements all communication patterns available through the KubeMQ server:
- PubSub
- Events
- EventStore
- Commands & Queries (CQ)
- Commands
- Queries
- Queues
Create a new Events channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createEventsChannel connects to the KubeMQ server at localhost:50000 and
// creates the Events channel named "events-channel". Any failure terminates
// the program through log.Fatal.
func createEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-creator"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, "events-channel")
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete an existing Events channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteEventsChannel connects to the KubeMQ server at localhost:50000 and
// deletes the Events channel named "events-channel". Any failure terminates
// the program through log.Fatal.
func deleteEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-delete"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, "events-channel")
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
Retrieve a list of Events channels.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Returns a list where each PubSubChannel
has the following attributes:
Name | Type | Description |
---|---|---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listEventsChannels connects to the KubeMQ server at localhost:50000, lists
// Events channels using an empty search string, and logs each returned entry.
// Any failure terminates the program through log.Fatal.
func listEventsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewEventsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-lister"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Sends a message to an Events channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Id | String | Unique identifier for the event message. | None | No |
Channel | String | The channel to which the event message is sent. | None | Yes |
Metadata | String | Metadata associated with the event message. | None | No |
Body | byte[] | Body of the event message in bytes. | Empty byte array | No |
Tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Channel | String | The channel to subscribe to. | None | Yes |
Group | String | The group to subscribe with. | None | No |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendSubscribeEvents subscribes to "events-channel" on the KubeMQ server at
// localhost:50000 and then publishes one event to the same channel, so the
// subscription callback logs it. Setup and send failures terminate the
// program through log.Fatal; errors delivered to the callback are logged.
func sendSubscribeEvents() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsSubscription{
		Channel:  "events-channel",
		Group:    "",
		ClientId: "",
	}
	err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
		// Check the error first: msg is presumably nil when err is set, so
		// calling msg.String() unconditionally could panic.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Short pause between subscribing and sending, matching the Events Store
	// example (the previous value was 300 seconds).
	time.Sleep(1 * time.Second)
	err = eventsClient.Send(ctx, &kubemq.Event{
		Channel:  "events-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event"),
	})
	if err != nil {
		log.Fatal(err)
	}
	// Pause before returning; presumably this lets the callback log the event
	// before the deferred Close runs.
	time.Sleep(1 * time.Second)
}
Create a new Events Store channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createEventsStoreChannel connects to the KubeMQ server at localhost:50000
// and creates the Events Store channel named "events-store-channel". Any
// failure terminates the program through log.Fatal.
func createEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-creator"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, "events-store-channel")
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete an existing Events Store channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteEventsStoreChannel connects to the KubeMQ server at localhost:50000
// and deletes the Events Store channel named "events-store-channel". Any
// failure terminates the program through log.Fatal.
func deleteEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-delete"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, "events-store-channel")
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
Retrieve a list of Events Store channels.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Returns a list where each PubSubChannel
has the following attributes:
Name | Type | Description |
---|---|---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listEventsStoreChannel connects to the KubeMQ server at localhost:50000,
// lists Events Store channels using an empty search string, and logs each
// returned entry. Any failure terminates the program through log.Fatal.
func listEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewEventsStoreClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-lister"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Sends a message to an Events Store channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Id | String | Unique identifier for the event message. | None | No |
Channel | String | The channel to which the event message is sent. | None | Yes |
Metadata | String | Metadata associated with the event message. | None | No |
Body | byte[] | Body of the event message in bytes. | Empty byte array | No |
Tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Channel | String | The channel to subscribe to. | None | Yes |
Group | String | The group to subscribe with. | None | No |
SubscriptionType | EventsStoreSubscription | The subscription type, which selects the starting point of the subscription (see the types listed below). | None | Yes |
Type | Value | Description |
---|---|---|
StartNewOnly | 1 | Start storing events from the point when the subscription is made |
StartFromFirst | 2 | Start storing events from the first event available |
StartFromLast | 3 | Start storing events from the last event available |
StartAtSequence | 4 | Start storing events from a specific sequence number |
StartAtTime | 5 | Start storing events from a specific point in time |
StartAtTimeDelta | 6 | Start storing events from a specific time delta in seconds |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendSubscribeEventsStore subscribes to "events-store-channel" on the KubeMQ
// server at localhost:50000 using the StartFromFirstEvent subscription type,
// then sends one event to the same channel and logs the send result. Setup
// and send failures terminate the program through log.Fatal; errors delivered
// to the callback are logged.
func sendSubscribeEventsStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-send-subscribe"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsStoreSubscription{
		Channel:          "events-store-channel",
		Group:            "",
		ClientId:         "",
		SubscriptionType: kubemq.StartFromFirstEvent(),
	}
	err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
		// Check the error first: msg is presumably nil when err is set, so
		// calling msg.String() unconditionally could panic.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	// Short pause between subscribing and sending.
	time.Sleep(1 * time.Second)
	result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
		Channel:  "events-store-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event store"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	// Pause before returning; presumably this lets the callback log the event
	// before the deferred Close runs.
	time.Sleep(1 * time.Second)
}
Create a new Command channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createCommandsChannel connects to the KubeMQ server at localhost:50000 and
// creates the Commands channel named "commands.A". Any failure terminates the
// program through log.Fatal.
func createCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, "commands.A")
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete an existing Command channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteCommandsChannel connects to the KubeMQ server at localhost:50000 and
// deletes the Commands channel named "commands.A". Any failure terminates the
// program through log.Fatal.
func deleteCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, "commands.A")
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
Retrieve a list of Command channels.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Returns a list where each CQChannel
has the following attributes:
Name | Type | Description |
---|---|---|
Name | String | The name of the channel. |
Type | String | The type of the channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listCommandsChannels connects to the KubeMQ server at localhost:50000, lists
// Commands channels using an empty search string, and logs each returned
// entry. Any failure terminates the program through log.Fatal.
func listCommandsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewCommandsClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Sends a command request to a Command channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Id | String | The ID of the command message. | None | Yes |
Channel | String | The channel through which the command message will be sent. | None | Yes |
Metadata | String | Additional metadata associated with the command message. | None | No |
Body | byte[] | The body of the command message as bytes. | Empty byte array | No |
Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the command message. | Empty Map | No |
Timeout | Duration | The maximum duration to wait for a response. | None | Yes |
Name | Type | Description |
---|---|---|
CommandId | String | Command Id |
ResponseClientId | String | The client ID associated with the command response. |
Executed | boolean | Indicates if the command has been executed. |
ExecutedAt | time | The timestamp when the command response was created. |
Error | String | The error message if there was an error. |
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Channel | String | The channel for the subscription. | None | Yes |
Group | String | The group associated with the subscription. | None | No |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendReceiveCommands subscribes to the "commands" channel on the KubeMQ
// server at localhost:50000, answers every received command with a response,
// then sends one command with a 10-second timeout and logs the result.
// Setup, response and send failures terminate the program through log.Fatal;
// errors delivered to the subscription callback are logged.
func sendReceiveCommands() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveCommands"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.CommandsSubscription{
		Channel:  "commands",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to commands")
	err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
		// Check the error first: cmd is presumably nil when err is set, so
		// reading its fields unconditionally could panic.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(cmd.String())
		// Address the response to the sender using the identifiers carried by
		// the received command.
		resp := &kubemq.Response{
			RequestId:  cmd.Id,
			ResponseTo: cmd.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
		}
		if err := commandsClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Short pause between subscribing and sending.
	time.Sleep(1 * time.Second)
	log.Println("sending command")
	result, err := commandsClient.Send(ctx, kubemq.NewCommand().
		SetChannel("commands").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending command")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}
Create a new Query channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// createQueriesChannel connects to the KubeMQ server at localhost:50000 and
// creates the Queries channel named "queries.A". Any failure terminates the
// program through log.Fatal.
func createQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, "queries.A")
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete an existing Query channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// deleteQueriesChannel connects to the KubeMQ server at localhost:50000 and
// deletes the Queries channel named "queries.A". Any failure terminates the
// program through log.Fatal.
func deleteQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, "queries.A")
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
Retrieve a list of Query channels.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Returns a list where each CQChannel
has the following attributes:
Name | Type | Description |
---|---|---|
Name | String | The name of the channel. |
Type | String | The type of the channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
// listQueriesChannels connects to the KubeMQ server at localhost:50000, lists
// Queries channels using an empty search string, and logs each returned
// entry. Any failure terminates the program through log.Fatal.
func listQueriesChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := kubemq.NewQueriesClient(
		ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Sends a query request to a Query channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Id | String | The ID of the query message. | None | Yes |
Channel | String | The channel through which the query message will be sent. | None | Yes |
Metadata | String | Additional metadata associated with the query message. | None | No |
Body | byte[] | The body of the query message as bytes. | Empty byte array | No |
Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the query message. | Empty Map | No |
Timeout | Duration | The maximum duration to wait for a response. | None | Yes |
Name | Type | Description |
---|---|---|
QueryId | String | Query Id |
ResponseClientId | String | The client ID associated with the query response. |
Executed | boolean | Indicates if the query has been executed. |
Metadata | String | Additional metadata associated with the query response message. |
Body | byte[] | The body of the query response message as bytes. |
Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the query response message. |
ExecutedAt | time | The timestamp when the query response was created. |
Error | String | The error message if there was an error. |
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Channel | String | The channel for the subscription. | None | Yes |
Group | String | The group associated with the subscription. | None | No |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
// sendReceiveQueries subscribes to the "queries" channel on the KubeMQ server
// at localhost:50000, answers every received query with a response carrying a
// body, then sends one query with a 10-second timeout and logs the result.
// Setup, response and send failures terminate the program through log.Fatal;
// errors delivered to the subscription callback are logged.
func sendReceiveQueries() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveQueries"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.QueriesSubscription{
		Channel:  "queries",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to queries")
	err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
		// Check the error first: query is presumably nil when err is set, so
		// reading its fields unconditionally could panic.
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(query.String())
		// Address the response to the sender using the identifiers carried by
		// the received query.
		resp := &kubemq.Response{
			RequestId:  query.Id,
			ResponseTo: query.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
			Body:       []byte("hello kubemq - sending query response"),
		}
		if err := queriesClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	// Short pause between subscribing and sending.
	time.Sleep(1 * time.Second)
	log.Println("sending query")
	result, err := queriesClient.Send(ctx, kubemq.NewQuery().
		SetChannel("queries").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending query")).
		SetTimeout(10*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}
Create a new Queue channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main connects to the KubeMQ server at localhost:50000 with a queues stream
// client and creates the Queue channel named "queues.A". Any failure
// terminates the program through log.Fatal.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	createErr := client.Create(ctx, "queues.A")
	if createErr != nil {
		log.Fatal(createErr)
	}
}
Delete an existing Queue channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Name | Type | Description |
---|---|---|
Err | error | Error message if any |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main connects to the KubeMQ server at localhost:50000 with a queues stream
// client and deletes the Queue channel named "queues.A". Any failure
// terminates the program through log.Fatal.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	deleteErr := client.Delete(ctx, "queues.A")
	if deleteErr != nil {
		log.Fatal(deleteErr)
	}
}
Retrieve a list of Queue channels.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Returns a list where each QueuesChannel
has the following attributes:
Name | Type | Description |
---|---|---|
Name | String | The name of the channel. |
Type | String | The type of the channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
// main connects to the KubeMQ server at localhost:50000 with a queues stream
// client, lists Queue channels using an empty search string, and logs each
// returned entry. Any failure terminates the program through log.Fatal.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, connErr := queues_stream.NewQueuesStreamClient(
		ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"),
	)
	if connErr != nil {
		log.Fatal(connErr)
	}
	// Close the client when the function returns.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	found, listErr := client.List(ctx, "")
	if listErr != nil {
		log.Fatal(listErr)
	}
	for _, entry := range found {
		log.Println(entry)
	}
}
Send and receive messages from a Queue channel.
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Id | String | The unique identifier for the message. | None | No |
Channel | String | The channel of the message. | None | Yes |
Metadata | String | The metadata associated with the message. | None | No |
Body | byte[] | The body of the message. | Empty byte array | No |
Tags | Map<String, String> | The tags associated with the message. | Empty Map | No |
PolicyDelaySeconds | int | The delay in seconds before the message becomes available in the queue. | None | No |
PolicyExpirationSeconds | int | The expiration time in seconds for the message. | None | No |
PolicyMaxReceiveCount | int | The number of receive attempts allowed for the message before it is moved to the dead letter queue. | None | No |
PolicyMaxReceiveQueue | String | The dead letter queue where the message will be moved after reaching the maximum receive attempts. | None | No |
Name | Type | Description |
---|---|---|
Id | String | The unique identifier of the message. |
SentAt | LocalDateTime | The timestamp when the message was sent. |
ExpiredAt | LocalDateTime | The timestamp when the message will expire. |
DelayedTo | LocalDateTime | The timestamp when the message will be delivered. |
IsError | boolean | Indicates if there was an error while sending the message. |
Error | String | The error message if isError is true. |
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
Channel | String | The channel to poll messages from. | None | Yes |
MaxItems | int | The maximum number of messages to poll. | 1 | No |
WaitTimeout | int | The wait timeout in seconds for polling messages. | 60 | No |
AutoAck | boolean | Indicates if messages should be auto-acknowledged. | false | No |
VisibilitySeconds | int | The visibility timeout in seconds applied to the polled messages. | 0 | No |
Name | Type | Description |
---|---|---|
Messages | List | The list of received queue messages. |
Name | Type | Description |
---|---|---|
Id | String | The unique identifier for the message. |
Channel | String | The channel from which the message was received. |
Metadata | String | Metadata associated with the message. |
Body | byte[] | The body of the message in byte array format. |
ClientID | String | The ID of the client that sent the message. |
Tags | Map<String, String> | Key-value pairs representing tags for the message. |
Timestamp | Instant | The timestamp when the message was created. |
Sequence | long | The sequence number of the message. |
ReceiveCount | int | The number of times the message has been received. |
ReRouted | boolean | Indicates whether the message was rerouted. |
ReRoutedFromQueue | String | The name of the queue from which the message was rerouted. |
ExpirationAt | Instant | The expiration time of the message, if applicable. |
DelayedTo | Instant | The time the message is delayed until, if applicable. |
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
// sendAndReceive sends one message with tags and delivery policies to the
// "sendAndReceive" queue on the KubeMQ server at localhost:50000, then polls
// the same queue for a single message and acknowledges each message returned.
// Any failure terminates the program through log.Fatal.
func sendAndReceive() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceive").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue")).
		SetTags(map[string]string{
			"key1": "value1",
			"key2": "value2",
		}).
		SetPolicyDelaySeconds(1).
		SetPolicyExpirationSeconds(10).
		SetPolicyMaxReceiveCount(3).
		SetPolicyMaxReceiveQueue("dlq")
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceive").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetAutoAck(true)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll error must be checked before msgs is used; reading
	// msgs.Messages after a failed Poll could dereference a nil response.
	if err != nil {
		log.Fatal(err)
	}
	// Alternative bulk operations on the whole poll response:
	//AckAll - Acknowledge all messages
	//if err := msgs.AckAll(); err != nil {
	//	log.Fatal(err)
	//}
	//NackAll - Not Acknowledge all messages
	//if err := msgs.NAckAll(); err != nil {
	//	log.Fatal(err)
	//}
	// RequeueAll - Requeue all messages
	//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
	//	log.Fatal(err)
	//}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		// Ack - Acknowledge message
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
		// Nack - Not Acknowledge message
		//if err := msg.NAck(); err != nil {
		//	log.Fatal(err)
		//}
		// Requeue - Requeue message
		//if err := msg.ReQueue("requeue-queue-channel"); err != nil {
		//	log.Fatal(err)
		//}
	}
}
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
// sendAndReceiveWithVisibilityExpiration sends one message to the
// "sendAndReceiveWithVisibility" queue on the KubeMQ server at
// localhost:50000, polls it with a 2-second visibility setting, and then
// waits 3 seconds before acknowledging, so the Ack is attempted after the
// visibility period has elapsed. Any failure, including a failed Ack,
// terminates the program through log.Fatal.
func sendAndReceiveWithVisibilityExpiration() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll error must be checked before msgs is used; reading
	// msgs.Messages after a failed Poll could dereference a nil response.
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 3 seconds before ack")
		// Deliberately wait longer than the 2-second visibility setting.
		time.Sleep(3 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}
// sendAndReceiveWithVisibilityExtension sends one message to the
// "sendAndReceiveWithVisibility" queue on the KubeMQ server at
// localhost:50000, polls it with a 2-second visibility setting, waits
// 1 second, calls ExtendVisibility(3), waits 2 more seconds, and then
// acknowledges the message. Any failure terminates the program through
// log.Fatal.
func sendAndReceiveWithVisibilityExtension() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	// The poll error must be checked before msgs is used; reading
	// msgs.Messages after a failed Poll could dereference a nil response.
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 1 seconds before ack")
		time.Sleep(1 * time.Second)
		log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
		// Extend visibility before the initial 2-second setting runs out.
		if err := msg.ExtendVisibility(3); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}