Skip to content

Commit

Permalink
feat(sdk)!: Remove Stream#detectFields() (#2864)
Browse files Browse the repository at this point in the history
**This is a breaking change as this changes the API**

The method is from _corea_ era and most likely not needed anymore. If we
need similar functionality, we could add it to `cli-tools`.
  • Loading branch information
teogeb authored Nov 11, 2024
1 parent f5a326e commit c995e7f
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Changes before Tatum release are not documented in this file.

#### Removed

- **BREAKING CHANGE:** Remove `Stream#detectFields()` method (https://github.com/streamr-dev/network/pull/2864)
- **BREAKING CHANGE:** Remove `Stream#delete()` method (https://github.com/streamr-dev/network/pull/2863)
- use `StreamrClient#deleteStream()` instead
- Remove support for legacy encryption keys (https://github.com/streamr-dev/network/pull/2757)
Expand Down
82 changes: 0 additions & 82 deletions packages/sdk/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ import {
HexString,
StreamID,
StreamPartID,
collect,
toEthereumAddress,
toStreamPartID
} from '@streamr/utils'
import { isNumber, isString } from 'lodash'
import range from 'lodash/range'
import { PublishMetadata } from '../src/publish/Publisher'
import { Message } from './Message'
import { DEFAULT_PARTITION } from './StreamIDBuilder'
import { StreamMetadata, getPartitionCount } from './StreamMetadata'
import { StreamrClient } from './StreamrClient'
import {
Expand All @@ -21,46 +19,6 @@ import {
toInternalPermissionQuery
} from './permission'

const VALID_FIELD_TYPES = ['number', 'string', 'boolean', 'list', 'map'] as const

interface Field {
name: string
type: typeof VALID_FIELD_TYPES[number]
}

function getFieldType(value: any): (Field['type'] | undefined) {
const type = typeof value
switch (true) {
case Array.isArray(value): {
return 'list'
}
case type === 'object': {
return 'map'
}
case (VALID_FIELD_TYPES as readonly string[]).includes(type): {
// see https://github.com/microsoft/TypeScript/issues/36275
return type as Field['type']
}
default: {
return undefined
}
}
}

export const flatMerge = <TTarget>(...sources: (Partial<TTarget> | undefined)[]): TTarget => {
const result: Record<string, unknown> = {}
for (const source of sources) {
if (source !== undefined) {
for (const [key, value] of Object.entries(source)) {
if (value !== undefined) {
result[key] = value
}
}
}
}
return result as TTarget
}

/**
* A convenience API for managing and accessing an individual stream.
*
Expand Down Expand Up @@ -161,46 +119,6 @@ export class Stream {
return this.client.getStorageNodes(this.id)
}

/**
* Attempts to detect and update the {@link StreamMetadata.config} metadata of the stream by performing a resend.
*
* @remarks Only works on stored streams.
*
* @returns be mindful that in the case of there being zero messages stored, the returned promise will resolve even
* though fields were not updated
*/
async detectFields(): Promise<void> {
// Get last message of the stream to be used for field detecting
const sub = await this.client.resend(
toStreamPartID(this.id, DEFAULT_PARTITION),
{
last: 1
}
)

const receivedMsgs = await collect(sub)

if (!receivedMsgs.length) { return }

const lastMessage = receivedMsgs[0].content

const fields = Object.entries(lastMessage as any).map(([name, value]) => {
const type = getFieldType(value)
return !!type && {
name,
type,
}
}).filter(Boolean) as Field[] // see https://github.com/microsoft/TypeScript/issues/30621

// Save field config back to the stream
const merged = flatMerge(this.getMetadata(), {
config: {
fields
}
})
await this.setMetadata(merged)
}

/**
* Returns the partitions of the stream.
*/
Expand Down
81 changes: 0 additions & 81 deletions packages/sdk/test/integration/Stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import 'reflect-metadata'

import { Stream } from '../../src/Stream'
import { StreamrClient } from '../../src/StreamrClient'
import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment'
import { FakeStorageNode } from '../test-utils/fake/FakeStorageNode'
Expand Down Expand Up @@ -53,84 +52,4 @@ describe('Stream', () => {
.toThrow('No storage node')
})
})

describe('detectFields', () => {

let stream: Stream

beforeEach(async () => {
stream = await createTestStream(client, module)
await stream.addToStorageNode(storageNode.getAddress(), { wait: true })
})

it('primitive types', async () => {
const msg = await stream.publish({
number: 123,
boolean: true,
object: {
k: 1,
v: 2,
},
array: [1, 2, 3],
string: 'test'
})
await client.waitForStorage(msg)

await stream.detectFields()

const expectedFields = [
{
name: 'number',
type: 'number',
},
{
name: 'boolean',
type: 'boolean',
},
{
name: 'object',
type: 'map',
},
{
name: 'array',
type: 'list',
},
{
name: 'string',
type: 'string',
},
]
expect((stream.getMetadata() as any).config?.fields).toEqual(expectedFields)
const loadedStream = await client.getStream(stream.id)
expect((loadedStream.getMetadata() as any).config?.fields).toEqual(expectedFields)
})

it('skips unsupported types', async () => {
const msg = await stream.publish({
null: null,
empty: {},
func: () => null,
nonexistent: undefined,
symbol: Symbol('test'),
// TODO: bigint: 10n,
})
await client.waitForStorage(msg)

await stream.detectFields()

const expectedFields = [
{
name: 'null',
type: 'map',
},
{
name: 'empty',
type: 'map',
},
]
expect((stream.getMetadata() as any).config.fields).toEqual(expectedFields)
const loadedStream = await client.getStream(stream.id)
expect((loadedStream.getMetadata() as any).config.fields).toEqual(expectedFields)
})
})
})

0 comments on commit c995e7f

Please sign in to comment.