Skip to content

Commit

Permalink
Fix: Connection Limit Invalid for Queue/Topic Limits Greater than 100 (
Browse files Browse the repository at this point in the history
…#73)

* fix: connection string invalid for queue limit > 100

* fix: connection string invalid for topic limit > 100

* chore: move max per page constant to BaseHelper.cs
  • Loading branch information
Lukejkw authored Jan 30, 2023
1 parent 557300b commit e254568
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 60 deletions.
2 changes: 2 additions & 0 deletions PurpleExplorer/Helpers/BaseHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace PurpleExplorer.Helpers;

public abstract class BaseHelper
{
protected const int MaxRequestItemsPerPage = 100;

protected ManagementClient GetManagementClient(ServiceBusConnectionString connectionString)
{
if (connectionString.UseManagedIdentity)
Expand Down
3 changes: 0 additions & 3 deletions PurpleExplorer/Helpers/ITopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ public interface ITopicHelper
{
public Task<NamespaceInfo> GetNamespaceInfo(ServiceBusConnectionString connectionString);
public Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString);
public Task<ServiceBusTopic> GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions);
public Task<IList<ServiceBusSubscription>> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath);
public Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName);
public Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription);
public Task<IList<Models.Message>> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName, string subscriptionName);
public Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content);
Expand Down
81 changes: 55 additions & 26 deletions PurpleExplorer/Helpers/QueueHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus.Management;
using PurpleExplorer.Models;
using Message = PurpleExplorer.Models.Message;
using AzureMessage = Microsoft.Azure.ServiceBus.Message;
Expand All @@ -22,29 +23,15 @@ public QueueHelper(AppSettings appSettings)

public async Task<IList<ServiceBusQueue>> GetQueues(ServiceBusConnectionString connectionString)
{
IList<ServiceBusQueue> queues = new List<ServiceBusQueue>();
var client = GetManagementClient(connectionString);
var queuesInfo = await client.GetQueuesRuntimeInfoAsync(_appSettings.QueueListFetchCount);
await client.CloseAsync();

await Task.WhenAll(queuesInfo.Select(async queue =>
{
var queueName = queue.Path;
var newQueue = new ServiceBusQueue(queue)
{
Name = queueName
};
queues.Add(newQueue);
}));

var queues = await GetQueues(client);
await client.CloseAsync();
return queues;
}

public async Task SendMessage(ServiceBusConnectionString connectionString, string queueName, string content)
{
var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)};
var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) };
await SendMessage(connectionString, queueName, message);
}

Expand Down Expand Up @@ -72,7 +59,7 @@ public async Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString conn

return receivedMessages.Select(message => new Message(message, true)).ToList();
}

public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string queue, Message message)
{
var receiver = GetMessageReceiver(connectionString, queue, ReceiveMode.PeekLock);
Expand All @@ -95,7 +82,7 @@ public async Task DeadletterMessage(ServiceBusConnectionString connectionString,

await receiver.CloseAsync();
}

public async Task DeleteMessage(ServiceBusConnectionString connectionString, string queue,
Message message, bool isDlq)
{
Expand All @@ -121,18 +108,19 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str

await receiver.CloseAsync();
}

private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string queue, long sequenceNumber)

private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString,
string queue, long sequenceNumber)
{
var deadletterPath = EntityNameHelper.FormatDeadLetterPath(queue);

var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber);
await receiver.CloseAsync();

return azureMessage;
}

public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string queue, Message message)
{
var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, queue, message.SequenceNumber);
Expand Down Expand Up @@ -165,7 +153,7 @@ public async Task<long> PurgeMessages(ServiceBusConnectionString connectionStrin
await receiver.CloseAsync();
return purgedCount;
}

public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectionString, string queuePath)
{
var path = EntityNameHelper.FormatDeadLetterPath(queuePath);
Expand Down Expand Up @@ -193,7 +181,7 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio
}
finally
{
if (receiver != null)
if (receiver != null)
await receiver.CloseAsync();

if (sender != null)
Expand All @@ -202,4 +190,45 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio

return transferredCount;
}

private async Task<List<ServiceBusQueue>> GetQueues(ManagementClient client)
{
var queueInfos = new List<QueueRuntimeInfo>();
var numberOfPages = _appSettings.QueueListFetchCount / MaxRequestItemsPerPage;
var remainder = _appSettings.QueueListFetchCount % (numberOfPages * MaxRequestItemsPerPage);

for (int pageCount = 0; pageCount < numberOfPages; pageCount++)
{
var numberToSkip = MaxRequestItemsPerPage * pageCount;
var page = await client.GetQueuesRuntimeInfoAsync(MaxRequestItemsPerPage, numberToSkip);
if (page.Any())
{
queueInfos.AddRange(page);
}
else
{
return queueInfos
.Select(q => new ServiceBusQueue(q)
{
Name = q.Path
}).ToList();
}
}

if (remainder > 0)
{
var numberAlreadyFetched = numberOfPages > 0
? MaxRequestItemsPerPage * numberOfPages
: 0;
var remainingItems = await client.GetQueuesRuntimeInfoAsync(
remainder,
numberAlreadyFetched);
queueInfos.AddRange(remainingItems);
}

return queueInfos.Select(q => new ServiceBusQueue(q)
{
Name = q.Path
}).ToList();
}
}
106 changes: 75 additions & 31 deletions PurpleExplorer/Helpers/TopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AvaloniaEdit.Utils;
using Message = PurpleExplorer.Models.Message;
using AzureMessage = Microsoft.Azure.ServiceBus.Message;

Expand All @@ -23,41 +24,76 @@ public TopicHelper(AppSettings appSettings)

public async Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString)
{
IList<ServiceBusTopic> topics = new List<ServiceBusTopic>();
var client = GetManagementClient(connectionString);
var busTopics = await client.GetTopicsAsync(_appSettings.TopicListFetchCount);
var topics = await GetTopicsWithSubscriptions(client);
await client.CloseAsync();
return topics;
}

private async Task<ServiceBusTopic> CreateTopicWithSubscriptions(ManagementClient client, TopicDescription topicDescription)
{
var topic = new ServiceBusTopic(topicDescription);
var subscriptions = await GetSubscriptions(client, topicDescription.Path);
topic.AddSubscriptions(subscriptions.ToArray());
return topic;
}

private async Task<List<ServiceBusTopic>> GetTopicsWithSubscriptions(ManagementClient client)
{
var topicDescriptions = new List<TopicDescription>();
var numberOfPages = _appSettings.TopicListFetchCount / MaxRequestItemsPerPage;
var remainder = _appSettings.TopicListFetchCount % (numberOfPages * MaxRequestItemsPerPage);

await Task.WhenAll(busTopics.Select(async topic =>
for (int pageCount = 0; pageCount < numberOfPages; pageCount++)
{
var newTopic = new ServiceBusTopic(topic);
var numberToSkip = MaxRequestItemsPerPage * pageCount;
var page = await client.GetTopicsAsync(MaxRequestItemsPerPage, numberToSkip);
if (page.Any())
{
topicDescriptions.AddRange(page);
}
else
{
return (await Task.WhenAll(topicDescriptions
.Select(async topic => await CreateTopicWithSubscriptions(client, topic)))).ToList();
}
}

var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
topics.Add(newTopic);
}));
if (remainder > 0)
{
var numberAlreadyFetched = numberOfPages > 0
? MaxRequestItemsPerPage * numberOfPages
: 0;
var remainingItems = await client.GetTopicsAsync(
remainder,
numberAlreadyFetched);
topicDescriptions.AddRange(remainingItems);
}

return topics;
var topics = await Task.WhenAll(topicDescriptions
.Select(async topic => await CreateTopicWithSubscriptions(client, topic)));
return topics.ToList();
}

public async Task<ServiceBusTopic> GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions)
public async Task<ServiceBusTopic> GetTopic(ServiceBusConnectionString connectionString, string topicPath,
bool retrieveSubscriptions)
{
var client = GetManagementClient(connectionString);
var busTopics = await client.GetTopicAsync(topicPath);
await client.CloseAsync();

var newTopic = new ServiceBusTopic(busTopics);

if (retrieveSubscriptions)
{
var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
var subscriptions = await GetSubscriptions(client, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
}


await client.CloseAsync();
return newTopic;
}

public async Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName)

public async Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionString connectionString,
string topicPath, string subscriptionName)
{
var client = GetManagementClient(connectionString);
var runtimeInfo = await client.GetSubscriptionRuntimeInfoAsync(topicPath, subscriptionName);
Expand All @@ -66,12 +102,12 @@ public async Task<ServiceBusSubscription> GetSubscription(ServiceBusConnectionSt
return new ServiceBusSubscription(runtimeInfo);
}

public async Task<IList<ServiceBusSubscription>> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath)
private async Task<IList<ServiceBusSubscription>> GetSubscriptions(
ManagementClient client,
string topicPath)
{
IList<ServiceBusSubscription> subscriptions = new List<ServiceBusSubscription>();
var client = GetManagementClient(connectionString);
var topicSubscription = await client.GetSubscriptionsRuntimeInfoAsync(topicPath);
await client.CloseAsync();

foreach (var sub in topicSubscription)
{
Expand All @@ -81,7 +117,8 @@ public async Task<IList<ServiceBusSubscription>> GetSubscriptions(ServiceBusConn
return subscriptions;
}

public async Task<IList<Message>> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName,
public async Task<IList<Message>> GetMessagesBySubscription(ServiceBusConnectionString connectionString,
string topicName,
string subscriptionName)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName);
Expand All @@ -94,7 +131,8 @@ public async Task<IList<Message>> GetMessagesBySubscription(ServiceBusConnection
return result;
}

public async Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription)
public async Task<IList<Message>> GetDlqMessages(ServiceBusConnectionString connectionString, string topic,
string subscription)
{
var path = EntityNameHelper.FormatSubscriptionPath(topic, subscription);
var deadletterPath = EntityNameHelper.FormatDeadLetterPath(path);
Expand All @@ -115,7 +153,7 @@ public async Task<NamespaceInfo> GetNamespaceInfo(ServiceBusConnectionString con

public async Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content)
{
var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)};
var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) };
await SendMessage(connectionString, topicPath, message);
}

Expand All @@ -126,7 +164,8 @@ public async Task SendMessage(ServiceBusConnectionString connectionString, strin
await topicClient.CloseAsync();
}

public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
Message message, bool isDlq)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand All @@ -153,7 +192,8 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str
await receiver.CloseAsync();
}

public async Task<long> PurgeMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task<long> PurgeMessages(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
bool isDlq)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand All @@ -177,11 +217,12 @@ public async Task<long> PurgeMessages(ServiceBusConnectionString connectionStrin
return purgedCount;
}

public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath)
public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
path = EntityNameHelper.FormatDeadLetterPath(path);

long transferredCount = 0;
MessageReceiver receiver = null;
TopicClient sender = null;
Expand All @@ -205,7 +246,7 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio
}
finally
{
if (receiver != null)
if (receiver != null)
await receiver.CloseAsync();

if (sender != null)
Expand All @@ -215,7 +256,8 @@ public async Task<long> TransferDlqMessages(ServiceBusConnectionString connectio
return transferredCount;
}

private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string topicPath,
private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString,
string topicPath,
string subscriptionPath, long sequenceNumber)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand All @@ -224,11 +266,12 @@ private async Task<AzureMessage> PeekDlqMessageBySequenceNumber(ServiceBusConnec
var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock);
var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber);
await receiver.CloseAsync();

return azureMessage;
}

public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
Message message)
{
var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, topicPath, subscriptionPath,
Expand All @@ -240,7 +283,8 @@ public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString
await DeleteMessage(connectionString, topicPath, subscriptionPath, message, true);
}

public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath,
public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath,
string subscriptionPath,
Message message)
{
var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);
Expand Down

0 comments on commit e254568

Please sign in to comment.