Skip to content

Commit

Permalink
Add cached blueky client
Browse files Browse the repository at this point in the history
  • Loading branch information
strideynet committed Jul 24, 2023
1 parent 2e4f80f commit fbb8551
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
47 changes: 47 additions & 0 deletions api/bsky_cached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package api

import (
"context"
"fmt"
"github.com/strideynet/bsky-furry-feed/bluesky"
"sync"
"time"
)

type cachedBlueSkyClient struct {
renewalThreshold time.Duration
creds *bluesky.Credentials

mu sync.Mutex

fetchedAt time.Time
cachedClient *bluesky.Client
}

func (cb *cachedBlueSkyClient) Get(ctx context.Context) (c *bluesky.Client, err error) {
ctx, span := tracer.Start(ctx, "cachedBlueSkyClient.Get")
defer func() {
endSpan(span, err)
}()
cb.mu.Lock()
defer cb.mu.Unlock()

// If client was created within the last five minutes, return that client.
if time.Since(cb.fetchedAt) < cb.renewalThreshold {
if cb.cachedClient != nil {
span.AddEvent("client created within last five minutes, returning this client.")
return cb.cachedClient, nil
}
}
span.AddEvent("no client created within last five minutes, will attempt to create new client.")

// Otherwise return a new client.
c, err = bluesky.ClientFromCredentials(ctx, cb.creds)
if err != nil {
return nil, fmt.Errorf("fetching token from credentials: %w", err)
}
cb.cachedClient = c
cb.fetchedAt = time.Now()

return c, nil
}
11 changes: 5 additions & 6 deletions api/moderation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import (
"fmt"

"github.com/bufbuild/connect-go"
"github.com/strideynet/bsky-furry-feed/bluesky"
v1 "github.com/strideynet/bsky-furry-feed/proto/bff/v1"
"github.com/strideynet/bsky-furry-feed/store"
"go.uber.org/zap"
)

type ModerationServiceHandler struct {
store *store.PGXStore
log *zap.Logger
blueskyCredentials *bluesky.Credentials
store *store.PGXStore
log *zap.Logger
clientCache *cachedBlueSkyClient
}

func (m *ModerationServiceHandler) Ping(ctx context.Context, req *connect.Request[v1.PingRequest]) (*connect.Response[v1.PingResponse], error) {
Expand Down Expand Up @@ -118,11 +117,11 @@ func (m *ModerationServiceHandler) ProcessApprovalQueue(ctx context.Context, req

// Follow them if its an approval
if statusToSet == v1.ActorStatus_ACTOR_STATUS_APPROVED {
bskyClient, err := bluesky.ClientFromCredentials(ctx, m.blueskyCredentials)
c, err := m.clientCache.Get(ctx)
if err != nil {
return nil, fmt.Errorf("creating bsky client: %w", err)
}
if err := bskyClient.Follow(ctx, actorDID); err != nil {
if err := c.Follow(ctx, actorDID); err != nil {
return nil, fmt.Errorf("following approved actor: %w", err)
}
}
Expand Down
23 changes: 20 additions & 3 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,24 @@ import (
"github.com/strideynet/bsky-furry-feed/feed"
"github.com/strideynet/bsky-furry-feed/proto/bff/v1/bffv1pbconnect"
"github.com/strideynet/bsky-furry-feed/store"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"net/http"
"time"
)

var tracer = otel.Tracer("github.com/strideynet/bsky-furry-feed/api")

func endSpan(span trace.Span, err error) {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.End()
}

func handleErr(w http.ResponseWriter, log *zap.Logger, err error) {
log.Error("failed to handle request", zap.Error(err))
w.WriteHeader(500)
Expand Down Expand Up @@ -68,9 +82,12 @@ func New(

// Mount Buf Connect services
modSvcHandler := &ModerationServiceHandler{
store: pgxStore,
log: log,
blueskyCredentials: bskyCredentials,
store: pgxStore,
log: log,
clientCache: &cachedBlueSkyClient{
creds: bskyCredentials,
renewalThreshold: time.Minute * 5,
},
}

mux.Handle(
Expand Down
15 changes: 7 additions & 8 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ import (
"context"
"errors"
"fmt"
v1 "github.com/strideynet/bsky-furry-feed/proto/bff/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"net/http"
"sync"
"time"

"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/events"
Expand All @@ -23,12 +15,19 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/strideynet/bsky-furry-feed/bluesky"
v1 "github.com/strideynet/bsky-furry-feed/proto/bff/v1"
"github.com/strideynet/bsky-furry-feed/store"
typegen "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"net/http"
"sync"
"time"
)

// tracer is the BFF wide tracer. This is different to the tracer used in
Expand Down

1 comment on commit fbb8551

@vercel
Copy link

@vercel vercel bot commented on fbb8551 Jul 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.