Skip to content

Commit c91873a

Browse files
authored
Refactoring: move the CLI flags prefix '.' in the prefix (#10227)
Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent 6dcd7a3 commit c91873a

File tree

1 file changed

+32
-32
lines changed

1 file changed

+32
-32
lines changed

pkg/storage/ingest/config.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ const (
2222
consumeFromEnd = "end"
2323
consumeFromTimestamp = "timestamp"
2424

25-
kafkaConfigFlagPrefix = "ingest-storage.kafka"
26-
targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup"
27-
maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup"
25+
kafkaConfigFlagPrefix = "ingest-storage.kafka."
26+
targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + "target-consumer-lag-at-startup"
27+
maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + "max-consumer-lag-at-startup"
2828
)
2929

3030
var (
@@ -59,7 +59,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
5959
f.BoolVar(&cfg.Enabled, "ingest-storage.enabled", false, "True to enable the ingestion via object storage.")
6060

6161
cfg.KafkaConfig.RegisterFlagsWithPrefix(kafkaConfigFlagPrefix, f)
62-
cfg.Migration.RegisterFlagsWithPrefix("ingest-storage.migration", f)
62+
cfg.Migration.RegisterFlagsWithPrefix("ingest-storage.migration.", f)
6363
}
6464

6565
// Validate the config.
@@ -145,46 +145,46 @@ func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
145145
func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
146146
cfg.concurrentFetchersFetchBackoffConfig = defaultFetchBackoffConfig
147147

148-
f.StringVar(&cfg.Address, prefix+".address", "", "The Kafka backend address.")
149-
f.StringVar(&cfg.Topic, prefix+".topic", "", "The Kafka topic name.")
150-
f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
151-
f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
152-
f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")
153-
f.IntVar(&cfg.WriteClients, prefix+".write-clients", 1, "The number of Kafka clients used by producers. When the configured number of clients is greater than 1, partitions are sharded among Kafka clients. A higher number of clients may provide higher write throughput at the cost of additional Metadata requests pressure to Kafka.")
148+
f.StringVar(&cfg.Address, prefix+"address", "", "The Kafka backend address.")
149+
f.StringVar(&cfg.Topic, prefix+"topic", "", "The Kafka topic name.")
150+
f.StringVar(&cfg.ClientID, prefix+"client-id", "", "The Kafka client ID.")
151+
f.DurationVar(&cfg.DialTimeout, prefix+"dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.")
152+
f.DurationVar(&cfg.WriteTimeout, prefix+"write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")
153+
f.IntVar(&cfg.WriteClients, prefix+"write-clients", 1, "The number of Kafka clients used by producers. When the configured number of clients is greater than 1, partitions are sharded among Kafka clients. A higher number of clients may provide higher write throughput at the cost of additional Metadata requests pressure to Kafka.")
154154

155-
f.StringVar(&cfg.SASLUsername, prefix+".sasl-username", "", "The username used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.")
156-
f.Var(&cfg.SASLPassword, prefix+".sasl-password", "The password used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.")
155+
f.StringVar(&cfg.SASLUsername, prefix+"sasl-username", "", "The username used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.")
156+
f.Var(&cfg.SASLPassword, prefix+"sasl-password", "The password used to authenticate to Kafka using the SASL plain mechanism. To enable SASL, configure both the username and password.")
157157

158-
f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.")
159-
f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.")
158+
f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.")
159+
f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+"consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.")
160160

161-
f.DurationVar(&cfg.LastProducedOffsetPollInterval, prefix+".last-produced-offset-poll-interval", time.Second, "How frequently to poll the last produced offset, used to enforce strong read consistency.")
162-
f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.")
161+
f.DurationVar(&cfg.LastProducedOffsetPollInterval, prefix+"last-produced-offset-poll-interval", time.Second, "How frequently to poll the last produced offset, used to enforce strong read consistency.")
162+
f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+"last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.")
163163

164-
f.StringVar(&cfg.ConsumeFromPositionAtStartup, prefix+".consume-from-position-at-startup", consumeFromLastOffset, fmt.Sprintf("From which position to start consuming the partition at startup. Supported options: %s.", strings.Join(consumeFromPositionOptions, ", ")))
165-
f.Int64Var(&cfg.ConsumeFromTimestampAtStartup, prefix+".consume-from-timestamp-at-startup", 0, fmt.Sprintf("Milliseconds timestamp after which the consumption of the partition starts at startup. Only applies when consume-from-position-at-startup is %s", consumeFromTimestamp))
164+
f.StringVar(&cfg.ConsumeFromPositionAtStartup, prefix+"consume-from-position-at-startup", consumeFromLastOffset, fmt.Sprintf("From which position to start consuming the partition at startup. Supported options: %s.", strings.Join(consumeFromPositionOptions, ", ")))
165+
f.Int64Var(&cfg.ConsumeFromTimestampAtStartup, prefix+"consume-from-timestamp-at-startup", 0, fmt.Sprintf("Milliseconds timestamp after which the consumption of the partition starts at startup. Only applies when consume-from-position-at-startup is %s", consumeFromTimestamp))
166166

167167
howToDisableConsumerLagAtStartup := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", targetConsumerLagAtStartupFlag, maxConsumerLagAtStartupFlag)
168168
f.DurationVar(&cfg.TargetConsumerLagAtStartup, targetConsumerLagAtStartupFlag, 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+howToDisableConsumerLagAtStartup)
169169
f.DurationVar(&cfg.MaxConsumerLagAtStartup, maxConsumerLagAtStartupFlag, 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+howToDisableConsumerLagAtStartup)
170170

171-
f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic on startup if it doesn't exist. If creating the topic fails and the topic doesn't already exist, Mimir will fail to start.")
172-
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", -1, "When auto-creation of Kafka topic is enabled and this value is positive, Mimir will create the topic with this number of partitions. When the value is -1 the Kafka broker will use the default number of partitions (num.partitions configuration).")
171+
f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+"auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic on startup if it doesn't exist. If creating the topic fails and the topic doesn't already exist, Mimir will fail to start.")
172+
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+"auto-create-topic-default-partitions", -1, "When auto-creation of Kafka topic is enabled and this value is positive, Mimir will create the topic with this number of partitions. When the value is -1 the Kafka broker will use the default number of partitions (num.partitions configuration).")
173173

174-
f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
175-
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
174+
f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+"producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
175+
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+"producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
176176

177-
f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
177+
f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+"wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
178178

179-
f.IntVar(&cfg.FetchConcurrencyMax, prefix+".fetch-concurrency-max", 0, "The maximum number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. Concurrent fetch requests are issued only when there is sufficient backlog of records to consume. 0 to disable.")
180-
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
181-
f.IntVar(&cfg.MaxBufferedBytes, prefix+".max-buffered-bytes", 100_000_000, "The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit.")
179+
f.IntVar(&cfg.FetchConcurrencyMax, prefix+"fetch-concurrency-max", 0, "The maximum number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. Concurrent fetch requests are issued only when there is sufficient backlog of records to consume. 0 to disable.")
180+
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+"use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
181+
f.IntVar(&cfg.MaxBufferedBytes, prefix+"max-buffered-bytes", 100_000_000, "The maximum number of buffered records ready to be processed. This limit applies to the sum of all inflight requests. Set to 0 to disable the limit.")
182182

183-
f.IntVar(&cfg.IngestionConcurrencyMax, prefix+".ingestion-concurrency-max", 0, "The maximum number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable.")
184-
f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 150, "The number of timeseries to batch together before ingesting to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
185-
f.IntVar(&cfg.IngestionConcurrencyQueueCapacity, prefix+".ingestion-concurrency-queue-capacity", 5, "The number of batches to prepare and queue to ingest to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
186-
f.IntVar(&cfg.IngestionConcurrencyTargetFlushesPerShard, prefix+".ingestion-concurrency-target-flushes-per-shard", 80, "The expected number of times to ingest timeseries to the TSDB head after batching. With fewer flushes, the overhead of splitting up the work is higher than the benefit of parallelization. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
187-
f.IntVar(&cfg.IngestionConcurrencyEstimatedBytesPerSample, prefix+".ingestion-concurrency-estimated-bytes-per-sample", 500, "The estimated number of bytes a sample has at time of ingestion. This value is used to estimate the timeseries without decompressing them. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
183+
f.IntVar(&cfg.IngestionConcurrencyMax, prefix+"ingestion-concurrency-max", 0, "The maximum number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable.")
184+
f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+"ingestion-concurrency-batch-size", 150, "The number of timeseries to batch together before ingesting to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
185+
f.IntVar(&cfg.IngestionConcurrencyQueueCapacity, prefix+"ingestion-concurrency-queue-capacity", 5, "The number of batches to prepare and queue to ingest to the TSDB head. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
186+
f.IntVar(&cfg.IngestionConcurrencyTargetFlushesPerShard, prefix+"ingestion-concurrency-target-flushes-per-shard", 80, "The expected number of times to ingest timeseries to the TSDB head after batching. With fewer flushes, the overhead of splitting up the work is higher than the benefit of parallelization. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
187+
f.IntVar(&cfg.IngestionConcurrencyEstimatedBytesPerSample, prefix+"ingestion-concurrency-estimated-bytes-per-sample", 500, "The estimated number of bytes a sample has at time of ingestion. This value is used to estimate the timeseries without decompressing them. Only use this setting when -ingest-storage.kafka.ingestion-concurrency-max is greater than 0.")
188188
}
189189

190190
func (cfg *KafkaConfig) Validate() error {
@@ -269,5 +269,5 @@ func (cfg *MigrationConfig) RegisterFlags(f *flag.FlagSet) {
269269
}
270270

271271
func (cfg *MigrationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
272-
f.BoolVar(&cfg.DistributorSendToIngestersEnabled, prefix+".distributor-send-to-ingesters-enabled", false, "When both this option and ingest storage are enabled, distributors write to both Kafka and ingesters. A write request is considered successful only when written to both backends.")
272+
f.BoolVar(&cfg.DistributorSendToIngestersEnabled, prefix+"distributor-send-to-ingesters-enabled", false, "When both this option and ingest storage are enabled, distributors write to both Kafka and ingesters. A write request is considered successful only when written to both backends.")
273273
}

0 commit comments

Comments
 (0)