Skip to content

Commit

Permalink
feat(IASOClient): implement IASO client
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarfil authored Jun 13, 2024
1 parent 4df120c commit a46e67e
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 8 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ pip install openhexa.toolbox

## Modules

[**openhexa.toolbox.dhis2**](docs/dhis2.md)<br>
Acquire and process data from DHIS2 instances
[**openhexa.toolbox.dhis2**](docs/dhis2.md) - Acquire and process data from DHIS2 instances <br>
[**openhexa.toolbox.iaso**](docs/iaso.md) - Acquire and process data from IASO instances <br>
51 changes: 51 additions & 0 deletions docs/iaso.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

# OpenHEXA Toolbox IASO

Module to fetch data from IASO

* [Installation](#installation)
* [Usage](#usage)
* [Connect to an instance](#connect-to-an-instance)
* [Read data](#read-data)

## [Installation](#)

``` sh
pip install openhexa.toolbox
```

## [Usage](#)

### [Connect to an instance](#)
Credentials are required to initialize a connection to IASO instance. Credentials should contain the username and
password to connect to an instance of IASO. You have as well to provide the host name to for the api to connect to:
* Staging environment https://iaso-staging.bluesquare.org/api
* Production environment https://iaso.bluesquare.org/api

Import IASO module as:
```
from openhexa.toolbox.iaso import IASO
iaso = IASO("https://iaso-staging.bluesquare.org","username", "password")
```

### [Read data](#)
After importing IASO module, you can use provided method to fetch Projects, Organisation Units and Forms that you have
permissions for.
```
# Fetch projects
iaso.get_projects()
# Fetch organisation units
iaso.get_org_units()
# Fetch submitted forms filtered by form_ids passed in url parameters and with choice to fetch them as dataframe
iaso.get_form_instances(page=1, limit=1, as_dataframe=True,
dataframe_columns=["Date de création","Date de modification","Org unit"], ids=276)
# Fetch forms filtered by organisaiton units and projects that you have permissions to
iaso.get_forms(org_units=[781], projects=[149])
```

You can as well provide additional parameters to the method to filter on desired values as key value arguments.
You can have an overview on the arguments you can filter on API documentation of IASO.



4 changes: 4 additions & 0 deletions openhexa/toolbox/iaso/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .api_client import ApiClient
from .iaso import IASO

__all__ = ["IASO", "ApiClient"]
135 changes: 135 additions & 0 deletions openhexa/toolbox/iaso/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import logging
from datetime import datetime, timezone
from typing import Union

import requests
import jwt
from requests.adapters import HTTPAdapter
from urllib3 import Retry


class IASOError(Exception):
"""
Base exception for IASO API errors.
"""

def __init__(self, message: str):
self.message = message
super().__init__(self.message)
self.log_error()

def __str__(self):
return self.message

def log_error(self):
logging.error(f"IASO Error : {self.message}")


class ApiClient(requests.Session):
"""
Client to manage HTTP session with IASO API on behalf of OpenHexa toolbox
"""

def __init__(self, server_url: str, username: str, password: str):
"""
Initialize the IASO API client.
:param server_url: IASO server URL
:param username: IASO instance username
:param password: IASO instance password
Examples:
>>> client = ApiClient(server_url="http://localhost:8080", username="admin", password="<PASSWORD>")
"""
super().__init__()
self.server_url = server_url.rstrip("/")
self.username = username
self.password = password
self.headers.update(
{
"User-Agent": "Openhexa-Toolbox",
}
)
self.token = None
self.token_expiry = None
self._refresh_token = None
self.authenticate()

def request(self, method: str, url: str, *args, **kwargs) -> requests.Response:
"""
Sends HTTP request to IASO API, handles exceptions raised during request
"""
full_url = f"{self.server_url}/{url.strip('/')}/"
try:
resp = super().request(method, full_url, *args, **kwargs)
self.raise_if_error(resp)
return resp
except requests.RequestException as exc:
logging.exception(exc)
raise

def authenticate(self) -> None:
"""
Authenticates with OpenHexa API with username and password.
Calling the endpoints to fetch authorization and refresh token.
Ensures that failures are handles with status management, both with or without SSL communication
"""
credentials = {"username": self.username, "password": self.password}
response = self.request("POST", "/api/token/", json=credentials)
json_data = response.json()
self.token = json_data["access"]
self.token_expiry = self.decode_token_expiry(self.token)
self._refresh_token = json_data["refresh"]
self.headers.update({"Authorization": f"Bearer {self.token}"})
adapter = HTTPAdapter(
max_retries=Retry(
total=3,
backoff_factor=5,
allowed_methods=["HEAD", "GET"],
status_forcelist=[429, 500, 502, 503, 504],
)
)
self.mount("https://", adapter)
self.mount("http://", adapter)

def refresh_session(self) -> None:
"""
Refreshes the session token by calling the refresh endpoint and updates the authentication token
"""
response = self.request("POST", "/api/token/refresh/", json={"refresh": self._refresh_token})
self.token = response.json()["access"]
self.headers.update({"Authorization": f"Bearer {self.token}"})

def raise_if_error(self, response: requests.Response) -> None:
"""
Method to raise an exception if an error occurs during the request
We raise a custom error if a JSON message is provided with an error
:param response: the response object returned by the request
"""
if response.status_code == 401 and self._refresh_token:
self.refresh_session()
return
if response.status_code >= 300 and "json" in response.headers.get("content-type", ""):
raise IASOError(f"{response.json()}")
response.raise_for_status()

@staticmethod
def decode_token_expiry(token: str) -> Union[datetime, None]:
"""
Decodes base64 encoded JWT token and returns expiry time from 'exp' field of the JWT token
:param token: JWT token
:return: Expiry datetime or None
Examples:
>>> decode_token_expiry(token = "eyJhbGciOiJIUzI1NiJ9.eyJSb2xlIjoiQWRtaW4iLCJJc3N1ZXIiOiJJc3N1ZXIiLCJVc2VybmFt\\
ZSI6IkphdmFJblVzZSIsImV4cCI6MTcxNzY5MDEwNCwiaWF0IjoxNzE3NzYwMTA0fQ._pXcqDw0QgvznvNuhVPwYyIms3H5imH-q6A7lIQJjYQ")
"""
decoded_token = jwt.decode(token, options={"verify_signature": False})
exp_timestamp = decoded_token.get("exp")
if exp_timestamp:
return datetime.fromtimestamp(exp_timestamp, timezone.utc)
return None
126 changes: 126 additions & 0 deletions openhexa/toolbox/iaso/iaso.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import io
import typing

from openhexa.toolbox.iaso.api_client import ApiClient
import polars as pl


class IASO:
"""
The IASO toolbox provides an interface to interact with the IASO.
"""

def __init__(self, server_url: str, username: str, password: str) -> None:
"""
Initializes the IASO toolbox.
:param server_url: IASO server URL
:param username: IASO instance username
:param password: IASO instance password
Examples:
>>> from openhexa.toolbox.iaso import IASO
>>> iaso = IASO(server_url="http://iaso-staging.bluesquare.org",
>>> username="user",
>>> password="pass")
"""
self.api_client = ApiClient(server_url, username, password)

def get_projects(self, page: int = 0, limit: int = 10, **kwargs) -> dict:
"""
Fetches projects list from IASO. Method is paginated by default. Pagination can be modified and additional
arguments can be passed as key value parameters.
Examples:
>>> from openhexa.toolbox.iaso import IASO
>>> iaso = IASO(client=ApiClient(url="http://iaso-staging.bluesquare.org",
>>> username="user",
>>> password="pass"))
>>> iaso.get_projects(page=1, limit=1, id=1)
"""

params = kwargs
params.update({"page": page, "limit": limit})
response = self.api_client.get("/api/projects", params=params)
return response.json().get("projects")

def get_org_units(self, page: int = 0, limit: int = 10, **kwargs) -> dict:
"""
Fetches org units from IASO. Method is paginated by default. Pagination can be modified and additional
arguments can be passed as key value parameters.
Examples:
>>> from openhexa.toolbox.iaso import IASO
>>> iaso = IASO(client=ApiClient(url="http://iaso-staging.bluesquare.org",
>>> username="user",
>>> password="pass"))
>>> projects = iaso.get_org_units(page=1, limit=1, id=1)
"""
params = kwargs
params.update({"page": page, "limit": limit})
response = self.api_client.get("/api/orgunits", params=params)
return response.json().get("orgUnits")

def get_form_instances(
self,
page: int = 0,
limit: int = 10,
as_dataframe: bool = False,
dataframe_columns: typing.List[str] = None,
**kwargs,
) -> typing.Union[dict, pl.DataFrame]:
"""
Fetches form instances from IASO filtered by form id. Method is paginated by default.
Pagination can be modified and additional arguments can be passed as key value parameters.
There is a possiblity to fetch forms as DataFrames.
Params:
:param page: The page number of the form instance.
:param limit: The maximum number of form instances.
:param as_dataframe: If true, will return a DataFrame containing form instances.
:param dataframe_columns: The column names of the form instances.
:param kwargs: additonal arguments passed to the /forms endpoint as URL parameters.
Examples:
>>> from openhexa.toolbox.iaso import IASO
>>> iaso = IASO(url="http://iaso-staging.bluesquare.org", username="user", password="pass")
>>> form_dataframes = iaso.get_form_instances(page=1, limit=1, as_dataframe=True,
>>> dataframe_columns=["Date de création","Date de modification","Org unit"], ids=276)
"""

params = kwargs
params.update({"page": page, "limit": limit})
if as_dataframe:
params.update({"csv": "true"})
response = self.api_client.get("/api/instances", params=params)
forms = pl.read_csv(io.StringIO(response.content.decode("utf-8")))[dataframe_columns]
return forms
response = self.api_client.get("/api/instances/", params=kwargs)
forms = response.json().get("instances")
return forms

def get_forms(
self, org_units: typing.List[int], projects: typing.List[int], page: int = 0, limit: int = 10, **kwargs
) -> dict:
"""
Fetches forms from IASO. Method is paginated by default.
Pagination can be modified and additional arguments can be passed as key value parameters.
Params:
:param org_units: A required list of organization units IDs.
:param projects: A required list of project IDs.
:param page: The page number of the form.
:param limit: The maximum number of form.
:param kwargs: additonal arguments passed to the /forms endpoint as URL parameters.
Examples:
>>> from openhexa.toolbox.iaso import IASO
>>> iaso = IASO(url="http://iaso-staging.bluesquare.org",username="user",password="pass")
>>> forms_by_orgunits_and_projects = iaso.get_forms(page=1, limit=1, org_units=[300], projects=[23])
"""

if org_units is [] or projects is []:
raise ValueError("Values for org_units and projects cannot be empty lists")
params = kwargs
params.update({"page": page, "limit": limit, "org_units": org_units, "projects": projects})
response = self.api_client.post("/api/forms", data=params)
return response.json().get("forms")
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ dependencies = [
"geopandas",
"polars",
"diskcache",
"stringcase",
"pyjwt"
]

[project.optional-dependencies]
Expand Down
Loading

0 comments on commit a46e67e

Please sign in to comment.