From 4742962acad61fdf6a9dad6a0574b584fa9df5ed Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 00:48:46 +0000 Subject: [PATCH 1/7] using underscore for obvious throwaway variables --- scheduler/Estimator.py | 10 +++++----- scheduler/baselineSlackAnalysis.py | 2 +- scheduler/dataFrameGarbageCollector.py | 2 +- scheduler/resetLastDecisions.py | 2 +- scheduler/resetRoutingDecisions.py | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/scheduler/Estimator.py b/scheduler/Estimator.py index 3aa7fab..22cced9 100644 --- a/scheduler/Estimator.py +++ b/scheduler/Estimator.py @@ -245,7 +245,7 @@ def getExecutionTime(self, host): ) # if (selectedInits.shape[0]) >= self.windowSize: # selectedInits = selectedInits.head(self.windowSize) - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): durations.append(record["duration"]) if mode == "best-case": if host == "s": @@ -413,7 +413,7 @@ def getFuncExecutionTime(self, func, host, mode): # selectedInits = pd.concat([grouped.get_group(group) for i, group in enumerate(grouped.groups) if i>=len(grouped)-self.windowSize]) # if (selectedInits.shape[0]) >= self.windowSize: # selectedInits = selectedInits.head(self.windowSize) - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): durations.append(record["duration"]) if mode == "best-case": if host == "s": @@ -626,7 +626,7 @@ def getCost(self): ) # if (selectedInits.shape[0]) >= self.windowSize: # selectedInits = selectedInits.head(self.windowSize) - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): durations.append(record["duration"]) if mode == "best-case": et = self.getUpperBound(durations) @@ -666,7 +666,7 @@ def getFuncCost(self, mode, func): ) # if (selectedInits.shape[0]) >= self.windowSize: # selectedInits = selectedInits.head(self.windowSize) - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): durations.append(record["duration"]) if mode == "best-case": et = self.getUpperBound(durations) @@ -695,7 +695,7 @@ def getDurationsList(self, func, host): selectedInits = pd.concat( islice(map(itemgetter(1), g), max(0, g.ngroups - self.windowSize), None) ) - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): durations.append(record["duration"]) return durations diff --git a/scheduler/baselineSlackAnalysis.py b/scheduler/baselineSlackAnalysis.py index 9d458a7..b010d42 100644 --- a/scheduler/baselineSlackAnalysis.py +++ b/scheduler/baselineSlackAnalysis.py @@ -126,7 +126,7 @@ def selectRecords(self): selectedInits["start"] = pd.to_datetime(selectedInits["start"]) selectedInits.sort_values(by=["start"], ascending=False, inplace=True) selectedRecords = [] - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): selectedReq = self.dataframe.loc[ (self.dataframe["reqID"] == record["reqID"]) & (self.dataframe["host"] == "s") diff --git a/scheduler/dataFrameGarbageCollector.py b/scheduler/dataFrameGarbageCollector.py index a4a2e5b..f6d14d6 100644 --- a/scheduler/dataFrameGarbageCollector.py +++ b/scheduler/dataFrameGarbageCollector.py @@ -123,7 +123,7 @@ def selectRecords(self): selectedInits["start"] = pd.to_datetime(selectedInits["start"]) selectedInits.sort_values(by=["start"], ascending=False, inplace=True) selectedRecords = [] - for i, record in selectedInits.iterrows(): + for _, record in selectedInits.iterrows(): selectedReq = self.dataframe.loc[ (self.dataframe["reqID"] == record["reqID"]) & (self.dataframe["host"] == "s") diff --git a/scheduler/resetLastDecisions.py b/scheduler/resetLastDecisions.py index 0983f3b..2b0b4d9 100644 --- a/scheduler/resetLastDecisions.py +++ b/scheduler/resetLastDecisions.py @@ -37,7 +37,7 @@ def __init__(self, workflow, vmNum, mode): self.config.write(configfile) for decisionMode in decisionModes: finalDecision = [] - for i in range(functionNum): + for _ in range(functionNum): eachFunc = [0] * int(vmNum) finalDecision.append(eachFunc) workflow_json["lastDecision" + "_" + decisionMode] = finalDecision diff --git a/scheduler/resetRoutingDecisions.py b/scheduler/resetRoutingDecisions.py index 51d31a8..57bf7f6 100644 --- a/scheduler/resetRoutingDecisions.py +++ b/scheduler/resetRoutingDecisions.py @@ -70,7 +70,7 @@ def resetRouting(self): rates = [25, 50, 75, 95] for percent in rates: finalDecision = [] - for i in range(self.functionNum): + for _ in range(self.functionNum): eachFunc = [0] * int(self.vmNum) finalDecision.append(eachFunc) self.routing["routing" + "_" + str(percent)] = str(finalDecision) From cb7f1bb8fc9967472a03069d8f751475eb68ef77 Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 01:02:20 +0000 Subject: [PATCH 2/7] fixed binary singleton comparisons --- log-parser/get-workflow-logs/getNewServerlessLogs.py | 2 +- scheduler/Estimator.py | 2 +- scheduler/baselineSlackAnalysis.py | 2 +- scheduler/rpsCIScheduler.py | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/log-parser/get-workflow-logs/getNewServerlessLogs.py b/log-parser/get-workflow-logs/getNewServerlessLogs.py index 9fdfc43..81d3690 100644 --- a/log-parser/get-workflow-logs/getNewServerlessLogs.py +++ b/log-parser/get-workflow-logs/getNewServerlessLogs.py @@ -174,7 +174,7 @@ def pullLogs(self, function): self.tempTimeStampRecorded[function] = lastLog_date[0] print("Time for func: ", function, " is::", lastLog_date[0]) endFlag = False - while endFlag != True: + while endFlag is False: tempDate = datetime.datetime.strptime( self.tempTimeStampRecorded[function], "%Y-%m-%d %H:%M:%S.%f" ) diff --git a/scheduler/Estimator.py b/scheduler/Estimator.py index 22cced9..88f2fc0 100644 --- a/scheduler/Estimator.py +++ b/scheduler/Estimator.py @@ -432,7 +432,7 @@ def getFuncExecutionTime(self, func, host, mode): # newMergingPatternChanges def get_num_per_req(self, func, test): - if test == True: + if test is True: return 1 selectedInits = self.dataframe.loc[(self.dataframe["function"] == func)] counts = (selectedInits.groupby(["reqID"]).size().reset_index(name="counts"))[ diff --git a/scheduler/baselineSlackAnalysis.py b/scheduler/baselineSlackAnalysis.py index b010d42..a625e5b 100644 --- a/scheduler/baselineSlackAnalysis.py +++ b/scheduler/baselineSlackAnalysis.py @@ -292,7 +292,7 @@ def completeLSLF(self, duration, criticalPath): if d[1] == d2[0]: terminalFlag = False break - if terminalFlag == True: + if terminalFlag is True: terminals.append(d[1]) for t in terminals: self.lf[t] = duration diff --git a/scheduler/rpsCIScheduler.py b/scheduler/rpsCIScheduler.py index eeed6d5..2339b1c 100644 --- a/scheduler/rpsCIScheduler.py +++ b/scheduler/rpsCIScheduler.py @@ -168,7 +168,7 @@ def resolveOffloadingSolutions(self): decisionModes = ["default"] else: totalTripleDecisioin = x.tripleCaseDecision(len(cpus)) - if totalTripleDecisioin == True: + if totalTripleDecisioin is True: decisionModes = ["default", "worst-case", "best-case"] logging.info("latency mode, similar distributions") else: @@ -234,10 +234,10 @@ def resolveOffloadingSolutions(self): finalDecision = list(finalDecision) for function in range(len(finalDecision)): allZero = all(item == 0 for item in list(finalDecision[function])) - if allZero == False: + if allZero is False: AllZeroesFlag = False break - if AllZeroesFlag == True: + if AllZeroesFlag is True: for function in range(len(finalDecision)): if function != 0: vmOffset = np.full(len(finalDecision[function]), 0.05) @@ -319,7 +319,7 @@ def resolveOffloadingSolutions(self): os.path.exists( str(Path(os.path.dirname(os.path.abspath(__file__)))) + "/forcedLock.txt" ) - ) and (checkingFlag == True): + ) and (checkingFlag is True): os.remove( str(Path(os.path.dirname(os.path.abspath(__file__)))) + "/forcedLock.txt" ) From 07da5a3907e9f70448cf87b146899350fa48fa70 Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 01:35:27 +0000 Subject: [PATCH 3/7] cleanup of some comments --- .../get-workflow-logs/getNewDatastoreLogs.py | 6 +--- .../get-workflow-logs/getNewServerlessLogs.py | 2 -- scheduler/Estimator.py | 14 +++----- scheduler/LatencyModel.py | 1 - scheduler/baselineSlackAnalysis.py | 19 ---------- scheduler/dataFrameGarbageCollector.py | 19 +--------- scheduler/datastoreGarbageCollector.py | 1 - scheduler/getInvocationRate.py | 6 ---- scheduler/monitoring.py | 3 -- scheduler/resetLastDecisions.py | 9 ++--- scheduler/rpsCIScheduler.py | 11 ------ scheduler/rpsMultiHostSolver.py | 36 ++++++++----------- 12 files changed, 24 insertions(+), 103 deletions(-) diff --git a/log-parser/get-workflow-logs/getNewDatastoreLogs.py b/log-parser/get-workflow-logs/getNewDatastoreLogs.py index 09f4e47..7eb18ad 100644 --- a/log-parser/get-workflow-logs/getNewDatastoreLogs.py +++ b/log-parser/get-workflow-logs/getNewDatastoreLogs.py @@ -54,7 +54,6 @@ def __init__(self, workflow): self.startTest = datetime.datetime.strptime( (startDate), "%Y-%m-%d %H:%M:%S.%f" ) - # self.startTest = datetime.datetime.strptime((startDate), "%Y-%m-%d %H:%M:%S.%f") self.dictData["function"] = [] self.dictData["reqID"] = [] self.dictData["start"] = [] @@ -272,10 +271,7 @@ def saveNewLogs(self): def keepWindowSize(self, df): serverlessDF = df.loc[df["host"] == "s"] - df.drop( - df[(df["host"] == "s")].index, - inplace=True, - ) + df.drop(df[(df["host"] == "s")].index, inplace=True) df["start"] = pd.to_datetime(df["start"], utc=True) df.sort_values(by=["start"], ascending=False, inplace=True) vmSelectedDF = df.groupby(["host", "function"]).head(self.windowSize) diff --git a/log-parser/get-workflow-logs/getNewServerlessLogs.py b/log-parser/get-workflow-logs/getNewServerlessLogs.py index 81d3690..28a46b0 100644 --- a/log-parser/get-workflow-logs/getNewServerlessLogs.py +++ b/log-parser/get-workflow-logs/getNewServerlessLogs.py @@ -33,7 +33,6 @@ def __init__(self, workflow): self.config.read(path) self.rankerConfig = self.config["settings"] self.windowSize = int(self.rankerConfig["windowSize"]) - # self.windowSize = 50 self.dictData["function"] = [] self.dictData["reqID"] = [] self.dictData["start"] = [] @@ -141,7 +140,6 @@ def checkLatestTimeStamp(self, func): self.tempTimeStampRecorded = copy.deepcopy(workflow_json) if func in workflow_json.keys(): lastRecordedTimestamp = str(workflow_json[func]) - # lastRecordedTimestamp = str(workflow_json[func][0]["time_utc"] arrayTS = lastRecordedTimestamp.split() self.lastTimestamp = arrayTS[0] + "T" + arrayTS[1] print("TT::::", self.lastTimestamp) diff --git a/scheduler/Estimator.py b/scheduler/Estimator.py index 88f2fc0..ff77748 100644 --- a/scheduler/Estimator.py +++ b/scheduler/Estimator.py @@ -368,7 +368,9 @@ def getPubSubMessageSize(self): else: psSize = psSizeSeries.item() pubSubSize[func] = psSize - nonzero_vals = [ pubSubSize[func] for func in pubSubSize.keys() if pubSubSize[func] != 0 ] + nonzero_vals = [ + pubSubSize[func] for func in pubSubSize.keys() if pubSubSize[func] != 0 + ] if len(nonzero_vals) != 0: average_nonzeroes = np.mean(np.array(nonzero_vals)) for func in pubSubSize.keys(): @@ -404,13 +406,10 @@ def getFuncExecutionTime(self, func, host, mode): if (len(selectedInits) == 0) and (len(durations) == 0): return 0 if len(selectedInits) != 0: - # newMergingPatternChanges g = selectedInits.groupby(selectedInits["reqID"], sort=False) selectedInits = pd.concat( islice(map(itemgetter(1), g), max(0, g.ngroups - self.windowSize), None) ) - # grouped = selectedInits.groupby('reqID') - # selectedInits = pd.concat([grouped.get_group(group) for i, group in enumerate(grouped.groups) if i>=len(grouped)-self.windowSize]) # if (selectedInits.shape[0]) >= self.windowSize: # selectedInits = selectedInits.head(self.windowSize) for _, record in selectedInits.iterrows(): @@ -430,7 +429,6 @@ def getFuncExecutionTime(self, func, host, mode): exeTime = self.getMean(durations) return exeTime - # newMergingPatternChanges def get_num_per_req(self, func, test): if test is True: return 1 @@ -474,7 +472,6 @@ def getComLatency(self, child, parent, childHost, parentHost, mode): # "NOTFOUND:::", parent, "::", parentHost, "-->", child, ":::", childHost # ) return "NotFound" - # newMergingPatternChanges selectedInitsParentFinish = ( selectedInitsParent[(selectedInitsParent.reqID.isin(reqs))] )[["reqID", "finish"]] @@ -585,7 +582,6 @@ def getComLatency(self, child, parent, childHost, parentHost, mode): if type(x) == str else x.replace(tzinfo=None) ) - # newDF['duration'] = (newDF['start'] - newDF['end']).dt.microseconds* 0.001 newDF["duration"] = ( (newDF["start"] - newDF["end"]).dt.total_seconds().mul(1000).astype(int) ) @@ -688,7 +684,7 @@ def getDurationsList(self, func, host): selectedInits["start"] = pd.to_datetime(selectedInits["start"]) selectedInits.sort_values(by=["start"], ascending=False, inplace=True) if (len(selectedInits) == 0) and (len(durations) == 0): - # print("No records found for {} running in {}".format(func, host)) + print("No records found for {} running in {}".format(func, host)) return [] if len(selectedInits) != 0: g = selectedInits.groupby(selectedInits["reqID"], sort=False) @@ -706,7 +702,7 @@ def distributions_SMDTest(self, vmDurations, serverlessDuration): vmStd = np.std(np.array(vmDurations)) serverlessStd = np.std(np.array(serverlessDuration)) SMD = abs(vmMean - serverlessMean) / math.sqrt( - ((vmStd**2) + (serverlessStd**2)) / 2 + ((vmStd ** 2) + (serverlessStd ** 2)) / 2 ) print("SMD::", SMD) if SMD < 0.1: diff --git a/scheduler/LatencyModel.py b/scheduler/LatencyModel.py index 4b1e393..45c49df 100644 --- a/scheduler/LatencyModel.py +++ b/scheduler/LatencyModel.py @@ -46,7 +46,6 @@ def getLinearAddedLatency(self, msgSize): (float(vmCoefficients[i])) - (float(serverlessCoefficients[i])) ) return addedLatency - # return 0 if __name__ == "__main__": diff --git a/scheduler/baselineSlackAnalysis.py b/scheduler/baselineSlackAnalysis.py index a625e5b..5b9ed19 100644 --- a/scheduler/baselineSlackAnalysis.py +++ b/scheduler/baselineSlackAnalysis.py @@ -17,15 +17,12 @@ class baselineSlackAnalysisClass: def __init__(self, workflow): self.workflow = workflow - # jsonPath = os.getcwd() + "/log-parser/get-workflow-logs/data/" + self.workflow+".json" - # dataframePath = os.getcwd() + "/log-parser/get-workflow-logs/data/" + self.workflow + "/NEWWgeneratedData.pkl" jsonPath = ( str(Path(os.path.dirname(os.path.abspath(__file__))).resolve().parents[0]) + "/log-parser/get-workflow-logs/data/" + self.workflow + ".json" ) - # dataframePath = str(Path(os.getcwd()).resolve().parents[0]) + "/log-parser/get-workflow-logs/data/" + self.workflow + "/generatedDataFrame.pkl" dfDir = Path( str(Path(os.path.dirname(os.path.abspath(__file__))).parents[0]) + "/log-parser/get-workflow-logs/data/" @@ -112,7 +109,6 @@ def __init__(self, workflow): len(self.predecessors[self.workflowFunctions.index(func)]) ) - 1 self.memories = workflow_json["memory"] - # self.dataframe = pd.read_pickle(dataframePath) self.selectedIDs = self.selectRecords() self.observations = self.getObservations() self.slackCalculations() @@ -131,22 +127,18 @@ def selectRecords(self): (self.dataframe["reqID"] == record["reqID"]) & (self.dataframe["host"] == "s") ] - # newMergingPatternChanges createdSet = selectedReq["function"].copy() createdSet = set(createdSet.to_numpy()) if (selectedReq.shape[0] >= self.recordNum) and ( len(createdSet) == len(self.workflowFunctions) ): - # if selectedReq.shape[0] >= self.recordNum: selectedRecords.append(record["reqID"]) if len(selectedRecords) >= self.windowSize: selectedRecords = selectedRecords[: self.windowSize] - # print("SELECTEDRECORDS::::", selectedRecords) return selectedRecords def getObservations(self): for func in self.workflowFunctions: - # newMergingPatternChanges for reqID in self.selectedIDs: df2 = self.dataframe.loc[ ( @@ -155,11 +147,8 @@ def getObservations(self): ) ] if df2.shape[0] == 1: - # merging = self.dataframe.loc[((self.dataframe["reqID"] == reqID) & (self.dataframe["function"] == func)), "mergingPoint"] - # if merging == None: selectedDuration = df2.iloc[0]["duration"] self.slackData[func].append(selectedDuration) - # self.dataframe.loc[((self.dataframe["reqID"] == reqID) & (self.dataframe["function"] == func)), "duration"] elif len(self.predecessors[self.workflowFunctions.index(func)]) > 1: start = df2["start"].max() finish = df2["finish"].max() @@ -169,7 +158,6 @@ def getObservations(self): duration = df2["duration"].mean() self.slackData[func].append(duration) for entry in self.slackData.keys(): - # newMergingPatternChanges if entry not in self.workflowFunctions: for reqID in self.selectedIDs: prevFunc = entry.split("-")[0] @@ -206,14 +194,10 @@ def getObservations(self): ] else: start = self.avg_datetime(dfNext["start"]) - # start = dfNext.loc[dfNext["mergingPoint"] == prevFunc].iloc[0][ - # "start" - # ] duration = ((start - finish).total_seconds()) * 1000 self.slackData[entry].append(duration) - # newMergingPatternChanges def avg_datetime(self, series): dt_min = series.min() deltas = [x - dt_min for x in series] @@ -231,8 +215,6 @@ def getUpperBound(self, array): return upperBound def getMedian(self, array): - # median = array.quantile(0.5) - median = statistics.median(array) statistics.quantiles return median @@ -353,7 +335,6 @@ def slackCalculations(self): for col in self.slackData.keys(): slack = self.lf[col] - self.ef[col] slackResults[col][decisionMode] = slack - # print(slackResults) with open( ( (os.path.dirname(os.path.abspath(__file__))) diff --git a/scheduler/dataFrameGarbageCollector.py b/scheduler/dataFrameGarbageCollector.py index f6d14d6..ff2b42e 100644 --- a/scheduler/dataFrameGarbageCollector.py +++ b/scheduler/dataFrameGarbageCollector.py @@ -103,7 +103,6 @@ def __init__(self, workflow): self.config.read(path) self.rankerConfig = self.config["settings"] self.windowSize = int(self.rankerConfig["windowSize"]) - # self.windowSize = 50 self.slackData = {} self.dependencies = [] self.recordNum = len(self.workflowFunctions) @@ -128,14 +127,12 @@ def selectRecords(self): (self.dataframe["reqID"] == record["reqID"]) & (self.dataframe["host"] == "s") ] - # newMergingPatternChanges createdSet = selectedReq["function"].copy() createdSet = set(createdSet.to_numpy()) if (selectedReq.shape[0] >= self.recordNum) and ( len(createdSet) == len(self.workflowFunctions) ): selectedRecords.append(record["reqID"]) - # print("selected::: ", selectedRecords) if len(selectedRecords) >= self.windowSize: selectedRecords = selectedRecords[: self.windowSize] self.finalRecodreqID = selectedRecords[-1] @@ -144,18 +141,12 @@ def selectRecords(self): def getLastObservations(self): print(self.finalRecodreqID) for func in self.workflowFunctions: - # print("BEFORE::", self.dataframe.shape[0]) selectedReq = self.dataframe.loc[ (self.dataframe["reqID"] == self.finalRecodreqID) & (self.dataframe["function"] == func) ] selectedReqForFunc = selectedReq.iloc[0]["start"] startDate = selectedReqForFunc - # startDate = datetime.date((selectedReqForFunc["start"]) - # self.dataframe["start"] = pd.to_datetime( - # self.dataframe["start"], format="%Y-%m-%d %H:%M:%S.%f" - # ) - # self.dataframe selected2 = self.dataframe.loc[ (self.dataframe["function"] == func) & (self.dataframe["host"] == "s") ] @@ -166,15 +157,7 @@ def getLastObservations(self): ((selected2["start"]) < startDate) & (~selected2["reqID"].isin(self.selectedIDs)) ] - # print(func, ",:::,", selected2.index) - self.dataframe.drop( - selected2.index, - inplace=True, - ) - # print("AFTER::", self.dataframe.shape[0]) - # selected = pd.merge(self.dataframe,selected, indicator=True, how='outer').query('_merge=="left_only"').drop('_merge', axis=1) - # print("func:::", func) - # print(selected2) + self.dataframe.drop(selected2.index, inplace=True) if __name__ == "__main__": diff --git a/scheduler/datastoreGarbageCollector.py b/scheduler/datastoreGarbageCollector.py index b3ee2be..f27afec 100644 --- a/scheduler/datastoreGarbageCollector.py +++ b/scheduler/datastoreGarbageCollector.py @@ -28,7 +28,6 @@ def remove(self): query = self.datastore_client.query(kind="Merging") results = list(query.fetch()) for res in results: - # print(res.key.id_or_name) if (res["Date"].replace(tzinfo=None)) <= ( datetime.datetime.utcnow() - timedelta(minutes=60 * 4) ): diff --git a/scheduler/getInvocationRate.py b/scheduler/getInvocationRate.py index cc10417..3c8bfc2 100644 --- a/scheduler/getInvocationRate.py +++ b/scheduler/getInvocationRate.py @@ -81,8 +81,6 @@ def __init__(self, workflow): def getRPS(self): self.dataframe["start"] = pd.to_datetime(self.dataframe["start"]) self.dataframe.sort_values(by=["start"], ascending=True, inplace=True) - # print(self.dataframe["start"]) - # print(self.dataframe["start"].diff()) diff = ( self.dataframe["start"] .diff() @@ -94,7 +92,6 @@ def getRPS(self): diff = diff[1:] diff = diff / 1000 diff = 1 / diff - # print(diff) percentiles = [25, 50, 75, 95] results = {} medianIR = np.percentile(diff, 50) @@ -118,9 +115,6 @@ def getRPS(self): # print(countt) ## m = np.amax(x) - # Per95 = np.percentile(x, 96) - # print(Per95) - if __name__ == "__main__": diff --git a/scheduler/monitoring.py b/scheduler/monitoring.py index 7ff9664..013c3db 100644 --- a/scheduler/monitoring.py +++ b/scheduler/monitoring.py @@ -32,18 +32,15 @@ def __init__(self): pubsubMeanMsgSize = {} for col in result.columns: topics[col[2]] = [] - # print(col[2]) for rec in range(len(result[col])): x = str(result[col].iloc[rec]) if "mean" in x: match = re.findall(r"mean: .+", x, flags=re.IGNORECASE) mean = match[0].replace("mean:", "") topics[col[2]].append(float(mean)) - # print(result[col][1]) for topic in topics: nptopic = np.array(topics[topic]) pubsubMeanMsgSize[topic] = np.mean(nptopic) - # print(pubsubMeanMsgSize) topic = [] size = [] diff --git a/scheduler/resetLastDecisions.py b/scheduler/resetLastDecisions.py index 2b0b4d9..c551754 100644 --- a/scheduler/resetLastDecisions.py +++ b/scheduler/resetLastDecisions.py @@ -181,11 +181,10 @@ def __init__(self, workflow, vmNum, mode): utilFilePath, forcedLockFile, VMcachDataframe, - juliaStdinPath, + juliaStdinPath, juliaStdoutPath, solverInPath, - solverOutPath - + solverOutPath, ] finalPaths = filePaths + dfPaths + irPaths for filePath in finalPaths: @@ -196,10 +195,6 @@ def __init__(self, workflow, vmNum, mode): print(filePath, "does not exist") -# print(initFunc) -# sys.exit(0) - - if __name__ == "__main__": workflow = sys.argv[1] vmNum = int(sys.argv[2]) diff --git a/scheduler/rpsCIScheduler.py b/scheduler/rpsCIScheduler.py index 2339b1c..428e19b 100644 --- a/scheduler/rpsCIScheduler.py +++ b/scheduler/rpsCIScheduler.py @@ -13,7 +13,6 @@ from getInvocationRate import InvocationRate import sys -# import psutil, os import logging logging.basicConfig( @@ -222,10 +221,8 @@ def resolveOffloadingSolutions(self): decisions.append(x) if len(decisions) == 0: continue - # print("decisions::", decisions) AllZeroesFlag = True finalDecision = np.mean(decisions, axis=0) - # print("Average for case:", finalDecision) finalDecision = finalDecision / 100 capArray = [0]*(len(finalDecision)) for i in range(len(capArray)): @@ -271,7 +268,6 @@ def resolveOffloadingSolutions(self): initType = sys.argv[1] if initType == "forced": triggerType = "resolve" - # Added by mohamed to allow locking if os.path.exists( str(Path(os.path.dirname(os.path.abspath(__file__)))) + "/lock.txt" ): @@ -308,7 +304,6 @@ def resolveOffloadingSolutions(self): logging.info(str(pid)) logging.info("LOCK CREATED!!!") logging.info(str(datetime.datetime.now())) - # triggerType = "resolve" try: solver = CIScheduler(triggerType) except: @@ -325,10 +320,4 @@ def resolveOffloadingSolutions(self): ) logging.info("Lock released!!!") logging.info(str(datetime.datetime.now())) - # print( - # "LOCK removed-> search for lock file:", - # os.path.exists( - # str(Path(os.path.dirname(os.path.abspath(__file__)))) + "/lock.txt" - # ), - # ) print("--- %s seconds ---" % (time.time() - start_time)) diff --git a/scheduler/rpsMultiHostSolver.py b/scheduler/rpsMultiHostSolver.py index 77624f2..dc060c9 100644 --- a/scheduler/rpsMultiHostSolver.py +++ b/scheduler/rpsMultiHostSolver.py @@ -368,7 +368,7 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): ] func_costs_1 = [ - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * self.rps * self.estimator.get_num_per_req(offloadingCandidates[i], False) @@ -544,18 +544,13 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): } for node in path[1:-1] ] - # print(x_2) X_2.append(x_2) x_3 = { "tmpIndex": (self.offloadingCandidates.index(path[1]), c[0]), "coeff": ( self.getCommunicationLatency( - (path[1]), - (path[0]), - c[0], - "s", - self.decisionMode, + (path[1]), (path[0]), c[0], "s", self.decisionMode ) ), } @@ -571,7 +566,7 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): ################################################ func_costs_1 = [ - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * self.rps * self.estimator.get_num_per_req(offloadingCandidates[i], False) @@ -640,7 +635,7 @@ def calcLatencyCost(self, alpha, offloadingCandidates, availResources, sol): return sum( [ ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * self.rps * self.estimator.get_num_per_req(offloadingCandidates[i], False) @@ -671,7 +666,7 @@ def calcLatencyCost(self, alpha, offloadingCandidates, availResources, sol): [ ( ( - (10**3) + (10 ** 3) * (alpha) * ( abs( @@ -814,7 +809,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): model.sum( [ ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alphaConst) * self.rps * self.estimator.get_num_per_req( @@ -905,7 +900,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): # ) # + ( - (10**3) + (10 ** 3) * (alphaConst) * ( model.abs2( @@ -956,7 +951,6 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): self.getAllPaths() self.getSlackForPath() model = GEKKO(remote=False) - # print(f'GEKKO Path = {model.path}') zero = model.Const(0) one = model.Const(1) alphaConst = model.Const(alpha) @@ -1209,7 +1203,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): model.sum( [ ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alphaConst) * self.rps * self.estimator.get_num_per_req( @@ -1260,7 +1254,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): [ ( ( - (10**3) + (10 ** 3) * (alphaConst) * ( model.abs2( @@ -1467,7 +1461,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): cost = sum( [ ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * self.rps * self.GetServerlessCostEstimate(offloadingCandidates[i]) @@ -1488,7 +1482,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): [ ( ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * ( sum( @@ -1554,7 +1548,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): ) ) + ( - (10**3) + (10 ** 3) * (alpha) * ( # max(( @@ -1601,7 +1595,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): cost1 = sum( [ ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * self.rps * self.GetServerlessCostEstimate(offloadingCandidates[i]) @@ -1627,7 +1621,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): [ ( ( - ((10**5) * 2) + ((10 ** 5) * 2) * (1 - alpha) * ( sum( @@ -1706,7 +1700,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): [ ( ( - (10**3) + (10 ** 3) * (alpha) * ( # max(( From de3f6940cb2b5d62dfe354dde11f299e746da2fa Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 02:21:29 +0000 Subject: [PATCH 4/7] using margin percent to prevent repetitive multiplications --- host-agents/monitoring-agent/predictor.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/host-agents/monitoring-agent/predictor.h b/host-agents/monitoring-agent/predictor.h index 91c81c3..48cebbd 100644 --- a/host-agents/monitoring-agent/predictor.h +++ b/host-agents/monitoring-agent/predictor.h @@ -19,7 +19,7 @@ class predictor double max_util, prediction = 0.0; double alpha = 0.85; double ema_margin = 0.2; - double violation_margin = 0.2; + double violation_margin_percent = 20; int size; // tje following variables are for the Markov Chain predictor double mc_margin = 0.05; @@ -63,7 +63,7 @@ class predictor prediction = (alpha * max_util + (1-alpha)*old_value ) * (1+ema_margin); if ( prediction > 100) prediction = 100; - if ( (int(max_util) - int(old_value) > 100*violation_margin ) || (int(old_value) - int(max_util) > 100*violation_margin ) || (prediction > old_value + 100*violation_margin)|| (prediction + 100 *violation_margin < old_value ) || (initialFlag == 1)) + if ( (int(max_util) - int(old_value) > violation_margin_percent ) || (int(old_value) - int(max_util) > violation_margin_percent ) || (prediction > old_value + violation_margin_percent)|| (prediction + violation_margin_percent < old_value ) || (initialFlag == 1)) violation = 1; struct result {double prediction; size_t violation;}; return result {prediction, violation}; @@ -128,7 +128,7 @@ class predictor if (prediction > 100) prediction = 100; - if ( (int(max_util) - int(old_value) > 100*violation_margin ) || (int(old_value) - int(max_util) > 100*violation_margin ) || (prediction > old_value + 100*violation_margin)|| (prediction + 100 *violation_margin < old_value ) || (initialFlag == 1)) + if ( (int(max_util) - int(old_value) > violation_margin_percent ) || (int(old_value) - int(max_util) > violation_margin_percent ) || (prediction > old_value + violation_margin_percent)|| (prediction + violation_margin_percent < old_value ) || (initialFlag == 1)) violation = 1; struct result {double prediction; size_t violation;}; From d9275539c5524e22bcf8ff0d723a2b1accbc2413 Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 02:42:21 +0000 Subject: [PATCH 5/7] removed duplicate imports --- host-agents/execution-agent/cleanup-queue.py | 3 --- log-parser/get-workflow-logs/getNewDatastoreLogs.py | 1 - scheduler/datastoreGarbageCollector.py | 1 - scheduler/resetRoutingDecisions.py | 1 - scheduler/rpsCIScheduler.py | 3 +-- 5 files changed, 1 insertion(+), 8 deletions(-) diff --git a/host-agents/execution-agent/cleanup-queue.py b/host-agents/execution-agent/cleanup-queue.py index 70633ac..c31cfdf 100644 --- a/host-agents/execution-agent/cleanup-queue.py +++ b/host-agents/execution-agent/cleanup-queue.py @@ -5,7 +5,6 @@ from pathlib import Path import os import json -import subprocess import docker from google.cloud import functions_v1 import wget @@ -14,8 +13,6 @@ import sys import configparser from time import sleep -import docker -import sys from datetime import timedelta configPath = ( diff --git a/log-parser/get-workflow-logs/getNewDatastoreLogs.py b/log-parser/get-workflow-logs/getNewDatastoreLogs.py index 7eb18ad..263963f 100644 --- a/log-parser/get-workflow-logs/getNewDatastoreLogs.py +++ b/log-parser/get-workflow-logs/getNewDatastoreLogs.py @@ -6,7 +6,6 @@ import pandas as pd from getNewLogs import GetLog import datetime -import configparser import logging logging.basicConfig( diff --git a/scheduler/datastoreGarbageCollector.py b/scheduler/datastoreGarbageCollector.py index f27afec..2603a66 100644 --- a/scheduler/datastoreGarbageCollector.py +++ b/scheduler/datastoreGarbageCollector.py @@ -3,7 +3,6 @@ import datetime import os from pathlib import Path -import datetime import configparser diff --git a/scheduler/resetRoutingDecisions.py b/scheduler/resetRoutingDecisions.py index 57bf7f6..8ffeab7 100644 --- a/scheduler/resetRoutingDecisions.py +++ b/scheduler/resetRoutingDecisions.py @@ -1,4 +1,3 @@ -import sys import os import json from pathlib import Path diff --git a/scheduler/rpsCIScheduler.py b/scheduler/rpsCIScheduler.py index 428e19b..0005fbb 100644 --- a/scheduler/rpsCIScheduler.py +++ b/scheduler/rpsCIScheduler.py @@ -1,7 +1,6 @@ # import rankerConfig import time import numpy as np -import sys import configparser import os import pandas as pd @@ -271,7 +270,7 @@ def resolveOffloadingSolutions(self): if os.path.exists( str(Path(os.path.dirname(os.path.abspath(__file__)))) + "/lock.txt" ): - print("LOCK EXISTSSS!!") + print("LOCK EXISTS!!") if initType == "forced": logging.info(str(datetime.datetime.now())) logging.info("Forced trigger!!!") From fd86cf0ee369b9f6b570f993e1426733f78315c4 Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 02:43:02 +0000 Subject: [PATCH 6/7] using sys.exit to not rely on the site module being available --- manageHostTopic.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/manageHostTopic.py b/manageHostTopic.py index 12542f3..00dcd76 100644 --- a/manageHostTopic.py +++ b/manageHostTopic.py @@ -1,4 +1,5 @@ import argparse +import sys from google.cloud import pubsub_v1 @@ -90,16 +91,16 @@ def delete_subscriber(subscriber, subscription_id, args): def check_args(args): if not args.create and not args.delete: print("Specify either --create or --delete flag to indicate mode.") - exit(1) + sys.exit(1) if args.create and args.delete: print("Specify either --create or --delete flag to indicate mode.") - exit(1) + sys.exit(1) if args.delete and args.index is None: print("Specify --index value to indicate which VM topic & subscriber to delete") - exit(1) + sys.exit(1) if args.create and args.index is not None: print("Do not specify --index value when creating VM topic & subscriber") - exit(1) + sys.exit(1) if __name__ == "__main__": From d573f183ee2003c957e8a2e7985c313b10625c12 Mon Sep 17 00:00:00 2001 From: engshahrad Date: Sun, 28 Jan 2024 02:46:52 +0000 Subject: [PATCH 7/7] removed unused variable --- host-agents/execution-agent/execution-agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/host-agents/execution-agent/execution-agent.py b/host-agents/execution-agent/execution-agent.py index c15514f..abd120d 100644 --- a/host-agents/execution-agent/execution-agent.py +++ b/host-agents/execution-agent/execution-agent.py @@ -3,7 +3,7 @@ def main(): - for i in range(0, int(sys.argv[1])): + for _ in range(0, int(sys.argv[1])): with open("/tmp/output.log", "a") as output: subprocess.Popen( "python3 vmModule.py", shell=True, stdout=output, stderr=output