Skip to content

Commit 67c0a9d

Browse files
committed
[Java] AutoStop only required on extend recording error for clean up.
1 parent b008423 commit 67c0a9d

File tree

1 file changed

+54
-71
lines changed

1 file changed

+54
-71
lines changed

aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java

+54-71
Original file line numberDiff line numberDiff line change
@@ -1372,72 +1372,57 @@ private void startRecordingSession(
13721372
final Image image,
13731373
final boolean autoStop)
13741374
{
1375-
final Subscription subscription = image.subscription();
1376-
try
1377-
{
1378-
final int sessionId = image.sessionId();
1379-
final int streamId = subscription.streamId();
1380-
final String sourceIdentity = image.sourceIdentity();
1381-
final int termBufferLength = image.termBufferLength();
1382-
final int mtuLength = image.mtuLength();
1383-
final int initialTermId = image.initialTermId();
1384-
final long startPosition = image.joinPosition();
1385-
final int segmentFileLength = max(ctx.segmentFileLength(), termBufferLength);
1386-
1387-
final long recordingId = catalog.addNewRecording(
1388-
startPosition,
1389-
cachedEpochClock.time(),
1390-
initialTermId,
1391-
segmentFileLength,
1392-
termBufferLength,
1393-
mtuLength,
1394-
sessionId,
1395-
streamId,
1396-
strippedChannel,
1397-
originalChannel,
1398-
sourceIdentity);
1399-
1400-
final Counter position = RecordingPos.allocate(
1401-
aeron, counterMetadataBuffer, recordingId, sessionId, streamId, strippedChannel, sourceIdentity);
1402-
position.setOrdered(startPosition);
1403-
1404-
final RecordingSession session = new RecordingSession(
1405-
correlationId,
1406-
recordingId,
1407-
startPosition,
1408-
segmentFileLength,
1409-
originalChannel,
1410-
recordingEventsProxy,
1411-
image,
1412-
position,
1413-
archiveDirChannel,
1414-
ctx,
1415-
controlSession,
1416-
ctx.recordChecksumBuffer(),
1417-
ctx.recordChecksum(),
1418-
autoStop);
1419-
1420-
recordingSessionByIdMap.put(recordingId, session);
1421-
recorder.addSession(session);
1375+
final int sessionId = image.sessionId();
1376+
final int streamId = image.subscription().streamId();
1377+
final String sourceIdentity = image.sourceIdentity();
1378+
final int termBufferLength = image.termBufferLength();
1379+
final int mtuLength = image.mtuLength();
1380+
final int initialTermId = image.initialTermId();
1381+
final long startPosition = image.joinPosition();
1382+
final int segmentFileLength = max(ctx.segmentFileLength(), termBufferLength);
1383+
1384+
final long recordingId = catalog.addNewRecording(
1385+
startPosition,
1386+
cachedEpochClock.time(),
1387+
initialTermId,
1388+
segmentFileLength,
1389+
termBufferLength,
1390+
mtuLength,
1391+
sessionId,
1392+
streamId,
1393+
strippedChannel,
1394+
originalChannel,
1395+
sourceIdentity);
1396+
1397+
final Counter position = RecordingPos.allocate(
1398+
aeron, counterMetadataBuffer, recordingId, sessionId, streamId, strippedChannel, sourceIdentity);
1399+
position.setOrdered(startPosition);
1400+
1401+
final RecordingSession session = new RecordingSession(
1402+
correlationId,
1403+
recordingId,
1404+
startPosition,
1405+
segmentFileLength,
1406+
originalChannel,
1407+
recordingEventsProxy,
1408+
image,
1409+
position,
1410+
archiveDirChannel,
1411+
ctx,
1412+
controlSession,
1413+
ctx.recordChecksumBuffer(),
1414+
ctx.recordChecksum(),
1415+
autoStop);
14221416

1423-
controlSession.attemptSignal(
1424-
correlationId,
1425-
recordingId,
1426-
subscription.registrationId(),
1427-
image.joinPosition(),
1428-
RecordingSignal.START);
1417+
recordingSessionByIdMap.put(recordingId, session);
1418+
recorder.addSession(session);
14291419

1430-
}
1431-
catch (final Exception ex)
1432-
{
1433-
if (autoStop)
1434-
{
1435-
removeRecordingSubscription(subscription.registrationId());
1436-
CloseHelper.close(errorHandler, subscription);
1437-
}
1438-
1439-
throw ex;
1440-
}
1420+
controlSession.attemptSignal(
1421+
correlationId,
1422+
recordingId,
1423+
image.subscription().registrationId(),
1424+
image.joinPosition(),
1425+
RecordingSignal.START);
14411426
}
14421427

14431428
private void extendRecordingSession(
@@ -1449,7 +1434,6 @@ private void extendRecordingSession(
14491434
final Image image,
14501435
final boolean autoStop)
14511436
{
1452-
final Subscription subscription = image.subscription();
14531437
try
14541438
{
14551439
if (recordingSessionByIdMap.containsKey(recordingId))
@@ -1467,7 +1451,7 @@ private void extendRecordingSession(
14671451
counterMetadataBuffer,
14681452
recordingId,
14691453
image.sessionId(),
1470-
subscription.streamId(),
1454+
image.subscription().streamId(),
14711455
strippedChannel,
14721456
image.sourceIdentity());
14731457

@@ -1496,19 +1480,18 @@ private void extendRecordingSession(
14961480
controlSession.attemptSignal(
14971481
correlationId,
14981482
recordingId,
1499-
subscription.registrationId(),
1483+
image.subscription().registrationId(),
15001484
image.joinPosition(),
15011485
RecordingSignal.EXTEND);
15021486
}
15031487
catch (final Exception ex)
15041488
{
1489+
errorHandler.onError(ex);
15051490
if (autoStop)
15061491
{
1507-
removeRecordingSubscription(subscription.registrationId());
1508-
CloseHelper.close(errorHandler, subscription);
1492+
removeRecordingSubscription(image.subscription().registrationId());
1493+
CloseHelper.close(errorHandler, image.subscription());
15091494
}
1510-
1511-
throw ex;
15121495
}
15131496
}
15141497

0 commit comments

Comments
 (0)