Skip to content

Commit 354c330

Browse files
committed
add: use of internal broadcasted, updating transaction info in participants DB, airdrop amount in api ctx
1 parent 80cfcc5 commit 354c330

File tree

7 files changed

+71
-31
lines changed

7 files changed

+71
-31
lines changed

config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ listener:
99
addr: localhost:8000
1010

1111
broadcaster:
12-
airdrop_amount: 100urmo
12+
airdrop_amount: 100stake
1313
cosmos_rpc: rpc_url
1414
chain_id: chain_id
1515
sender_private_key: priv_key

internal/broadcaster/broadcaster.go

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -59,40 +59,58 @@ func (r *Runner) run(ctx context.Context) error {
5959
for _, participant := range participants {
6060
log := r.log.WithField("participant_nullifier", participant.Nullifier)
6161

62-
tx, err := r.genTx(ctx, 0, participant)
63-
if err != nil {
64-
log.WithError(err).Error("Failed to generate tx")
62+
if err := r.handleParticipant(ctx, participant); err != nil {
63+
log.WithError(err).Error("Failed to handle participant")
6564
continue
6665
}
66+
}
6767

68-
gasUsed, err := r.simulateTx(ctx, tx)
69-
if err != nil {
70-
log.WithError(err).Error("Failed to simulate tx")
71-
continue
72-
}
68+
return nil
69+
}
7370

74-
tx, err = r.genTx(ctx, gasUsed*3, participant)
75-
if err != nil {
76-
log.WithError(err).Error("Failed to generate tx after simulation")
77-
continue
78-
}
71+
func (r *Runner) handleParticipant(ctx context.Context, participant data.Participant) error {
72+
tx, err := r.createAirdropTx(ctx, participant)
73+
if err != nil {
74+
return fmt.Errorf("creating airdrop tx: %w", err)
75+
}
7976

80-
if err = r.broadcastTx(ctx, tx); err == nil {
81-
continue
77+
txHash, err := r.broadcastTx(ctx, tx)
78+
if err != nil {
79+
err = r.participants.New().UpdateStatus(participant.Nullifier, txHash, data.TxStatusFailed)
80+
if err != nil {
81+
return fmt.Errorf("update participant failed tx status: %w", err)
8282
}
8383

84-
log.WithError(err).Error("Failed to broadcast tx")
84+
return fmt.Errorf("broadcast tx: %w", err)
85+
}
8586

86-
// TODO: handle errors: whether we should delete the participant or assign a failed status (hard)
87-
if err = r.participants.New().Delete(participant.Nullifier); err != nil {
88-
log.WithError(err).Error("Failed to delete successful tx")
89-
continue
90-
}
87+
err = r.participants.New().UpdateStatus(participant.Nullifier, txHash, data.TxStatusCompleted)
88+
if err != nil {
89+
return fmt.Errorf("update participant completed tx status: %w", err)
9190
}
9291

9392
return nil
9493
}
9594

95+
func (r *Runner) createAirdropTx(ctx context.Context, participant data.Participant) ([]byte, error) {
96+
tx, err := r.genTx(ctx, 0, participant)
97+
if err != nil {
98+
return nil, fmt.Errorf("failed to generate tx: %w", err)
99+
}
100+
101+
gasUsed, err := r.simulateTx(ctx, tx)
102+
if err != nil {
103+
return nil, fmt.Errorf("failed to simulate tx: %w", err)
104+
}
105+
106+
tx, err = r.genTx(ctx, gasUsed*3, participant)
107+
if err != nil {
108+
return nil, fmt.Errorf("failed to generate tx after simulation: %w", err)
109+
}
110+
111+
return tx, nil
112+
}
113+
96114
func (r *Runner) genTx(ctx context.Context, gasLimit uint64, p data.Participant) ([]byte, error) {
97115
tx, err := r.buildTransferTx(p)
98116
if err != nil {
@@ -159,28 +177,28 @@ func (r *Runner) simulateTx(ctx context.Context, tx []byte) (gasUsed uint64, err
159177
return sim.GasInfo.GasUsed, nil
160178
}
161179

162-
func (r *Runner) broadcastTx(ctx context.Context, tx []byte) error {
180+
func (r *Runner) broadcastTx(ctx context.Context, tx []byte) (string, error) {
163181
grpcRes, err := r.txClient.BroadcastTx(ctx, &client.BroadcastTxRequest{
164182
Mode: client.BroadcastMode_BROADCAST_MODE_BLOCK,
165183
TxBytes: tx,
166184
})
167185
if err != nil {
168-
return fmt.Errorf("send tx: %w", err)
186+
return "", fmt.Errorf("send tx: %w", err)
169187
}
170188
r.log.Debugf("Submitted transaction to the core: %s", grpcRes.TxResponse.TxHash)
171189

172190
if grpcRes.TxResponse.Code != txCodeSuccess {
173-
return fmt.Errorf("got error code: %d, info: %s, log: %s", grpcRes.TxResponse.Code, grpcRes.TxResponse.Info, grpcRes.TxResponse.RawLog)
191+
return grpcRes.TxResponse.TxHash, fmt.Errorf("got error code: %d, info: %s, log: %s", grpcRes.TxResponse.Code, grpcRes.TxResponse.Info, grpcRes.TxResponse.RawLog)
174192
}
175193

176-
return nil
194+
return grpcRes.TxResponse.TxHash, nil
177195
}
178196

179197
func (r *Runner) buildTransferTx(p data.Participant) (types.Tx, error) {
180198
tx := &bank.MsgSend{
181199
FromAddress: r.senderAddress,
182200
ToAddress: p.Address,
183-
Amount: r.airdropCoins,
201+
Amount: r.AirdropCoins,
184202
}
185203

186204
builder := r.txConfig.NewTxBuilder()

internal/broadcaster/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,14 @@ type config interface {
3333
}
3434

3535
type Config struct {
36+
AirdropCoins types.Coins
37+
3638
sender cryptotypes.PrivKey
3739
senderAddress string
3840
chainID string
3941
txConfig sdkclient.TxConfig
4042
txClient txclient.ServiceClient
4143
auth authtypes.QueryClient
42-
airdropCoins types.Coins
4344
queryLimit uint64
4445
}
4546

@@ -112,7 +113,7 @@ func (b *broadcasterer) Broadcaster() Config {
112113
),
113114
txClient: txclient.NewServiceClient(cosmosRPC),
114115
auth: authtypes.NewQueryClient(cosmosRPC),
115-
airdropCoins: amount,
116+
AirdropCoins: amount,
116117
queryLimit: queryLimit,
117118
}
118119
}).(Config)

internal/data/participants.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
const (
1414
TxStatusPending = "pending"
1515
TxStatusCompleted = "completed"
16+
TxStatusFailed = "failed"
1617
)
1718

1819
const participantsTable = "participants"
@@ -21,6 +22,8 @@ type Participant struct {
2122
Nullifier string `db:"nullifier"`
2223
Address string `db:"address"`
2324
Status string `db:"status"`
25+
TxHash string `db:"tx_hash"`
26+
Amount string `db:"amount"`
2427
CreatedAt time.Time `db:"created_at"`
2528
UpdatedAt time.Time `db:"updated_at"`
2629
}
@@ -47,6 +50,8 @@ func (q *ParticipantsQ) Insert(p Participant) (*Participant, error) {
4750
"nullifier": p.Nullifier,
4851
"address": p.Address,
4952
"status": p.Status,
53+
"tx_hash": p.TxHash,
54+
"amount": p.Amount,
5055
}).Suffix("RETURNING *")
5156

5257
if err := q.db.Get(&res, stmt); err != nil {
@@ -56,8 +61,11 @@ func (q *ParticipantsQ) Insert(p Participant) (*Participant, error) {
5661
return &res, nil
5762
}
5863

59-
func (q *ParticipantsQ) UpdateStatus(nullifier, status string) error {
60-
stmt := squirrel.Update(participantsTable).Set("status", status).Where(squirrel.Eq{"nullifier": nullifier})
64+
func (q *ParticipantsQ) UpdateStatus(nullifier, txHash, status string) error {
65+
stmt := squirrel.Update(participantsTable).
66+
Set("status", status).
67+
Set("tx_hash", txHash).
68+
Where(squirrel.Eq{"nullifier": nullifier})
6169

6270
if err := q.db.Exec(stmt); err != nil {
6371
return fmt.Errorf("update participant status [nullifier=%s newStatus=%s]: %w", nullifier, status, err)

internal/service/handlers/create_airdrop.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func CreateAirdrop(w http.ResponseWriter, r *http.Request) {
5656
Nullifier: nullifier,
5757
Address: req.Data.Attributes.Address,
5858
Status: data.TxStatusPending,
59+
Amount: AirdropAmount(r),
5960
})
6061
if err != nil {
6162
Log(r).WithError(err).WithField("nullifier", nullifier).Errorf("Failed to insert participant")

internal/service/handlers/ctx.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type ctxKey int
1414
const (
1515
logCtxKey ctxKey = iota
1616
participantsQCtxKey
17+
airdropAmountCtxKey
1718
verifierCtxKey
1819
)
1920

@@ -37,6 +38,16 @@ func ParticipantsQ(r *http.Request) *data.ParticipantsQ {
3738
return r.Context().Value(participantsQCtxKey).(*data.ParticipantsQ).New()
3839
}
3940

41+
func CtxAirdropAmount(amount string) func(context.Context) context.Context {
42+
return func(ctx context.Context) context.Context {
43+
return context.WithValue(ctx, airdropAmountCtxKey, amount)
44+
}
45+
}
46+
47+
func AirdropAmount(r *http.Request) string {
48+
return r.Context().Value(airdropAmountCtxKey).(string)
49+
}
50+
4051
func CtxVerifier(entry *config.VerifierConfig) func(context.Context) context.Context {
4152
return func(ctx context.Context) context.Context {
4253
return context.WithValue(ctx, verifierCtxKey, entry)

internal/service/router.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func Run(ctx context.Context, cfg *config.Config) {
2020
ape.CtxMiddleware(
2121
handlers.CtxLog(cfg.Log()),
2222
handlers.CtxVerifier(cfg.Verifier()),
23+
handlers.CtxAirdropAmount(cfg.Broadcaster().AirdropCoins.String()),
2324
),
2425
handlers.DBCloneMiddleware(cfg.DB()),
2526
)

0 commit comments

Comments
 (0)