-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathrun_client.py
132 lines (104 loc) · 4.2 KB
/
run_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# This file is a part of WTFIX.
#
# Copyright (C) 2018-2021 John Cass <john.cass77@gmail.com>
#
# WTFIX is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# WTFIX is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import asyncio
import logging
import os
import sys
import signal
from wtfix.conf import settings
from wtfix.core.exceptions import ImproperlyConfigured
from wtfix.pipeline import BasePipeline
from wtfix.protocol.contextlib import connection_manager
# Logger used throughout this script; it is the one exposed by the WTFIX settings object.
logger = settings.logger

# Command line interface of the client runner.
parser = argparse.ArgumentParser(description="Start a FIX connection")

# Which connection configuration to use.
parser.add_argument(
    "--connection",
    help="the configuration settings to use for the connection (default: 'default')",
    default="default",
)

# Boolean switch (spelled with a single leading dash); stored as 'args.new_session'.
parser.add_argument(
    "-new_session",
    help="reset sequence numbers and start a new session",
    action="store_true",
)

# Set by graceful_shutdown() the first time a shutdown is started, so that any
# later shutdown request can be recognised and ignored.
_shutting_down = asyncio.Event()
async def graceful_shutdown(pipeline, sig_name=None, error=None):
    """
    Stop 'pipeline', making sure that the stop is only ever initiated once.

    Does nothing if the pipeline's 'stopping_event' is already set. If a shutdown was already started
    through this function, a warning is logged and the request is ignored.

    :param pipeline: The pipeline to stop. Must expose a 'stopping_event' and an awaitable 'stop(error=...)'.
    :param sig_name: Name of the signal that triggered the shutdown, if any. Only used in log messages.
    :param error: Optional error that caused the shutdown; passed on to 'pipeline.stop()' unchanged.
    """
    if pipeline.stopping_event.is_set():
        # Nothing to do
        return

    if not _shutting_down.is_set():
        # First request: claim the shutdown before doing anything else.
        _shutting_down.set()

        announcement = (
            "Initiating graceful shutdown..."
            if sig_name is None
            else f"Received signal {sig_name}! Initiating graceful shutdown..."
        )
        logger.info(announcement)

        await pipeline.stop(error=error)
        return

    # Only try to shut down once
    logger.warning(f"Shutdown already in progress! Ignoring signal '{sig_name}'.")
async def main():
    """
    Run a FIX pipeline for the connection selected on the command line.

    Configures logging, parses the command line arguments, creates a 'BasePipeline' for the requested
    connection, and awaits its 'start()'. SIGINT and SIGTERM are routed to 'graceful_shutdown()'.

    Once the pipeline has been created, this coroutine always finishes by calling 'sys.exit()':

    - 'os.EX_OK' after a normal stop, a configuration error, or a keyboard interrupt.
    - 'os.EX_UNAVAILABLE' after any other exception.

    :raises SystemExit: Carrying the exit code described above.
    """
    logging.basicConfig(
        level=settings.LOGGING_LEVEL,
        format="%(asctime)s - %(threadName)s - %(module)s - %(levelname)s - %(message)s",
    )

    args = parser.parse_args()
    exit_code = os.EX_OK

    with connection_manager(args.connection) as conn:
        fix_pipeline = BasePipeline(
            connection_name=conn.name, new_session=args.new_session
        )

        # The event loop only keeps weak references to tasks, so hold on to the shutdown tasks
        # ourselves until they are done.
        shutdown_tasks = set()

        def request_shutdown(sig_name):
            # Signal handlers must be plain callables: schedule the shutdown coroutine as a task.
            task = asyncio.create_task(
                graceful_shutdown(fix_pipeline, sig_name=sig_name)
            )
            shutdown_tasks.add(task)
            task.add_done_callback(shutdown_tasks.discard)

        try:
            # Graceful shutdown on termination signals.
            # See: https://docs.python.org/3.7/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
            loop = asyncio.get_running_loop()
            for sig_name in ("SIGINT", "SIGTERM"):
                # Pass the name as a handler argument instead of closing over the loop variable:
                # a closure would be late-binding, and every handler would then report the name
                # of whichever signal was registered last.
                loop.add_signal_handler(
                    getattr(signal, sig_name), request_shutdown, sig_name
                )

            await fix_pipeline.start()

        except ImproperlyConfigured as e:
            # User needs to fix config issue before restart is attempted. Keep os.EX_OK so that system process
            # monitors like Supervisor do not attempt a restart immediately.
            await graceful_shutdown(fix_pipeline, error=e)

        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt! Initiating shutdown...")
            await graceful_shutdown(fix_pipeline)

        except Exception as e:
            await graceful_shutdown(fix_pipeline, error=e)
            exit_code = os.EX_UNAVAILABLE  # Abnormal termination

        finally:
            # Report tasks that are still running after shutdown.
            tasks = [
                task
                for task in asyncio.all_tasks()
                if task is not asyncio.current_task() and not task.cancelled()
            ]

            if tasks:
                task_output = "\n".join(str(task) for task in tasks)
                logger.warning(
                    f"There are still {len(tasks)} tasks running that have not been cancelled! Cancelling them now...\n"
                    f"{task_output}."
                )

                for task in tasks:
                    task.cancel()

            # NOTE(review): exiting from within 'finally' means that the exit code is reported on every
            # path out of the 'try' block. Indentation was reconstructed here - confirm that this matches
            # the intended placement.
            sys.exit(exit_code)
# Script entry point: only start the client when this file is executed directly, not when imported.
if __name__ == "__main__":
    client = main()
    asyncio.run(client)