@@ -1308,23 +1308,13 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
1308
1308
ai , _ := info .NewAgentInfo (ctx , true )
1309
1309
m , err := NewManager (newDebugLogger (t ), newDebugLogger (t ), "localhost:0" , ai , apmtest .DiscardTracer , newTestMonitoringMgr (), configuration .DefaultGRPCConfig ())
1310
1310
require .NoError (t , err )
1311
- errCh := make (chan error )
1311
+ runResultChan := make (chan error , 1 )
1312
1312
go func () {
1313
- err := m .Run (ctx )
1314
- if errors .Is (err , context .Canceled ) {
1315
- err = nil
1316
- }
1317
- errCh <- err
1313
+ runResultChan <- m .Run (ctx )
1318
1314
}()
1319
1315
1320
- waitCtx , waitCancel := context .WithTimeout (ctx , 1 * time .Second )
1321
- defer waitCancel ()
1322
- if err := waitForReady (waitCtx , m ); err != nil {
1323
- require .NoError (t , err )
1324
- }
1325
-
1326
1316
binaryPath := testBinary (t , "component" )
1327
- comp := component.Component {
1317
+ healthyComp := component.Component {
1328
1318
ID : "fake-default" ,
1329
1319
InputSpec : & component.InputRuntimeSpec {
1330
1320
InputType : "fake" ,
@@ -1355,105 +1345,103 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
1355
1345
},
1356
1346
},
1357
1347
}
1348
+ // unhealthyComp is a copy of healthyComp with an error inserted in the
1349
+ // second unit
1350
+ unhealthyComp := healthyComp
1351
+ unhealthyComp .Units = make ([]component.Unit , len (healthyComp .Units ))
1352
+ copy (unhealthyComp .Units , healthyComp .Units )
1353
+ unhealthyComp .Units [1 ] = component.Unit {
1354
+ ID : "good-input" ,
1355
+ Type : client .UnitTypeInput ,
1356
+ Err : errors .New ("hard-error for config" ),
1357
+ }
1358
+ goodUnitKey := ComponentUnitKey {UnitType : client .UnitTypeInput , UnitID : "good-input" }
1358
1359
1359
- subCtx , subCancel := context .WithCancel (context .Background ())
1360
- defer subCancel ()
1361
- subErrCh := make (chan error )
1362
- go func () {
1363
- unitGood := true
1364
-
1365
- sub := m .Subscribe (subCtx , "fake-default" )
1366
- for {
1367
- select {
1368
- case <- subCtx .Done ():
1369
- return
1370
- case state := <- sub .Ch ():
1371
- t .Logf ("component state changed: %+v" , state )
1372
- if state .State == client .UnitStateFailed {
1373
- subErrCh <- fmt .Errorf ("component failed: %s" , state .Message )
1374
- return
1375
- }
1376
-
1377
- unit , ok := state .Units [ComponentUnitKey {UnitType : client .UnitTypeInput , UnitID : "good-input" }]
1378
- if ! ok {
1379
- subErrCh <- errors .New ("unit missing: good-input" )
1380
- return
1381
- }
1382
- if unitGood {
1383
- switch unit .State {
1384
- case client .UnitStateFailed :
1385
- subErrCh <- fmt .Errorf ("unit failed: %s" , unit .Message )
1386
- return
1387
- case client .UnitStateHealthy :
1388
- // good unit it; now make it bad
1389
- t .Logf ("marking good-input as having a hard-error for config" )
1390
- updatedComp := comp
1391
- updatedComp .Units = make ([]component.Unit , len (comp .Units ))
1392
- copy (updatedComp .Units , comp .Units )
1393
- updatedComp .Units [1 ] = component.Unit {
1394
- ID : "good-input" ,
1395
- Type : client .UnitTypeInput ,
1396
- Err : errors .New ("hard-error for config" ),
1397
- }
1398
- unitGood = false
1399
- err := m .Update (component.Model {Components : []component.Component {updatedComp }})
1400
- if err != nil {
1401
- subErrCh <- err
1402
- }
1403
- case client .UnitStateStarting :
1404
- // acceptable
1405
- default :
1406
- // unknown state that should not have occurred
1407
- subErrCh <- fmt .Errorf ("unit reported unexpected state: %v" , unit .State )
1408
- }
1409
- } else {
1410
- switch unit .State {
1411
- case client .UnitStateFailed :
1412
- // went to failed; stop whole component
1413
- err := m .Update (component.Model {Components : []component.Component {}})
1414
- if err != nil {
1415
- subErrCh <- err
1416
- }
1417
- case client .UnitStateStopped :
1418
- // unit was stopped
1419
- subErrCh <- nil
1420
- default :
1421
- subErrCh <- fmt .Errorf (
1422
- "good-input unit should be either failed or stopped, but it's '%s'" ,
1423
- unit .State )
1424
- }
1425
- }
1360
+ // Wait for Manager to start up
1361
+ timedWaitForReady (t , m , 1 * time .Second )
1426
1362
1427
- }
1428
- }
1429
- }()
1363
+ sub := m .Subscribe (ctx , "fake-default" )
1430
1364
1431
- defer drainErrChan ( errCh )
1432
- defer drainErrChan ( subErrCh )
1365
+ endTimer := time . NewTimer ( 30 * time . Second )
1366
+ defer endTimer . Stop ( )
1433
1367
1434
- err = m .Update (component.Model {Components : []component.Component {comp }})
1368
+ err = m .Update (component.Model {Components : []component.Component {healthyComp }})
1435
1369
require .NoError (t , err )
1436
1370
1437
- endTimer := time .NewTimer (30 * time .Second )
1438
- defer endTimer .Stop ()
1371
+ // nextState tracks the stage of the test. We expect the sequence
1372
+ // Starting -> Healthy -> Failed -> Stopped.
1373
+ nextState := client .UnitStateHealthy
1374
+
1439
1375
LOOP:
1440
1376
for {
1377
+ var state ComponentState
1441
1378
select {
1442
1379
case <- endTimer .C :
1443
- t .Fatalf ("timed out after 30 seconds" )
1444
- case err := <- errCh :
1445
- require .NoError (t , err )
1446
- case err := <- subErrCh :
1447
- require .NoError (t , err )
1448
- break LOOP
1380
+ require .Fail (t , "timed out waiting for component state update" )
1381
+ case state = <- sub .Ch ():
1382
+ t .Logf ("component state changed: %+v" , state )
1383
+ }
1384
+
1385
+ require .NotEqual (t , client .UnitStateFailed , state .State , "component should not fail" )
1386
+ unit , ok := state .Units [goodUnitKey ]
1387
+ require .True (t , ok , "unit good-input must be present" )
1388
+
1389
+ if nextState == client .UnitStateHealthy {
1390
+ // Waiting for unit to become healthy, if it's still starting skip
1391
+ // to the next update
1392
+ if unit .State == client .UnitStateStarting {
1393
+ continue LOOP
1394
+ }
1395
+ if unit .State == client .UnitStateHealthy {
1396
+ // good unit is healthy; now make it bad
1397
+ t .Logf ("marking good-input as having a hard-error for config" )
1398
+ err := m .Update (component.Model {Components : []component.Component {unhealthyComp }})
1399
+ require .NoError (t , err , "Component model update should succeed" )
1400
+
1401
+ // We next expect to transition to Failed
1402
+ nextState = client .UnitStateFailed
1403
+ } else {
1404
+ // Unit should only be starting or healthy in this stage,
1405
+ // anything else is an error.
1406
+ require .FailNowf (t , "Incorrect state" , "Expected STARTING or HEALTHY, got %v" , unit .State )
1407
+ }
1408
+ } else if nextState == client .UnitStateFailed {
1409
+ // Waiting for unit to fail, if it's still healthy skip to the next
1410
+ // update
1411
+ if unit .State == client .UnitStateHealthy {
1412
+ continue LOOP
1413
+ }
1414
+ if unit .State == client .UnitStateFailed {
1415
+ // Reached the expected state, now send an empty component model
1416
+ // to stop everything.
1417
+ err := m .Update (component.Model {Components : []component.Component {}})
1418
+ require .NoError (t , err , "Component model update should succeed" )
1419
+ nextState = client .UnitStateStopped
1420
+ } else {
1421
+ // Unit should only be healthy or failed in this stage, anything
1422
+ // else is an error.
1423
+ require .FailNow (t , "Incorrect state" , "Expected HEALTHY or FAILED, got %v" , unit .State )
1424
+ }
1425
+ } else if nextState == client .UnitStateStopped {
1426
+ // Waiting for component to stop, if it's still Failed skip to
1427
+ // the next update
1428
+ if unit .State == client .UnitStateFailed {
1429
+ continue LOOP
1430
+ }
1431
+ if unit .State == client .UnitStateStopped {
1432
+ // Success, we've finished the whole sequence
1433
+ break LOOP
1434
+ } else {
1435
+ // Unit should only be failed or stopped in this stage, anything
1436
+ // else is an error.
1437
+ require .FailNowf (t , "Incorrect state" , "Expected FAILED or STOPPED, got %v" , unit .State )
1438
+ }
1449
1439
}
1450
1440
}
1451
1441
1452
- subCancel ()
1453
1442
cancel ()
1454
-
1455
- err = <- errCh
1456
- require .NoError (t , err )
1443
+ err = <- runResultChan
1444
+ require .Equal (t , context .Canceled , err , "Run should return with context canceled, got %v" , err .Error ())
1457
1445
}
1458
1446
1459
1447
func TestManager_FakeInput_NoDeadlock (t * testing.T ) {
@@ -3671,3 +3659,12 @@ func waitForReady(ctx context.Context, m *Manager) error {
3671
3659
}
3672
3660
return nil
3673
3661
}
3662
+
3663
+ func timedWaitForReady (t * testing.T , m * Manager , timeout time.Duration ) {
3664
+ ctx , cancel := context .WithTimeout (context .Background (), timeout )
3665
+ defer cancel ()
3666
+ err := waitForReady (ctx , m )
3667
+ if err != nil {
3668
+ require .FailNow (t , "timed out waiting for Manager to start" )
3669
+ }
3670
+ }
0 commit comments