Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 56 additions & 47 deletions adalflow/adalflow/utils/file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging
from typing import Mapping, Any, Optional, List, Dict


from adalflow.utils.serialization import to_dict, serialize, _deserialize_object_hook
from adalflow.utils.file_operations import get_file_operations

log = logging.getLogger(__name__)

Expand All @@ -22,23 +22,19 @@ def save_json(obj: Mapping[str, Any], f: str = "task.json") -> None:
f (str, optional): The file name. Defaults to "task".
"""
os.makedirs(os.path.dirname(f) or ".", exist_ok=True)
try:

def _write_json():
with open(f, "w") as file:
serialized_obj = serialize(obj)
file.write(serialized_obj)
except IOError as e:

try:
get_file_operations().execute_write(f, _write_json)
except Exception as e:
log.error(f"Failed to save object to JSON file {f}. Error: {e}")
raise IOError(f"Error saving object to JSON file {f}: {e}")


# def standard_save_json(obj: Mapping[str, Any], f: str = "task.json") -> None:
# os.makedirs(os.path.dirname(f) or ".", exist_ok=True)
# try:
# with open(f, "w") as file:
# json.dump(obj, file, indent=4)
# except IOError as e:
# raise IOError(f"Error saving object to JSON file {f}: {e}")


def save_csv(
obj: List[Dict[str, Any]], f: str = "task.csv", fieldnames: List[str] = None
) -> None:
Expand All @@ -51,12 +47,14 @@ def save_csv(
import csv

os.makedirs(os.path.dirname(f) or ".", exist_ok=True)
try:

def _write_csv():
with open(f, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames or obj[0].keys())
fieldnames_to_use = fieldnames or obj[0].keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames_to_use)
writer.writeheader()
for row in obj:
filtered_row = {k: v for k, v in row.items() if k in fieldnames}
filtered_row = {k: v for k, v in row.items() if k in fieldnames_to_use}
# use json.dumps to serialize the object
for k, v in filtered_row.items():
if (
Expand All @@ -67,7 +65,11 @@ def save_csv(
):
filtered_row[k] = json.dumps(v)
writer.writerow(filtered_row)
except IOError as e:

try:
get_file_operations().execute_write(f, _write_csv)
except Exception as e:
log.error(f"Failed to save object to CSV file {f}. Error: {e}")
raise IOError(f"Error saving object to CSV file {f}: {e}")


Expand All @@ -79,11 +81,16 @@ def save_pickle(obj: Mapping[str, Any], f: str = "task.pickle") -> None:
f (str, optional): The file name. Defaults to "task".
"""
os.makedirs(os.path.dirname(f) or ".", exist_ok=True)
try:

def _write_pickle():
with open(f, "wb") as file:
pickle.dump(obj, file)

try:
get_file_operations().execute_write(f, _write_pickle)
except Exception as e:
raise Exception(f"Error saving object to pickle file {f}: {e}")
log.error(f"Failed to save object to pickle file {f}. Error: {e}")
raise IOError(f"Error saving object to pickle file {f}: {e}")


def save(obj: Mapping[str, Any], f: str = "task") -> None:
Expand All @@ -101,22 +108,6 @@ def save(obj: Mapping[str, Any], f: str = "task") -> None:
raise Exception(f"Error saving object to json and pickle files: {e}")


# def load_json(f: str = "task.json") -> Optional[Mapping[str, Any]]:
# r"""Load the object from a json file.

# Args:
# f (str, optional): The file name. Defaults to "task".
# """
# if not os.path.exists(f):
# log.warning(f"File {f} does not exist.")
# return None
# try:
# with open(f, "r") as file:
# return json.load(file)
# except Exception as e:
# raise Exception(f"Error loading object from JSON file {f}: {e}")


def load_json(f: str) -> Any:
"""Customized Load a JSON file and deserialize it.

Expand All @@ -129,10 +120,12 @@ def load_json(f: str) -> Any:
if not os.path.exists(f):
raise FileNotFoundError(f"JSON file not found: {f}")

try:
def _read_json():
with open(f, "r") as file:
data = json.load(file, object_hook=_deserialize_object_hook)
return data
return json.load(file, object_hook=_deserialize_object_hook)

try:
return get_file_operations().execute_read(f, _read_json)
except json.JSONDecodeError as e:
raise ValueError(f"Error decoding JSON file {f}: {e}")
except Exception as e:
Expand All @@ -150,10 +143,12 @@ def load_standard_json(f: str) -> Any:
if not os.path.exists(f):
raise FileNotFoundError(f"JSON file not found: {f}")

try:
def _read_standard_json():
with open(f, "r") as file:
data = json.load(file)
return data
return json.load(file)

try:
return get_file_operations().execute_read(f, _read_standard_json)
except json.JSONDecodeError as e:
raise ValueError(f"Error decoding JSON file {f}: {e}")
except Exception as e:
Expand All @@ -169,11 +164,15 @@ def load_pickle(f: str = "task.pickle") -> Optional[Mapping[str, Any]]:
if not os.path.exists(f):
log.warning(f"File {f} does not exist.")
return None
try:

def _read_pickle():
with open(f, "rb") as file:
return pickle.load(file)

try:
return get_file_operations().execute_read(f, _read_pickle)
except Exception as e:
raise Exception(f"Error loading object from pickle file {f}: {e}")
raise IOError(f"Error loading object from pickle file {f}: {e}")


def load(f: str = "task") -> Optional[Mapping[str, Any]]:
Expand Down Expand Up @@ -204,9 +203,12 @@ def load_jsonl(f: str = None) -> List[Dict[str, Any]]:
log.warning(f"File {f} does not exist.")
return []

try:
def _read_jsonl():
with jsonlines.open(f) as reader:
return list(reader)

try:
return get_file_operations().execute_read(f, _read_jsonl)
except Exception as e:
log.error(f"Error loading jsonl file {f}: {e}")
return []
Expand All @@ -226,12 +228,15 @@ def append_to_jsonl(f: str, data: Dict[str, Any]) -> None:
except ImportError:
raise ImportError("Please install jsonlines to use this function.")
os.makedirs(os.path.dirname(f) or ".", exist_ok=True)
try:

def _append_jsonl():
with jsonlines.open(f, mode="a") as writer:
# call serialize to serialize the object
serialized_data = to_dict(data)
writer.write(serialized_data)
# writer.write(data)

try:
get_file_operations().execute_write(f, _append_jsonl)
except Exception as e:
log.error(f"Error appending data to jsonl file {f}: {e}")

Expand All @@ -248,9 +253,13 @@ def write_list_to_jsonl(f: str, data: List[Dict[str, Any]]) -> None:
except ImportError:
raise ImportError("Please install jsonlines to use this function.")
os.makedirs(os.path.dirname(f) or ".", exist_ok=True)
try:

def _write_jsonl():
with jsonlines.open(f, mode="w") as writer:
for d in data:
writer.write(d)

try:
get_file_operations().execute_write(f, _write_jsonl)
except Exception as e:
log.error(f"Error writing data to jsonl file {f}: {e}")
log.error(f"Error writing data to jsonl file {f}: {e}")
Loading
Loading