Skip to content

Commit

Permalink
tests: style: fix names
Browse files Browse the repository at this point in the history
  • Loading branch information
jimf5 committed Dec 10, 2023
1 parent d19c007 commit c36921b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 75 deletions.
58 changes: 29 additions & 29 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@


def self_signed_cert(test_dir):
name = "localhost"
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.get_subject().CN = name
cert.set_issuer(cert.get_subject())
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365 * 86400) # 365 days
cert.set_pubkey(k)
cert.sign(k, "sha512")
(test_dir / f"{name}.key").write_text(
crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode("utf-8")
_name = "localhost"
_k = crypto.PKey()
_k.generate_key(crypto.TYPE_RSA, 2048)
_cert = crypto.X509()
_cert.get_subject().CN = _name
_cert.set_issuer(_cert.get_subject())
_cert.gmtime_adj_notBefore(0)
_cert.gmtime_adj_notAfter(365 * 86400) # 365 days
_cert.set_pubkey(_k)
_cert.sign(_k, "sha512")
(test_dir / f"{_name}.key").write_text(
crypto.dump_privatekey(crypto.FILETYPE_PEM, _k).decode("utf-8")
)
(test_dir / f"{name}.crt").write_text(
crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8")
(test_dir / f"{_name}.crt").write_text(
crypto.dump_certificate(crypto.FILETYPE_PEM, _cert).decode("utf-8")
)


Expand All @@ -49,28 +49,28 @@ def testdir(tmp_path_factory):

@pytest.fixture(scope="module")
def nginx_config(request, testdir, logger):
tmpl = Environment().from_string(request.module.NGINX_CONFIG)
settings = request.param
test_globals = (
_tmpl = Environment().from_string(request.module.NGINX_CONFIG)
_settings = request.param
_test_globals = (
f"pid {testdir}/nginx.pid;\nerror_log {testdir}/error.log debug;\n"
)
test_globals += os.getenv("TEST_NGINX_GLOBALS", "")
settings["test_globals"] = test_globals
test_globals_http = f"root {testdir};\naccess_log {testdir}/access.log;\n"
test_globals_http += os.getenv("TEST_NGINX_GLOBALS_HTTP", "")
settings["test_globals_http"] = test_globals_http
test_globals_stream = os.getenv("TEST_NGINX_GLOBALS_STREAM", "")
settings["test_globals_stream"] = test_globals_stream
conf = tmpl.render(settings)
logger.debug(conf)
return conf
_test_globals += os.getenv("TEST_NGINX_GLOBALS", "")
_settings["test_globals"] = _test_globals
_test_globals_http = f"root {testdir};\naccess_log {testdir}/access.log;\n"
_test_globals_http += os.getenv("TEST_NGINX_GLOBALS_HTTP", "")
_settings["test_globals_http"] = _test_globals_http
_test_globals_stream = os.getenv("TEST_NGINX_GLOBALS_STREAM", "")
_settings["test_globals_stream"] = _test_globals_stream
_conf = _tmpl.render(_settings)
logger.debug(_conf)
return _conf


@pytest.fixture(scope="module")
def nginx(testdir, nginx_config, certs, logger):
# logger.debug(CAPABILITIES)
(testdir / "nginx.conf").write_text(nginx_config)
args = [
_args = [
NGINX_BINARY,
"-p",
f"{testdir}",
Expand All @@ -80,7 +80,7 @@ def nginx(testdir, nginx_config, certs, logger):
"error.log",
]
logger.info("Starting nginx...")
_nginx = subprocess.Popen(args)
_nginx = subprocess.Popen(_args)
logger.debug(f"path={NGINX_BINARY}")
logger.debug(f"args={' '.join(_nginx.args[1:])}")
logger.debug(f"pid={_nginx.pid}")
Expand Down
16 changes: 8 additions & 8 deletions tests/otelcol_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ def _collect(spans):

@pytest.fixture(scope="module")
def otelcollector(logger):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
TraceService(), server
TraceService(), _server
)
listen_addr = "localhost:4317"
server.add_insecure_port(listen_addr)
server.start()
logger.info(f"Starting OTelcol server on {listen_addr}")
_listen_addr = "localhost:4317"
_server.add_insecure_port(_listen_addr)
_server.start()
logger.info(f"Starting OTelcol server on {_listen_addr}")
yield
logger.info(f"Stopping OTelcol server on {listen_addr}")
server.stop(grace=None)
logger.info(f"Stopping OTelcol server on {_listen_addr}")
_server.stop(grace=None)


@pytest.fixture
Expand Down
76 changes: 38 additions & 38 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def rheaders():

@pytest.fixture
def span_list(http_ver, spans):
span_list = []
_span_list = []
for _spans in (
_res[0].scope_spans[0].spans
for _res in (
Expand All @@ -128,51 +128,51 @@ def span_list(http_ver, spans):
== f"test_http{http_ver}"
)
):
span_list += _spans
return span_list
_span_list += _spans
return _span_list


@pytest.fixture
def session(http_ver, url):
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
with niquests.Session(multiplexed=True) as s:
with niquests.Session(multiplexed=True) as _s:
if http_ver == 3:
parsed = urlparse(url)
assert parsed.scheme == "https", "Only https:// URLs are supported."
host = parsed.hostname
if parsed.port is not None:
port = parsed.port
_parsed = urlparse(url)
assert (
_parsed.scheme == "https"
), "Only https:// URLs are supported."
if _parsed.port is not None:
_port = _parsed.port
else:
port = 8443
s.quic_cache_layer.add_domain(host, port)
yield s
_port = 8443
_s.quic_cache_layer.add_domain(_parsed.hostname, _port)
yield _s


@pytest.fixture
def simple_client(url, logger):
def do_get(sock, path):
http_send = f"GET {parsed.path}\n".encode()
logger.debug(f"{http_send=}")
sock.sendall(http_send)
http_recv = ssock.recv(1024)
logger.debug(f"{http_recv=}")
return http_recv.decode("utf-8")

parsed = urlparse(url)
host = parsed.hostname
if parsed.port is not None:
port = parsed.port
_http_send = f"GET {path}\n".encode()
logger.debug(f"{_http_send=}")
sock.sendall(_http_send)
_http_recv = sock.recv(1024)
logger.debug(f"{_http_recv=}")
return _http_recv.decode("utf-8")

_parsed = urlparse(url)
if _parsed.port is not None:
_port = _parsed.port
else:
port = 8443 if parsed.scheme == "https" else 8080
with socket.create_connection((host, port)) as sock:
if parsed.scheme == "https":
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with ctx.wrap_socket(sock, server_hostname="localhost") as ssock:
yield do_get(ssock, parsed.path)
_port = 8443 if _parsed.scheme == "https" else 8080
with socket.create_connection((_parsed.hostname, _port)) as _sock:
if _parsed.scheme == "https":
_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
_ctx.check_hostname = False
_ctx.verify_mode = ssl.CERT_NONE
with _ctx.wrap_socket(_sock, server_hostname="localhost") as _ssock:
yield do_get(_ssock, _parsed.path)
else:
yield do_get(sock, parsed.path)
yield do_get(_sock, _parsed.path)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -324,10 +324,10 @@ class TestOTelGenerateSpans:
],
)
def test_do_requests(self, session, http_ver, url, headers, response):
r = session.get(url, headers=headers, verify=False)
collect_rheaders(r.headers)
assert r.status_code == 200
assert r.text == response
_r = session.get(url, headers=headers, verify=False)
collect_rheaders(_r.headers)
assert _r.status_code == 200
assert _r.text == response


@pytest.mark.parametrize(
Expand Down Expand Up @@ -502,8 +502,8 @@ def test_custom_metrics(
def test_variables(self, http_ver, span_list, rheaders, name, value, idx):
if http_ver == 0:
if "Parent" not in name and value.startswith("span_list"):
id_len = 16 if "span_id" in value else 32
assert len(binascii.hexlify(eval(value)).decode()) == id_len
_id_len = 16 if "span_id" in value else 32
assert len(binascii.hexlify(eval(value)).decode()) == _id_len
else:
pytest.skip("no headers support")
else:
Expand Down

0 comments on commit c36921b

Please sign in to comment.