Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 15, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent aee5323 commit 3e1c469
Showing 1 changed file with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -261,7 +261,7 @@ static MessageQueueSchemaUtils.ConsumerWrapper createRabbitmqConsumer(

try {
Connection connection = connectionConfig.getConnectionFactory().newConnection();
Channel channel = setupChannel(connectionConfig, connection);
Channel channel = setupChannel(connection);
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
@@ -294,13 +294,8 @@ private static void setupQueue(Channel channel, Configuration rabbitmqConfig, St
}
}

private static Channel setupChannel(
RMQConnectionConfig rmqConnectionConfig, Connection connection) throws Exception {
Channel chan = connection.createChannel();
if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true);
}
return chan;
private static Channel setupChannel(Connection connection) throws Exception {
return connection.createChannel();
}

static DataFormat getDataFormat(Configuration rabbitmqConfig) {
@@ -321,7 +316,7 @@ private static class RabbitmqConsumerWrapper
@Override
public List<String> getRecords(String queue, int pollTimeOutMills) {
try {
GetResponse response = channel.basicGet(queue, true);
GetResponse response = channel.basicGet(queue, false);
return response == null
? Collections.emptyList()
: Collections.singletonList(

0 comments on commit 3e1c469

Please sign in to comment.