Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a gRPC connector to backfila #398

Merged
merged 6 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sourceSets {
dependencies {
implementation(libs.moshiCore)
implementation(libs.moshiKotlin)
implementation(libs.wireGrpcClient)
implementation(libs.wireRuntime)
implementation(libs.guice)
implementation(libs.retrofit)
Expand All @@ -33,6 +34,14 @@ wire {
sourcePath {
srcDir("src/main/proto")
}
kotlin {
includes = listOf(
"app.cash.backfila.protos.clientservice.BackfilaClientService",
)
rpcRole = "client"
javaInterop = true
exclusive = true
}
java {
}
}
Expand Down
3 changes: 3 additions & 0 deletions client/src/main/kotlin/app/cash/backfila/client/Connectors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package app.cash.backfila.client
object Connectors {
const val HTTP = "HTTP"
const val ENVOY = "ENVOY"
const val GRPC = "GRPC"
}

data class HttpHeader(val name: String, val value: String)

data class HttpConnectorData @JvmOverloads constructor(val url: String, val headers: List<HttpHeader> = listOf())

data class EnvoyConnectorData @JvmOverloads constructor(val clusterType: String, val headers: List<HttpHeader> = listOf())

data class GrpcConnectorData @JvmOverloads constructor(val url: String, val headers: List<HttpHeader> = listOf())
7 changes: 7 additions & 0 deletions client/src/main/proto/app/cash/backfila/client_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,10 @@ message FinalizeBackfillRequest {

message FinalizeBackfillResponse {
}

service BackfilaClientService {
rpc PrepareBackfill(PrepareBackfillRequest) returns (PrepareBackfillResponse);
rpc GetNextBatchRange(GetNextBatchRangeRequest) returns (GetNextBatchRangeResponse);
rpc RunBatch(RunBatchRequest) returns (RunBatchResponse);
rpc FinalizeBackfill(FinalizeBackfillRequest) returns (FinalizeBackfillResponse);
}
2 changes: 1 addition & 1 deletion client/src/main/proto/app/cash/backfila/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,4 @@ message CheckBackfillStatusResponse {
RUNNING = 2;
COMPLETE = 3;
}
}
}
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ miskTailwind = { module = "com.squareup.misk:misk-tailwind", version.ref = "misk
miskTesting = { module = "com.squareup.misk:misk-testing", version.ref = "misk" }
wireCompiler = { module = "com.squareup.wire:wire-compiler", version.ref = "wire" }
wireGradlePlugin = { module = "com.squareup.wire:wire-gradle-plugin", version.ref = "wire" }
wireGrpcClient = { module = "com.squareup.wire:wire-grpc-client", version.ref = "wire" }
wireMoshiAdapter = { module = "com.squareup.wire:wire-moshi-adapter", version.ref = "wire" }
wireRuntime = { module = "com.squareup.wire:wire-runtime", version.ref = "wire" }
wireSchema = { module = "com.squareup.wire:wire-schema", version.ref = "wire" }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package app.cash.backfila.client

import app.cash.backfila.protos.clientservice.BackfilaClientServiceClient
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.protos.clientservice.RunBatchRequest
import app.cash.backfila.protos.clientservice.RunBatchResponse

internal class GrpcCallbackConnector internal constructor(
private val api: BackfilaClientServiceClient,
private val connectionLogData: String,
) : BackfilaCallbackConnector {

override fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse {
return api.PrepareBackfill().executeBlocking(request)
}

override suspend fun getNextBatchRange(request: GetNextBatchRangeRequest):
GetNextBatchRangeResponse {
return api.GetNextBatchRange().execute(request)
}

override suspend fun runBatch(request: RunBatchRequest): RunBatchResponse {
return api.RunBatch().execute(request)
}

override fun connectionLogData() = connectionLogData
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package app.cash.backfila.client

import app.cash.backfila.client.interceptors.OkHttpClientSpecifiedHeadersInterceptor
import app.cash.backfila.client.interceptors.OkHttpClientSpecifiedHeadersInterceptor.Companion.headersSizeWithinLimit
import app.cash.backfila.protos.clientservice.BackfilaClientServiceClient
import com.squareup.moshi.Moshi
import com.squareup.wire.GrpcClient
import java.net.URL
import javax.inject.Inject
import javax.inject.Singleton
import misk.client.HttpClientConfigUrlProvider
import misk.client.HttpClientFactory
import misk.client.HttpClientsConfig
import misk.moshi.adapter

@Singleton
class GrpcCallbackConnectorProvider @Inject constructor(
private val httpClientsConfig: HttpClientsConfig,
private val httpClientFactory: HttpClientFactory,
private val httpClientConfigUrlProvider: HttpClientConfigUrlProvider,
private val moshi: Moshi,
) : BackfilaCallbackConnectorProvider {

override fun validateExtraData(connectorExtraData: String?) {
connectorExtraData.let {
checkNotNull(connectorExtraData) { "Extra data required for GRPC connector" }
val fromJson = adapter().fromJson(connectorExtraData)
checkNotNull(fromJson) { "Failed to parse GRPC connector extra data JSON" }
checkNotNull(fromJson.url) { "GRPC connector extra data must contain a URL" }

if (fromJson.headers.isNotEmpty()) {
check(headersSizeWithinLimit(fromJson.headers)) { "Headers too large" }

for (header in fromJson.headers) {
checkNotNull(header.name) { "Header names must be set" }
checkNotNull(header.value) { "Header values must be set" }
}
}
}
}

override fun clientFor(
serviceName: String,
connectorExtraData: String?,
): BackfilaCallbackConnector {
val extraData = connectorExtraData.let { adapter().fromJson(connectorExtraData) }
val url = URL(extraData!!.url)
// If client-specified HTTP headers are specified, honor them.
val headers: List<HttpHeader>? = extraData!!.headers

val httpClientEndpointConfig = httpClientsConfig[url]
var okHttpClient = httpClientFactory.create(httpClientEndpointConfig)
if (!headers.isNullOrEmpty()) {
okHttpClient = okHttpClient.newBuilder()
.addInterceptor(OkHttpClientSpecifiedHeadersInterceptor(headers))
.build()
}

val baseUrl = httpClientConfigUrlProvider.getUrl(httpClientEndpointConfig)
val grpcClient = GrpcClient.Builder()
.client(okHttpClient)
.baseUrl(baseUrl)
.build()
val api = grpcClient.create(BackfilaClientServiceClient::class)
val logData = "grpcConfig: ${httpClientEndpointConfig.url}, " +
"url: ${httpClientEndpointConfig.url}, " +
"headers: $headers"
return GrpcCallbackConnector(api, logData)
}

private fun adapter() = moshi.adapter<GrpcConnectorData>()
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ class HttpCallbackConnectorProvider @Inject constructor(
override fun validateExtraData(connectorExtraData: String?) {
checkNotNull(connectorExtraData, { "Extra data required for HTTP connector" })
val fromJson = adapter().fromJson(connectorExtraData)
checkNotNull(fromJson, { "Failed to parse HTTP connector extra data JSON" })
checkNotNull(fromJson.url, { "HTTP connector extra data must contain a URL" })
checkNotNull(fromJson) { "Failed to parse HTTP connector extra data JSON" }
checkNotNull(fromJson.url) { "HTTP connector extra data must contain a URL" }

if (!fromJson.headers.isNullOrEmpty()) {
check(headersSizeWithinLimit(fromJson.headers)) { "Headers too large" }

for (header in fromJson.headers) {
checkNotNull(header.name, { "Header names must be set" })
checkNotNull(header.value, { "Header values must be set" })
checkNotNull(header.name) { "Header names must be set" }
checkNotNull(header.value) { "Header values must be set" }
}
}
}
Expand All @@ -46,6 +46,7 @@ class HttpCallbackConnectorProvider @Inject constructor(
): BackfilaCallbackConnector {
val extraData = adapter().fromJson(connectorExtraData!!)
val url = URL(extraData!!.url)
// If client-specified HTTP headers are specified, honor them.
val headers = extraData!!.headers

val httpClientEndpointConfig = httpClientsConfig[url]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import app.cash.backfila.client.BackfilaCallbackConnectorProvider
import app.cash.backfila.client.Connectors
import app.cash.backfila.client.EnvoyCallbackConnectorProvider
import app.cash.backfila.client.ForConnectors
import app.cash.backfila.client.GrpcCallbackConnectorProvider
import app.cash.backfila.client.HttpCallbackConnectorProvider
import app.cash.backfila.dashboard.BackfilaDashboardModule
import app.cash.backfila.dashboard.BackfilaWebActionsModule
Expand Down Expand Up @@ -60,6 +61,9 @@ class BackfilaServiceModule(
newMapBinder<String, BackfilaCallbackConnectorProvider>(ForConnectors::class)
.addBinding(Connectors.ENVOY)
.to(EnvoyCallbackConnectorProvider::class.java)
newMapBinder<String, BackfilaCallbackConnectorProvider>(ForConnectors::class)
.addBinding(Connectors.GRPC)
.to(GrpcCallbackConnectorProvider::class.java)

newMultibinder<Interceptor>(HttpClientNetworkInterceptor::class)

Expand Down
Loading