Skip to content

Commit

Permalink
Add optional Session to Service to be able to reuse connections (#1183)
Browse files Browse the repository at this point in the history
**Pull Request Checklist**
- [ ] Fixes #<!--issue number goes here-->
- [x] Tests added
- [ ] Documentation/examples added
- [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR
title

**Description of PR**

The WorkflowService uses direct calls to requests library, which under
the hood creates a Session, makes the request, then closes the session.

This means that the TCP connection is opened and closed for each call
instead of being reused - which causes unnecessary overhead if the
service is making multiple calls.

This PR adds the possibility to use a Session object in the Service, by
default not enabled to keep the current behavior as-is.

From a performance point of view, if Argo is running locally the
advantage will be virtually non-existent ; however, we can simulate the
outcome with a remote Argo server using
[toxiproxy](https://github.com/Shopify/toxiproxy/):


```bash
toxiproxy-cli create --listen localhost:2747 --upstream localhost:2746 argo 
toxiproxy-cli toxic add -t latency -a latency=20 -a jitter=5 argo  
```

Using ipython and the magic command %timeit:
```
from hera.workflows import WorkflowsService

# default current behavior: no session
ws = WorkflowsService("https://localhost:2747", verify_ssl=False)

%timeit -n10 -r10 ws.list_workflows("argo")
133 ms ± 2.17 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)

ws = WorkflowsService("https://localhost:2747", verify_ssl=False, use_session=True)
%timeit -n10 -r10 ws.list_workflows("argo")
77.4 ms ± 2.01 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)
```

In toxiproxy logs (and the kubectl port-forward logs) , we can see that
the tcp open is done now only once.


Thanks!

---------

Signed-off-by: Timothé Perez <achille.ash@gmail.com>
Signed-off-by: Elliot Gunton <elliotgunton@gmail.com>
Co-authored-by: Elliot Gunton <elliotgunton@gmail.com>
  • Loading branch information
AchilleAsh and elliotgunton authored Sep 10, 2024
1 parent 1d8a17c commit e09ce27
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 75 deletions.
22 changes: 21 additions & 1 deletion scripts/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def __str__(self) -> str:
return f"""
{signature}
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.{self.method}(
resp = self._request(
method="{self.method}",
url={req_url},
params={params},
headers={headers},
Expand Down Expand Up @@ -422,6 +423,7 @@ def __init__(
token: Optional[str] = None,
client_certs: Optional[Tuple[str, str]] = None,
namespace: Optional[str] = None,
session: Optional[requests.Session] = None,
) -> None:
\"\"\"{models_type} service constructor.\"\"\"
self.host = cast(str, host or global_config.host)
Expand All @@ -448,7 +450,25 @@ def format_token(t):
else:
self.token = None
self.session = session or requests.Session()
self.namespace = namespace or global_config.namespace
def _request(self, method, **kwargs):
\"\"\"Make a request using the session if enabled.\"\"\"
return self.session.request(method, **kwargs)
def close(self):
\"\"\"Close the service session.\"\"\"
self.session.close()
def __enter__(self):
\"\"\"Open the service - session doesn't need to be opened.\"\"\"
return self
def __exit__(self, *_):
\"\"\"Close the service.\"\"\"
self.close()
"""


Expand Down
91 changes: 67 additions & 24 deletions src/hera/events/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(
token: Optional[str] = None,
client_certs: Optional[Tuple[str, str]] = None,
namespace: Optional[str] = None,
session: Optional[requests.Session] = None,
) -> None:
"""Events service constructor."""
self.host = cast(str, host or global_config.host)
Expand All @@ -73,8 +74,26 @@ def format_token(t):
else:
self.token = None

self.session = session or requests.Session()

self.namespace = namespace or global_config.namespace

def _request(self, method, **kwargs):
"""Make a request using the session if enabled."""
return self.session.request(method, **kwargs)

def close(self):
"""Close the service session."""
self.session.close()

def __enter__(self):
"""Open the service - session doesn't need to be opened."""
return self

def __exit__(self, *_):
"""Close the service."""
self.close()

def list_event_sources(
self,
namespace: Optional[str] = None,
Expand All @@ -90,7 +109,8 @@ def list_event_sources(
) -> EventSourceList:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/event-sources/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -119,7 +139,8 @@ def list_event_sources(
def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional[str] = None) -> EventSource:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.post(
resp = self._request(
method="post",
url=urljoin(self.host, "api/v1/event-sources/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand All @@ -140,7 +161,8 @@ def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional
def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventSource:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/event-sources/{namespace}/{name}").format(
name=name, namespace=namespace if namespace is not None else self.namespace
),
Expand All @@ -161,7 +183,8 @@ def update_event_source(
) -> EventSource:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.put(
resp = self._request(
method="put",
url=urljoin(self.host, "api/v1/event-sources/{namespace}/{name}").format(
name=name, namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -192,7 +215,8 @@ def delete_event_source(
) -> EventSourceDeletedResponse:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.delete(
resp = self._request(
method="delete",
url=urljoin(self.host, "api/v1/event-sources/{namespace}/{name}").format(
name=name, namespace=namespace if namespace is not None else self.namespace
),
Expand All @@ -218,7 +242,8 @@ def delete_event_source(
def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] = None) -> EventResponse:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.post(
resp = self._request(
method="post",
url=urljoin(self.host, "api/v1/events/{namespace}/{discriminator}").format(
discriminator=discriminator, namespace=namespace if namespace is not None else self.namespace
),
Expand All @@ -239,7 +264,8 @@ def receive_event(self, discriminator: str, req: Item, namespace: Optional[str]
def get_info(self) -> InfoResponse:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/info"),
params=None,
headers={"Authorization": self.token},
Expand Down Expand Up @@ -268,7 +294,8 @@ def list_sensors(
) -> SensorList:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/sensors/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -297,7 +324,8 @@ def list_sensors(
def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = None) -> Sensor:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.post(
resp = self._request(
method="post",
url=urljoin(self.host, "api/v1/sensors/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand All @@ -318,7 +346,8 @@ def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = Non
def get_sensor(self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None) -> Sensor:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/sensors/{namespace}/{name}").format(
name=name, namespace=namespace if namespace is not None else self.namespace
),
Expand All @@ -337,7 +366,8 @@ def get_sensor(self, name: str, namespace: Optional[str] = None, resource_versio
def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional[str] = None) -> Sensor:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.put(
resp = self._request(
method="put",
url=urljoin(self.host, "api/v1/sensors/{namespace}/{name}").format(
name=name, namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -368,7 +398,8 @@ def delete_sensor(
) -> DeleteSensorResponse:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.delete(
resp = self._request(
method="delete",
url=urljoin(self.host, "api/v1/sensors/{namespace}/{name}").format(
name=name, namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -406,7 +437,8 @@ def watch_event_sources(
) -> EventSourceWatchEvent:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/stream/event-sources/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -452,7 +484,8 @@ def event_sources_logs(
) -> EventsourceLogEntry:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/stream/event-sources/{namespace}/logs").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -498,7 +531,8 @@ def watch_events(
) -> Event:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/stream/events/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -539,7 +573,8 @@ def watch_sensors(
) -> SensorWatchEvent:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/stream/sensors/{namespace}").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -584,7 +619,8 @@ def sensors_logs(
) -> SensorLogEntry:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/stream/sensors/{namespace}/logs").format(
namespace=namespace if namespace is not None else self.namespace
),
Expand Down Expand Up @@ -617,7 +653,8 @@ def sensors_logs(
def get_user_info(self) -> GetUserInfoResponse:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/userinfo"),
params=None,
headers={"Authorization": self.token},
Expand All @@ -634,7 +671,8 @@ def get_user_info(self) -> GetUserInfoResponse:
def get_version(self) -> Version:
"""API documentation."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "api/v1/version"),
params=None,
headers={"Authorization": self.token},
Expand All @@ -659,7 +697,8 @@ def get_artifact_file(
) -> str:
"""Get an artifact."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(
self.host,
"artifact-files/{namespace}/{idDiscriminator}/{id}/{nodeId}/{artifactDiscriminator}/{artifactName}",
Expand All @@ -686,7 +725,8 @@ def get_artifact_file(
def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str:
"""Get an output artifact by UID."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format(
uid=uid, nodeId=node_id, artifactName=artifact_name
),
Expand All @@ -705,7 +745,8 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str)
def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str:
"""Get an output artifact."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format(
name=name,
nodeId=node_id,
Expand All @@ -727,7 +768,8 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names
def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str:
"""Get an input artifact by UID."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "input-artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format(
uid=uid, nodeId=node_id, artifactName=artifact_name
),
Expand All @@ -746,7 +788,8 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str)
def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str:
"""Get an input artifact."""
assert valid_host_scheme(self.host), "The host scheme is required for service usage"
resp = requests.get(
resp = self._request(
method="get",
url=urljoin(self.host, "input-artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format(
name=name,
nodeId=node_id,
Expand Down
Loading

0 comments on commit e09ce27

Please sign in to comment.