Skip to content

Commit

Permalink
Velo collect agent org (#256)
Browse files Browse the repository at this point in the history
* collect velo org per agent testing

* refactor: Collect Velociraptor agents per organization

This code change modifies the `sync_agents_velociraptor` function in the `sync.py` file. It now collects Velociraptor agents per organization, allowing for more efficient synchronization. This refactor improves the organization and management of Velociraptor agents in the system.

* refactor: add velo org to agents table

* add velo org when collecting agents

* refactor: Add velociraptor_org field to BaseBody model

This code change adds the `velociraptor_org` field to the `BaseBody` model in the `artifacts.py` file. The `velociraptor_org` field allows for specifying the organization of the client when collecting artifacts. This enhancement improves the flexibility and accuracy of artifact collection in the system.

* refactor: Extract filename from process_name in SocfortressThreatIntelRequest

This code change adds a validator to the `process_name` field in the `SocfortressThreatIntelRequest` model. The validator extracts the filename from the `process_name` using the `os.path.basename` function. This enhancement improves the consistency and reliability of the filename extraction process in the Socfortress threat intelligence module.

* precommit fixes
  • Loading branch information
taylorwalton authored Jun 27, 2024
1 parent ba63008 commit 17a0656
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 59 deletions.
32 changes: 32 additions & 0 deletions backend/alembic/versions/fed7739bd07c_add_velo_org_to_agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Add Velo Org to Agents
Revision ID: fed7739bd07c
Revises: 39c3aaec0084
Create Date: 2024-06-27 09:22:59.354696
"""
from typing import Sequence
from typing import Union

import sqlalchemy as sa
import sqlmodel.sql.sqltypes

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "fed7739bd07c"
down_revision: Union[str, None] = "39c3aaec0084"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("agents", sa.Column("velociraptor_org", sqlmodel.sql.sqltypes.AutoString(length=256), nullable=True))
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("agents", "velociraptor_org")
# ### end Alembic commands ###
120 changes: 71 additions & 49 deletions backend/app/agents/services/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from app.agents.schema.agents import SyncedWazuhAgent
from app.agents.velociraptor.schema.agents import VelociraptorAgent
from app.agents.velociraptor.schema.agents import VelociraptorClients
from app.agents.velociraptor.schema.agents import VelociraptorOrganizations
from app.agents.wazuh.schema.agents import WazuhAgent
from app.agents.wazuh.schema.agents import WazuhAgentsList
from app.connectors.models import Connectors
Expand Down Expand Up @@ -40,7 +41,7 @@ async def fetch_wazuh_agents() -> WazuhAgentsList:
)


async def fetch_velociraptor_clients() -> VelociraptorClients:
async def fetch_velociraptor_clients(org_id: str) -> VelociraptorClients:
"""
Fetches clients from Velociraptor service.
Expand All @@ -50,12 +51,29 @@ async def fetch_velociraptor_clients() -> VelociraptorClients:
Returns:
VelociraptorClientsList: The fetched clients.
"""
collected_velociraptor_agents = await velociraptor_services.collect_velociraptor_clients()
collected_velociraptor_agents = await velociraptor_services.collect_velociraptor_clients(org_id=org_id)
return VelociraptorClients(
clients=collected_velociraptor_agents,
)


async def fetch_velociraptor_organizations() -> VelociraptorOrganizations:
"""
Fetches organizations from Velociraptor service.
Args:
None
Returns:
VelociraptorOrgsList: The fetched orgs.
"""
collected_velociraptor_orgs = await velociraptor_services.collect_velociraptor_organizations()
logger.info(f"Collected Velociraptor Orgs: {collected_velociraptor_orgs}")
return VelociraptorOrganizations(
organizations=collected_velociraptor_orgs,
)


async def fetch_velociraptor_agent(agent_name: str) -> VelociraptorAgent:
"""
Fetches agent details from Velociraptor service.
Expand Down Expand Up @@ -298,53 +316,57 @@ async def sync_agents_velociraptor() -> SyncedAgentsResponse:
:rtype: SyncedAgentsResponse
"""
agents_added_list: List[VelociraptorAgent] = []

velociraptor_clients = await fetch_velociraptor_clients()
velociraptor_clients = velociraptor_clients.clients if hasattr(velociraptor_clients, "clients") else []

async with get_db_session() as session: # Create a new session here
existing_agents_query = select(Agents)
result = await session.execute(existing_agents_query)
existing_agents = result.scalars().all()

for agent in existing_agents:
logger.info(f"Collecting Velociraptor Agent for {agent.hostname}")

try:
# Build the velociraptor_agent where the hostname or `client_id` is that equal to the `agents`
velociraptor_agent = next(
(
client
for client in velociraptor_clients
if client.os_info.hostname == agent.hostname or client.client_id == agent.velociraptor_id
),
None,
)
# Convert Unix epoch timestamp to datetime
last_seen_at = datetime.fromtimestamp(
int(velociraptor_agent.last_seen_at) / 1e6,
) # Divide by 1e6 to convert from microseconds to seconds
# Convert datetime to ISO 8601 format without fractional seconds
last_seen_at_iso = last_seen_at.replace(tzinfo=timezone.utc).isoformat(timespec="seconds")
velociraptor_agent = VelociraptorAgent(
velociraptor_id=velociraptor_agent.client_id,
velociraptor_last_seen=last_seen_at_iso,
velociraptor_agent_version=velociraptor_agent.agent_information.version,
)

except Exception as e:
logger.error(
f"Failed to collect Velociraptor Agent for {agent.hostname}: {e}",
)
continue

if velociraptor_agent:
# Update the agent with the Velociraptor client's details
await update_agent_with_velociraptor_in_db(session, agent, velociraptor_agent)
agents_added_list.append(velociraptor_agent)

# Close the session
await session.close()
velo_orgs = await fetch_velociraptor_organizations()
logger.info(f"Collected Velociraptor Orgs: {velo_orgs}")
for org in velo_orgs.organizations:
velociraptor_clients = await fetch_velociraptor_clients(org_id=org.OrgId)
logger.info(f"Collected Velociraptor Clients: {velociraptor_clients}")
velociraptor_clients = velociraptor_clients.clients if hasattr(velociraptor_clients, "clients") else []

async with get_db_session() as session: # Create a new session here
existing_agents_query = select(Agents)
result = await session.execute(existing_agents_query)
existing_agents = result.scalars().all()

for agent in existing_agents:
logger.info(f"Collecting Velociraptor Agent for {agent.hostname}")

try:
# Build the velociraptor_agent where the hostname or `client_id` is that equal to the `agents`
velociraptor_agent = next(
(
client
for client in velociraptor_clients
if client.os_info.hostname == agent.hostname or client.client_id == agent.velociraptor_id
),
None,
)
# Convert Unix epoch timestamp to datetime
last_seen_at = datetime.fromtimestamp(
int(velociraptor_agent.last_seen_at) / 1e6,
) # Divide by 1e6 to convert from microseconds to seconds
# Convert datetime to ISO 8601 format without fractional seconds
last_seen_at_iso = last_seen_at.replace(tzinfo=timezone.utc).isoformat(timespec="seconds")
velociraptor_agent = VelociraptorAgent(
velociraptor_id=velociraptor_agent.client_id,
velociraptor_last_seen=last_seen_at_iso,
velociraptor_agent_version=velociraptor_agent.agent_information.version,
velociraptor_org=org.OrgId,
)

except Exception as e:
logger.error(
f"Failed to collect Velociraptor Agent for {agent.hostname}: {e}",
)
continue

if velociraptor_agent:
# Update the agent with the Velociraptor client's details
await update_agent_with_velociraptor_in_db(session, agent, velociraptor_agent)
agents_added_list.append(velociraptor_agent)

# Close the session
await session.close()

logger.info(f"Agents Added List: {agents_added_list}")
return SyncedAgentsResponse(
Expand Down
53 changes: 53 additions & 0 deletions backend/app/agents/velociraptor/schema/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class VelociraptorAgent(BaseModel):
client_id: Optional[str] = Field("n/a", alias="velociraptor_id")
client_last_seen: str = Field(..., alias="velociraptor_last_seen")
client_version: str = Field(..., alias="velociraptor_agent_version")
client_org: str = Field(..., alias="velociraptor_org")

@property
def client_last_seen_as_datetime(self):
Expand Down Expand Up @@ -53,3 +54,55 @@ class VelociraptorClient(BaseModel):

class VelociraptorClients(BaseModel):
clients: List[VelociraptorClient]


class Version(BaseModel):
name: str
version: str
commit: str
build_time: str
ci_build_url: str
compiler: str


class Installer(BaseModel):
service_name: str
install_path: str
service_description: Optional[str] = None


class LocalBuffer(BaseModel):
memory_size: int
disk_size: int
filename_linux: str
filename_windows: str
filename_darwin: str


class ClientConfig(BaseModel):
server_urls: List[str]
ca_certificate: str
nonce: str
writeback_darwin: str
writeback_linux: str
writeback_windows: str
tempdir_windows: str
max_poll: int
nanny_max_connection_delay: int
windows_installer: Installer
darwin_installer: Installer
version: Version
use_self_signed_ssl: bool
pinned_server_name: str
max_upload_size: int
local_buffer: LocalBuffer


class Organization(BaseModel):
Name: str
OrgId: str
_client_config: ClientConfig


class VelociraptorOrganizations(BaseModel):
organizations: List[Organization]
24 changes: 22 additions & 2 deletions backend/app/agents/velociraptor/services/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,38 @@ def create_query(query: str) -> str:
return query


async def collect_velociraptor_clients() -> list:
async def collect_velociraptor_clients(org_id: str) -> list:
"""
Collects all clients from Velociraptor.
Returns:
list: A list of all clients.
"""
velociraptor_service = await UniversalService.create("Velociraptor")
# query = create_query(
# "SELECT * FROM clients()",
# )
query = create_query(
"SELECT * FROM clients()",
f"SELECT * FROM query(org_id='{org_id}', query='SELECT * FROM clients()')",
)
flow = velociraptor_service.execute_query(query)
logger.info(f"Successfully ran artifact collection on {flow}")
return flow["results"]


async def collect_velociraptor_organizations() -> list:
"""
Collects all organizations from Velociraptor.
Returns:
list: A list of all organizations.
"""
velociraptor_service = await UniversalService.create("Velociraptor")
query = create_query(
"SELECT * FROM orgs()",
)
flow = velociraptor_service.execute_query(query)
logger.info(f"Successfully ran artifact collection on {flow}")
return flow["results"]


Expand Down
50 changes: 50 additions & 0 deletions backend/app/connectors/velociraptor/routes/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,40 @@ async def get_velociraptor_id(session: AsyncSession, hostname: str) -> str:
return agent.velociraptor_id


async def get_velociraptor_org(session: AsyncSession, hostname: str) -> str:
"""
Retrieves the velociraptor_org associated with the given hostname.
Args:
session (AsyncSession): The database session.
hostname (str): The hostname of the agent.
Returns:
str: The velociraptor_org associated with the hostname.
Raises:
HTTPException: If the agent with the given hostname is not found or if the velociraptor_org is not available.
"""
logger.info(f"Getting velociraptor_org from hostname {hostname}")
result = await session.execute(select(Agents).filter(Agents.hostname == hostname))
agent = result.scalars().first()

if not agent:
raise HTTPException(
status_code=404,
detail=f"Agent with hostname {hostname} not found",
)

if agent.velociraptor_org is None:
raise HTTPException(
status_code=404,
detail=f"Velociraptor ORG for hostname {hostname} is not available",
)

logger.info(f"velociraptor_org for hostname {hostname} is {agent.velociraptor_org}")
return agent.velociraptor_org


async def update_agent_quarantine_status(
session: AsyncSession,
quarantine_body: QuarantineBody,
Expand Down Expand Up @@ -321,6 +355,11 @@ async def collect_artifact(
collect_artifact_body.hostname,
)

collect_artifact_body.velociraptor_org = await get_velociraptor_org(
session,
collect_artifact_body.hostname,
)

# Assuming run_artifact_collection is an async function and takes a session as a parameter
return await run_artifact_collection(collect_artifact_body)

Expand Down Expand Up @@ -358,6 +397,11 @@ async def run_command(
session,
run_command_body.hostname,
)

run_command_body.velociraptor_org = await get_velociraptor_org(
session,
run_command_body.hostname,
)
# Run the command
return await run_remote_command(run_command_body)

Expand Down Expand Up @@ -396,6 +440,12 @@ async def quarantine(
session,
quarantine_body.hostname,
)

quarantine_body.velociraptor_org = await get_velociraptor_org(
session,
quarantine_body.hostname,
)

# Quarantine the host
quarantine_response = await quarantine_host(quarantine_body)

Expand Down
1 change: 1 addition & 0 deletions backend/app/connectors/velociraptor/schema/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class QuarantineArtifactsEnum(str, Enum):
class BaseBody(BaseModel):
hostname: str = Field(..., description="Name of the client")
velociraptor_id: Optional[str] = Field(None, description="Client ID of the client")
velociraptor_org: Optional[str] = Field(None, description="Organization of the client")


class CollectArtifactBody(BaseBody):
Expand Down
Loading

0 comments on commit 17a0656

Please sign in to comment.