-
Notifications
You must be signed in to change notification settings - Fork 2
/
sast-rqc.py
173 lines (147 loc) · 5.95 KB
/
sast-rqc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/python3
import datetime
import requests
import ast
import csv
import time
import json
import os
def create_report():
    """Ensure the report directory exists and build a timestamped CSV path.

    The directory ``vulnerability_reports`` is created relative to the current
    working directory. The report file itself is not created here; only its
    path is returned.

    Returns:
        str: ``vulnerability_reports/vulnerabilities-<MM-DD-YYYY-HHhMM>.csv``
        (joined with the platform path separator), stamped with the local
        time at the moment of the call.

    Raises:
        OSError: If the directory cannot be created, e.g. a regular file with
            the same name already exists.
    """
    directory = 'vulnerability_reports'
    # exist_ok=True removes the check-then-create race of calling
    # os.path.exists() before os.makedirs().
    os.makedirs(directory, exist_ok=True)
    # strftime() already returns a str, so no extra conversion is required.
    timestamp = datetime.datetime.now().strftime("%m-%d-%Y-%Hh%M")
    return os.path.join(directory, f"vulnerabilities-{timestamp}.csv")
def csv_to_markdown_table(csv_file_path):
    """Render a CSV file as a Markdown pipe table.

    The first CSV record becomes the header row, followed by a ``---``
    separator row with one entry per header column, then one table row per
    remaining record.

    Args:
        csv_file_path: Path of the CSV file to read.

    Returns:
        str: The table, every row terminated by ``\\n``. An empty string is
        returned when the file contains no records at all.

    Raises:
        OSError: If the file cannot be opened.
    """
    def _cell(text):
        # A raw "|" would open a new column and a raw line break would end
        # the table row, so both are neutralised before joining.
        text = text.replace('|', '\\|')
        return text.replace('\r\n', '<br>').replace('\n', '<br>').replace('\r', '<br>')

    def _row(cells):
        return '| ' + ' | '.join(cells) + ' |\n'

    # newline='' lets the csv module itself interpret line breaks that are
    # embedded inside quoted fields.
    with open(csv_file_path, 'r', newline='') as file:
        records = list(csv.reader(file))

    if not records:
        return ''

    headers, *rows = records
    lines = [_row([_cell(h) for h in headers]), _row(['---'] * len(headers))]
    lines.extend(_row([_cell(c) for c in row]) for row in rows)
    return ''.join(lines)
def json_to_csv(file_path, json_data, csv_file_path):
    """Append one CSV row per finding to the report file.

    Args:
        file_path: Value written to the ``file`` column of every row.
        json_data: Iterable of mappings, each indexed with the keys
            ``title``, ``severity``, ``correction`` and ``lines``.
        csv_file_path: CSV file to append to; it is created when missing.

    The header row is written only when the append position is 0, i.e. the
    file is still empty.
    """
    finding_keys = ("title", "severity", "correction", "lines")
    with open(csv_file_path, mode='a', newline='') as handle:
        out = csv.writer(handle)
        # In append mode the position is the current size of the file.
        if handle.tell() == 0:
            out.writerow(["file"] + list(finding_keys))
        out.writerows(
            [file_path] + [entry[key] for key in finding_keys]
            for entry in json_data
        )
def save_output(name: str, value: str):
    """Append a multi-line step output to the file named by ``GITHUB_OUTPUT``.

    The record is written as ``name<<DELIMITER``, the value, then
    ``DELIMITER`` on a line of its own.

    Args:
        name: Output name.
        value: Output value; anything that is not a ``str`` is converted
            with ``str()``, which is what ``print`` did previously.

    Raises:
        KeyError: If the ``GITHUB_OUTPUT`` environment variable is not set.
        OSError: If the output file cannot be opened for appending.
    """
    import uuid  # local import keeps this block self-contained

    text = str(value)
    # A fixed delimiter such as "EOF" lets a value that contains that line
    # terminate the record early and smuggle in extra outputs, so a random
    # delimiter that does not occur in the value is used instead.
    delimiter = f'ghadelimiter_{uuid.uuid4()}'
    while delimiter in text:
        delimiter = f'ghadelimiter_{uuid.uuid4()}'

    with open(os.environ['GITHUB_OUTPUT'], 'a', encoding='utf-8') as output_file:
        output_file.write(f'{name}<<{delimiter}\n{text}\n{delimiter}\n')
# Step 1: Authentication to obtain access token
def get_access_token(account_slug, client_id, client_key, timeout=60):
    """Request an access token using the ``client_credentials`` grant.

    Args:
        account_slug: Path segment inserted into the token endpoint URL.
        client_id: Sent as the ``client_id`` form field.
        client_key: Sent as the ``client_secret`` form field.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled connection cannot hang the job indefinitely.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
        KeyError: If the JSON response carries no ``access_token`` field.
    """
    url = f"https://idm.stackspot.com/{account_slug}/oidc/oauth/token"
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        'client_id': client_id,
        'grant_type': 'client_credentials',
        'client_secret': client_key,
    }
    response = requests.post(url, headers=headers, data=data, timeout=timeout)
    # Report rejected credentials as an HTTP error rather than as a
    # KeyError on the missing 'access_token' field.
    response.raise_for_status()
    return response.json()['access_token']
# Step 2: Creation of a Quick Command (RQC) execution
def create_rqc_execution(qc_slug, access_token, input_data, timeout=60):
    """Start a Quick Command execution and return its execution id.

    Args:
        qc_slug: Quick Command slug appended to the create-execution URL.
        access_token: Bearer token sent in the ``Authorization`` header.
        input_data: Payload sent as the ``input_data`` field of the JSON body.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Returns:
        str: The response body decoded as UTF-8 with surrounding double
        quotes stripped (presumably a JSON-encoded string; confirm against
        the API contract).

    Raises:
        RuntimeError: If the API answers with any status other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    url = f"https://genai-code-buddy-api.stackspot.com/v1/quick-commands/create-execution/{qc_slug}"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    response = requests.post(
        url,
        headers=headers,
        json={'input_data': input_data},
        timeout=timeout,
    )

    if response.status_code != 200:
        # Keep the diagnostics on stdout, then fail loudly: returning None
        # here would make the caller poll the callback URL for "None".
        print(response.status_code)
        print(response.content)
        raise RuntimeError(
            f"Quick Command execution could not be created (HTTP {response.status_code})"
        )

    execution_id = response.content.decode('utf-8').strip('"')
    print('ExecutionID:', execution_id)
    return execution_id
# Step 3: Polling for the execution status
def get_execution_status(execution_id, access_token, poll_interval=5,
                         max_attempts=None, timeout=60):
    """Poll the callback endpoint until the execution reaches a final status.

    Args:
        execution_id: Identifier appended to the callback URL.
        access_token: Bearer token sent in the ``Authorization`` header.
        poll_interval: Seconds to sleep between two polls.
        max_attempts: Maximum number of non-final polls before giving up;
            ``None`` (the default) polls without an upper bound.
        timeout: Seconds passed to ``requests`` as the per-request timeout.

    Returns:
        The decoded JSON response of the first poll whose
        ``progress.status`` is ``COMPLETED`` or ``FAILED``.

    Raises:
        requests.HTTPError: If a poll answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
        KeyError: If a response has no ``progress.status`` field.
        RuntimeError: If ``max_attempts`` is set and is exhausted.
    """
    url = f"https://genai-code-buddy-api.stackspot.com/v1/quick-commands/callback/{execution_id}"
    headers = {'Authorization': f'Bearer {access_token}'}
    attempt = 0
    while True:
        response = requests.get(url, headers=headers, timeout=timeout)
        # Fail on HTTP errors instead of a KeyError on the error body.
        response.raise_for_status()
        response_data = response.json()
        status = response_data['progress']['status']
        if status in ('COMPLETED', 'FAILED'):
            return response_data

        print("Status:", f'{status} ({attempt})')
        print("Execution in progress, waiting...")
        attempt += 1
        if max_attempts is not None and attempt >= max_attempts:
            raise RuntimeError(
                f"Execution {execution_id} did not finish after {attempt} polls"
            )
        time.sleep(poll_interval)  # Wait before polling again
# Runtime configuration, read from the environment of the workflow step.
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_KEY = os.getenv("CLIENT_KEY")
ACCOUNT_SLUG = os.getenv("CLIENT_REALM")
QC_SLUG = os.getenv("QC_SLUG")
CHANGED_FILES = os.getenv("CHANGED_FILES")
print(f'\033[36mFiles to analyze: {CHANGED_FILES}\033[0m')

# CHANGED_FILES is expected to hold a Python literal list of paths. Fail with
# a clear message instead of an ast traceback when it is unset or malformed.
if CHANGED_FILES is None or not CHANGED_FILES.strip():
    raise SystemExit("CHANGED_FILES is not set: nothing to analyze.")
try:
    CHANGED_FILES = ast.literal_eval(CHANGED_FILES)
except (ValueError, SyntaxError) as e:
    raise SystemExit(f"CHANGED_FILES is not a valid list literal: {e}")
if isinstance(CHANGED_FILES, str):
    # A single quoted path would otherwise be iterated character by character.
    CHANGED_FILES = [CHANGED_FILES]

# NOTE(review): never read anywhere in the visible code; kept in case other
# code relies on the name.
all_result = []


def _strip_code_fence(text):
    """Return *text* without a surrounding ```json ... ``` Markdown fence."""
    text = text.strip()
    if text.startswith("```json"):
        text = text[len("```json"):]
        # Drop the closing fence only when it is really there, rather than
        # blindly cutting a fixed number of trailing characters.
        if text.endswith("```"):
            text = text[:-3]
    return text.strip()


try:
    # Created inside the try so that a directory error is reported by the
    # OSError handler below.
    report_path = create_report()

    for file_path in CHANGED_FILES:
        print(f'\n\033[36mFile Path: {file_path}\033[0m')
        # Open the file and read its content
        with open(file_path, 'r') as file:
            file_content = file.read()

        # Execute the steps
        access_token = get_access_token(ACCOUNT_SLUG, CLIENT_ID, CLIENT_KEY)
        execution_id = create_rqc_execution(QC_SLUG, access_token, file_content)
        if not execution_id:
            raise RuntimeError(f"No execution id was returned for {file_path}")
        execution_status = get_execution_status(execution_id, access_token)

        result = execution_status.get('result')
        if not isinstance(result, str):
            status = execution_status.get('progress', {}).get('status')
            raise RuntimeError(
                f"Execution for {file_path} returned no result (status: {status})"
            )

        # Remove the leading and trailing ```json and ``` for correct JSON parsing
        result_data = json.loads(_strip_code_fence(result))

        vulnerabilities_amount = len(result_data)
        print(f"\n\033[36m{vulnerabilities_amount} item(s) have been found for file {file_path}:\033[0m")

        # Iterate through each item and print the required fields
        for item in result_data:
            print(f"\nTitle: {item['title']}")
            print(f"Severity: {item['severity']}")
            print(f"Correction: {item['correction']}")
            print(f"Lines: {item['lines']}")

        if result_data:
            save_output('result', result_data)
            json_to_csv(file_path, result_data, report_path)
            # The report outputs are re-emitted for every file with findings;
            # the last emission carries the complete table.
            save_output('report_file', report_path)
            markdown_table = csv_to_markdown_table(report_path)
            save_output('report_table', markdown_table)
except OSError as e:
    print(f"An error occurred while creating the directory or file: {e}")
    # SystemExit is what exit() raises, without depending on the site module.
    raise SystemExit(1)
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    raise SystemExit(1)