
Commit

change: increase batch_size of stac items, and increase max_concurrent for more parallel requests to COG for indexer
print-sid8 committed Jan 15, 2025
1 parent 2e6fa50 commit 4d99ad8
Showing 2 changed files with 16 additions and 12 deletions.
src/rasteret/stac/indexer.py (7 additions, 6 deletions)

@@ -50,7 +50,7 @@ def __init__(
         name: Optional[str] = None,
         cloud_provider: Optional[CloudProvider] = None,
         cloud_config: Optional[CloudConfig] = None,
-        max_concurrent: int = 50,
+        max_concurrent: int = 100,
     ):
         self.data_source = data_source
         self.stac_api = stac_api
@@ -59,6 +59,7 @@ def __init__(
         self.cloud_config = cloud_config
         self.name = name
         self.max_concurrent = max_concurrent
+        self.batch_size = 50

     @property
     def band_map(self) -> Dict[str, str]:
@@ -96,8 +97,7 @@ async def build_index(

         # 2. Process in batches, adding COG metadata
         processed_items = []
-        batch_size = 10
-        total_batches = (len(stac_items) + batch_size - 1) // batch_size
+        total_batches = (len(stac_items) + self.batch_size - 1) // self.batch_size

         logger.info(
             f"Processing {len(stac_items)} scenes (each scene has multiple bands)..."
@@ -107,15 +107,16 @@
             max_concurrent=self.max_concurrent,
             cloud_provider=self.cloud_provider,
             cloud_config=self.cloud_config,
+            batch_size=self.batch_size,
         ) as cog_parser:

-            for i in range(0, len(stac_items), batch_size):
-                batch = stac_items[i : i + batch_size]
+            for i in range(0, len(stac_items), self.batch_size):
+                batch = stac_items[i : i + self.batch_size]
                 batch_records = await self._process_batch(batch, cog_parser)
                 if batch_records:
                     processed_items.extend(batch_records)
                     logger.info(
-                        f"Processed scene batch {(i//batch_size)+1}/{total_batches} yielding {len(batch_records)} band assets"
+                        f"Processed scene batch {(i//self.batch_size)+1}/{total_batches} yielding {len(batch_records)} band assets"
                     )

         total_assets = sum(len(item["assets"]) for item in stac_items)
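The indexer now drives its batching from the instance-level self.batch_size (50) instead of a hard-coded local batch_size = 10, and passes the same value down to the COG header parser. Below is a minimal, self-contained sketch of the ceiling-division batching pattern used in build_index; the chunk_items helper and the fake scene list are illustrative stand-ins, not part of rasteret:

from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def chunk_items(items: Sequence[T], batch_size: int = 50) -> Iterator[List[T]]:
    """Yield successive batches, mirroring the indexer's slicing loop."""
    # Ceiling division, as in: total_batches = (len(items) + batch_size - 1) // batch_size
    total_batches = (len(items) + batch_size - 1) // batch_size
    for i in range(0, len(items), batch_size):
        batch = list(items[i : i + batch_size])
        print(f"batch {(i // batch_size) + 1}/{total_batches}: {len(batch)} items")
        yield batch

if __name__ == "__main__":
    # 120 fake STAC items split into batches of 50, 50, and 20 with batch_size=50.
    stac_items = [{"id": f"scene-{n}"} for n in range(120)]
    assert [len(b) for b in chunk_items(stac_items, batch_size=50)] == [50, 50, 20]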
src/rasteret/stac/parser.py (9 additions, 6 deletions)

@@ -80,7 +80,8 @@ class AsyncCOGHeaderParser:

     def __init__(
         self,
-        max_concurrent: int = 50,
+        max_concurrent: int = 100,
+        batch_size: int = 50,
         cache_ttl: int = 3600,  # 1 hour
         retry_attempts: int = 3,
         cloud_provider: Optional[CloudProvider] = None,
@@ -90,6 +91,7 @@ def __init__(
         self.retry_attempts = retry_attempts
        self.cloud_provider = cloud_provider
         self.cloud_config = cloud_config
+        self.batch_size = batch_size

         # Connection optimization
         self.connector = httpx.Limits(
@@ -129,19 +131,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
         await self.client.aclose()

     async def process_cog_headers_batch(
-        self, urls: List[str], batch_size: int = 10
+        self,
+        urls: List[str],
     ) -> List[Optional[CogMetadata]]:
         """Process multiple URLs in parallel with smart batching."""

         results = []
         total = len(urls)

         logger.info(
-            f"Processing {total} COG headers {'(single batch)' if total <= batch_size else f'in {(total + batch_size - 1) // batch_size} batches of {batch_size}'}"
+            f"Processing {total} COG headers {'(single batch)' if total <= self.batch_size else f'in {(total + self.batch_size - 1) // self.batch_size} batches of {self.batch_size}'}"
         )

-        for i in range(0, total, batch_size):
-            batch = urls[i : min(i + batch_size, total)]
+        for i in range(0, total, self.batch_size):
+            batch = urls[i : min(i + self.batch_size, total)]
             batch_start = time.time()

             tasks = [self.parse_cog_header(url) for url in batch]
@@ -157,7 +160,7 @@ async def process_cog_headers_batch(
             batch_time = time.time() - batch_start
             remaining = total - (i + len(batch))
             batch_msg = (
-                f"Processed batch {i//batch_size + 1}/{(total + batch_size - 1) // batch_size} "
+                f"Processed batch {i//self.batch_size + 1}/{(total + self.batch_size - 1) // self.batch_size} "
                 f"({len(batch)} {'header' if len(batch) == 1 else 'headers'}) "
                 f"in {batch_time:.2f}s. "
                 f"{'Completed!' if remaining == 0 else f'Remaining: {remaining}'}"
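With batch_size now a constructor argument (default 50) and max_concurrent doubled to 100, each call to process_cog_headers_batch fires larger groups of COG header requests in parallel. The sketch below shows a batch-then-gather pattern of the kind this method appears to use; fetch_header, the semaphore, and the example URLs are assumptions standing in for rasteret's parse_cog_header and its httpx client:

import asyncio
from typing import List, Optional

async def fetch_header(url: str, semaphore: asyncio.Semaphore) -> Optional[str]:
    """Stand-in for parse_cog_header: pretend to read one COG header."""
    async with semaphore:          # cap in-flight requests, like max_concurrent
        await asyncio.sleep(0.01)  # placeholder for the HTTP range request
        return f"header-of:{url}"

async def process_in_batches(
    urls: List[str], batch_size: int = 50, max_concurrent: int = 100
) -> List[Optional[str]]:
    semaphore = asyncio.Semaphore(max_concurrent)
    results: List[Optional[str]] = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i : min(i + batch_size, len(urls))]
        # One gather per batch, so at most batch_size tasks exist at a time.
        batch_results = await asyncio.gather(
            *(fetch_header(u, semaphore) for u in batch), return_exceptions=True
        )
        results.extend(r if not isinstance(r, Exception) else None for r in batch_results)
    return results

if __name__ == "__main__":
    urls = [f"https://example.com/cog-{n}.tif" for n in range(120)]
    headers = asyncio.run(process_in_batches(urls))
    assert len(headers) == 120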
