-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrimesync.py
126 lines (116 loc) · 4.51 KB
/
rimesync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from sys import stderr
from time import sleep
from pathlib import Path
from subprocess import run
from shutil import copyfile, copytree
__version__ = "0.1.0"
class RimeSync:
def __init__(
self,
workspace: Path,
rime_install_dir: Path,
rime_user_dir: Path,
rime_user_id: str,
dry_run: bool = False,
) -> None:
self.rime_install_dir = rime_install_dir
self.rime_user_dir = rime_user_dir
self.rime_sync_dir = rime_user_dir / 'sync' / rime_user_id if rime_user_id is not None else None
self.dry_run = dry_run
self.workspace = workspace
def install(self, files: set[str], directories: set[str]):
if not self.dry_run:
print(f"install to: {self.rime_user_dir}")
for f in files:
src = self.workspace / 'rimedata' / f
dst = self.rime_user_dir / f
if not src.is_file():
continue
print(src)
try:
copyfile(src, dst)
except Exception as e:
print(f"warning: failed to install {f}: {e}", file=stderr)
for d in directories:
src = self.workspace / 'rimedata' / d
dst = self.rime_user_dir / d
if not src.is_dir():
continue
print(src)
try:
copytree(src, dst, dirs_exist_ok=True)
except Exception as e:
print(f"warning: failed to install {f}: {e}", file=stderr)
print("call weasel deploy")
run([self.rime_install_dir / 'WeaselDeployer.exe', '/deploy'], check=True)
sleep(1)
else:
print(f"would install to: {self.rime_user_dir}")
for f in files:
src = self.workspace / 'rimedata' / f
dst = self.rime_user_dir / f
if not src.is_file():
continue
print(src)
for d in directories:
src = self.workspace / 'rimedata' / d
dst = self.rime_user_dir / d
if not src.is_dir():
continue
print(src)
print("would call weasel deploy")
print("install complete")
def sync(self, files: set[str]):
if not self.dry_run:
print(f"sync with: {self.rime_sync_dir}")
print("call weasel sync")
run([self.rime_install_dir / 'WeaselDeployer.exe', '/sync'], check=True)
sleep(1)
print("merge userdb:")
for f in files:
src = self.workspace / 'rimedata' / f
dst = self.rime_sync_dir / f
if not src.is_file():
continue
print(src)
try:
copyfile(src, dst)
sleep(1)
except Exception as e:
print(f"warning: failed to copy {f}: {e}", file=stderr)
print("call weasel sync again")
run([self.rime_install_dir / 'WeaselDeployer.exe', '/sync'], check=True)
sleep(1)
print("process and copy back userdb:")
for f in files:
src = self.rime_sync_dir / f
dst = self.workspace / 'rimedata' / f
if not src.is_file():
continue
print(src)
try:
with open(dst, 'wb') as g:
with open(src, 'rb') as h:
g.writelines(l for l in h if not l.startswith(b'\x7f'))
except Exception as e:
print(f"warning: error in processing or writing {f}: {e}", file=stderr)
continue
else:
print(f"would sync with: {self.rime_sync_dir}")
print("would call weasel sync")
print("would merge userdb:")
for f in files:
src = self.workspace / 'rimedata' / f
dst = self.rime_sync_dir / f
if not src.is_file():
continue
print(src)
print("would call weasel sync agin")
print("process and copy back userdb:")
for f in files:
src = self.rime_sync_dir / f
dst = self.workspace / 'rimedata' / f
if not src.is_file():
continue
print(src)
print("sync complete")