Skip to content

Commit b138999

Browse files
authored
feat: add pokeapi source (#13)
* feat: add pokeapi source * fmt * feat: unnest for file destination * Update pokeapi_pokemon_to_json.example.yml
1 parent 9ade7bf commit b138999

File tree

4 files changed

+117
-2
lines changed

4 files changed

+117
-2
lines changed

bizon/connectors/destinations/file/src/destination.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from typing import Tuple
33

44
import polars as pl
5-
from loguru import logger
65

76
from bizon.common.models import SyncMetadata
87
from bizon.destination.destination import AbstractDestination
@@ -30,9 +29,17 @@ def write_records(self, df_destination_records: pl.DataFrame) -> Tuple[bool, str
3029
schema_keys = set([column.name for column in self.config.record_schema])
3130

3231
with open(self.config.filepath, "a") as f:
32+
3333
for value in df_destination_records["source_data"].str.json_decode().to_list():
34+
3435
assert set(value.keys()) == schema_keys, "Keys do not match the schema"
35-
f.write(f"{json.dumps(value)}\n")
36+
37+
# Unnest the source_data column
38+
row = {}
39+
for column in self.config.record_schema:
40+
row[column.name] = value[column.name]
41+
42+
f.write(f"{json.dumps(row)}\n")
3643

3744
else:
3845
df_destination_records.write_ndjson(self.config.filepath)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
name: pokemon to json unnested
2+
3+
source:
4+
name: pokeapi
5+
stream: pokemon
6+
7+
destination:
8+
name: file
9+
config:
10+
filepath: pokemon.json
11+
buffer_flush_timeout: 2
12+
13+
unnest: true
14+
15+
record_schema:
16+
- name: name
17+
type: string
18+
- name: url
19+
type: string
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
name: pokemon to logger
2+
3+
source:
4+
name: pokeapi
5+
stream: pokemon
6+
7+
destination:
8+
name: logger
9+
config:
10+
buffer_flush_timeout: 2
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from enum import Enum
2+
from typing import Any, List, Tuple
3+
4+
from requests.auth import AuthBase
5+
6+
from bizon.source.config import SourceConfig
7+
from bizon.source.models import SourceIteration, SourceRecord
8+
from bizon.source.source import AbstractSource
9+
10+
BASE_URL = "https://pokeapi.co/api/v2"
11+
12+
13+
# Define the streams that the source supports
14+
class PokeAPIStreams(str, Enum):
15+
POKEMON = "pokemon"
16+
BERRY = "berry"
17+
ITEM = "item"
18+
19+
20+
# Define the config class for the source
21+
class PokeAPISourceConfig(SourceConfig):
22+
stream: PokeAPIStreams
23+
24+
25+
class PeriscopeSource(AbstractSource):
26+
27+
def __init__(self, config: PokeAPISourceConfig):
28+
super().__init__(config)
29+
self.config: PokeAPISourceConfig = config
30+
31+
@property
32+
def url_entity(self) -> str:
33+
return f"{BASE_URL}/{self.config.stream}"
34+
35+
@staticmethod
36+
def streams() -> List[str]:
37+
return [item.value for item in PokeAPIStreams]
38+
39+
@staticmethod
40+
def get_config_class() -> AbstractSource:
41+
return PokeAPISourceConfig
42+
43+
def check_connection(self) -> Tuple[bool | Any | None]:
44+
# Make a request to the base URL to check if the connection is successful
45+
_ = self.session.get(self.url_entity)
46+
return True, None
47+
48+
def get_authenticator(self) -> AuthBase:
49+
# We return None because we don't need any authentication
50+
return None
51+
52+
def get_total_records_count(self) -> int | None:
53+
# Return the total number of records in the stream
54+
response = self.session.get(self.url_entity)
55+
return response.json().get("count")
56+
57+
def get_entity_list(self, pagination: dict = None) -> SourceIteration:
58+
# If pagination is provided, use the next URL to get the next set of records
59+
url = pagination.get("next") if pagination else self.url_entity
60+
response = self.session.get(url)
61+
62+
data = response.json()
63+
64+
return SourceIteration(
65+
next_pagination={"next": data.get("next")} if data.get("next") else {},
66+
records=[
67+
SourceRecord(
68+
id=record["name"],
69+
data=record,
70+
)
71+
for record in data["results"]
72+
],
73+
)
74+
75+
def get(self, pagination: dict = None) -> SourceIteration:
76+
if self.config.stream in [PokeAPIStreams.POKEMON, PokeAPIStreams.BERRY, PokeAPIStreams.ITEM]:
77+
return self.get_entity_list(pagination)
78+
79+
raise NotImplementedError(f"Stream {self.config.stream} not implemented for PokeAPI source")

0 commit comments

Comments
 (0)