This snippet demonstrates how to build a simple data-stream processing pipeline in Python: reading data line by line from an input stream, transforming it, and writing it to an output stream.
However, it's best understood as an educational example. I'll explain:
- What it is trying to do (conceptually)
- How it works line by line
This function is an example of a stream processor:
reader → converter → writer

- `reader`: a file-like object you can iterate over (e.g., a file, `sys.stdin`)
- `converter`: a function that transforms each line (e.g., uppercase, strip whitespace, parse JSON)
- `writer`: a file-like object you can write to (e.g., a file, `sys.stdout`)
It processes data line by line, which is memory-efficient and commonly used in:
- log processing
- ETL pipelines
- command-line tools
- streaming large files
```python
def processor(reader, converter, writer) -> int:
```

Intent:
- Read lines from `reader`
- Apply `converter(line)` to each line
- Write the result to `writer`
- Return how many lines were processed

```python
    if not callable(converter):
        raise TypeError("converter must be callable")
```

Ensures the transformer is actually a function before any work starts.

```python
    count = 0
```

Keeps track of how many lines were processed.

```python
    for line in reader:
        try:
            writer.write(converter(line))
```

- Iterates over the input stream line by line
- Passes each line through the converter and writes the transformed output

```python
        except Exception as e:
            raise RuntimeError(f"processor failed on line {count + 1}") from e
        count += 1
```

Wraps any error with context (which line failed), then counts the line as processed.

```python
    writer.flush()
    return count
```

- Forces buffered output to be written immediately
- Returns the number of processed lines
```python
import sys

def to_upper(line: str) -> str:
    return line.upper()

processor(sys.stdin, to_upper, sys.stdout)
```

This would:
- Read text from standard input
- Convert each line to uppercase
- Write it to standard output
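The same call works with real files, since any readable and writable text stream fits the `reader` and `writer` slots. The file names here are hypothetical:

```python
# Hypothetical file names; any text streams work.
with open("in.txt") as src, open("out.txt", "w") as dst:
    processor(src, to_upper, dst)
```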
This code demonstrates a streaming data pipeline pattern in Python: read → transform → write, line by line, using file-like objects.
Build a Unix-style CLI filter that:
- Reads from a file or `stdin`
- Writes to a file or `stdout`
- Applies a chosen transformation
```bash
# uppercase stdin → stdout
python processor.py upper

# read file, write file
python processor.py strip input.txt output.txt

# pipeline usage
cat access.log | python processor.py redact_ip | sort
```
Full implementation:

```python
#!/usr/bin/env python3
from __future__ import annotations

import sys
import argparse
from typing import Callable, TextIO

# -------------------------
# Core stream processor
# -------------------------
def processor(
    reader: TextIO,
    converter: Callable[[str], str],
    writer: TextIO,
) -> int:
    count = 0
    for line in reader:
        try:
            writer.write(converter(line))
        except Exception as e:
            raise RuntimeError(f"processor failed on line {count + 1}") from e
        count += 1
    writer.flush()
    return count

# -------------------------
# Converters
# -------------------------
def upper(line: str) -> str:
    return line.upper()

def lower(line: str) -> str:
    return line.lower()

def strip(line: str) -> str:
    return line.strip() + "\n"

def redact_ip(line: str) -> str:
    import re
    return re.sub(r"\b\d{1,3}(\.\d{1,3}){3}\b", "[REDACTED_IP]", line)

CONVERTERS = {
    "upper": upper,
    "lower": lower,
    "strip": strip,
    "redact_ip": redact_ip,
}

# -------------------------
# CLI
# -------------------------
def main() -> None:
    parser = argparse.ArgumentParser(
        description="Stream text processor (stdin → transform → stdout)"
    )
    parser.add_argument(
        "converter",
        choices=CONVERTERS.keys(),
        help="Transformation to apply",
    )
    parser.add_argument("input", nargs="?", help="Input file (default: stdin)")
    parser.add_argument("output", nargs="?", help="Output file (default: stdout)")
    args = parser.parse_args()

    reader = open(args.input) if args.input else sys.stdin
    writer = open(args.output, "w") if args.output else sys.stdout

    try:
        count = processor(reader, CONVERTERS[args.converter], writer)
    finally:
        # Close only the streams we opened ourselves, never stdin/stdout.
        if args.input:
            reader.close()
        if args.output:
            writer.close()

    print(f"Processed {count} lines", file=sys.stderr)

if __name__ == "__main__":
    main()
```

This behaves like standard Unix tools (sed, awk, tr).
This pattern is extremely common in production systems.
```bash
cat nginx.log | python processor.py redact_ip > safe.log
```

- Remove PII
- Normalize formats
- Filter errors (see the sketch below)
Used in:
- observability pipelines
- compliance tools
- security audits
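As a concrete illustration of the "filter errors" case, a converter can drop a line entirely by returning an empty string, since the processor simply writes whatever the converter returns. The converter below is a hypothetical addition, not part of the script above:

```python
def only_errors(line: str) -> str:
    # Hypothetical converter: keep lines mentioning ERROR, drop the rest
    # by returning an empty string (the processor just writes it).
    return line if "ERROR" in line else ""
```

Registered in CONVERTERS, this would let `python processor.py only_errors app.log` stream out only the error lines.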
```bash
python processor.py strip raw_data.txt cleaned_data.txt
```

- Extract: file or stream
- Transform: normalize, validate
- Load: database import, CSV, JSON
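For the transform step, a converter can normalize each record as it streams past. This is a hedged sketch that assumes two-column "name,age" rows; the field names are made up for illustration:

```python
import json

def csv_to_json(line: str) -> str:
    # Hypothetical ETL transform: turn "name,age" CSV rows into JSON lines.
    name, age = line.strip().split(",")
    return json.dumps({"name": name, "age": int(age)}) + "\n"
```

Fed through the same processor, a CSV stream comes out as JSON lines ready for loading.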
This is the core idea behind tools like:
- Apache Beam
- Airflow tasks
- Kafka consumers
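Those systems chain many transforms together; with plain functions, the same idea can be sketched by composing converters into one. The compose helper below is an assumption for illustration, not part of the script above:

```python
from functools import reduce
from typing import Callable

def compose(*stages: Callable[[str], str]) -> Callable[[str], str]:
    # Apply the stages left to right, feeding each output into the next.
    return lambda line: reduce(lambda acc, f: f(acc), stages, line)

clean = compose(strip, lower, redact_ip)  # strip/lower/redact_ip from the script
```

`clean` can then be passed to the processor like any single converter.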
```python
def validate(line: str) -> str:
    if "," not in line:
        raise ValueError("invalid CSV")
    return line
```

Fail fast with clear line numbers.
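Because the processor wraps converter errors with the failing line number, a bad row surfaces immediately. A minimal check, assuming the processor and validate defined above:

```python
from io import StringIO

try:
    processor(StringIO("a,b\nbad line\n"), validate, StringIO())
except RuntimeError as e:
    print(e)            # processor failed on line 2
    print(e.__cause__)  # invalid CSV
```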
```bash
cat big.txt \
  | python processor.py lower \
  | grep error \
  | sort \
  | uniq -c
```

Small tools, chained together.
Because reader and writer are just file-like objects, you can test with:
```python
from io import StringIO

inp = StringIO("a\nb\n")
out = StringIO()

processor(inp, upper, out)
assert out.getvalue() == "A\nB\n"
```

No filesystem needed.
This is where architecture matters.
The current file-iterator approach:

Pros:
- Simple
- File & stdin compatible
- Memory efficient

Cons:
- Pull-based only
- Blocking I/O
Its core loop is just:

```python
for line in reader:
    writer.write(converter(line))
```

Best for:
- Files
- CLI tools
- Batch processing
- Logs
A second approach uses generators: instead of writing directly, you yield transformed data.

```python
def transform(lines):
    for line in lines:
        yield line.upper()
```

Usage:

```python
for out_line in transform(sys.stdin):
    sys.stdout.write(out_line)
```

Pros:
- Composable
- Easy to chain (see the sketch after the list below)
- Testable

Cons:
- Still blocking
- Output control handled elsewhere

Best for:
- Complex pipelines
- Multiple transformations
- Functional-style code
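To show the chaining, generator stages can be stacked so each one lazily pulls from the previous. The stage names here are made up for illustration:

```python
import sys

def upper_stage(lines):
    for line in lines:
        yield line.upper()

def drop_blank(lines):
    for line in lines:
        if line.strip():
            yield line

# Each stage pulls lazily from the one before it; nothing is buffered.
for out_line in upper_stage(drop_blank(sys.stdin)):
    sys.stdout.write(out_line)
```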
A third approach uses async streams for non-blocking I/O:

```python
async def processor(reader, writer):
    async for line in reader:
        await writer.write(line.upper())
```

Pros:
- Non-blocking I/O
- Scales to many streams
- Network-friendly

Cons:
- More complex
- Overkill for files
- Harder debugging

Best for:
- Web servers
- WebSockets
- Kafka / Redis streams
- High-throughput pipelines
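A minimal, self-contained sketch of that idea, using an async generator to stand in for a network source (the source and the delay are assumptions for illustration):

```python
import asyncio
import sys

async def fake_stream(lines):
    # Simulated network source: yields lines with a small delay.
    for line in lines:
        await asyncio.sleep(0.01)
        yield line

async def async_processor(reader, converter, writer) -> int:
    count = 0
    async for line in reader:          # non-blocking iteration over the source
        writer.write(converter(line))  # a plain text stream on the write side
        count += 1
    return count

async def main():
    n = await async_processor(fake_stream(["a\n", "b\n"]), str.upper, sys.stdout)
    print(f"Processed {n} lines", file=sys.stderr)

asyncio.run(main())
```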
| Approach | Best Use Case | Complexity | Speed |
|---|---|---|---|
| File iterator | CLI tools | Low | High |
| Generator | Data pipelines | Medium | High |
| Async streams | Network I/O | High | Very high |
- My current code = Unix filter
- Generator version = functional pipeline
- Async version = event-driven streaming system
All three solve the same problem (flowing data) at different scales.