forked from SSHcom/privx-sdk-for-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadd-role.py
196 lines (154 loc) · 5.11 KB
/
add-role.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# An example how to use PrivX API for granting roles to users.
# Can be used as basis for automating temporary role grants via ServiceNow or
# similar services.
# Requires Python 3.6+
import datetime
import sys
import config
# this importation for demonstration purpose only
# for proper importation of privx_api module
# see https://github.com/SSHcom/privx-sdk-for-python#getting-started
try:
# Running example with pip-installed SDK
import privx_api
except ImportError:
# Running example without installing SDK
from utils import load_privx_api_lib_path
load_privx_api_lib_path()
import privx_api
# Initialize the API.
api = privx_api.PrivXAPI(
config.HOSTNAME,
config.HOSTPORT,
config.CA_CERT,
config.OAUTH_CLIENT_ID,
config.OAUTH_CLIENT_SECRET,
)
# Authenticate.
api.authenticate(config.API_CLIENT_ID, config.API_CLIENT_SECRET)
# Target-user name
EMAIL = "alice@example.com"
# Name of the role to grant to the target user
ROLE = "Example Role"
# Specify the validity of the grant as one of the following:
# * PERMANENT: The grant never expires.
# * FLOATING: Validity starts running after user permissions are checked.
# * TIME_RESTRICTED: Specify timestamps for grant start and end.
GRANT_TYPE = "FLOATING"
# Validity of the grant in hours (for time-restricted grants only)
VALIDITY = 1
def get_user_id(email: str) -> str:
"""
Return the ID of the user with the given email.
:param email: Email address of the user.
:return: Matching user's ID. None if the user cannot be found.
"""
response = api.search_users(search_payload={"keywords": EMAIL})
if not response.ok:
print(response.data)
sys.exit(1)
for item in response.data.get("items"):
if item.get("email") == EMAIL:
return item.get("id")
else:
return None
def get_user_roles(user_id: str) -> list:
"""
Return the current roles of the target user.
:param user_id: ID of the target user.
:return: List of target-user's roles.
"""
response = api.get_user_roles(user_id)
if not response.ok:
print(response.data)
sys.exit(1)
return response.data.get("items")
def get_role_id(name: str) -> str:
"""
Return the ID of the named role.
:param name: Name of the role.
:return: Matching role's ID. None if the role cannot be found.
"""
response = api.get_roles()
if not response.ok:
print(response.data)
sys.exit(1)
for role in response.data.get("items"):
if role.get("name") == ROLE:
return role.get("id")
else:
return None
def get_requested_role(role_id: str, grant_type: str, duration: int = None) -> object:
"""
Return the role object for granting target-role membership.
:param role_id: ID of the target role.
:param grant_type: Grant type for specifying membership validity.
:param duration: Validity period of the grant.
Only applicable to non-permanent grant types.
:return: Role object for the specified grant.
"""
duration = 1 if not duration else abs(int(duration))
start = datetime.datetime.utcnow()
end = start + datetime.timedelta(hours=duration)
roles = {
"PERMANENT": {
"id": role_id,
"name": ROLE,
"grant_type": "PERMANENT",
"grant_start": None,
"grant_end": None,
"floating_length": None,
"explicit": True,
},
"FLOATING": {
"id": role_id,
"name": ROLE,
"grant_type": "FLOATING",
"grant_start": None,
"grant_end": None,
"floating_length": duration,
"explicit": True,
},
"TIME_RESTRICTED": {
"id": role_id,
"name": ROLE,
"grant_type": "TIME_RESTRICTED",
"grant_start": start.isoformat() + "Z",
"grant_end": end.isoformat() + "Z",
"floating_length": 0,
"explicit": True,
},
}
if grant_type not in roles:
print(f"Invalid grant type {grant_type}. Exiting...")
sys.exit(1)
return roles[grant_type]
def main():
print(f"Searching for user {EMAIL}")
userID = get_user_id(EMAIL)
if not userID:
print(f"Cannot find user with email {EMAIL}. Exiting...")
sys.exit(1)
print(f"Fetching target role {ROLE}")
roleID = get_role_id(ROLE)
if not roleID:
print(f"Cannot find role with name {ROLE}. Exiting...")
sys.exit(1)
print("Fetching user's existing roles")
userRolesQuery = api.get_user_roles(userID)
if not userRolesQuery.ok:
print(userRolesQuery.data)
sys.exit(1)
print("Determining new role setup for user.")
userRoles = get_user_roles(userID)
role = get_requested_role(roleID, GRANT_TYPE, VALIDITY)
userRoles.append(role)
print("Updating roles for user")
response = api.set_user_roles(userID, userRoles)
if response.ok:
print("User roles updated.")
else:
print(response.data)
print("User-role update failed.")
if __name__ == "__main__":
main()