Skip to content

Commit

Permalink
Improve atomic write
Browse files Browse the repository at this point in the history
  • Loading branch information
TimKoornstra committed Jul 9, 2024
1 parent 56c6759 commit 7d6747f
Showing 1 changed file with 94 additions and 55 deletions.
149 changes: 94 additions & 55 deletions src/api/batch_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import sys
import shutil
from typing import List, Tuple
from typing import List, Tuple, Dict
import tempfile

# > Third-party dependencies
Expand Down Expand Up @@ -67,11 +67,12 @@ def batch_decoding_worker(predicted_queue: multiprocessing.Queue,
decoded_predictions = batch_decode(encoded_predictions, tokenizer)

logging.debug("Outputting predictions...")
outputted_predictions = output_predictions(decoded_predictions,
batch_groups,
batch_identifiers,
output_path,
batch_metadata)
outputted_predictions = save_prediction_outputs(
decoded_predictions,
batch_groups,
batch_identifiers,
output_path,
batch_metadata)
total_outputs += len(outputted_predictions)

for output in outputted_predictions:
Expand Down Expand Up @@ -144,26 +145,27 @@ def create_tokenizer(model_path: str) -> Tokenizer:
return tokenizer


def output_predictions(predictions: List[Tuple[float, str]],
groups: List[str],
identifiers: List[str],
output_path: str,
batch_metadata: List[dict]) -> List[str]:
def save_prediction_outputs(
prediction_data: List[Tuple[float, str]],
group_ids: List[str],
image_ids: List[str],
base_output_path: str,
image_metadata: List[Dict]
) -> List[str]:
"""
Generate output texts based on the predictions and save to files
atomically.
Generate output texts based on predictions and save to files atomically.
Parameters
----------
predictions: List[Tuple[float, str]]
prediction_data: List[Tuple[float, str]]
List of tuples containing confidence and predicted text for each image.
groups: List[str]
group_ids: List[str]
List of group IDs for each image.
identifiers: List[str]
List of identifiers for each image.
output_path: str
image_ids: List[str]
List of unique identifiers for each image.
base_output_path: str
Base path where prediction outputs should be saved.
batch_metadata: List[dict]
image_metadata: List[Dict]
List of metadata dictionaries for each image.
Returns
Expand All @@ -178,39 +180,76 @@ def output_predictions(predictions: List[Tuple[float, str]],
atomically.
- Logs messages regarding directory creation and saving.
"""
outputs = []
for i, (confidence, pred_text) in enumerate(predictions):
group_id = groups[i]
identifier = identifiers[i]
metadata = batch_metadata[i]
text = f"{identifier}\t{metadata}\t{confidence}\t{pred_text}"
outputs.append(text)

# Output the text to a file atomically
output_dir = os.path.join(output_path, group_id)
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
logging.debug("Created output directory: %s", output_dir)

final_path = os.path.join(output_dir, identifier + ".txt")

# Create a temporary file in the same directory
with tempfile.NamedTemporaryFile(mode='w',
dir=output_path,
delete=False,
encoding="utf-8") as temp_file:
temp_file.write(text + "\n")
temp_path = temp_file.name

# Atomically replace the destination file
try:
shutil.move(temp_path, final_path)
logging.debug(f"Atomically wrote file: {final_path}")
except Exception as e:
logging.error("Failed to atomically write file: %s. Error: %s",
final_path, e)
# Clean up the temporary file if the move failed
os.unlink(temp_path)
raise

return outputs
output_texts = []

# Create a temporary directory for intermediate files
with tempfile.TemporaryDirectory(prefix="prediction_outputs_") as temp_dir:
for prediction, group_id, image_id, metadata in zip(
prediction_data, group_ids, image_ids, image_metadata
):
confidence, predicted_text = prediction
output_text = (f"{image_id}\t{metadata}\t{confidence}"
f"\t{predicted_text}")
output_texts.append(output_text)

group_output_dir = os.path.join(base_output_path, group_id)
os.makedirs(group_output_dir, exist_ok=True)
logging.debug("Ensured output directory exists: %s",
group_output_dir)

output_file_path = os.path.join(
group_output_dir, f"{image_id}.txt")

try:
write_file_atomically(output_text, output_file_path, temp_dir)
logging.debug("Atomically wrote file: %s", output_file_path)
except IOError as e:
logging.error("Failed to write file %s. Error: %s",
output_file_path, e)
raise

return output_texts


def write_file_atomically(content: str, target_path: str, temp_dir: str) \
-> None:
"""
Write content to a file atomically, using a separate temporary directory.
Parameters
----------
content: str
The content to write to the file.
target_path: str
The path where the file should be written.
temp_dir: str
Path to the temporary directory for intermediate files.
Raises
------
IOError
If the file cannot be written.
"""
temp_file_path = None
try:
# Create a temporary file in the provided temporary directory
with tempfile.NamedTemporaryFile(mode='w', dir=temp_dir,
delete=False, encoding="utf-8") \
as temp_file:
temp_file.write(content + "\n")
temp_file_path = temp_file.name

# On POSIX systems, this is atomic. On Windows, it's the best we can
# do.
os.replace(temp_file_path, target_path)
except IOError as e:
if temp_file_path and os.path.exists(temp_file_path):
# Clean up the temporary file if it exists
os.unlink(temp_file_path)
raise IOError(
f"Failed to atomically write file: {target_path}. Error: {e}")
except Exception as e:
if temp_file_path and os.path.exists(temp_file_path):
# Clean up the temporary file if it exists
os.unlink(temp_file_path)
raise e # Re-raise the exception after cleanup

0 comments on commit 7d6747f

Please sign in to comment.