77
-78
-79
-80
-81
+ | @bp.route("/agents/<agent_id>/delete", methods=["POST"])
-def delete_agent(agent_id: str) -> Any:
- """
- Endpoint to delete an agent.
- Args:
- agent_id (str): The ID of the agent to be deleted.
- Returns:
- json: A JSON response indicating whether the deletion was successful.
- """
- service = AgentService()
- result = service.delete_agent_db(agent_id=agent_id)
-
- universal_service = UniversalService()
- agent_service = WazuhManagerAgentService(universal_service)
- agent_service.delete_agent(agent_id=agent_id)
+93
+94
+95
+96
+97
| @bp.route("/agents/<agent_id>/delete", methods=["POST"])
+def delete_agent(agent_id: str) -> Any:
+ """
+ Endpoint to delete an agent.
+ Args:
+ agent_id (str): The ID of the agent to be deleted.
+ Returns:
+ json: A JSON response indicating whether the deletion was successful.
+ """
+ service = AgentService()
+ result = service.delete_agent_db(agent_id=agent_id)
- return result
+ universal_service = UniversalService()
+ agent_service = WazuhManagerAgentService(universal_service)
+ agent_service.delete_agent(agent_id=agent_id)
+
+ return result
|
@@ -1451,29 +1451,29 @@
Source code in app\routes\agents.py
- 23
-24
-25
-26
-27
+ | @bp.route("/agents/<agent_id>", methods=["GET"])
-def get_agent(agent_id: str) -> Any:
- """
- Endpoint to get the details of a specific agent.
- Args:
- agent_id (str): The ID of the agent to retrieve.
- Returns:
- json: A JSON response containing the details of the agent.
- """
- service = AgentService()
- agent = service.get_agent(agent_id=agent_id)
- return agent
+34
+35
+36
+37
+38
| @bp.route("/agents/<agent_id>", methods=["GET"])
+def get_agent(agent_id: str) -> Any:
+ """
+ Endpoint to get the details of a specific agent.
+ Args:
+ agent_id (str): The ID of the agent to retrieve.
+ Returns:
+ json: A JSON response containing the details of the agent.
+ """
+ service = AgentService()
+ agent = service.get_agent(agent_id=agent_id)
+ return agent
|
@@ -1546,11 +1546,7 @@
Source code in app\routes\agents.py
- 96
- 97
- 98
- 99
-100
+ | @bp.route("/agents/<agent_id>/vulnerabilities", methods=["GET"])
-def get_agent_vulnerabilities(agent_id: str) -> Any:
- """
- Endpoint to get the vulnerabilities of a specific agent.
- Args:
- agent_id (str): The ID of the agent whose vulnerabilities are to be fetched.
- Returns:
- json: A JSON response containing the vulnerabilities of the agent.
- """
- universal_service = UniversalService()
- vulnerability_service = VulnerabilityService(universal_service)
-
- agent_vulnerabilities = vulnerability_service.agent_vulnerabilities(agent_id=agent_id)
- return agent_vulnerabilities
+109
+110
+111
+112
+113
+114
+115
| @bp.route("/agents/<agent_id>/vulnerabilities", methods=["GET"])
+def get_agent_vulnerabilities(agent_id: str) -> Any:
+ """
+ Endpoint to get the vulnerabilities of a specific agent.
+ Args:
+ agent_id (str): The ID of the agent whose vulnerabilities are to be fetched.
+ Returns:
+ json: A JSON response containing the vulnerabilities of the agent.
+ """
+ universal_service = UniversalService()
+ vulnerability_service = VulnerabilityService(universal_service)
+
+ agent_vulnerabilities = vulnerability_service.agent_vulnerabilities(
+ agent_id=agent_id,
+ )
+ return agent_vulnerabilities
|
@@ -1617,25 +1621,25 @@
Source code in app\routes\agents.py
- 11
-12
-13
-14
-15
+ | @bp.route("/agents", methods=["GET"])
-def get_agents() -> Any:
- """
- Endpoint to get a list of all agents. It processes each agent and returns the results.
- Returns:
- json: A JSON response containing the list of all available agents along with their connection verification status.
- """
- service = AgentService()
- agents = service.get_all_agents()
- return agents
+20
+21
+22
+23
+24
| @bp.route("/agents", methods=["GET"])
+def get_agents() -> Any:
+ """
+ Endpoint to get a list of all agents. It processes each agent and returns the results.
+ Returns:
+ json: A JSON response containing the list of all available agents along with their connection verification status.
+ """
+ service = AgentService()
+ agents = service.get_all_agents()
+ return agents
|
@@ -1708,29 +1712,29 @@
Source code in app\routes\agents.py
- 37
-38
-39
-40
-41
+ | @bp.route("/agents/<agent_id>/critical", methods=["POST"])
-def mark_as_critical(agent_id: str) -> Any:
- """
- Endpoint to mark an agent as critical.
- Args:
- agent_id (str): The ID of the agent to mark as critical.
- Returns:
- json: A JSON response containing the updated agent information after being marked as critical.
- """
- service = AgentService()
- result = service.mark_agent_as_critical(agent_id=agent_id)
- return result
+48
+49
+50
+51
+52
| @bp.route("/agents/<agent_id>/critical", methods=["POST"])
+def mark_as_critical(agent_id: str) -> Any:
+ """
+ Endpoint to mark an agent as critical.
+ Args:
+ agent_id (str): The ID of the agent to mark as critical.
+ Returns:
+ json: A JSON response containing the updated agent information after being marked as critical.
+ """
+ service = AgentService()
+ result = service.mark_agent_as_critical(agent_id=agent_id)
+ return result
|
@@ -1775,25 +1779,25 @@
Source code in app\routes\agents.py
- 65
-66
-67
-68
-69
+ | @bp.route("/agents/sync", methods=["POST"])
-def sync_agents() -> Any:
- """
- Endpoint to synchronize all agents.
- Returns:
- json: A JSON response containing the updated information of all synchronized agents.
- """
- service = AgentSyncService()
- result = service.sync_agents()
- return jsonify(result)
+74
+75
+76
+77
+78
| @bp.route("/agents/sync", methods=["POST"])
+def sync_agents() -> Any:
+ """
+ Endpoint to synchronize all agents.
+ Returns:
+ json: A JSON response containing the updated information of all synchronized agents.
+ """
+ service = AgentSyncService()
+ result = service.sync_agents()
+ return jsonify(result)
|
@@ -1866,29 +1870,29 @@
Source code in app\routes\agents.py
- 51
-52
-53
-54
-55
+ | @bp.route("/agents/<agent_id>/noncritical", methods=["POST"])
-def unmark_agent_critical(agent_id: str) -> Any:
- """
- Endpoint to unmark an agent as critical.
- Args:
- agent_id (str): The ID of the agent to unmark as critical.
- Returns:
- json: A JSON response containing the updated agent information after being unmarked as critical.
- """
- service = AgentService()
- result = service.mark_agent_as_non_critical(agent_id=agent_id)
- return result
+62
+63
+64
+65
+66
| @bp.route("/agents/<agent_id>/noncritical", methods=["POST"])
+def unmark_agent_critical(agent_id: str) -> Any:
+ """
+ Endpoint to unmark an agent as critical.
+ Args:
+ agent_id (str): The ID of the agent to unmark as critical.
+ Returns:
+ json: A JSON response containing the updated agent information after being unmarked as critical.
+ """
+ service = AgentService()
+ result = service.mark_agent_as_non_critical(agent_id=agent_id)
+ return result
|
@@ -1941,11 +1945,7 @@
Source code in app\services\agents\agents.py
- 17
- 18
- 19
- 20
- 21
+ | class AgentService:
- """
- A service class that encapsulates the logic for managing agents.
- """
-
- def get_all_agents(self) -> List[Dict[str, Union[str, bool]]]:
- """
- Retrieves all agents from the database.
+163
+164
+165
+166
+167
| class AgentService:
+ """
+ A service class that encapsulates the logic for managing agents.
+ """
- Returns:
- List[dict]: A list of dictionaries where each dictionary represents the serialized data of an agent.
- """
- agents = db.session.query(AgentMetadata).all()
- return agent_metadatas_schema.dump(agents)
-
- def get_agent(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Retrieves a specific agent from the database using its ID.
+ def get_all_agents(self) -> List[Dict[str, Union[str, bool]]]:
+ """
+ Retrieves all agents from the database.
+
+ Returns:
+ List[dict]: A list of dictionaries where each dictionary represents the serialized data of an agent.
+ """
+ agents = db.session.query(AgentMetadata).all()
+ return agent_metadatas_schema.dump(agents)
- Args:
- agent_id (str): The ID of the agent to retrieve.
-
- Returns:
- dict: A dictionary representing the serialized data of the agent if found, otherwise a message indicating
- that the agent was not found.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
- if agent is None:
- return {"message": f"Agent with ID {agent_id} not found"}
- return agent_metadata_schema.dump(agent)
-
- def mark_agent_as_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Marks a specific agent as critical.
+ def get_agent(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Retrieves a specific agent from the database using its ID.
+
+ Args:
+ agent_id (str): The ID of the agent to retrieve.
+
+ Returns:
+ dict: A dictionary representing the serialized data of the agent if found, otherwise a message indicating
+ that the agent was not found.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+ if agent is None:
+ return {"message": f"Agent with ID {agent_id} not found"}
+ return agent_metadata_schema.dump(agent)
- Args:
- agent_id (str): The ID of the agent to mark as critical.
-
- Returns:
- dict: A dictionary representing a success message if the operation was successful, otherwise an error
- message.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
-
- if agent is None:
- return {"message": f"Agent {agent_id} not found", "success": False}
-
- agent.mark_as_critical()
- agent_details = agent_metadata_schema.dump(agent)
- if agent_details["critical_asset"] is False:
- return {
- "message": f"Agent {agent_id} failed to mark agent as critical",
- "success": False,
- }
- return {"message": f"Agent {agent_id} marked as critical", "success": True}
-
- def mark_agent_as_non_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Marks a specific agent as non-critical.
+ def mark_agent_as_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Marks a specific agent as critical.
+
+ Args:
+ agent_id (str): The ID of the agent to mark as critical.
+
+ Returns:
+ dict: A dictionary representing a success message if the operation was successful, otherwise an error
+ message.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+
+ if agent is None:
+ return {"message": f"Agent {agent_id} not found", "success": False}
+
+ agent.mark_as_critical()
+ agent_details = agent_metadata_schema.dump(agent)
+ if agent_details["critical_asset"] is False:
+ return {
+ "message": f"Agent {agent_id} failed to mark agent as critical",
+ "success": False,
+ }
+ return {"message": f"Agent {agent_id} marked as critical", "success": True}
- Args:
- agent_id (str): The ID of the agent to mark as non-critical.
-
- Returns:
- dict: A dictionary representing a success message if the operation was successful, otherwise an error
- message.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
-
- if agent is None:
- return {"message": f"Agent {agent_id} not found", "success": False}
-
- agent.mark_as_non_critical()
- agent_details = agent_metadata_schema.dump(agent)
- if agent_details["critical_asset"] is True:
- return {
- "message": f"Agent {agent_id} failed to mark agent as non-critical",
- "success": False,
- }
- return {"message": f"Agent {agent_id} marked as non-critical", "success": True}
-
- def create_agent(self, agent: Dict[str, str]) -> Optional[AgentMetadata]:
- """
- Creates a new agent in the database.
+ def mark_agent_as_non_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Marks a specific agent as non-critical.
+
+ Args:
+ agent_id (str): The ID of the agent to mark as non-critical.
+
+ Returns:
+ dict: A dictionary representing a success message if the operation was successful, otherwise an error
+ message.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+
+ if agent is None:
+ return {"message": f"Agent {agent_id} not found", "success": False}
+
+ agent.mark_as_non_critical()
+ agent_details = agent_metadata_schema.dump(agent)
+ if agent_details["critical_asset"] is True:
+ return {
+ "message": f"Agent {agent_id} failed to mark agent as non-critical",
+ "success": False,
+ }
+ return {"message": f"Agent {agent_id} marked as non-critical", "success": True}
- Args:
- agent (dict): A dictionary containing the information of an agent.
-
- Returns:
- The agent object if the agent was successfully created, None otherwise.
- """
- try:
- agent_last_seen = datetime.strptime(
- agent["agent_last_seen"],
- "%Y-%m-%dT%H:%M:%S+00:00",
- ) # Convert to datetime
- except ValueError:
- logger.info(
- f"Invalid format for agent_last_seen: {agent['agent_last_seen']}. Fixing...",
- )
- agent_last_seen = datetime.strptime(
- "1970-01-01T00:00:00+00:00",
- "%Y-%m-%dT%H:%M:%S+00:00",
- ) # Use the epoch time as default
-
- agent_metadata = AgentMetadata(
- agent_id=agent["agent_id"],
- hostname=agent["agent_name"],
- ip_address=agent["agent_ip"],
- os=agent["agent_os"],
- last_seen=agent_last_seen, # Use the datetime object
- critical_asset=False,
- )
- logger.info(f"Agent metadata: {agent_metadata}")
-
- try:
- db.session.add(agent_metadata)
- db.session.commit()
- return agent_metadata
- except Exception as e:
- logger.error(f"Failed to create agent: {e}")
- return None
-
- def delete_agent_db(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Deletes a specific agent from the database using its ID.
+ def create_agent(self, agent: Dict[str, str]) -> Optional[AgentMetadata]:
+ """
+ Creates a new agent in the database.
+
+ Args:
+ agent (dict): A dictionary containing the information of an agent.
+
+ Returns:
+ The agent object if the agent was successfully created, None otherwise.
+ """
+ try:
+ agent_last_seen = datetime.strptime(
+ agent["agent_last_seen"],
+ "%Y-%m-%dT%H:%M:%S+00:00",
+ ) # Convert to datetime
+ except ValueError:
+ logger.info(
+ f"Invalid format for agent_last_seen: {agent['agent_last_seen']}. Fixing...",
+ )
+ agent_last_seen = datetime.strptime(
+ "1970-01-01T00:00:00+00:00",
+ "%Y-%m-%dT%H:%M:%S+00:00",
+ ) # Use the epoch time as default
+
+ agent_metadata = AgentMetadata(
+ agent_id=agent["agent_id"],
+ hostname=agent["agent_name"],
+ ip_address=agent["agent_ip"],
+ os=agent["agent_os"],
+ last_seen=agent_last_seen, # Use the datetime object
+ critical_asset=False,
+ )
+ logger.info(f"Agent metadata: {agent_metadata}")
+
+ try:
+ db.session.add(agent_metadata)
+ db.session.commit()
+ return agent_metadata
+ except Exception as e:
+ logger.error(f"Failed to create agent: {e}")
+ return None
- Args:
- agent_id (str): The ID of the agent to delete.
-
- Returns:
- dict: A dictionary representing a success message if the operation was successful, otherwise an error
- message.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
- if agent is None:
- return {"message": f"Agent with ID {agent_id} not found", "success": False}
- try:
- db.session.delete(agent)
- db.session.commit()
- return {"message": f"Agent with ID {agent_id} deleted", "success": True}
- except Exception as e:
- logger.error(f"Failed to delete agent: {e}")
- return {
- "message": f"Failed to delete agent with ID {agent_id}",
- "success": False,
- }
+ def delete_agent_db(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Deletes a specific agent from the database using its ID.
+
+ Args:
+ agent_id (str): The ID of the agent to delete.
+
+ Returns:
+ dict: A dictionary representing a success message if the operation was successful, otherwise an error
+ message.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+ if agent is None:
+ return {"message": f"Agent with ID {agent_id} not found", "success": False}
+ try:
+ db.session.delete(agent)
+ db.session.commit()
+ return {"message": f"Agent with ID {agent_id} deleted", "success": True}
+ except Exception as e:
+ logger.error(f"Failed to delete agent: {e}")
+ return {
+ "message": f"Failed to delete agent with ID {agent_id}",
+ "success": False,
+ }
|
@@ -2315,11 +2319,7 @@
Source code in app\services\agents\agents.py
- 98
- 99
-100
-101
-102
+ | def create_agent(self, agent: Dict[str, str]) -> Optional[AgentMetadata]:
- """
- Creates a new agent in the database.
-
- Args:
- agent (dict): A dictionary containing the information of an agent.
-
- Returns:
- The agent object if the agent was successfully created, None otherwise.
- """
- try:
- agent_last_seen = datetime.strptime(
- agent["agent_last_seen"],
- "%Y-%m-%dT%H:%M:%S+00:00",
- ) # Convert to datetime
- except ValueError:
- logger.info(
- f"Invalid format for agent_last_seen: {agent['agent_last_seen']}. Fixing...",
- )
- agent_last_seen = datetime.strptime(
- "1970-01-01T00:00:00+00:00",
- "%Y-%m-%dT%H:%M:%S+00:00",
- ) # Use the epoch time as default
-
- agent_metadata = AgentMetadata(
- agent_id=agent["agent_id"],
- hostname=agent["agent_name"],
- ip_address=agent["agent_ip"],
- os=agent["agent_os"],
- last_seen=agent_last_seen, # Use the datetime object
- critical_asset=False,
- )
- logger.info(f"Agent metadata: {agent_metadata}")
-
- try:
- db.session.add(agent_metadata)
- db.session.commit()
- return agent_metadata
- except Exception as e:
- logger.error(f"Failed to create agent: {e}")
- return None
+138
+139
+140
+141
+142
| def create_agent(self, agent: Dict[str, str]) -> Optional[AgentMetadata]:
+ """
+ Creates a new agent in the database.
+
+ Args:
+ agent (dict): A dictionary containing the information of an agent.
+
+ Returns:
+ The agent object if the agent was successfully created, None otherwise.
+ """
+ try:
+ agent_last_seen = datetime.strptime(
+ agent["agent_last_seen"],
+ "%Y-%m-%dT%H:%M:%S+00:00",
+ ) # Convert to datetime
+ except ValueError:
+ logger.info(
+ f"Invalid format for agent_last_seen: {agent['agent_last_seen']}. Fixing...",
+ )
+ agent_last_seen = datetime.strptime(
+ "1970-01-01T00:00:00+00:00",
+ "%Y-%m-%dT%H:%M:%S+00:00",
+ ) # Use the epoch time as default
+
+ agent_metadata = AgentMetadata(
+ agent_id=agent["agent_id"],
+ hostname=agent["agent_name"],
+ ip_address=agent["agent_ip"],
+ os=agent["agent_os"],
+ last_seen=agent_last_seen, # Use the datetime object
+ critical_asset=False,
+ )
+ logger.info(f"Agent metadata: {agent_metadata}")
+
+ try:
+ db.session.add(agent_metadata)
+ db.session.commit()
+ return agent_metadata
+ except Exception as e:
+ logger.error(f"Failed to create agent: {e}")
+ return None
|
@@ -2478,11 +2482,7 @@
Source code in app\services\agents\agents.py
- 140
-141
-142
-143
-144
+ | def delete_agent_db(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Deletes a specific agent from the database using its ID.
-
- Args:
- agent_id (str): The ID of the agent to delete.
-
- Returns:
- dict: A dictionary representing a success message if the operation was successful, otherwise an error
- message.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
- if agent is None:
- return {"message": f"Agent with ID {agent_id} not found", "success": False}
- try:
- db.session.delete(agent)
- db.session.commit()
- return {"message": f"Agent with ID {agent_id} deleted", "success": True}
- except Exception as e:
- logger.error(f"Failed to delete agent: {e}")
- return {
- "message": f"Failed to delete agent with ID {agent_id}",
- "success": False,
- }
+163
+164
+165
+166
+167
| def delete_agent_db(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Deletes a specific agent from the database using its ID.
+
+ Args:
+ agent_id (str): The ID of the agent to delete.
+
+ Returns:
+ dict: A dictionary representing a success message if the operation was successful, otherwise an error
+ message.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+ if agent is None:
+ return {"message": f"Agent with ID {agent_id} not found", "success": False}
+ try:
+ db.session.delete(agent)
+ db.session.commit()
+ return {"message": f"Agent with ID {agent_id} deleted", "success": True}
+ except Exception as e:
+ logger.error(f"Failed to delete agent: {e}")
+ return {
+ "message": f"Failed to delete agent with ID {agent_id}",
+ "success": False,
+ }
|
@@ -2607,11 +2611,7 @@
Source code in app\services\agents\agents.py
- 32
-33
-34
-35
-36
+ | def get_agent(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Retrieves a specific agent from the database using its ID.
-
- Args:
- agent_id (str): The ID of the agent to retrieve.
-
- Returns:
- dict: A dictionary representing the serialized data of the agent if found, otherwise a message indicating
- that the agent was not found.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
- if agent is None:
- return {"message": f"Agent with ID {agent_id} not found"}
- return agent_metadata_schema.dump(agent)
+46
+47
+48
+49
+50
| def get_agent(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Retrieves a specific agent from the database using its ID.
+
+ Args:
+ agent_id (str): The ID of the agent to retrieve.
+
+ Returns:
+ dict: A dictionary representing the serialized data of the agent if found, otherwise a message indicating
+ that the agent was not found.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+ if agent is None:
+ return {"message": f"Agent with ID {agent_id} not found"}
+ return agent_metadata_schema.dump(agent)
|
@@ -2680,23 +2684,23 @@
Source code in app\services\agents\agents.py
- 22
-23
-24
-25
-26
+ | def get_all_agents(self) -> List[Dict[str, Union[str, bool]]]:
- """
- Retrieves all agents from the database.
-
- Returns:
- List[dict]: A list of dictionaries where each dictionary represents the serialized data of an agent.
- """
- agents = db.session.query(AgentMetadata).all()
- return agent_metadatas_schema.dump(agents)
+30
+31
+32
+33
+34
| def get_all_agents(self) -> List[Dict[str, Union[str, bool]]]:
+ """
+ Retrieves all agents from the database.
+
+ Returns:
+ List[dict]: A list of dictionaries where each dictionary represents the serialized data of an agent.
+ """
+ agents = db.session.query(AgentMetadata).all()
+ return agent_metadatas_schema.dump(agents)
|
@@ -2779,11 +2783,7 @@
Source code in app\services\agents\agents.py
- 48
-49
-50
-51
-52
+ | def mark_agent_as_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Marks a specific agent as critical.
-
- Args:
- agent_id (str): The ID of the agent to mark as critical.
-
- Returns:
- dict: A dictionary representing a success message if the operation was successful, otherwise an error
- message.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
-
- if agent is None:
- return {"message": f"Agent {agent_id} not found", "success": False}
-
- agent.mark_as_critical()
- agent_details = agent_metadata_schema.dump(agent)
- if agent_details["critical_asset"] is False:
- return {
- "message": f"Agent {agent_id} failed to mark agent as critical",
- "success": False,
- }
- return {"message": f"Agent {agent_id} marked as critical", "success": True}
+71
+72
+73
+74
+75
| def mark_agent_as_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Marks a specific agent as critical.
+
+ Args:
+ agent_id (str): The ID of the agent to mark as critical.
+
+ Returns:
+ dict: A dictionary representing a success message if the operation was successful, otherwise an error
+ message.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+
+ if agent is None:
+ return {"message": f"Agent {agent_id} not found", "success": False}
+
+ agent.mark_as_critical()
+ agent_details = agent_metadata_schema.dump(agent)
+ if agent_details["critical_asset"] is False:
+ return {
+ "message": f"Agent {agent_id} failed to mark agent as critical",
+ "success": False,
+ }
+ return {"message": f"Agent {agent_id} marked as critical", "success": True}
|
@@ -2908,53 +2912,53 @@
Source code in app\services\agents\agents.py
- | def mark_agent_as_non_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
- """
- Marks a specific agent as non-critical.
-
- Args:
- agent_id (str): The ID of the agent to mark as non-critical.
-
- Returns:
- dict: A dictionary representing a success message if the operation was successful, otherwise an error
- message.
- """
- agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
-
- if agent is None:
- return {"message": f"Agent {agent_id} not found", "success": False}
-
- agent.mark_as_non_critical()
- agent_details = agent_metadata_schema.dump(agent)
- if agent_details["critical_asset"] is True:
- return {
- "message": f"Agent {agent_id} failed to mark agent as non-critical",
- "success": False,
- }
- return {"message": f"Agent {agent_id} marked as non-critical", "success": True}
+ | def mark_agent_as_non_critical(self, agent_id: str) -> Dict[str, Union[str, bool]]:
+ """
+ Marks a specific agent as non-critical.
+
+ Args:
+ agent_id (str): The ID of the agent to mark as non-critical.
+
+ Returns:
+ dict: A dictionary representing a success message if the operation was successful, otherwise an error
+ message.
+ """
+ agent = db.session.query(AgentMetadata).filter_by(agent_id=agent_id).first()
+
+ if agent is None:
+ return {"message": f"Agent {agent_id} not found", "success": False}
+
+ agent.mark_as_non_critical()
+ agent_details = agent_metadata_schema.dump(agent)
+ if agent_details["critical_asset"] is True:
+ return {
+ "message": f"Agent {agent_id} failed to mark agent as non-critical",
+ "success": False,
+ }
+ return {"message": f"Agent {agent_id} marked as non-critical", "success": True}
|
@@ -2986,11 +2990,7 @@
Source code in app\services\agents\agents.py
- 166
-167
-168
-169
-170
+ | class AgentSyncService:
- def __init__(self):
- self.agent_service = AgentService()
-
- def collect_wazuh_details(self, connector_name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
- """
- Collects the information of all Wazuh API credentials using the WazuhIndexerConnector class details.
+268
+269
+270
+271
+272
+273
+274
+275
+276
+277
+278
+279
| class AgentSyncService:
+ def __init__(self):
+ self.agent_service = AgentService()
- Returns:
- tuple: A tuple containing the connection URL, username, and password.
- """
- connector_instance = connector_factory.create(connector_name, connector_name)
- connection_successful = connector_instance.verify_connection()
- if connection_successful:
- connection_details = Connector.get_connector_info_from_db(connector_name)
- return (
- connection_details.get("connector_url"),
- connection_details.get("connector_username"),
- connection_details.get("connector_password"),
- )
- else:
- return None, None, None
-
- def collect_wazuh_agents(self, connection_url: str, wazuh_auth_token: str) -> Optional[List[Dict[str, str]]]:
- """
- Collects the information of all agents from the Wazuh API.
-
- Returns:
- list: A list containing the information of all Wazuh agents.
- """
- logger.info("Collecting Wazuh Agents")
- try:
- headers = {"Authorization": f"Bearer {wazuh_auth_token}"}
- limit = 1000
- agents_collected = requests.get(
- f"{connection_url}/agents?limit={limit}",
- headers=headers,
- verify=False,
- )
- if agents_collected.status_code == 200:
- wazuh_agents_list = []
- for agent in agents_collected.json()["data"]["affected_items"]:
- os_name = agent.get("os", {}).get("name", "Unknown")
- last_keep_alive = agent.get("lastKeepAlive", "Unknown")
- wazuh_agents_list.append(
- {
- "agent_id": agent["id"],
- "agent_name": agent["name"],
- "agent_ip": agent["ip"],
- "agent_os": os_name,
- "agent_last_seen": last_keep_alive,
- },
- )
- logger.info(f"Collected Wazuh Agent: {agent['name']}")
- return wazuh_agents_list
- else:
- return None
- except Exception as e:
- logger.error(f"Failed to collect Wazuh Agents: {e}")
- return None
-
- def sync_agents(self) -> Dict[str, Union[str, bool, List[Dict[str, str]]]]:
- (
- connection_url,
- connection_username,
- connection_password,
- ) = self.collect_wazuh_details("Wazuh-Manager")
- if connection_url is None:
- return {
- "message": "Failed to get Wazuh-Manager API details",
- "success": False,
- }
-
- wazuh_manager_connector = WazuhManagerConnector("Wazuh-Manager")
- wazuh_auth_token = wazuh_manager_connector.get_auth_token()
- if wazuh_auth_token is None:
- return {
- "message": "Failed to get Wazuh-Manager API Auth Token",
- "success": False,
- }
-
- wazuh_agents_list = self.collect_wazuh_agents(connection_url, wazuh_auth_token)
- if wazuh_agents_list is None:
- return {
- "message": "Failed to collect Wazuh-Manager Agents",
- "success": False,
- }
-
- logger.info(f"Collected {wazuh_agents_list} Wazuh Agents")
-
- agents_added_list = []
- for agent in wazuh_agents_list:
- agent_info = self.agent_service.get_agent(agent["agent_id"])
- logger.info(f"Agent info: {agent_info}")
- if "message" in agent_info:
- self.agent_service.create_agent(agent)
- agents_added_list.append(agent)
-
- return {
- "message": "Successfully synced agents.",
- "success": True,
- "agents_added": agents_added_list,
- }
+ def collect_wazuh_details(
+ self,
+ connector_name: str,
+ ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+ """
+ Collects the information of all Wazuh API credentials using the WazuhIndexerConnector class details.
+
+ Returns:
+ tuple: A tuple containing the connection URL, username, and password.
+ """
+ connector_instance = connector_factory.create(connector_name, connector_name)
+ connection_successful = connector_instance.verify_connection()
+ if connection_successful:
+ connection_details = Connector.get_connector_info_from_db(connector_name)
+ return (
+ connection_details.get("connector_url"),
+ connection_details.get("connector_username"),
+ connection_details.get("connector_password"),
+ )
+ else:
+ return None, None, None
+
+ def collect_wazuh_agents(
+ self,
+ connection_url: str,
+ wazuh_auth_token: str,
+ ) -> Optional[List[Dict[str, str]]]:
+ """
+ Collects the information of all agents from the Wazuh API.
+
+ Returns:
+ list: A list containing the information of all Wazuh agents.
+ """
+ logger.info("Collecting Wazuh Agents")
+ try:
+ headers = {"Authorization": f"Bearer {wazuh_auth_token}"}
+ limit = 1000
+ agents_collected = requests.get(
+ f"{connection_url}/agents?limit={limit}",
+ headers=headers,
+ verify=False,
+ )
+ if agents_collected.status_code == 200:
+ wazuh_agents_list = []
+ for agent in agents_collected.json()["data"]["affected_items"]:
+ os_name = agent.get("os", {}).get("name", "Unknown")
+ last_keep_alive = agent.get("lastKeepAlive", "Unknown")
+ wazuh_agents_list.append(
+ {
+ "agent_id": agent["id"],
+ "agent_name": agent["name"],
+ "agent_ip": agent["ip"],
+ "agent_os": os_name,
+ "agent_last_seen": last_keep_alive,
+ },
+ )
+ logger.info(f"Collected Wazuh Agent: {agent['name']}")
+ return wazuh_agents_list
+ else:
+ return None
+ except Exception as e:
+ logger.error(f"Failed to collect Wazuh Agents: {e}")
+ return None
+
+ def sync_agents(self) -> Dict[str, Union[str, bool, List[Dict[str, str]]]]:
+ (
+ connection_url,
+ connection_username,
+ connection_password,
+ ) = self.collect_wazuh_details("Wazuh-Manager")
+ if connection_url is None:
+ return {
+ "message": "Failed to get Wazuh-Manager API details",
+ "success": False,
+ }
+
+ wazuh_manager_connector = WazuhManagerConnector("Wazuh-Manager")
+ wazuh_auth_token = wazuh_manager_connector.get_auth_token()
+ if wazuh_auth_token is None:
+ return {
+ "message": "Failed to get Wazuh-Manager API Auth Token",
+ "success": False,
+ }
+
+ wazuh_agents_list = self.collect_wazuh_agents(connection_url, wazuh_auth_token)
+ if wazuh_agents_list is None:
+ return {
+ "message": "Failed to collect Wazuh-Manager Agents",
+ "success": False,
+ }
+
+ logger.info(f"Collected {wazuh_agents_list} Wazuh Agents")
+
+ agents_added_list = []
+ for agent in wazuh_agents_list:
+ agent_info = self.agent_service.get_agent(agent["agent_id"])
+ logger.info(f"Agent info: {agent_info}")
+ if "message" in agent_info:
+ self.agent_service.create_agent(agent)
+ agents_added_list.append(agent)
+
+ return {
+ "message": "Successfully synced agents.",
+ "success": True,
+ "agents_added": agents_added_list,
+ }
|
@@ -3244,14 +3262,7 @@
Source code in app\services\agents\agents.py
- 189
-190
-191
-192
-193
-194
-195
-196
+ | def collect_wazuh_agents(self, connection_url: str, wazuh_auth_token: str) -> Optional[List[Dict[str, str]]]:
- """
- Collects the information of all agents from the Wazuh API.
-
- Returns:
- list: A list containing the information of all Wazuh agents.
- """
- logger.info("Collecting Wazuh Agents")
- try:
- headers = {"Authorization": f"Bearer {wazuh_auth_token}"}
- limit = 1000
- agents_collected = requests.get(
- f"{connection_url}/agents?limit={limit}",
- headers=headers,
- verify=False,
- )
- if agents_collected.status_code == 200:
- wazuh_agents_list = []
- for agent in agents_collected.json()["data"]["affected_items"]:
- os_name = agent.get("os", {}).get("name", "Unknown")
- last_keep_alive = agent.get("lastKeepAlive", "Unknown")
- wazuh_agents_list.append(
- {
- "agent_id": agent["id"],
- "agent_name": agent["name"],
- "agent_ip": agent["ip"],
- "agent_os": os_name,
- "agent_last_seen": last_keep_alive,
- },
- )
- logger.info(f"Collected Wazuh Agent: {agent['name']}")
- return wazuh_agents_list
- else:
- return None
- except Exception as e:
- logger.error(f"Failed to collect Wazuh Agents: {e}")
- return None
+225
+226
+227
+228
+229
+230
+231
+232
+233
+234
+235
+236
| def collect_wazuh_agents(
+ self,
+ connection_url: str,
+ wazuh_auth_token: str,
+) -> Optional[List[Dict[str, str]]]:
+ """
+ Collects the information of all agents from the Wazuh API.
+
+ Returns:
+ list: A list containing the information of all Wazuh agents.
+ """
+ logger.info("Collecting Wazuh Agents")
+ try:
+ headers = {"Authorization": f"Bearer {wazuh_auth_token}"}
+ limit = 1000
+ agents_collected = requests.get(
+ f"{connection_url}/agents?limit={limit}",
+ headers=headers,
+ verify=False,
+ )
+ if agents_collected.status_code == 200:
+ wazuh_agents_list = []
+ for agent in agents_collected.json()["data"]["affected_items"]:
+ os_name = agent.get("os", {}).get("name", "Unknown")
+ last_keep_alive = agent.get("lastKeepAlive", "Unknown")
+ wazuh_agents_list.append(
+ {
+ "agent_id": agent["id"],
+ "agent_name": agent["name"],
+ "agent_ip": agent["ip"],
+ "agent_os": os_name,
+ "agent_last_seen": last_keep_alive,
+ },
+ )
+ logger.info(f"Collected Wazuh Agent: {agent['name']}")
+ return wazuh_agents_list
+ else:
+ return None
+ except Exception as e:
+ logger.error(f"Failed to collect Wazuh Agents: {e}")
+ return None
|
@@ -3361,11 +3387,7 @@
Source code in app\services\agents\agents.py
- 170
-171
-172
-173
-174
+ | def collect_wazuh_details(self, connector_name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
- """
- Collects the information of all Wazuh API credentials using the WazuhIndexerConnector class details.
-
- Returns:
- tuple: A tuple containing the connection URL, username, and password.
- """
- connector_instance = connector_factory.create(connector_name, connector_name)
- connection_successful = connector_instance.verify_connection()
- if connection_successful:
- connection_details = Connector.get_connector_info_from_db(connector_name)
- return (
- connection_details.get("connector_url"),
- connection_details.get("connector_username"),
- connection_details.get("connector_password"),
- )
- else:
- return None, None, None
+187
+188
+189
+190
+191
+192
+193
+194
| def collect_wazuh_details(
+ self,
+ connector_name: str,
+) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+ """
+ Collects the information of all Wazuh API credentials using the WazuhIndexerConnector class details.
+
+ Returns:
+ tuple: A tuple containing the connection URL, username, and password.
+ """
+ connector_instance = connector_factory.create(connector_name, connector_name)
+ connection_successful = connector_instance.verify_connection()
+ if connection_successful:
+ connection_details = Connector.get_connector_info_from_db(connector_name)
+ return (
+ connection_details.get("connector_url"),
+ connection_details.get("connector_username"),
+ connection_details.get("connector_password"),
+ )
+ else:
+ return None, None, None
|
diff --git a/backend/site/alerts/index.html b/backend/site/alerts/index.html
index ff1c4607..4692e0f3 100644
--- a/backend/site/alerts/index.html
+++ b/backend/site/alerts/index.html
@@ -589,10 +589,7 @@
Source code in app\routes\alerts.py
- 6
- 7
- 8
- 9
+ | @bp.route("/alerts", methods=["GET"])
-def get_alerts() -> jsonify:
- """
- Retrieves all alerts from the AlertsService.
-
- This endpoint retrieves all available alerts from the AlertsService. It does this by creating an instance of
- the AlertsService class and calling its `collect_alerts` method. The result is a list of all alerts currently
- available.
-
- Returns:
- jsonify: A JSON response containing a list of alerts. Each item in the list is a dictionary representing an alert,
- containing all its associated data.
- """
- service = AlertsService()
- alerts = service.collect_alerts()
- return jsonify(alerts)
+21
+22
+23
+24
| @bp.route("/alerts", methods=["GET"])
+def get_alerts() -> jsonify:
+ """
+ Retrieves all alerts from the AlertsService.
+
+ This endpoint retrieves all available alerts from the AlertsService. It does this by creating an instance of
+ the AlertsService class and calling its `collect_alerts` method. The result is a list of all alerts currently
+ available.
+
+ Returns:
+ jsonify: A JSON response containing a list of alerts. Each item in the list is a dictionary representing an alert,
+ containing all its associated data.
+ """
+ service = AlertsService()
+ alerts = service.collect_alerts()
+ return jsonify(alerts)
|
@@ -729,8 +729,7 @@
Source code in app\services\WazuhIndexer\alerts.py
- 8
- 9
+ | class AlertsService:
- """
- A service class that encapsulates the logic for pulling alerts from the Wazuh-Indexer.
-
- Attributes:
- connector_url (str): The url to the Wazuh-Indexer.
- connector_username (str): The username for the Wazuh-Indexer.
- connector_password (str): The password for the Wazuh-Indexer.
- es (Elasticsearch): The Elasticsearch client.
- """
-
- SKIP_INDEX_NAMES: Dict[str, bool] = {
- "wazuh-statistics": True,
- "wazuh-monitoring": True,
- }
-
- def __init__(self):
- """
- Initializes the service by collecting Wazuh-Indexer details and creating an Elasticsearch client.
- """
- (
- self.connector_url,
- self.connector_username,
- self.connector_password,
- ) = UniversalService().collect_wazuhindexer_details("Wazuh-Indexer")
- self.es = Elasticsearch(
- [self.connector_url],
- http_auth=(self.connector_username, self.connector_password),
- verify_certs=False,
- timeout=15,
- max_retries=10,
- retry_on_timeout=False,
- )
-
- def is_index_skipped(self, index_name: str) -> bool:
- """
- Checks whether the given index name should be skipped.
-
- Args:
- index_name (str): The name of the index.
-
- Returns:
- bool: True if the index should be skipped, False otherwise.
- """
- for skipped in self.SKIP_INDEX_NAMES:
- if index_name.startswith(skipped):
- return True
- return False
-
- def collect_alerts(self) -> Dict[str, object]:
- """
- Collects the alerts from the Wazuh-Indexer where the index name starts with "wazuh_"
- and is not in the SKIP_INDEX_NAMES list.
- Returns the 10 previous alerts based on the `timestamp_utc` field.
-
- Returns:
- Dict[str, object]: A dictionary containing success status and alerts or an error message.
- """
- if not all(
- [self.connector_url, self.connector_username, self.connector_password],
- ):
- return {
- "message": "Failed to collect Wazuh-Indexer details",
- "success": False,
- }
-
- indices_list = UniversalService().collect_indices()
- if not indices_list["success"]:
- return {"message": "Failed to collect indices", "success": False}
-
- alerts_summary = []
- for index_name in indices_list["indices_list"]:
- if not index_name.startswith("wazuh_") or self.is_index_skipped(index_name):
- continue
-
- alerts = self._collect_alerts(index_name)
- if alerts["success"] and len(alerts["alerts"]) > 0:
- alerts_summary.append(
- {
- "index_name": index_name,
- "total_alerts": len(alerts["alerts"]),
- "last_10_alerts": alerts["alerts"],
- },
- )
-
- return {
- "message": "Successfully collected alerts",
- "success": True,
- "alerts_summary": alerts_summary,
- }
-
- def _collect_alerts(self, index_name: str) -> Dict[str, object]:
- """
- Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or the
- `syslog_level` field is `ALERT` and return the results in descending order by the `timestamp_utc` field.
-
- Args:
- index_name (str): The name of the index to query.
-
- Returns:
- Dict[str, object]: A dictionary containing success status and alerts or an error message.
- """
- logger.info(f"Collecting alerts from {index_name}")
- query = self._build_query()
- try:
- alerts = self.es.search(index=index_name, body=query, size=10)
- alerts_list = [alert for alert in alerts["hits"]["hits"]]
- return {
- "message": "Successfully collected alerts",
- "success": True,
- "alerts": alerts_list,
- }
- except Exception as e:
- logger.error(f"Failed to collect alerts: {e}")
- return {"message": "Failed to collect alerts", "success": False}
-
- @staticmethod
- def _build_query() -> Dict[str, object]:
- """
- Builds the Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or
- the `syslog_level` field is `ALERT`.
-
- Returns:
- Dict[str, object]: A dictionary representing the Elasticsearch query.
- """
- return {
- "query": {
- "bool": {
- "should": [
- {"range": {"rule_level": {"gte": 12}}},
- {"match": {"syslog_level": "ALERT"}},
- ],
- },
- },
- "sort": [{"timestamp_utc": {"order": "desc"}}],
- }
+143
+144
| class AlertsService:
+ """
+ A service class that encapsulates the logic for pulling alerts from the Wazuh-Indexer.
+
+ Attributes:
+ connector_url (str): The url to the Wazuh-Indexer.
+ connector_username (str): The username for the Wazuh-Indexer.
+ connector_password (str): The password for the Wazuh-Indexer.
+ es (Elasticsearch): The Elasticsearch client.
+ """
+
+ SKIP_INDEX_NAMES: Dict[str, bool] = {
+ "wazuh-statistics": True,
+ "wazuh-monitoring": True,
+ }
+
+ def __init__(self):
+ """
+ Initializes the service by collecting Wazuh-Indexer details and creating an Elasticsearch client.
+ """
+ (
+ self.connector_url,
+ self.connector_username,
+ self.connector_password,
+ ) = UniversalService().collect_wazuhindexer_details("Wazuh-Indexer")
+ self.es = Elasticsearch(
+ [self.connector_url],
+ http_auth=(self.connector_username, self.connector_password),
+ verify_certs=False,
+ timeout=15,
+ max_retries=10,
+ retry_on_timeout=False,
+ )
+
+ def is_index_skipped(self, index_name: str) -> bool:
+ """
+ Checks whether the given index name should be skipped.
+
+ Args:
+ index_name (str): The name of the index.
+
+ Returns:
+ bool: True if the index should be skipped, False otherwise.
+ """
+ for skipped in self.SKIP_INDEX_NAMES:
+ if index_name.startswith(skipped):
+ return True
+ return False
+
+ def collect_alerts(self) -> Dict[str, object]:
+ """
+ Collects the alerts from the Wazuh-Indexer where the index name starts with "wazuh_"
+ and is not in the SKIP_INDEX_NAMES list.
+ Returns the 10 previous alerts based on the `timestamp_utc` field.
+
+ Returns:
+ Dict[str, object]: A dictionary containing success status and alerts or an error message.
+ """
+ if not all(
+ [self.connector_url, self.connector_username, self.connector_password],
+ ):
+ return {
+ "message": "Failed to collect Wazuh-Indexer details",
+ "success": False,
+ }
+
+ indices_list = UniversalService().collect_indices()
+ if not indices_list["success"]:
+ return {"message": "Failed to collect indices", "success": False}
+
+ alerts_summary = []
+ for index_name in indices_list["indices_list"]:
+ if not index_name.startswith("wazuh_") or self.is_index_skipped(index_name):
+ continue
+
+ alerts = self._collect_alerts(index_name)
+ if alerts["success"] and len(alerts["alerts"]) > 0:
+ alerts_summary.append(
+ {
+ "index_name": index_name,
+ "total_alerts": len(alerts["alerts"]),
+ "last_10_alerts": alerts["alerts"],
+ },
+ )
+
+ return {
+ "message": "Successfully collected alerts",
+ "success": True,
+ "alerts_summary": alerts_summary,
+ }
+
+ def _collect_alerts(self, index_name: str) -> Dict[str, object]:
+ """
+ Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or the
+ `syslog_level` field is `ALERT` and return the results in descending order by the `timestamp_utc` field.
+
+ Args:
+ index_name (str): The name of the index to query.
+
+ Returns:
+ Dict[str, object]: A dictionary containing success status and alerts or an error message.
+ """
+ logger.info(f"Collecting alerts from {index_name}")
+ query = self._build_query()
+ try:
+ alerts = self.es.search(index=index_name, body=query, size=10)
+ alerts_list = [alert for alert in alerts["hits"]["hits"]]
+ return {
+ "message": "Successfully collected alerts",
+ "success": True,
+ "alerts": alerts_list,
+ }
+ except Exception as e:
+ logger.error(f"Failed to collect alerts: {e}")
+ return {"message": "Failed to collect alerts", "success": False}
+
+ @staticmethod
+ def _build_query() -> Dict[str, object]:
+ """
+ Builds the Elasticsearch query to get the 10 most recent alerts where the `rule_level` is 12 or higher or
+ the `syslog_level` field is `ALERT`.
+
+ Returns:
+ Dict[str, object]: A dictionary representing the Elasticsearch query.
+ """
+ return {
+ "query": {
+ "bool": {
+ "should": [
+ {"range": {"rule_level": {"gte": 12}}},
+ {"match": {"syslog_level": "ALERT"}},
+ ],
+ },
+ },
+ "sort": [{"timestamp_utc": {"order": "desc"}}],
+ }
|
@@ -1031,8 +1031,7 @@
Source code in app\services\WazuhIndexer\alerts.py
- 24
-25
+ | def __init__(self):
- """
- Initializes the service by collecting Wazuh-Indexer details and creating an Elasticsearch client.
- """
- (
- self.connector_url,
- self.connector_username,
- self.connector_password,
- ) = UniversalService().collect_wazuhindexer_details("Wazuh-Indexer")
- self.es = Elasticsearch(
- [self.connector_url],
- http_auth=(self.connector_username, self.connector_password),
- verify_certs=False,
- timeout=15,
- max_retries=10,
- retry_on_timeout=False,
- )
+40
+41
| def __init__(self):
+ """
+ Initializes the service by collecting Wazuh-Indexer details and creating an Elasticsearch client.
+ """
+ (
+ self.connector_url,
+ self.connector_username,
+ self.connector_password,
+ ) = UniversalService().collect_wazuhindexer_details("Wazuh-Indexer")
+ self.es = Elasticsearch(
+ [self.connector_url],
+ http_auth=(self.connector_username, self.connector_password),
+ verify_certs=False,
+ timeout=15,
+ max_retries=10,
+ retry_on_timeout=False,
+ )
|
@@ -1110,8 +1110,7 @@
Source code in app\services\WazuhIndexer\alerts.py
- 57
-58
+ | def collect_alerts(self) -> Dict[str, object]:
- """
- Collects the alerts from the Wazuh-Indexer where the index name starts with "wazuh_"
- and is not in the SKIP_INDEX_NAMES list.
- Returns the 10 previous alerts based on the `timestamp_utc` field.
-
- Returns:
- Dict[str, object]: A dictionary containing success status and alerts or an error message.
- """
- if not all(
- [self.connector_url, self.connector_username, self.connector_password],
- ):
- return {
- "message": "Failed to collect Wazuh-Indexer details",
- "success": False,
- }
-
- indices_list = UniversalService().collect_indices()
- if not indices_list["success"]:
- return {"message": "Failed to collect indices", "success": False}
-
- alerts_summary = []
- for index_name in indices_list["indices_list"]:
- if not index_name.startswith("wazuh_") or self.is_index_skipped(index_name):
- continue
-
- alerts = self._collect_alerts(index_name)
- if alerts["success"] and len(alerts["alerts"]) > 0:
- alerts_summary.append(
- {
- "index_name": index_name,
- "total_alerts": len(alerts["alerts"]),
- "last_10_alerts": alerts["alerts"],
- },
- )
-
- return {
- "message": "Successfully collected alerts",
- "success": True,
- "alerts_summary": alerts_summary,
- }
+97
+98
| def collect_alerts(self) -> Dict[str, object]:
+ """
+ Collects the alerts from the Wazuh-Indexer where the index name starts with "wazuh_"
+ and is not in the SKIP_INDEX_NAMES list.
+ Returns the 10 previous alerts based on the `timestamp_utc` field.
+
+ Returns:
+ Dict[str, object]: A dictionary containing success status and alerts or an error message.
+ """
+ if not all(
+ [self.connector_url, self.connector_username, self.connector_password],
+ ):
+ return {
+ "message": "Failed to collect Wazuh-Indexer details",
+ "success": False,
+ }
+
+ indices_list = UniversalService().collect_indices()
+ if not indices_list["success"]:
+ return {"message": "Failed to collect indices", "success": False}
+
+ alerts_summary = []
+ for index_name in indices_list["indices_list"]:
+ if not index_name.startswith("wazuh_") or self.is_index_skipped(index_name):
+ continue
+
+ alerts = self._collect_alerts(index_name)
+ if alerts["success"] and len(alerts["alerts"]) > 0:
+ alerts_summary.append(
+ {
+ "index_name": index_name,
+ "total_alerts": len(alerts["alerts"]),
+ "last_10_alerts": alerts["alerts"],
+ },
+ )
+
+ return {
+ "message": "Successfully collected alerts",
+ "success": True,
+ "alerts_summary": alerts_summary,
+ }
|
@@ -1263,8 +1263,7 @@
Source code in app\services\WazuhIndexer\alerts.py
- 42
-43
+ | def is_index_skipped(self, index_name: str) -> bool:
- """
- Checks whether the given index name should be skipped.
-
- Args:
- index_name (str): The name of the index.
-
- Returns:
- bool: True if the index should be skipped, False otherwise.
- """
- for skipped in self.SKIP_INDEX_NAMES:
- if index_name.startswith(skipped):
- return True
- return False
+55
+56
| def is_index_skipped(self, index_name: str) -> bool:
+ """
+ Checks whether the given index name should be skipped.
+
+ Args:
+ index_name (str): The name of the index.
+
+ Returns:
+ bool: True if the index should be skipped, False otherwise.
+ """
+ for skipped in self.SKIP_INDEX_NAMES:
+ if index_name.startswith(skipped):
+ return True
+ return False
|
diff --git a/backend/site/artifacts/index.html b/backend/site/artifacts/index.html
index 1c017adc..89402ebf 100644
--- a/backend/site/artifacts/index.html
+++ b/backend/site/artifacts/index.html
@@ -1038,12 +1038,7 @@
Source code in app\routes\velociraptor.py
- 69
- 70
- 71
- 72
- 73
- 74
+ | @bp.route("/velociraptor/artifacts/collection", methods=["POST"])
-def collect_artifact():
- """
- Endpoint to collect an artifact.
- It collects the artifact name and client name from the request body and returns the results.
-
- Returns:
- json: A JSON response containing the result of the artifact collection operation.
- """
- req_data = request.get_json()
- artifact_name = req_data["artifact_name"]
- client_name = req_data["client_name"]
- service = UniversalService()
- client_id = service.get_client_id(client_name=client_name)["results"][0][
- "client_id"
- ]
- if client_id is None:
- return (
- jsonify(
- {
- "message": f"{client_name} has not been seen in the last 30 seconds and may not be online with the "
- "Velociraptor server.",
- "success": False,
- },
- ),
- 500,
- )
-
- artifact_service = ArtifactsService()
- artifact_results = artifact_service.run_artifact_collection(
- client_id=client_id,
- artifact=artifact_name,
- )
- return artifact_results
+102
+103
+104
+105
+106
+107
| @bp.route("/velociraptor/artifacts/collection", methods=["POST"])
+def collect_artifact():
+ """
+ Endpoint to collect an artifact.
+ It collects the artifact name and client name from the request body and returns the results.
+
+ Returns:
+ json: A JSON response containing the result of the artifact collection operation.
+ """
+ req_data = request.get_json()
+ artifact_name = req_data["artifact_name"]
+ client_name = req_data["client_name"]
+ service = UniversalService()
+ client_id = service.get_client_id(client_name=client_name)["results"][0][
+ "client_id"
+ ]
+ if client_id is None:
+ return (
+ jsonify(
+ {
+ "message": f"{client_name} has not been seen in the last 30 seconds and may not be online with the "
+ "Velociraptor server.",
+ "success": False,
+ },
+ ),
+ 500,
+ )
+
+ artifact_service = ArtifactsService()
+ artifact_results = artifact_service.run_artifact_collection(
+ client_id=client_id,
+ artifact=artifact_name,
+ )
+ return artifact_results
|
@@ -1158,8 +1158,7 @@
Source code in app\routes\velociraptor.py
- 10
-11
+ | @bp.route("/velociraptor/artifacts", methods=["GET"])
-def get_artifacts():
- """
- Endpoint to list all available artifacts.
- It processes each artifact to verify the connection and returns the results.
-
- Returns:
- json: A JSON response containing the list of all available artifacts along with their connection verification
- status.
- """
- service = ArtifactsService()
- artifacts = service.collect_artifacts()
- return artifacts
+22
+23
| @bp.route("/velociraptor/artifacts", methods=["GET"])
+def get_artifacts():
+ """
+ Endpoint to list all available artifacts.
+ It processes each artifact to verify the connection and returns the results.
+
+ Returns:
+ json: A JSON response containing the list of all available artifacts along with their connection verification
+ status.
+ """
+ service = ArtifactsService()
+ artifacts = service.collect_artifacts()
+ return artifacts
|
@@ -1237,9 +1237,7 @@
Source code in app\routes\velociraptor.py
- 24
-25
-26
+ | @bp.route("/velociraptor/artifacts/linux", methods=["GET"])
-def get_artifacts_linux():
- """
- Endpoint to list all available Linux artifacts.
- It processes each artifact to verify the connection and returns the results where the name
- begins with `Linux`.
-
- Returns:
- json: A JSON response containing the list of all available Linux artifacts along with their connection verification
- status.
- """
- service = ArtifactsService()
- linux_artifacts = service.collect_artifacts_linux()
- return linux_artifacts
+37
+38
+39
| @bp.route("/velociraptor/artifacts/linux", methods=["GET"])
+def get_artifacts_linux():
+ """
+ Endpoint to list all available Linux artifacts.
+ It processes each artifact to verify the connection and returns the results where the name
+ begins with `Linux`.
+
+ Returns:
+ json: A JSON response containing the list of all available Linux artifacts along with their connection verification
+ status.
+ """
+ service = ArtifactsService()
+ linux_artifacts = service.collect_artifacts_linux()
+ return linux_artifacts
|
@@ -1318,11 +1318,7 @@
Source code in app\routes\velociraptor.py
- 54
-55
-56
-57
-58
+ | @bp.route("/velociraptor/artifacts/mac", methods=["GET"])
-def get_artifacts_mac():
- """
- Endpoint to list all available MacOS artifacts.
- It processes each artifact to verify the connection and returns the results where the name
- begins with `MacOS`.
-
- Returns:
- json: A JSON response containing the list of all available MacOS artifacts along with their connection verification
- status.
- """
- service = ArtifactsService()
- mac_artifacts = service.collect_artifacts_macos()
- return mac_artifacts
+67
+68
+69
+70
+71
| @bp.route("/velociraptor/artifacts/mac", methods=["GET"])
+def get_artifacts_mac():
+ """
+ Endpoint to list all available MacOS artifacts.
+ It processes each artifact to verify the connection and returns the results where the name
+ begins with `MacOS`.
+
+ Returns:
+ json: A JSON response containing the list of all available MacOS artifacts along with their connection verification
+ status.
+ """
+ service = ArtifactsService()
+ mac_artifacts = service.collect_artifacts_macos()
+ return mac_artifacts
|
@@ -1399,10 +1399,7 @@
Source code in app\routes\velociraptor.py
- 39
-40
-41
-42
+ | @bp.route("/velociraptor/artifacts/windows", methods=["GET"])
-def get_artifacts_windows():
- """
- Endpoint to list all available Windows artifacts.
- It processes each artifact to verify the connection and returns the results where the name
- begins with `Windows`.
-
- Returns:
- json: A JSON response containing the list of all available Windows artifacts along with their connection verification
- status.
- """
- service = ArtifactsService()
- windows_artifacts = service.collect_artifacts_windows()
- return windows_artifacts
+52
+53
+54
+55
| @bp.route("/velociraptor/artifacts/windows", methods=["GET"])
+def get_artifacts_windows():
+ """
+ Endpoint to list all available Windows artifacts.
+ It processes each artifact to verify the connection and returns the results where the name
+ begins with `Windows`.
+
+ Returns:
+ json: A JSON response containing the list of all available Windows artifacts along with their connection verification
+ status.
+ """
+ service = ArtifactsService()
+ windows_artifacts = service.collect_artifacts_windows()
+ return windows_artifacts
|
@@ -1478,9 +1478,7 @@
Source code in app\services\Velociraptor\artifacts.py
- 4
- 5
- 6
+ 6
7
8
9
@@ -1610,139 +1608,141 @@ | class ArtifactsService:
- """
- A service class that encapsulates the logic for pulling artifacts from Velociraptor.
- """
-
- def __init__(self):
- self.universal_service = UniversalService()
-
- def _create_query(self, query: str) -> str:
- """
- Create a query string.
-
- Args:
- query (str): The query to be executed.
-
- Returns:
- str: The created query string.
- """
- return query
-
- def _get_artifact_key(self, client_id: str, artifact: str) -> str:
- """
- Construct the artifact key.
-
- Args:
- client_id (str): The ID of the client.
- artifact (str): The name of the artifact.
-
- Returns:
- str: The constructed artifact key.
- """
- return f"collect_client(client_id='{client_id}', artifacts=['{artifact}'])"
-
- def collect_artifacts(self) -> dict:
- """
- Collect the artifacts from Velociraptor.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- query = self._create_query("SELECT name FROM artifact_definitions()")
- return self.universal_service.execute_query(query)
-
- def collect_artifacts_prefixed(self, prefix: str) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with a specific prefix.
-
- Args:
- prefix (str): The prefix to filter the artifacts.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- artifacts_response = self.collect_artifacts()
- if not artifacts_response["success"]:
- return artifacts_response
-
- filtered_artifacts = [
- artifact
- for artifact in artifacts_response["results"]
- if artifact["name"].startswith(prefix)
- ]
-
- return {
- "success": True,
- "message": f"Successfully collected {prefix} artifacts",
- "artifacts": filtered_artifacts,
- }
-
- def collect_artifacts_linux(self) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with `Linux`.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- return self.collect_artifacts_prefixed("Linux.")
-
- def collect_artifacts_windows(self) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with `Windows`.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- return self.collect_artifacts_prefixed("Windows.")
-
- def collect_artifacts_macos(self) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with `MacOS`.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- return self.collect_artifacts_prefixed("MacOS.")
-
- def run_artifact_collection(self, client_id: str, artifact: str) -> dict:
- """
- Run an artifact collection on a specific client.
-
- Args:
- client_id (str): The ID of the client.
- artifact (str): The name of the artifact.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the results.
- """
- try:
- query = self._create_query(
- f"SELECT collect_client(client_id='{client_id}', artifacts=['{artifact}']) FROM scope()",
- )
- flow = self.universal_service.execute_query(query)
- logger.info(f"Successfully ran artifact collection on {flow}")
-
- artifact_key = self._get_artifact_key(client_id, artifact)
- flow_id = flow["results"][0][artifact_key]["flow_id"]
- logger.info(f"Successfully ran artifact collection on {flow_id}")
-
- completed = self.universal_service.watch_flow_completion(flow_id)
- logger.info(f"Successfully watched flow completion on {completed}")
-
- results = self.universal_service.read_collection_results(
- client_id,
- flow_id,
- artifact,
- )
- return results
- except Exception as err:
- logger.error(f"Failed to run artifact collection: {err}")
- return {
- "message": "Failed to run artifact collection",
- "success": False,
- }
+136
+137
+138
| class ArtifactsService:
+ """
+ A service class that encapsulates the logic for pulling artifacts from Velociraptor.
+ """
+
+ def __init__(self):
+ self.universal_service = UniversalService()
+
+ def _create_query(self, query: str) -> str:
+ """
+ Create a query string.
+
+ Args:
+ query (str): The query to be executed.
+
+ Returns:
+ str: The created query string.
+ """
+ return query
+
+ def _get_artifact_key(self, client_id: str, artifact: str) -> str:
+ """
+ Construct the artifact key.
+
+ Args:
+ client_id (str): The ID of the client.
+ artifact (str): The name of the artifact.
+
+ Returns:
+ str: The constructed artifact key.
+ """
+ return f"collect_client(client_id='{client_id}', artifacts=['{artifact}'])"
+
+ def collect_artifacts(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ query = self._create_query("SELECT name FROM artifact_definitions()")
+ return self.universal_service.execute_query(query)
+
+ def collect_artifacts_prefixed(self, prefix: str) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with a specific prefix.
+
+ Args:
+ prefix (str): The prefix to filter the artifacts.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ artifacts_response = self.collect_artifacts()
+ if not artifacts_response["success"]:
+ return artifacts_response
+
+ filtered_artifacts = [
+ artifact
+ for artifact in artifacts_response["results"]
+ if artifact["name"].startswith(prefix)
+ ]
+
+ return {
+ "success": True,
+ "message": f"Successfully collected {prefix} artifacts",
+ "artifacts": filtered_artifacts,
+ }
+
+ def collect_artifacts_linux(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with `Linux`.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ return self.collect_artifacts_prefixed("Linux.")
+
+ def collect_artifacts_windows(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with `Windows`.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ return self.collect_artifacts_prefixed("Windows.")
+
+ def collect_artifacts_macos(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with `MacOS`.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ return self.collect_artifacts_prefixed("MacOS.")
+
+ def run_artifact_collection(self, client_id: str, artifact: str) -> dict:
+ """
+ Run an artifact collection on a specific client.
+
+ Args:
+ client_id (str): The ID of the client.
+ artifact (str): The name of the artifact.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the results.
+ """
+ try:
+ query = self._create_query(
+ f"SELECT collect_client(client_id='{client_id}', artifacts=['{artifact}']) FROM scope()",
+ )
+ flow = self.universal_service.execute_query(query)
+ logger.info(f"Successfully ran artifact collection on {flow}")
+
+ artifact_key = self._get_artifact_key(client_id, artifact)
+ flow_id = flow["results"][0][artifact_key]["flow_id"]
+ logger.info(f"Successfully ran artifact collection on {flow_id}")
+
+ completed = self.universal_service.watch_flow_completion(flow_id)
+ logger.info(f"Successfully watched flow completion on {completed}")
+
+ results = self.universal_service.read_collection_results(
+ client_id,
+ flow_id,
+ artifact,
+ )
+ return results
+ except Exception as err:
+ logger.error(f"Failed to run artifact collection: {err}")
+ return {
+ "message": "Failed to run artifact collection",
+ "success": False,
+ }
|
@@ -1796,23 +1796,23 @@
Source code in app\services\Velociraptor\artifacts.py
- 37
-38
-39
+ | def collect_artifacts(self) -> dict:
- """
- Collect the artifacts from Velociraptor.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- query = self._create_query("SELECT name FROM artifact_definitions()")
- return self.universal_service.execute_query(query)
+45
+46
+47
| def collect_artifacts(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ query = self._create_query("SELECT name FROM artifact_definitions()")
+ return self.universal_service.execute_query(query)
|
@@ -1857,21 +1857,21 @@
Source code in app\services\Velociraptor\artifacts.py
- 73
-74
-75
+ | def collect_artifacts_linux(self) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with `Linux`.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- return self.collect_artifacts_prefixed("Linux.")
+80
+81
+82
| def collect_artifacts_linux(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with `Linux`.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ return self.collect_artifacts_prefixed("Linux.")
|
@@ -1916,21 +1916,21 @@
Source code in app\services\Velociraptor\artifacts.py
- | def collect_artifacts_macos(self) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with `MacOS`.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- return self.collect_artifacts_prefixed("MacOS.")
+ | def collect_artifacts_macos(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with `MacOS`.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ return self.collect_artifacts_prefixed("MacOS.")
|
@@ -2003,9 +2003,7 @@
Source code in app\services\Velociraptor\artifacts.py
- 47
-48
-49
+ | def collect_artifacts_prefixed(self, prefix: str) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with a specific prefix.
-
- Args:
- prefix (str): The prefix to filter the artifacts.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- artifacts_response = self.collect_artifacts()
- if not artifacts_response["success"]:
- return artifacts_response
-
- filtered_artifacts = [
- artifact
- for artifact in artifacts_response["results"]
- if artifact["name"].startswith(prefix)
- ]
-
- return {
- "success": True,
- "message": f"Successfully collected {prefix} artifacts",
- "artifacts": filtered_artifacts,
- }
+71
+72
+73
| def collect_artifacts_prefixed(self, prefix: str) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with a specific prefix.
+
+ Args:
+ prefix (str): The prefix to filter the artifacts.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ artifacts_response = self.collect_artifacts()
+ if not artifacts_response["success"]:
+ return artifacts_response
+
+ filtered_artifacts = [
+ artifact
+ for artifact in artifacts_response["results"]
+ if artifact["name"].startswith(prefix)
+ ]
+
+ return {
+ "success": True,
+ "message": f"Successfully collected {prefix} artifacts",
+ "artifacts": filtered_artifacts,
+ }
|
@@ -2096,21 +2096,21 @@
Source code in app\services\Velociraptor\artifacts.py
- 82
-83
-84
+ | def collect_artifacts_windows(self) -> dict:
- """
- Collect the artifacts from Velociraptor that have a name beginning with `Windows`.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the artifacts.
- """
- return self.collect_artifacts_prefixed("Windows.")
+89
+90
+91
| def collect_artifacts_windows(self) -> dict:
+ """
+ Collect the artifacts from Velociraptor that have a name beginning with `Windows`.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the artifacts.
+ """
+ return self.collect_artifacts_prefixed("Windows.")
|
@@ -2197,9 +2197,7 @@
Source code in app\services\Velociraptor\artifacts.py
- 100
-101
-102
+ | def run_artifact_collection(self, client_id: str, artifact: str) -> dict:
- """
- Run an artifact collection on a specific client.
-
- Args:
- client_id (str): The ID of the client.
- artifact (str): The name of the artifact.
-
- Returns:
- dict: A dictionary with the success status, a message, and potentially the results.
- """
- try:
- query = self._create_query(
- f"SELECT collect_client(client_id='{client_id}', artifacts=['{artifact}']) FROM scope()",
- )
- flow = self.universal_service.execute_query(query)
- logger.info(f"Successfully ran artifact collection on {flow}")
-
- artifact_key = self._get_artifact_key(client_id, artifact)
- flow_id = flow["results"][0][artifact_key]["flow_id"]
- logger.info(f"Successfully ran artifact collection on {flow_id}")
-
- completed = self.universal_service.watch_flow_completion(flow_id)
- logger.info(f"Successfully watched flow completion on {completed}")
-
- results = self.universal_service.read_collection_results(
- client_id,
- flow_id,
- artifact,
- )
- return results
- except Exception as err:
- logger.error(f"Failed to run artifact collection: {err}")
- return {
- "message": "Failed to run artifact collection",
- "success": False,
- }
+136
+137
+138
| def run_artifact_collection(self, client_id: str, artifact: str) -> dict:
+ """
+ Run an artifact collection on a specific client.
+
+ Args:
+ client_id (str): The ID of the client.
+ artifact (str): The name of the artifact.
+
+ Returns:
+ dict: A dictionary with the success status, a message, and potentially the results.
+ """
+ try:
+ query = self._create_query(
+ f"SELECT collect_client(client_id='{client_id}', artifacts=['{artifact}']) FROM scope()",
+ )
+ flow = self.universal_service.execute_query(query)
+ logger.info(f"Successfully ran artifact collection on {flow}")
+
+ artifact_key = self._get_artifact_key(client_id, artifact)
+ flow_id = flow["results"][0][artifact_key]["flow_id"]
+ logger.info(f"Successfully ran artifact collection on {flow_id}")
+
+ completed = self.universal_service.watch_flow_completion(flow_id)
+ logger.info(f"Successfully watched flow completion on {completed}")
+
+ results = self.universal_service.read_collection_results(
+ client_id,
+ flow_id,
+ artifact,
+ )
+ return results
+ except Exception as err:
+ logger.error(f"Failed to run artifact collection: {err}")
+ return {
+ "message": "Failed to run artifact collection",
+ "success": False,
+ }
|
diff --git a/backend/site/cases/index.html b/backend/site/cases/index.html
index 81b486f4..9b57d199 100644
--- a/backend/site/cases/index.html
+++ b/backend/site/cases/index.html
@@ -1051,11 +1051,7 @@
Source code in app\routes\dfir_iris.py
- 53
-54
-55
-56
-57
+ | @bp.route("/dfir_iris/cases/<case_id>/note", methods=["POST"])
-def create_case_note(case_id: str):
- """
- Endpoint to create a note for a specific case in DFIR IRIS.
-
- Args:
- case_id (str): The ID of the case to create a note for.
-
- Returns:
- json: A JSON response containing the result of the note creation operation.
- """
- note_title = request.json["note_title"]
- note_content = request.json["note_content"]
- case_service = CasesService()
- notes_service = NotesService()
- created_note = notes_service.create_case_note(case_id=case_id, note_title=note_title, note_content=note_content)
- return created_note
+69
+70
+71
+72
+73
+74
+75
+76
| @bp.route("/dfir_iris/cases/<case_id>/note", methods=["POST"])
+def create_case_note(case_id: str):
+ """
+ Endpoint to create a note for a specific case in DFIR IRIS.
+
+ Args:
+ case_id (str): The ID of the case to create a note for.
+
+ Returns:
+ json: A JSON response containing the result of the note creation operation.
+ """
+ note_title = request.json["note_title"]
+ note_content = request.json["note_content"]
+ notes_service = NotesService()
+ created_note = notes_service.create_case_note(
+ cid=case_id,
+ note_title=note_title,
+ note_content=note_content,
+ )
+ return created_note
|
@@ -1127,27 +1133,27 @@
Source code in app\routes\dfir_iris.py
- | @bp.route("/dfir_iris/alerts", methods=["GET"])
-def get_alerts():
- """
- Endpoint to retrieve all alerts from DFIR IRIS.
-
- Returns:
- json: A JSON response containing the list of alerts.
- """
- service = AlertsService()
- alerts = service.list_alerts()
- return alerts
+ | @bp.route("/dfir_iris/alerts", methods=["GET"])
+def get_alerts():
+ """
+ Endpoint to retrieve all alerts from DFIR IRIS.
+
+ Returns:
+ json: A JSON response containing the list of alerts.
+ """
+ service = AlertsService()
+ alerts = service.list_alerts()
+ return alerts
|
@@ -1219,10 +1225,7 @@
Source code in app\routes\dfir_iris.py
- 22
-23
-24
-25
+ | @bp.route("/dfir_iris/cases/<case_id>", methods=["GET"])
-def get_case(case_id: str):
- """
- Endpoint to retrieve a specific case from DFIR IRIS.
-
- Args:
- case_id (str): The ID of the case to retrieve.
+35
+36
+37
+38
| @bp.route("/dfir_iris/cases/<case_id>", methods=["GET"])
+def get_case(case_id: str):
+ """
+ Endpoint to retrieve a specific case from DFIR IRIS.
- Returns:
- json: A JSON response containing the case data.
- """
- service = CasesService()
- case = service.get_case(case_id=case_id)
- return case
+ Args:
+ case_id (str): The ID of the case to retrieve.
+
+ Returns:
+ json: A JSON response containing the case data.
+ """
+ service = CasesService()
+ case = service.get_case(case_id=case_id)
+ return case
|
@@ -1317,33 +1323,33 @@
Source code in app\routes\dfir_iris.py
- 71
-72
-73
-74
-75
-76
-77
-78
-79
+ | @bp.route("/dfir_iris/cases/<case_id>/assets", methods=["GET"])
-def get_case_assets(case_id: str):
- """
- Endpoint to retrieve assets of a specific case from DFIR IRIS.
-
- Args:
- case_id (str): The ID of the case to retrieve assets from.
-
- Returns:
- json: A JSON response containing the list of assets for the case.
- """
- asset_service = AssetsService()
- assets = asset_service.get_case_assets(case_id=case_id)
- return assets
+84
+85
+86
+87
+88
+89
+90
+91
+92
| @bp.route("/dfir_iris/cases/<case_id>/assets", methods=["GET"])
+def get_case_assets(case_id: str):
+ """
+ Endpoint to retrieve assets of a specific case from DFIR IRIS.
+
+ Args:
+ case_id (str): The ID of the case to retrieve assets from.
+
+ Returns:
+ json: A JSON response containing the list of assets for the case.
+ """
+ asset_service = AssetsService()
+ assets = asset_service.get_case_assets(cid=case_id)
+ return assets
|
@@ -1415,11 +1421,7 @@
Source code in app\routes\dfir_iris.py
- 37
-38
-39
-40
-41
+ | @bp.route("/dfir_iris/cases/<case_id>/notes", methods=["GET"])
-def get_case_notes(case_id: str):
- """
- Endpoint to retrieve notes of a specific case from DFIR IRIS.
-
- Args:
- case_id (str): The ID of the case to retrieve notes from.
-
- Returns:
- json: A JSON response containing the list of notes for the case.
- """
- case_service = CasesService()
- notes_service = NotesService()
- notes = notes_service.get_case_notes(case_id=case_id)
- return notes
+51
+52
+53
+54
| @bp.route("/dfir_iris/cases/<case_id>/notes", methods=["GET"])
+def get_case_notes(case_id: int):
+ """
+ Endpoint to retrieve notes of a specific case from DFIR IRIS.
+
+ Args:
+ case_id (str): The ID of the case to retrieve notes from.
+
+ Returns:
+ json: A JSON response containing the list of notes for the case.
+ """
+ notes_service = NotesService()
+ notes = notes_service.get_case_notes(search_term="%", cid=case_id)
+ return notes
|
@@ -1487,9 +1491,7 @@
Source code in app\routes\dfir_iris.py
- 10
-11
-12
+ | @bp.route("/dfir_iris/cases", methods=["GET"])
-def get_cases():
- """
- Endpoint to retrieve all the cases from DFIR IRIS.
-
- Returns:
- json: A JSON response containing the list of cases.
- """
- service = CasesService()
- cases = service.list_cases()
- return cases
+20
+21
+22
| @bp.route("/dfir_iris/cases", methods=["GET"])
+def get_cases():
+ """
+ Endpoint to retrieve all the cases from DFIR IRIS.
+
+ Returns:
+ json: A JSON response containing the list of cases.
+ """
+ service = CasesService()
+ cases = service.list_cases()
+ return cases
|
diff --git a/backend/site/connectors/index.html b/backend/site/connectors/index.html
index 7648d521..c008f1cb 100644
--- a/backend/site/connectors/index.html
+++ b/backend/site/connectors/index.html
@@ -3361,9 +3361,7 @@
Source code in app\routes\connectors.py
- 34
-35
-36
+ | @bp.route("/connectors/<id>", methods=["GET"])
-def get_connector_details(id: str):
- """
- Endpoint to retrieve the details of a connector.
-
- Args:
- id (str): The ID of the connector to retrieve.
-
- Returns:
- json: A JSON response containing the details of the connector.
- """
- service = ConnectorService(db)
- connector = service.validate_connector_exists(int(id))
-
- if connector["success"]:
- connector = Connectors.query.get(id)
- instantiated_connector = service.process_connector(connector.connector_name)
- return jsonify(instantiated_connector)
- else:
- return jsonify(connector), 404
+53
+54
+55
| @bp.route("/connectors/<id>", methods=["GET"])
+def get_connector_details(id: str):
+ """
+ Endpoint to retrieve the details of a connector.
+
+ Args:
+ id (str): The ID of the connector to retrieve.
+
+ Returns:
+ json: A JSON response containing the details of the connector.
+ """
+ service = ConnectorService(db)
+ connector = service.validate_connector_exists(int(id))
+
+ if connector["success"]:
+ connector = Connectors.query.get(id)
+ instantiated_connector = service.process_connector(connector.connector_name)
+ return jsonify(instantiated_connector)
+ else:
+ return jsonify(connector), 404
|
@@ -3443,8 +3443,7 @@
Source code in app\routes\connectors.py
- 14
-15
+ | @bp.route("/connectors", methods=["GET"])
-def list_connectors_available():
- """
- Endpoint to retrieve all available connectors.
-
- Returns:
- json: A JSON response containing the list of all available connectors along with their connection verification status.
- """
- connectors_service = ConnectorService(db)
- connectors = ConnectorsAvailable.query.all()
- result = connectors_available_schema.dump(connectors)
-
- instantiated_connectors = [
- connectors_service.process_connector(connector["connector_name"])
- for connector in result
- if connectors_service.process_connector(connector["connector_name"])
- ]
-
- return jsonify(instantiated_connectors)
+32
+33
| @bp.route("/connectors", methods=["GET"])
+def list_connectors_available():
+ """
+ Endpoint to retrieve all available connectors.
+
+ Returns:
+ json: A JSON response containing the list of all available connectors along with their connection verification status.
+ """
+ connectors_service = ConnectorService(db)
+ connectors = ConnectorsAvailable.query.all()
+ result = connectors_available_schema.dump(connectors)
+
+ instantiated_connectors = [
+ connectors_service.process_connector(connector["connector_name"])
+ for connector in result
+ if connectors_service.process_connector(connector["connector_name"])
+ ]
+
+ return jsonify(instantiated_connectors)
|
@@ -3560,10 +3560,7 @@
Source code in app\routes\connectors.py
- |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|