Skip to content

Commit

Permalink
Merge pull request #274 from tmaeno/master
Browse files Browse the repository at this point in the history
jedi enhancement
  • Loading branch information
tmaeno authored Oct 15, 2024
2 parents 591e760 + c7ffc65 commit 17d374e
Show file tree
Hide file tree
Showing 28 changed files with 41 additions and 46 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.4.3"
release_version = "0.4.4"
2 changes: 1 addition & 1 deletion pandajedi/jedibrokerage/AtlasProdTaskBroker.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def runImpl(self):
data_location_check_period = 7
# main
lastJediTaskID = None
siteMapper = self.taskBufferIF.getSiteMapper()
siteMapper = self.taskBufferIF.get_site_mapper()
while True:
try:
taskInputList = self.inputList.get(1)
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedibrokerage/JobBrokerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def set_task_common(self, attr_name, attr_value):
self.task_common[attr_name] = attr_value

def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()

def setLiveCounter(self, liveCounter):
self.liveCounter = liveCounter
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedibrokerage/TaskBrokerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, taskBufferIF, ddmIF):
self.refresh()

def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()


Interaction.installSC(TaskBrokerBase)
21 changes: 4 additions & 17 deletions pandajedi/jedicore/JediTaskBuffer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# DB API for JEDI

import datetime

# logger
from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandaserver.brokerage.SiteMapper import SiteMapper
from pandaserver.taskbuffer import TaskBuffer

from pandajedi.jediconfig import jedi_config
Expand All @@ -24,25 +21,13 @@ def __init__(self, conn, nDBConnection=1):
CommandReceiveInterface.__init__(self, conn)
TaskBuffer.TaskBuffer.__init__(self)
TaskBuffer.TaskBuffer.init(self, jedi_config.db.dbhost, jedi_config.db.dbpasswd, nDBConnection=nDBConnection)
# site mapper
self.siteMapper = SiteMapper(self)
# update time for site mapper
self.dateTimeForSM = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
logger.debug("__init__")

# query an SQL
def querySQL(self, sql, varMap, arraySize=1000):
with self.proxyPool.get() as proxy:
return proxy.querySQLS(sql, varMap, arraySize)[1]

# get SiteMapper
def getSiteMapper(self):
timeNow = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
if datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - self.dateTimeForSM > datetime.timedelta(minutes=10):
self.siteMapper = SiteMapper(self)
self.dateTimeForSM = timeNow
return self.siteMapper

# get work queue map
def getWorkQueueMap(self):
with self.proxyPool.get() as proxy:
Expand Down Expand Up @@ -445,12 +430,14 @@ def registerTaskInOneShot_JEDI(
# set tasks to be assigned
def setScoutJobDataToTasks_JEDI(self, vo, prodSourceLabel):
with self.proxyPool.get() as proxy:
return proxy.setScoutJobDataToTasks_JEDI(vo, prodSourceLabel, self.siteMapper)
tmp_site_mapper = self.get_site_mapper()
return proxy.setScoutJobDataToTasks_JEDI(vo, prodSourceLabel, tmp_site_mapper)

# prepare tasks to be finished
def prepareTasksToBeFinished_JEDI(self, vo, prodSourceLabel, nTasks=50, simTasks=None, pid="lock", noBroken=False):
with self.proxyPool.get() as proxy:
return proxy.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, nTasks, simTasks, pid, noBroken, self.siteMapper)
tmp_site_mapper = self.get_site_mapper()
return proxy.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, nTasks, simTasks, pid, noBroken, tmp_site_mapper)

# get tasks to be assigned
def getTasksToAssign_JEDI(self, vo, prodSourceLabel, workQueue, resource_name):
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedidog/AtlasProdWatchDog.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def doActionForReassign(self, gTmpLog):
# get DDM I/F
ddmIF = self.ddmIF.getInterface(self.vo)
# get site mapper
siteMapper = self.taskBufferIF.getSiteMapper()
siteMapper = self.taskBufferIF.get_site_mapper()
# get tasks to get reassigned
taskList = self.taskBufferIF.getTasksToReassign_JEDI(self.vo, self.prodSourceLabel)

Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedidog/AtlasQueueFillerWatchDog.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def refresh(self):
# work queue mapper
self.workQueueMapper = self.taskBufferIF.getWorkQueueMap()
# site mapper
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()
# all sites
allSiteList = []
for siteName, tmpSiteSpec in self.siteMapper.siteSpecList.items():
Expand Down
5 changes: 3 additions & 2 deletions pandajedi/jedidog/AtlasTaskWithholderWatchDog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import traceback

from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandaserver.dataservice import DataServiceUtils

from pandajedi.jedibrokerage import AtlasBrokerUtils
from pandajedi.jediconfig import jedi_config
from pandajedi.jedicore.MsgWrapper import MsgWrapper
from pandaserver.dataservice import DataServiceUtils

from .WatchDogBase import WatchDogBase

Expand Down Expand Up @@ -44,7 +45,7 @@ def refresh(self):
# work queue mapper
self.workQueueMapper = self.taskBufferIF.getWorkQueueMap()
# site mapper
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()
# all sites
allSiteList = []
for siteName, tmpSiteSpec in self.siteMapper.siteSpecList.items():
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedidog/WatchDogBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __init__(self, taskBufferIF, ddmIF):

# refresh
def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()

# pre-action
def pre_action(self, tmpLog, vo, prodSourceLabel, pid, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedigen/TaskGeneratorBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, taskBufferIF, ddmIF):
self.refresh()

def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()


Interaction.installSC(TaskGeneratorBase)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def initialize(self):
self.ddmIF = DDMInterface()
self.ddmIF.setupInterface()
# get SiteMapper
# siteMapper = self.tbIF.getSiteMapper()
# siteMapper = self.tbIF.get_site_mapper()
# get work queue mapper
# workQueueMapper = self.tbIF.getWorkQueueMap()
# get TaskSetupper
Expand Down Expand Up @@ -83,7 +83,7 @@ def process(self, msg_obj):
inputList = ListWithLock(tmpList)
# create thread
threadPool = ThreadPool()
siteMapper = self.tbIF.getSiteMapper()
siteMapper = self.tbIF.get_site_mapper()
taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(self.tbIF, self.ddmIF)
gen_thr = JobGeneratorThread(
Expand Down
3 changes: 2 additions & 1 deletion pandajedi/jedimsgprocessor/panda_to_jedi_msg_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import socket

from pandacommon.pandalogger import logger_utils

from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool
from pandajedi.jediddm.DDMInterface import DDMInterface
from pandajedi.jedimsgprocessor.base_msg_processor import BaseMsgProcPlugin
Expand Down Expand Up @@ -57,7 +58,7 @@ def process(self, msg_obj, decoded_data=None):
inputList = ListWithLock(tmpList)
# create thread
threadPool = ThreadPool()
siteMapper = self.tbIF.getSiteMapper()
siteMapper = self.tbIF.get_site_mapper()
taskSetupper = TaskSetupper(vo, prodSourceLabel)
taskSetupper.initializeMods(self.tbIF, self.ddmIF)
gen = JobGeneratorThread(
Expand Down
4 changes: 3 additions & 1 deletion pandajedi/jediorder/JobGenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def start(self):
try:
tmpLog.debug("start")
# get SiteMapper
siteMapper = self.taskBufferIF.getSiteMapper()
siteMapper = self.taskBufferIF.get_site_mapper()
tmpLog.debug("got siteMapper")
# get work queue mapper
workQueueMapper = self.taskBufferIF.getWorkQueueMap()
Expand Down Expand Up @@ -781,6 +781,8 @@ def runImpl(self):
esJobsetMap=esJobsetMap,
getEsJobsetMap=True,
unprocessedMap=unprocessedMap,
bulk_job_insert=True,
trust_user=True,
)
resSubmit += tmpResSubmit
self.taskBufferIF.lockTask_JEDI(taskSpec.jediTaskID, self.pid)
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedipprocess/PostProcessorBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, taskBufferIF, ddmIF):

# refresh
def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()

# basic post procedure
def doBasicPostProcess(self, taskSpec, tmpLog):
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedirefine/TaskRefinerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, taskBufferIF, ddmIF):

# refresh
def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()

# initialize
def initializeRefiner(self, tmpLog):
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedisetup/AtlasTaskSetupper.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def doSetup(self, taskSpec, datasetToRegister, pandaJobs):
datasetToRegister.append(tmpFileSpec.datasetID)
tmpLog.info(f"datasetToRegister={str(datasetToRegister)}")
# get site mapper
siteMapper = self.taskBufferIF.getSiteMapper()
siteMapper = self.taskBufferIF.get_site_mapper()

# loop over all datasets
avDatasetList = []
Expand Down
3 changes: 2 additions & 1 deletion pandajedi/jedisetup/SimpleTaskSetupper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# logger
from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandajedi.jedicore import Interaction
from pandajedi.jedicore.MsgWrapper import MsgWrapper

Expand Down Expand Up @@ -42,7 +43,7 @@ def doSetup(self, taskSpec, datasetToRegister, pandaJobs):
if datasetToRegister:
tmpLog.info(f"datasetToRegister={str(datasetToRegister)}")
# get site mapper
siteMapper = self.taskBufferIF.getSiteMapper()
siteMapper = self.taskBufferIF.get_site_mapper()

# loop over all datasets
avDatasetList = []
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedisetup/TaskSetupperBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, taskBufferIF, ddmIF):
self.refresh()

def refresh(self):
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()


Interaction.installSC(TaskSetupperBase)
3 changes: 2 additions & 1 deletion pandajedi/jeditest/brokerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys

from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
from pandajedi.jedicore.MsgWrapper import MsgWrapper
from pandajedi.jedicore.ThreadUtils import ThreadPool
Expand All @@ -17,7 +18,7 @@
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()

ddmIF = DDMInterface()
ddmIF.setupInterface()
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jeditest/jobGenForOneTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()

ddmIF = DDMInterface()
ddmIF.setupInterface()
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jeditest/jobSplitterTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()


ddmIF = DDMInterface()
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jeditest/jobThrottlerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
cloud = "WORLD"

# get SiteMapper
siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()
wqMap = tbIF.getWorkQueueMap()

jt = JobThrottler(vo, prodSourceLabel)
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jeditest/taskBrokerTestWithTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()


ddmIF = DDMInterface()
Expand Down
3 changes: 2 additions & 1 deletion pandajedi/jeditest/testOneTaskToRefine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys

from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
from pandajedi.jedicore.MsgWrapper import MsgWrapper
from pandajedi.jediddm.DDMInterface import DDMInterface
Expand All @@ -14,7 +15,7 @@
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()

ddmIF = DDMInterface()
ddmIF.setupInterface()
Expand Down
3 changes: 2 additions & 1 deletion pandajedi/jeditest/testOneTaskToTaskBroker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys

from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandajedi.jedicore.JediTaskBufferInterface import JediTaskBufferInterface
from pandajedi.jedicore.MsgWrapper import MsgWrapper
from pandajedi.jediddm.DDMInterface import DDMInterface
Expand All @@ -14,7 +15,7 @@
tbIF = JediTaskBufferInterface()
tbIF.setupInterface()

siteMapper = tbIF.getSiteMapper()
siteMapper = tbIF.get_site_mapper()

ddmIF = DDMInterface()
ddmIF.setupInterface()
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jeditest/testPandaDDMMap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
tb_if.setupInterface()

# Get the site mapper
site_mapper = tb_if.getSiteMapper()
site_mapper = tb_if.get_site_mapper()

# Define a random site of panda sites
panda_sites = [
Expand Down
2 changes: 1 addition & 1 deletion pandajedi/jedithrottle/JobThrottlerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def refresh(self):
self.maxNumJobs = None
self.minPriority = None
self.underNqLimit = False
self.siteMapper = self.taskBufferIF.getSiteMapper()
self.siteMapper = self.taskBufferIF.get_site_mapper()

# set maximum number of jobs to be submitted
def setMaxNumJobs(self, maxNumJobs):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ authors = [
]
dependencies = [
'panda-common>=0.0.38',
'panda-server>=0.3.21',
'panda-server>=0.3.23',
'python-daemon',
'numpy',
'pyyaml',
Expand Down

0 comments on commit 17d374e

Please sign in to comment.