memory leak #149

Open
lmb1113 opened this issue Dec 21, 2023 · 20 comments

Comments

@lmb1113

lmb1113 commented Dec 21, 2023

image

image

My English is not very good, please understand. As the concurrency of my project increases, memory usage also increases. After investigation, I found that the cause is that the publisher.startNotifyBlockedHandler() method fails to exit and release memory in a timely manner. I am not sure what causes it to fail to exit. My expected outcome is that publisher.Close() closes everything normally.

After commenting out this line of code, I found it to be effective, but it is not elegant.
image

@lmb1113
Author

lmb1113 commented Dec 21, 2023

Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

The issue was resolved by calling conn.Close(). But calling NewConn repeatedly is not an option. I did not reuse the publisher, but I did reuse the Conn. When the publisher was closed, startNotifyBlockedHandler was still blocking, so I had to reuse the publisher.

@LucaWolf

LucaWolf commented Feb 8, 2024

Tough, that seems to be a leaky goroutine :-( Indeed, the design does not cater for its termination. Short of moving things around [e.g. a) provide an additional control channel, like a context triggered on publisher close events, or b) make the blocked chan parameter part of the publisher struct, so it closes automatically when the publisher is GC'd, plus change the range over blockings into a for/select that also listens to this additional channel], your best bet is to leave that goroutine commented out and live with the fact that connection blocking (has this ever really happened in real life?) will block your publishing/consuming (which ideally should have a time-out anyway).
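A minimal sketch of the for/select idea described above, not the library's actual code. The `blocking` type is a stand-in for amqp.Blocking, and `watchBlocked` and `runDemo` are hypothetical names used only for illustration. The watcher returns when its context is cancelled and never closes the notify chan itself.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// blocking is a stand-in for amqp.Blocking; only the field used here is modeled.
type blocking struct {
	Active bool
}

// watchBlocked has the shape of a notify-blocked handler, but it returns when
// ctx is cancelled instead of waiting for blockings to be closed.
// It never closes blockings: that stays the job of the channel's owner.
func watchBlocked(ctx context.Context, blockings <-chan blocking, onChange func(bool)) {
	for {
		select {
		case <-ctx.Done():
			return
		case b, ok := <-blockings:
			if !ok {
				return // the owner closed the channel (connection shut down)
			}
			onChange(b.Active)
		}
	}
}

// runDemo starts the watcher, delivers one event, cancels the context, and
// reports whether the goroutine exited although blockings was never closed.
func runDemo() (bool, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	blockings := make(chan blocking) // deliberately never closed
	done := make(chan struct{})

	var mu sync.Mutex
	sawBlocked := false

	go func() {
		defer close(done)
		watchBlocked(ctx, blockings, func(active bool) {
			mu.Lock()
			sawBlocked = active
			mu.Unlock()
		})
	}()

	blockings <- blocking{Active: true}
	cancel() // what a publisher's Close could trigger

	exited := false
	select {
	case <-done:
		exited = true
	case <-time.After(time.Second):
	}

	mu.Lock()
	defer mu.Unlock()
	return sawBlocked, exited
}

func main() {
	saw, exited := runDemo()
	fmt.Println("saw blocked event:", saw, "goroutine exited:", exited)
}
```

Note that this only ends the goroutine. Whatever registered the chan with the connection would still hold a reference to it.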

@wagslane
Owner

wagslane commented Mar 4, 2024

It seems to me that the most likely issue is that you have a lot of Publish() calls getting blocked. This function, startNotifyBlockedHandler, waits for any in-flight publishes to finish and then stops new ones from happening. So if they are hanging indefinitely for some reason, that could be the root cause. I'd argue something at the infrastructure level is probably the issue.

You could add a context timeout, and that should stop them from hanging forever.
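A rough sketch of bounding a publish call with a context timeout. `publishFunc`, `publishWithTimeout`, and `stuckPublish` are hypothetical names; the real call would be whatever context-aware publish method your version of the library exposes, and this only helps if that call actually honors the context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishFunc is a hypothetical stand-in for a publish call that accepts a
// context and returns once the publish completes or the context ends.
type publishFunc func(ctx context.Context, body []byte) error

// publishWithTimeout bounds a single publish attempt by d.
func publishWithTimeout(parent context.Context, d time.Duration, publish publishFunc, body []byte) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel() // always release the timer
	return publish(ctx, body)
}

// stuckPublish simulates a publish that never completes on its own and only
// returns when its context is done.
func stuckPublish(ctx context.Context, _ []byte) error {
	<-ctx.Done()
	return ctx.Err()
}

// timedOut reports whether a stuck publish is cut off by the deadline.
func timedOut() bool {
	err := publishWithTimeout(context.Background(), 20*time.Millisecond, stuckPublish, []byte("hello"))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", timedOut())
}
```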

@wagslane wagslane closed this as completed Mar 4, 2024
@mjurczik

mjurczik commented May 3, 2024

Hello, this seems still to be an issue.

I could reproduce this goroutine leak by using the example of the readme and printing the stacktrace before and after closing the publisher. The goroutine started at startup will not terminate with closing the publisher.
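One way to check for this kind of leak, sketched with a simulated publisher rather than the real library. `fakePublisher` and `leakedAfterClose` are hypothetical names; the point is only the technique of comparing `runtime.NumGoroutine()` before and after Close and dumping the stacks of whatever is left.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/pprof"
	"time"
)

// fakePublisher stands in for a publisher whose constructor starts a handler
// goroutine ranging over a chan that only the connection would close.
type fakePublisher struct{}

func newFakePublisher(connOwned <-chan struct{}) *fakePublisher {
	go func() {
		for range connOwned { // exits only when connOwned is closed
		}
	}()
	return &fakePublisher{}
}

// Close does not touch connOwned, so the handler goroutine keeps waiting.
func (p *fakePublisher) Close() {}

// leakedAfterClose returns how many extra goroutines are still alive after
// the publisher has been created and closed.
func leakedAfterClose() int {
	connOwned := make(chan struct{}) // never closed: the "connection" stays up
	before := runtime.NumGoroutine()

	p := newFakePublisher(connOwned)
	p.Close()

	time.Sleep(50 * time.Millisecond) // let goroutines exit if they are going to
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("goroutines left behind:", leakedAfterClose())
	// Dump all goroutine stacks to see where the survivor is parked.
	_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
}
```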

The problem is that the goroutine is started and passes a notify chan (I call Go channels "chan" to avoid confusion with RabbitMQ's Channels) to the Connection. This chan will only be closed if the connection to RabbitMQ is closed here.

But a long-lived service will stay connected to RabbitMQ and therefore keep the Connection alive, only creating and closing publishers as needed. As a result, the connection holds every chan registered by startNotifyBlockedHandler in its blocks array and does not close it when a publisher shuts down, which keeps the goroutine alive at its for loop.

In contrast, startNotifyFlowHandler is also started when a publisher is created, but its chan is managed by a Channel. That Channel is closed when the publisher's Close function is called, which terminates the goroutine started by the publisher.

I have two ideas for a fix:

  • your above-mentioned context, which is passed down into the function and acts as a lifetime: when the publisher is closed, the lifetime ends and the goroutine finishes. The goroutine must NOT close the chan, or there will be a panic when the Connection is terminated and tries to close the chan as well
  • tag the chan passed, and notify the connection to remove it on publisher close. This would require changes in the wrapped library as well

I could implement the first. The second option would need discussion and implementation in amqp091-go library first.

Take care!

@lmb1113
Author

lmb1113 commented May 7, 2024

I create the publisher once with once.Do and rabbitmq.NewPublisher. Reusing this connection has not caused any problems in large-scale, high-concurrency testing.

@mjurczik

mjurczik commented May 7, 2024

For me this is not a fix for the problem. It does not change the fact that even initializing one publisher and closing it will not stop all the goroutines started by this publisher, which results in a leaking goroutine.
Furthermore, someone can have multiple publishers that are written to simultaneously, each configured with different publisher options. So I'm confused about how it should be possible to reuse a publisher that was created with different settings and does not allow changing them. Can you elaborate on this?

@lmb1113
Author

lmb1113 commented May 7, 2024

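var (
	once        sync.Once
	oncePublish sync.Once
	conn        *rabbitmq.Conn
	publisher   *rabbitmq.Publisher
	publishErr  error // kept so every caller sees a failed first attempt
)

// GetMqConn returns the shared connection, dialing it on the first call.
// On failure the error is logged and nil is returned.
func GetMqConn() *rabbitmq.Conn {
	once.Do(func() {
		var err error
		conn, err = rabbitmq.NewConn(GetMqUrl())
		if err != nil {
			logging.Logger.Error("rabbitmq conn err", zap.Error(err))
		}
	})
	return conn
}

// GetPublisher returns the shared publisher, creating it on the first call.
func GetPublisher() (*rabbitmq.Publisher, error) {
	oncePublish.Do(func() {
		publisher, publishErr = rabbitmq.NewPublisher(
			GetMqConn(),
			rabbitmq.WithPublisherOptionsLogging,
		)
	})
	return publisher, publishErr
}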

Our business is relatively simple, with only one configuration, so we can use GetPublisher globally; it has withstood high-concurrency testing. If you have multiple configurations, my suggestion is to keep one Publisher per configuration in a Publisher pool and reuse the Publisher that matches each configuration.
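A minimal sketch of such a pool, assuming each configuration can be identified by a string key. `publisherPool`, `newPublisherPool`, and the `create` factory are hypothetical names, and the type parameter P is whatever publisher type you use. The demo uses a `*string` in place of a real publisher.

```go
package main

import (
	"fmt"
	"sync"
)

// publisherPool caches one publisher per configuration key so callers reuse
// publishers instead of creating and closing one per message.
type publisherPool[P any] struct {
	mu     sync.Mutex
	items  map[string]P
	create func(key string) (P, error) // hypothetical factory supplied by the caller
}

func newPublisherPool[P any](create func(string) (P, error)) *publisherPool[P] {
	return &publisherPool[P]{items: make(map[string]P), create: create}
}

// Get returns the cached publisher for key, creating it on first use.
// A failed creation is not cached, so a later call can retry.
func (p *publisherPool[P]) Get(key string) (P, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if pub, ok := p.items[key]; ok {
		return pub, nil
	}
	pub, err := p.create(key)
	if err != nil {
		var zero P
		return zero, err
	}
	p.items[key] = pub
	return pub, nil
}

// demoPool asks for "orders" twice and "events" once, and reports how many
// publishers were created and whether both "orders" lookups shared one.
func demoPool() (int, bool) {
	created := 0
	pool := newPublisherPool(func(key string) (*string, error) {
		created++
		s := "publisher:" + key
		return &s, nil
	})
	a, _ := pool.Get("orders")
	b, _ := pool.Get("orders")
	_, _ = pool.Get("events")
	return created, a == b
}

func main() {
	created, same := demoPool()
	fmt.Println("created:", created, "reused:", same)
}
```

Holding the mutex while the factory runs keeps the sketch short, at the cost of serializing creation across keys.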

@mjurczik

mjurczik commented May 7, 2024

Ah, thank you for your example @lmb1113. We have a different use case: we open a lot of publishers because each client gets its own queue and exchange for incoming messages.

@mjurczik

Hello,

I don't want to necro-bump, but @wagslane, should I open a new issue for this? Or can this issue be reopened?

@wagslane wagslane reopened this May 15, 2024
@wagslane
Owner

Sorry, I've been crazy busy at work. I want to look at this when I have some time; I just haven't had a chance yet.

@lmb1113
Author

lmb1113 commented May 16, 2024

Hello, I noticed that every publisher starts its own startNotifyBlockedHandler. Could we change it so that a single handler is shared per conn? My understanding is that the purpose of startNotifyBlockedHandler is to prevent publishing while the connection is not available.
@mjurczik @wagslane
To reproduce the memory leak: connect once globally, then create multiple publishers.

@lmb1113
Author

lmb1113 commented May 16, 2024

publisher.startNotifyBlockedHandler()->conn.startNotifyBlockedHandler()?

@hotrush

hotrush commented Jun 21, 2024

We also hit this issue. We don't reuse publishers or consumers, but when there are a lot of messages to publish we get OOMs with ~2-3 GB memory usage, while regular memory usage is about 100-200 MB. Hope you can fix it.

@lmb1113
Author

lmb1113 commented Jun 21, 2024

publisher.startNotifyBlockedHandler()->conn.startNotifyBlockedHandler()?

I suggest reusing publishers or consumers.

@hotrush

hotrush commented Jun 21, 2024

@lmb1113 we tried that approach but are getting a lot of errors like here:

@lmb1113
Author

lmb1113 commented Jun 21, 2024

@hotrush Are you reusing both the connection and the publishers or consumers? My example from May 7 above (GetMqConn and GetPublisher, each guarded by sync.Once) shows what I mean.

@hotrush

hotrush commented Jun 21, 2024

@lmb1113 the connection is always reused. Publisher reused: connection/reconnection issues. Publisher not reused: memory leaks. Something like that.

@wagslane
Owner

@mjurczik So all resources are not necessarily expected to be cleaned up when you close the publisher, you would need to also close the underlying connection itself. When you close both are you still seeing a "leak"?

(And if that's not a suitable use case you might want to just drop down to the amqp lib)

@mjurczik

mjurczik commented Jul 9, 2024

Hello,

So all resources are not necessarily expected to be cleaned up when you close the publisher, you would need to also close the underlying connection itself.

IMO this should not be expected. A connection should be reusable; removing a publisher and having it free its memory, along with the goroutines it started, should not force you to close the connection as well.
In the readme itself it says: Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

When you close both are you still seeing a "leak"?

Sure, if I shut down my program, gracefully close all publishers, and afterwards close the connection, the goroutines will be stopped gracefully. That happens because, on conn.Close(), the shutdown function closes the chans that block startNotifyBlockedHandler.

Closing the connection seems not to be a suitable fix:

Let's say we have 3 Publishers, and we want to close one of them because it is not needed anymore.
We can't simply close it AND the connection: that would break the connection still required by the other publishers that are in use. Closing only the publisher keeps its startNotifyBlockedHandler goroutine alive, even though it no longer does anything, because the publisher it belongs to, and to which the notification would be reported, is already closed and discarded.

I thought about my "fix" with the lifetime. The problem is that it would finish the goroutine and allow the publisher and goroutine to be garbage collected, but the chan would still be held in the Connection's blocks array in the amqp lib. I guess it would be better to implement something similar in the ConnectionManager: it holds the blocked chans itself and, in NotifyBlockedSafe, does not pass them down. Instead, the ConnectionManager passes a single chan down to amqp and, on a signal, iterates through its self-managed blocked chans. Additionally, the connection manager would need a function to remove a chan.
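The fan-out idea could look roughly like the sketch below. It is illustrative only: `blocking` stands in for amqp.Blocking, and `blockedDispatcher`, `Subscribe`, and `demoDispatcher` are hypothetical names, not part of either library. A single upstream chan would be registered with the connection, and each publisher gets its own chan plus a remove function to call from Close.

```go
package main

import (
	"fmt"
	"sync"
)

// blocking is a stand-in for amqp.Blocking.
type blocking struct {
	Active bool
	Reason string
}

// blockedDispatcher owns the single chan registered with the underlying
// connection and fans events out to per-publisher subscribers.
type blockedDispatcher struct {
	mu   sync.Mutex
	subs map[int]chan blocking
	next int
}

func newBlockedDispatcher(upstream <-chan blocking) *blockedDispatcher {
	d := &blockedDispatcher{subs: make(map[int]chan blocking)}
	go func() {
		for b := range upstream {
			d.mu.Lock()
			for _, ch := range d.subs {
				select {
				case ch <- b:
				default: // slow subscriber: drop rather than stall the others
				}
			}
			d.mu.Unlock()
		}
		// upstream closed (connection gone): release every subscriber.
		d.mu.Lock()
		for id, ch := range d.subs {
			close(ch)
			delete(d.subs, id)
		}
		d.mu.Unlock()
	}()
	return d
}

// Subscribe returns a chan of events and a remove function that is safe to
// call more than once.
func (d *blockedDispatcher) Subscribe() (<-chan blocking, func()) {
	d.mu.Lock()
	defer d.mu.Unlock()
	id := d.next
	d.next++
	ch := make(chan blocking, 1)
	d.subs[id] = ch
	remove := func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		if c, ok := d.subs[id]; ok {
			delete(d.subs, id)
			close(c)
		}
	}
	return ch, remove
}

// demoDispatcher removes one of two subscribers, sends an event, and reports
// what the remaining subscriber saw and how many subscribers are left.
func demoDispatcher() (bool, bool, int) {
	upstream := make(chan blocking)
	d := newBlockedDispatcher(upstream)
	a, removeA := d.Subscribe()
	b, removeB := d.Subscribe()

	removeB() // publisher B closes; the connection stays up
	upstream <- blocking{Active: true, Reason: "low memory"}

	ev := <-a
	_, bOpen := <-b

	d.mu.Lock()
	remaining := len(d.subs)
	d.mu.Unlock()

	removeA()
	close(upstream)
	return ev.Active, !bOpen, remaining
}

func main() {
	fmt.Println(demoDispatcher())
}
```

Dropping events for a slow subscriber is a simplification. Since these events carry blocked/unblocked state, a real implementation would more likely keep the latest state per subscriber than discard it.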

If you like, I can implement this and create a PR, and you can still decide whether it makes sense for you :)

@wagslane
Owner

Yeah this makes sense to me @mjurczik ! If we have 3 publishers on a single connection, we should be able to close 1 publisher, keep the connection alive for the other 2, and release any publisher specific resources.

If you'd like to propose a PR, I will take a look when I have time.
