
Fix: Notify app of client disconnection when request is in progress. #1556

Open

gourav-kandoria wants to merge 2 commits into master from fix_issue_apps_not_notified_when_client_disconnects

Conversation

@gourav-kandoria (Contributor)

Previously, the app was not notified when the client disconnected. This caused issues, especially with WebSocket connections and SSE events, where the app continued to send data to the router, which could not deliver it to the client due to the disconnection.

Changes made:

  1. Added functionality to send a port message to notify the app of client disconnection.
  2. For now, this scenario is handled and tested only for ASGI WebSockets and ASGI HTTP, ensuring that if the app detects a client disconnection, it follows the ASGI specification.

This ensures that the app is properly informed of client disconnections and can handle them according to the ASGI spec.
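To make the intended app-side behaviour concrete, here is a minimal hypothetical ASGI HTTP app (not part of this PR) showing how an app can rely on the spec behaviour discussed here, namely send() on a closed connection raising an OSError subclass:

async def application(scope, receive, send):
    # Minimal sketch, assuming the notification behaviour this PR adds.
    assert scope['type'] == 'http'
    await send({'type': 'http.response.start', 'status': 200,
                'headers': [(b'content-type', b'text/plain')]})
    try:
        await send({'type': 'http.response.body', 'body': b'data',
                    'more_body': True})
    except OSError:
        # Client went away; stop streaming and clean up.
        return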

@gourav-kandoria gourav-kandoria marked this pull request as draft February 4, 2025 18:13
@gourav-kandoria gourav-kandoria marked this pull request as ready for review February 4, 2025 18:20
@ac000 (Member) commented Feb 5, 2025

Would you mind splitting the python changes into a separate commit? (within this PR...)

@gourav-kandoria (Contributor, Author) commented Feb 5, 2025

Would you mind splitting the python changes into a separate commit? (within this PR...)

Sure, making the changes as per the suggestions.

@gourav-kandoria gourav-kandoria force-pushed the fix_issue_apps_not_notified_when_client_disconnects branch 2 times, most recently from 86682b6 to bd552e5 on February 5, 2025 18:57
@gourav-kandoria (Contributor, Author) commented Feb 5, 2025

@ac000 I have made the changes. Just one more thing to highlight: per the ASGI spec, the exception should be a subclass of OSError ("If send() is called on a closed connection the server should raise a server-specific subclass of OSError."). But we are raising PyExc_RuntimeError, which is fine for abnormal cases; here it could instead be PyExc_OSError. See https://docs.python.org/3/c-api/exceptions.html#standard-exceptions for reference.

This caused issues especially in cases of websocket connections and
SSE Events where the app continued to send data to the router, which
could not deliver it to the client due to the disconnection.

Changes made:

    Added functionality to notify the app of client disconnection in the
    form of a port message (_NXT_PORT_MSG_CLIENT_ERROR).

    On the app side, handled this message and called the registered
    close_handler callback, if registered.
@gourav-kandoria gourav-kandoria force-pushed the fix_issue_apps_not_notified_when_client_disconnects branch from bd552e5 to 7caee0e on February 6, 2025 15:37
@ac000 (Member) commented Feb 6, 2025

Thanks @gourav-kandoria and the whitespace checker no longer complains :-)

@ac000 (Member) commented Feb 6, 2025

@ac000 I have made the changes. Just one more thing to highlight: per the ASGI spec, the exception should be a subclass of OSError ("If send() is called on a closed connection the server should raise a server-specific subclass of OSError."). But we are raising PyExc_RuntimeError, which is fine for abnormal cases; here it could instead be PyExc_OSError. See https://docs.python.org/3/c-api/exceptions.html#standard-exceptions for reference.

Yeah, OSError would make more sense here...

But then we'd have to flow errno through...

@gourav-kandoria (Contributor, Author) commented Feb 6, 2025

@ac000 I have made the changes. Just one more thing to highlight: per the ASGI spec, the exception should be a subclass of OSError ("If send() is called on a closed connection the server should raise a server-specific subclass of OSError."). But we are raising PyExc_RuntimeError, which is fine for abnormal cases; here it could instead be PyExc_OSError. See https://docs.python.org/3/c-api/exceptions.html#standard-exceptions for reference.

Yeah, OSError would make more sense here...

But then we'd have to flow errno through...

You mean flow the error all the way from the router to the app, passing the exact errno?

@ac000 (Member) commented Feb 6, 2025

Yeah, OSError would make more sense here...
But then we'd have to flow errno through...

You mean flow the error all the way from the router to the app, passing the exact errno?

Right.

As this error will happen in the router process that is where errno(3p) will be set. In this case I guess it's likely to be either ECONNRESET or EPIPE.

I'm not saying you need to do that as that's likely not a trivial change. Or we could just hardcode, say, ECONNRESET.

However, what you have there now looks reasonable to me, but I'd like to get @hongzhidao 's overall input on this.
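As an aside on the errno point: Python's OSError already maps well-known errno values to specific subclasses (PEP 3151), so a hardcoded ECONNRESET flowed through would surface app-side as ConnectionResetError, which still satisfies the "subclass of OSError" requirement. A small illustrative Python sketch, not Unit code:

import errno
import os

# OSError's constructor picks the matching subclass for known errno values,
# so an errno carried over from the router surfaces as, e.g.,
# ConnectionResetError -- which is still an OSError per the ASGI spec.
e = OSError(errno.ECONNRESET, os.strerror(errno.ECONNRESET))
print(type(e))                 # <class 'ConnectionResetError'>
print(isinstance(e, OSError))  # True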

@ac000 ac000 requested a review from hongzhidao February 6, 2025 19:49
@ac000 (Member) commented Feb 6, 2025

@gourav-kandoria

Was there an open issue about this?

@hongzhidao (Contributor)

Hi @gourav-kandoria @ac000,

However, what you have there now looks reasonable to me, but I'd like to get @hongzhidao 's overall input on this.

Will do it, thanks for the contribution.

@gourav-kandoria (Contributor, Author) commented Feb 7, 2025

@gourav-kandoria

Was there an open issue about this?

@ac000 The exact same issue was not opened, but issue #1501 was due to this reason.

@gourav-kandoria (Contributor, Author) commented Feb 7, 2025

I'm not saying you need to do that as that's likely not a trivial change. Or we could just hardcode, say, ECONNRESET.

Got it. So for now, I will raise ECONNRESET.

…ASGI spec

For HTTP connections:
- If the app is sending data using the send callable, according to the ASGI spec, it should throw an exception
  in case of client disconnection. Previously, even if we processed the client_error message
  and set the http->closed state, it wouldn't throw an error because it wasn't handled.
  This change ensures that the exception is raised as per the ASGI spec.

For WebSocket connections:
- If the app is awaiting on receive, it would get a 'websocket.disconnect' event.
  However, if the app continues to send data using the send callable after receiving this event,
  it wouldn't raise an error because ws->state = NXT_WS_DISCONNECTED was never set in that case.
  According to the ASGI spec, if send is called after receiving a 'websocket.disconnect' event or on a closed client,
  it should raise an exception. This change ensures that the exception is raised as per the ASGI spec.
@gourav-kandoria gourav-kandoria force-pushed the fix_issue_apps_not_notified_when_client_disconnects branch from 7caee0e to 8deb39c on February 7, 2025 17:22
@ac000 (Member) commented Feb 7, 2025

Hi @gourav-kandoria

Yeah, even though I said that, I'm not sure it's the right thing to do...

Do you have some reproducer for this issue?

I've been trying to reproduce it myself, but no luck; I am unable to trigger either nxt_http_request_error() or nxt_http_request_error_handler(), which are the only places I see us setting r->error.

Python also seems to be notified (i.e. if m['type'] == 'websocket.disconnect': triggers) about the WebSocket closure even if you kill -9 the client (the kernel closes the socket).

The only way I can get things gummied up is if I firewall the WebSocket after the client connects.

With this hack

import time

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        print("WebSocket!")
        while True:
            m = await receive()
            if m['type'] == 'websocket.connect':
                print("Waiting for websocket.accept")
                await send({'type': 'websocket.accept'})
                print("Got WebSocket connection")

                time.sleep(5)  # deliberately block the event loop

                await send(
                    {
                        'type': 'websocket.send',
                        'bytes': m.get('bytes', None),
                        'text': "Test",
                    }
                )

If I Ctrl-c the client while the app is sleeping, the router process never attempts to send the message (even though AFAICT the python app has called send() sending the data to the router process), perhaps because Unit knows the socket is closed.

So at the moment I am having a bit of a doubt about this whole thing...

Heh, just noticed that you pointed out an open issue... after a quick skim, I'm even more confused now... that doesn't seem to be about WebSockets and this PR does...

@gourav-kandoria (Contributor, Author)

Heh, just noticed that you pointed out an open issue... after a quick skim, I'm even more confused now... that doesn't seem to be about WebSockets and this PR does...

Oh sorry, my bad for mixing things up. For the issue I mentioned earlier, I am sharing the app-side code and the client-side script I used to test it. I will share the scenario for why I also made changes to the WebSocket-related files once the changes to this part are verified.

Application:

import asyncio

async def application_sse(scope, receive, send):
    if scope['type'] == 'http':
        headers = [
            (b'content-type', b'text/event-stream'),
            (b'cache-control', b'no-cache'),
            (b'connection', b'keep-alive'),
        ]
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': headers,
        })

        send_task = asyncio.create_task(send_messages_sse(send))
        receive_task = asyncio.create_task(receive_messages_sse(receive))

        await asyncio.gather(send_task, receive_task)


async def receive_messages_sse(receive):
    message = await receive()
    print(f'message received: {message}')
    if message['type'] == 'http.disconnect':
        return

async def send_messages_sse(send):
    i = 0
    while True:
        try:
            message = f"event: count\ndata: {i}\n\n"
            print(f'message sent: {message}')
            await send({
                'type': 'http.response.body',
                'body': message.encode('utf-8'),
                'more_body': True 
            })
            i+=2
            await asyncio.sleep(2)
        except Exception as err:
            print(f'err : {err}')
            break

Client-side script:

import asyncio
import aiohttp

async def listen_to_events():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8001') as response:
            async for line in response.content:
                if line:
                    print(line.decode('utf-8').strip())

if __name__ == '__main__':
    asyncio.run(listen_to_events())

I used Ctrl+C to close the connection from the client side.

@ac000 (Member) commented Feb 8, 2025

Hmm, I'd never heard of Server-Sent Events before, even though they pre-date WebSockets...

But, yes, I can reproduce this with the above application + curl(1). Thanks.

An interesting observation is with WebSockets we see (from router to client)

[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendmsg(16, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\315y\0\0\1\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16

Ctrl-C'ing the client and everything behaves as expected. The connection is closed, the router process sees this and informs the Python application.

With Server-Sent Events we see instead...

[pid 31002] writev(27, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 6\n\n", iov_len=22}], 2) = 28
[pid 31002] writev(27, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 8\n\n", iov_len=22}], 2) = 28
[pid 31002] writev(27, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 10\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 31002] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=30999, si_uid=1000} ---

Once we get -EPIPE we call into nxt_http_request_error_handler() & nxt_http_request_close_handler()

We seem to use writev(3p) for HTTP sockets and sendto(2) for WebSockets... not sure why, probably not that important, just an interesting curiosity.

With your first patch we now see

[pid 31734] writev(11, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 6\n\n", iov_len=22}], 2) = 28
[pid 31734] writev(11, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 8\n\n", iov_len=22}], 2) = 28
[pid 31734] writev(11, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 10\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 31734] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=31731, si_uid=1000} ---
[pid 31734] sendmsg(31, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\363{\0\0\1\0!\1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16

So the router process is now informing the Python application, though in this case it's still writing to stdout... AFAICT the application doesn't get the http.disconnect message...

Applying this bit of the second patch

--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -368,6 +368,11 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
                             "sent, after response already completed");
     }
 
+    if (nxt_slow_path(http->closed)) {
+        return PyErr_Format(PyExc_ConnectionResetError,
+                            "Connection Closed ");
+    }
+
     if (nxt_slow_path(http->send_future != NULL)) {
         return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
     }

We don't seem to hit that if () statement as we call this function before we call nxt_http_request_error_handler()

r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...
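The EPIPE behaviour in the strace output above can be reproduced outside Unit with a short Python sketch (CPython ignores SIGPIPE, so the failure surfaces as an OSError subclass rather than killing the process):

import socket
import time

# Write to a TCP peer that has already closed its end.
srv = socket.socket()
srv.bind(('127.0.0.1', 0))
srv.listen(1)
cli = socket.create_connection(srv.getsockname())
conn, _ = srv.accept()

cli.close()                # the "Ctrl-C the client" moment
time.sleep(0.1)            # let the FIN arrive

conn.sendall(b'event: count\ndata: 8\n\n')   # usually still accepted;
                                             # the peer answers with RST
time.sleep(0.1)
try:
    conn.sendall(b'event: count\ndata: 10\n\n')
except OSError as e:       # BrokenPipeError (EPIPE) or ConnectionResetError,
    print('send failed:', e)                 # depending on timing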

@@ -1722,6 +1726,26 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_OK;
}

static int
nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
Member

While I remember, this line will need wrapping as it just goes over the 80 character limit...

@gourav-kandoria (Contributor, Author) commented Feb 8, 2025

r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...

I just noticed some strange behaviour on my system. I was assuming the port message received from the router would call nxt_unit_process_client_error, and then the below-mentioned flow should happen:

nxt_unit_process_client_error -> nxt_py_asgi_close_handler -> nxt_py_asgi_http_close_handler.

If this happens, everything seems to work correctly. But what I noticed is that, after starting Unit, if I attach the application process to a debugger, then in the nxt_unit_process_client_error function I am able to receive the req object via the statement req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0); and even if I then stop the debugger, for subsequent test requests I always get this request object upon disconnection.

But if I start Unit and don't attach a debugger to the application process, this statement (req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0)) returns NULL.

Is it that in debugging mode this object is kept in memory, but otherwise not?

@gourav-kandoria (Contributor, Author)

r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...

http->closed is supposed to be set in nxt_py_asgi_http_close_handler, which gets called when the debugger is attached to the application but not when it isn't, as I mentioned in the above comment.

@ac000 (Member) commented Feb 8, 2025

Certainly don't be surprised that debugging can change the behaviour of the program...

@gourav-kandoria (Contributor, Author) commented Feb 9, 2025

Certainly don't be surprised that debugging can change the behaviour of the program...

Oh okay. So, in the PR, wherever I made changes related to nxt_unit_request_hash_find, the purpose was to make sure that the req object is not removed from the hash while the request is still in progress, and that it only gets removed once nxt_unit_request_info_release is called after request completion. But it is getting removed some other way.
So I have two questions now:

  1. Is it fine if we take this approach, keeping performance and memory implications in mind?
  2. If yes, how do I make sure I am always able to get this request object from the hash until it is released by nxt_unit_request_info_release?

@ac000 (Member) commented Feb 10, 2025

I think we need to step back for a minute as there may be a more fundamental issue here.

With a WebSocket

(strace(1)ing the Python app)

epoll_wait(12, 

<-- Ctrl-C the client

[{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, -1) = 1
epoll_wait(12, [{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, 0) = 1
recvmsg(8, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\243\205\0\0\1\0 \1\0\0\0\0", iov_len=16384}], msg_iovlen=1, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=34211, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
recvmsg(8, {msg_namelen=0}, 0)          = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(12, [], 3, 0)                = 0
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \0\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
epoll_wait(12, 

So you can see that the Python app is sitting in epoll_wait(2); when the socket is closed, the router process notifies the Python app.

However, with a HTTP connection

epoll_wait(14, 

<-- Ctrl-C the client

No notification is sent to the python app...

When a WebSocket is closed nxt_py_asgi_close_handler() is called but not when a HTTP connection is closed.

@gourav-kandoria (Contributor, Author)

I think we need to step back for a minute as there may be a more fundamental issue here.
Sure.

@gourav-kandoria (Contributor, Author)

I think we need to step back for a minute as there may be a more fundamental issue here.

With a WebSocket

(strace(1)ing the Python app)

epoll_wait(12, 

<-- Ctrl-C the client

[{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, -1) = 1
epoll_wait(12, [{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, 0) = 1
recvmsg(8, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\243\205\0\0\1\0 \1\0\0\0\0", iov_len=16384}], msg_iovlen=1, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=34211, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
recvmsg(8, {msg_namelen=0}, 0)          = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(12, [], 3, 0)                = 0
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \0\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
epoll_wait(12, 

So you can see that the Python app is sitting in epoll_wait(2); when the socket is closed, the router process notifies the Python app.

However, with a HTTP connection

epoll_wait(14, 

<-- Ctrl-C the client

No notification is sent to the python app...

When a WebSocket is closed nxt_py_asgi_close_handler() is called but not when a HTTP connection is closed.

Just to explain what's in the PR:

In the HTTP SSE case, the code in the PR behaves this way: if the app sends data to the router and the router fails to write it to the socket due to an error like a closed connection, it sets the r->error flag to true and does all the resource cleanup etc. At the end nxt_router_http_request_done is called, which is where I have plugged in the code to notify the app using the client error message.

So the router only notifies the app when it fails to write bytes to the client, not when the client connection is actually closed.

@ac000 (Member) commented Feb 10, 2025

... not when the client connection is actually closed.

This is the bit we need to fix first, then the other bit may not even be an issue...

@ac000 (Member) commented Feb 10, 2025

Just some notes...

The message (NXT_PORT_MSG_WEBSOCKET_LAST) that notifies about the websocket disconnect is sent from nxt_http_websocket_error_handler()

In the router process when closing a WebSocket

[pid 36046] epoll_wait(17, 

<-- Ctrl-C the client

[{events=EPOLLIN|EPOLLRDHUP, data={u32=671094160, u64=139939295532432}}], 32, 20167) = 1
[pid 36046] recvfrom(27, "", 135, 0, NULL, NULL) = 0
[pid 36046] fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(0x88, 0x1), ...}) = 0
[pid 36046] write(1, "nxt_h1p_conn_ws_error: \n", 24) = 24
[pid 36046] sendmsg(16, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\313\214\0\0\2\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
[pid 36046] write(1, "nxt_h1p_request_close: \n", 24) = 24
[pid 36046] write(1, "nxt_h1p_shutdown: \n", 19) = 19
[pid 36046] epoll_wait(17, [{events=EPOLLIN, data={u32=671091920, u64=139939295530192}}], 32, 0) = 1
[pid 36046] recvmsg(23, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\315\214\0\0\0\0 \0\0\0\0\0", iov_len=16}, {iov_base="", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=36045, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
[pid 36046] recvmsg(23, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 36046] write(1, "nxt_h1p_closing: \n", 18) = 18
[pid 36046] epoll_ctl(17, EPOLL_CTL_MOD, 23, {events=EPOLLIN|EPOLLRDHUP|EPOLLET, data={u32=671091920, u64=139939295530192}}) = 0
[pid 36046] epoll_ctl(17, EPOLL_CTL_DEL, 27, 0xa35c24) = 0
[pid 36046] epoll_wait(17, [], 32, 0)   = 0
[pid 36046] close(27)                   = 0
[pid 36046] epoll_wait(17, 

epoll_wait(2) returns EPOLLRDHUP, good...

With a HTTP connection

[pid 36043] epoll_wait(3, 

<-- Ctrl-C the client

 <unfinished ...>
[pid 36046] <... epoll_wait resumed>[{events=EPOLLIN|EPOLLRDHUP, data={u32=671099104, u64=139939295537376}}], 32, -1) = 1
[pid 36046] epoll_wait(17, 

Again we get EPOLLRDHUP from epoll_wait(2), good, but we immediately go back to epoll_wait(2) again...

With a WebSocket when the connection is closed we call

 955                 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,  
 956                                    ev->task, ev, ev->data);

in nxt_epoll_poll().

When a HTTP connection is closed, we don't.

This seems to be due to ev->read == NXT_EVENT_BLOCKED in the HTTP case and ev->read == NXT_EVENT_DEFAULT in the WebSocket case...

As a POC, this hack fixes the python SSE app

Forget that, this causes the router process to crash...

diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index d53df1bc..05efc01a 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -937,17 +939,23 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
 
         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
 
+        if (events & EPOLLRDHUP)
+            ev->read = NXT_EVENT_DEFAULT;
+
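For reference, EPOLLRDHUP delivery on peer close can be observed in isolation with a minimal, Linux-only Python sketch (nothing Unit-specific):

import select
import socket

srv = socket.socket()
srv.bind(('127.0.0.1', 0))
srv.listen(1)
cli = socket.create_connection(srv.getsockname())
conn, _ = srv.accept()

ep = select.epoll()
ep.register(conn.fileno(), select.EPOLLIN | select.EPOLLRDHUP)

cli.close()    # peer closes its end

for fd, events in ep.poll(timeout=1):
    if events & select.EPOLLRDHUP:
        print('EPOLLRDHUP: peer closed the connection')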

@ac000 (Member) commented Feb 10, 2025

When using a WebSocket we get (after Ctrl-C'ing the client)

(gdb) bt
#0  nxt_epoll_block_read (engine=0x1728050, ev=0x7f9d740028e0) at src/nxt_epoll_engine.c:512
#1  0x000000000043b5dd in nxt_conn_io_read (task=task@entry=0x7f9d740039e0, obj=0x7f9d740028e0, 
    data=0x7f9d74003d00) at src/nxt_conn_read.c:97
#2  0x0000000000413f97 in nxt_event_engine_start (engine=engine@entry=0x1728050) at src/nxt_event_engine.c:542
#3  0x000000000041c5b4 in nxt_router_thread_start (data=0x170a910) at src/nxt_router.c:3717
#4  0x0000000000412865 in nxt_thread_trampoline (data=0x170a910) at src/nxt_thread.c:126
#5  0x00007f9d81689d22 in start_thread (arg=<optimized out>) at pthread_create.c:443
#6  0x00007f9d8170ed40 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

Calling nxt_epoll_block_read() is good...

With a HTTP connection we get

(gdb) bt
#0  nxt_epoll_enable_read (engine=0xc9d9a0, ev=0x7fcf80000cd0) at src/nxt_epoll_engine.c:418
#1  0x000000000040bbfd in nxt_port_queue_read_handler (task=task@entry=0xc9d9a0, obj=0x7fcf80000cd0, 
    data=<optimized out>) at src/nxt_port_socket.c:1013
#2  0x0000000000413f97 in nxt_event_engine_start (engine=engine@entry=0xc9d9a0) at src/nxt_event_engine.c:542
#3  0x000000000041c5b4 in nxt_router_thread_start (data=0xc8a390) at src/nxt_router.c:3717
#4  0x0000000000412865 in nxt_thread_trampoline (data=0xc8a390) at src/nxt_thread.c:126
#5  0x00007fcf86e89d22 in start_thread (arg=<optimized out>) at pthread_create.c:443
#6  0x00007fcf86f0ed40 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

A completely different code path...

strace(1)ing the router process

[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 20\n\n", iov_len=23}], 2) = 29
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 22\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 22\n\n", iov_len=23}], 2) = 29

<-- Ctrl-C the client

[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 24\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 24\n\n", iov_len=23}], 2) = 29
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 26\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 26\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 37087] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=37084, si_uid=1000} ---
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 28\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 30\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 32\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
...

The UNIX domain socket I guess is the Python app talking to the router process...

[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 22\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39

Python app sends event: count\ndata: 22\n\n to the router process, then

[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 22\n\n", iov_len=23}], 2) = 29

The router process then sends event: count\ndata: 22\n\n to the client.

After Ctrl-C'ing the client we don't attempt to read from the client (like we do in the WebSocket case), I guess because we're not expecting to be reading any data from the client after the initial request.

That's fine, but we do need to properly handle EPOLLRDHUP. Hmm, but even then that'll only fix it for epoll(7); we still need to worry about all the other poll mechanisms...

@gourav-kandoria (Contributor, Author) commented Feb 11, 2025

After Ctrl-C'ing the client we don't attempt to read from the client (like we do in the WebSocket case), I guess because we're not expecting to be reading any data from the client after the initial request.

Makes sense. I just want to understand why it won't be okay if, at the time of writing back to the socket, we notify the app of the disconnection. I get that, from the app's perspective, it would be like some bytes have been sent when actually they haven't, because the router would have discarded them and then notified the app of the disconnection.

@ac000 (Member) commented Feb 12, 2025

Makes sense. I just want to understand why it won't be okay if, at the time of writing back to the socket, we notify the app of the disconnection. I get that, from the app's perspective, it would be like some bytes have been sent when actually they haven't, because the router would have discarded them and then notified the app of the disconnection.

I dunno, maybe that is the right approach... however, at the moment, if you have an application that starts a chunked transfer (e.g. an SSE application) but then doesn't send any data, you can effectively DoS the server by opening and closing a bunch of connections to the application, as you then end up with a bunch of connections in CLOSE_WAIT. E.g.

$ ss -tnp state CLOSE-WAIT | sed 's/\[[^[]*\]/[::1]/g'
Recv-Q Send-Q                        Local Address:Port            Peer Address:Port Process                           
1      0      [::1]:8000 [::1]:59800 users:(("unitd",pid=41460,fd=31))
1      0      [::1]:8000 [::1]:54674 users:(("unitd",pid=41460,fd=27))
1      0      [::1]:8000 [::1]:35932 users:(("unitd",pid=41460,fd=30))
1      0      [::1]:8000 [::1]:35918 users:(("unitd",pid=41460,fd=29))

See this Cloudflare article for the gory details.
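For what it's worth, a minimal hypothetical reproducer for that CLOSE_WAIT buildup (assuming an SSE-style app on localhost:8000 that starts a chunked response but never writes):

import socket

# Open and abandon a few connections; each should linger in CLOSE_WAIT on
# the server (check with: ss -tnp state close-wait).
for _ in range(4):
    s = socket.create_connection(('localhost', 8000))
    s.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
    s.close()  # client goes away; the server side never notices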

Hmm, do you want to re-work your patches to not do any WebSocket stuff as that seems to be handled correctly as is and use EPIPE for the OSError? (although I think you were only doing that for the websocket case).

@ac000 (Member) commented Feb 13, 2025

Hi @gourav-kandoria

So in trying a variation of your patches to try and handle the client closing the connection, I have (hopefully it's all there as I tried to leave most of my debug code behind...)

diff --git a/src/nxt_port.h b/src/nxt_port.h
index 772fb41a..e801a2ee 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
     /* Status report. */
     nxt_port_handler_t  status;
 
+    nxt_port_handler_t  client_close;
+
     nxt_port_handler_t  oosm;
     nxt_port_handler_t  shm_ack;
     nxt_port_handler_t  read_queue;
@@ -115,6 +117,8 @@ typedef enum {
     _NXT_PORT_MSG_APP_RESTART     = nxt_port_handler_idx(app_restart),
     _NXT_PORT_MSG_STATUS          = nxt_port_handler_idx(status),
 
+    _NXT_PORT_MSG_CLIENT_CLOSE    = nxt_port_handler_idx(client_close),
+
     _NXT_PORT_MSG_OOSM            = nxt_port_handler_idx(oosm),
     _NXT_PORT_MSG_SHM_ACK         = nxt_port_handler_idx(shm_ack),
     _NXT_PORT_MSG_READ_QUEUE      = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
     NXT_PORT_MSG_APP_RESTART      = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
     NXT_PORT_MSG_STATUS           = nxt_msg_last(_NXT_PORT_MSG_STATUS),
 
+    NXT_PORT_MSG_CLIENT_CLOSE     = nxt_msg_last(_NXT_PORT_MSG_CLIENT_CLOSE),
+
     NXT_PORT_MSG_OOSM             = nxt_msg_last(_NXT_PORT_MSG_OOSM),
     NXT_PORT_MSG_SHM_ACK          = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
     NXT_PORT_MSG_READ_QUEUE       = _NXT_PORT_MSG_READ_QUEUE,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44ea823b..bf8f5ff1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5276,6 +5276,8 @@ nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t  *r;
 
+    printf("%s: \n", __func__);
+
     r = obj;
 
     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
@@ -5295,11 +5297,22 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t  *r;
 
+    printf("%s: \n", __func__);
+
     r = data;
 
     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
 
     if (r->req_rpc_data != NULL) {
+        nxt_request_rpc_data_t  *req_rpc_data = r->req_rpc_data;
+
+        printf("%s: Sending [NXT_PORT_MSG_CLIENT_CLOSE] message / [%d]...\n",
+               __func__, req_rpc_data->stream);
+        nxt_port_socket_write(task, req_rpc_data->app_port,
+                              NXT_PORT_MSG_CLIENT_CLOSE, -1,
+                              req_rpc_data->stream,
+                              task->thread->engine->port->id, NULL);
+
         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
     }
 
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 966a6c0f..fe62861c 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
     nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+    nxt_unit_recv_msg_t *recv_msg);
 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
     nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
         rc = nxt_unit_process_websocket(ctx, &recv_msg);
         break;
 
+    case _NXT_PORT_MSG_CLIENT_CLOSE:
+        printf("%s: Got message [NXT_PORT_MSG_CLIENT_CLOSE]\n", __func__);
+        rc = nxt_unit_process_client_close(ctx, &recv_msg);
+        break;
+
     case _NXT_PORT_MSG_REMOVE_PID:
         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1377,18 +1384,18 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
 
         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
 
+        res = nxt_unit_request_hash_add(ctx, req);
+        if (nxt_slow_path(res != NXT_UNIT_OK)) {
+            nxt_unit_req_warn(req, "failed to add request to hash");
+
+            nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+            return NXT_UNIT_ERROR;
+        }
+
         if (req->content_length
             > (uint64_t) (req->content_buf->end - req->content_buf->free))
         {
-            res = nxt_unit_request_hash_add(ctx, req);
-            if (nxt_slow_path(res != NXT_UNIT_OK)) {
-                nxt_unit_req_warn(req, "failed to add request to hash");
-
-                nxt_unit_request_done(req, NXT_UNIT_ERROR);
-
-                return NXT_UNIT_ERROR;
-            }
-
             /*
              * If application have separate data handler, we may start
              * request processing and process data when it is arrived.
@@ -1418,7 +1425,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
     nxt_unit_mmap_buf_t      *b;
     nxt_unit_request_info_t  *req;
 
-    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
     if (req == NULL) {
         return NXT_UNIT_OK;
     }
@@ -1723,6 +1730,34 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
 }
 
 
+static int
+nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+                              nxt_unit_recv_msg_t *recv_msg)
+{
+    nxt_unit_impl_t          *lib;
+    nxt_unit_callbacks_t     *cb;
+    nxt_unit_request_info_t  *req;
+
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
+    if (req == NULL) {
+        printf("%s: ERROR [req] not found for stream [%d]\n", __func__,
+               recv_msg->stream);
+        return NXT_UNIT_OK;
+    }
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+    cb = &lib->callbacks;
+
+    if (cb->close_handler) {
+        cb->close_handler(req);
+    } else {
+        nxt_unit_request_done(req, NXT_UNIT_ERROR);
+    }
+
+    return NXT_UNIT_OK;
+}
+
+
 static int
 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
 {
@@ -6530,6 +6565,7 @@ nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
 
     case NXT_OK:
         req_impl->in_hash = 1;
+        printf("%s: Added req for stream [%d]\n", __func__, *stream);
         return NXT_UNIT_OK;
 
     default:
@@ -6557,6 +6593,7 @@ nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
     pthread_mutex_lock(&ctx_impl->mutex);
 
     if (remove) {
+        printf("%s: Removing req for stream [%d]\n", __func__, stream);
         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
 
     } else {

But I think I see the issue you were having with looking up req in the hash table; this seems to be simply because it isn't being added.

I.e. we don't hit this code

+        res = nxt_unit_request_hash_add(ctx, req);
+        if (nxt_slow_path(res != NXT_UNIT_OK)) {
+            nxt_unit_req_warn(req, "failed to add request to hash");
+
+            nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+            return NXT_UNIT_ERROR;
+        }
+

Trying it earlier, i.e. above the if () statement, does result in it being added, but also in general breakage...

Perhaps @hongzhidao has some idea?

@gourav-kandoria (Contributor, Author) commented Feb 13, 2025

See this Cloudflare article for the gory details.

Nice read.

Hmm, do you want to re-work your patches to not do any WebSocket stuff as that seems to be handled correctly as is and use EPIPE for the OSError? (although I think you were only doing that for the websocket case).

Well, I guess keeping this would still make sense:

    ws->state = NXT_WS_DISCONNECTED;

    if (ws->receive_future == NULL) {
        return;
    }

Because, if there is no receive awaiting, then when send is called this set state would raise an exception, as should be done per the ASGI spec. Basically the point here is to keep the state correct regardless of whether some receive future is awaiting or not, as is done in nxt_py_asgi_http_close_handler.

Also, even if the exception occurs on the router side, it is not being propagated to the application; if we look at the nxt_py_asgi_websocket_close_handler function, there isn't anything that tells us which OSError occurred.
So, in the absence of any particular error code, we should raise an exception that is at least a subclass of OSError, as per the ASGI spec.
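To restate the expected app-visible behaviour, here is a minimal hypothetical ASGI WebSocket app (not tied to this patch): once the client is gone, a further send() should fail with a subclass of OSError.

import asyncio

async def application(scope, receive, send):
    if scope['type'] != 'websocket':
        return
    await receive()                            # 'websocket.connect'
    await send({'type': 'websocket.accept'})
    while True:
        try:
            await send({'type': 'websocket.send', 'text': 'ping'})
        except OSError as err:                 # client disconnected
            print(f'client gone: {err}')
            break
        await asyncio.sleep(1)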

@ac000 (Member) commented Feb 13, 2025

Summary of issues.

With the following python ASGI application

async def application(scope, receive, send):
    while True:
        m = await receive()
        if m['type'] == 'http.disconnect':
                print("Client Disconnect")
                break

        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [[b"content-type", b"text/plain"]],
            }
        )
 
        await send(
            {
                'type': 'http.response.body',
                'body': b"Testing...\n",
                'more_body': True 
            }
        )

Opening a connection with curl and then Ctrl-C'ing it

$ curl rhel-9:8000/
Testing...
^C

Results in a connection stuck in CLOSE_WAIT and a leaked file descriptor

tcp   CLOSE-WAIT 1      0      [::1]:8000  [::1]:47140         

This patch at least gets rid of the connection being stuck in CLOSE_WAIT, but it doesn't look like the application is notified.

diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index d53df1bc..2d5b6fd0 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -936,18 +950,26 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
 #if (NXT_HAVE_EPOLL_EDGE)
 
         ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
+        if (ev->epoll_eof)
+            ev->read = NXT_EVENT_INACTIVE;

 #endif

With this python ASGI application

import asyncio

async def application(scope, receive, send):
    if scope['type'] == 'http':
        headers = [
            (b'content-type', b'text/event-stream'),
            (b'cache-control', b'no-cache'),
            (b'connection', b'keep-alive'),
        ]
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': headers,
        })

        send_task = asyncio.create_task(send_messages_sse(send))
        receive_task = asyncio.create_task(receive_messages_sse(receive))

        await asyncio.gather(send_task, receive_task)


async def receive_messages_sse(receive):
    message = await receive()
    print(f'message received: {message}')
    if message['type'] == 'http.disconnect':
        print(f'http.disconnect')
        return

async def send_messages_sse(send):
    i = 0
    while True:
        try:
            message = f"event: count\ndata: {i}\n\n"
            print(f'message sent: {message}')
            await send({
                'type': 'http.response.body',
                'body': message.encode('utf-8'),
                'more_body': True 
            })
            i+=2
            await asyncio.sleep(2)
        except Exception as err:
            print(f'err : {err}')
            break

The above patch causes the router process to segfault.

Without the above patch, if you open a connection to the application then Ctrl-C it, you don't get the CLOSE_WAIT issue, but the application also isn't notified that the connection is closed and so keeps sending messages to the router process.

With this slightly modified version of @gourav-kandoria 's patch

diff --git a/src/nxt_port.h b/src/nxt_port.h
index 772fb41a..e801a2ee 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
     /* Status report. */
     nxt_port_handler_t  status;
 
+    nxt_port_handler_t  client_close;
+
     nxt_port_handler_t  oosm;
     nxt_port_handler_t  shm_ack;
     nxt_port_handler_t  read_queue;
@@ -115,6 +117,8 @@ typedef enum {
     _NXT_PORT_MSG_APP_RESTART     = nxt_port_handler_idx(app_restart),
     _NXT_PORT_MSG_STATUS          = nxt_port_handler_idx(status),
 
+    _NXT_PORT_MSG_CLIENT_CLOSE    = nxt_port_handler_idx(client_close),
+
     _NXT_PORT_MSG_OOSM            = nxt_port_handler_idx(oosm),
     _NXT_PORT_MSG_SHM_ACK         = nxt_port_handler_idx(shm_ack),
     _NXT_PORT_MSG_READ_QUEUE      = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
     NXT_PORT_MSG_APP_RESTART      = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
     NXT_PORT_MSG_STATUS           = nxt_msg_last(_NXT_PORT_MSG_STATUS),
 
+    NXT_PORT_MSG_CLIENT_CLOSE     = nxt_msg_last(_NXT_PORT_MSG_CLIENT_CLOSE),
+
     NXT_PORT_MSG_OOSM             = nxt_msg_last(_NXT_PORT_MSG_OOSM),
     NXT_PORT_MSG_SHM_ACK          = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
     NXT_PORT_MSG_READ_QUEUE       = _NXT_PORT_MSG_READ_QUEUE,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44ea823b..bf8f5ff1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5276,6 +5276,8 @@ nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t  *r;
 
+    printf("%s: \n", __func__);
+
     r = obj;
 
     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
@@ -5295,11 +5297,22 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t  *r;
 
+    printf("%s: \n", __func__);
+
     r = data;
 
     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
 
     if (r->req_rpc_data != NULL) {
+        nxt_request_rpc_data_t  *req_rpc_data = r->req_rpc_data;
+
+        printf("%s: Sending [NXT_PORT_MSG_CLIENT_CLOSE] message / [%d]...\n",
+               __func__, req_rpc_data->stream);
+        nxt_port_socket_write(task, req_rpc_data->app_port,
+                              NXT_PORT_MSG_CLIENT_CLOSE, -1,
+                              req_rpc_data->stream,
+                              task->thread->engine->port->id, NULL);
+
         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
     }
 
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 966a6c0f..866d1e1d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
     nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+    nxt_unit_recv_msg_t *recv_msg);
 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
     nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
         rc = nxt_unit_process_websocket(ctx, &recv_msg);
         break;
 
+    case _NXT_PORT_MSG_CLIENT_CLOSE:
+        printf("%s: Got message [NXT_PORT_MSG_CLIENT_CLOSE]\n", __func__);
+        rc = nxt_unit_process_client_close(ctx, &recv_msg);
+        break;
+
     case _NXT_PORT_MSG_REMOVE_PID:
         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1418,7 +1425,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
     nxt_unit_mmap_buf_t      *b;
     nxt_unit_request_info_t  *req;
 
-    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
     if (req == NULL) {
         return NXT_UNIT_OK;
     }
@@ -1723,6 +1730,35 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
 }
 
 
+static int
+nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+                              nxt_unit_recv_msg_t *recv_msg)
+{
+    nxt_unit_impl_t          *lib;
+    nxt_unit_callbacks_t     *cb;
+    nxt_unit_request_info_t  *req;
+
+    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
+    if (req == NULL) {
+        printf("%s: ERROR [req] not found for stream [%d]\n", __func__,
+               recv_msg->stream);
+        return NXT_UNIT_OK;
+    }
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+    cb = &lib->callbacks;
+
+    if (cb->close_handler) {
+        printf("%s: Calling [cb->close_handler(req)]\n", __func__);
+        cb->close_handler(req);
+    } else {
+        nxt_unit_request_done(req, NXT_UNIT_ERROR);
+    }
+
+    return NXT_UNIT_OK;
+}
+
+
 static int
 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
 {
@@ -4826,10 +4862,22 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
             continue;
         }
 
+#if 1
+        printf("%s: Adding req to hash table\n", __func__);
+        res = nxt_unit_request_hash_add(ctx, req);
+        if (nxt_slow_path(res != NXT_UNIT_OK)) {
+            nxt_unit_req_warn(req, "failed to add request to hash");
+
+            nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+            continue;
+        }
+#endif
+
         if (req->content_length
             > (uint64_t) (req->content_buf->end - req->content_buf->free))
         {
-            res = nxt_unit_request_hash_add(ctx, req);
+#if 0
             if (nxt_slow_path(res != NXT_UNIT_OK)) {
                 nxt_unit_req_warn(req, "failed to add request to hash");
@@ -4837,7 +4885,7 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
 
                 continue;
             }
-
+#endif
             /*
              * If application have separate data handler, we may start
              * request processing and process data when it is arrived.
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
index 702f4d8d..2ed6f964 100644
--- a/src/python/nxt_python_asgi.c
+++ b/src/python/nxt_python_asgi.c
@@ -626,10 +626,13 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
 static void
 nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
 {
+    printf("%s: \n", __func__);
+
     if (req->request->websocket_handshake) {
         nxt_py_asgi_websocket_close_handler(req);
 
     } else {
+        printf("%s: Calling [nxt_py_asgi_http_close_handler(req)]\n", __func__);
         nxt_py_asgi_http_close_handler(req);
     }
 }
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
index cdd6357e..81c97b7e 100644
--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -362,6 +362,12 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
     Py_ssize_t              body_len, body_off;
     nxt_py_asgi_ctx_data_t  *ctx_data;
 
+    printf("%s: \n", __func__);
+
+    if (http->closed) {
+        return PyErr_Format(PyExc_RuntimeError, "peer closed connection");
+    }
+
     if (nxt_slow_path(http->complete)) {
         return PyErr_Format(PyExc_RuntimeError,
                             "Unexpected ASGI message 'http.response.body' "
@@ -646,6 +652,7 @@ nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
     if (nxt_fast_path(http != NULL)) {
         http->closed = 1;
 
+        printf("%s: Calling [nxt_py_asgi_http_emit_disconnect(http)]\n", __func__);
         nxt_py_asgi_http_emit_disconnect(http);
     }
 }

On the first connection attempt and Ctrl-C it seems to do the right thing and the application stops sending messages.

However, on subsequent connections it's back to the original problem, as it doesn't seem to be adding req to the hash table. I notice its stream id has changed from 8 to 9, so perhaps things didn't get cleaned up properly previously.

@hongzhidao (Contributor) commented Feb 14, 2025

Hi @gourav-kandoria @ac000,
This looks like a topic of whether to support client abort, just my two cents.

  1. It can be an option like ignore_client_abort, notify_client_abort, etc, and the default behavior is ignoring it so that we don't change any behavior on the client by default.
  2. The option seems to be in the applications object.
  3. I feel we need to call nxt_conn_read() with a closed handler in the correct place to track the client connection. It seems not a good idea to rely on r->error or r->closed.
  4. Welcome to add unit tests on it.

@ac000 (Member) commented Feb 15, 2025

@hongzhidao Thanks for your input.

It can be an option like ignore_client_abort, notify_client_abort, etc, and the default behaviour is ignoring it so that we don't change any behaviour on the client by default.

Hmm, I'm curious why you think this should be optional behaviour and not just the standard behaviour of a properly functioning server?

I feel we need to call nxt_conn_read() with a closed handler in the correct place to track the client connection. It seems not a good idea to rely on r->error or r->closed.

Let's deal with the case where the client end of the connection has been closed (for whatever reason).

On Linux with epoll(7) and EPOLLRDHUP we are notified when this happens in nxt_epoll_poll() and with WebSockets this seems to be handled properly and the socket is properly closed on the server and the application is notified with a websocket.disconnect.

However with a HTTP connection things seem to go awry due to NXT_EVENT_BLOCKED (not really sure what is blocked and why) being set, we hit this condition and then we don't go through things like nxt_conn_io_read() which would detect this via reading 0 bytes, although that should be unnecessary due to EPOLLRDHUP.

But then we also do need someway of notifying the application that the client has closed the connection.

I guess this all does just confirm my suspicion that Unit was never designed with Server-sent events applications in mind...

@hongzhidao (Contributor) commented Feb 15, 2025

@ac000 You are welcome.

Hmm, I'm curious why you think this should be optional behaviour and not just the standard behaviour of a properly functioning server?

Since this is a behavior change, by default I tend not to change the original behavior; for example, sometimes the connection is broken but the application wants to continue processing the request. But welcome to continue the discussion.

Let's deal with the case where the client end of the connection has been closed (for whatever reason).
NXT_EVENT_BLOCKED (not really sure what is blocked and why) being set

In Unit, it's about the way the event engine is triggered. In the traditional way, if an event happens, its callback handler is usually called immediately. But in Unit, we need to actively call nxt_conn_read(), and then its handler will be called. That's why NXT_EVENT_BLOCKED is introduced.

reading 0 bytes, although that should be unnecessary due to EPOLLRDHUP.

Yep, epoll has such an event type, but Unit also supports other event engines.

@gourav-kandoria
Contributor Author

gourav-kandoria commented Feb 17, 2025

Since I feel like this is a change, by default I tend not to change the original behavior, such as sometimes the connection is broken, but the application wants to continue processing requests, but welcome to continue the discussion.

@hongzhidao @ac000

Yes, that makes sense. But consider traditional servers: even if they don't read after the initial request, the application immediately knows it is writing to a broken socket the moment it tries to send data back. Here, that information is not propagated back to the application. I also think adding options like notify_client_abort would just be extra hassle for users, because frameworks and applications are usually written assuming the way servers traditionally work, and some languages, like Python and Java, have specifications such as ASGI, WSGI, and Servlet that assume the same thing. So my suggestion would be: if a request is in progress and a disconnected socket is detected while sending data back, notify the application process and then act according to the language-specific specification (as sketched below).
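
As a concrete illustration of the ASGI side of that contract, here is a minimal sketch (framework-free, and not Unit-specific) of a streaming HTTP app that stops producing once it receives the spec-mandated http.disconnect event:

import asyncio

async def application(scope, receive, send):
    assert scope["type"] == "http"
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"text/plain")]})

    disconnected = asyncio.Event()

    async def watch_disconnect():
        # Per the ASGI spec, the server delivers "http.disconnect"
        # when the client goes away.
        while True:
            event = await receive()
            if event["type"] == "http.disconnect":
                disconnected.set()
                return

    watcher = asyncio.ensure_future(watch_disconnect())
    try:
        while not disconnected.is_set():
            await send({"type": "http.response.body", "body": b"tick\n",
                        "more_body": True})
            await asyncio.sleep(2)
        # The client is gone; per the spec, further send() calls may
        # raise an OSError subclass, so just stop producing.
    finally:
        watcher.cancel()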

However, with an HTTP connection things seem to go awry due to NXT_EVENT_BLOCKED (not really sure what is blocked and why) being set: we hit this condition and then we don't go through things like nxt_conn_io_read(), which would detect the closure by reading 0 bytes,

I am not sure, but it might be blocked from reading. When the first request has been completely read, a second request might already be pipelined after it, since in HTTP/1.1 multiple requests can be served on the same TCP connection. So only after the first request's response is sent back should the router start reading and processing the second request's bytes. But yes, sure, EPOLLRDHUP and EPOLLHUP can be listened for.
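
As a quick illustration of that pipelining scenario, two requests can arrive back-to-back on one TCP connection (hypothetical host and port; a sketch, not a test):

import socket

s = socket.create_connection(("127.0.0.1", 8080))
# Two pipelined HTTP/1.1 requests in a single write; the server must
# finish responding to the first before it reads and processes the second.
s.sendall(b"GET /a HTTP/1.1\r\nHost: localhost\r\n\r\n"
          b"GET /b HTTP/1.1\r\nHost: localhost\r\n\r\n")
print(s.recv(65536).decode(errors="replace"))  # whatever has arrived so far
s.close()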

@ac000
Member

ac000 commented Feb 17, 2025

and with WebSockets this seems to be handled properly and the socket is properly closed on the server and the application is notified with a websocket.disconnect.

Hmm, not with

import time
from fastapi import FastAPI, WebSocket

application = FastAPI()

@application.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("Listening on WebSocket")
    await websocket.accept()
    while True:
        await websocket.send_text(f"Testing...")
        time.sleep(2)

When ^Cing the client, the socket is closed on the server but it seems the application continues to want to send data...

@gourav-kandoria
Contributor Author

When ^Cing the client, the socket is closed on the server but it seems the application continues to want to send data...

If you try the websocket file patch, it should work.

@ac000
Member

ac000 commented Feb 17, 2025

If you try the websocket file patch, it should work.

Hmm, actually it doesn't, because Unit is only half doing the right thing: it detects that the client has closed the connection and closes the socket, but no error is detected or set (r->error).

[pid 91251] recvfrom(27<TCPv6:[[::1]:8000->[::1]:40818]>, "", 135, 0, NULL, NULL) = 0
[pid 91251] sendmsg(16<UNIX:[210062->210061,@"1d818"]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0pd\1\0\1\0!\1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
[pid 91251] sendto(27<TCPv6:[[::1]:8000->[::1]:40818]>, "\211\0", 2, 0, NULL, 0) = 2
[pid 91251] close(27<TCPv6:[[::1]:8000->[::1]:40818]>) = 0

Unit read 0 bytes from the client, signalling EOF/connection close/shutdown. It just doesn't tell the application. I'd be happy if I could get to this state with HTTP connections; then the rest should fall into place...

@ac000
Member

ac000 commented Feb 17, 2025

The other problem this causes (that the application is still trying to send data to the client) is that running unit in the foreground and ^C'ing it leaves the application processes still running...

@gourav-kandoria
Contributor Author

gourav-kandoria commented Feb 18, 2025

Hmm, actually it doesn't, because Unit is only half doing the right thing: it detects that the client has closed the connection and closes the socket, but no error is detected or set (r->error).

It might have something to do with the "notify" variable.

These are two different logs: one where an exception occurred in the application and one where it didn't.

2025/02/18 22:19:23.137 [debug] 8249#8252 *114 http websocket error handler
2025/02/18 22:19:23.137 [debug] 8249#8252 *114 port{8405,0} 107: enqueue 16 notify 1, 0

2025/02/18 22:29:53.867 [debug] 15231#15237 *27 http websocket error handler
2025/02/18 22:29:53.867 [debug] 15231#15237 *27 port{15233,0} 23: enqueue 16 notify 0, 0

Now, this is even more confusing: why is it 1 with simple ASGI apps, but 0 with a framework like FastAPI?

@gourav-kandoria
Contributor Author

gourav-kandoria commented Feb 18, 2025

The other problem this causes (that the application is still trying to send data to the client) is that running unit in the foreground and ^C'ing it leaves the application processes still running...

Maybe in the case of FastAPI, after the websocket connection is established, port messages are not getting delivered to the app. Normally, when we Ctrl-C the foreground-running Unit, the application process receives the _NXT_PORT_MSG_QUIT message and quits itself, but here it is not getting that message. The same is happening for client disconnection: the NXT_PORT_MSG_WEBSOCKET_LAST message is not getting delivered to the application process.

@gourav-kandoria
Contributor Author

gourav-kandoria commented Feb 19, 2025

Maybe in the case of FastAPI, after the websocket connection is established, port messages are not getting delivered to the app. [...]

Got the issue. This isn't an issue with Unit but with this block of code: the loop never yields the CPU, so the event loop never gets time to process incoming messages:

    while True:                                                                 
        await websocket.send_text(f"Testing...")
        print('sent message')
        time.sleep(2)

If asyncio.sleep(2) is used instead of time.sleep(2), the app gets the disconnection notification.
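
For completeness, a non-blocking version of the endpoint from earlier in the thread might look like this (the WebSocketDisconnect handling is added here for illustration):

import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

application = FastAPI()

@application.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            await websocket.send_text("Testing...")
            # asyncio.sleep() yields to the event loop, so the pending
            # websocket.disconnect message can actually be processed.
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        print("client disconnected")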

@gourav-kandoria
Contributor Author

@ac000 @hongzhidao, how do we proceed on this?

@ac000
Member

ac000 commented Feb 24, 2025

Not forgotten about this, but was looking at some other issues.

In simple terms we just need to make Unit recognise when a client closes the connection. Still trying to figure out the how...
