Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#ide
.idea/

# Binaries for programs and plugins
*.exe
*.exe~
Expand Down
11 changes: 4 additions & 7 deletions cmd/a_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
signer_engines "github.com/tez-capital/tezpay/engines/signer"
transactor_engines "github.com/tez-capital/tezpay/engines/transactor"
"github.com/tez-capital/tezpay/extension"
"github.com/tez-capital/tezpay/state"
"github.com/tez-capital/tezpay/utils"
"github.com/trilitech/tzgo/tezos"
)
Expand All @@ -38,13 +37,11 @@ func loadConfigurationEnginesExtensions() (*configurationAndEngines, error) {
return nil, errors.Join(constants.ErrConfigurationLoadFailed, err)
}

signerEngine := state.Global.SignerOverride
if signerEngine == nil {
signerEngine, err = signer_engines.Load(string(config.PayoutConfiguration.WalletMode))
if err != nil {
return nil, errors.Join(constants.ErrSignerLoadFailed, err)
}
signerEngine, err := signer_engines.InitGCSigner(context.Background(), config.GCPSigner)
if err != nil {
return nil, err
}

// for testing point transactor to testnet
// transactorEngine, err := clients.InitDefaultTransactor("https://rpc.tzkt.io/ghostnet/", "https://api.ghostnet.tzkt.io/") // (config.Network.RpcUrl, config.Network.TzktUrl)
transactorEngine, err := transactor_engines.InitDefaultTransactor(config)
Expand Down
16 changes: 11 additions & 5 deletions cmd/continual.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"time"

ctx "context"

"github.com/samber/lo"
"github.com/spf13/cobra"
"github.com/tez-capital/tezpay/common"
Expand Down Expand Up @@ -51,9 +53,13 @@ func processCycleInContinualMode(context *configurationAndEngines, forceConfirma
}()

config, collector, signer, transactor := context.Unwrap()
fsReporter := reporter_engines.NewFileSystemReporter(config, &common.ReporterEngineOptions{
DryRun: isDryRun,
})

gcReporter, err := reporter_engines.NewGCSReporter(ctx.Background(), config.GCPBucket)
if err != nil {
slog.Error("reporter_engines.NewGCSReporter", "error", err.Error())
return processed
}
defer gcReporter.Close()

// refresh engine params - for protocol upgrades
if err := errors.Join(transactor.RefreshParams(), collector.RefreshParams()); err != nil {
Expand Down Expand Up @@ -88,7 +94,7 @@ func processCycleInContinualMode(context *configurationAndEngines, forceConfirma

slog.Info("checking reports of past payouts")
preparationResult := assertRunWithResult(func() (*common.PreparePayoutsResult, error) {
return core.PrepareCyclePayouts(generationResult, config, common.NewPreparePayoutsEngineContext(collector, signer, fsReporter, notifyAdminFactory(config)), &common.PreparePayoutsOptions{})
return core.PrepareCyclePayouts(generationResult, config, common.NewPreparePayoutsEngineContext(collector, signer, gcReporter, notifyAdminFactory(config)), &common.PreparePayoutsOptions{})
}, EXIT_OPERTION_FAILED)

if len(preparationResult.ValidPayouts) == 0 {
Expand All @@ -109,7 +115,7 @@ func processCycleInContinualMode(context *configurationAndEngines, forceConfirma

slog.Info("executing payouts", "valid", len(preparationResult.ValidPayouts), "invalid", len(preparationResult.InvalidPayouts), "accumulated", len(preparationResult.AccumulatedPayouts), "already_successful", len(preparationResult.ReportsOfPastSuccessfulPayouts))
executionResult := assertRunWithResult(func() (*common.ExecutePayoutsResult, error) {
return core.ExecutePayouts(preparationResult, config, common.NewExecutePayoutsEngineContext(signer, transactor, fsReporter, notifyAdminFactory(config)), &common.ExecutePayoutsOptions{
return core.ExecutePayouts(preparationResult, config, common.NewExecutePayoutsEngineContext(signer, transactor, gcReporter, notifyAdminFactory(config)), &common.ExecutePayoutsOptions{
MixInContractCalls: mixInContractCalls,
MixInFATransfers: mixInFATransfers,
DryRun: isDryRun,
Expand Down
15 changes: 10 additions & 5 deletions cmd/pay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
ctx "context"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -33,9 +34,13 @@ var payCmd = &cobra.Command{
mixInFATransfers, _ := cmd.Flags().GetBool(DISABLE_SEPARATE_FA_PAYOUTS_FLAG)
isDryRun, _ := cmd.Flags().GetBool(DRY_RUN_FLAG)

fsReporter := reporter_engines.NewFileSystemReporter(config, &common.ReporterEngineOptions{
DryRun: isDryRun,
})
gcReporter, err := reporter_engines.NewGCSReporter(ctx.Background(), config.GCPBucket)
if err != nil {
slog.Info("reporter_engines.NewGCSReporter", "cycle", cycle, "error", err.Error())
return
}
defer gcReporter.Close()

stdioReporter := reporter_engines.NewStdioReporter(config)

if !state.Global.IsDonationPromptDisabled() && !config.IsDonatingToTezCapital() {
Expand Down Expand Up @@ -92,7 +97,7 @@ var payCmd = &cobra.Command{

slog.Info("checking past reports")
preparationResult := assertRunWithResult(func() (*common.PreparePayoutsResult, error) {
return core.PrepareCyclePayouts(generationResult, config, common.NewPreparePayoutsEngineContext(collector, signer, fsReporter, notifyAdminFactory(config)), &common.PreparePayoutsOptions{})
return core.PrepareCyclePayouts(generationResult, config, common.NewPreparePayoutsEngineContext(collector, signer, gcReporter, notifyAdminFactory(config)), &common.PreparePayoutsOptions{})
}, EXIT_OPERTION_FAILED)

switch {
Expand Down Expand Up @@ -128,7 +133,7 @@ var payCmd = &cobra.Command{
slog.Info("executing payouts")
executionResult := assertRunWithResult(func() (*common.ExecutePayoutsResult, error) {
var reporter common.ReporterEngine
reporter = fsReporter
reporter = gcReporter
if reportToStdout, _ := cmd.Flags().GetBool(REPORT_TO_STDOUT); reportToStdout {
reporter = stdioReporter
}
Expand Down
2 changes: 2 additions & 0 deletions configuration/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func ConfigurationToRuntimeConfiguration(configuration *LatestConfigurationType)
Extensions: configuration.Extensions,
SourceBytes: []byte{},
DisableAnalytics: configuration.DisableAnalytics,
GCPBucket: configuration.GCPBucket,
GCPSigner: configuration.GCPSigner,
}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions configuration/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type RuntimeConfiguration struct {
Extensions []tezpay_configuration.ExtensionConfigurationV0
SourceBytes []byte `json:"-"`
DisableAnalytics bool `json:"disable_analytics,omitempty"`
GCPBucket string
GCPSigner string
}

func GetDefaultRuntimeConfiguration() RuntimeConfiguration {
Expand Down
2 changes: 2 additions & 0 deletions configuration/v/v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type ConfigurationV0 struct {
Extensions []ExtensionConfigurationV0 `json:"extensions,omitempty" comment:"extensions (for custom functionality)"`
SourceBytes []byte `json:"-"`
DisableAnalytics bool `json:"disable_analytics,omitempty" comment:"disables analytics, please consider leaving it enabled🙏"`
GCPBucket string `json:"gcp_bucket" comment:"GCP storage bucket name"`
GCPSigner string `json:"gcp_signer" comment:"GCP KMS key source"`
}

type NotificatorConfigurationBase struct {
Expand Down
2 changes: 1 addition & 1 deletion core/prepare/0_PreparePayouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func PreparePayouts(ctx *PayoutPrepareContext, options *common.PreparePayoutsOpt
reportsOfPastSuccesfulPayouts := make([]common.PayoutReport, 0, count)
for _, blueprint := range ctx.PayoutBlueprints {
reports, err := ctx.GetReporter().GetExistingReports(blueprint.Cycle)
if err != nil && !os.IsNotExist(err) {
if err != nil && !os.IsNotExist(err) && err.Error() != "storage: object doesn't exist" {
return nil, errors.Join(constants.ErrPayoutsFromFileLoadFailed, fmt.Errorf("cycle: %d", blueprint.Cycle), err)
}
reportResidues := utils.FilterReportsByBaker(reports, ctx.configuration.BakerPKH)
Expand Down
143 changes: 143 additions & 0 deletions engines/reporter/gcp_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package reporter_engines

import (
"context"
"encoding/json"
"fmt"
"io"
"path"
"sort"

"cloud.google.com/go/storage"
"github.com/gocarina/gocsv"
"github.com/samber/lo"
"github.com/tez-capital/tezpay/common"
"github.com/tez-capital/tezpay/constants"
"github.com/tez-capital/tezpay/utils"
)

type GCSReporter struct {
client *storage.Client
bucket string
ctx context.Context
}

// NewGCSReporter creates a GCS-backed reporter. The caller is responsible for
// providing a context and the target bucket name. Credentials are picked up by
// the default application credentials of the environment.
func NewGCSReporter(ctx context.Context, bucket string) (*GCSReporter, error) {
client, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %w", err)
}
return &GCSReporter{
client: client,
bucket: bucket,
ctx: ctx,
}, nil
}

// helper to read an object from GCS
func (engine *GCSReporter) readObject(objectPath string) ([]byte, error) {
rc, err := engine.client.Bucket(engine.bucket).Object(objectPath).NewReader(engine.ctx)
if err != nil {
return nil, err
}
defer rc.Close()
return io.ReadAll(rc)
}

// helper to write an object to GCS
func (engine *GCSReporter) writeObject(objectPath string, data []byte, contentType string) error {
w := engine.client.Bucket(engine.bucket).Object(objectPath).NewWriter(engine.ctx)
if contentType != "" {
w.ContentType = contentType
}
if _, err := w.Write(data); err != nil {
w.Close()
return err
}
return w.Close()
}

func (engine *GCSReporter) GetExistingReports(cycle int64) ([]common.PayoutReport, error) {
sourceFile := path.Join(fmt.Sprintf("%d", cycle), constants.PAYOUT_REPORT_FILE_NAME)
data, err := engine.readObject(sourceFile)
if err != nil {
return []common.PayoutReport{}, err
}
reports := make([]common.PayoutReport, 0)
if err := gocsv.UnmarshalBytes(data, &reports); err != nil {
return []common.PayoutReport{}, err
}
return reports, nil
}

func (engine *GCSReporter) ReportPayouts(payouts []common.PayoutReport) error {
if len(payouts) == 0 {
return nil
}
cyclesToBeWritten := lo.Uniq(lo.Map(payouts, func(pr common.PayoutReport, _ int) int64 { return pr.Cycle }))

for _, cycle := range cyclesToBeWritten {
targetFile := path.Join(fmt.Sprintf("%d", cycle), constants.PAYOUT_REPORT_FILE_NAME)
// keep the exact sort logic from FsReporter (descending by Amount)
sort.Slice(payouts, func(i, j int) bool { return !payouts[i].Amount.IsLess(payouts[j].Amount) })
reports := lo.Filter(payouts, func(payout common.PayoutReport, _ int) bool { return payout.Cycle == cycle })
csvData, err := gocsv.MarshalBytes(reports)
if err != nil {
return err
}
if err := engine.writeObject(targetFile, csvData, "text/csv"); err != nil {
return err
}
}
return nil
}

func (engine *GCSReporter) ReportInvalidPayouts(payouts []common.PayoutRecipe) error {
invalid := utils.OnlyInvalidPayouts(payouts)
if len(invalid) == 0 {
return nil
}
cyclesToBeWritten := lo.Uniq(lo.Map(invalid, func(pr common.PayoutRecipe, _ int) int64 { return pr.Cycle }))
for _, cycle := range cyclesToBeWritten {
targetFile := path.Join(fmt.Sprintf("%d", cycle), constants.INVALID_REPORT_FILE_NAME)
// convert to PayoutReport as FsReporter did
reports := lo.Map(utils.FilterPayoutsByCycle(invalid, cycle), mapPayoutRecipeToPayoutReport)
csvData, err := gocsv.MarshalBytes(reports)
if err != nil {
return err
}
if err := engine.writeObject(targetFile, csvData, "text/csv"); err != nil {
return err
}
}
return nil
}

func (engine *GCSReporter) ReportCycleSummary(summary common.CyclePayoutSummary) error {
targetFile := path.Join(fmt.Sprintf("%d", summary.Cycle), constants.REPORT_SUMMARY_FILE_NAME)
data, err := json.MarshalIndent(summary, "", "\t")
if err != nil {
return err
}
return engine.writeObject(targetFile, data, "application/json")
}

func (engine *GCSReporter) GetExistingCycleSummary(cycle int64) (*common.CyclePayoutSummary, error) {
sourceFile := path.Join(fmt.Sprintf("%d", cycle), constants.REPORT_SUMMARY_FILE_NAME)
data, err := engine.readObject(sourceFile)
if err != nil {
return nil, err
}
var summary common.CyclePayoutSummary
if err := json.Unmarshal(data, &summary); err != nil {
return nil, err
}
return &summary, nil
}

func (engine *GCSReporter) Close() error {
return engine.client.Close()
}
Loading