Skip to content

Make batch create stream SendFeedback thread safe#3215

Open
jenrryyou wants to merge 1 commit intoapache:masterfrom
jenrryyou:batch_stream_send_feedback
Open

Make batch create stream SendFeedback thread safe#3215
jenrryyou wants to merge 1 commit intoapache:masterfrom
jenrryyou:batch_stream_send_feedback

Conversation

@jenrryyou
Copy link
Contributor

What problem does this PR solve?

Issue Number: resolve
Fixes #2754

Problem Summary:
批量创建stream场景下,解决 Stream SetConnected() 与 Consume() 并发导致的 SendFeedback
发送与已消费字节统计的竞态问题,使得反馈的已消费字节数准确,避免发送端错误反压。

What is changed and the side effects?

Changed:

Side effects:

  • Performance effects:
  1. 对非批量创建场景,会走到fast path,对性能无任何影响;
  2. 对批量创建场景,连接建立前的消费路径会略增 CPU,但只发生在“建连窗口期”,创建完后对性能无任何影响。
  • Breaking backward compatibility:
    兼容

Check List:

@wwbmmm
Copy link
Contributor

wwbmmm commented Feb 6, 2026

Can you add some unit test to verify this scenario?

@jenrryyou
Copy link
Contributor Author

Can you add some unit test to verify this scenario?

ok, I will try to add it.

@jenrryyou jenrryyou force-pushed the batch_stream_send_feedback branch 2 times, most recently from 3ac4cfa to d37798a Compare March 5, 2026 08:27
@jenrryyou
Copy link
Contributor Author

jenrryyou commented Mar 5, 2026

Can you add some unit test to verify this scenario?

• 新增用例:test/brpc_streaming_rpc_unittest.cpp(StreamingRpcTest.batch_create_stream_feedback_race)

• 核心思路:服务端max_buf_size设置为16B。客户端批量创建 2 条 stream,刻意阻塞 extra stream 的 SetConnected()(锁住其 _connect_mutex),让 extra stream 先 Consume() 到 64B 数据后再放开 SetConnected();修复后 SetConnected() 会基于 _atomic_local_consumed 立刻补发首个 FEEDBACK(consumed_size=64),从而使服务端 extra stream的第二次 StreamWrite(1B) 不再长期 EAGAIN,避免错误反压。如果没有这个修复,这个单测会直接因为错误反压而hang住。

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a race in the batch stream creation path where Stream::SetConnected() and the consumer queue (Consume()) can run concurrently, causing SendFeedback to report an incorrect consumed byte count and potentially triggering incorrect backpressure on the sender.

Changes:

  • Make stream connection state and pre-connect consumed-byte accounting thread-safe using atomics.
  • Update SendFeedback to accept an explicit consumed-byte value to avoid racing on shared state.
  • Add a unit test that reliably reproduces the batch-create SetConnected() vs Consume() feedback race.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
test/brpc_streaming_rpc_unittest.cpp Adds a regression test that enlarges the race window and asserts FEEDBACK unblocks the sender correctly.
src/brpc/stream_impl.h Updates internal stream state (_connected, consumed tracking) and changes SendFeedback signature.
src/brpc/stream.cpp Implements atomic connection/consumed logic and sends an initial FEEDBACK after connect when needed.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jenrryyou jenrryyou force-pushed the batch_stream_send_feedback branch from d37798a to 96fc748 Compare March 5, 2026 11:51
@jenrryyou jenrryyou force-pushed the batch_stream_send_feedback branch from 96fc748 to 71b7595 Compare March 5, 2026 13:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants