Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add facility to sync functions #126

Merged
merged 13 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/nsls2api/api/models/stats_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class StatsModel(pydantic.BaseModel):
beamline_count: int
commissioning_proposal_count: int
facility_data_health: bool
nsls2_proposals_per_cycle: Optional[list[ProposalsPerCycleModel]] = []
nsls2_proposals_per_cycle: Optional[list[ProposalsPerCycleModel]]
lbms_proposals_per_cycle: Optional[list[ProposalsPerCycleModel]]


class AboutModel(pydantic.BaseModel):
Expand Down
13 changes: 4 additions & 9 deletions src/nsls2api/api/v1/facility_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,12 @@ async def get_facility_cycles(facility: FacilityName):
include_in_schema=True,
)
async def get_proposals_for_cycle(facility: FacilityName, cycle: str):
if facility.name != "nsls2":
# TODO: Add other facilities
return fastapi.responses.JSONResponse(
{"message": f"Not implemented for the {facility.name} facility."},
status_code=501,
)

proposal_list = await proposal_service.fetch_proposals_for_cycle(cycle)
proposal_list = await proposal_service.fetch_proposals_for_cycle(cycle, facility)
if proposal_list is None:
return fastapi.responses.JSONResponse(
{"error": f"No proposals were found for cycle {cycle}"},
{
"error": f"No proposals were found for cycle {cycle} for facility {facility.name}"
},
status_code=404,
)
model = CycleProposalList(
Expand Down
28 changes: 25 additions & 3 deletions src/nsls2api/api/v1/jobs_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
)
from nsls2api.services import background_service

SYNC_ROUTES_IN_SCHEMA = False
# TODO: This flag should be automatically set to be True for development but False for production.
SYNC_ROUTES_IN_SCHEMA = True

router = fastapi.APIRouter(tags=["jobs"])

Expand Down Expand Up @@ -82,9 +83,29 @@ async def sync_proposal_types(facility: FacilityName = FacilityName.nsls2):
dependencies=[Depends(get_current_user)],
include_in_schema=SYNC_ROUTES_IN_SCHEMA,
tags=["sync"],
deprecated=True,
)
async def sync_proposals_for_cycle(request: Request, cycle: str) -> BackgroundJob:
sync_params = JobSyncParameters(cycle=cycle)
async def sync_proposals_for_cycle(
request: Request, cycle: str, facility: FacilityName = FacilityName.nsls2
) -> BackgroundJob:
sync_params = JobSyncParameters(cycle=cycle, facility=facility)
job = await background_service.create_background_job(
JobActions.synchronize_proposals_for_cycle,
sync_parameters=sync_params,
)
return job


@router.get(
"/sync/facility/{facility}/cycle/{cycle}/proposals",
dependencies=[Depends(get_current_user)],
include_in_schema=SYNC_ROUTES_IN_SCHEMA,
tags=["sync"],
)
async def sync_proposals_for_facility_cycle(
request: Request, facility: FacilityName, cycle: str
) -> BackgroundJob:
sync_params = JobSyncParameters(cycle=cycle, facility=facility)
job = await background_service.create_background_job(
JobActions.synchronize_proposals_for_cycle,
sync_parameters=sync_params,
Expand All @@ -108,6 +129,7 @@ async def sync_cycles(facility: FacilityName = FacilityName.nsls2):
"/sync/update-cycles/{facility}",
include_in_schema=SYNC_ROUTES_IN_SCHEMA,
tags=["sync"],
summary="Updates the local (nsls2core DB) cycle <-> proposal mapping",
)
async def sync_update_cycles(
request: fastapi.Request,
Expand Down
1 change: 0 additions & 1 deletion src/nsls2api/api/v1/proposal_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from http.client import HTTPException
from typing import Annotated

import fastapi
Expand Down
14 changes: 13 additions & 1 deletion src/nsls2api/api/v1/stats_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async def stats():

facility_data_health = await facility_service.is_healthy("nsls2")

# Get the NSLS-II proposals per cycle
nsls2_proposals_per_cycle: list[ProposalsPerCycleModel] = []

nsls2_cycle_list = await facility_service.facility_cycles("nsls2")
for cycle in nsls2_cycle_list:
proposal_list = await proposal_service.fetch_proposals_for_cycle(cycle)
Expand All @@ -35,13 +35,25 @@ async def stats():
)
nsls2_proposals_per_cycle.append(model)

# Get the LBMS proposals per cycle
lbms_proposals_per_cycle: list[ProposalsPerCycleModel] = []
lbms_cycle_list = await facility_service.facility_cycles("lbms")
for cycle in lbms_cycle_list:
proposal_list = await proposal_service.fetch_proposals_for_cycle(cycle)
if proposal_list is not None:
model = ProposalsPerCycleModel(
cycle=cycle, proposal_count=len(proposal_list)
)
lbms_proposals_per_cycle.append(model)

model = StatsModel(
facility_count=facilities,
beamline_count=beamlines,
proposal_count=total_proposals,
commissioning_proposal_count=commissioning,
facility_data_health=facility_data_health,
nsls2_proposals_per_cycle=nsls2_proposals_per_cycle,
lbms_proposals_per_cycle=lbms_proposals_per_cycle,
)
return model

Expand Down
4 changes: 2 additions & 2 deletions src/nsls2api/services/background_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ async def worker_function():
)
case JobActions.synchronize_proposals_for_cycle:
logger.info(
f"Processing job {job.id} to synchronize proposals for cycle {job.sync_parameters.cycle} (from {job.sync_parameters.sync_source})."
f"Processing job {job.id} to synchronize proposals for the {job.sync_parameters.facility} facility's cycle {job.sync_parameters.cycle} (from {job.sync_parameters.sync_source})."
)
await sync_service.worker_synchronize_proposals_for_cycle_from_pass(
job.sync_parameters.cycle
job.sync_parameters.cycle, job.sync_parameters.facility
)
case JobActions.synchronize_proposal_types:
logger.info(
Expand Down
10 changes: 5 additions & 5 deletions src/nsls2api/services/pass_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ async def get_saf_from_proposal(


async def get_commissioning_proposals_by_year(
year: str, facility: FacilityName = FacilityName.nsls2
year: str, facility_name: FacilityName = FacilityName.nsls2
) -> Optional[list[PassProposal]]:
pass_facility = await facility_service.pass_id_for_facility(facility)
pass_facility = await facility_service.pass_id_for_facility(facility_name)
if not pass_facility:
error_message: str = f"Facility {facility} does not have a PASS ID."
error_message: str = f"Facility {facility_name} does not have a PASS ID."
logger.error(error_message)
raise PassException(error_message)

Expand All @@ -142,11 +142,11 @@ async def get_commissioning_proposals_by_year(
PassProposal(**commissioning_proposal)
)
except ValidationError as error:
error_message = f"Error validating commissioning proposal data received from PASS for year {str(year)} at {facility} facility."
error_message = f"Error validating commissioning proposal data received from PASS for year {str(year)} at {facility_name} facility."
logger.error(error_message)
raise PassException(error_message) from error
except Exception as error:
error_message = f"Error retrieving commissioning proposal information from PASS for year {str(year)} at {facility} facility."
error_message = f"Error retrieving commissioning proposal information from PASS for year {str(year)} at {facility_name} facility."
logger.exception(error_message)
raise PassException(error_message) from error

Expand Down
8 changes: 6 additions & 2 deletions src/nsls2api/services/proposal_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ async def recently_updated(count=5, beamline: str | None = None):
# return result


async def fetch_proposals_for_cycle(cycle_name: str) -> list[str]:
cycle = await Cycle.find_one(Cycle.name == cycle_name)
async def fetch_proposals_for_cycle(
cycle_name: str, facility_name: FacilityName = FacilityName.nsls2
) -> list[str]:
cycle = await Cycle.find_one(
Cycle.name == cycle_name, Cycle.facility == facility_name
)
if cycle is None:
raise LookupError(f"Cycle {cycle} not found in local database.")
return cycle.proposals
Expand Down
32 changes: 23 additions & 9 deletions src/nsls2api/services/sync_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,23 @@ async def synchronize_proposal_from_pass(proposal_id: str) -> None:
logger.debug(f"Response: {response}")


async def update_proposals_with_cycle(cycle_name: str) -> None:
async def update_proposals_with_cycle(
cycle_name: str, facility_name: FacilityName = FacilityName.nsls2
) -> None:
"""
Update the cycle <-> proposals mapping for the given cycle.

:param cycle_name: The name of the cycle to process proposals for.
:type cycle_name: str
"""

proposal_list = await proposal_service.fetch_proposals_for_cycle(cycle_name)
proposal_list = await proposal_service.fetch_proposals_for_cycle(
cycle_name, facility_name=facility_name
)

logger.info(f"Found {len(proposal_list)} proposals for cycle {cycle_name}.")
logger.info(
f"Found {len(proposal_list)} proposals for {facility_name} cycle {cycle_name}."
)

for proposal_id in proposal_list:
# Add the cycle to the Proposal object
Expand All @@ -352,21 +358,29 @@ async def worker_synchronize_proposal_from_pass(proposal_id: str) -> None:
)


async def worker_synchronize_proposals_for_cycle_from_pass(cycle: str) -> None:
async def worker_synchronize_proposals_for_cycle_from_pass(
cycle: str, facility_name: FacilityName = FacilityName.nsls2
) -> None:
start_time = datetime.datetime.now()

cycle_year = await facility_service.cycle_year(cycle)
cycle_year = await facility_service.cycle_year(cycle, facility_name=facility_name)

proposals = await proposal_service.fetch_proposals_for_cycle(cycle)
logger.info(f"Synchronizing {len(proposals)} proposals for {cycle} cycle.")
proposals = await proposal_service.fetch_proposals_for_cycle(
cycle, facility_name=facility_name
)
logger.info(
f"Synchronizing {len(proposals)} proposals for facility {facility_name} in {cycle} cycle."
)

for proposal_id in proposals:
logger.info(f"Synchronizing proposal {proposal_id}.")
await synchronize_proposal_from_pass(proposal_id)

commissioning_proposals: list[
PassProposal
] = await pass_service.get_commissioning_proposals_by_year(cycle_year)
] = await pass_service.get_commissioning_proposals_by_year(
cycle_year, facility_name=facility_name
)
logger.info(
f"Synchronizing {len(proposals)} commissioning proposals for the year {cycle_year}."
)
Expand All @@ -375,7 +389,7 @@ async def worker_synchronize_proposals_for_cycle_from_pass(cycle: str) -> None:
await synchronize_proposal_from_pass(str(proposal.Proposal_ID))

# Now update the cycle information for each proposal
await update_proposals_with_cycle(cycle)
await update_proposals_with_cycle(cycle, facility_name=facility_name)

time_taken = datetime.datetime.now() - start_time
logger.info(
Expand Down
Loading