Add support for virtual columns (#42)
* Add support for virtual columns

* Improve docs slightly
winsmith authored Dec 5, 2024
1 parent 316b589 commit e899372
Showing 6 changed files with 222 additions and 6 deletions.
8 changes: 8 additions & 0 deletions Sources/DataTransferObjects/Query/CustomQuery.swift
@@ -8,6 +8,7 @@ public struct CustomQuery: Codable, Hashable, Equatable {
compilationStatus: CompilationStatus? = nil,
restrictions: [QueryTimeInterval]? = nil,
dataSource: String? = nil,
virtualColumns: [VirtualColumn]? = nil,
sampleFactor: Int? = nil,
descending: Bool? = nil,
filter: Filter? = nil,
@@ -41,6 +42,7 @@ public struct CustomQuery: Codable, Hashable, Equatable {
self.dataSource = DataSource(type: .table, name: dataSource)
}

self.virtualColumns = virtualColumns
self.sampleFactor = sampleFactor
self.descending = descending
self.baseFilters = baseFilters
@@ -72,6 +74,7 @@ public struct CustomQuery: Codable, Hashable, Equatable {
compilationStatus: CompilationStatus? = nil,
restrictions: [QueryTimeInterval]? = nil,
dataSource: DataSource?,
virtualColumns: [VirtualColumn]? = nil,
sampleFactor: Int? = nil,
descending: Bool? = nil,
filter: Filter? = nil,
@@ -101,6 +104,7 @@ public struct CustomQuery: Codable, Hashable, Equatable {
self.compilationStatus = compilationStatus
self.restrictions = restrictions
self.dataSource = dataSource
self.virtualColumns = virtualColumns
self.sampleFactor = sampleFactor
self.descending = descending
self.baseFilters = baseFilters
@@ -159,6 +163,8 @@ public struct CustomQuery: Codable, Hashable, Equatable {
public var restrictions: [QueryTimeInterval]?
public var dataSource: DataSource?

public var virtualColumns: [VirtualColumn]?

/// The sample factor to apply to this query
///
/// To speed up calculation, you can sample e.g. 1/10 or 1/100 of the signals and still get a good idea of the shape of the available data.
@@ -232,6 +238,7 @@ public struct CustomQuery: Codable, Hashable, Equatable {
hasher.combine(compilationStatus)
hasher.combine(restrictions)
hasher.combine(dataSource)
hasher.combine(virtualColumns)
hasher.combine(sampleFactor)
hasher.combine(descending)
hasher.combine(baseFilters)
@@ -269,6 +276,7 @@ public struct CustomQuery: Codable, Hashable, Equatable {
compilationStatus = try container.decodeIfPresent(CompilationStatus.self, forKey: CustomQuery.CodingKeys.compilationStatus)
restrictions = try container.decodeIfPresent([QueryTimeInterval].self, forKey: CustomQuery.CodingKeys.restrictions)
dataSource = try container.decodeIfPresent(DataSource.self, forKey: CustomQuery.CodingKeys.dataSource)
virtualColumns = try container.decodeIfPresent([VirtualColumn].self, forKey: CustomQuery.CodingKeys.virtualColumns)
sampleFactor = try container.decodeIfPresent(Int.self, forKey: CustomQuery.CodingKeys.sampleFactor)
descending = try container.decodeIfPresent(Bool.self, forKey: CustomQuery.CodingKeys.descending)
baseFilters = try container.decodeIfPresent(BaseFilters.self, forKey: CustomQuery.CodingKeys.baseFilters)
16 changes: 16 additions & 0 deletions Sources/DataTransferObjects/Query/Virtual Column/ExpressionVirtualColumn.swift
@@ -0,0 +1,16 @@
/// Expression virtual columns use Druid's native expression system to define query-time transforms of inputs from one or more columns.
///
/// https://druid.apache.org/docs/latest/querying/math-expr
public struct ExpressionVirtualColumn: Codable, Hashable, Equatable {
public init(name: String, expression: String, outputType: String? = nil) {
self.name = name
self.expression = expression
self.outputType = outputType
}

public let name: String
public let expression: String
public let outputType: String?
}
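
For illustration only (not part of this commit): a minimal sketch of constructing such a column. The column and field names here are hypothetical; the expression uses Druid's concat function from the linked expression docs.

import DataTransferObjects

// Hypothetical example: derive an "osVersionLabel" value at query time by
// concatenating two existing columns with Druid's `concat` expression.
let osVersionLabel = ExpressionVirtualColumn(
    name: "osVersionLabel",
    expression: "concat(operatingSystem, '-', OSVersion)",
    outputType: "STRING"
)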


22 changes: 22 additions & 0 deletions Sources/DataTransferObjects/Query/Virtual Column/ListFilteredVirtualColumn.swift
@@ -0,0 +1,22 @@
/// This virtual column provides an alternative way to use a 'list filtered' dimension spec as a virtual column.
/// It has optimized access to the underlying column value indexes, which can provide a small performance improvement in some cases.
public struct ListFilteredVirtualColumn: Codable, Hashable, Equatable {
public init(name: String, delegate: String, values: [String], isAllowList: Bool? = nil) {
self.name = name
self.delegate = delegate
self.values = values
self.isAllowList = isAllowList
}

/// The output name of the virtual column
public let name: String

/// The name of the multi-value STRING input column to filter
public let delegate: String

/// Set of STRING values to allow or deny
public let values: [String]

/// If true, the output of the virtual column will be limited to the set specified by values,
/// else it will provide all values except those specified.
public let isAllowList: Bool?
}
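
As an illustrative sketch (not part of the commit), mirroring the values used in the tests below: an allow-list column that exposes only two values of the multi-value column dim3.

import DataTransferObjects

// Sketch: expose only "hello" and "world" from the multi-value STRING
// column "dim3" under the output name "filteredDim3".
let filteredDim3 = ListFilteredVirtualColumn(
    name: "filteredDim3",
    delegate: "dim3",
    values: ["hello", "world"],
    isAllowList: true
)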
@@ -0,0 +1,35 @@
public indirect enum VirtualColumn: Codable, Hashable, Equatable {
case expression(ExpressionVirtualColumn)
case listFiltered(ListFilteredVirtualColumn)

enum CodingKeys: String, CodingKey {
case type
}

public init(from decoder: Decoder) throws {
let values = try decoder.container(keyedBy: CodingKeys.self)
let type = try values.decode(String.self, forKey: .type)

switch type {
case "expression":
self = try .expression(ExpressionVirtualColumn(from: decoder))
case "mv-filtered":
self = try .listFiltered(ListFilteredVirtualColumn(from: decoder))
default:
throw DecodingError.dataCorrupted(.init(codingPath: [CodingKeys.type], debugDescription: "Invalid Type: \(type)", underlyingError: nil))
}
}

public func encode(to encoder: Encoder) throws {
var container = encoder.container(keyedBy: CodingKeys.self)

switch self {
case let .expression(virtualColumn):
try container.encode("expression", forKey: .type)
try virtualColumn.encode(to: encoder)
case let .listFiltered(virtualColumn):
try container.encode("mv-filtered", forKey: .type)
try virtualColumn.encode(to: encoder)
}
}
}
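
Because encode(to:) writes the type discriminator into the same encoder that the wrapped struct encodes into, a virtual column serializes as a single flat JSON object rather than a nested one. A hedged sketch of that round trip follows; a plain JSONEncoder with sorted keys is assumed here for stable output, whereas the tests below use the library's telemetryEncoder.

import DataTransferObjects
import Foundation

// Sketch: encode a wrapped virtual column and inspect the flat JSON output.
let column = VirtualColumn.listFiltered(
    .init(name: "filteredDim3", delegate: "dim3", values: ["hello", "world"], isAllowList: true)
)
let encoder = JSONEncoder()
encoder.outputFormatting = [.sortedKeys]
let data = try! encoder.encode(column) // force-try for brevity in this sketch
// Prints: {"delegate":"dim3","isAllowList":true,"name":"filteredDim3","type":"mv-filtered","values":["hello","world"]}
print(String(data: data, encoding: .utf8)!)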
@@ -69,19 +69,29 @@ public struct KinesisIOConfig: Codable, Hashable, Equatable {
/// reaching their task duration. The period value specifies the maximum time between iterations. Defaults to PT30S
public let period: String?

/// If a supervisor is managing a datasource for the first time, it obtains a set of starting sequence numbers from Kinesis. This flag determines whether a supervisor retrieves the earliest or latest sequence numbers in Kinesis. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run.
/// If a supervisor is managing a datasource for the first time, it obtains a set of starting sequence numbers from Kinesis. This flag determines whether a supervisor retrieves the earliest or
/// latest sequence numbers in Kinesis. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run.
public let useEarliestSequenceNumber: Bool?

/// ISO 8601 period. The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after taskDuration elapses.
/// ISO 8601 period. The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task
/// begins roughly after taskDuration elapses.
public let completionTimeout: String?

/// ISO 8601 period. Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, Druid drops messages with timestamps earlier than 2016-01-01T11:00Z. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.
/// ISO 8601 period. Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to PT1H and the supervisor
/// creates a task at 2016-01-01T12:00Z, Druid drops messages with timestamps earlier than 2016-01-01T11:00Z. This may help prevent concurrency issues if your data stream has late messages and
/// you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection
/// properties.
public let lateMessageRejectionPeriod: String?

/// ISO 8601 period. Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to PT1H, the task duration is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, Druid drops messages with timestamps later than 2016-01-01T14:00Z. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.
/// ISO 8601 period. Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to PT1H, the task
/// duration is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, Druid drops messages with timestamps later than 2016-01-01T14:00Z. Tasks sometimes run past their task duration,
/// such as in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task
/// duration.
public let earlyMessageRejectionPeriod: String?

/// ISO 8601 date time. Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to 2016-01-01T11:00Z and the supervisor creates a task at 2016-01-01T12:00Z, Druid drops messages with timestamps earlier than 2016-01-01T11:00Z. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.
/// ISO 8601 date time. Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to 2016-01-01T11:00Z and the supervisor creates a
/// task at 2016-01-01T12:00Z, Druid drops messages with timestamps earlier than 2016-01-01T11:00Z. This can prevent concurrency issues if your data stream has late messages and you have multiple
/// pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.
public let lateMessageRejectionStartDateTime: String?

/// Defines auto scaling behavior for ingestion tasks. See Task autoscaler for more information.
@@ -91,7 +101,8 @@ public struct KinesisIOConfig: Codable, Hashable, Equatable {

/// Improved Supervisor rolling restarts
///
/// The stopTaskCount config now prioritizes stopping older tasks first. As part of this change, you must also explicitly set a value for stopTaskCount. It no longer defaults to the same value as taskCount.
/// The stopTaskCount config now prioritizes stopping older tasks first. As part of this change, you must also explicitly set a value for stopTaskCount. It no longer defaults to the same value as
/// taskCount.
///
/// See https://github.com/apache/druid/pull/15859
public let stopTaskCount: Int?
124 changes: 124 additions & 0 deletions Tests/QueryTests/VirtualColumnTests.swift
@@ -0,0 +1,124 @@
@testable import DataTransferObjects
import XCTest

final class VirtualColumnTests: XCTestCase {
let tdValueString = """
{
"aggregations": [
{ "fieldName": "clientUser", "name": "count", "type": "thetaSketch" }
],
"dimension": {
"dimension": "calculatedSystemVersion",
"outputName": "fooPage",
"type": "default"
},
"granularity": "all",
"metric": { "metric": "count", "type": "numeric" },
"queryType": "topN",
"threshold": 10,
"virtualColumns": [
{
"expression": "nvl(majorMinorSystemVersion,concat('TelemetryDeck.Device.operatingSystem+'-'+nvl(OSVersion,'unknown')))",
"name": "calculatedSystemVersion",
"outputType": "STRING",
"type": "expression"
}
]
}
"""
.filter { !$0.isWhitespace }

let tdValue = CustomQuery(
queryType: .topN,
virtualColumns: [
.expression(
.init(
name: "calculatedSystemVersion",
expression: "nvl(majorMinorSystemVersion,concat('TelemetryDeck.Device.operatingSystem+'-'+nvl(OSVersion,'unknown')))",
outputType: "STRING"
)
)
],
granularity: .all,
aggregations: [
.thetaSketch(.init(type: .thetaSketch, name: "count", fieldName: "clientUser"))
],
threshold: 10,
metric: .numeric(.init(metric: "count")),
dimension: .default(.init(dimension: "calculatedSystemVersion", outputName: "fooPage"))
)

let testedType = CustomQuery.self

func testDecodingDocsExample() throws {
let decodedValue = try JSONDecoder.telemetryDecoder.decode(testedType, from: tdValueString.data(using: .utf8)!)
XCTAssertEqual(tdValue, decodedValue)
}

func testEncodingDocsExample() throws {
let encodedValue = try JSONEncoder.telemetryEncoder.encode(tdValue)
XCTAssertEqual(tdValueString, String(data: encodedValue, encoding: .utf8)!)
}
}

final class ExpressionVirtualColumnTests: XCTestCase {
let docsValueString = """
{
"expression": "<rowexpression>",
"name": "<nameofthevirtualcolumn>",
"outputType": "FLOAT",
"type": "expression"
}
"""
.filter { !$0.isWhitespace }

let docsValue = VirtualColumn.expression(
ExpressionVirtualColumn(name: "<nameofthevirtualcolumn>", expression: "<rowexpression>", outputType: "FLOAT")
)

let testedType = VirtualColumn.self

func testDecodingDocsExample() throws {
let decodedValue = try JSONDecoder.telemetryDecoder.decode(testedType, from: docsValueString.data(using: .utf8)!)
XCTAssertEqual(docsValue, decodedValue)
}

func testEncodingDocsExample() throws {
let encodedValue = try JSONEncoder.telemetryEncoder.encode(docsValue)
XCTAssertEqual(docsValueString, String(data: encodedValue, encoding: .utf8)!)
}
}

final class ListFilteredVirtualColumnTests: XCTestCase {
let docsValueString = """
{
"delegate": "dim3",
"isAllowList": true,
"name": "filteredDim3",
"type": "mv-filtered",
"values": ["hello", "world"]
}
"""
.filter { !$0.isWhitespace }

let docsValue = VirtualColumn.listFiltered(
.init(
name: "filteredDim3",
delegate: "dim3",
values: ["hello", "world"],
isAllowList: true
)
)

let testedType = VirtualColumn.self

func testDecodingDocsExample() throws {
let decodedValue = try JSONDecoder.telemetryDecoder.decode(testedType, from: docsValueString.data(using: .utf8)!)
XCTAssertEqual(docsValue, decodedValue)
}

func testEncodingDocsExample() throws {
let encodedValue = try JSONEncoder.telemetryEncoder.encode(docsValue)
XCTAssertEqual(docsValueString, String(data: encodedValue, encoding: .utf8)!)
}
}
