-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsimplpedpop.py
319 lines (252 loc) · 10.2 KB
/
simplpedpop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
from secrets import token_bytes as random_bytes
from typing import List, NamedTuple, NewType, Tuple, Optional, NoReturn
from secp256k1proto.bip340 import schnorr_sign, schnorr_verify
from secp256k1proto.secp256k1 import GE, Scalar
from .util import (
BIP_TAG,
FaultyParticipantOrCoordinatorError,
FaultyCoordinatorError,
UnknownFaultyParticipantOrCoordinatorError,
)
from .vss import VSS, VSSCommitment
###
### Exceptions
###
class SecshareSumError(ValueError):
pass
###
### Proofs of possession (pops)
###
Pop = NewType("Pop", bytes)
POP_MSG_TAG = BIP_TAG + "pop message"
def pop_msg(idx: int) -> bytes:
return idx.to_bytes(4, byteorder="big")
def pop_prove(seckey: bytes, idx: int, aux_rand: bytes = 32 * b"\x00") -> Pop:
sig = schnorr_sign(
pop_msg(idx), seckey, aux_rand=random_bytes(32), tag_prefix=POP_MSG_TAG
)
return Pop(sig)
def pop_verify(pop: Pop, pubkey: bytes, idx: int) -> bool:
return schnorr_verify(pop_msg(idx), pubkey, pop, tag_prefix=POP_MSG_TAG)
###
### Messages
###
class ParticipantMsg(NamedTuple):
com: VSSCommitment
pop: Pop
class CoordinatorMsg(NamedTuple):
coms_to_secrets: List[GE]
sum_coms_to_nonconst_terms: List[GE]
pops: List[Pop]
def to_bytes(self) -> bytes:
return b"".join(
[
P.to_bytes_compressed_with_infinity()
for P in self.coms_to_secrets + self.sum_coms_to_nonconst_terms
]
) + b"".join(self.pops)
class CoordinatorInvestigationMsg(NamedTuple):
partial_pubshares: List[GE]
###
### Other common definitions
###
class DKGOutput(NamedTuple):
secshare: Optional[bytes] # None for coordinator
threshold_pubkey: bytes
pubshares: List[bytes]
def assemble_sum_coms(
coms_to_secrets: List[GE], sum_coms_to_nonconst_terms: List[GE]
) -> VSSCommitment:
# Sum the commitments to the secrets
return VSSCommitment(
[GE.sum(*(c for c in coms_to_secrets))] + sum_coms_to_nonconst_terms
)
###
### Participant
###
class ParticipantState(NamedTuple):
t: int
n: int
idx: int
com_to_secret: GE
class ParticipantInvestigationData(NamedTuple):
n: int
idx: int
secshare: Scalar
pubshare: GE
# To keep the algorithms of SimplPedPop and EncPedPop purely non-interactive
# computations, we omit explicit invocations of an interactive equality check
# protocol. ChillDKG will take care of invoking the equality check protocol.
def participant_step1(
seed: bytes, t: int, n: int, idx: int
) -> Tuple[
ParticipantState,
ParticipantMsg,
# The following return value is a list of n partial secret shares generated
# by this participant. The item at index i is supposed to be made available
# to participant i privately, e.g., via an external secure channel. See also
# the function participant_step2_prepare_secshare().
List[Scalar],
]:
if t > n:
raise ValueError
if idx >= n:
raise IndexError
if len(seed) != 32:
raise ValueError
vss = VSS.generate(seed, t) # OverflowError if t >= 2**32
partial_secshares_from_me = vss.secshares(n)
pop = pop_prove(vss.secret().to_bytes(), idx)
com = vss.commit()
com_to_secret = com.commitment_to_secret()
msg = ParticipantMsg(com, pop)
state = ParticipantState(t, n, idx, com_to_secret)
return state, msg, partial_secshares_from_me
# Helper function to prepare the secshare for participant idx's
# participant_step2() by summing the partial_secshares returned by all
# participants' participant_step1().
#
# In a pure run of SimplPedPop where secret shares are sent via external secure
# channels (i.e., EncPedPop is not used), each participant needs to run this
# function in preparation of their participant_step2(). Since this computation
# involves secret data, it cannot be delegated to the coordinator as opposed to
# other aggregation steps.
#
# If EncPedPop is used instead (as a wrapper of SimplPedPop), the coordinator
# can securely aggregate the encrypted partial secshares into an encrypted
# secshare by exploiting the additively homomorphic property of the encryption.
def participant_step2_prepare_secshare(
partial_secshares: List[Scalar],
) -> Scalar:
secshare: Scalar # REVIEW Work around missing type annotation of Scalar.sum
secshare = Scalar.sum(*partial_secshares)
return secshare
def participant_step2(
state: ParticipantState,
cmsg: CoordinatorMsg,
secshare: Scalar,
) -> Tuple[DKGOutput, bytes]:
t, n, idx, com_to_secret = state
coms_to_secrets, sum_coms_to_nonconst_terms, pops = cmsg
assert len(coms_to_secrets) == n
assert len(sum_coms_to_nonconst_terms) == t - 1
assert len(pops) == n
if coms_to_secrets[idx] != com_to_secret:
raise FaultyCoordinatorError(
"Coordinator sent unexpected first group element for local index"
)
for i in range(n):
if i == idx:
# No need to check our own pop.
continue
if coms_to_secrets[i].infinity:
raise FaultyParticipantOrCoordinatorError(
i, "Participant sent invalid commitment"
)
# This can be optimized: We serialize the coms_to_secrets[i] here, but
# schnorr_verify (inside pop_verify) will need to deserialize it again, which
# involves computing a square root to obtain the y coordinate.
if not pop_verify(pops[i], coms_to_secrets[i].to_bytes_xonly(), i):
raise FaultyParticipantOrCoordinatorError(
i, "Participant sent invalid proof-of-knowledge"
)
sum_coms = assemble_sum_coms(coms_to_secrets, sum_coms_to_nonconst_terms)
# Verifying the tweaked secshare against the tweaked pubshare is equivalent
# to verifying the untweaked secshare against the untweaked pubshare, but
# avoids computing the untweaked pubshare in the happy path and thereby
# moves a group addition to the error path.
sum_coms_tweaked, tweak, pubtweak = sum_coms.invalid_taproot_commit()
pubshare_tweaked = sum_coms_tweaked.pubshare(idx)
secshare_tweaked = secshare + tweak
if not VSSCommitment.verify_secshare(secshare_tweaked, pubshare_tweaked):
pubshare = pubshare_tweaked - pubtweak
raise UnknownFaultyParticipantOrCoordinatorError(
ParticipantInvestigationData(n, idx, secshare, pubshare),
"Received invalid secshare, "
"consider investigation procedure to determine faulty party",
)
threshold_pubkey = sum_coms_tweaked.commitment_to_secret()
pubshares = [
sum_coms_tweaked.pubshare(i)
if i != idx
else pubshare_tweaked # We have computed our own pubshare already.
for i in range(n)
]
dkg_output = DKGOutput(
secshare_tweaked.to_bytes(),
threshold_pubkey.to_bytes_compressed(),
[pubshare.to_bytes_compressed() for pubshare in pubshares],
)
eq_input = t.to_bytes(4, byteorder="big") + sum_coms.to_bytes()
return dkg_output, eq_input
def participant_investigate(
error: UnknownFaultyParticipantOrCoordinatorError,
cinv: CoordinatorInvestigationMsg,
partial_secshares: List[Scalar],
) -> NoReturn:
n, idx, secshare, pubshare = error.inv_data
partial_pubshares = cinv.partial_pubshares
if GE.sum(*partial_pubshares) != pubshare:
raise FaultyCoordinatorError("Sum of partial pubshares not equal to pubshare")
if Scalar.sum(*partial_secshares) != secshare:
raise SecshareSumError("Sum of partial secshares not equal to secshare")
for i in range(n):
if not VSSCommitment.verify_secshare(
partial_secshares[i], partial_pubshares[i]
):
if i != idx:
raise FaultyParticipantOrCoordinatorError(
i, "Participant sent invalid partial secshare"
)
else:
# We are not faulty, so the coordinator must be.
raise FaultyCoordinatorError(
"Coordinator fiddled with the share from me to myself"
)
# We now know:
# - The sum of the partial secshares is equal to the secshare.
# - The sum of the partial pubshares is equal to the pubshare.
# - Every partial secshare matches its corresponding partial pubshare.
# Hence, the secshare matches the pubshare.
assert VSSCommitment.verify_secshare(secshare, pubshare)
# This should never happen (unless the caller fiddled with the inputs).
raise RuntimeError(
"participant_investigate() was called, but all inputs are consistent."
)
###
### Coordinator
###
def coordinator_step(
pmsgs: List[ParticipantMsg], t: int, n: int
) -> Tuple[CoordinatorMsg, DKGOutput, bytes]:
# Sum the commitments to the i-th coefficients for i > 0
#
# This procedure corresponds to the one described by Pedersen in Section 5.1
# of "Non-Interactive and Information-Theoretic Secure Verifiable Secret
# Sharing". However, we don't sum the commitments to the secrets (i == 0)
# because they'll be necessary to check the pops.
coms_to_secrets = [pmsg.com.commitment_to_secret() for pmsg in pmsgs]
# But we can sum the commitments to the non-constant terms.
sum_coms_to_nonconst_terms = [
GE.sum(*(pmsg.com.commitment_to_nonconst_terms()[j] for pmsg in pmsgs))
for j in range(t - 1)
]
pops = [pmsg.pop for pmsg in pmsgs]
cmsg = CoordinatorMsg(coms_to_secrets, sum_coms_to_nonconst_terms, pops)
sum_coms = assemble_sum_coms(coms_to_secrets, sum_coms_to_nonconst_terms)
sum_coms_tweaked, _, _ = sum_coms.invalid_taproot_commit()
threshold_pubkey = sum_coms_tweaked.commitment_to_secret()
pubshares = [sum_coms_tweaked.pubshare(i) for i in range(n)]
dkg_output = DKGOutput(
None,
threshold_pubkey.to_bytes_compressed(),
[pubshare.to_bytes_compressed() for pubshare in pubshares],
)
eq_input = t.to_bytes(4, byteorder="big") + sum_coms.to_bytes()
return cmsg, dkg_output, eq_input
def coordinator_investigate(
pmsgs: List[ParticipantMsg],
) -> List[CoordinatorInvestigationMsg]:
n = len(pmsgs)
all_partial_pubshares = [[pmsg.com.pubshare(i) for pmsg in pmsgs] for i in range(n)]
return [CoordinatorInvestigationMsg(all_partial_pubshares[i]) for i in range(n)]