diff --git a/README.md b/README.md
index adc967f..6361137 100644
--- a/README.md
+++ b/README.md
@@ -296,7 +296,18 @@ A tenant can be registered by calling the following API
 ]
 }
 ```
-
+```json
+[
+  {
+    "id":"1000001",
+    "eventTime":"2020-06-28T00:00:00Z",
+    "tenant":"1",
+    "payload":"send_email",
+    "mode":"UPSERT",
+    "deliveryOption":"PAYLOAD_ONLY"
+  }
+]
+```
 ### find an event
 
 `GET /events/find?id=?&tenant=?`
 
@@ -305,8 +316,109 @@
 fires an event without changing its final status
 
-## cron APIs
-coming up...
+## Cron support
+`BigBen` provides full support for scheduling cron expressions in a
+distributed, fault-tolerant manner. `BigBen` uses the [cron-utils](https://github.com/jmrozanec/cron-utils)
+open source package to handle cron parsing and to calculate execution times.
+See the `cron-utils` documentation for details on the various cron types it supports;
+the coverage is quite exhaustive (QUARTZ, UNIX, Cron4J, and Spring).
+
+### How are crons executed?
+`BigBen` uses `hazelcast` to create a lock-free distributed cron execution system.
+`hazelcast` partitions its data into `271` partitions and takes care of distributing
+these partitions equally among the cluster nodes. All cron expressions are hashed to
+these partitions, which means crons get distributed across the cluster.
+
+Each node then spawns a thread (pool) called `cron-runner`, which wakes up every second and
+checks which **local** crons are ready to execute. Note that there's no cross-node
+communication or locking involved in executing these crons.
+Each cron requires a `tenant`, which dictates how the cron is to be triggered
+(much like any other event in `BigBen`).
+
+## cron execution guarantees
+`BigBen` aims to guarantee that
+1. As long as at least one node is available, the cron will execute
+2. A cron will always be executed on one node only (if that node goes down, the
+subsequent executions will happen on another node)
+3. Each cron trigger is tried four times, like other events in `BigBen`
+(by default: immediately, then 1 second, 2 seconds, and 4 seconds later)
+4. If all tries result in failure (e.g. the tenant's HTTP service is not
+responding or the Kafka cluster is down) and cron log event support is enabled
+ (see below), then the event is stored in the log table along with the last associated failure.
+ All intermediate failures are also logged through the configured `log4j` appenders.
+5. The minimum execution interval supported is 1 second.
+
+## cron event log support
+`BigBen` can optionally record each execution of a cron trigger in a table called `cron-events`
+(see `bigben-schema.cql` for table details).
+
+The `cron-events` table uses the fully qualified cron id (a combination of the user-supplied
+cron id, the cron type (e.g. QUARTZ, UNIX, etc.), and the tenant) together with a `logGranularity`
+time bucket as the partition key, and the cron `executionTime` as the event time.
+The table also stores the event that was triggered at the trigger time. Log event
+retention is supported as well.
+
+E.g. you can set up a cron to execute every minute, keep its records
+grouped together at the DAILY level, and retain each event for 1 week.
+
+This support is optional. By default, the log events are turned off.
+
+_The cron log events are stored with consistency `ONE` by default_. You can use a different
+consistency by providing `cron.log.events.write.consistency` in `bigben.yaml`.
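+
+As a rough sketch, this is how the granularity and retention settings translate to storage
+(mirroring the `CronLogEvent` entity added in this change; the helper function below is
+illustrative only, not part of the API, and assumes a granularity that
+`ZonedDateTime.truncatedTo` accepts, i.e. up to DAYS):
+```kotlin
+import java.time.ZonedDateTime
+import java.time.temporal.ChronoUnit
+import java.util.concurrent.TimeUnit
+
+// illustrative helper: derives the partition bucket and the Cassandra TTL
+// for a cron execution logged at `executionTime`
+fun logBucketAndTtl(
+    executionTime: ZonedDateTime,
+    logGranularity: ChronoUnit,   // e.g. ChronoUnit.MINUTES or ChronoUnit.DAYS
+    retention: Int?,              // e.g. 7
+    retentionUnits: TimeUnit      // e.g. TimeUnit.DAYS
+): Pair<ZonedDateTime, Int> {
+    // all executions within the same granularity window share one partition
+    val bucketId = executionTime.truncatedTo(logGranularity)
+    // retention is converted to seconds; 0 means the rows never expire
+    val ttlSeconds = retention?.let { TimeUnit.SECONDS.convert(it.toLong(), retentionUnits).toInt() } ?: 0
+    return bucketId to ttlSeconds
+}
+```
+With the sample payload below (`"retention": 2, "retentionUnits": "MINUTES"`), each log row
+expires 120 seconds after it is written.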
-
-
+## cron APIs
+`POST /cron`
+
+Sets up a cron (**_requires tenant to be set up first_**).
+
+Sample request payload with event log enabled:
+```json
+{
+  "tenant": "tenant1",
+  "id": "cronId1",
+  "expression": "*/5 * * * * ? *",
+  "type": "QUARTZ",
+  "logGranularity": "MINUTES",
+  "retention": 2,
+  "retentionUnits": "MINUTES"
+}
+```
+Sample request payload with event log disabled:
+```json
+{
+  "tenant": "tenant1",
+  "id": "cronId1",
+  "expression": "*/5 * * * * ? *",
+  "type": "QUARTZ"
+}
+```
+The response comes back with a `description` field that explains how the cron
+will be executed.
+
+Sample response payload:
+```json
+{
+  "cron": {
+    "id": "cronId1",
+    "expression": "*/5 * * * * ? *",
+    "type": "QUARTZ",
+    "tenant": "tenant1",
+    "logGranularity": "MINUTES",
+    "retention": 2,
+    "retentionUnits": "MINUTES"
+  },
+  "description": "every 5 seconds"
+}
+```
+That's it! This cron will execute every 5 seconds as long as any node in the cluster
+is alive.
+
+`GET /cron/{tenant}/{id}`
+
+Returns all crons (across types, if multiple exist) identified by this tenant and
+cron-id combination.
+
+`DELETE /cron/{tenant}/{id}/{type}`
+
+Deletes the cron with this specific tenant/id/type combination.
+
diff --git a/app/pom.xml b/app/pom.xml
index c1af5cb..c993fca 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -3,7 +3,7 @@
 com.walmartlabs.bigben
 bigben
-1.0.7-SNAPSHOT
+1.0.9-SNAPSHOT
 bigben-app
 BigBen:app
diff --git a/app/src/main/kotlin/com/walmartlabs/bigben/app/run.kt b/app/src/main/kotlin/com/walmartlabs/bigben/app/run.kt
index c3018ce..262c9fe 100644
--- a/app/src/main/kotlin/com/walmartlabs/bigben/app/run.kt
+++ b/app/src/main/kotlin/com/walmartlabs/bigben/app/run.kt
@@ -64,14 +64,8 @@ fun Application.routes() {
     post("/generation/random") { call.respond(EventGenerator.generateEvents(call.receive())) }
     route("/cron") {
         post { call.fromAPIResponse(CronService.upsert(call.receive())) }
-        get("/describe") { call.fromAPIResponse(CronService.describe(call.receive())) }
         get("/{tenant}/{id}") {
-            call.fromAPIResponse(
-                CronService.get(
-                    call.parameters["tenant"]!!, call.parameters["id"]!!,
-                    call.request.queryParameters["describe"]?.toBoolean()
-                )
-            )
+            call.fromAPIResponse(CronService.get(call.parameters["tenant"]!!, call.parameters["id"]!!))
         }
         delete("/{tenant}/{id}/{type}") {
             call.fromAPIResponse(CronService.delete(call.parameters["tenant"]!!, call.parameters["id"]!!, call.parameters["type"]!!))
diff --git a/build/bin/bigben-schema.cql b/build/bin/bigben-schema.cql
new file mode 100644
index 0000000..d2fb241
--- /dev/null
+++ b/build/bin/bigben-schema.cql
@@ -0,0 +1,76 @@
+-- DROP KEYSPACE IF EXISTS bigben;
+
+CREATE
+KEYSPACE IF NOT EXISTS bigben
+WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+
+-- DROP TABLE IF EXISTS bigben.buckets;
+
+CREATE TABLE IF NOT EXISTS bigben.buckets
+(
+    id timestamp PRIMARY KEY,
+    count bigint,
+    failed_shards text,
+    modified_at timestamp,
+    processed_at timestamp,
+    status text
+);
+
+-- DROP TABLE IF EXISTS bigben.lookups;
+
+CREATE TABLE IF NOT EXISTS bigben.lookups
+(
+    tenant text,
+    xref_id text,
+    bucket_id timestamp,
+    event_id text,
+    event_time timestamp,
+    l_m timestamp,
+    payload text,
+    shard int,
+    PRIMARY KEY ((tenant, xref_id)
+)
+    );
+
+-- DROP TABLE IF EXISTS bigben.events;
+
+CREATE TABLE IF NOT EXISTS bigben.events
+(
+    bucket_id timestamp,
+    shard int,
+    event_time timestamp,
+    id text,
+    error text,
+    payload text,
+    processed_at timestamp,
+    status text,
+    tenant text,
+    xref_id text,
+    PRIMARY KEY ((bucket_id, shard),
+    event_time,
+    id
+)
+    ) WITH CLUSTERING ORDER BY (event_time ASC, id ASC);
+
+--
DROP TABLE IF EXISTS bigben.kv_table; + +CREATE TABLE IF NOT EXISTS bigben.kv_table +( + key text, + column text, + l_m timestamp, + value text, + PRIMARY KEY (key, column) +) WITH CLUSTERING ORDER BY (column ASC); + +-- DROP TABLE IF EXISTS bigben.cron_events; + +CREATE TABLE IF NOT EXISTS bigben.cron_events +( + cron_id text, + bucket_id timestamp, + event_time TIMESTAMP, + event text, + PRIMARY KEY ((cron_id, bucket_id),event_time) +) WITH CLUSTERING ORDER BY (event_time DESC); + diff --git a/build/bin/bigben.jar b/build/bin/bigben.jar new file mode 100644 index 0000000..6322aa6 Binary files /dev/null and b/build/bin/bigben.jar differ diff --git a/build/bin/bigben.yaml b/build/bin/bigben.yaml new file mode 100644 index 0000000..655861b --- /dev/null +++ b/build/bin/bigben.yaml @@ -0,0 +1,146 @@ +# top level modules +modules: + - name: domain + class: com.walmartlabs.bigben.providers.domain.cassandra.CassandraModule + - name: processors + object: com.walmartlabs.bigben.processors.ProcessorRegistry + - name: hz + class: com.walmartlabs.bigben.utils.hz.Hz + - name: scheduler + object: com.walmartlabs.bigben.SchedulerModule + - name: events + object: com.walmartlabs.bigben.EventModule + - name: messaging + object: com.walmartlabs.bigben.kafka.KafkaModule + enabled: ${kafka.module.enabled:-false} + - name: cron + object: com.walmartlabs.bigben.cron.CronRunner + enabled: ${cron.module.enabled:-false} + +# hazelcast properties +hz: + template: file://hz.template.xml + group: + name: bigben-dev + password: bigben-dev + network: + autoIncrementPort: true + members: 127.0.0.1 + port: 5701 + map: + store: + writeDelay: 30 + +# message related properties +messaging.producer.factory.class: com.walmartlabs.bigben.kafka.KafkaMessageProducerFactory + +# cassandra related properties +cassandra: + keyspace: bigben + cluster: + contactPoints: 127.0.0.1 + clusterName: bigben-cluster + port: 9042 + localDataCenter: null + coreConnectionsPerLocalHost: 1 + maxConnectionsPerLocalHost: 1 + coreConnectionsPerRemoteHost: 1 + maxConnectionsPerRemoteHost: 1 + maxRequestsPerLocalConnection: 32768 + maxRequestsPerRemoteConnection: 2048 + newLocalConnectionThreshold: 3000 + newRemoteConnectionThreshold: 400 + poolTimeoutMillis: 0 + keepTCPConnectionAlive: true + connectionTimeOut: 5000 + readTimeout: 12000 + reconnectPeriod: 5 + username: null + password: null + downgradingConsistency: false + writeConsistency: LOCAL_ONE + readConsistency: LOCAL_ONE + +# kafka consumer properties +kafka: + consumers: + - num.consumers: ${num.consumers:-8} + processor.impl.class: com.walmartlabs.bigben.kafka.ProcessorImpl + topics: ${bigben.inbound.topic.name:-null} + max.poll.wait.time: ${max.poll.wait.time:-10000} + message.retry.max.count: ${message.retry.max.count:-10} + config: + key.deserializer: org.apache.kafka.common.serialization.StringDeserializer + value.deserializer: org.apache.kafka.common.serialization.StringDeserializer + bootstrap.servers: ${bigben.inbound.topic.bootstrap.servers:-null} + #fetch.min.bytes: 1 + group.id: ${group.id:-bigben-inbound} + heartbeat.interval.ms: ${heartbeat.interval.ms:-3000} + session.timeout.ms: 30000 + auto.offset.reset: ${auto.offset.reset:-latest} + fetch.max.bytes: 324000 + max.poll.interval.ms: 30000 + max.poll.records: 100 + receive.buffer.bytes: 65536 + request.timeout.ms: 60000 + #send.buffer.bytes: 131072 + enable.auto.commit: ${enable.auto.commit:-false} + producer: + config: # this is default kafka producer config, these values will be used if not supplied during the tenant 
registration + key.serializer: org.apache.kafka.common.serialization.StringSerializer + value.serializer: org.apache.kafka.common.serialization.StringSerializer + acks: "1" + buffer.memory: 32400 + retries: 3 + +# system properties +task: + executor: + #retry.thread.count: 8 + retry.time.units: SECONDS + delay: 1 + max.retries: 3 + backoff.multiplier: 2 + +app.server.port: 8080 +generic.future.max.get.time: 60 + +events: + scheduler.enabled: true + schedule.scan.interval.minutes: 1 + num.shard.submitters: 8 + receiver: + shard.size: 1000 + lapse.offset.minutes: 0 + delete: + max.retries: 3 + initial.delay: 1 + backoff.multiplier: 1 + submit: + initial.delay: 1 + backoff.multiplier: 1 + max.retries: 3 + processor: + max.retries: 3 + initial.delay: 1 + backoff.multiplier: 2 + eager.loading: true + tasks: + max.events.in.memory: 100000 + scheduler.worker.threads: 8 + +# bucket manager / loader related properties +buckets: + backlog.check.limit: 1440 # 1 Day + background: + load.fetch.size: 100 + load.wait.interval.seconds: 15 + +cron: + runner: + core.pool.size: 8 + load: + max.retries: 10 + delay: 1 + backoff.multiplier: 1 + time.units: "SECONDS" \ No newline at end of file diff --git a/build/docker/app_run.sh b/build/docker/app_run.sh index c6519a7..93b91c5 100755 --- a/build/docker/app_run.sh +++ b/build/docker/app_run.sh @@ -56,7 +56,7 @@ function start() { -e JAVA_OPTS="${JAVA_OPTS} -Dbigben.configs=uri://${APP_ROOT}/overrides.yaml,uri://${APP_ROOT}/bigben.yaml \ -Dapp.server.port=${SERVER_PORT} \ -Dbigben.log.file=${APP_ROOT}/logs/bigben_app_${app_port}.log \ - -Dbigben.log.config=${APP_ROOT} \ + -Dbigben.log.config=${APP_ROOT}/log4j.xml \ -Dhazelcast.local.publicAddress=${HOST_IP}:${hz_port}" \ --name "${APP_CONTAINER_NAME}_$app_port" sandeepmalik/bigben:1 let i=i+1 diff --git a/build/docker/cassandra_run.sh b/build/docker/cassandra_run.sh index f5e9139..49c215d 100755 --- a/build/docker/cassandra_run.sh +++ b/build/docker/cassandra_run.sh @@ -3,7 +3,7 @@ set -e CASSANDRA_CONTAINER_NAME=${CASSANDRA_CONTAINER_NAME:-bigben_cassandra} CASSANDRA_PORT=${CASSANDRA_PORT:-9042} CASSANDRA_GOSSIP_PORT=${CASSANDRA_GOSSIP_PORT:-7000} -HOST_IP=${HOST_IP:-`ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'`} +HOST_IP=${HOST_IP:-`ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.*'`} echo "determined host ip: $HOST_IP" echo "stopping ${CASSANDRA_CONTAINER_NAME}, if running" docker stop ${CASSANDRA_CONTAINER_NAME} || true diff --git a/build/exec/app_run.sh b/build/exec/app_run.sh index 35239f8..3a508af 100755 --- a/build/exec/app_run.sh +++ b/build/exec/app_run.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -export HOST_IP=${HOST_IP:-`ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'`} +export HOST_IP=${HOST_IP:-`ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.*'`} export SERVER_PORT=${SERVER_PORT:-8080} APP_ROOT=${PWD}/../configs export HZ_MEMBER_IPS=${HZ_MEMBER_IPS:-${HOST_IP}} @@ -43,4 +43,4 @@ while [[ ${i} -lt $(($NUM_INSTANCES + 1)) ]]; do tail -f ${LOG_FILE} fi let i=i+1 -done \ No newline at end of file +done diff --git a/cassandra/pom.xml b/cassandra/pom.xml index e8b3baa..78819ed 100644 --- a/cassandra/pom.xml +++ b/cassandra/pom.xml @@ -4,7 +4,7 @@ bigben com.walmartlabs.bigben - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT bigben-cassandra diff --git 
a/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/CassandraModule.kt b/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/CassandraModule.kt index f754b3d..a65ff78 100644 --- a/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/CassandraModule.kt +++ b/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/CassandraModule.kt @@ -19,16 +19,33 @@ */ package com.walmartlabs.bigben.providers.domain.cassandra -import com.datastax.driver.core.* +import com.datastax.driver.core.Cluster +import com.datastax.driver.core.CodecRegistry import com.datastax.driver.core.HostDistance.LOCAL import com.datastax.driver.core.HostDistance.REMOTE -import com.datastax.driver.core.policies.* +import com.datastax.driver.core.PoolingOptions +import com.datastax.driver.core.PreparedStatement +import com.datastax.driver.core.ProtocolOptions +import com.datastax.driver.core.Session +import com.datastax.driver.core.SocketOptions +import com.datastax.driver.core.policies.ConstantReconnectionPolicy +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy +import com.datastax.driver.core.policies.DefaultRetryPolicy +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy +import com.datastax.driver.core.policies.TokenAwarePolicy import com.datastax.driver.mapping.Mapper import com.datastax.driver.mapping.Mapper.Option.consistencyLevel import com.datastax.driver.mapping.Mapper.Option.saveNullFields +import com.datastax.driver.mapping.Mapper.Option.ttl import com.datastax.driver.mapping.MappingManager import com.google.common.util.concurrent.ListenableFuture -import com.walmartlabs.bigben.entities.* +import com.walmartlabs.bigben.entities.Bucket +import com.walmartlabs.bigben.entities.EntityProvider +import com.walmartlabs.bigben.entities.Event +import com.walmartlabs.bigben.entities.EventLoader +import com.walmartlabs.bigben.entities.EventLookup +import com.walmartlabs.bigben.entities.EventStatus +import com.walmartlabs.bigben.entities.KV import com.walmartlabs.bigben.extns.nowUTC import com.walmartlabs.bigben.utils.commons.Module import com.walmartlabs.bigben.utils.commons.ModuleRegistry @@ -54,8 +71,8 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo private val session: Session private val clusterConfig = ClusterConfig::class.java.fromJson(map("cassandra.cluster").json()) - private val writeConsistency = consistencyLevel(clusterConfig.writeConsistency) - private val readConsistency = consistencyLevel(clusterConfig.readConsistency) + private val writeConsistency = clusterConfig.writeConsistency + private val readConsistency = clusterConfig.readConsistency init { l.info("initialing the Cassandra module") @@ -77,7 +94,7 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo Bucket::class.java -> BucketC() as T EventLookup::class.java -> EventLookupC() as T KV::class.java -> KVC() as T - else -> throw IllegalArgumentException("unknown entity $type") + else -> type.newInstance() } } @@ -91,12 +108,14 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo } override fun fetch(selector: T): ListenableFuture { + val readC = (selector as? ConsistencyOverride)?.read()?.let { it } ?: readConsistency return mappingManager.mapper(selector::class.java).let { + val readConsistency = consistencyLevel(readC) when (selector) { is EventC -> { require( - selector.eventTime != null && selector.id != null && - selector.shard != null && selector.shard!! 
>= 0 + selector.eventTime != null && selector.id != null && + selector.shard != null && selector.shard!! >= 0 ) { "event keys not provided: $selector" } it.getAsync(selector.bucketId, selector.shard, selector.eventTime, selector.id, readConsistency).transform { it } } @@ -115,7 +134,7 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo else -> throw IllegalArgumentException("unknown selector: $selector") } }.apply { - transform { if (l.isDebugEnabled) l.debug("fetched entity: {}", it) } + transform { if (l.isDebugEnabled) l.debug("fetched entity: {}, readConsistency: {}", it, readC) } } } @@ -126,8 +145,8 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo when (selector) { is EventC -> { require( - selector.eventTime != null && selector.id != null && selector.bucketId != null && - selector.shard != null && selector.shard!! >= 0 + selector.eventTime != null && selector.id != null && selector.bucketId != null && + selector.shard != null && selector.shard!! >= 0 ) { "event keys not provided: $selector" } } is BucketC -> { @@ -141,10 +160,13 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo require(selector.key != null && selector.column != null) { "kv keys not provided: $selector" } selector.lastModified = nowUTC() } - else -> throw IllegalArgumentException("unknown selector: $selector") } - if (l.isDebugEnabled) l.debug("saving entity {}", selector) - m.saveAsync(selector, saveNullFields(false), writeConsistency).transform { _ -> if (l.isDebugEnabled) l.debug("saved entity {}", selector); selector } + val writeConsistency = (selector as? ConsistencyOverride)?.write()?.let { it } ?: writeConsistency + val ttl = (selector as? TTLOverride)?.ttl()?.let { it } ?: 0 + + if (l.isDebugEnabled) l.debug("saving entity {}, ttl: {}, writeConsistency: {}", selector, ttl, writeConsistency) + m.saveAsync(selector, saveNullFields(false), consistencyLevel(writeConsistency), ttl(ttl)) + .transform { if (l.isDebugEnabled) l.debug("saved entity {}", selector); selector } } } @@ -155,8 +177,8 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo when (selector) { is EventC -> { require( - selector.eventTime != null && selector.id != null && - selector.shard != null && selector.shard!! >= 0 + selector.eventTime != null && selector.id != null && + selector.shard != null && selector.shard!! >= 0 ) { "event keys not provided: $selector" } } is BucketC -> { @@ -168,42 +190,42 @@ open class CassandraModule : EntityProvider, ClusterFactory, EventLo is KVC -> { require(selector.key != null && selector.column != null) { "kv keys not provided: $selector" } } - else -> throw IllegalArgumentException("unknown selector: $selector") } - if (l.isDebugEnabled) l.debug("deleting entity: {}", selector) - m.deleteAsync(selector, writeConsistency).transform { _ -> if (l.isDebugEnabled) l.debug("deleted entity {}", selector); selector } + val deleteConsistency = (selector as? 
ConsistencyOverride)?.delete()?.let { it } ?: writeConsistency + if (l.isDebugEnabled) l.debug("deleting entity: {}, deleteConsistency: $deleteConsistency", selector) + m.deleteAsync(selector, consistencyLevel(deleteConsistency)).transform { if (l.isDebugEnabled) l.debug("deleted entity {}", selector); selector } } } override fun create(): Cluster { return Cluster.builder() - .withCodecRegistry(CodecRegistry().register(EnumCodec(EventStatus.values().toSet())).register(ZdtCodec())) - .withClusterName(clusterConfig.clusterName) - .withPort(clusterConfig.port) - .also { clusterConfig.compression?.run { it.withCompression(ProtocolOptions.Compression.valueOf(this)) } } - .withRetryPolicy(if (clusterConfig.downgradingConsistency) DowngradingConsistencyRetryPolicy.INSTANCE else DefaultRetryPolicy.INSTANCE) - .also { - clusterConfig.localDataCenter?.run { - it.withLoadBalancingPolicy(TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(this).withUsedHostsPerRemoteDc(0).build())) - } - } - .withReconnectionPolicy(ConstantReconnectionPolicy(clusterConfig.reconnectPeriod)) - .withSocketOptions(SocketOptions().apply { - connectTimeoutMillis = clusterConfig.connectionTimeOut - readTimeoutMillis = clusterConfig.readTimeout - keepAlive = clusterConfig.keepTCPConnectionAlive - }) - .withPoolingOptions(PoolingOptions().apply { - clusterConfig.apply { - setConnectionsPerHost(LOCAL, coreConnectionsPerLocalHost, maxConnectionsPerLocalHost) - setConnectionsPerHost(REMOTE, coreConnectionsPerRemoteHost, maxConnectionsPerRemoteHost) + .withCodecRegistry(CodecRegistry().register(EnumCodec(EventStatus.values().toSet())).register(ZdtCodec())) + .withClusterName(clusterConfig.clusterName) + .withPort(clusterConfig.port) + .also { clusterConfig.compression?.run { it.withCompression(ProtocolOptions.Compression.valueOf(this)) } } + .withRetryPolicy(if (clusterConfig.downgradingConsistency) DowngradingConsistencyRetryPolicy.INSTANCE else DefaultRetryPolicy.INSTANCE) + .also { + clusterConfig.localDataCenter?.run { + it.withLoadBalancingPolicy(TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(this).withUsedHostsPerRemoteDc(0).build())) + } } - heartbeatIntervalSeconds = 60 - }) - .also { clusterConfig.username?.run { it.withCredentials(this, clusterConfig.password) } } - .addContactPoints(*clusterConfig.contactPoints.split(",").toTypedArray()) - .apply { decorate(this) } - .build() + .withReconnectionPolicy(ConstantReconnectionPolicy(clusterConfig.reconnectPeriod)) + .withSocketOptions(SocketOptions().apply { + connectTimeoutMillis = clusterConfig.connectionTimeOut + readTimeoutMillis = clusterConfig.readTimeout + keepAlive = clusterConfig.keepTCPConnectionAlive + }) + .withPoolingOptions(PoolingOptions().apply { + clusterConfig.apply { + setConnectionsPerHost(LOCAL, coreConnectionsPerLocalHost, maxConnectionsPerLocalHost) + setConnectionsPerHost(REMOTE, coreConnectionsPerRemoteHost, maxConnectionsPerRemoteHost) + } + heartbeatIntervalSeconds = 60 + }) + .also { clusterConfig.username?.run { it.withCredentials(this, clusterConfig.password) } } + .addContactPoints(*clusterConfig.contactPoints.split(",").toTypedArray()) + .apply { decorate(this) } + .build() } protected open fun decorate(builder: Cluster.Builder) { diff --git a/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/Entities.kt b/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/Entities.kt index f461393..566e344 100644 --- 
a/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/Entities.kt +++ b/cassandra/src/main/kotlin/com/walmartlabs/bigben/providers/domain/cassandra/Entities.kt @@ -19,10 +19,21 @@ */ package com.walmartlabs.bigben.providers.domain.cassandra -import com.datastax.driver.mapping.annotations.* +import com.datastax.driver.core.ConsistencyLevel +import com.datastax.driver.mapping.annotations.ClusteringColumn +import com.datastax.driver.mapping.annotations.Column +import com.datastax.driver.mapping.annotations.PartitionKey +import com.datastax.driver.mapping.annotations.Table +import com.datastax.driver.mapping.annotations.Transient import com.hazelcast.nio.ObjectDataInput import com.hazelcast.nio.ObjectDataOutput -import com.walmartlabs.bigben.entities.* +import com.walmartlabs.bigben.entities.Bucket +import com.walmartlabs.bigben.entities.Event +import com.walmartlabs.bigben.entities.EventDeliveryOption +import com.walmartlabs.bigben.entities.EventLookup +import com.walmartlabs.bigben.entities.EventResponse +import com.walmartlabs.bigben.entities.EventStatus +import com.walmartlabs.bigben.entities.KV import com.walmartlabs.bigben.extns.utc import com.walmartlabs.bigben.hz.HzObjectFactory.Companion.BIGBEN_FACTORY_ID import com.walmartlabs.bigben.hz.HzObjectFactory.ObjectId.BUCKET @@ -32,13 +43,25 @@ import java.util.* /** * Created by smalik3 on 2/26/18 */ + +interface ConsistencyOverride { + fun read(): ConsistencyLevel? = null + fun write(): ConsistencyLevel? = null + fun delete(): ConsistencyLevel? = null +} + +interface TTLOverride { + fun ttl(): Int = 0 +} + @Table(name = "buckets") data class BucketC(@PartitionKey @Column(name = "id") override var bucketId: ZonedDateTime? = null, override var status: EventStatus? = null, override var count: Long? = null, @Column(name = "processed_at") override var processedAt: ZonedDateTime? = null, @Column(name = "modified_at") override var updatedAt: ZonedDateTime? = null, - @Column(name = "failed_shards", codec = FailedShardsCodec::class) override var failedShards: Set? = null) : Bucket { + @Column(name = "failed_shards", codec = FailedShardsCodec::class) + override var failedShards: Set? = null) : Bucket, ConsistencyOverride { @Transient override fun getFactoryId() = BIGBEN_FACTORY_ID @@ -84,7 +107,7 @@ data class EventC(@ClusteringColumn @Column(name = "event_time") override var ev @Column(name = "processed_at") override var processedAt: ZonedDateTime? = null, override var payload: String? = null, @Transient override var eventResponse: EventResponse? = null, - @Transient override var deliveryOption: EventDeliveryOption? = null) : Event + @Transient override var deliveryOption: EventDeliveryOption? = null) : Event, ConsistencyOverride @Table(name = "lookups") data class EventLookupC(@PartitionKey override var tenant: String? = null, @@ -94,11 +117,11 @@ data class EventLookupC(@PartitionKey override var tenant: String? = null, @Column(name = "event_time") override var eventTime: ZonedDateTime? = null, @Column(name = "event_id") override var eventId: String? = null, override var payload: String? = null, - @Column(name = "l_m") var lastModified: ZonedDateTime? = null) : EventLookup + @Column(name = "l_m") var lastModified: ZonedDateTime? = null) : EventLookup, ConsistencyOverride @Table(name = "kv_table") data class KVC(@PartitionKey override var key: String? = null, @ClusteringColumn override var column: String? = null, override var value: String? = null, @Column(name = "l_m") var lastModified: ZonedDateTime? 
= null -) : KV +) : KV, ConsistencyOverride diff --git a/cassandra/src/main/resources/bigben-schema.cql b/cassandra/src/main/resources/bigben-schema.cql index 4276616..d2fb241 100644 --- a/cassandra/src/main/resources/bigben-schema.cql +++ b/cassandra/src/main/resources/bigben-schema.cql @@ -1,55 +1,76 @@ -- DROP KEYSPACE IF EXISTS bigben; -CREATE KEYSPACE IF NOT EXISTS bigben WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; +CREATE +KEYSPACE IF NOT EXISTS bigben +WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; -- DROP TABLE IF EXISTS bigben.buckets; -CREATE TABLE IF NOT EXISTS bigben.buckets ( - id timestamp PRIMARY KEY, - count bigint, - failed_shards text, - modified_at timestamp, - processed_at timestamp, - status text +CREATE TABLE IF NOT EXISTS bigben.buckets +( + id timestamp PRIMARY KEY, + count bigint, + failed_shards text, + modified_at timestamp, + processed_at timestamp, + status text ); -- DROP TABLE IF EXISTS bigben.lookups; -CREATE TABLE IF NOT EXISTS bigben.lookups ( - tenant text, - xref_id text, - bucket_id timestamp, - event_id text, - event_time timestamp, - l_m timestamp, - payload text, - shard int, - PRIMARY KEY ((tenant, xref_id)) -); +CREATE TABLE IF NOT EXISTS bigben.lookups +( + tenant text, + xref_id text, + bucket_id timestamp, + event_id text, + event_time timestamp, + l_m timestamp, + payload text, + shard int, + PRIMARY KEY ((tenant, xref_id) +) + ); -- DROP TABLE IF EXISTS bigben.events; -CREATE TABLE IF NOT EXISTS bigben.events ( - bucket_id timestamp, - shard int, - event_time timestamp, - id text, - error text, - payload text, - processed_at timestamp, - status text, - tenant text, - xref_id text, - PRIMARY KEY ((bucket_id, shard), event_time, id) -) WITH CLUSTERING ORDER BY (event_time ASC, id ASC); +CREATE TABLE IF NOT EXISTS bigben.events +( + bucket_id timestamp, + shard int, + event_time timestamp, + id text, + error text, + payload text, + processed_at timestamp, + status text, + tenant text, + xref_id text, + PRIMARY KEY ((bucket_id, shard), + event_time, + id +) + ) WITH CLUSTERING ORDER BY (event_time ASC, id ASC); -- DROP TABLE IF EXISTS bigben.kv_table; -CREATE TABLE IF NOT EXISTS bigben.kv_table ( - key text, - column text, - l_m timestamp, - value text, - PRIMARY KEY (key, column) +CREATE TABLE IF NOT EXISTS bigben.kv_table +( + key text, + column text, + l_m timestamp, + value text, + PRIMARY KEY (key, column) ) WITH CLUSTERING ORDER BY (column ASC); +-- DROP TABLE IF EXISTS bigben.cron_events; + +CREATE TABLE IF NOT EXISTS bigben.cron_events +( + cron_id text, + bucket_id timestamp, + event_time TIMESTAMP, + event text, + PRIMARY KEY ((cron_id, bucket_id),event_time) +) WITH CLUSTERING ORDER BY (event_time DESC); + diff --git a/commons/pom.xml b/commons/pom.xml index 1b0484c..1137192 100644 --- a/commons/pom.xml +++ b/commons/pom.xml @@ -4,7 +4,7 @@ bigben com.walmartlabs.bigben - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT bigben-commons diff --git a/commons/src/main/kotlin/com/walmartlabs/bigben/utils/_extns.kt b/commons/src/main/kotlin/com/walmartlabs/bigben/utils/_extns.kt index 9b7c938..0902fb2 100644 --- a/commons/src/main/kotlin/com/walmartlabs/bigben/utils/_extns.kt +++ b/commons/src/main/kotlin/com/walmartlabs/bigben/utils/_extns.kt @@ -23,22 +23,24 @@ import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.Version import com.fasterxml.jackson.core.type.TypeReference -import com.fasterxml.jackson.databind.* +import 
com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.JsonDeserializer +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializerProvider import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.fasterxml.jackson.module.kotlin.KotlinModule import com.google.common.base.Throwables import mu.KotlinLogging -import org.slf4j.LoggerFactory import java.time.ZonedDateTime /** * Created by smalik3 on 2/21/18 */ inline fun logger() = KotlinLogging.logger(unwrapCompanionClass(T::class.java).name) -//LoggerFactory.getLogger(unwrapCompanionClass(T::class.java).name)!! -fun logger(name: String) = LoggerFactory.getLogger(name)!! +fun logger(name: String) = KotlinLogging.logger(name) fun unwrapCompanionClass(ofClass: Class): Class<*> { return if (ofClass.enclosingClass != null && ofClass.enclosingClass.kotlin.isCompanion) { diff --git a/cron/pom.xml b/cron/pom.xml index 59c1e60..3cddc82 100644 --- a/cron/pom.xml +++ b/cron/pom.xml @@ -4,7 +4,7 @@ bigben com.walmartlabs.bigben - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT bigben-cron @@ -22,9 +22,8 @@ 7.0.2 - com.datastax.cassandra - cassandra-driver-extras - 3.3.0 + com.walmartlabs.bigben + bigben-cassandra diff --git a/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-hz.kt b/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-hz.kt index 66a82f1..aced241 100644 --- a/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-hz.kt +++ b/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-hz.kt @@ -34,7 +34,7 @@ import com.walmartlabs.bigben.utils.commons.Props.int */ data class Crons @JvmOverloads constructor(var crons: MutableMap = HashMap()) : DataSerializable { override fun writeData(out: ObjectDataOutput) = out.run { writeInt(crons.size); crons.forEach { writeUTF(it.value.json()) } } - override fun readData(ins: ObjectDataInput) = ins.run { (1..readInt()).forEach { Cron::class.java.fromJson(readUTF()).apply { crons[cronId()] = this } } } + override fun readData(ins: ObjectDataInput) = ins.run { (1..readInt()).forEach { Cron::class.java.fromJson(readUTF()).apply { crons[fqdnCronId()] = this } } } } class CronMapStore : MapStore { diff --git a/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-processors.kt b/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-processors.kt index 92a5c03..956c608 100644 --- a/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-processors.kt +++ b/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron-processors.kt @@ -19,21 +19,14 @@ */ package com.walmartlabs.bigben.cron -import com.cronutils.model.CronType -import com.cronutils.model.definition.CronDefinitionBuilder -import com.cronutils.model.time.ExecutionTime -import com.cronutils.parser.CronParser import com.hazelcast.map.AbstractEntryProcessor import com.hazelcast.nio.ObjectDataInput import com.hazelcast.nio.ObjectDataOutput import com.hazelcast.nio.serialization.DataSerializable -import com.walmartlabs.bigben.extns.utc import com.walmartlabs.bigben.utils.fromJson import com.walmartlabs.bigben.utils.json import com.walmartlabs.bigben.utils.typeRefJson import java.io.Serializable -import java.time.ZoneId -import java.time.ZonedDateTime /** * Created by smalik3 on 7/6/18 @@ -52,34 +45,13 @@ class CronDeleteEntryProcessor(cronId: String? = null) : DataSerializableEntryPr class CronEntryProcessor(c: String? 
= null) : DataSerializableEntryProcessor(c, true) { override fun process(entry: MutableMap.MutableEntry): Any? { val cron = Cron::class.java.fromJson(value!!) - return entry.setValue(entry.value.apply { CronRunner.crons.values.forEach { this!!.crons[cron.cronId()] = cron } }).let { null } - } -} - -class CronMatchExecutionTimeProcessor(millis: Long? = null) : DataSerializableEntryProcessor(millis?.toString(), true) { - override fun process(entry: MutableMap.MutableEntry): List { - val zdt = utc(value!!.toLong()) - return ArrayList(entry.value.crons.filter { it.value.executionTime().isMatch(zdt) }.values.map { it.json() }) + return entry.setValue(entry.value.apply { CronRunner.crons.values.forEach { this!!.crons[cron.fqdnCronId()] = cron } }).let { null } } } class CronUpdateExecutionTimeEntryProcessor(cronId: String? = null, lastExecution: String? = null) : DataSerializableEntryProcessor((cronId to lastExecution).json(), true) { override fun process(entry: MutableMap.MutableEntry): Any? { val (cronId, lastExecution) = typeRefJson>(value!!) - return entry.setValue(entry.value.apply { this!!.crons[cronId]?.let { it.lastExecutionTime = ZonedDateTime.parse(lastExecution) } }).let { null } + return entry.setValue(entry.value.apply { this!!.crons[cronId]?.let { it.lastExecutionTime = lastExecution } }).let { null } } -} - -fun main(args: Array) { - val c = CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX)).parse("* * * * *") - val et = ExecutionTime.forCron(c) - var zdt = ZonedDateTime.now(ZoneId.of("UTC")) - var z = zdt - println(zdt) - (1..10).forEach { - val match = et.isMatch(z) - z = z.plusSeconds(1) - zdt = et.nextExecution(zdt).get() - println("match = $match, zdt = $zdt, z = $z") - } -} +} \ No newline at end of file diff --git a/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron.kt b/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron.kt index 7c4975a..be7bff9 100644 --- a/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron.kt +++ b/cron/src/main/kotlin/com/walmartlabs/bigben/cron/cron.kt @@ -24,29 +24,55 @@ import com.cronutils.model.CronType import com.cronutils.model.definition.CronDefinitionBuilder import com.cronutils.model.time.ExecutionTime import com.cronutils.parser.CronParser +import com.datastax.driver.core.ConsistencyLevel +import com.datastax.driver.mapping.annotations.Column +import com.datastax.driver.mapping.annotations.PartitionKey +import com.datastax.driver.mapping.annotations.Table +import com.datastax.driver.mapping.annotations.Transient import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonInclude.Include.NON_EMPTY +import com.google.common.cache.CacheBuilder import com.google.common.util.concurrent.ListenableFuture import com.walmartlabs.bigben.BigBen.module import com.walmartlabs.bigben.cron.CronRunner.crons import com.walmartlabs.bigben.entities.Event import com.walmartlabs.bigben.entities.EventResponse -import com.walmartlabs.bigben.entities.EventStatus -import com.walmartlabs.bigben.entities.KV -import com.walmartlabs.bigben.extns.* +import com.walmartlabs.bigben.entities.EventStatus.TRIGGERED +import com.walmartlabs.bigben.extns.event +import com.walmartlabs.bigben.extns.nowUTC +import com.walmartlabs.bigben.extns.response +import com.walmartlabs.bigben.extns.save +import com.walmartlabs.bigben.extns.toResponse import com.walmartlabs.bigben.processors.ProcessorRegistry -import com.walmartlabs.bigben.utils.* +import com.walmartlabs.bigben.providers.domain.cassandra.ConsistencyOverride +import 
com.walmartlabs.bigben.providers.domain.cassandra.TTLOverride +import com.walmartlabs.bigben.utils.catching import com.walmartlabs.bigben.utils.commons.Module import com.walmartlabs.bigben.utils.commons.ModuleRegistry +import com.walmartlabs.bigben.utils.commons.Props import com.walmartlabs.bigben.utils.commons.Props.int +import com.walmartlabs.bigben.utils.done import com.walmartlabs.bigben.utils.hz.Hz +import com.walmartlabs.bigben.utils.json +import com.walmartlabs.bigben.utils.listenable +import com.walmartlabs.bigben.utils.logger +import com.walmartlabs.bigben.utils.reduce +import com.walmartlabs.bigben.utils.rootCause +import com.walmartlabs.bigben.utils.stackTraceAsString +import com.walmartlabs.bigben.utils.transform +import com.walmartlabs.bigben.utils.transformAsync import java.time.ZonedDateTime import java.time.temporal.ChronoUnit -import java.time.temporal.ChronoUnit.* +import java.time.temporal.ChronoUnit.DAYS +import java.time.temporal.ChronoUnit.HOURS +import java.time.temporal.ChronoUnit.MINUTES +import java.time.temporal.ChronoUnit.MONTHS +import java.time.temporal.ChronoUnit.WEEKS +import java.time.temporal.ChronoUnit.YEARS import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors.newScheduledThreadPool -import java.util.concurrent.TimeUnit.SECONDS +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference @@ -55,42 +81,38 @@ import java.util.concurrent.atomic.AtomicReference */ @JsonInclude(NON_EMPTY) data class Cron( - val id: String, val expression: String, val type: CronType, val tenant: String, - var lastExecutionTime: ZonedDateTime?, val lastUpdated: ZonedDateTime?, - val tracingEnabled: Boolean = false, val tracingGranularity: ChronoUnit = DAYS + val id: String, val expression: String, val type: CronType, val tenant: String, + var lastExecutionTime: String?, val lastUpdated: String?, + val logGranularity: ChronoUnit?, val retention: Int?, val retentionUnits: TimeUnit = TimeUnit.DAYS ) { - private val computed = ConcurrentHashMap() - - private fun parsed() = computed.computeIfAbsent(0) { CronRunner.parser(type).parse(expression)!! 
} as com.cronutils.model.Cron - internal fun executionTime() = computed.computeIfAbsent(1) { println("computing"); ExecutionTime.forCron(parsed()) } as ExecutionTime - init { - require(tracingGranularity in supportedGranularities) { "only $supportedGranularities granularities are supported" } + require(logGranularity in supportedGranularities) { "invalid granularity: $logGranularity, only $supportedGranularities granularities are supported" } } - fun cronId() = cronId(tenant, id, type) - override fun toString() = "${cronId()}:$expression" + fun fqdnCronId() = fqdnCronId(tenant, id, type) + override fun toString() = "${fqdnCronId()}:$expression" companion object { - fun cronId(tenant: String, id: String, type: CronType) = "$tenant/$id/$type" - private val supportedGranularities = setOf(ChronoUnit.SECONDS, MINUTES, HOURS, DAYS, WEEKS, MONTHS, YEARS) + fun fqdnCronId(tenant: String, id: String, type: CronType) = "$tenant/$id/$type" + private val supportedGranularities = setOf(MINUTES, HOURS, DAYS, WEEKS, MONTHS, YEARS) } - fun toGranularity(zdt: ZonedDateTime): String { - return when (tracingGranularity) { - YEARS -> zdt.year.toString() - MONTHS -> "${zdt.year}/${zdt.monthValue}" - WEEKS -> "${zdt.year}/${zdt.monthValue}/${WEEKS.between(zdt.withDayOfMonth(1), zdt)}" - DAYS -> "${zdt.year}/${zdt.dayOfYear}" - HOURS -> "${zdt.year}/${zdt.dayOfYear}/${zdt.hour}" - MINUTES -> "${zdt.year}/${zdt.dayOfYear}/${zdt.hour}/${zdt.minute}" - SECONDS -> "${zdt.year}/${zdt.dayOfYear}/${zdt.hour}/${zdt.minute}/${zdt.second}" - else -> throw IllegalArgumentException("unsupported unit: $tracingGranularity") - } - } + fun describe(locale: Locale = Locale.US) = CronDescriptor.instance(locale).describe(CronRunner.parser(type).parse(expression))!! +} - fun describe(locale: Locale = Locale.US) = CronDescriptor.instance(locale).run { describe(parsed()) }!! +@Table(name = "cron_events") +data class CronLogEvent( + @Transient var cron: Cron? = null, + @PartitionKey @Column(name = "cron_id") var cronId: String? = null, + @PartitionKey(1) @Column(name = "bucket_id") var bucketId: ZonedDateTime? = null, + @Column(name = "event_time") var eventTime: ZonedDateTime? = null, + var event: String? = null) : ConsistencyOverride, TTLOverride { + override fun write() = ConsistencyLevel.valueOf(Props.string("cron.log.events.write.consistency", "ONE")) + override fun ttl(): Int { + val retention = cron!!.retention + return if (retention != null) TimeUnit.SECONDS.convert(retention.toLong(), cron!!.retentionUnits).toInt() else 0 + } } object CronRunner : Module { @@ -98,39 +120,71 @@ object CronRunner : Module { private val l = logger() internal val crons = module().hz.getMap("crons") + private val invalidExecutionTimesRetention = 1L to TimeUnit.MINUTES + + private val executionTimes = CacheBuilder.newBuilder().expireAfterAccess(invalidExecutionTimesRetention.first, invalidExecutionTimesRetention.second) + .removalListener { l.info { "removing cron from execution: ${it.key}" } } + .build()!! 
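+    // the executionTimes cache memoizes the parsed cron-utils ExecutionTime per cron id/expression;
+    // entries idle for over a minute are evicted (see the removalListener above), so crons that were
+    // deleted or re-registered with a new expression stop being evaluated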
+ override fun init(registry: ModuleRegistry) { l.info("initializing the cron module: starting the cron runner(s)") val lastRun = AtomicReference() + val processor = module() + workers.scheduleAtFixedRate({ executionTimes.cleanUp() }, invalidExecutionTimesRetention.first, + invalidExecutionTimesRetention.first, invalidExecutionTimesRetention.second) workers.scheduleAtFixedRate({ - try { - val now = nowUTC().withNano(0) - if (lastRun.get() == null || now > lastRun.get()) { - lastRun.set(now) - val nowString = now.toString() - @Suppress("UNCHECKED_CAST") - val matches = (crons.executeOnKeys( - crons.localKeySet(), CronMatchExecutionTimeProcessor( - now.toInstant().toEpochMilli() - ) - ) as MutableMap>).values.flatten() - .map { Cron::class.java.fromJson(it) } - if (matches.isNotEmpty()) { - matches.map { c -> - val e = EventResponse( - c.id, nowString, c.tenant, eventId = "${c.type}/$nowString", - triggeredAt = nowString, eventStatus = EventStatus.TRIGGERED, payload = c.expression - ).event() - if (l.isDebugEnabled) l.debug("triggering event for cron: ${c.cronId()} at $nowString") - module()(e).transformAsync { updateCronExecutionTime(c, now, it!!) } - }.reduce() - .done({ l.error("cron-failed: time: $nowString, crons: ${matches.map { it.cronId() }}") }) - { if (l.isDebugEnabled) l.debug("cron-successful: time: $nowString, crons: ${matches.map { it.cronId() }}") } - } - } - } catch (e: Exception) { - l.error("error in running cron", e.rootCause()!!) - } - }, 0, 1, SECONDS) + try { + val now = nowUTC().withNano(0) + if (lastRun.get() == null || now > lastRun.get()) { + lastRun.set(now) + if (l.isDebugEnabled) l.debug("finding executable crons at: $now") + val nowString = now.toString() + val matches = crons.localKeySet().mapNotNull { crons[it] }.flatMap { + it.crons.values.filter { + executionTimes.get("${it.fqdnCronId()}/${it.expression}") { + l.info { "cron-parse: calculating execution time profile for cron: ${it.fqdnCronId()}" } + val parsed = parser(it.type).parse(it.expression) + ExecutionTime.forCron(parsed) + }.isMatch(now).apply { + if (this && l.isDebugEnabled) l.debug("cron-match: ${it.fqdnCronId()}, time: $nowString") + } + } + } + if (matches.isNotEmpty() && l.isDebugEnabled) { + l.debug("cron-match: matching crons at $nowString => ${matches.map { it.fqdnCronId() }}") + } + + matches.map { c -> + val e = EventResponse( + c.fqdnCronId(), nowString, c.tenant, eventId = "${c.fqdnCronId()}/$nowString", + triggeredAt = nowString, eventStatus = TRIGGERED, payload = c.expression + ).event() + if (l.isDebugEnabled) l.debug("cron-trigger: ${c.fqdnCronId()} at $nowString") + processor(e).catching { + EventResponse( + c.fqdnCronId(), nowString, c.tenant, eventId = "${c.fqdnCronId()}/$nowString", + triggeredAt = nowString, eventStatus = TRIGGERED, payload = c.expression + ).event().apply { error = it.stackTraceAsString() } + }.transformAsync { + if (l.isDebugEnabled) { + l.debug { "cron-trigger-successful: ${c.fqdnCronId()} at $nowString" } + l.debug { "cron-execution-time-update: ${c.fqdnCronId()} at $nowString" } + } + updateCronExecutionTime(c, now, it!!) 
+ .done({ l.error(it.rootCause()) { "cron-execution-time-update-failure: ${c.fqdnCronId()} at $nowString" } }) { + l.debug { "cron-execution-time-update-successful: ${c.fqdnCronId()} at $nowString" } + } + } + }.reduce().done({ + l.error { "cron-trigger-failure: there were some cron failures at $nowString" } + + }) + { if (matches.isNotEmpty()) l.debug { "cron-trigger-successful: all crons succeeded" } } + } + } catch (e: Exception) { + l.error("error in running cron", e.rootCause()!!) + } + }, 0, 1, TimeUnit.SECONDS) } private val parsers = ConcurrentHashMap() @@ -138,28 +192,28 @@ object CronRunner : Module { private val index = AtomicInteger() private val workers = - newScheduledThreadPool(int("cron.runner.core.pool.size")) { Thread(it, "cron-runner#${index.incrementAndGet()}") } + newScheduledThreadPool(int("cron.runner.core.pool.size")) { Thread(it, "cron-runner#${index.incrementAndGet()}") } - private fun updateCronExecutionTime( - cron: Cron, - executionTime: ZonedDateTime, - event: Event - ): ListenableFuture { + private fun updateCronExecutionTime(cron: Cron, executionTime: ZonedDateTime, event: Event): + ListenableFuture { val f = - crons.submitToKey(cron.partition(), CronUpdateExecutionTimeEntryProcessor(cron.cronId(), executionTime.toString())) - .listenable().transform { cron } - return if (cron.tracingEnabled) { + crons.submitToKey(cron.partition(), CronUpdateExecutionTimeEntryProcessor(cron.fqdnCronId(), executionTime.toString())) + .listenable().transform { cron } + return if (cron.logGranularity != null) { f.transformAsync { - save { - it.key = "${cron.cronId()}:${cron.toGranularity(executionTime)}" - it.column = executionTime.toString(); it.value = event.toResponse().yaml() + save { + it.cron = cron + it.cronId = cron.fqdnCronId() + it.bucketId = executionTime.truncatedTo(cron.logGranularity) + it.eventTime = executionTime + it.event = event.toResponse().json() }.transform { cron } } } else f } } -private fun Cron.partition() = module().hz.partitionService.getPartition(cronId()).partitionId +private fun Cron.partition() = module().hz.partitionService.getPartition(fqdnCronId()).partitionId private fun String.partition() = module().hz.partitionService.getPartition(this).partitionId object CronService { @@ -168,19 +222,21 @@ object CronService { fun upsert(cron: Cron) = response { if (l.isInfoEnabled) l.info("creating/updating cron: $cron") - val cronId = cron.cronId() + require(cron.tenant in module().registeredTenants()) { "unknown tenant: ${cron.tenant}" } + val description = cron.describe() + val cronId = cron.fqdnCronId() val pId = cron.partition() if (l.isDebugEnabled) l.debug("cron: $cronId hashed to partition: $pId") - crons.executeOnKey(pId, CronEntryProcessor(cron.copy(lastUpdated = nowUTC(), lastExecutionTime = null).json())) + crons.executeOnKey(pId, CronEntryProcessor(cron.copy(lastUpdated = nowUTC().toString(), lastExecutionTime = null).json())) if (l.isDebugEnabled) l.debug("cron: $cronId updated successfully") - mapOf("status" to "OK") + CronDescription(cron, description) } fun delete(tenant: String, id: String, type: String) = response { val types = if (type == "*") CronType.values().toSet() else setOf(CronType.valueOf(type)) if (l.isInfoEnabled) l.info("deleting cron: $tenant/$id, types: $types") types.forEach { - val cronId = Cron.cronId(tenant, id, it) + val cronId = Cron.fqdnCronId(tenant, id, it) val pId = cronId.partition() if (l.isDebugEnabled) l.debug("cron: $cronId hashed to partition: $pId") crons.executeOnKey(pId, CronDeleteEntryProcessor(cronId)) 
@@ -192,12 +248,8 @@ object CronService { @JsonInclude(NON_EMPTY) data class CronDescription(val cron: Cron, val description: String?) - fun get(tenant: String, id: String, describe: Boolean?) = response { + fun get(tenant: String, id: String) = response { crons.values.flatMap { it.crons.values.filter { it.tenant == tenant && it.id == id } } - .map { CronDescription(it, describe?.run { it.describe() }) } - } - - fun describe(cron: Cron) = response { - CronDescription(cron, cron.describe()) + .map { CronDescription(it, it.describe()) } } } diff --git a/kafka/pom.xml b/kafka/pom.xml index e46147a..23faecb 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -4,7 +4,7 @@ bigben com.walmartlabs.bigben - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT bigben-kafka diff --git a/lib/pom.xml b/lib/pom.xml index 7804448..7401083 100644 --- a/lib/pom.xml +++ b/lib/pom.xml @@ -4,7 +4,7 @@ com.walmartlabs.bigben bigben - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT bigben-lib diff --git a/lib/src/main/kotlin/com/walmartlabs/bigben/extns/_bigben_extns.kt b/lib/src/main/kotlin/com/walmartlabs/bigben/extns/_bigben_extns.kt index 7027da0..68f56d1 100644 --- a/lib/src/main/kotlin/com/walmartlabs/bigben/extns/_bigben_extns.kt +++ b/lib/src/main/kotlin/com/walmartlabs/bigben/extns/_bigben_extns.kt @@ -20,6 +20,7 @@ package com.walmartlabs.bigben.extns import com.walmartlabs.bigben.BigBen.entityProvider +import com.walmartlabs.bigben.entities.Error import com.walmartlabs.bigben.entities.Event import com.walmartlabs.bigben.entities.EventDeliveryOption import com.walmartlabs.bigben.entities.EventDeliveryOption.FULL_EVENT @@ -40,13 +41,15 @@ fun Event.toResponse() = eventResponse?.let { it } ?: EventResponse( id = xrefId, eventId = id, triggeredAt = processedAt?.toString(), tenant = tenant, eventTime = eventTime?.toString(), payload = payload, - eventStatus = status, deliveryOption = deliveryOption(this) + eventStatus = status, deliveryOption = deliveryOption(this), error = Error(-1, error) ) fun EventResponse.event() = entityProvider().let { it.raw(it.selector(Event::class.java)) }.also { val t = ZonedDateTime.parse(triggeredAt) it.tenant = tenant; it.xrefId = id; it.eventTime = ZonedDateTime.parse(eventTime)!!; it.payload = payload - it.id = eventId; it.bucketId = t.bucket(); it.processedAt = t; if (eventId == null) it.deliveryOption = deliveryOption + it.id = eventId; it.bucketId = t.bucket(); it.processedAt = t + if (eventId == null) it.deliveryOption = deliveryOption + it.error = error?.message } fun BitSet.toSet(): MutableSet = stream().boxed().collect(Collectors.toSet())!! 
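For context, a minimal sketch of how an entity opts into the `ConsistencyOverride`/`TTLOverride`
hooks added in `Entities.kt` above (the `AuditRecord` entity here is hypothetical, and the
Cassandra mapping annotations are omitted for brevity):
```kotlin
import com.datastax.driver.core.ConsistencyLevel
import com.walmartlabs.bigben.providers.domain.cassandra.ConsistencyOverride
import com.walmartlabs.bigben.providers.domain.cassandra.TTLOverride

// hypothetical entity: CassandraModule checks `selector as? ConsistencyOverride` and
// `selector as? TTLOverride` on save/fetch/delete, so implementing these interfaces
// is all that is needed to override the cluster-wide defaults per entity
class AuditRecord : ConsistencyOverride, TTLOverride {
    override fun write(): ConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM // stronger writes
    override fun ttl(): Int = 3600 // expire rows after an hour; 0 (the default) means no expiry
}
```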
diff --git a/lib/src/main/kotlin/com/walmartlabs/bigben/processors/no_ops.kt b/lib/src/main/kotlin/com/walmartlabs/bigben/processors/no_ops.kt index 0d29bfb..9cde78e 100644 --- a/lib/src/main/kotlin/com/walmartlabs/bigben/processors/no_ops.kt +++ b/lib/src/main/kotlin/com/walmartlabs/bigben/processors/no_ops.kt @@ -24,12 +24,19 @@ import com.google.common.util.concurrent.ListenableFuture import com.walmartlabs.bigben.entities.Event import com.walmartlabs.bigben.entities.EventResponse import com.walmartlabs.bigben.utils.Json +import com.walmartlabs.bigben.utils.logger /** * Created by smalik3 on 6/25/18 */ class NoOpCustomClassProcessor(tenant: String, props: Json) : EventProcessor { + + companion object { + val l = logger("/dev/null") + } + override fun invoke(t: Event): ListenableFuture { + l.debug { "redirecting event: $t" } return immediateFuture(t) } } diff --git a/pom.xml b/pom.xml index cd62d2a..9946244 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ com.walmartlabs.bigben bigben pom - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT commons lib @@ -33,14 +33,14 @@ 2.9.4.1 3.7 1.2 - 1.1.1 + 1.1.2 true jcenter - http://jcenter.bintray.com + https://jcenter.bintray.com @@ -87,27 +87,27 @@ com.walmartlabs.bigben bigben-commons - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT com.walmartlabs.bigben bigben-lib - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT com.walmartlabs.bigben bigben-cassandra - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT com.walmartlabs.bigben bigben-kafka - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT com.walmartlabs.bigben bigben-cron - 1.0.7-SNAPSHOT + 1.0.9-SNAPSHOT org.slf4j