-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathread_mouse_delta.py
executable file
·152 lines (128 loc) · 3.48 KB
/
read_mouse_delta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/python
"""
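read_mouse_delta.py: Read relative mouse motion from a mouse device file and
stream timestamped speed/direction values to a client over a Unix socket.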
"""
from __future__ import print_function
__author__ = "Dilawar Singh"
__copyright__ = "Copyright 2017-, Dilawar Singh"
__version__ = "1.0.0"
__maintainer__ = "Dilawar Singh"
__email__ = "dilawars@ncbs.res.in"
__status__ = "Development"
import sys
import struct
import os
import time
import math
import io
import fcntl
import datetime
import socket
def close():
    """Close the client connection and the listening socket, if open.

    Each socket is closed independently, so the listening socket is released
    even when no client ever connected. Both globals are reset to ``None``
    afterwards, which makes repeated calls harmless.
    """
    global conn_, sock_
    try:
        if conn_ is not None:
            conn_.close()
    finally:
        # Always release the listener, even if closing the client failed.
        conn_ = None
        if sock_ is not None:
            sock_.close()
            sock_ = None
# Value of $USER for the invoking process; a single space when it is unset.
user_ = os.environ.get('USER', ' ')

# Listening server socket and the accepted client connection. Both stay None
# until create_socket() assigns them.
sock_ = None
conn_ = None

# Filesystem path the Unix-domain server socket is bound to.
sockName_ = '/tmp/__MY_MOUSE_SOCKET__'
def create_socket():
    """Bind the Unix-domain server socket and wait until a client connects.

    Removes any existing file at ``sockName_``, binds a stream socket there
    with a one second timeout, and retries ``accept()`` until it succeeds.
    On return the globals ``sock_`` and ``conn_`` hold the listening socket
    and the accepted client connection.

    Only accept timeouts are retried; any other socket error propagates to
    the caller instead of being silently retried forever.
    """
    global sock_, conn_
    conn_ = None

    # A stale socket file from a previous run would make bind() fail.
    if os.path.exists(sockName_):
        os.remove(sockName_)

    sock_ = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock_.settimeout(1.0)  # wait for a second.
    sock_.bind(sockName_)
    # Listening once is enough; only accept() needs to be retried.
    sock_.listen(2)

    while conn_ is None:
        print('[INFO] Waiting for MOUSE socket client. timeout %s' %
              sock_.gettimeout()
              )
        try:
            conn_, _addr = sock_.accept()
        except socket.timeout:
            # No client within the timeout window: announce and try again.
            continue
# Flag checked by getMouseEvent(), getMousePos() and process(); they return
# early when it is true.
user_interrupt_ = False

# Fixed-length window of the 10 most recent (t, x, y) trajectory points.
trajs = 10 * [(0, 0, 0)]

# Timestamp of the last report returned by getMousePos().
lastT_ = 0.0

# Mouse device file object; opened on first use by process().
f_ = None

# Pending (t, dx, dy) mouse events.
q_ = [(0, 0, 0)]
def getMouseEvent(mouseF, q):
    """Read one 3-byte mouse packet from ``mouseF`` and append it to ``q``.

    The file descriptor is switched to non-blocking mode, so the read returns
    immediately. A complete packet is appended as ``(t, dx, dy)``, where
    ``t`` is ``time.time()`` taken right after the read and ``dx``/``dy`` are
    the packet's second and third bytes decoded as signed 8-bit integers.
    When no complete packet is available, ``(t, 0, 0)`` is appended instead.

    Does nothing once ``user_interrupt_`` is set.

    :param mouseF: binary file object for the mouse device.
    :param q: list that receives the event tuple.
    """
    global user_interrupt_
    if user_interrupt_:
        return

    # This is too fast. Probably not a good idea. Use os.read( ).
    fd = mouseF.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    if not flags & os.O_NONBLOCK:
        # Only touch the descriptor flags the first time through.
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    buf = mouseF.read(3)
    t = time.time()

    # A non-blocking read may yield nothing or a partial packet; unpacking
    # anything but exactly 3 bytes would raise struct.error.
    if buf and len(buf) == 3:
        x, y = struct.unpack("bb", buf[1:])
        q.append((t, x, y))
    else:
        q.append((t, 0, 0))

    # Essential to sleep for some time.
    # NOTE(review): assumed to run on every call, not only when no data was
    # read -- confirm against the original indentation.
    time.sleep(1e-3)
def getMousePos(q):
    """Consume the newest event in ``q`` and return a formatted report.

    The event's ``(dx, dy)`` is added to the last point in ``trajs`` to form
    a new trajectory point; the oldest point is dropped so the window keeps
    its length. A report is produced only when the event is more than 5 ms
    newer than the previous report.

    :param q: list of ``(t, dx, dy)`` events; the last element is removed.
    :returns: ``'t,speed,direction'`` with four decimals each, or ``None``
        when interrupted, when ``q`` is empty, or when the report is
        throttled.
    """
    global user_interrupt_
    global lastT_
    if user_interrupt_:
        # The socket may never have been created; don't fail on None.
        if sock_ is not None:
            sock_.close()
        return None

    if not q:
        return None

    t1, dx, dy = q.pop()
    _, x0, y0 = trajs[-1]
    trajs.append((t1, x0 + dx, y0 + dy))
    trajs.pop(0)

    if t1 > lastT_ + 5e-3:
        lastT_ = t1
        # Only compute the averages when a report is actually emitted.
        speed, direction = compute_velocity_and_dir(trajs)
        return '%.4f,%.4f,%.4f' % (t1, speed, direction)
    return None
def compute_velocity_and_dir(trajs):
    """Return the mean speed and mean direction along a trajectory.

    For every pair of consecutive points with strictly increasing time, the
    speed is the Euclidean distance divided by the elapsed time, and the
    direction is ``atan2(dy, dx)`` in radians. Pairs whose time does not
    increase contribute 0 to both averages.

    :param trajs: sequence of ``(t, x, y)`` points.
    :returns: ``(mean_speed, mean_direction)``; ``(0.0, 0.0)`` when there are
        fewer than two points.
    """
    vels, dirs = [], []
    for (t0, x0, y0), (t, x, y) in zip(trajs, trajs[1:]):
        if t > t0:
            dx, dy = x - x0, y - y0
            vels.append(math.hypot(dx, dy) / (t - t0))
            # atan2 keeps the quadrant, so leftward motion (dx < 0) is no
            # longer collapsed onto the vertical axis; dx == 0 is handled
            # without a division.
            dirs.append(math.atan2(dy, dx))
        else:
            vels.append(0.0)
            dirs.append(0.0)

    if not vels:
        # Fewer than two points: nothing to average.
        return 0.0, 0.0

    # average direction (plain arithmetic mean of the angles)
    return sum(vels) / len(vels), sum(dirs) / len(dirs)
def process(path):
    """Run one acquisition step: read an event and forward any report.

    Opens the mouse device at ``path`` on the first call, reads one event
    into ``q_``, and, when ``getMousePos`` yields a report and a client is
    connected, sends ``<iso-timestamp>,<report>`` followed by a newline.

    :param path: path of the mouse device file, opened in binary mode.
    :returns: ``False`` when ``user_interrupt_`` is set, otherwise ``None``.
    """
    global user_interrupt_
    global conn_
    global f_, q_
    if user_interrupt_:
        return False

    if f_ is None:
        f_ = io.open(path, "rb")

    getMouseEvent(f_, q_)
    now = datetime.datetime.now().isoformat()
    r = getMousePos(q_)
    if r is not None and conn_:
        line = now + ',' + r + '\n'
        # sendall() needs bytes on Python 3; the line is pure ASCII.
        conn_.sendall(line.encode('ascii'))
def main(path):
    """Wait for a socket client, then process mouse events until stopped.

    :param path: path of the mouse device file, passed on to ``process``.
    """
    create_socket()
    # process() returns False only once an interrupt has been flagged; any
    # other result means keep polling.
    while process(path) is not False:
        pass
if __name__ == '__main__':
    # The mouse device path is the only command-line argument.
    if len(sys.argv) < 2:
        sys.exit('Usage: %s <path-to-mouse-device>' % sys.argv[0])
    path = sys.argv[1]
    try:
        main(path)
    except KeyboardInterrupt:
        print('User pressed Ctrl+C')
    finally:
        # Release the sockets however the loop ended.
        close()