Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
fixed #123
Browse files Browse the repository at this point in the history
  • Loading branch information
nmred committed Jun 23, 2017
1 parent 0924e90 commit 285b6d7
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 13 deletions.
34 changes: 34 additions & 0 deletions src/Kafka/Consumer/Assignment.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Assignment

private $offsets = array();

private $lastOffsets = array();

private $fetchOffsets = array();

private $consumerOffsets = array();
Expand Down Expand Up @@ -166,6 +168,22 @@ public function getOffsets()
return $this->offsets;
}

// }}}
// {{{ public function setLastOffsets()

public function setLastOffsets($offsets)
{
$this->lastOffsets = $offsets;
}

// }}}
// {{{ public function getOffsets()

public function getLastOffsets()
{
return $this->lastOffsets;
}

// }}}
// {{{ public function setFetchOffsets()

Expand Down Expand Up @@ -211,6 +229,9 @@ public function setConsumerOffset($topic, $part, $offset)

public function getConsumerOffset($topic, $part)
{
if (!isset($this->consumerOffsets[$topic][$part])) {
return false;
}
return $this->consumerOffsets[$topic][$part];
}

Expand Down Expand Up @@ -262,6 +283,19 @@ public function setPrecommitOffset($topic, $part, $offset)
$this->precommitOffsets[$topic][$part] = $offset;
}

// }}}
// {{{ public function clearOffset()

public function clearOffset()
{
$this->offsets = array();
$this->lastOffsets = array();
$this->fetchOffsets = array();
$this->consumerOffsets = array();
$this->commitOffsets = array();
$this->precommitOffsets = array();
}

// }}}
// }}}
}
57 changes: 46 additions & 11 deletions src/Kafka/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class Process

protected $isRunning = true;

protected $messages = array();

// }}}
// {{{ functions
// {{{ public function __construct()
Expand Down Expand Up @@ -442,7 +444,6 @@ protected function offset()
if (!$connect) {
return;
}
$resetOffset = \Kafka\ConsumerConfig::getInstance()->getOffsetReset();
$data = array();
foreach ($topicList as $topic) {
$item = array(
Expand All @@ -453,7 +454,7 @@ protected function offset()
$item['partitions'][] = array(
'partition_id' => $partId,
'offset' => 1,
'time' => ($resetOffset == 'latest') ? -1 : -2,
'time' => -1,
);
$data[] = $item;
}
Expand Down Expand Up @@ -481,17 +482,20 @@ public function succOffset($result, $fd)
//$this->debug($msg);

$offsets = \Kafka\Consumer\Assignment::getInstance()->getOffsets();
$lastOffsets = \Kafka\Consumer\Assignment::getInstance()->getLastOffsets();
foreach ($result as $topic) {
foreach ($topic['partitions'] as $part) {
if ($part['errorCode'] != 0) {
$this->stateConvert($part['errorCode']);
break 2;
}

$offsets[$topic['topicName']][$part['partition']] = $part['offsets'][0];
$offsets[$topic['topicName']][$part['partition']] = end($part['offsets']);
$lastOffsets[$topic['topicName']][$part['partition']] = $part['offsets'][0];
}
}
\Kafka\Consumer\Assignment::getInstance()->setOffsets($offsets);
\Kafka\Consumer\Assignment::getInstance()->setLastOffsets($lastOffsets);
$this->state->succRun(\Kafka\Consumer\State::REQUEST_OFFSET, $fd);
}

Expand Down Expand Up @@ -554,8 +558,17 @@ public function succFetchOffset($result)
$assign->setFetchOffsets($offsets);

$consumerOffsets = $assign->getConsumerOffsets();
$lastOffsets = $assign->getLastOffsets();
if (empty($consumerOffsets)) {
$assign->setConsumerOffsets($assign->getFetchOffsets());
$consumerOffsets = $assign->getFetchOffsets();
foreach ($consumerOffsets as $topic => $value) {
foreach ($value as $partId => $offset) {
if (isset($lastOffsets[$topic][$partId]) && $lastOffsets[$topic][$partId] > $offset) {
$consumerOffsets[$topic][$partId] = $offset + 1;
}
}
}
$assign->setConsumerOffsets($consumerOffsets);
$assign->setCommitOffsets($assign->getFetchOffsets());
}
$this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH_OFFSET);
Expand All @@ -566,6 +579,7 @@ public function succFetchOffset($result)

protected function fetch()
{
$this->messages = array();
$context = array();
$broker = \Kafka\Broker::getInstance();
$topics = \Kafka\Consumer\Assignment::getInstance()->getTopics();
Expand Down Expand Up @@ -624,10 +638,14 @@ public function succFetch($result, $fd)
}

$offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);
if ($offset === false) {
return; // current is rejoin....
}
foreach ($part['messages'] as $message) {
if ($this->consumer != null) {
call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message);
}
$this->messages[$topic['topicName']][$part['partition']][] = $message;
//if ($this->consumer != null) {
// call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message);
//}
$offset = $message['offset'];
}

Expand Down Expand Up @@ -694,10 +712,21 @@ public function succCommit($result)
foreach ($topic['partitions'] as $part) {
if ($part['errorCode'] != 0) {
$this->stateConvert($part['errorCode']);
break 2;
return; // not call user consumer function
}
}
}

foreach ($this->messages as $topic => $value) {
foreach ($value as $part => $messages) {
foreach ($messages as $message) {
if ($this->consumer != null) {
call_user_func($this->consumer, $topic, $part, $message);
}
}
}
}
$this->messages = array();
}

// }}}
Expand Down Expand Up @@ -725,23 +754,29 @@ protected function stateConvert($errorCode, $context = null)
\Kafka\Protocol::UNKNOWN_MEMBER_ID,
);

$assign = \Kafka\Consumer\Assignment::getInstance();
if (in_array($errorCode, $recoverCodes)) {
$this->state->recover();
$assign->clearOffset();
return false;
}

if (in_array($errorCode, $rejoinCodes)) {
if ($errorCode == \Kafka\Protocol::UNKNOWN_MEMBER_ID) {
$assign = \Kafka\Consumer\Assignment::getInstance();
$assign->setMemberId('');
}
$assign->clearOffset();
$this->state->rejoin();
return false;
}

if (\Kafka\Protocol::OFFSET_OUT_OF_RANGE == $errorCode) {
$assign = \Kafka\Consumer\Assignment::getInstance();
$offsets = $assign->getOffsets();
$resetOffset = \Kafka\ConsumerConfig::getInstance()->getOffsetReset();
if ($resetOffset == 'latest') {
$offsets = $assign->getLastOffsets();
} else {
$offsets = $assign->getOffsets();
}
list($topic, $partId) = $context;
if (isset($offsets[$topic][$partId])) {
$assign->setConsumerOffset($topic, $partId, $offsets[$topic][$partId]);
Expand Down
36 changes: 34 additions & 2 deletions src/Kafka/Consumer/State.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class State
'interval' => 2000,
),
self::REQUEST_COMMIT_OFFSET => array(
'interval' => 2000,
'norepeat' => true,
),
);

Expand Down Expand Up @@ -130,6 +130,9 @@ public function init()
public function start()
{
foreach ($this->requests as $request => $option) {
if (isset($option['norepeat']) && $option['norepeat']) {
continue;
}
$interval = isset($option['interval']) ? $option['interval'] : 200;
\Amp\repeat(function ($watcherId) use ($request, $option) {
if ($this->checkRun($request) && $option['func'] != null) {
Expand Down Expand Up @@ -177,6 +180,16 @@ public function succRun($key, $context = null)
$this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH);
break;
case self::REQUEST_OFFSET:
if (!isset($this->callStatus[$key]['context'])) {
$this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH);
break;
}
unset($this->callStatus[$key]['context'][$context]);
$contextStatus = $this->callStatus[$key]['context'];
if (empty($contextStatus)) {
$this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH);
}
break;
case self::REQUEST_FETCH:
if (!isset($this->callStatus[$key]['context'])) {
$this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH);
Expand All @@ -186,6 +199,7 @@ public function succRun($key, $context = null)
$contextStatus = $this->callStatus[$key]['context'];
if (empty($contextStatus)) {
$this->callStatus[$key]['status'] = (self::STATUS_LOOP | self::STATUS_FINISH);
call_user_func($this->requests[self::REQUEST_COMMIT_OFFSET]['func']);
}
break;
}
Expand Down Expand Up @@ -350,6 +364,17 @@ protected function checkRun($key)
return false;
case self::REQUEST_HEARTGROUP:
case self::REQUEST_OFFSET:
if (($status & self::STATUS_PROCESS) == self::STATUS_PROCESS) {
return false;
}
$syncStatus = $this->callStatus[self::REQUEST_SYNCGROUP]['status'];
if (($syncStatus & self::STATUS_FINISH) != self::STATUS_FINISH) {
return false;
}
if (($status & self::STATUS_LOOP) == self::STATUS_LOOP) {
return true;
}
return false;
case self::REQUEST_FETCH_OFFSET:
if (($status & self::STATUS_PROCESS) == self::STATUS_PROCESS) {
return false;
Expand All @@ -358,19 +383,26 @@ protected function checkRun($key)
if (($syncStatus & self::STATUS_FINISH) != self::STATUS_FINISH) {
return false;
}
$offsetStatus = $this->callStatus[self::REQUEST_OFFSET]['status'];
if (($offsetStatus & self::STATUS_FINISH) != self::STATUS_FINISH) {
return false;
}
if (($status & self::STATUS_LOOP) == self::STATUS_LOOP) {
return true;
}
return false;
case self::REQUEST_FETCH:
case self::REQUEST_COMMIT_OFFSET:
if (($status & self::STATUS_PROCESS) == self::STATUS_PROCESS) {
return false;
}
$fetchOffsetStatus = $this->callStatus[self::REQUEST_FETCH_OFFSET]['status'];
if (($fetchOffsetStatus & self::STATUS_FINISH) != self::STATUS_FINISH) {
return false;
}
$commitOffsetStatus = $this->callStatus[self::REQUEST_COMMIT_OFFSET]['status'];
if (($commitOffsetStatus & self::STATUS_PROCESS) == self::STATUS_PROCESS) {
return false;
}
if (($status & self::STATUS_LOOP) == self::STATUS_LOOP) {
return true;
}
Expand Down

0 comments on commit 285b6d7

Please sign in to comment.