-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathi3n-bar
executable file
·102 lines (79 loc) · 2.93 KB
/
i3n-bar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python
import argparse
import sys
import getpass
import i3
import zmq
# Enable daemon mode on Subscription thread:
i3.Subscription.daemon = True
instance = None
debug_enabled = False
def display(msg):
""" Display a text on stdout and flush it.
"""
sys.stdout.write('%s\n' % msg)
sys.stdout.flush()
debug('Display: %r' % msg)
def debug(msg):
""" Print debug text on stderr.
"""
if debug_enabled:
line = u'[i3n-daemon/%s] %s\n' % (instance, msg)
sys.stderr.write(line.encode('utf-8'))
def get_current_workspace():
""" Get the current focused workspace.
"""
workspaces = [w for w in i3.get_workspaces() if w['focused']]
assert len(workspaces) == 1, 'Only one workspace should be focused at once'
return workspaces[0]
def main(args):
debug(u'i3n daemon is running')
annotations = {} # Store annotations for each workspace
def cb_workspace_change(event, data, subscription):
""" Callback used when a current workspace is changed.
"""
data = [w for w in data if w['visible'] and w['output'] == args.output]
assert len(data) == 1, 'only 1 workspace should match'
workspace = data[0]
debug(u'Changed workspace to %s' % workspace['name'])
display(annotations.get(workspace['name'], ' '))
i3.Subscription(cb_workspace_change, 'workspace')
context = zmq.Context()
if args.slave:
# Socket used to receive updates from master:
socket = context.socket(zmq.SUB)
socket.connect('ipc://%s.slave' % args.ipc)
else:
# Socket used to receive updates from client:
socket = context.socket(zmq.SUB)
socket.bind('ipc://%s' % args.ipc)
# Socket used to relay updates to slaves:
socket_relay = context.socket(zmq.PUB)
socket_relay.bind('ipc://%s.slave' % args.ipc)
socket.setsockopt(zmq.SUBSCRIBE, '')
while True:
msg = socket.recv_json()
debug(u'Received message: %r' % msg)
if msg['type'] == u'set':
annotations[msg['workspace']] = msg['annotation']
cur_workspace = get_current_workspace()
if msg['workspace'] == cur_workspace['name'] and args.output == cur_workspace['output']:
display(msg['annotation'])
# Relay message to slave instances:
if not args.slave:
debug('Message relayed to slaves')
socket_relay.send_json(msg)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('output')
parser.add_argument('--ipc', default='/tmp/i3n-%s' % getpass.getuser())
parser.add_argument('--slave', action='store_true', default=False)
parser.add_argument('-d', '--debug', action='store_true', default=False)
args = parser.parse_args()
instance = args.output
debug_enabled = args.debug
try:
main(args)
except Exception as err:
debug(u'Fatal error:')
raise