Skip to content

Commit

Permalink
Mirroring auto-created/destroyed for EC2 instances (#45)
Browse files Browse the repository at this point in the history
* Set up an EventBridge Rule to listen for EC2 instance stops and starts
  and direct them to the EventListener Lambda which converts them into
  Create/Destroy ENI Mirroring Events

Signed-off-by: Chris Helma <chelma+github@amazon.com>
  • Loading branch information
chelma authored May 3, 2023
1 parent 549cfd9 commit 2d02685
Show file tree
Hide file tree
Showing 10 changed files with 731 additions and 55 deletions.
32 changes: 32 additions & 0 deletions cdk-lib/mirror-stacks/vpc-mirror-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,23 @@ export class VpcMirrorStack extends Stack {
]
})
);
listenerLambda.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'ec2:DescribeInstances'
],
resources: ["*"]
})
);

// Make a human-readable log of the raw AWS Service events we're proccessing
const vpcLogGroup = new logs.LogGroup(this, 'LogGroup', {
logGroupName: `ArkimeInputEvents-${props.vpcId}`,
removalPolicy: RemovalPolicy.DESTROY // This is intended for debugging
});

// Capture Fargate stop/start events for processing
const fargateEventsRule = new events.Rule(this, 'RuleFargateEvents', {
eventBus: undefined, // We want to listen to the Account/Region's default bus
eventPattern: {
Expand All @@ -190,6 +201,27 @@ export class VpcMirrorStack extends Stack {
]
});

// Capture EC2 instance start/stop events. This should cover one-off instance creation, EC2 Autoscaling
// activities, and ECS-on-EC2. All three of those situations map to an ENI being created or destroyed when
// a concrete instance starts/stops, regardless of how many other steps/events are involved in the process.
//
// Unfortunately, this event does not give us the information we need to pre-screen it at the Rule level so
// we have to check if it applies to our VPC in our Lambda code.
const ec2EventsRule = new events.Rule(this, 'RuleEc2Events', {
eventBus: undefined, // We want to listen to the Account/Region's default bus
eventPattern: {
source: ["aws.ec2"],
detailType: ["EC2 Instance State-change Notification"],
detail: {
state: ["running", "shutting-down"]
}
},
targets: [
new targets.CloudWatchLogGroup(vpcLogGroup),
new targets.LambdaFunction(listenerLambda)
]
});

/**
* Configure the resources required for event-based mirroring configuration
*/
Expand Down
107 changes: 85 additions & 22 deletions cdk-lib/traffic-gen-sample/traffic-gen-stack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as cdk from 'aws-cdk-lib';
import * as autoscaling from 'aws-cdk-lib/aws-autoscaling';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as iam from 'aws-cdk-lib/aws-iam';
Expand All @@ -7,61 +8,123 @@ import * as logs from 'aws-cdk-lib/aws-logs';
import * as path from 'path'
import { Construct } from 'constructs';


export class TrafficGenStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);

// Stock VPC w/ a Public/Private subnet pair in 1 AZ along with NATGateways providing internet access to the
// private VPCs.
/**
* Set up our demo Traffic Generator's networking
*/
// This is a Stock VPC w/ a Public/Private subnet pair in 1 AZ along with NATGateways providing internet access
// to the private subnet.
const vpc = new ec2.Vpc(this, 'VPC', {maxAzs: 1});

// Set up VPC Flow Logs to enable visibility of the traffic mirroring on the user-side
const flowLogsGroup = new logs.LogGroup(this, 'FlowLogsLogGroup', {
logGroupName: `FlowLogs-${id}`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.TEN_YEARS,
});

new ec2.FlowLog(this, 'FlowLogs', {
resourceType: ec2.FlowLogResourceType.fromVpc(vpc),
destination: ec2.FlowLogDestination.toCloudWatchLogs(flowLogsGroup),
});

/**
* Set up some shared components.
*/
// Key to encrypt SSM traffic when using ECS Exec to shell into the container
const ksmEncryptionKey = new kms.Key(this, 'ECSClusterKey', {
const ssmKey = new kms.Key(this, 'SsmKey', {
enableKeyRotation: true,
});

// Create a Fargate service that runs a single instance of our traffic generation image
const cluster = new ecs.Cluster(this, 'Cluster', {
/**
* Create a Fargate service that runs our traffic generation image
*/
const fargateCluster = new ecs.Cluster(this, 'FargateCluster', {
vpc,
executeCommandConfiguration: { kmsKey: ksmEncryptionKey }
executeCommandConfiguration: { kmsKey: ssmKey }
});

const taskDefinition = new ecs.FargateTaskDefinition(this, 'TaskDef', {
const fargateTaskDef = new ecs.FargateTaskDefinition(this, 'TaskDef', {
memoryLimitMiB: 512,
cpu: 256,
});
taskDefinition.addToTaskRolePolicy(
fargateTaskDef.addToTaskRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['kms:Decrypt'], // Required for ECS Exec & shelling into the container
resources: [ksmEncryptionKey.keyArn]
resources: [ssmKey.keyArn]
}),
);

const container = taskDefinition.addContainer('FargateContainer', {
const fargateContainer = fargateTaskDef.addContainer('FargateContainer', {
image: ecs.ContainerImage.fromAsset(path.resolve(__dirname, '..', '..', 'docker-traffic-gen')),
memoryLimitMiB: 512,
logging: new ecs.AwsLogDriver({ streamPrefix: 'DemoTrafficGen', mode: ecs.AwsLogDriverMode.NON_BLOCKING })
logging: new ecs.AwsLogDriver({ streamPrefix: 'DemoTrafficGenFargate', mode: ecs.AwsLogDriverMode.NON_BLOCKING })
});

const service = new ecs.FargateService(this, 'Service', {
cluster,
taskDefinition,
const fargateService = new ecs.FargateService(this, 'Service', {
cluster: fargateCluster,
taskDefinition: fargateTaskDef,
desiredCount: 1,
enableExecuteCommand: true
});

// Set up VPC Flow Logs to enable visibility of the traffic mirroring on the user-side
const flowLogsGroup = new logs.LogGroup(this, 'FlowLogsLogGroup', {
logGroupName: `FlowLogs-${id}`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.TEN_YEARS,
/**
* Create an ECS-on-EC2 Cluster that runs our traffic generation image
*/

//
const ecsAsg = new autoscaling.AutoScalingGroup(this, 'EcsASG', {
vpc: vpc,
instanceType: new ec2.InstanceType('t3.micro'), // Arbitrarily chosen
machineImage: ecs.EcsOptimizedImage.amazonLinux2(),
desiredCapacity: 3,
minCapacity: 3,
maxCapacity: 10 // Arbitrarily chosen
});

new ec2.FlowLog(this, 'FlowLogs', {
resourceType: ec2.FlowLogResourceType.fromVpc(vpc),
destination: ec2.FlowLogDestination.toCloudWatchLogs(flowLogsGroup),
const ecsCluster = new ecs.Cluster(this, 'EcsCluster', {
vpc: vpc,
executeCommandConfiguration: { kmsKey: ssmKey }
});

const ecsCapacityProvider = new ecs.AsgCapacityProvider(this, 'EcsCapacityProvider', {
autoScalingGroup: ecsAsg,
});
ecsCluster.addAsgCapacityProvider(ecsCapacityProvider);

const ecsTaskDef = new ecs.Ec2TaskDefinition(this, 'EcsTaskDef', {
networkMode: ecs.NetworkMode.BRIDGE,
});
ecsTaskDef.addToTaskRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['kms:Decrypt'], // Required for ECS Exec & shelling into the container
resources: [ssmKey.keyArn]
}),
);

const ecsContainer = ecsTaskDef.addContainer('EcsContainer', {
image: ecs.ContainerImage.fromAsset(path.resolve(__dirname, '..', '..', 'docker-traffic-gen')),
logging: new ecs.AwsLogDriver({ streamPrefix: 'DemoTrafficGenEcs', mode: ecs.AwsLogDriverMode.NON_BLOCKING }),

// Because we're using the BRIDGE network type for our ECS Tasks, we can only place a single container
// on each of our t3.micro instances. We can't ask for all of their resources because ECS placement will
// fail, so we ask for a bit less than that.
cpu: 1536, // 1.5 vCPUs
memoryLimitMiB: 768, // 0.75 GiB
});

const ecsService = new ecs.Ec2Service(this, 'EcsService', {
cluster: ecsCluster,
taskDefinition: ecsTaskDef,
desiredCount: 1,
minHealthyPercent: 0,
enableExecuteCommand: true
});
}
}
53 changes: 43 additions & 10 deletions manage_arkime/aws_interactions/ec2_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,46 +38,79 @@ def get_subnets_of_vpc(vpc_id: str, aws_provider: AwsClientProvider) -> List[str

@dataclass
class NetworkInterface:
id: str
type: str
vpc_id: str
subnet_id: str
eni_id: str
eni_type: str

def to_dict(self):
return {
'vpc_id': self.vpc_id,
'subnet_id': self.subnet_id,
'eni_id': self.eni_id,
'eni_type': self.eni_type,
}

def get_enis_of_instance(instance_id: str, aws_provider: AwsClientProvider) -> List[NetworkInterface]:
ec2_client = aws_provider.get_ec2()
describe_instance_response = ec2_client.describe_instances(
InstanceIds=[instance_id]
)
instance_details = describe_instance_response["Reservations"][0]["Instances"][0]

network_interfaces = []
for eni in instance_details.get("NetworkInterfaces", []):
network_interfaces.append(
NetworkInterface(eni["VpcId"], eni["SubnetId"], eni["NetworkInterfaceId"], eni["InterfaceType"])
)

return network_interfaces

def get_enis_of_subnet(subnet_id: str, aws_provider: AwsClientProvider) -> List[NetworkInterface]:
ec2_client = aws_provider.get_ec2()
describe_eni_response = ec2_client.describe_network_interfaces(
Filters=[{"Name": "subnet-id", "Values": [subnet_id]}]
)
network_inferfaces = [NetworkInterface(eni["NetworkInterfaceId"], eni["InterfaceType"]) for eni in describe_eni_response.get("NetworkInterfaces", [])]
network_interfaces = []
for eni in describe_eni_response.get("NetworkInterfaces", []):
network_interfaces.append(
NetworkInterface(eni["VpcId"], eni["SubnetId"], eni["NetworkInterfaceId"], eni["InterfaceType"])
)

next_token = describe_eni_response.get("NextToken")
while next_token:
describe_eni_response = ec2_client.describe_network_interfaces(
Filters=[{"Name": "subnet-id", "Values": [subnet_id]}],
NextToken=next_token
)
next_interfaces = [NetworkInterface(eni["NetworkInterfaceId"], eni["InterfaceType"]) for eni in describe_eni_response.get("NetworkInterfaces", [])]
network_inferfaces.extend(next_interfaces)
next_interfaces = []
for eni in describe_eni_response.get("NetworkInterfaces", []):
next_interfaces.append(
NetworkInterface(eni["VpcId"], eni["SubnetId"], eni["NetworkInterfaceId"], eni["InterfaceType"])
)
network_interfaces.extend(next_interfaces)
next_token = describe_eni_response.get("NextToken")

return network_inferfaces
return network_interfaces

NON_MIRRORABLE_ENI_TYPES = ["gateway_load_balancer_endpoint", "nat_gateway"]

class NonMirrorableEniType(Exception):
def __init__(self, eni: NetworkInterface):
self.eni = eni
super().__init__(f"The ENI {eni.id} is of type {eni.type}, which is not mirrorable")
super().__init__(f"The ENI {eni.eni_id} is of type {eni.eni_type}, which is not mirrorable")

"""
Sets up a VPC Traffic Mirroring Session on a given ENI towards the specified Traffic Target using the specified
Traffic Filter and returns the Traffic Session ID.
"""
def mirror_eni(eni: NetworkInterface, traffic_target: str, traffic_filter: str, vpc_id: str, aws_provider: AwsClientProvider, virtual_network: int = 123) -> str:
if eni.type in NON_MIRRORABLE_ENI_TYPES:
if eni.eni_type in NON_MIRRORABLE_ENI_TYPES:
raise NonMirrorableEniType(eni)

ec2_client = aws_provider.get_ec2()
create_session_response = ec2_client.create_traffic_mirror_session(
NetworkInterfaceId=eni.id,
NetworkInterfaceId=eni.eni_id,
TrafficMirrorTargetId=traffic_target,
TrafficMirrorFilterId=traffic_filter,
SessionNumber=1,
Expand All @@ -88,7 +121,7 @@ def mirror_eni(eni: NetworkInterface, traffic_target: str, traffic_filter: str,
"Tags": [
{
"Key": "Name",
"Value": f"{vpc_id}-{eni.id}"
"Value": f"{vpc_id}-{eni.eni_id}"
},
]
},
Expand Down
4 changes: 2 additions & 2 deletions manage_arkime/commands/add_vpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def _mirror_enis_in_subnet(event_bus_arn: str, cluster_name: str, vpc_id: str, s
# actually create the mirroring configuration, we should pre-screen (hasn't already been mirrored; right eni
# type).

logger.info(f"Initiating creation of mirroring session for ENI {eni.id}")
logger.info(f"Initiating creation of mirroring session for ENI {eni.eni_id}")

events.put_events(
[events.CreateEniMirrorEvent(cluster_name, vpc_id, subnet_id, eni.id, eni.type, traffic_filter_id, vni)],
[events.CreateEniMirrorEvent(cluster_name, vpc_id, subnet_id, eni.eni_id, eni.eni_type, traffic_filter_id, vni)],
event_bus_arn,
aws_provider
)
Loading

0 comments on commit 2d02685

Please sign in to comment.