5.2 Stream API with nano‐second timeout precision
Mark Bednarczyk edited this page Oct 22, 2024
This implementation enhances the StreamProcessor with high-precision timeout detection for missing or invalid stream segments. Inactivity is measured with nanosecond-resolution timestamps from System.nanoTime(), and the API gives fine-grained control over stream lifecycle management.
- Core Components
- Timeout Detection
- Configuration
- Usage Examples
- Advanced Features
- Performance Considerations
- Implementation Details
StreamSegmentTracker is the primary component responsible for tracking segment gaps and timeouts:
public class StreamSegmentTracker {
private final NavigableMap<Long, StreamSegment> segments;
private final long timeoutNanos;
private volatile long expectedSequence;
private volatile long lastActivityNanos;
}
Key responsibilities:
- Maintains ordered sequence of stream segments
- Tracks timing information at nanosecond precision
- Detects gaps in sequence space
- Monitors inactivity periods
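SequenceGap represents a run of missing sequence numbers in the stream:
public record SequenceGap(long startSequence, long length)
        implements Comparable<SequenceGap> {
    // Assumed ordering: by start sequence, then by length
    public int compareTo(SequenceGap o) {
        int c = Long.compare(startSequence, o.startSequence);
        return c != 0 ? c : Long.compare(length, o.length);
    }
}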
StreamTimeoutEvent encapsulates the information reported to listeners when a stream times out:
public record StreamTimeoutEvent(
StreamKey key,
StreamMetadata metadata,
Set<SequenceGap> gaps,
long lastActivityTime
) {
public Duration getInactiveDuration() {
return Duration.ofNanos(System.nanoTime() - lastActivityTime);
}
}
- Activity Tracking
  private void updateActivity() {
      lastActivityNanos = System.nanoTime();
  }
- Timeout Checking
  public boolean hasTimedOut() {
      long now = System.nanoTime();
      return (now - lastActivityNanos) > timeoutNanos;
  }
- Periodic Monitoring
  timeoutExecutor.scheduleAtFixedRate(
      this::checkTimeouts,
      timeoutNanos,
      timeoutNanos,
      TimeUnit.NANOSECONDS
  );
The system maintains a sorted map of segments and identifies gaps:
public Set<SequenceGap> getGaps() {
Set<SequenceGap> gaps = new TreeSet<>();
if (segments.isEmpty()) return gaps;
long current = expectedSequence;
for (Map.Entry<Long, StreamSegment> entry : segments.entrySet()) {
if (entry.getKey() > current) {
gaps.add(new SequenceGap(current, entry.getKey() - current));
}
// max() keeps the cursor from moving backwards when segments overlap
current = Math.max(current, entry.getKey() + entry.getValue().getLength());
}
return gaps;
}
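The gap scan above can be tried in isolation. The following is a minimal sketch, not the library's code: it assumes segments can be modelled as a map from start sequence to length, and the names GapScanExample, Gap, and findGaps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustrative only: segments are modelled as (start sequence -> length). */
final class GapScanExample {

    /** A missing range of sequence numbers: [start, start + length). */
    record Gap(long start, long length) {}

    /**
     * Walks the segments in ascending order and reports every hole between
     * the expected sequence number and the next buffered segment.
     */
    static List<Gap> findGaps(long expectedSequence, NavigableMap<Long, Long> segments) {
        List<Gap> gaps = new ArrayList<>();
        long current = expectedSequence;
        for (var entry : segments.entrySet()) {
            long start = entry.getKey();
            if (start > current) {
                gaps.add(new Gap(current, start - current));
            }
            // max() keeps the cursor from moving backwards on overlapping segments
            current = Math.max(current, start + entry.getValue());
        }
        return gaps;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> segments = new TreeMap<>();
        segments.put(100L, 50L); // covers 100..149
        segments.put(200L, 25L); // covers 200..224
        // Sequence 0 is expected next, so 0..99 and 150..199 are missing.
        System.out.println(findGaps(0L, segments));
    }
}
```

With the two buffered segments shown in main, the scan reports one gap of length 100 starting at 0 and one of length 50 starting at 150.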
// Create with 1-second timeout
StreamProcessor processor = new StreamProcessor(
TimeUnit.SECONDS.toNanos(1)
);
// Create with custom timeout
long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(500);
StreamProcessor processor = new StreamProcessor(timeoutNanos);
ThreadFactory factory = r -> {
Thread t = new Thread(r, "Stream-Timeout-Monitor");
t.setDaemon(true);
return t;
};
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(factory);
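As a rough illustration of how such an executor might drive the periodic check, the sketch below schedules a task at a fixed rate on a daemon thread. TimeoutMonitorExample and startMonitor are hypothetical names, and how StreamProcessor actually accepts or creates its executor is not shown in this page.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class TimeoutMonitorExample {

    /** Runs `check` at a fixed rate on a daemon thread; the caller shuts the executor down. */
    static ScheduledExecutorService startMonitor(Runnable check, long periodNanos) {
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "Stream-Timeout-Monitor");
            t.setDaemon(true); // do not keep the JVM alive just for monitoring
            return t;
        };
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor(factory);
        executor.scheduleAtFixedRate(check, periodNanos, periodNanos, TimeUnit.NANOSECONDS);
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeTicks = new CountDownLatch(3);
        ScheduledExecutorService executor =
                startMonitor(threeTicks::countDown, TimeUnit.MILLISECONDS.toNanos(10));
        boolean ticked = threeTicks.await(2, TimeUnit.SECONDS);
        executor.shutdownNow();
        System.out.println("monitor ticked three times: " + ticked);
    }
}
```

One thing to keep in mind with scheduleAtFixedRate: if the task throws, later executions are suppressed, so a real check method should catch and report its own exceptions.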
// Create processor
StreamProcessor processor = new StreamProcessor(
TimeUnit.SECONDS.toNanos(1)
);
// Add timeout handler
processor.addTimeoutListener(event -> {
System.out.printf(
"Stream %s timed out after %s%n",
event.key(),
event.getInactiveDuration()
);
// Log gap information
event.gaps().forEach(gap ->
System.out.printf(
"Gap at sequence %d, length %d%n",
gap.startSequence(),
gap.length()
)
);
});
// Process packets
processor.processPacket(packet);
processor.addTimeoutListener(event -> {
if (event.gaps().size() <= 3) {
// Request retransmission for small gaps
requestRetransmission(event.key(), event.gaps());
} else {
// Close stream for larger gaps
processor.closeStream(event.key());
}
});
public Stream buildPartial() {
return new StreamImpl(
packets,
ByteBuffer.wrap(streamData.toByteArray()),
buildMetadata(),
false // incomplete flag
);
}
public interface TimeoutStrategy {
boolean shouldTimeout(
StreamContext context,
Set<SequenceGap> gaps,
Duration inactivity
);
}
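One possible strategy, sketched under assumptions: a stream with outstanding gaps gets a shorter idle limit than a gap-free one. The stand-in types below exist only so the example compiles on its own, gapAware is a hypothetical factory, and how a strategy is registered with the processor is not covered here.

```java
import java.time.Duration;
import java.util.Set;

final class TimeoutStrategyExample {

    // Stand-ins so the sketch is self-contained; the real types live in the library.
    interface StreamContext {}

    record SequenceGap(long startSequence, long length) {}

    interface TimeoutStrategy {
        boolean shouldTimeout(StreamContext context, Set<SequenceGap> gaps, Duration inactivity);
    }

    /**
     * Hypothetical policy: gap-free streams may idle up to idleLimit,
     * streams with missing segments only up to idleLimitWithGaps.
     */
    static TimeoutStrategy gapAware(Duration idleLimit, Duration idleLimitWithGaps) {
        return (context, gaps, inactivity) -> {
            Duration limit = gaps.isEmpty() ? idleLimit : idleLimitWithGaps;
            return inactivity.compareTo(limit) > 0;
        };
    }

    public static void main(String[] args) {
        TimeoutStrategy strategy = gapAware(Duration.ofSeconds(5), Duration.ofSeconds(1));
        Set<SequenceGap> gaps = Set.of(new SequenceGap(1_000L, 512L));
        // Two seconds idle: tolerated without gaps, but not with a gap outstanding.
        System.out.println(strategy.shouldTimeout(null, Set.of(), Duration.ofSeconds(2)));
        System.out.println(strategy.shouldTimeout(null, gaps, Duration.ofSeconds(2)));
    }
}
```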
- Uses ConcurrentSkipListMap for efficient concurrent access
- Implements weak references for packet storage
- Provides configurable buffer sizes
- Thread-safe segment tracking
- Non-blocking timeout detection
- Lock-free activity updates
- Uses System.nanoTime() for high-precision timing
- Accounts for system clock resolution
- Handles timer wraparound
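The wraparound point is easiest to see in a small comparison. System.nanoTime() values are only meaningful as differences and may overflow, so elapsed time should be computed by subtraction, as hasTimedOut() does. The sketch below contrasts that form with a deadline comparison that breaks near the overflow boundary; the class and method names are illustrative.

```java
final class NanoTimeCompareExample {

    /** Overflow-safe: the subtraction wraps consistently, so the elapsed time stays correct. */
    static boolean hasTimedOut(long nowNanos, long lastActivityNanos, long timeoutNanos) {
        return nowNanos - lastActivityNanos > timeoutNanos;
    }

    /** Unsafe variant, shown for contrast: comparing absolute values breaks across the wrap. */
    static boolean hasTimedOutNaive(long nowNanos, long lastActivityNanos, long timeoutNanos) {
        return nowNanos > lastActivityNanos + timeoutNanos;
    }

    public static void main(String[] args) {
        long timeout = 1_000L;
        long last = Long.MAX_VALUE - 2_000L; // shortly before the counter wraps
        long now = last + 3_000L;            // wraps to a negative value; 3000 ns elapsed
        System.out.println(hasTimedOut(now, last, timeout));      // true
        System.out.println(hasTimedOutNaive(now, last, timeout)); // false (missed timeout)
    }
}
```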
private void checkTimeouts() {
activeStreams.forEach((key, context) -> {
if (context.hasTimedOut()) {
handleTimeout(key, context);
}
});
}
private void processContiguousSegments() {
while (!segments.isEmpty()) {
Map.Entry<Long, StreamSegment> entry = segments.firstEntry();
if (entry.getKey() != expectedSequence) {
break;
}
StreamSegment segment = segments.remove(entry.getKey());
processSegment(segment);
expectedSequence = segment.getSequenceNumber() +
segment.getLength();
}
}
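To see the draining behaviour with out-of-order arrivals, here is a self-contained sketch. It assumes a segment can be reduced to a start sequence plus a byte payload, is single-threaded for brevity, and uses hypothetical names (ContiguousDrainExample, add, drain) that are not part of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ContiguousDrainExample {
    private final TreeMap<Long, byte[]> pending = new TreeMap<>();
    private final List<byte[]> delivered = new ArrayList<>();
    private long expectedSequence;

    ContiguousDrainExample(long initialSequence) {
        this.expectedSequence = initialSequence;
    }

    /** Buffers a segment, then delivers everything that is now contiguous. */
    void add(long sequence, byte[] payload) {
        pending.put(sequence, payload);
        drain();
    }

    private void drain() {
        Map.Entry<Long, byte[]> first;
        // Stop at the first segment that does not start where the stream left off.
        while ((first = pending.firstEntry()) != null
                && first.getKey() == expectedSequence) {
            pending.pollFirstEntry();
            delivered.add(first.getValue());
            expectedSequence += first.getValue().length;
        }
    }

    long expectedSequence() { return expectedSequence; }
    int deliveredCount() { return delivered.size(); }
    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        ContiguousDrainExample stream = new ContiguousDrainExample(0L);
        stream.add(10L, new byte[5]); // arrives early, stays buffered
        stream.add(0L, new byte[10]); // fills the hole, both segments drain
        System.out.println(stream.deliveredCount() + " delivered, next expected "
                + stream.expectedSequence());
    }
}
```

The early segment stays in the map until the segment starting at the expected sequence arrives, at which point both are delivered in order.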
public void shutdown() {
timeoutExecutor.shutdown();
try {
if (!timeoutExecutor.awaitTermination(
timeoutNanos,
TimeUnit.NANOSECONDS)) {
timeoutExecutor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
timeoutExecutor.shutdownNow();
}
}
try {
StreamSegment segment = extractSegment(packet);
context.getSegmentTracker().addSegment(segment);
} catch (InvalidSegmentException e) {
handleInvalidSegment(key, packet, e);
}
private void handleTimeout(StreamKey key, StreamContext context) {
Set<SequenceGap> gaps = context.getSegmentTracker().getGaps();
StreamTimeoutEvent event = new StreamTimeoutEvent(
key,
context.getMetadata(),
gaps,
context.getLastActivityTime()
);
notifyTimeoutListeners(event);
if (shouldCloseStream(event)) {
closeStream(key);
}
}