diff --git a/backend/app/integrations/sap_siem/routes/sap_siem.py b/backend/app/integrations/sap_siem/routes/sap_siem.py index 3ebfe2ec..93053e83 100644 --- a/backend/app/integrations/sap_siem/routes/sap_siem.py +++ b/backend/app/integrations/sap_siem/routes/sap_siem.py @@ -2,6 +2,7 @@ from fastapi import Depends from loguru import logger from sqlalchemy.ext.asyncio import AsyncSession +from typing import Optional from app.db.db_session import get_db from app.integrations.routes import find_customer_integration @@ -12,6 +13,8 @@ from app.integrations.sap_siem.services.collect import collect_sap_siem from app.integrations.utils.utils import extract_auth_keys from app.integrations.utils.utils import get_customer_integration_response +from app.integrations.sap_siem.services.sap_siem_successful_user_login_after_using_different_ip import sap_siem_successful_user_login_with_different_ip +from app.integrations.sap_siem.services.sap_siem_failed_same_user_from_different_ip import sap_siem_failed_same_user_diff_ip integration_sap_siem_router = APIRouter() @@ -69,3 +72,43 @@ async def collect_sap_siem_route(sap_siem_request: InvokeSapSiemRequest, session await collect_sap_siem(sap_siem_request=collect_sap_siem_request) return InvokeSAPSiemResponse(success=True, message="SAP SIEM Events collected successfully.") + + +@integration_sap_siem_router.post( + "/successful_user_login_with_different_ip", + response_model=InvokeSAPSiemResponse, + description="Rule: Successful user login after using different IP addresses\n\n" + "Period: within 15 minutes\n\n" + "Prerequisite: \n\n" + "- Login attempts from different IP addresses, regardless of login status (at least 2 failed IP addresses)\n\n" + "- Successful login afterwards (from the third successful IP address)\n\n" + "Result: User compressed, IP addresses belong to an attack network", +) +async def invoke_sap_siem_successful_user_login_with_different_ip_route( + threshold: Optional[int] = 0, + time_range: Optional[int] = 15, + session: AsyncSession = Depends(get_db), +): + logger.info("Invoking SAP SIEM integration for successful user login with different IP.") + await sap_siem_successful_user_login_with_different_ip(threshold=threshold, time_range=time_range, session=session) + + return InvokeSAPSiemResponse(success=True, message="SAP SIEM Events collected successfully.") + +@integration_sap_siem_router.post( + "/same_user_failed_login_from_different_ip", + response_model=InvokeSAPSiemResponse, + description="Rule: Same user from different IP addresses\n\n" + "Period: within 10 minutes\n\n" + "Prerequisite: \n\n" + "- At least 3 failed login attempts with the same user name from 3 different IP addresses\n\n" + "Result: User compressed, IP addresses belong to an attack network", +) +async def invoke_sap_siem_same_user_failed_login_from_different_ip_route( + threshold: Optional[int] = 0, + time_range: Optional[int] = 10, + session: AsyncSession = Depends(get_db), +): + logger.info("Invoking SAP SIEM integration for same user failed login from different IP.") + await sap_siem_failed_same_user_diff_ip(threshold=threshold, time_range=time_range, session=session) + + return InvokeSAPSiemResponse(success=True, message="SAP SIEM Events collected successfully.") diff --git a/backend/app/integrations/sap_siem/schema/sap_siem.py b/backend/app/integrations/sap_siem/schema/sap_siem.py index 650f2806..fadadf90 100644 --- a/backend/app/integrations/sap_siem/schema/sap_siem.py +++ b/backend/app/integrations/sap_siem/schema/sap_siem.py @@ -204,6 +204,15 @@ class Result(BaseModel): "False", description="Whether the event has been analyzed for multiple logins", ) + event_analyzed_success_login_diff_ip: Optional[str] = Field( + "False", + description="Whether the event has been analyzed for successful login from different IP", + ) + event_analyzed_same_user_failed_diff_ip: Optional[str] = Field( + "False", + description="Whether the event has been analyzed for same user failed login from different IP", + ) + class SapSiemResponseBody(BaseModel): diff --git a/backend/app/integrations/sap_siem/services/sap_siem_failed_same_user_from_different_ip.py b/backend/app/integrations/sap_siem/services/sap_siem_failed_same_user_from_different_ip.py new file mode 100644 index 00000000..e8701597 --- /dev/null +++ b/backend/app/integrations/sap_siem/services/sap_siem_failed_same_user_from_different_ip.py @@ -0,0 +1,573 @@ +from collections import defaultdict +from datetime import datetime, timedelta +from dateutil.tz import tzutc +from typing import List +from typing import Set + +from fastapi import HTTPException +from loguru import logger +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.connectors.dfir_iris.utils.universal import fetch_and_validate_data +from app.connectors.dfir_iris.utils.universal import initialize_client_and_case +from app.connectors.wazuh_indexer.utils.universal import create_wazuh_indexer_client +from app.integrations.sap_siem.models.sap_siem import SapSiemMultipleLogins +from app.integrations.sap_siem.schema.sap_siem import AddAssetModel +from app.integrations.sap_siem.schema.sap_siem import CaseResponse +from app.integrations.sap_siem.schema.sap_siem import InvokeSAPSiemResponse +from app.integrations.sap_siem.schema.sap_siem import IrisCasePayload +from app.integrations.sap_siem.schema.sap_siem import SapSiemWazuhIndexerResponse +from app.integrations.sap_siem.schema.sap_siem import SuspiciousLogin +from app.integrations.utils.alerts import send_to_shuffle +from app.integrations.utils.schema import ShufflePayload +from app.utils import get_customer_alert_settings + +# Global set to keep track of IPs that have already been checked +checked_ips = set() + + +async def handle_common_suspicious_login_tasks( + suspicious_login, + unique_instances, + case_ids, + create_case_fn, + session: AsyncSession, +): + """ + Handles common tasks for suspicious logins. + + Args: + suspicious_login: The suspicious login object. + unique_instances: List of unique instances. + case_ids: List of case IDs. + create_case_fn: Function to create a case. + session: The async session. + + Returns: + None + """ + case = await create_case_fn(suspicious_login, session) + case_ids.append(case.data.case_id) + user_activity = await collect_user_activity(suspicious_login) + await handle_user_activity(user_activity, unique_instances, case.data.case_id) + await mark_as_checked(suspicious_login) + alert_source_link = (await get_customer_alert_settings(suspicious_login.customer_code, session=session)).shuffle_endpoint + await send_to_shuffle( + ShufflePayload( + alert_id=case.data.case_id, + customer=suspicious_login.customer_code, + customer_code=suspicious_login.customer_code, + alert_source_link=f"{alert_source_link}/case?cid={case.data.case_id}", + rule_description=f"{case.data.case_name}", + hostname=suspicious_login.ip, + ), + session=session, + ) + + +async def handle_suspicious_login_multiple(suspicious_login, unique_instances, case_ids, session: AsyncSession): + """ + Handles suspicious login events with multiple logins. + + Args: + suspicious_login: The suspicious login event. + unique_instances: List of unique instances of the suspicious login event. + case_ids: List of case IDs associated with the suspicious login event. + session: The database session. + + Returns: + None + """ + await handle_common_suspicious_login_tasks( + suspicious_login, + unique_instances, + case_ids, + create_iris_case_multiple, + session, + ) + await update_event_analyzed_multiple_logins_flag(suspicious_login.id, suspicious_login.index) + + +async def update_event_analyzed_multiple_logins_flag(id: str, index: str): + """ + Update the event_analyzed_multiple_logins flag in the Elasticsearch document to True. + + :param suspicious_login: The suspicious login to update + + :return: None + """ + es_client = await create_wazuh_indexer_client("Wazuh-Indexer") + try: + es_client.update( + index=index, + id=id, + body={ + "doc": { + "event_analyzed_same_user_failed_diff_ip": "True", + }, + }, + ) + logger.info(f"Updated event_analyzed_same_user_failed_diff_ip flag for suspicious login: {id}") + except Exception as e: + logger.error( + f"Failed to update case created flag {e}", + ) + # Attempt to remove read-only block + try: + es_client.indices.put_settings( + index=index, + body={"index.blocks.write": None}, + ) + logger.info( + f"Removed read-only block from index {index}. Retrying update.", + ) + + # Retry the update operation + es_client.update( + index=index, + id=id, + body={"doc": {"event_analyzed_same_user_failed_diff_ip": "True"}}, + ) + logger.info( + f"Added event_analyzed_same_user_failed_diff_ip flag to index {index} for suspicious login: {id}", + ) + + # Reenable the write block + es_client.indices.put_settings( + index=index, + body={"index.blocks.write": True}, + ) + except Exception as e2: + logger.error( + f"Failed to remove read-only block from index {index}: {e2}", + ) + return False + + +async def mark_as_checked(suspicious_login): + """ + Marks a suspicious login as checked by adding it to the set of checked IPs. + + Args: + suspicious_login (Login): The suspicious login object to mark as checked. + + Returns: + None + """ + checked_ips.add((suspicious_login.loginID, suspicious_login.ip)) + + +async def handle_user_activity(user_activity: SapSiemWazuhIndexerResponse, unique_instances, case_id): + """ + Handles user activity by processing each hit in the user_activity and performing the following steps: + 1. Extracts relevant information from the hit. + 2. Checks if the current activity is already present in the unique_instances set. + 3. If not present, adds the user activity to the IRIS case. + 4. Creates an asset payload using the current activity. + 5. Updates the case with the asset payload. + 6. Updates the event analyzed multiple logins flag for the hit. + 7. Adds the current activity to the unique_instances set. + + Parameters: + - user_activity (SapSiemWazuhIndexerResponse): The user activity to be processed. + - unique_instances (set): A set containing unique instances of user activity. + - case_id (str): The ID of the IRIS case. + + Returns: + None + """ + for hit in user_activity.hits.hits: + current_activity = { + "loginID": hit.source.params_loginID, + "ip": hit.source.ip, + "country": hit.source.httpReq_country, + "errMessage": hit.source.errMessage, + "event_timestamp": hit.source.event_timestamp, + "customer_code": hit.source.customer_code, + "errDetails": hit.source.errDetails, + } + current_activity_frozenset = frozenset(current_activity.items()) + if current_activity_frozenset not in unique_instances: + logger.info(f"Adding user activity to IRIS case: {current_activity}") + current_asset = SuspiciousLogin(**current_activity) + asset_payload = create_asset_payload(asset=current_asset) + logger.info(f"Asset Payload: {asset_payload}") + await update_case_with_asset(case_id, asset_payload) + await update_event_analyzed_multiple_logins_flag(hit.id, hit.index) + unique_instances.add(current_activity_frozenset) + + +def create_asset_payload(asset: SuspiciousLogin): + """ + Create a payload for adding an asset based on a SuspiciousLogin object. + + Args: + asset (SuspiciousLogin): The SuspiciousLogin object containing the asset details. + + Returns: + AddAssetModel: The payload for adding the asset. + + """ + if asset.errMessage == "OK": + return AddAssetModel( + name=asset.loginID, + ip=asset.ip, + description=f"Country: {asset.country}\n\nMessage: {asset.errDetails}\n\nTimestamp: {asset.event_timestamp}", + asset_type=1, + compromise_status=1, + analysis_status=2, + ) + return AddAssetModel( + name=asset.loginID, + ip=asset.ip, + description=f"Country: {asset.country}\n\nMessage: {asset.errDetails}\n\nTimestamp: {asset.event_timestamp}", + asset_type=1, + analysis_status=2, + ) + + +async def update_case_with_asset(case_id: str, asset_payload): + """ + Update the case with the asset information. + + :param case_id: The ID of the case to update + :param asset_payload: The payload to update the case with + + :return: None + """ + logger.info(f"Updating IRIS case {case_id} with asset: {asset_payload}") + client, case_client = await initialize_client_and_case("DFIR-IRIS") + return await fetch_and_validate_data( + client, + case_client.add_asset, + cid=case_id, + **asset_payload.to_dict(), + ) + + +async def create_iris_case_multiple(suspicious_login: SuspiciousLogin, session: AsyncSession) -> CaseResponse: + """ + Creates an IRIS case for multiple logins with the same IP address. + + Args: + suspicious_login (SuspiciousLogin): The suspicious login information. + session (AsyncSession): The async session for database operations. + + Returns: + CaseResponse: The response containing the created case information. + """ + logger.info(f"Creating IRIS case same IP with multiple users: {suspicious_login}") + case_name = ( + f"Log Source: {suspicious_login.logSource} SAP SIEM. " f"User {suspicious_login.loginID} had multiple login failures from different IP addresses." + ) + + case_description = ( + f"Log Source: {suspicious_login.logSource}\n\n" + f"IP Address: {suspicious_login.ip}\n\n" + f"Country: {suspicious_login.country}\n\n" + f"Timestamp: {suspicious_login.event_timestamp}" + ) + + case_customer = (await get_customer_alert_settings(suspicious_login.customer_code, session=session)).iris_customer_id + + payload = IrisCasePayload( + case_name=case_name, + case_description=case_description, + case_customer=case_customer, + case_classification=18, + soc_id="1", + create_customer=False, + ) + client, case_client = await initialize_client_and_case("DFIR-IRIS") + result = await fetch_and_validate_data( + client, + case_client.add_case, + **payload.to_dict(), + ) + await update_event_analyzed_multiple_logins_flag(suspicious_login.id, suspicious_login.index) + + return CaseResponse(**result) + + +async def collect_user_activity(suspicious_logins: SuspiciousLogin) -> SapSiemWazuhIndexerResponse: + """ + Collect the IP addresses of the suspicious logins and query the database for all activity from those IP addresses. + Collects a max of 1000 records. + + :param suspicious_logins: A list of suspicious logins + + :return: List of the user Activity collected from the sap_siem table + """ + es_client = await create_wazuh_indexer_client("Wazuh-Indexer") + results = es_client.search( + #index="sap_siem_*", + index="new-integrations*", + body={ + "size": 1000, + "query": {"bool": {"must": [{"term": {"params_loginID": suspicious_logins.loginID}}]}}, + }, + ) + return SapSiemWazuhIndexerResponse(**results) + + +async def get_initial_search_results(es_client): + """ + Retrieves the initial search results from Elasticsearch. + + Args: + es_client (Elasticsearch): The Elasticsearch client. + + Returns: + dict: The search results. + """ + return es_client.search( + #index="sap_siem_*", + index="new-integrations*", + body={ + "size": 1000, + "query": {"bool": {"must": [{"term": {"event_analyzed_same_user_failed_diff_ip": "False"}}]}}, + "sort": [{"event_timestamp": {"order": "asc"}}], + }, + scroll="1m", + ) + + +async def get_next_batch_of_results(es_client, scroll_id): + """ + Retrieves the next batch of results using the provided Elasticsearch client and scroll ID. + + Args: + es_client (Elasticsearch): The Elasticsearch client. + scroll_id (str): The scroll ID to retrieve the next batch of results. + + Returns: + dict: The next batch of results. + """ + return es_client.scroll(scroll_id=scroll_id, scroll="1m") + + +async def process_hits(hits, login_id_to_ips, suspicious_activity, time_range): + """ + Process the hits received from SAP SIEM and update the login ID to IPs mapping and suspicious activity. + + Args: + hits (list): List of hits received from SAP SIEM. + login_id_to_ips (dict): Dictionary mapping login IDs to IP addresses. + suspicious_activity (dict): Dictionary mapping login IDs to a list of suspicious login objects. + + Returns: + None + """ + login_id_to_ips = defaultdict(lambda: defaultdict(list)) + + for hit in hits: + # Convert loginID to lowercase before comparing + login_id = hit.source.params_loginID.lower() + ip = hit.source.ip + errCode = hit.source.errCode + + # Ignore loginID if it does not contain a '@' + if '@' not in login_id: + logger.info(f"Ignoring loginID {login_id} as it does not contain a '@'") + continue + + # Parse the event timestamp + event_timestamp = datetime.strptime(hit.source.event_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + + # Add the timestamp and errCode to the list for this loginID for this IP + login_id_to_ips[login_id][ip].append((event_timestamp, errCode)) + + logger.info(f"Added timestamp {event_timestamp} for IP {ip} and loginID {login_id}") + + # Check if there are at least 3 different IPs for the same loginID within the last 10 minutes + ips_in_last_10_minutes = set() + for other_ip, other_info in login_id_to_ips[login_id].items(): + for other_timestamp, other_errCode in other_info: + if event_timestamp - timedelta(minutes=time_range) <= other_timestamp <= event_timestamp: + ips_in_last_10_minutes.add((other_ip, other_errCode)) + + # If there are at least 3 different failed IPs, log the suspicious activity + failed_ips = [ip for ip, errCode in ips_in_last_10_minutes if errCode != "0"] + logger.info(f"Failed IPs: {failed_ips}") + if len(failed_ips) >= 3: + logger.info(f"Found suspicious login: {login_id} with IP: {ip} and errCode: {errCode}") + suspicious_login = SuspiciousLogin( + _index=hit.index, + _id=hit.id, + customer_code=hit.source.customer_code, + logSource=hit.source.logSource, + loginID=hit.source.params_loginID, + country=hit.source.httpReq_country, + ip=hit.source.ip, + event_timestamp=hit.source.event_timestamp, + errMessage=hit.source.errMessage, + errDetails=hit.source.errDetails, + ) + suspicious_activity[login_id].append(suspicious_login) + logger.info(f"Added suspicious login: {suspicious_login}") + +async def check_multiple_successful_logins_by_ip(threshold: int, time_range: int) -> List[SuspiciousLogin]: + """ + Checks for multiple successful logins by IP address. + + Args: + threshold (int): The minimum number of logins required to be considered suspicious. + + Returns: + List[SuspiciousLogin]: A list of suspicious login objects. + """ + ip_to_login_ids = defaultdict(set) + suspicious_activity = defaultdict(list) + + es_client = await create_wazuh_indexer_client("Wazuh-Indexer") + scroll_id = None + + while True: + if scroll_id is None: + results = await get_initial_search_results(es_client) + else: + results = await get_next_batch_of_results(es_client, scroll_id) + + if not results["hits"]["hits"]: + break + + results = SapSiemWazuhIndexerResponse(**results) + await process_hits(results.hits.hits, ip_to_login_ids, suspicious_activity, time_range) + + scroll_id = results.scroll_id + + # Clear the scroll when you're done to free up resources + if scroll_id is not None: + es_client.clear_scroll(scroll_id=scroll_id) + + logger.info(f"Suspicious activity: {suspicious_activity}") + suspicious_activity = {ip: results for ip, results in suspicious_activity.items()} + + return [login for sublist in suspicious_activity.values() for login in sublist] + + +async def get_suspicious_ips(threshold: int, time_range: int) -> List[SuspiciousLogin]: + """ + Retrieves a list of suspicious login attempts based on the specified threshold. + + Args: + threshold (int): The number of successful logins from the same IP address that is considered suspicious. + + Returns: + List[SuspiciousLogin]: A list of SuspiciousLogin objects representing the suspicious login attempts. + """ + return await check_multiple_successful_logins_by_ip(threshold=threshold, time_range=time_range) + + +async def get_existing_database_record(session: AsyncSession, ip: str) -> SapSiemMultipleLogins: + """ + Retrieves an existing database record for the given IP address. + + Args: + session (AsyncSession): The async session object for database operations. + ip (str): The IP address to search for. + + Returns: + SapSiemMultipleLogins: The database record matching the IP address, or None if not found. + """ + result = await session.execute(select(SapSiemMultipleLogins).where(SapSiemMultipleLogins.ip == ip)) + return result.scalar_one_or_none() if result is not None else None + + +def update_existing_database_record(existing_case: SapSiemMultipleLogins, new_login_ids: Set[str]) -> None: + """ + Update the existing database record for a SapSiemMultipleLogins case with new login IDs. + + Args: + existing_case (SapSiemMultipleLogins): The existing database record to be updated. + new_login_ids (Set[str]): The new login IDs to be added to the existing record. + + Returns: + None + """ + existing_loginIDs = set(existing_case.associated_loginIDs.split(",")) + if not new_login_ids.issubset(existing_loginIDs): + updated_login_ids = existing_loginIDs.union(new_login_ids) + existing_case.associated_loginIDs = ",".join(updated_login_ids) + existing_case.last_case_created_timestamp = datetime.now() + + +def create_new_database_record(ip: str, new_login_ids: Set[str]) -> SapSiemMultipleLogins: + """ + Creates a new database record for SAP SIEM multiple logins. + + Args: + ip (str): The IP address associated with the multiple logins. + new_login_ids (Set[str]): The set of new login IDs. + + Returns: + SapSiemMultipleLogins: The newly created database record. + """ + return SapSiemMultipleLogins( + ip=ip, + last_case_created_timestamp=datetime.now(), + associated_loginIDs=",".join(new_login_ids), + ) + + +async def sap_siem_failed_same_user_diff_ip(threshold: int, time_range: int, session: AsyncSession) -> InvokeSAPSiemResponse: + """ + Finds same IP with multiple users and handles suspicious logins. + + Args: + threshold (int): The threshold value for determining suspicious logins. + session (AsyncSession): The database session. + + Returns: + InvokeSAPSiemResponse: The response indicating the success of the operation. + """ + logger.info("Finding same user from different IP addresses") + + suspicious_ips = await get_suspicious_ips(threshold, time_range) + logger.info(f"Suspicious IPs: {suspicious_ips}") + + unique_instances = set() + case_ids = [] + # Dictionary to aggregate suspicious logins by IP + aggregated_logins_by_ip = defaultdict(list) + + for suspicious_login in suspicious_ips: + aggregated_logins_by_ip[suspicious_login.ip].append(suspicious_login) + + for ip, associated_logins in aggregated_logins_by_ip.items(): + logger.info(f"IP: {ip}, Associated Logins: {associated_logins}") + if session is not None: + existing_case = await get_existing_database_record(session, ip) + + new_login_ids = {login.loginID for login in associated_logins} + if existing_case: + logger.info(f"Updating existing database record: {existing_case}") + update_existing_database_record(existing_case, new_login_ids) + else: + logger.info(f"Creating new case for IP: {ip}") + new_case = create_new_database_record(ip, new_login_ids) + session.add(new_case) + + # Create a single new IRIS case for this IP + # Modify this to include information from all associated_logins + await handle_suspicious_login_multiple( + associated_logins[0], + unique_instances, + case_ids, + session=session, + ) + else: + raise HTTPException( + status_code=500, + detail="Failed to create IRIS case", + ) + await session.commit() + + # Clear the global set + checked_ips.clear() + + return InvokeSAPSiemResponse( + success=True, + message="SAP SIEM multiple logins invoked.", + ) diff --git a/backend/app/integrations/sap_siem/services/sap_siem_successful_user_login_after_using_different_ip.py b/backend/app/integrations/sap_siem/services/sap_siem_successful_user_login_after_using_different_ip.py new file mode 100644 index 00000000..7c23b965 --- /dev/null +++ b/backend/app/integrations/sap_siem/services/sap_siem_successful_user_login_after_using_different_ip.py @@ -0,0 +1,573 @@ +from collections import defaultdict +from datetime import datetime, timedelta +from dateutil.tz import tzutc +from typing import List +from typing import Set + +from fastapi import HTTPException +from loguru import logger +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + +from app.connectors.dfir_iris.utils.universal import fetch_and_validate_data +from app.connectors.dfir_iris.utils.universal import initialize_client_and_case +from app.connectors.wazuh_indexer.utils.universal import create_wazuh_indexer_client +from app.integrations.sap_siem.models.sap_siem import SapSiemMultipleLogins +from app.integrations.sap_siem.schema.sap_siem import AddAssetModel +from app.integrations.sap_siem.schema.sap_siem import CaseResponse +from app.integrations.sap_siem.schema.sap_siem import InvokeSAPSiemResponse +from app.integrations.sap_siem.schema.sap_siem import IrisCasePayload +from app.integrations.sap_siem.schema.sap_siem import SapSiemWazuhIndexerResponse +from app.integrations.sap_siem.schema.sap_siem import SuspiciousLogin +from app.integrations.utils.alerts import send_to_shuffle +from app.integrations.utils.schema import ShufflePayload +from app.utils import get_customer_alert_settings + +# Global set to keep track of IPs that have already been checked +checked_ips = set() + + +async def handle_common_suspicious_login_tasks( + suspicious_login, + unique_instances, + case_ids, + create_case_fn, + session: AsyncSession, +): + """ + Handles common tasks for suspicious logins. + + Args: + suspicious_login: The suspicious login object. + unique_instances: List of unique instances. + case_ids: List of case IDs. + create_case_fn: Function to create a case. + session: The async session. + + Returns: + None + """ + case = await create_case_fn(suspicious_login, session) + case_ids.append(case.data.case_id) + user_activity = await collect_user_activity(suspicious_login) + await handle_user_activity(user_activity, unique_instances, case.data.case_id) + await mark_as_checked(suspicious_login) + alert_source_link = (await get_customer_alert_settings(suspicious_login.customer_code, session=session)).shuffle_endpoint + await send_to_shuffle( + ShufflePayload( + alert_id=case.data.case_id, + customer=suspicious_login.customer_code, + customer_code=suspicious_login.customer_code, + alert_source_link=f"{alert_source_link}/case?cid={case.data.case_id}", + rule_description=f"{case.data.case_name}", + hostname=suspicious_login.ip, + ), + session=session, + ) + + +async def handle_suspicious_login_multiple(suspicious_login, unique_instances, case_ids, session: AsyncSession): + """ + Handles suspicious login events with multiple logins. + + Args: + suspicious_login: The suspicious login event. + unique_instances: List of unique instances of the suspicious login event. + case_ids: List of case IDs associated with the suspicious login event. + session: The database session. + + Returns: + None + """ + await handle_common_suspicious_login_tasks( + suspicious_login, + unique_instances, + case_ids, + create_iris_case_multiple, + session, + ) + await update_event_analyzed_multiple_logins_flag(suspicious_login.id, suspicious_login.index) + + +async def update_event_analyzed_multiple_logins_flag(id: str, index: str): + """ + Update the event_analyzed_multiple_logins flag in the Elasticsearch document to True. + + :param suspicious_login: The suspicious login to update + + :return: None + """ + es_client = await create_wazuh_indexer_client("Wazuh-Indexer") + try: + es_client.update( + index=index, + id=id, + body={ + "doc": { + "event_analyzed_success_login_diff_ip": "True", + }, + }, + ) + logger.info(f"Updated event_analyzed_success_login_diff_ip flag for suspicious login: {id}") + except Exception as e: + logger.error( + f"Failed to update case created flag {e}", + ) + # Attempt to remove read-only block + try: + es_client.indices.put_settings( + index=index, + body={"index.blocks.write": None}, + ) + logger.info( + f"Removed read-only block from index {index}. Retrying update.", + ) + + # Retry the update operation + es_client.update( + index=index, + id=id, + body={"doc": {"event_analyzed_multiple_logins": "True"}}, + ) + logger.info( + f"Added event_analyzed_multiple_logins flag to index {index} for suspicious login: {id}", + ) + + # Reenable the write block + es_client.indices.put_settings( + index=index, + body={"index.blocks.write": True}, + ) + except Exception as e2: + logger.error( + f"Failed to remove read-only block from index {index}: {e2}", + ) + return False + + +async def mark_as_checked(suspicious_login): + """ + Marks a suspicious login as checked by adding it to the set of checked IPs. + + Args: + suspicious_login (Login): The suspicious login object to mark as checked. + + Returns: + None + """ + checked_ips.add((suspicious_login.loginID, suspicious_login.ip)) + + +async def handle_user_activity(user_activity: SapSiemWazuhIndexerResponse, unique_instances, case_id): + """ + Handles user activity by processing each hit in the user_activity and performing the following steps: + 1. Extracts relevant information from the hit. + 2. Checks if the current activity is already present in the unique_instances set. + 3. If not present, adds the user activity to the IRIS case. + 4. Creates an asset payload using the current activity. + 5. Updates the case with the asset payload. + 6. Updates the event analyzed multiple logins flag for the hit. + 7. Adds the current activity to the unique_instances set. + + Parameters: + - user_activity (SapSiemWazuhIndexerResponse): The user activity to be processed. + - unique_instances (set): A set containing unique instances of user activity. + - case_id (str): The ID of the IRIS case. + + Returns: + None + """ + for hit in user_activity.hits.hits: + current_activity = { + "loginID": hit.source.params_loginID, + "ip": hit.source.ip, + "country": hit.source.httpReq_country, + "errMessage": hit.source.errMessage, + "event_timestamp": hit.source.event_timestamp, + "customer_code": hit.source.customer_code, + "errDetails": hit.source.errDetails, + } + current_activity_frozenset = frozenset(current_activity.items()) + if current_activity_frozenset not in unique_instances: + logger.info(f"Adding user activity to IRIS case: {current_activity}") + current_asset = SuspiciousLogin(**current_activity) + asset_payload = create_asset_payload(asset=current_asset) + logger.info(f"Asset Payload: {asset_payload}") + await update_case_with_asset(case_id, asset_payload) + await update_event_analyzed_multiple_logins_flag(hit.id, hit.index) + unique_instances.add(current_activity_frozenset) + + +def create_asset_payload(asset: SuspiciousLogin): + """ + Create a payload for adding an asset based on a SuspiciousLogin object. + + Args: + asset (SuspiciousLogin): The SuspiciousLogin object containing the asset details. + + Returns: + AddAssetModel: The payload for adding the asset. + + """ + if asset.errMessage == "OK": + return AddAssetModel( + name=asset.loginID, + ip=asset.ip, + description=f"Country: {asset.country}\n\nMessage: {asset.errDetails}\n\nTimestamp: {asset.event_timestamp}", + asset_type=1, + compromise_status=1, + analysis_status=2, + ) + return AddAssetModel( + name=asset.loginID, + ip=asset.ip, + description=f"Country: {asset.country}\n\nMessage: {asset.errDetails}\n\nTimestamp: {asset.event_timestamp}", + asset_type=1, + analysis_status=2, + ) + + +async def update_case_with_asset(case_id: str, asset_payload): + """ + Update the case with the asset information. + + :param case_id: The ID of the case to update + :param asset_payload: The payload to update the case with + + :return: None + """ + logger.info(f"Updating IRIS case {case_id} with asset: {asset_payload}") + client, case_client = await initialize_client_and_case("DFIR-IRIS") + return await fetch_and_validate_data( + client, + case_client.add_asset, + cid=case_id, + **asset_payload.to_dict(), + ) + + +async def create_iris_case_multiple(suspicious_login: SuspiciousLogin, session: AsyncSession) -> CaseResponse: + """ + Creates an IRIS case for multiple logins with the same IP address. + + Args: + suspicious_login (SuspiciousLogin): The suspicious login information. + session (AsyncSession): The async session for database operations. + + Returns: + CaseResponse: The response containing the created case information. + """ + logger.info(f"Creating IRIS case same IP with multiple users: {suspicious_login}") + case_name = ( + f"Log Source: {suspicious_login.logSource} SAP SIEM. " f"User {suspicious_login.loginID} had a successful login after failed attempts from different IPs." + ) + + case_description = ( + f"Log Source: {suspicious_login.logSource}\n\n" + f"IP Address: {suspicious_login.ip}\n\n" + f"Country: {suspicious_login.country}\n\n" + f"Timestamp: {suspicious_login.event_timestamp}" + ) + + case_customer = (await get_customer_alert_settings(suspicious_login.customer_code, session=session)).iris_customer_id + + payload = IrisCasePayload( + case_name=case_name, + case_description=case_description, + case_customer=case_customer, + case_classification=18, + soc_id="1", + create_customer=False, + ) + client, case_client = await initialize_client_and_case("DFIR-IRIS") + result = await fetch_and_validate_data( + client, + case_client.add_case, + **payload.to_dict(), + ) + await update_event_analyzed_multiple_logins_flag(suspicious_login.id, suspicious_login.index) + + return CaseResponse(**result) + + +async def collect_user_activity(suspicious_logins: SuspiciousLogin) -> SapSiemWazuhIndexerResponse: + """ + Collect the IP addresses of the suspicious logins and query the database for all activity from those IP addresses. + Collects a max of 1000 records. + + :param suspicious_logins: A list of suspicious logins + + :return: List of the user Activity collected from the sap_siem table + """ + es_client = await create_wazuh_indexer_client("Wazuh-Indexer") + results = es_client.search( + #index="sap_siem_*", + index="new-integrations*", + body={ + "size": 1000, + "query": {"bool": {"must": [{"term": {"params_loginID": suspicious_logins.loginID}}]}}, + }, + ) + return SapSiemWazuhIndexerResponse(**results) + + +async def get_initial_search_results(es_client): + """ + Retrieves the initial search results from Elasticsearch. + + Args: + es_client (Elasticsearch): The Elasticsearch client. + + Returns: + dict: The search results. + """ + return es_client.search( + #index="sap_siem_*", + index="new-integrations*", + body={ + "size": 1000, + "query": {"bool": {"must": [{"term": {"event_analyzed_success_login_diff_ip": "False"}}]}}, + "sort": [{"event_timestamp": {"order": "asc"}}], + }, + scroll="1m", + ) + + +async def get_next_batch_of_results(es_client, scroll_id): + """ + Retrieves the next batch of results using the provided Elasticsearch client and scroll ID. + + Args: + es_client (Elasticsearch): The Elasticsearch client. + scroll_id (str): The scroll ID to retrieve the next batch of results. + + Returns: + dict: The next batch of results. + """ + return es_client.scroll(scroll_id=scroll_id, scroll="1m") + + +async def process_hits(hits, login_id_to_ips, suspicious_activity, time_range): + """ + Process the hits received from SAP SIEM and update the login ID to IPs mapping and suspicious activity. + + Args: + hits (list): List of hits received from SAP SIEM. + login_id_to_ips (dict): Dictionary mapping login IDs to IP addresses. + suspicious_activity (dict): Dictionary mapping login IDs to a list of suspicious login objects. + + Returns: + None + """ + login_id_to_ips = defaultdict(lambda: defaultdict(list)) + + for hit in hits: + # Convert loginID to lowercase before comparing + login_id = hit.source.params_loginID.lower() + ip = hit.source.ip + errCode = hit.source.errCode + + # Ignore loginID if it does not contain a '@' + if '@' not in login_id: + logger.info(f"Ignoring loginID {login_id} as it does not contain a '@'") + continue + + # Parse the event timestamp + event_timestamp = datetime.strptime(hit.source.event_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + + # Add the timestamp and errCode to the list for this loginID for this IP + login_id_to_ips[login_id][ip].append((event_timestamp, errCode)) + + logger.info(f"Added timestamp {event_timestamp} for IP {ip} and loginID {login_id}") + + # Check if there are at least 2 different IPs for the same loginID within the last 15 minutes + ips_in_last_15_minutes = set() + for other_ip, other_info in login_id_to_ips[login_id].items(): + for other_timestamp, other_errCode in other_info: + if event_timestamp - timedelta(minutes=time_range) <= other_timestamp <= event_timestamp: + ips_in_last_15_minutes.add((other_ip, other_errCode)) + + # If the current hit is a successful login and there are at least 2 different failed IPs, log the suspicious activity + failed_ips = [ip for ip, errCode in ips_in_last_15_minutes if errCode != "0"] + logger.info(f"Failed IPs: {failed_ips}") + if errCode == "0" and len(failed_ips) >= 2: + logger.info(f"Found suspicious login: {login_id} with IP: {ip} and errCode: {errCode}") + suspicious_login = SuspiciousLogin( + _index=hit.index, + _id=hit.id, + customer_code=hit.source.customer_code, + logSource=hit.source.logSource, + loginID=hit.source.params_loginID, + country=hit.source.httpReq_country, + ip=hit.source.ip, + event_timestamp=hit.source.event_timestamp, + errMessage=hit.source.errMessage, + errDetails=hit.source.errDetails, + ) + suspicious_activity[login_id].append(suspicious_login) + logger.info(f"Added suspicious login: {suspicious_login}") + +async def check_multiple_successful_logins_by_ip(threshold: int, time_range: int) -> List[SuspiciousLogin]: + """ + Checks for multiple successful logins by IP address. + + Args: + threshold (int): The minimum number of logins required to be considered suspicious. + + Returns: + List[SuspiciousLogin]: A list of suspicious login objects. + """ + ip_to_login_ids = defaultdict(set) + suspicious_activity = defaultdict(list) + + es_client = await create_wazuh_indexer_client("Wazuh-Indexer") + scroll_id = None + + while True: + if scroll_id is None: + results = await get_initial_search_results(es_client) + else: + results = await get_next_batch_of_results(es_client, scroll_id) + + if not results["hits"]["hits"]: + break + + results = SapSiemWazuhIndexerResponse(**results) + await process_hits(results.hits.hits, ip_to_login_ids, suspicious_activity, time_range) + + scroll_id = results.scroll_id + + # Clear the scroll when you're done to free up resources + if scroll_id is not None: + es_client.clear_scroll(scroll_id=scroll_id) + + logger.info(f"Suspicious activity: {suspicious_activity}") + suspicious_activity = {ip: results for ip, results in suspicious_activity.items()} + + return [login for sublist in suspicious_activity.values() for login in sublist] + + +async def get_suspicious_ips(threshold: int, time_range: int) -> List[SuspiciousLogin]: + """ + Retrieves a list of suspicious login attempts based on the specified threshold. + + Args: + threshold (int): The number of successful logins from the same IP address that is considered suspicious. + + Returns: + List[SuspiciousLogin]: A list of SuspiciousLogin objects representing the suspicious login attempts. + """ + return await check_multiple_successful_logins_by_ip(threshold=threshold, time_range=time_range) + + +async def get_existing_database_record(session: AsyncSession, ip: str) -> SapSiemMultipleLogins: + """ + Retrieves an existing database record for the given IP address. + + Args: + session (AsyncSession): The async session object for database operations. + ip (str): The IP address to search for. + + Returns: + SapSiemMultipleLogins: The database record matching the IP address, or None if not found. + """ + result = await session.execute(select(SapSiemMultipleLogins).where(SapSiemMultipleLogins.ip == ip)) + return result.scalar_one_or_none() if result is not None else None + + +def update_existing_database_record(existing_case: SapSiemMultipleLogins, new_login_ids: Set[str]) -> None: + """ + Update the existing database record for a SapSiemMultipleLogins case with new login IDs. + + Args: + existing_case (SapSiemMultipleLogins): The existing database record to be updated. + new_login_ids (Set[str]): The new login IDs to be added to the existing record. + + Returns: + None + """ + existing_loginIDs = set(existing_case.associated_loginIDs.split(",")) + if not new_login_ids.issubset(existing_loginIDs): + updated_login_ids = existing_loginIDs.union(new_login_ids) + existing_case.associated_loginIDs = ",".join(updated_login_ids) + existing_case.last_case_created_timestamp = datetime.now() + + +def create_new_database_record(ip: str, new_login_ids: Set[str]) -> SapSiemMultipleLogins: + """ + Creates a new database record for SAP SIEM multiple logins. + + Args: + ip (str): The IP address associated with the multiple logins. + new_login_ids (Set[str]): The set of new login IDs. + + Returns: + SapSiemMultipleLogins: The newly created database record. + """ + return SapSiemMultipleLogins( + ip=ip, + last_case_created_timestamp=datetime.now(), + associated_loginIDs=",".join(new_login_ids), + ) + + +async def sap_siem_successful_user_login_with_different_ip(threshold: int, time_range: int, session: AsyncSession) -> InvokeSAPSiemResponse: + """ + Finds same IP with multiple users and handles suspicious logins. + + Args: + threshold (int): The threshold value for determining suspicious logins. + session (AsyncSession): The database session. + + Returns: + InvokeSAPSiemResponse: The response indicating the success of the operation. + """ + logger.info("Finding same IP with multiple users") + + suspicious_ips = await get_suspicious_ips(threshold, time_range) + logger.info(f"Suspicious IPs: {suspicious_ips}") + + unique_instances = set() + case_ids = [] + # Dictionary to aggregate suspicious logins by IP + aggregated_logins_by_ip = defaultdict(list) + + for suspicious_login in suspicious_ips: + aggregated_logins_by_ip[suspicious_login.ip].append(suspicious_login) + + for ip, associated_logins in aggregated_logins_by_ip.items(): + logger.info(f"IP: {ip}, Associated Logins: {associated_logins}") + if session is not None: + existing_case = await get_existing_database_record(session, ip) + + new_login_ids = {login.loginID for login in associated_logins} + if existing_case: + logger.info(f"Updating existing database record: {existing_case}") + update_existing_database_record(existing_case, new_login_ids) + else: + logger.info(f"Creating new case for IP: {ip}") + new_case = create_new_database_record(ip, new_login_ids) + session.add(new_case) + + # Create a single new IRIS case for this IP + # Modify this to include information from all associated_logins + await handle_suspicious_login_multiple( + associated_logins[0], + unique_instances, + case_ids, + session=session, + ) + else: + raise HTTPException( + status_code=500, + detail="Failed to create IRIS case", + ) + await session.commit() + + # Clear the global set + checked_ips.clear() + + return InvokeSAPSiemResponse( + success=True, + message="SAP SIEM multiple logins invoked.", + )