Skip to content
This repository has been archived by the owner on Oct 30, 2022. It is now read-only.

重复消费问题 #131

Open
iguidao opened this issue Jun 28, 2018 · 4 comments
Open

重复消费问题 #131

iguidao opened this issue Jun 28, 2018 · 4 comments

Comments

@iguidao
Copy link

iguidao commented Jun 28, 2018

childe,你好:
首先非常感谢使用了hangout对我的帮助,我这里遇到一个问题,想请教你。
**问题:**我的hangout在消费kafka的并写入到Elasticsearch的时候,发现Elasticsearch上日志数据量比kafka的LOG-END-OFFSET数据要多,而kafka的LOG-END-OFFSET数据与我的日志文件数据量是一样的。
比如我的日志条数是48078条,我的kafka的LOG-END-OFFSE总和也是48078,但是我的Elasticsearch上查看到的日志是51334条,很好奇多余的几千条数据是怎么来的。
**我的理解:**我感觉可能是hangout消费kafka后,kafka并没有认为hangout消费了,所以数据显示没有被消费,则又被hangout消费了一遍,那么请问hangout消费一条数据之后,会做什么处理?
**注:**由于某些原因,我的ES版本不能升级。
下面是我的应用环境:
ES版本:Elasticsearch 2.3.5
hangout版本:hangout-0.1.8.2-ES2.3.5
kafka版本:kafka_2.11-1.1.0
zookeeper版本:zookeeper-3.4.12
下面是我的hangout配置

inputs:
    - NewKafka:
        codec: json
        topic:
          topic-test: 6
        consumer_settings:
          bootstrap.servers:  192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group.id: topic-test
outputs:
    - Elasticsearch:
        cluster: es-cluster # cluster name, required
        hosts: # required
                - 192.168.10.10:9301
                - 192.168.10.11:9301
                - 192.168.10.12:9301
                - 192.168.10.13:9301
                - 192.168.10.14:9301
        index: 'hangout-test'
        index_type: logs # default logs
        bulk_actions: 20000 # default 20000
        bulk_size: 15   #default 15
        flush_interval: 10      #default 10
        concurrent_requests: 0  #default 0
        timezone: "Asia/Shanghai" # defaut UTC 时区. 只用于生成索引名字的字符串格式化
        sniff: false
@childe
Copy link
Owner

childe commented Jun 29, 2018

很有可能是kafka group consumer 做了rebalance.
默认kafka是XX秒commit一次当前的offset. 如果新起一个消费者, 他会Join Group, 导致Group里面的成员之前的memberID失效. 当commit的时候, 新的memberID还没有拿到. 所以这段时间的数据可能会重复消费.
上面只是我的猜测 :) 你可以看一下client/server端日志, 是不是有rebalance.

@iguidao
Copy link
Author

iguidao commented Jun 29, 2018

childe,你好:
我说明以下三点:
1、如你所说,我做了个测试,重启hangout的时候,确实kafka会有rebalance group topic-test的日志信息出现,但是我昨天一直到今天并没有停止hangout,并且kafka日志也没有rebalance group topic-test的信息,最后查Elasticsearch日志数据量还是比file文件中日志数据量多(查询的是一小时的数据)。
2、为了多方面测试,我昨天用logstash2.3.4版本也消费kafka数据写入Elasticsearch,用的是同一组kafka,不同的topic,发现最后Elasticsearch和file文件中一小时的数据是一样的,并没有出现Elasticsearch多数据的情况。
3、我想到了一个问题:有没有可能是hangout消费完kafka数据并写入Elasticsearch之后成功之后返回给kafka的时候出现错误,然后并没有返回给kafka这条日志已经消费,所以再次消费了同一条数据?请问如果hangout写入Elasticsearch之后成功之后,返回给kafka的那段代码是怎么写的?有没有可能写入Elasticsearch成功因为某方面原因返回给kafka的结果是消费失败?

@iguidao
Copy link
Author

iguidao commented Jun 29, 2018

补充:我也重启过logstash,也么有出现多数据的情况

@childe
Copy link
Owner

childe commented Jun 29, 2018

kafka读数据的代码与写数据到ES的代码, 之间并没有直接的调用关系. '写入Elasticsearch之后成功之后返回给kafka的时候出现错误' 这个应该是不存在的.

但kafka消费本身有可能出现offset没有正确记录的情况. 一时想不出还有什么其他情况了. 要不加个微信聊? 64973150 我也想搞清楚到底是为啥

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants