Seasonal cleanup #115

Merged, 7 commits, Jan 28, 2024
3 changes: 0 additions & 3 deletions host-agents/execution-agent/cleanup-queue.py
@@ -5,7 +5,6 @@
from pathlib import Path
import os
import json
-import subprocess
import docker
from google.cloud import functions_v1
import wget
@@ -14,8 +13,6 @@
import sys
import configparser
from time import sleep
-import docker
-import sys
from datetime import timedelta

configPath = (
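Review note: the deletions in this file are redundant re-imports; `docker` and `sys` were already imported higher up. Re-importing is harmless at runtime, since Python caches loaded modules in `sys.modules`, but pyflakes/flake8 reports it as F811. A minimal illustration:

```python
import sys
import json
import json  # flake8 would report F811: redefinition of unused 'json'

# The second import is just a cache lookup, not a reload:
assert "json" in sys.modules
```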
2 changes: 1 addition & 1 deletion host-agents/execution-agent/execution-agent.py
@@ -3,7 +3,7 @@


def main():
-for i in range(0, int(sys.argv[1])):
+for _ in range(0, int(sys.argv[1])):
with open("/tmp/output.log", "a") as output:
subprocess.Popen(
"python3 vmModule.py", shell=True, stdout=output, stderr=output
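Review note: renaming the unused loop index to `_` matches convention. For context, the revised `main()` is small enough to read in one piece; a self-contained sketch (the `__main__` guard is assumed from the untouched part of the file):

```python
import subprocess
import sys

def main():
    # argv[1] = number of vmModule.py workers to spawn; the index is unused.
    for _ in range(int(sys.argv[1])):
        with open("/tmp/output.log", "a") as output:
            subprocess.Popen(
                "python3 vmModule.py", shell=True, stdout=output, stderr=output
            )

if __name__ == "__main__":
    main()
```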
6 changes: 3 additions & 3 deletions host-agents/monitoring-agent/predictor.h
@@ -19,7 +19,7 @@ class predictor
double max_util, prediction = 0.0;
double alpha = 0.85;
double ema_margin = 0.2;
-double violation_margin = 0.2;
+double violation_margin_percent = 20;
int size;
// the following variables are for the Markov Chain predictor
double mc_margin = 0.05;
@@ -63,7 +63,7 @@ class predictor
prediction = (alpha * max_util + (1-alpha)*old_value ) * (1+ema_margin);
if ( prediction > 100)
prediction = 100;
-if ( (int(max_util) - int(old_value) > 100*violation_margin ) || (int(old_value) - int(max_util) > 100*violation_margin ) || (prediction > old_value + 100*violation_margin)|| (prediction + 100 *violation_margin < old_value ) || (initialFlag == 1))
+if ( (int(max_util) - int(old_value) > violation_margin_percent ) || (int(old_value) - int(max_util) > violation_margin_percent ) || (prediction > old_value + violation_margin_percent)|| (prediction + violation_margin_percent < old_value ) || (initialFlag == 1))
violation = 1;
struct result {double prediction; size_t violation;};
return result {prediction, violation};
@@ -128,7 +128,7 @@ class predictor
if (prediction > 100)
prediction = 100;

-if ( (int(max_util) - int(old_value) > 100*violation_margin ) || (int(old_value) - int(max_util) > 100*violation_margin ) || (prediction > old_value + 100*violation_margin)|| (prediction + 100 *violation_margin < old_value ) || (initialFlag == 1))
+if ( (int(max_util) - int(old_value) > violation_margin_percent ) || (int(old_value) - int(max_util) > violation_margin_percent ) || (prediction > old_value + violation_margin_percent)|| (prediction + violation_margin_percent < old_value ) || (initialFlag == 1))
violation = 1;

struct result {double prediction; size_t violation;};
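Review note: the predictor edit is a behavior-preserving rename. The old code compared against `100*violation_margin` with `violation_margin = 0.2`, i.e. 20, which is exactly the new `violation_margin_percent = 20`. A quick Python sketch of the equivalence (the helper mirrors the C++ condition and is hypothetical):

```python
def violates(max_util, old_value, prediction, margin, initial_flag=0):
    # Mirrors the if-condition in predictor.h, with `margin` in percent.
    return (
        int(max_util) - int(old_value) > margin
        or int(old_value) - int(max_util) > margin
        or prediction > old_value + margin
        or prediction + margin < old_value
        or initial_flag == 1
    )

violation_margin = 0.2          # old fractional margin
violation_margin_percent = 20   # new percentage margin
assert 100 * violation_margin == violation_margin_percent
# Same verdict under both spellings of the margin:
assert violates(85, 50, 60.0, 100 * violation_margin) == violates(
    85, 50, 60.0, violation_margin_percent
)
```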
7 changes: 1 addition & 6 deletions log-parser/get-workflow-logs/getNewDatastoreLogs.py
@@ -6,7 +6,6 @@
import pandas as pd
from getNewLogs import GetLog
import datetime
-import configparser
import logging

logging.basicConfig(
@@ -54,7 +53,6 @@ def __init__(self, workflow):
self.startTest = datetime.datetime.strptime(
(startDate), "%Y-%m-%d %H:%M:%S.%f"
)
-# self.startTest = datetime.datetime.strptime((startDate), "%Y-%m-%d %H:%M:%S.%f")
self.dictData["function"] = []
self.dictData["reqID"] = []
self.dictData["start"] = []
@@ -272,10 +270,7 @@ def saveNewLogs(self):

def keepWindowSize(self, df):
serverlessDF = df.loc[df["host"] == "s"]
-df.drop(
-df[(df["host"] == "s")].index,
-inplace=True,
-)
+df.drop(df[(df["host"] == "s")].index, inplace=True)
df["start"] = pd.to_datetime(df["start"], utc=True)
df.sort_values(by=["start"], ascending=False, inplace=True)
vmSelectedDF = df.groupby(["host", "function"]).head(self.windowSize)
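Review note: collapsing `df.drop` onto one line is cosmetic. For readers skimming the hunk, the windowing logic is roughly the following standalone sketch; what happens to `serverlessDF` after the shown lines is not in the hunk, so returning both frames is an assumption:

```python
import pandas as pd

def keep_window_size(df: pd.DataFrame, window_size: int):
    # Split off serverless records (host == "s"), then keep only the
    # window_size most recent rows per (host, function) pair for VM hosts.
    serverless_df = df.loc[df["host"] == "s"]
    df = df.drop(df[df["host"] == "s"].index)
    df["start"] = pd.to_datetime(df["start"], utc=True)
    df = df.sort_values(by=["start"], ascending=False)
    vm_selected = df.groupby(["host", "function"]).head(window_size)
    return vm_selected, serverless_df
```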
4 changes: 1 addition & 3 deletions log-parser/get-workflow-logs/getNewServerlessLogs.py
@@ -33,7 +33,6 @@ def __init__(self, workflow):
self.config.read(path)
self.rankerConfig = self.config["settings"]
self.windowSize = int(self.rankerConfig["windowSize"])
-# self.windowSize = 50
self.dictData["function"] = []
self.dictData["reqID"] = []
self.dictData["start"] = []
@@ -141,7 +140,6 @@ def checkLatestTimeStamp(self, func):
self.tempTimeStampRecorded = copy.deepcopy(workflow_json)
if func in workflow_json.keys():
lastRecordedTimestamp = str(workflow_json[func])
-# lastRecordedTimestamp = str(workflow_json[func][0]["time_utc"]
arrayTS = lastRecordedTimestamp.split()
self.lastTimestamp = arrayTS[0] + "T" + arrayTS[1]
print("TT::::", self.lastTimestamp)
@@ -174,7 +172,7 @@ def pullLogs(self, function):
self.tempTimeStampRecorded[function] = lastLog_date[0]
print("Time for func: ", function, " is::", lastLog_date[0])
endFlag = False
-while endFlag != True:
+while endFlag is False:
tempDate = datetime.datetime.strptime(
self.tempTimeStampRecorded[function], "%Y-%m-%d %H:%M:%S.%f"
)
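Review note: `while endFlag is False:` fixes the `!= True` comparison, though for a plain boolean PEP 8's preferred spelling would be `while not endFlag:`. A trivial illustration:

```python
# "not endFlag" reads the same as "endFlag is False" for a real bool,
# and is the form PEP 8 recommends.
endFlag = False
polls = 0
while not endFlag:
    polls += 1
    endFlag = polls >= 3  # stand-in for "no newer logs found"
assert polls == 3
```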
9 changes: 5 additions & 4 deletions manageHostTopic.py
@@ -1,4 +1,5 @@
import argparse
+import sys

from google.cloud import pubsub_v1

@@ -90,16 +91,16 @@ def delete_subscriber(subscriber, subscription_id, args):
def check_args(args):
if not args.create and not args.delete:
print("Specify either --create or --delete flag to indicate mode.")
-exit(1)
+sys.exit(1)
if args.create and args.delete:
print("Specify either --create or --delete flag to indicate mode.")
-exit(1)
+sys.exit(1)
if args.delete and args.index is None:
print("Specify --index value to indicate which VM topic & subscriber to delete")
-exit(1)
+sys.exit(1)
if args.create and args.index is not None:
print("Do not specify --index value when creating VM topic & subscriber")
-exit(1)
+sys.exit(1)


if __name__ == "__main__":
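Review note: replacing `exit(1)` with `sys.exit(1)` is the right call. Bare `exit` is injected by the optional `site` module for interactive use and may be missing (e.g., under `python -S` or in frozen builds), while `sys.exit` raises `SystemExit` through the documented API. The first two branches could also be condensed; a sketch (the helper name is hypothetical):

```python
import sys

def check_mode(create: bool, delete: bool) -> None:
    # Exactly one of --create/--delete must be set; create == delete covers
    # both "neither" and "both".
    if create == delete:
        print("Specify either --create or --delete flag to indicate mode.")
        sys.exit(1)  # raises SystemExit; works without the site module
```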
26 changes: 11 additions & 15 deletions scheduler/Estimator.py
@@ -245,7 +245,7 @@ def getExecutionTime(self, host):
)
# if (selectedInits.shape[0]) >= self.windowSize:
# selectedInits = selectedInits.head(self.windowSize)
-for i, record in selectedInits.iterrows():
+for _, record in selectedInits.iterrows():
durations.append(record["duration"])
if mode == "best-case":
if host == "s":
@@ -368,7 +368,9 @@ def getPubSubMessageSize(self):
else:
psSize = psSizeSeries.item()
pubSubSize[func] = psSize
-nonzero_vals = [ pubSubSize[func] for func in pubSubSize.keys() if pubSubSize[func] != 0 ]
+nonzero_vals = [
+pubSubSize[func] for func in pubSubSize.keys() if pubSubSize[func] != 0
+]
if len(nonzero_vals) != 0:
average_nonzeroes = np.mean(np.array(nonzero_vals))
for func in pubSubSize.keys():
@@ -404,16 +406,13 @@ def getFuncExecutionTime(self, func, host, mode):
if (len(selectedInits) == 0) and (len(durations) == 0):
return 0
if len(selectedInits) != 0:
-# newMergingPatternChanges
g = selectedInits.groupby(selectedInits["reqID"], sort=False)
selectedInits = pd.concat(
islice(map(itemgetter(1), g), max(0, g.ngroups - self.windowSize), None)
)
-# grouped = selectedInits.groupby('reqID')
-# selectedInits = pd.concat([grouped.get_group(group) for i, group in enumerate(grouped.groups) if i>=len(grouped)-self.windowSize])
# if (selectedInits.shape[0]) >= self.windowSize:
# selectedInits = selectedInits.head(self.windowSize)
-for i, record in selectedInits.iterrows():
+for _, record in selectedInits.iterrows():
durations.append(record["duration"])
if mode == "best-case":
if host == "s":
@@ -430,9 +429,8 @@ def getFuncExecutionTime(self, func, host, mode):
exeTime = self.getMean(durations)
return exeTime

-# newMergingPatternChanges
def get_num_per_req(self, func, test):
-if test == True:
+if test is True:
return 1
selectedInits = self.dataframe.loc[(self.dataframe["function"] == func)]
counts = (selectedInits.groupby(["reqID"]).size().reset_index(name="counts"))[
@@ -474,7 +472,6 @@ def getComLatency(self, child, parent, childHost, parentHost, mode):
# "NOTFOUND:::", parent, "::", parentHost, "-->", child, ":::", childHost
# )
return "NotFound"
-# newMergingPatternChanges
selectedInitsParentFinish = (
selectedInitsParent[(selectedInitsParent.reqID.isin(reqs))]
)[["reqID", "finish"]]
@@ -585,7 +582,6 @@ def getComLatency(self, child, parent, childHost, parentHost, mode):
if type(x) == str
else x.replace(tzinfo=None)
)
-# newDF['duration'] = (newDF['start'] - newDF['end']).dt.microseconds* 0.001
newDF["duration"] = (
(newDF["start"] - newDF["end"]).dt.total_seconds().mul(1000).astype(int)
)
@@ -626,7 +622,7 @@ def getCost(self):
)
# if (selectedInits.shape[0]) >= self.windowSize:
# selectedInits = selectedInits.head(self.windowSize)
-for i, record in selectedInits.iterrows():
+for _, record in selectedInits.iterrows():
durations.append(record["duration"])
if mode == "best-case":
et = self.getUpperBound(durations)
@@ -666,7 +662,7 @@ def getFuncCost(self, mode, func):
)
# if (selectedInits.shape[0]) >= self.windowSize:
# selectedInits = selectedInits.head(self.windowSize)
-for i, record in selectedInits.iterrows():
+for _, record in selectedInits.iterrows():
durations.append(record["duration"])
if mode == "best-case":
et = self.getUpperBound(durations)
@@ -688,14 +684,14 @@ def getDurationsList(self, func, host):
selectedInits["start"] = pd.to_datetime(selectedInits["start"])
selectedInits.sort_values(by=["start"], ascending=False, inplace=True)
if (len(selectedInits) == 0) and (len(durations) == 0):
# print("No records found for {} running in {}".format(func, host))
print("No records found for {} running in {}".format(func, host))
return []
if len(selectedInits) != 0:
g = selectedInits.groupby(selectedInits["reqID"], sort=False)
selectedInits = pd.concat(
islice(map(itemgetter(1), g), max(0, g.ngroups - self.windowSize), None)
)
-for i, record in selectedInits.iterrows():
+for _, record in selectedInits.iterrows():
durations.append(record["duration"])
return durations

@@ -706,7 +702,7 @@ def distributions_SMDTest(self, vmDurations, serverlessDuration):
vmStd = np.std(np.array(vmDurations))
serverlessStd = np.std(np.array(serverlessDuration))
SMD = abs(vmMean - serverlessMean) / math.sqrt(
-((vmStd**2) + (serverlessStd**2)) / 2
+((vmStd ** 2) + (serverlessStd ** 2)) / 2
)
print("SMD::", SMD)
if SMD < 0.1:
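Review note on the `distributions_SMDTest` hunk: the method computes a standardized mean difference with a Cohen's-d-style pooled standard deviation and treats SMD < 0.1 as "the two duration samples match". A self-contained sketch (the return convention is an assumption):

```python
import math
import numpy as np

def smd_test(vm_durations, serverless_durations, threshold=0.1):
    vm_mean, sl_mean = np.mean(vm_durations), np.mean(serverless_durations)
    vm_std, sl_std = np.std(vm_durations), np.std(serverless_durations)
    # Pool the two variances, then normalize the mean gap by the pooled std.
    smd = abs(vm_mean - sl_mean) / math.sqrt((vm_std ** 2 + sl_std ** 2) / 2)
    return smd, smd < threshold  # below 0.1 => treat distributions as similar

print(smd_test([102.0, 98.5, 101.2, 99.8], [100.9, 99.4, 101.8, 98.9]))
```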
1 change: 0 additions & 1 deletion scheduler/LatencyModel.py
@@ -46,7 +46,6 @@ def getLinearAddedLatency(self, msgSize):
(float(vmCoefficients[i])) - (float(serverlessCoefficients[i]))
)
return addedLatency
-# return 0


if __name__ == "__main__":
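Review note: the deleted `# return 0` was dead code after the real `return`. The hunk only shows the coefficient difference inside `getLinearAddedLatency`; a hypothetical reconstruction of the surrounding loop, assuming the coefficients parameterize a polynomial in `msgSize`:

```python
def get_linear_added_latency(msg_size, vm_coefficients, serverless_coefficients):
    # Assumption: each coefficient pair weights msg_size**i, and the added
    # latency is the gap between the two fitted models.
    added_latency = 0.0
    for i, (vm_c, sl_c) in enumerate(zip(vm_coefficients, serverless_coefficients)):
        added_latency += (float(vm_c) - float(sl_c)) * msg_size ** i
    return added_latency

print(get_linear_added_latency(1024, [0.5, 0.002], [0.1, 0.0015]))
```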
23 changes: 2 additions & 21 deletions scheduler/baselineSlackAnalysis.py
@@ -17,15 +17,12 @@
class baselineSlackAnalysisClass:
def __init__(self, workflow):
self.workflow = workflow
-# jsonPath = os.getcwd() + "/log-parser/get-workflow-logs/data/" + self.workflow+".json"
-# dataframePath = os.getcwd() + "/log-parser/get-workflow-logs/data/" + self.workflow + "/NEWWgeneratedData.pkl"
jsonPath = (
str(Path(os.path.dirname(os.path.abspath(__file__))).resolve().parents[0])
+ "/log-parser/get-workflow-logs/data/"
+ self.workflow
+ ".json"
)
-# dataframePath = str(Path(os.getcwd()).resolve().parents[0]) + "/log-parser/get-workflow-logs/data/" + self.workflow + "/generatedDataFrame.pkl"
dfDir = Path(
str(Path(os.path.dirname(os.path.abspath(__file__))).parents[0])
+ "/log-parser/get-workflow-logs/data/"
@@ -112,7 +109,6 @@ def __init__(self, workflow):
len(self.predecessors[self.workflowFunctions.index(func)])
) - 1
self.memories = workflow_json["memory"]
-# self.dataframe = pd.read_pickle(dataframePath)
self.selectedIDs = self.selectRecords()
self.observations = self.getObservations()
self.slackCalculations()
@@ -126,27 +122,23 @@ def selectRecords(self):
selectedInits["start"] = pd.to_datetime(selectedInits["start"])
selectedInits.sort_values(by=["start"], ascending=False, inplace=True)
selectedRecords = []
-for i, record in selectedInits.iterrows():
+for _, record in selectedInits.iterrows():
selectedReq = self.dataframe.loc[
(self.dataframe["reqID"] == record["reqID"])
& (self.dataframe["host"] == "s")
]
-# newMergingPatternChanges
createdSet = selectedReq["function"].copy()
createdSet = set(createdSet.to_numpy())
if (selectedReq.shape[0] >= self.recordNum) and (
len(createdSet) == len(self.workflowFunctions)
):
-# if selectedReq.shape[0] >= self.recordNum:
selectedRecords.append(record["reqID"])
if len(selectedRecords) >= self.windowSize:
selectedRecords = selectedRecords[: self.windowSize]
# print("SELECTEDRECORDS::::", selectedRecords)
return selectedRecords

def getObservations(self):
for func in self.workflowFunctions:
-# newMergingPatternChanges
for reqID in self.selectedIDs:
df2 = self.dataframe.loc[
(
@@ -155,11 +147,8 @@ def getObservations(self):
)
]
if df2.shape[0] == 1:
-# merging = self.dataframe.loc[((self.dataframe["reqID"] == reqID) & (self.dataframe["function"] == func)), "mergingPoint"]
-# if merging == None:
selectedDuration = df2.iloc[0]["duration"]
self.slackData[func].append(selectedDuration)
-# self.dataframe.loc[((self.dataframe["reqID"] == reqID) & (self.dataframe["function"] == func)), "duration"]
elif len(self.predecessors[self.workflowFunctions.index(func)]) > 1:
start = df2["start"].max()
finish = df2["finish"].max()
@@ -169,7 +158,6 @@ def getObservations(self):
duration = df2["duration"].mean()
self.slackData[func].append(duration)
for entry in self.slackData.keys():
-# newMergingPatternChanges
if entry not in self.workflowFunctions:
for reqID in self.selectedIDs:
prevFunc = entry.split("-")[0]
@@ -206,14 +194,10 @@ def getObservations(self):
]
else:
start = self.avg_datetime(dfNext["start"])
-# start = dfNext.loc[dfNext["mergingPoint"] == prevFunc].iloc[0][
-# "start"
-# ]

duration = ((start - finish).total_seconds()) * 1000
self.slackData[entry].append(duration)

-# newMergingPatternChanges
def avg_datetime(self, series):
dt_min = series.min()
deltas = [x - dt_min for x in series]
@@ -231,8 +215,6 @@ def getUpperBound(self, array):
return upperBound

def getMedian(self, array):
-# median = array.quantile(0.5)
-
median = statistics.median(array)
statistics.quantiles
return median
@@ -292,7 +274,7 @@ def completeLSLF(self, duration, criticalPath):
if d[1] == d2[0]:
terminalFlag = False
break
-if terminalFlag == True:
+if terminalFlag is True:
terminals.append(d[1])
for t in terminals:
self.lf[t] = duration
@@ -353,7 +335,6 @@ def slackCalculations(self):
for col in self.slackData.keys():
slack = self.lf[col] - self.ef[col]
slackResults[col][decisionMode] = slack
-# print(slackResults)
with open(
(
(os.path.dirname(os.path.abspath(__file__)))
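Review note: most deletions in this file are stale comments; the surviving logic is the classic critical-path recipe. A forward pass gives each task's earliest finish (EF), a backward pass from the workflow end gives its latest finish (LF), and `slack = self.lf[col] - self.ef[col]`; zero-slack tasks form the critical path. A toy sketch on an illustrative three-task chain:

```python
durations = {"A": 3.0, "B": 5.0, "C": 2.0}
predecessors = {"A": [], "B": ["A"], "C": ["B"]}
successors = {"A": ["B"], "B": ["C"], "C": []}
order = ["A", "B", "C"]  # topological order

ef = {}  # earliest finish: forward pass
for node in order:
    earliest_start = max((ef[p] for p in predecessors[node]), default=0.0)
    ef[node] = earliest_start + durations[node]

lf = {}  # latest finish: backward pass from the total workflow duration
total = max(ef.values())
for node in reversed(order):
    lf[node] = min((lf[s] - durations[s] for s in successors[node]), default=total)

slack = {node: lf[node] - ef[node] for node in order}
print(slack)  # a linear chain has no float: every task is critical (slack 0.0)
```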