Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Streams - Streams Direct examples #60

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-streams/getting-started/hardhat/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Getting Started with Data Streams

This guide shows you how to read data from a Data Streams feed, verify the answer onchain, and store the answer onchain. This example uses a [Chainlink Automation Log Trigger](https://docs.chain.link/chainlink-automation/guides/log-trigger) to check for events that require data. For this example:
This guide shows you how to read data from a Data Streams feed, verify the answer onchain, and store the answer onchain. This example uses the [Streams Trade](https://docs.chain.link/data-streams#streams-trade-using-data-streams-with-chainlink-automation) implementation of Data Streams and a [Chainlink Automation Log Trigger](https://docs.chain.link/chainlink-automation/guides/log-trigger) to check for events that require data. For this example:

- The log trigger comes from a simple emitter contract.
- Chainlink Automation then uses `StreamsLookup` to retrieve a signed report from the Data Streams Engine, return the data in a callback, and run the [`performUpkeep` function](https://docs.chain.link/chainlink-automation/reference/automation-interfaces#performupkeep-function-for-log-triggers) on your registered upkeep contract.
Expand Down
8 changes: 8 additions & 0 deletions data-streams/streams-direct/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Using Data Streams with the Streams Direct implementation

This folder contains the code examples to follow the Chainlink Data Streams guides with the [Streams Direct](https://docs.chain.link/data-streams#streams-direct-using-data-streams-with-your-own-bot) implementation.

To learn more, see the following guides:

- [Fetch and decode reports via a REST API](https://docs.chain.link/data-streams/tutorials/streams-direct-api)
- [Streams and decode reports via WebSocket](https://docs.chain.link/data-streams/tutorials/streams-direct-ws)
170 changes: 170 additions & 0 deletions data-streams/streams-direct/api/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// client.go

package client

import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
)

type SingleReport struct {
FeedID hexutil.Bytes `json:"feedID"`
ValidFromTimestamp uint32 `json:"validFromTimestamp"`
ObservationsTimestamp uint32 `json:"observationsTimestamp"`
FullReport hexutil.Bytes `json:"fullReport"`
}

type SingleReportResponse struct {
Report SingleReport `json:"report"`
}

type BulkReportResponse struct {
Reports []SingleReport `json:"reports"`
}

const (
path = "/api/v1/reports"
bulkPath = "/api/v1/reports/bulk"
)

func GenerateHMAC(method string, path string, body []byte, clientId string, timestamp int64, userSecret string) string {
serverBodyHash := sha256.New()
serverBodyHash.Write(body)
serverBodyHashString := fmt.Sprintf("%s %s %s %s %d",
method,
path,
hex.EncodeToString(serverBodyHash.Sum(nil)),
clientId,
timestamp)
fmt.Println("Generating HMAC with the following: ", serverBodyHashString)
signedMessage := hmac.New(sha256.New, []byte(userSecret))
signedMessage.Write([]byte(serverBodyHashString))
userHmac := hex.EncodeToString(signedMessage.Sum(nil))
return userHmac
}

func GenerateAuthHeaders(method string, pathAndParams string, clientId string, userSecret string) http.Header {
header := http.Header{}
timestamp := time.Now().UTC().UnixMilli()
hmacString := GenerateHMAC(method, pathAndParams, []byte(""), clientId, timestamp, userSecret)

header.Add("Authorization", clientId)
header.Add("X-Authorization-Timestamp", strconv.FormatInt(timestamp, 10))
header.Add("X-Authorization-Signature-SHA256", hmacString)
return header
}

func FetchSingleReportSingleFeed(feedId string) (SingleReport, error) {
baseUrl := os.Getenv("BASE_URL") // Example: api.testnet-dataengine.chain.link
clientId := os.Getenv("CLIENT_ID") // Example: "00000000-0000-0000-0000-000000000000"
userSecret := os.Getenv("CLIENT_SECRET") // Example: "your-secret"

timestamp := time.Now().UTC().UnixMilli() - 500

params := url.Values{
"feedID": {feedId},
"timestamp": {fmt.Sprintf("%d", timestamp/1000)},
}

req := &http.Request{
Method: http.MethodGet,
URL: &url.URL{
Scheme: "https",
Host: baseUrl,
Path: path,
RawQuery: params.Encode(),
},
}
req.Header = GenerateAuthHeaders(req.Method, req.URL.RequestURI(), clientId, userSecret)
fmt.Println("base: ", baseUrl)
fmt.Println("header: ", req.Header)
fmt.Println("params: ", params)

rawRes, err := http.DefaultClient.Do(req)
if err != nil {
return SingleReport{}, err
}
defer rawRes.Body.Close()

body, err := io.ReadAll(rawRes.Body)
if err != nil {
return SingleReport{}, err
}

if rawRes.StatusCode != http.StatusOK {
// Error messages are typically descriptive
return SingleReport{}, fmt.Errorf("unexpected status code %d: %v", rawRes.StatusCode, string(body))
}

var res SingleReportResponse
err = json.Unmarshal(body, &res)
if err != nil {
return SingleReport{}, err
}

return res.Report, nil
}

func FetchSingleReportManyFeeds(feedIds []string) ([]SingleReport, error) {
baseUrl := os.Getenv("BASE_URL") //Example: api.testnet-dataengine.chain.link
clientId := os.Getenv("CLIENT_ID") // Example: "00000000-0000-0000-0000-000000000000"
userSecret := os.Getenv("CLIENT_SECRET") // Example: "your-secret"

timestamp := time.Now().UTC().UnixMilli() - 500

params := url.Values{
"feedIDs": {strings.Join(feedIds, ",")},
"timestamp": {fmt.Sprintf("%d", timestamp/1000)},
}

req := &http.Request{
Method: http.MethodGet,
URL: &url.URL{
Scheme: "https",
Host: baseUrl,
Path: bulkPath,
RawQuery: params.Encode(),
},
}

req.Header = GenerateAuthHeaders(req.Method, req.URL.RequestURI(), clientId, userSecret)
fmt.Println("base: ", baseUrl)
fmt.Println("header: ", req.Header)
fmt.Println("params: ", params)

rawRes, err := http.DefaultClient.Do(req)
if err != nil {
return []SingleReport{}, err
}
defer rawRes.Body.Close()

body, err := io.ReadAll(rawRes.Body)
if err != nil {
return []SingleReport{}, err
}

if rawRes.StatusCode != http.StatusOK {
// Error messages are typically descriptive
return []SingleReport{}, fmt.Errorf("unexpected status code %d: %v", rawRes.StatusCode, string(body))
}

var res BulkReportResponse
err = json.Unmarshal(body, &res)
if err != nil {
return []SingleReport{}, err
}

return res.Reports, nil
}
24 changes: 24 additions & 0 deletions data-streams/streams-direct/api/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module data-streams-direct

go 1.21

require (
github.com/ethereum/go-ethereum v1.12.2 // Ethereum blockchain interaction library
github.com/pkg/errors v0.9.1 // Library for handling errors
github.com/smartcontractkit/chainlink/v2 v2.2.1-0.20230823171354-1ead9ee6f6bb // Chainlink core components library
)

replace (
// Resolves version mismatch between cosmosSDK and hdevalence/ed25519consensus
filippo.io/edwards25519 => filippo.io/edwards25519 v1.0.0-rc.1

// Adds ARM support by updating CosmWasm to v1.2.4
github.com/CosmWasm/wasmvm => github.com/CosmWasm/wasmvm v1.2.4

//// Fix go mod tidy issue for ambiguous imports from go-ethereum
//// See https://github.com/ugorji/go/issues/279
github.com/btcsuite/btcd => github.com/btcsuite/btcd v0.22.1

// Aligns protobuf version with cosmos SDK requirements
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
)
129 changes: 129 additions & 0 deletions data-streams/streams-direct/api/internal/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// decoder.go

package internal

import (
"encoding/binary"
"fmt"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/pkg/errors"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
v3report "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/types"
)

type ReportWithContext struct {
FeedId mercuryutils.FeedID
FeedVersion mercuryutils.FeedVersion
V3Report *v3report.Report
Round uint8
Epoch uint32
Digest []byte
}

type FullReport struct {
ReportContext [3][32]byte
ReportBlob []byte
RawRs [][32]byte
RawSs [][32]byte
RawVs [32]byte
}

func mustNewType(t string) abi.Type {
result, err := abi.NewType(t, "", []abi.ArgumentMarshaling{})
if err != nil {
panic(fmt.Sprintf("Unexpected error during abi.NewType: %s", err))
}
return result
}

var schema = abi.Arguments{
{Name: "reportContext", Type: mustNewType("bytes32[3]")},
{Name: "reportBlob", Type: mustNewType("bytes")},
{Name: "rawRs", Type: mustNewType("bytes32[]")},
{Name: "rawSs", Type: mustNewType("bytes32[]")},
{Name: "rawVs", Type: mustNewType("bytes32")},
}

/*
DecodeFullReport reads the "fullReport" from the API response into a struct containing the report context, report data,
and raw signatures. This functions requires no prep to use, because the schema for the "fullReport" blob is
common among all report versions (basic, premium, etc),
*/

func DecodeFullReport(fullReport []byte) (*FullReport, error) {
values, err := schema.Unpack(fullReport)
if err != nil {
return nil, fmt.Errorf("failed to decode FullReport: %w", err)
}
decoded := new(FullReport)
if err = schema.Copy(decoded, values); err != nil {
return nil, fmt.Errorf("failed to copy FullReport values to struct: %w", err)
}

return decoded, nil
}

/*
DecodeReportData takes the report blob (FullReport.ReportBlob), extracts the feeds id, calculates the version from the feed id,
and finally decodes the report blob using the lib that correlates with the version. The resulting interface can be cast into
the correct report type as needed.
*/
func DecodeReportData(reportBlob []byte) (mercuryutils.FeedID, interface{}, error) {
feedIdAbi := abi.Arguments{
{Name: "feedId", Type: mustNewType("bytes32")},
}
reportElements := map[string]interface{}{}
if err := feedIdAbi.UnpackIntoMap(reportElements, reportBlob); err != nil {
return mercuryutils.FeedID{}, nil, err
}
feedIdInterface, ok := reportElements["feedId"]
if !ok {
return mercuryutils.FeedID{}, nil, errors.Errorf("unpacked ReportBlob has no 'feedId'")
}
feedIdBytes, ok := feedIdInterface.([32]byte)
if !ok {
return mercuryutils.FeedID{}, nil, errors.Errorf("cannot cast ReportBlob feedId to [32]byte, type is %T", feedIdBytes)
}
feedID := mercuryutils.FeedID(feedIdBytes)

switch feedID.Version() {
case mercuryutils.REPORT_V3:
res, err := v3report.Decode(reportBlob)
return feedID, res, err
default:
return mercuryutils.FeedID{}, nil, errors.Errorf("unknown report version %d", feedID.Version())
}
}

/*
DecodeFullReportAndReportData takes the full report payload, decodes the fullReport blob, and then decodes the report data.
*/
func DecodeFullReportAndReportData(payload []byte) (*ReportWithContext, error) {
fullReport, err := DecodeFullReport(payload)
if err != nil {
return nil, err
}

feedID, report, err := DecodeReportData(fullReport.ReportBlob)
if err != nil {
return nil, err
}

result := &ReportWithContext{
FeedId: feedID,
FeedVersion: feedID.Version(),
Digest: fullReport.ReportContext[0][:],
Round: fullReport.ReportContext[1][31],
Epoch: binary.BigEndian.Uint32(fullReport.ReportContext[1][32-5 : 32-1]),
}

switch feedID.Version() {
case mercuryutils.REPORT_V3:
result.V3Report = report.(*v3report.Report)
default:
return nil, errors.Errorf("unknown report version %d", feedID.Version())
}

return result, nil
}
Loading