Skip to content

Commit

Permalink
1.6.1
Browse files Browse the repository at this point in the history
* Zip chunked and non-chunked content
  • Loading branch information
pambrose authored Dec 15, 2019
1 parent 7d64aa6 commit c3af2f5
Show file tree
Hide file tree
Showing 21 changed files with 310 additions and 303 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION=1.6.0
VERSION=1.6.1

default: compile

Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ scrape_configs:
The docker images are available via:
```bash
docker pull pambrose/prometheus-proxy:1.6.0
docker pull pambrose/prometheus-agent:1.6.0
docker pull pambrose/prometheus-proxy:1.6.1
docker pull pambrose/prometheus-agent:1.6.1
```

Start a proxy container with:
Expand All @@ -106,15 +106,15 @@ Start a proxy container with:
docker run --rm -p 8082:8082 -p 8092:8092 -p 50051:50051 -p 8080:8080 \
--env ADMIN_ENABLED=true \
--env METRICS_ENABLED=true \
pambrose/prometheus-proxy:1.6.0
pambrose/prometheus-proxy:1.6.1
```

Start an agent container with:

```bash
docker run --rm -p 8083:8083 -p 8093:8093 \
--env AGENT_CONFIG='https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf' \
pambrose/prometheus-agent:1.6.0
pambrose/prometheus-agent:1.6.1
```

Using the config file [simple.conf](https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf),
Expand All @@ -130,7 +130,7 @@ is in your current directory, run an agent container with:
docker run --rm -p 8083:8083 -p 8093:8093 \
--mount type=bind,source="$(pwd)"/prom-agent.conf,target=/app/prom-agent.conf \
--env AGENT_CONFIG=prom-agent.conf \
pambrose/prometheus-agent:1.6.0
pambrose/prometheus-agent:1.6.1
```

**Note:** The `WORKDIR` of the proxy and agent images is `/app`, so make sure
Expand Down Expand Up @@ -248,15 +248,15 @@ docker run --rm -p 8082:8082 -p 8092:8092 -p 50440:50440 -p 8080:8080 \
--env PROXY_CONFIG=tls-no-mutual-auth.conf \
--env ADMIN_ENABLED=true \
--env METRICS_ENABLED=true \
pambrose/prometheus-proxy:1.6.0
pambrose/prometheus-proxy:1.6.1

docker run --rm -p 8083:8083 -p 8093:8093 \
--mount type=bind,source="$(pwd)"/testing/certs,target=/app/testing/certs \
--mount type=bind,source="$(pwd)"/examples/tls-no-mutual-auth.conf,target=/app/tls-no-mutual-auth.conf \
--env AGENT_CONFIG=tls-no-mutual-auth.conf \
--env PROXY_HOSTNAME=mymachine.lan:50440 \
--name docker-agent \
pambrose/prometheus-agent:1.6.0
pambrose/prometheus-agent:1.6.1
```

**Note:** The `WORKDIR` of the proxy and agent images is `/app`, so make sure
Expand Down
2 changes: 1 addition & 1 deletion bin/docker-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
docker run --rm -p 8083:8083 -p 8093:8093 \
--env AGENT_CONFIG='https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf' \
--env PROXY_HOSTNAME=mymachine.lan \
pambrose/prometheus-agent:1.6.0
pambrose/prometheus-agent:1.6.1
2 changes: 1 addition & 1 deletion bin/docker-proxy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

docker run --rm -p 8082:8082 -p 8092:8092 -p 50051:50051 -p 8080:8080 \
--env PROXY_CONFIG='https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf' \
pambrose/prometheus-proxy:1.6.0
pambrose/prometheus-proxy:1.6.1
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {
}

group = 'io.prometheus'
version = '1.6.0'
version = '1.6.1'

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand All @@ -30,7 +30,7 @@ def protocVersion = '3.10.0'
def serializationVersion = '0.13.0'
def slf4jVersion = '1.7.28'
def tscfgVersion = '1.3.4'
def utilsVersion = '1.1.12'
def utilsVersion = '1.1.13'
def zipkinVersion = '5.9.1'

repositories {
Expand Down
2 changes: 1 addition & 1 deletion etc/compose/proxy.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
prometheus-proxy:
autoredeploy: true
image: 'pambrose/prometheus-proxy:1.6.0'
image: 'pambrose/prometheus-proxy:1.6.1'
ports:
- '8080:8080'
- '8082:8082'
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/prometheus/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

@VersionAnnotation(version = "1.6.0", date = "12/15/19")
@VersionAnnotation(version = "1.6.1", date = "12/15/19")
package io.prometheus;

import io.prometheus.common.VersionAnnotation;
6 changes: 3 additions & 3 deletions src/main/kotlin/io/prometheus/Agent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ class Agent(val options: AgentOptions,

launch(Dispatchers.Default) { grpcService.writeResponsesToProxyUntilDisconnected(connectionContext) }

for (scrapeRequestAction in connectionContext.scrapeRequestChannel) {
for (scrapeRequestAction in connectionContext.scrapeRequestsChannel) {
launch(Dispatchers.Default) {
// The fetch actually occurs here
// The fetch occurs during the invoke()
val scrapeResponse = scrapeRequestAction.invoke()
connectionContext.scrapeResultChannel.send(scrapeResponse)
connectionContext.scrapeResultsChannel.send(scrapeResponse)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/kotlin/io/prometheus/agent/AgentConnectionContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ package io.prometheus.agent

import com.github.pambrose.common.delegate.AtomicDelegates.atomicBoolean
import io.prometheus.common.ScrapeRequestAction
import io.prometheus.grpc.NonChunkedScrapeResponse
import io.prometheus.common.ScrapeResults
import kotlinx.coroutines.channels.Channel

class AgentConnectionContext {
private var disconnected by atomicBoolean(false)
val scrapeRequestChannel = Channel<ScrapeRequestAction>(Channel.UNLIMITED)
val scrapeResultChannel = Channel<NonChunkedScrapeResponse>(Channel.UNLIMITED)
val scrapeRequestsChannel = Channel<ScrapeRequestAction>(Channel.UNLIMITED)
val scrapeResultsChannel = Channel<ScrapeResults>(Channel.UNLIMITED)

fun disconnect() {
disconnected = true
scrapeRequestChannel.cancel()
scrapeResultChannel.cancel()
scrapeRequestsChannel.cancel()
scrapeResultsChannel.cancel()
}

val connected get() = !disconnected
Expand Down
40 changes: 21 additions & 19 deletions src/main/kotlin/io/prometheus/agent/AgentGrpcService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.github.pambrose.common.delegate.AtomicDelegates.atomicBoolean
import com.github.pambrose.common.dsl.GrpcDsl.channel
import com.github.pambrose.common.dsl.GrpcDsl.streamObserver
import com.github.pambrose.common.util.simpleClassName
import com.github.pambrose.common.util.zip
import com.github.pambrose.common.utils.TlsContext
import com.github.pambrose.common.utils.TlsContext.Companion.PLAINTEXT_CONTEXT
import com.github.pambrose.common.utils.TlsUtils.buildClientTlsContext
Expand All @@ -40,8 +39,10 @@ import io.prometheus.common.GrpcObjects
import io.prometheus.common.GrpcObjects.newAgentInfo
import io.prometheus.common.GrpcObjects.newRegisterAgentRequest
import io.prometheus.common.GrpcObjects.newScrapeResponseChunk
import io.prometheus.common.GrpcObjects.newScrapeResponseHeader
import io.prometheus.common.GrpcObjects.newScrapeResponseSummary
import io.prometheus.common.GrpcObjects.toScrapeResponse
import io.prometheus.common.GrpcObjects.toScrapeResponseHeader
import io.prometheus.common.ScrapeResults
import io.prometheus.grpc.ProxyServiceGrpc
import io.prometheus.grpc.ProxyServiceGrpc.ProxyServiceBlockingStub
import io.prometheus.grpc.ProxyServiceGrpc.ProxyServiceStub
Expand Down Expand Up @@ -224,11 +225,11 @@ class AgentGrpcService(private val agent: Agent,
newAgentInfo(agent.agentId),
streamObserver {
onNext { request ->
// This will block, but only for the duration of the send.
// The actual fetch happens at the other end of the channel
// This will block, but only very briefly for the duration of the send.
// The actual fetch happens at the other end of the channel, not here.
runBlocking {
logger.debug { "readRequestsFromProxy(): \n$request" }
connectionContext.scrapeRequestChannel.send { agentHttpService.fetchScrapeUrl(request) }
connectionContext.scrapeRequestsChannel.send { agentHttpService.fetchScrapeUrl(request) }
agent.scrapeRequestBacklogSize.incrementAndGet()
}
}
Expand Down Expand Up @@ -267,40 +268,41 @@ class AgentGrpcService(private val agent: Agent,
val nonchunkedObserver = asyncStub.writeNonChunkedResponsesToProxy(emptyResponseObserver)
val chunkedObserver = asyncStub.writeChunkedResponsesToProxy(emptyResponseObserver)

for (ncsr in connectionContext.scrapeResultChannel) {
val scrapedId = ncsr.scrapeId
val content = ncsr.contentText
for (scrapeResults: ScrapeResults in connectionContext.scrapeResultsChannel) {
val scrapedId = scrapeResults.scrapeId
val zipped = scrapeResults.contentZipped

logger.debug { "Comparing ${content.length} and ${options.maxContentSizeKbs}" }
if (content.length < (options.maxContentSizeKbs)) {
logger.debug { "Writing non-chunked msg scrapeId: $scrapedId length: ${content.length}" }
nonchunkedObserver.onNext(ncsr)
logger.debug { "Comparing ${zipped.size} and ${options.maxContentSizeKbs}" }
if (zipped.size < options.maxContentSizeKbs) {
logger.debug { "Writing non-chunked msg scrapeId: $scrapedId length: ${zipped.size}" }
val scrapeResponse = scrapeResults.toScrapeResponse()
nonchunkedObserver.onNext(scrapeResponse)
}
else {
newScrapeResponseHeader(ncsr).also {
logger.debug { "Writing header length: ${content.length} for scrapeId: $scrapedId " }
scrapeResults.toScrapeResponseHeader().also {
logger.debug { "Writing header length: ${zipped.size} for scrapeId: $scrapedId " }
chunkedObserver.onNext(it)
}

var totalByteCount = 0
var totalChunkCount = 0
val crcChecksum = CRC32()
val bais = ByteArrayInputStream(content.zip())
val checksum = CRC32()
val bais = ByteArrayInputStream(zipped)
val buffer = ByteArray(options.maxContentSizeKbs)
var readByteCount: Int

while (bais.read(buffer).also { bytesRead -> readByteCount = bytesRead } > 0) {
totalChunkCount++
totalByteCount += readByteCount
crcChecksum.update(buffer, 0, buffer.size);
checksum.update(buffer, 0, buffer.size);

newScrapeResponseChunk(ncsr.scrapeId, totalChunkCount, readByteCount, crcChecksum, buffer).also {
newScrapeResponseChunk(scrapeResults.scrapeId, totalChunkCount, readByteCount, checksum, buffer).also {
logger.debug { "Writing chunk $totalChunkCount for scrapeId: $scrapedId" }
chunkedObserver.onNext(it)
}
}

newScrapeResponseSummary(ncsr.scrapeId, totalChunkCount, totalByteCount, crcChecksum).also {
newScrapeResponseSummary(scrapeResults.scrapeId, totalChunkCount, totalByteCount, checksum).also {
logger.debug { "Writing summary totalChunkCount: $totalChunkCount for scrapeID: $scrapedId" }
chunkedObserver.onNext(it)
}
Expand Down
82 changes: 42 additions & 40 deletions src/main/kotlin/io/prometheus/agent/AgentHttpService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,58 +21,59 @@ package io.prometheus.agent
import com.github.pambrose.common.dsl.KtorDsl.get
import com.github.pambrose.common.dsl.KtorDsl.http
import com.github.pambrose.common.util.simpleClassName
import com.github.pambrose.common.util.zip
import com.google.common.net.HttpHeaders
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.client.request.header
import io.ktor.client.response.HttpResponse
import io.ktor.client.response.readText
import io.ktor.http.isSuccess
import io.prometheus.Agent
import io.prometheus.common.GrpcObjects
import io.prometheus.grpc.NonChunkedScrapeResponse
import io.prometheus.common.ScrapeResults
import io.prometheus.grpc.ScrapeRequest
import mu.KLogging
import java.io.IOException
import java.util.concurrent.atomic.AtomicReference

class AgentHttpService(val agent: Agent) {

suspend fun fetchScrapeUrl(request: ScrapeRequest): NonChunkedScrapeResponse {
val responseArg = GrpcObjects.ScrapeResponseArg(agentId = request.agentId, scrapeId = request.scrapeId)
val scrapeCounterMsg = AtomicReference("")
val path = request.path
val pathContext = agent.pathManager[path]
suspend fun fetchScrapeUrl(request: ScrapeRequest): ScrapeResults =
ScrapeResults(agentId = request.agentId, scrapeId = request.scrapeId).also { scrapeResults ->
val scrapeMsg = AtomicReference("")
val path = request.path
val pathContext = agent.pathManager[path]

if (pathContext == null) {
logger.warn { "Invalid path in fetchScrapeUrl(): $path" }
scrapeCounterMsg.set("invalid_path")
if (request.debugEnabled)
responseArg.setDebugInfo("None", "Invalid path: $path")
} else {
val requestTimer = if (agent.isMetricsEnabled) agent.startTimer() else null
val url = pathContext.url
logger.debug { "Fetching $pathContext" }
if (pathContext == null) {
logger.warn { "Invalid path in fetchScrapeUrl(): $path" }
scrapeMsg.set("invalid_path")
if (request.debugEnabled)
scrapeResults.setDebugInfo("None", "Invalid path: $path")
}
else {
val requestTimer = if (agent.isMetricsEnabled) agent.startTimer() else null
val url = pathContext.url
logger.debug { "Fetching $pathContext" }

try {
http {
get(url, getSetUp(request), getBlock(url, responseArg, scrapeCounterMsg, request.debugEnabled))
// Content is fetched here
try {
http {
get(url, getSetUp(request), getBlock(url, scrapeResults, scrapeMsg, request.debugEnabled))
}
} catch (e: IOException) {
logger.info { "Failed HTTP request: $url [${e.simpleClassName}: ${e.message}]" }
if (request.debugEnabled)
scrapeResults.setDebugInfo(url, "${e.simpleClassName} - ${e.message}")
} catch (e: Throwable) {
logger.warn(e) { "fetchScrapeUrl() $e - $url" }
if (request.debugEnabled)
scrapeResults.setDebugInfo(url, "${e.simpleClassName} - ${e.message}")
} finally {
requestTimer?.observeDuration()
}
}
} catch (e: IOException) {
logger.info { "Failed HTTP request: $url [${e.simpleClassName}: ${e.message}]" }
if (request.debugEnabled)
responseArg.setDebugInfo(url, "${e.simpleClassName} - ${e.message}")
} catch (e: Throwable) {
logger.warn(e) { "fetchScrapeUrl() $e - $url" }
if (request.debugEnabled)
responseArg.setDebugInfo(url, "${e.simpleClassName} - ${e.message}")
} finally {
requestTimer?.observeDuration()
}
}

agent.updateScrapeCounter(scrapeCounterMsg.get())
return GrpcObjects.newScrapeResponse(responseArg)
}
agent.updateScrapeCounter(scrapeMsg.get())
}

private fun getSetUp(request: ScrapeRequest): HttpRequestBuilder.() -> Unit = {
val accept: String? = request.accept
Expand All @@ -81,16 +82,17 @@ class AgentHttpService(val agent: Agent) {
}

private fun getBlock(url: String,
responseArg: GrpcObjects.ScrapeResponseArg,
responseArg: ScrapeResults,
scrapeCounterMsg: AtomicReference<String>,
debugEnabled: Boolean): suspend (HttpResponse) -> Unit =
{ response ->
responseArg.statusCode = response.status
responseArg.statusCode = response.status.value

if (response.status.isSuccess()) {
responseArg.apply {
contentText = response.readText()
contentType = response.headers[HttpHeaders.CONTENT_TYPE].orEmpty()
// Zip the content here
contentZipped = response.readText().zip()
validResponse = true
}
if (debugEnabled)
Expand All @@ -99,10 +101,10 @@ class AgentHttpService(val agent: Agent) {
}
else {
if (debugEnabled)
responseArg.setDebugInfo(url, "Unsucessful response code ${responseArg.statusCode}")
scrapeCounterMsg.set("unsuccessful")
responseArg.setDebugInfo(url, "Unsucessful response code ${responseArg.statusCode}")
scrapeCounterMsg.set("unsuccessful")
}
}
}

companion object : KLogging()
}
Loading

0 comments on commit c3af2f5

Please sign in to comment.