Skip to content

Commit

Permalink
Add transformDeferred hook
Browse files Browse the repository at this point in the history
  • Loading branch information
smaugfm committed Jul 14, 2023
1 parent 301fb60 commit 31f2a11
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 115 deletions.
3 changes: 3 additions & 0 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repositories {
}

dependencies {
implementation("io.github.smaugfm:lunchmoney:1.0.1")
implementation("io.github.smaugfm:lunchmoney:1.0.2")
}
```

Expand Down
6 changes: 5 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ plugins {
}

group = "io.github.smaugfm"
version = "1.0.1"
version = "1.0.2"
val isReleaseVersion = !version.toString().endsWith("SNAPSHOT")

repositories {
Expand All @@ -30,6 +30,7 @@ val reactorNetty = "1.1.2"
val mockserver = "5.15.0"
val logback = "1.4.5"
val javaVersion = "11"
val resilience4jVersion = "1.7.0"

dependencies {
api("io.projectreactor:reactor-core:$reactorCore")
Expand All @@ -39,6 +40,9 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.0")
testImplementation("org.mock-server:mockserver-netty:$mockserver")
testImplementation("org.mock-server:mockserver-client-java:$mockserver")
testImplementation("io.github.resilience4j:resilience4j-retry:$resilience4jVersion")
testImplementation("io.github.resilience4j:resilience4j-reactor:$resilience4jVersion")
testImplementation("io.github.resilience4j:resilience4j-kotlin:$resilience4jVersion")
testImplementation("io.projectreactor:reactor-test:$reactorCore")
testImplementation("io.mockk:mockk:1.13.4")
testImplementation("com.willowtreeapps.assertk:assertk-jvm:0.25")
Expand Down
2 changes: 1 addition & 1 deletion detekt-baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
<ID>LongParameterList:LunchmoneyApi.kt$LunchmoneyApi$( name: String, typeName: LunchmoneyAssetType, balance: BigDecimal, subtypeName: String? = null, displayName: String? = null, balanceAsOf: Instant? = null, currency: Currency? = null, institutionName: String? = null, closedOn: LocalDate? = null, excludeTransactions: Boolean? = null )</ID>
<ID>LongParameterList:LunchmoneyApi.kt$LunchmoneyApi$( tagId: Long? = null, recurringId: Long? = null, plaidAccountId: Long? = null, categoryId: Long? = null, assetId: Long? = null, groupId: Long? = null, isGroup: Boolean? = null, status: LunchmoneyTransactionStatus? = null, offset: Long? = null, limit: Long? = null, startDate: LocalDate? = null, endDate: LocalDate? = null, debitAsNegative: Boolean? = null, pending: Boolean? = null )</ID>
<ID>LongParameterList:LunchmoneyApi.kt$LunchmoneyApi$( transactions: List&lt;LunchmoneyInsertTransaction>, applyRules: Boolean? = null, skipDuplicates: Boolean? = null, checkForRecurring: Boolean? = null, debitAsNegative: Boolean? = null, skipBalanceUpdate: Boolean? = null )</ID>
<ID>TooGenericExceptionCaught:RequestExecutor.kt$RequestExecutor$error: Throwable</ID>
<ID>TooManyFunctions:LunchmoneyApi.kt$LunchmoneyApi : LunchmoneyApiInternal</ID>
<ID>TooManyFunctions:RequestExecutor.kt$RequestExecutor</ID>
<ID>UtilityClassWithPublicConstructor:TestBase.kt$TestBase</ID>
</ManuallySuppressedIssues>
</SmellBaseline>
25 changes: 16 additions & 9 deletions src/main/kotlin/io/github/smaugfm/lunchmoney/api/LunchmoneyApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ import io.github.smaugfm.lunchmoney.response.LunchmoneyUpdateTransactionResponse
import io.github.smaugfm.lunchmoney.response.UpsertBudgetCategoryGroupResponse
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.JsonBuilder
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.netty.resources.ConnectionProvider
import java.math.BigDecimal
import java.time.Instant
import java.time.LocalDate
import java.util.Currency
import java.util.function.Function

/**
* Non-blocking Reactor-based Lunchmoney API
Expand All @@ -78,30 +80,35 @@ class LunchmoneyApi internal constructor(
token: String,
baseUrl: String,
port: Int,
jsonBuilderActions: List<JsonBuilder.() -> Unit> = listOf(DEFAULT_JSON_BUILDER),
reactorNettyConnectionProvider: ConnectionProvider? = null
jsonBuilderCustomizer: List<JsonBuilder.() -> Unit> = listOf(DEFAULT_JSON_BUILDER),
reactorNettyConnectionProvider: ConnectionProvider? = null,
requestTransformer: Function<Publisher<Any>, Publisher<Any>>? = null
) : LunchmoneyApiInternal(
token,
baseUrl,
port,
jsonBuilderActions,
reactorNettyConnectionProvider
jsonBuilderCustomizer,
reactorNettyConnectionProvider,
requestTransformer
) {
/**
* @param token developer [token](https://lunchmoney.dev/#authentication)
* @param jsonBuilderAction customize internal [kotlinx.serialization.json.Json] instance
* @param jsonBuilderCustomizer customize internal [kotlinx.serialization.json.Json] instance
* @param reactorNettyConnectionProvider customize internal netty [reactor.netty.http.client.HttpClient]
* @param requestTransformer applies through [Mono.transformDeferred] to all resulting [Mono]'s
*/
constructor(
token: String,
jsonBuilderAction: JsonBuilder.() -> Unit = {},
reactorNettyConnectionProvider: ConnectionProvider? = null
jsonBuilderCustomizer: JsonBuilder.() -> Unit = {},
reactorNettyConnectionProvider: ConnectionProvider? = null,
requestTransformer: Function<Publisher<Any>, Publisher<Any>>? = null
) : this(
token,
LUNCHMONEY_DEV_BASE_URL,
DEFAULT_HTTP_PORT,
listOf(jsonBuilderAction, DEFAULT_JSON_BUILDER),
reactorNettyConnectionProvider
listOf(jsonBuilderCustomizer, DEFAULT_JSON_BUILDER),
reactorNettyConnectionProvider,
requestTransformer
)

fun createAsset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,32 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonBuilder
import kotlinx.serialization.json.JsonNamingStrategy
import kotlinx.serialization.serializer
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
import java.util.function.Function

open class LunchmoneyApiInternal internal constructor(
token: String,
baseUrl: String,
port: Int,
jsonBuilderActions: List<JsonBuilder.() -> Unit>,
reactorNettyConnectionProvider: ConnectionProvider?
jsonBuilderCustomizer: List<JsonBuilder.() -> Unit>,
reactorNettyConnectionProvider: ConnectionProvider?,
requestTransformer: Function<Publisher<Any>, Publisher<Any>>?
) {
internal val requestExecutor = RequestExecutor(
token,
Json(builderAction = {
jsonBuilderActions.forEach { this.it() }
jsonBuilderCustomizer.forEach { this.it() }
}),
port,
if (reactorNettyConnectionProvider != null) {
HttpClient.create(reactorNettyConnectionProvider).baseUrl(baseUrl)
} else {
HttpClient.create().baseUrl(baseUrl)
}
},
port,
requestTransformer
)

internal inline fun <reified R, reified T, A : LunchmoneyAbstractApiRequest<R, T>> execute(
Expand Down
128 changes: 53 additions & 75 deletions src/main/kotlin/io/github/smaugfm/lunchmoney/api/RequestExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToStream
import kotlinx.serialization.serializer
import mu.KotlinLogging
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.netty.ByteBufMono
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.HttpClientResponse
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.util.function.Function

val log = KotlinLogging.logger { }

internal class RequestExecutor(
token: String,
val json: Json,
private val httpClient: HttpClient,
private val port: Int = 443,
private val httpClient: HttpClient
private val requestTransformer: Function<Publisher<Any>, Publisher<Any>>?
) {
private val authorizationHeader = "Bearer $token"

Expand All @@ -39,20 +41,26 @@ internal class RequestExecutor(
paramsSerializer: KSerializer<T>
): Mono<R> = requestMono
.flatMap { request: A ->
val body = requestBodyToByteBuffer(paramsSerializer, request.body())
httpClient
.port(port)
.headers { addHeaders(request.method(), it) }
.request(request.method())
.uri(request.pathAndQuery())
.send(Mono.just(body))
.send(requestBodyToByteBuffer(paramsSerializer, request.body()))
.responseSingle { resp, byteBufMono ->
processResponse(resp, byteBufMono, responseSerializer)
.doOnNext {
log.debug { "Response (${resp.status()}): $it" }
}
.doOnNext { log.debug { "Response (${resp.status()}): $it" } }
}.doOnSubscribe {
log.debug { "Performing Lunchmoney API request $request" }
}.let {
@Suppress("UNCHECKED_CAST")
if (requestTransformer != null) {
it.transformDeferred(
requestTransformer as Function<in Publisher<R>, out Publisher<R>>
)
} else {
it
}
}
}

Expand All @@ -73,92 +81,62 @@ internal class RequestExecutor(
resp: HttpClientResponse,
byteBufMono: ByteBufMono,
serializer: KSerializer<R>
): Mono<R> {
val statusCode = resp.status().code()
return byteBufMono
): Mono<R> =
byteBufMono
.asString()
.flatMap { body: String ->
try {
Mono.justOrEmpty(
deserializeResponseBody(
serializer,
statusCode,
body
)
).switchIfEmpty(
if (isOkResponse(resp)) {
Mono.empty()
} else {
Mono.error(
LunchmoneyApiResponseException(
body,
null,
statusCode
)
)
}
)
} catch (e: LunchmoneyApiResponseException) {
Mono.error(e)
} catch (error: Throwable) {
Mono.error(
LunchmoneyApiResponseException(
body,
error,
statusCode
)
)
}
deserializeResponseBody(serializer, resp.status().code(), body)
.transformDeferred { mapUnknownError(it, body, resp.status().code()) }
}.transformDeferred { errorOnEmptyResponse(it, resp) }

private fun <R> errorOnEmptyResponse(mono: Mono<R>, resp: HttpClientResponse): Mono<R> =
mono.switchIfEmpty(
if (isOkResponse(resp)) {
Mono.empty()
} else {
Mono.error(LunchmoneyApiResponseException(resp.status().code()))
}
}
)

private fun <R> mapUnknownError(mono: Mono<R>, body: String, statusCode: Int): Mono<R> =
mono.onErrorMap({ it !is LunchmoneyApiResponseException }) {
LunchmoneyApiResponseException(
body,
it,
statusCode
)
}

private fun <R> deserializeResponseBody(
serializer: KSerializer<R>,
status: Int,
body: String
): R? =
try {
doDeserialize(serializer, body)
} catch (e: SerializationException) {
val error = deserializeApiError(body)
if (error != null) {
throw error.toException(body, status)
} else {
throw e
): Mono<R> =
doDeserialize(serializer, body)
.onErrorResume(SerializationException::class.java) {
deserializeApiError(body)
.flatMap { Mono.error(it.toException(body, status)) }
}
}

private fun deserializeApiError(body: String): ApiErrorResponse? =
try {
doDeserialize<ApiErrorResponse>(
json.serializersModule.serializer(),
body
)
} catch (other: SerializationException) {
log.warn(other) { "Unknown error response" }
null
}
private fun deserializeApiError(body: String) =
doDeserialize<ApiErrorResponse>(json.serializersModule.serializer(), body)

private fun <T> doDeserialize(serializer: KSerializer<T>, body: String): T =
json.decodeFromString(serializer, body)
private fun <T> doDeserialize(serializer: KSerializer<T>, body: String): Mono<T> =
Mono.fromCallable { json.decodeFromString(serializer, body) }

private fun <T> serializeRequestBody(serializer: KSerializer<T>, body: T): ByteArray {
val os = ByteArrayOutputStream()
json.encodeToStream(serializer, body, os)
return os.toByteArray()
}

private fun <T> requestBodyToByteBuffer(serializer: KSerializer<T>, body: T?): ByteBuf {
if (body == null) {
return Unpooled.EMPTY_BUFFER
}
return try {
val res: ByteArray = serializeRequestBody(serializer, body)
Unpooled.wrappedBuffer(res)
} catch (e: IOException) {
throw LunchmoneyApiRequestException(e)
}
}
private fun <T> requestBodyToByteBuffer(
serializer: KSerializer<T>,
body: T?
): Mono<ByteBuf> =
Mono.justOrEmpty(body)
.mapNotNull { Unpooled.wrappedBuffer(serializeRequestBody(serializer, it!!)) }
.onErrorMap { LunchmoneyApiRequestException(it) }

private fun isOkResponse(response: HttpClientResponse) =
response.status().codeAsText().startsWith("2")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.smaugfm.lunchmoney.exception

import io.github.smaugfm.lunchmoney.response.ApiErrorResponse
import io.netty.handler.codec.http.HttpResponseStatus

@Suppress("MemberVisibilityCanBePrivate")
class LunchmoneyApiResponseException : LunchmoneyApiException {
Expand All @@ -10,23 +11,32 @@ class LunchmoneyApiResponseException : LunchmoneyApiException {

constructor(
body: String,
cause: Throwable?,
cause: ApiErrorResponse,
statusCode: Int
) : super(cause!!) {
this.apiErrorResponse = null
) : super(errorMessage(cause)) {
this.body = body
this.apiErrorResponse = cause
this.statusCode = statusCode
}

constructor(
body: String,
apiErrorResponse: ApiErrorResponse,
cause: Throwable,
statusCode: Int
) : super(cause) {
this.body = body
this.apiErrorResponse = null
this.statusCode = statusCode
}

constructor(
statusCode: Int
) : super(
errorMessage(apiErrorResponse)
"Response body is empty but status is $statusCode " +
HttpResponseStatus.valueOf(statusCode).reasonPhrase()
) {
this.apiErrorResponse = apiErrorResponse
this.body = body
this.body = ""
this.apiErrorResponse = null
this.statusCode = statusCode
}

Expand Down
Loading

0 comments on commit 31f2a11

Please sign in to comment.