Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] FCL Events Service & authn-refresh updates #1914

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,584 changes: 2,012 additions & 572 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion packages/fcl-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@onflow/fcl-bundle": "1.4.2",
"@onflow/typedefs": "1.3.0",
"@types/estree": "^1.0.1",
"@types/eventsource": "^1.1.15",
"@types/jest": "^29.5.4",
"@types/node": "^18.13.0",
"eslint": "^8.35.0",
Expand Down Expand Up @@ -61,6 +62,7 @@
"@onflow/util-semver": "1.0.2",
"@onflow/util-template": "1.2.2",
"@onflow/util-uid": "1.2.2",
"cross-fetch": "^3.1.6"
"cross-fetch": "^3.1.6",
"eventsource": "^2.0.2"
}
}
114 changes: 114 additions & 0 deletions packages/fcl-core/src/current-user/events-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import {CurrentUser, Service, StreamConnection} from "@onflow/typedefs"
import EventEmitter from "events"
import {execService} from "./exec-service"
import {
StreamInfo,
streamConnectorManager,
} from "./exec-service/streams/stream-connector-manager"

export enum EventTypes {
AUTHN_REFRESH = "authn-refresh",
}

export type WalletEvent = {
type: EventTypes
}

const SUBSCRIBE_EVENTS_SERVICE_TYPE = "subscribe-events"

class EventsManager extends EventEmitter {
private stream: Promise<StreamConnection<{message: any}>> | null = null
private service: Service | null = null

constructor(
private platform: string,
private subscribeCurrentUser: (
fn: (user: CurrentUser | null) => void
) => void
) {
super()
this.watchCurrentUser()
}

// Watch the current user to maintain a connection to the events service
private watchCurrentUser() {
this.subscribeCurrentUser((currentUser: CurrentUser | null) => {
function findSubscribeEventsService() {
if (!currentUser?.loggedIn) {
return null
}

const service = (currentUser.services as Service[]).find(
s => s.type === SUBSCRIBE_EVENTS_SERVICE_TYPE
)
if (!service) {
return null
}

return service
}

// Only update the connection if the service has changed
const service = findSubscribeEventsService()
if (JSON.stringify(service) !== JSON.stringify(this.service)) {
this.service = service
this.subscribe(service)
}
})
}

private async subscribe(service: Service | null) {
// Close the connection if the service is null
if (!service) {
this.close()
}

// Create a new connection
const newStream = execService({
service,
config: {},
opts: {},
platform: this.platform,
})
.then((streamInfo: StreamInfo) => {
// Connect to the stream using the appropriate connector
return streamConnectorManager.connect(streamInfo)
})
.then((stream: StreamConnection<any>) => {
// Attach the event listener once the stream is ready
stream.on("message", (data: WalletEvent) => {
this.emit(data.type, data)
})
return stream
})

// Close the previous connection & update the stream
this.close()
this.stream = newStream
}

// Close the current connection
close() {
if (this.stream) {
this.stream.then(stream => stream.close())
this.stream = null
}
}
}

let eventsManager: EventsManager | null = null

export function initEventsManager(
platform: string,
subscribeCurrentUser: (fn: (user: CurrentUser | null) => void) => void
) {
eventsManager = new EventsManager(platform, subscribeCurrentUser)
}

// Get the events manager singleton
export function getEventsManager() {
if (!eventsManager) {
throw new Error("EventsManager not initialized")
}
return eventsManager
}
1 change: 1 addition & 0 deletions packages/fcl-core/src/current-user/exec-service/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {getServiceRegistry} from "./plugins"
import {getChainId} from "../../utils"
import {VERSION} from "../../VERSION"
import {configLens} from "../../default-config"

const execStrategy = async ({service, body, config, opts}) => {
const strategy = getServiceRegistry().getStrategy(service.method)
return strategy({service, body, config, opts})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
import {invariant} from "@onflow/util-invariant"
import {LEVELS, log} from "@onflow/util-logger"
import {isRequired, isString, isObject, isFunc} from "../../utils/is"
import {isRequired, isString, isFunc} from "../../utils/is"
import {CORE_STRATEGIES} from "../../utils/constants"

const stub = () => {
throw new Error(`Platform specific Core Strategies are not initialized`)
import {
StreamConnector,
StreamInfo,
streamConnectorManager,
} from "./streams/stream-connector-manager"
import {Service, StreamConnection} from "@onflow/typedefs"

interface ServicePlugin {
f_type: string
type: string
name: string
services: Service[]
serviceStrategy: ServiceStrategy
streamConnectors?: {
[key: string]: StreamConnector
}
}

const stubCoreStrategies = {
[CORE_STRATEGIES["EXT/RPC"]]: stub,
[CORE_STRATEGIES["HTTP/POST"]]: stub,
[CORE_STRATEGIES["IFRAME/RPC"]]: stub,
[CORE_STRATEGIES["POP/RPC"]]: stub,
[CORE_STRATEGIES["TAB/RPC"]]: stub,
[CORE_STRATEGIES["EXT/RPC"]]: stub,
interface ServiceStrategy {
method: string
exec: Function
}

const supportedPlugins = ["ServicePlugin"]
const supportedServicePlugins = ["discovery-service"]

const validateDiscoveryPlugin = servicePlugin => {
const {services, serviceStrategy} = servicePlugin
const validateDiscoveryPlugin = (servicePlugin: ServicePlugin) => {
const {services, serviceStrategy, streamConnectors} = servicePlugin
invariant(
Array.isArray(services) && services.length,
Array.isArray(services) && services.length > 0,
"Array of Discovery Services is required"
)

Expand Down Expand Up @@ -51,20 +60,48 @@ const validateDiscoveryPlugin = servicePlugin => {
"Service strategy exec function is required"
)

return {discoveryServices: services, serviceStrategy}
return {
discoveryServices: services,
serviceStrategy,
streamConnectors: streamConnectors || [],
}
}

const ServiceRegistry = ({coreStrategies}) => {
const ServiceRegistry = ({
coreStrategies,
streamConnectors,
}: {
coreStrategies: {
[key: string]: Function
}
streamConnectors: {
[key: string]: (stream: StreamInfo) => Promise<StreamConnection<any>>
}
}) => {
let services = new Set()
let strategies = new Map(Object.entries(coreStrategies))

const add = servicePlugin => {
// Add all stream connectors
for (const [type, connect] of Object.entries(streamConnectors)) {
streamConnectorManager.add(type, connect)
}

const setServices = (discoveryServices: Service[]) =>
(services = new Set([...discoveryServices]))

const getServices = () => [...services]

const getStrategy = (method: string) => strategies.get(method)

const getStrategies = () => [...strategies.keys()]

const add = (servicePlugin: ServicePlugin) => {
invariant(
supportedServicePlugins.includes(servicePlugin.type),
`Service Plugin type ${servicePlugin.type} is not supported`
)
if (servicePlugin.type === "discovery-service") {
const {discoveryServices, serviceStrategy} =
const {discoveryServices, serviceStrategy, streamConnectors} =
validateDiscoveryPlugin(servicePlugin)
setServices(discoveryServices)
if (!strategies.has(serviceStrategy.method)) {
Expand All @@ -76,29 +113,25 @@ const ServiceRegistry = ({coreStrategies}) => {
level: LEVELS.warn,
})
}

// Add all stream connectors from the service plugin
for (const [type, connect] of Object.entries(streamConnectors)) {
streamConnectorManager.add(type, connect)
}
}
}

const setServices = discoveryServices =>
(services = new Set([...discoveryServices]))

const getServices = () => [...services]

const getStrategy = method => strategies.get(method)

const getStrategies = () => [...strategies.keys()]

return Object.freeze({
add,
getServices,
getStrategy,
getStrategies,
add,
})
}

const validatePlugins = plugins => {
const validatePlugins = (plugins: ServicePlugin[]) => {
let pluginsArray
invariant(plugins, "No plugins supplied")
invariant(!!plugins, "No plugins supplied")

if (!Array.isArray(plugins)) {
pluginsArray = [plugins]
Expand All @@ -122,7 +155,7 @@ const PluginRegistry = () => {

const getPlugins = () => pluginsMap

const add = plugins => {
const add = (plugins: ServicePlugin[]) => {
const pluginsArray = validatePlugins(plugins)
for (const p of pluginsArray) {
pluginsMap.set(p.name, p)
Expand All @@ -138,28 +171,43 @@ const PluginRegistry = () => {
})
}

let serviceRegistry
let serviceRegistry: ReturnType<typeof ServiceRegistry>
const getIsServiceRegistryInitialized = () =>
typeof serviceRegistry !== "undefined"

export const initServiceRegistry = ({coreStrategies}) => {
export const initServiceRegistry = ({
coreStrategies,
streamConnectors,
}: {
coreStrategies: {
[key: string]: Function
}
streamConnectors: {
[key: string]: (stream: StreamInfo) => Promise<StreamConnection<any>>
}
}) => {
if (getIsServiceRegistryInitialized()) {
return serviceRegistry
}
const _serviceRegistry = ServiceRegistry({coreStrategies})
const _serviceRegistry = ServiceRegistry({coreStrategies, streamConnectors})
serviceRegistry = _serviceRegistry

return _serviceRegistry
}

export const getServiceRegistry = () => {
if (!getIsServiceRegistryInitialized()) {
console.warn(
"Registry is not initalized, it will be initialized with stub core strategies"
)

return initServiceRegistry({coreStrategies: stubCoreStrategies})
return initServiceRegistry({
coreStrategies: {},
streamConnectors: {},
})
}

return serviceRegistry
}

export const pluginRegistry = PluginRegistry()
Loading