forked from Symantec/slack-autoarchive
-
Notifications
You must be signed in to change notification settings - Fork 0
/
slack_autoarchive.py
executable file
·253 lines (223 loc) · 10.9 KB
/
slack_autoarchive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#!/usr/bin/env python
"""
This program lets you do archive slack channels which are no longer active.
"""
# standard imports
from datetime import datetime
import os
import sys
import time
import json
# not standard imports
import requests
from config import get_channel_reaper_settings
from utils import get_logger
class ChannelReaper():
"""
This class can be used to archive slack channels.
"""
def __init__(self):
self.settings = get_channel_reaper_settings()
self.logger = get_logger('channel_reaper', './audit.log')
def get_whitelist_keywords(self):
"""
Get all whitelist keywords. If this word is used in the channel
purpose or topic, this will make the channel exempt from archiving.
"""
keywords = []
if os.path.isfile('whitelist.txt'):
with open('whitelist.txt') as filecontent:
keywords = filecontent.readlines()
# remove whitespace characters like `\n` at the end of each line
keywords = map(lambda x: x.strip(), keywords)
whitelist_keywords = self.settings.get('whitelist_keywords')
if whitelist_keywords:
keywords = keywords + whitelist_keywords.split(',')
return list(keywords)
def get_channel_alerts(self):
"""Get the alert message which is used to notify users in a channel of archival. """
archive_msg = """
This channel has had no activity for %d days. It is being auto-archived.
If you feel this is a mistake you can <https://get.slack.help/hc/en-us/articles/201563847-Archive-a-channel#unarchive-a-channel|unarchive this channel>.
This will bring it back at any point. In the future, you can add '%%noarchive' to your channel topic or purpose to avoid being archived.
This script was run from this repo: https://github.com/Symantec/slack-autoarchive
""" % self.settings.get('days_inactive')
alerts = {'channel_template': archive_msg}
if os.path.isfile('templates.json'):
with open('templates.json') as filecontent:
alerts = json.load(filecontent)
return alerts
# pylint: disable=too-many-arguments
def slack_api_http(
self,
api_endpoint=None,
payload=None,
method='GET',
# pylint: disable=unused-argument
retry=True,
retry_delay=0):
""" Helper function to query the slack api and handle errors and rate limit. """
# pylint: disable=no-member
uri = 'https://slack.com/api/' + api_endpoint
payload['token'] = self.settings.get('slack_token')
try:
# Force request to take at least 1 second. Slack docs state:
# > In general we allow applications that integrate with Slack to send
# > no more than one message per second. We allow bursts over that
# > limit for short periods.
if retry_delay > 0:
time.sleep(retry_delay)
if method == 'POST':
response = requests.post(uri, data=payload)
else:
response = requests.get(uri, params=payload)
if response.status_code == requests.codes.ok and 'error' in response.json(
) and response.json()['error'] == 'not_authed':
self.logger.error(
'Need to setup auth. eg, SLACK_TOKEN=<secret token> python slack-autoarchive.py'
)
sys.exit(1)
elif response.status_code == requests.codes.ok and response.json(
)['ok']:
return response.json()
elif response.status_code == requests.codes.too_many_requests:
retry_timeout = float(response.headers['Retry-After'])
# pylint: disable=too-many-function-args
return self.slack_api_http(api_endpoint, payload, method,
False, retry_timeout)
except Exception as error_msg:
raise Exception(error_msg)
return None
def get_all_channels(self):
""" Get a list of all non-archived channels from slack channels.list. """
payload = {'exclude_archived': 1}
api_endpoint = 'channels.list'
channels = self.slack_api_http(api_endpoint=api_endpoint,
payload=payload)['channels']
all_channels = []
for channel in channels:
all_channels.append({
'id': channel['id'],
'name': channel['name'],
'created': channel['created'],
'num_members': channel['num_members']
})
return all_channels
def get_last_message_timestamp(self, channel_history, too_old_datetime):
""" Get the last message from a slack channel, and return the time. """
last_message_datetime = too_old_datetime
last_bot_message_datetime = too_old_datetime
if 'messages' not in channel_history:
return (last_message_datetime, False) # no messages
for message in channel_history['messages']:
if 'subtype' in message and message[
'subtype'] in self.settings.get('skip_subtypes'):
continue
last_message_datetime = datetime.fromtimestamp(float(
message['ts']))
break
# for folks with the free plan, sometimes there is no last message,
# then just set last_message_datetime to epoch
if not last_message_datetime:
last_bot_message_datetime = datetime.utcfromtimestamp(0)
# return bot message time if there was no user message
if too_old_datetime >= last_bot_message_datetime > too_old_datetime:
return (last_bot_message_datetime, False)
return (last_message_datetime, True)
def is_channel_disused(self, channel, too_old_datetime):
""" Return True or False depending on if a channel is "active" or not. """
num_members = channel['num_members']
payload = {'inclusive': 0, 'oldest': 0, 'count': 50}
api_endpoint = 'channels.history'
payload['channel'] = channel['id']
channel_history = self.slack_api_http(api_endpoint=api_endpoint,
payload=payload)
(last_message_datetime, is_user) = self.get_last_message_timestamp(
channel_history, datetime.fromtimestamp(float(channel['created'])))
# mark inactive if last message is too old, but don't
# if there have been bot messages and the channel has
# at least the minimum number of members
min_members = self.settings.get('min_members')
has_min_users = (min_members == 0 or min_members > num_members)
return last_message_datetime <= too_old_datetime and (not is_user
or has_min_users)
# If you add channels to the WHITELIST_KEYWORDS constant they will be exempt from archiving.
def is_channel_whitelisted(self, channel, white_listed_channels):
""" Return True or False depending on if a channel is exempt from being archived. """
# self.settings.get('skip_channel_str')
# if the channel purpose contains the string self.settings.get('skip_channel_str'), we'll skip it.
info_payload = {'channel': channel['id']}
channel_info = self.slack_api_http(api_endpoint='channels.info',
payload=info_payload,
method='GET')
channel_purpose = channel_info['channel']['purpose']['value']
channel_topic = channel_info['channel']['topic']['value']
if self.settings.get(
'skip_channel_str') in channel_purpose or self.settings.get(
'skip_channel_str') in channel_topic:
return True
# check the white listed channels (file / env)
for white_listed_channel in white_listed_channels:
wl_channel_name = white_listed_channel.strip('#')
if wl_channel_name in channel['name']:
return True
return False
def send_channel_message(self, channel_id, message):
""" Send a message to a channel or user. """
payload = {
'channel': channel_id,
'username': 'channel_reaper',
'icon_emoji': ':ghost:',
'text': message
}
api_endpoint = 'chat.postMessage'
self.slack_api_http(api_endpoint=api_endpoint,
payload=payload,
method='POST')
def archive_channel(self, channel, alert):
""" Archive a channel, and send alert to slack admins. """
api_endpoint = 'channels.archive'
stdout_message = 'Archiving channel... %s' % channel['name']
self.logger.info(stdout_message)
if not self.settings.get('dry_run'):
channel_message = alert.format(self.settings.get('days_inactive'))
self.send_channel_message(channel['id'], channel_message)
payload = {'channel': channel['id']}
self.slack_api_http(api_endpoint=api_endpoint, payload=payload)
self.logger.info(stdout_message)
def send_admin_report(self, channels):
""" Optionally this will message admins with which channels were archived. """
if self.settings.get('admin_channel'):
channel_names = ', '.join('#' + channel['name']
for channel in channels)
admin_msg = 'Archiving %d channels: %s' % (len(channels),
channel_names)
if self.settings.get('dry_run'):
admin_msg = '[DRY RUN] %s' % admin_msg
self.send_channel_message(self.settings.get('admin_channel'),
admin_msg)
def main(self):
"""
This is the main method that checks all inactive channels and archives them.
"""
if self.settings.get('dry_run'):
self.logger.info(
'THIS IS A DRY RUN. NO CHANNELS ARE ACTUALLY ARCHIVED.')
whitelist_keywords = self.get_whitelist_keywords()
alert_templates = self.get_channel_alerts()
archived_channels = []
for channel in self.get_all_channels():
sys.stdout.write('.')
sys.stdout.flush()
channel_whitelisted = self.is_channel_whitelisted(
channel, whitelist_keywords)
channel_disused = self.is_channel_disused(
channel, self.settings.get('too_old_datetime'))
if (not channel_whitelisted and channel_disused):
archived_channels.append(channel)
self.archive_channel(channel,
alert_templates['channel_template'])
self.send_admin_report(archived_channels)
if __name__ == '__main__':
CHANNEL_REAPER = ChannelReaper()
CHANNEL_REAPER.main()