-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
38 lines (28 loc) · 1.05 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
from jsonrpc import dispatcher, JSONRPCResponseManager
@dispatcher.add_method
def echo(message):
return f"Echo: {message}"
class EchoServerProtocol(asyncio.Protocol):
def __init__(self) -> None:
self.transport = None
def connection_made(self, transport):
peer_name = transport.get_extra_info('peername')
print('Connection from {}'.format(peer_name))
self.transport = transport
def data_received(self, data: bytes) -> None:
message = data.decode()
print('Data received: {!r}'.format(message))
response = JSONRPCResponseManager.handle(message, dispatcher)
if response:
response_data = response.json.encode()
print('Send response: {!r}'.format(response_data))
self.transport.write(response_data)
async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())