Add black code formatter #90

Merged
1 commit merged on Jan 31, 2024
3 changes: 3 additions & 0 deletions .flake8
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
extend-ignore = E203, E704
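
These flake8 settings track black's documented defaults: black wraps lines at 88 characters, and E203 (whitespace before ":") and E704 (statement on the same line as def) are the pycodestyle checks that can disagree with black's output. A minimal illustrative sketch of the E203 case (not part of this PR):

# Illustrative only: black formats slices whose bounds are expressions with
# spaces around the colon, which flake8 would flag as E203 unless ignored.
ham = list(range(10))
lower, upper, offset = 2, 8, 1
sliced = ham[lower + offset : upper + offset]  # E203 without the extend-ignore
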
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
@@ -29,6 +29,8 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4

- uses: psf/black@stable

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
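
Note on this step (an inference, not stated in the diff): psf/black@stable appears to run black over the checked-out sources with the action's default options, i.e. in check mode, so the job fails if any file would need reformatting rather than rewriting files in CI. The action installs its own copy of black, which is why it can run before the Set up Python step below.
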
5 changes: 5 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,5 @@
{
"editor.defaultFormatter": "ms-python.black-formatter",
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
44 changes: 28 additions & 16 deletions gcn_classic_to_kafka/cli.py
@@ -23,7 +23,7 @@


def signal_handler(signum, frame):
log.info('Exiting due to signal %d', signum)
log.info("Exiting due to signal %d", signum)

sys.exit(128 + signum)


@@ -36,19 +36,31 @@
# Parse netloc like it is done for HTTP URLs.
# This ensures that we will get the correct behavior for hostname:port
# splitting even for IPv6 addresses.
return urllib.parse.urlparse(f'http://{host_port_str}')
return urllib.parse.urlparse(f"http://{host_port_str}")


@click.command()
@click.option(
'--listen', type=host_port, default=':8081', show_default=True,
help='Hostname and port to listen on for GCN Classic')
"--listen",
type=host_port,
default=":8081",
show_default=True,
help="Hostname and port to listen on for GCN Classic",
)
@click.option(
'--prometheus', type=host_port, default=':8000', show_default=True,
help='Hostname and port to listen on for Prometheus metric reporting')
"--prometheus",
type=host_port,
default=":8000",
show_default=True,
help="Hostname and port to listen on for Prometheus metric reporting",
)
@click.option(
'--loglevel', type=click.Choice(logging._levelToName.values()),
default='DEBUG', show_default=True, help='Log level')
"--loglevel",
type=click.Choice(logging._levelToName.values()),
default="DEBUG",
show_default=True,
help="Log level",
)
def main(listen, prometheus, loglevel):
"""Pump GCN Classic notices to a Kafka broker.

@@ -64,21 +76,21 @@
"""
logging.basicConfig(level=loglevel)

prometheus_client.start_http_server(prometheus.port,
prometheus.hostname or '0.0.0.0')
log.info('Prometheus listening on %s', prometheus.netloc)
prometheus_client.start_http_server(
prometheus.port, prometheus.hostname or "0.0.0.0"
)
log.info("Prometheus listening on %s", prometheus.netloc)

config = gcn_kafka.config_from_env()
config['client.id'] = __package__
config['on_delivery'] = kafka_delivered_cb
config["client.id"] = __package__
config["on_delivery"] = kafka_delivered_cb

producer = gcn_kafka.Producer(config)
client = client_connected(producer)

async def serve():
server = await asyncio.start_server(
client, listen.hostname, listen.port)
log.info('GCN server listening on %s', listen.netloc)
server = await asyncio.start_server(client, listen.hostname, listen.port)
log.info("GCN server listening on %s", listen.netloc)

async with server:
await server.serve_forever()
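
For context on the host_port helper reformatted above: parsing the value as a fake http:// URL lets urllib.parse handle bracketed IPv6 literals and empty hostnames. A minimal sketch of the behavior it relies on (illustrative, not part of the diff):

import urllib.parse

# Bracketed IPv6 literals split cleanly into hostname and port.
parsed = urllib.parse.urlparse("http://[::1]:8081")
print(parsed.hostname, parsed.port)   # -> ::1 8081

# A bare ":8000" yields hostname None; the CLI then falls back to "0.0.0.0"
# for Prometheus and lets asyncio bind all interfaces for the GCN server.
print(urllib.parse.urlparse("http://:8000").hostname)  # -> None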

4 changes: 2 additions & 2 deletions gcn_classic_to_kafka/common.py
@@ -38,7 +38,7 @@
try:
notice_type_enum = gcn.NoticeType(notice_type_int)
except ValueError:
notice_type_str = 'UNKNOWN'
notice_type_str = "UNKNOWN"
else:
notice_type_str = notice_type_enum.name
return notice_type_str
@@ -66,4 +66,4 @@
'gcn.classic.voevent.LVC_PRELIMINARY'

"""
return f'gcn.classic.{flavor}.{notice_type_str}'
return f"gcn.classic.{flavor}.{notice_type_str}"
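
Taken together, the two helpers in this file (both appear to live in common.py; the second is shown in full above) turn a notice type integer into a Kafka topic name, falling back to "UNKNOWN" for integers that are not gcn.NoticeType members. A small illustrative sketch, assuming -1 is not a defined notice type:

# Sketch of the fallback path shown above (illustrative only).
from gcn_classic_to_kafka.common import notice_type_int_to_str, topic_for_notice_type_str

notice_type_str = notice_type_int_to_str(-1)                  # -> "UNKNOWN"
print(topic_for_notice_type_str(notice_type_str, "binary"))   # -> gcn.classic.binary.UNKNOWN
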
29 changes: 16 additions & 13 deletions gcn_classic_to_kafka/metrics.py
@@ -9,22 +9,25 @@
import prometheus_client

connected = prometheus_client.Gauge(
'connected', 'Number of active connections from GCN Classic',
namespace=__package__)
"connected", "Number of active connections from GCN Classic", namespace=__package__
)

iamalive = prometheus_client.Counter(
'iamalive',
'GCN notices received of any type, including iamalives',
namespace=__package__)
"iamalive",
"GCN notices received of any type, including iamalives",
namespace=__package__,
)

received = prometheus_client.Counter(
'received',
'GCN Classic notices received by notice type and flavor',
labelnames=['notice_type_int', 'notice_type_str', 'flavor'],
namespace=__package__)
"received",
"GCN Classic notices received by notice type and flavor",
labelnames=["notice_type_int", "notice_type_str", "flavor"],
namespace=__package__,
)

delivered = prometheus_client.Counter(
'delivered',
'Kafka messages delivered',
labelnames=['topic', 'partition', 'successful'],
namespace=__package__)
"delivered",
"Kafka messages delivered",
labelnames=["topic", "partition", "successful"],
namespace=__package__,
)
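
The received counter's labels are exercised in socket.py below; the delivered counter is presumably incremented from the kafka_delivered_cb referenced in cli.py, which is not part of this diff. A hypothetical sketch of its shape, assuming the module-level delivered counter above is in scope:

# Hypothetical sketch only: the real kafka_delivered_cb is not shown in this
# diff. A confluent_kafka delivery callback receives (err, msg).
def kafka_delivered_cb(err, msg):
    delivered.labels(msg.topic(), msg.partition(), err is None).inc()
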
49 changes: 26 additions & 23 deletions gcn_classic_to_kafka/socket.py
@@ -20,32 +20,34 @@
log = logging.getLogger(__name__)

bin_len = 160
int4 = struct.Struct('!l')
ignore_notice_types = {gcn.NoticeType.IM_ALIVE,
gcn.NoticeType.VOE_11_IM_ALIVE,
gcn.NoticeType.VOE_20_IM_ALIVE}
int4 = struct.Struct("!l")
ignore_notice_types = {
gcn.NoticeType.IM_ALIVE,
gcn.NoticeType.VOE_11_IM_ALIVE,
gcn.NoticeType.VOE_20_IM_ALIVE,
}


def client_connected(producer: confluent_kafka.Producer, timeout: float = 90):

async def client_connected_cb(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
async def client_connected_cb(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
async def read():
bin_data = await reader.readexactly(bin_len)
voe_len, = int4.unpack(await reader.readexactly(int4.size))
(voe_len,) = int4.unpack(await reader.readexactly(int4.size))
voe_data = await reader.readexactly(voe_len)
txt_len, = int4.unpack(await reader.readexactly(int4.size))
(txt_len,) = int4.unpack(await reader.readexactly(int4.size))
txt_data = await reader.readexactly(txt_len)
log.debug('Read %d + %d + %d bytes', bin_len, voe_len, txt_len)
log.debug("Read %d + %d + %d bytes", bin_len, voe_len, txt_len)
return bin_data, voe_data, txt_data

async def process():
bin_data, voe_data, txt_data = await asyncio.wait_for(
read(), timeout)
bin_data, voe_data, txt_data = await asyncio.wait_for(read(), timeout)
metrics.iamalive.inc()

bin_notice_type, = int4.unpack_from(bin_data)
log.info('Received notice of type 0x%08X', bin_notice_type)
(bin_notice_type,) = int4.unpack_from(bin_data)
log.info("Received notice of type 0x%08X", bin_notice_type)
if bin_notice_type in ignore_notice_types:
return

@@ -54,35 +56,36 @@ async def process():

if bin_notice_type != voe_notice_type:
log.warning(
'Binary (0x%08X) and VOEvent (0x%08X) notice types differ',
bin_notice_type, voe_notice_type)
"Binary (0x%08X) and VOEvent (0x%08X) notice types differ",
bin_notice_type,
voe_notice_type,
)

# The text notices do not contain a machine-readable notice type.
txt_notice_type = bin_notice_type

for notice_type_int, data, flavor in [
[bin_notice_type, bin_data, 'binary'],
[voe_notice_type, voe_data, 'voevent'],
[txt_notice_type, txt_data, 'text'],
[bin_notice_type, bin_data, "binary"],
[voe_notice_type, voe_data, "voevent"],
[txt_notice_type, txt_data, "text"],
]:
notice_type_str = notice_type_int_to_str(notice_type_int)
metrics.received.labels(
notice_type_int, notice_type_str, flavor).inc()
metrics.received.labels(notice_type_int, notice_type_str, flavor).inc()
topic = topic_for_notice_type_str(notice_type_str, flavor)
producer.produce(topic, data)

# Wait for any outstanding messages to be delivered and delivery
# report callbacks to be triggered.
producer.poll(0)

peer, *_ = writer.get_extra_info('peername')
log.info('Client connected from %s', peer)
peer, *_ = writer.get_extra_info("peername")
log.info("Client connected from %s", peer)
try:
with metrics.connected.track_inprogress():
while True:
await process()
finally:
log.info('Closing connection from %s', peer)
log.info("Closing connection from %s", peer)
writer.close()
await writer.wait_closed()
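
The read() coroutine above implements the GCN Classic wire framing: a fixed 160-byte binary packet, then a 4-byte big-endian length followed by the VOEvent payload, then another 4-byte length followed by the text payload. Changes such as voe_len, = ... becoming (voe_len,) = ... are purely stylistic; both forms unpack a one-element tuple. A minimal sketch of the framing from the sender's side (illustrative, mirroring the constants above):

import struct

int4 = struct.Struct("!l")   # 4-byte big-endian signed integer
bin_len = 160                # fixed-size binary packet

def frame(bin_data: bytes, voe_data: bytes, txt_data: bytes) -> bytes:
    # Inverse of read(): the byte stream a GCN Classic sender would emit.
    assert len(bin_data) == bin_len
    return (
        bin_data
        + int4.pack(len(voe_data)) + voe_data
        + int4.pack(len(txt_data)) + txt_data
    )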

24 changes: 15 additions & 9 deletions gcn_classic_to_kafka/test/test_socket.py
@@ -34,21 +34,24 @@ def make_packet(bin_notice_type, voe_notice_type=None):
</What>
</voe:VOEvent>
""".encode()
text = b'Hello world'
text = b"Hello world"
return (
struct.pack('!l156xl', bin_notice_type, len(voevent)) + voevent +
struct.pack('!l', len(text)) + text)
struct.pack("!l156xl", bin_notice_type, len(voevent))
+ voevent
+ struct.pack("!l", len(text))
+ text
)


@pytest_asyncio.fixture
async def start_server():
producer = mock.Mock()
cb = client_connected(producer=producer, timeout=timeout)
server = await asyncio.start_server(cb, '127.0.0.1')
server = await asyncio.start_server(cb, "127.0.0.1")

async with server:
server_task = asyncio.create_task(server.serve_forever())
socket, = server.sockets
(socket,) = server.sockets
host, port = socket.getsockname()
yield producer, host, port
server_task.cancel()
@@ -96,10 +99,13 @@ async def test_socket(start_server):
await writer.drain()
await asyncio.sleep(0.25 * timeout)
assert not reader.at_eof()
producer.produce.assert_has_calls([
mock.call('gcn.classic.binary.LVC_TEST', mock.ANY),
mock.call('gcn.classic.voevent.LVC_TEST', mock.ANY),
mock.call('gcn.classic.text.LVC_TEST', mock.ANY)])
producer.produce.assert_has_calls(
[
mock.call("gcn.classic.binary.LVC_TEST", mock.ANY),
mock.call("gcn.classic.voevent.LVC_TEST", mock.ANY),
mock.call("gcn.classic.text.LVC_TEST", mock.ANY),
]
)

writer.close()
await writer.wait_closed()
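
The "!l156xl" format in make_packet packs the 4-byte notice type plus 156 zero-pad bytes (together the 160-byte binary packet socket.py expects), followed by the 4-byte VOEvent length. A quick illustrative check of that arithmetic:

import struct

# 4 (notice type) + 156 (padding) + 4 (VOEvent length) bytes
assert struct.calcsize("!l156xl") == 160 + 4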