Skip to content

Commit

Permalink
Add async support for agent synchronization. Remove the background ta…
Browse files Browse the repository at this point in the history
…sk and run every 15 minutes
  • Loading branch information
taylorwalton committed Mar 2, 2024
1 parent 3975174 commit cb2e4a0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 18 deletions.
7 changes: 5 additions & 2 deletions backend/app/agents/routes/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from fastapi import Depends
from fastapi import HTTPException
from fastapi import Security
import asyncio
from loguru import logger
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -218,7 +219,7 @@ async def get_agent_by_hostname(
],
)
async def sync_all_agents(
backgroud_tasks: BackgroundTasks,
#backgroud_tasks: BackgroundTasks,
session: AsyncSession = Depends(get_db),
) -> SyncedAgentsResponse:
"""
Expand All @@ -236,7 +237,9 @@ async def sync_all_agents(
"""
logger.info("Syncing agents from Wazuh Manager")
backgroud_tasks.add_task(sync_agents, session)
#backgroud_tasks.add_task(sync_agents, session)
loop = asyncio.get_event_loop()
loop.create_task(sync_agents(session))
return SyncedAgentsResponse(
success=True,
message="Agents synced started successfully",
Expand Down
1 change: 1 addition & 0 deletions backend/app/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def schedule_enabled_jobs(scheduler):
id=job_metadata.job_id,
replace_existing=True,
)
logger.info(f"Scheduled job: {job_metadata.job_id}")
except ValueError as e:
logger.error(f"Error scheduling job: {e}")

Expand Down
21 changes: 5 additions & 16 deletions backend/app/schedulers/services/agent_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

from app.db.db_session import get_sync_db_session
from app.schedulers.models.scheduler import JobMetadata
from app.db.db_session import get_db_session
from app.schedulers.utils.universal import scheduler_login
from app.agents.routes.agents import sync_all_agents

load_dotenv()


def agent_sync():
async def agent_sync():
"""
Synchronizes agents by sending a request to the server and updating the job metadata.
Expand All @@ -21,21 +23,8 @@ def agent_sync():
If the token retrieval fails, it prints a failure message. If the job metadata for
'agent_sync' does not exist, it prints a message indicating the absence of the metadata.
"""
# Get the scheduler auth token
headers = scheduler_login()

# Check if the token was successfully retrieved
if headers:
# Your actual task
response = requests.post(
f"http://{os.getenv('SERVER_IP')}:5000/agents/sync",
headers=headers,
)

# Process the response here if needed
print(response.json())
else:
print("Failed to retrieve token")
async with get_db_session() as session:
await sync_all_agents(session=session)

# Use get_sync_db_session to create and manage a synchronous session
with get_sync_db_session() as session:
Expand Down

0 comments on commit cb2e4a0

Please sign in to comment.