From 801930a8977f2460b46626702f1374351cef8139 Mon Sep 17 00:00:00 2001
From: Tzach Shabtay <tshabtay@clearstreet.io>
Date: Thu, 22 Oct 2020 23:28:14 -0400
Subject: [PATCH] added pagination to message query

This will allow the client to refresh the view during long query and also throttle it to avoid starving other clients
---
 .../kafka/messages/single_topic_input.tsx     | 30 +++++++++++++++++--
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/src/client/kafka/messages/single_topic_input.tsx b/src/client/kafka/messages/single_topic_input.tsx
index d22d5ed..dd88973 100644
--- a/src/client/kafka/messages/single_topic_input.tsx
+++ b/src/client/kafka/messages/single_topic_input.tsx
@@ -125,9 +125,33 @@ export class SingleTopicInput extends React.Component<Props, State> {
         this.setState({ loadingMessages: true })
         this.props.onDataFetchStarted()
         const topic = this.props.topic
-        const response = await fetch(`/api/messages/${topic}/${this.state.partition}?limit=${this.state.limit}&offset=${this.state.offset}&search=${this.props.search}&timeout=${timeout}`)
-        const data = await response.json()
-        this.props.onDataFetched(data)
+        let cursor = this.state.offset
+        const end = cursor + this.state.limit
+        let out: any = null
+        let limit = this.state.limit
+        if (limit > 1000) {
+            limit = 1000
+        }
+        while (cursor < end) {
+            const response = await fetch(`/api/messages/${topic}/${this.state.partition}?limit=${limit}&offset=${cursor}&search=${this.props.search}&timeout=${timeout}`)
+            cursor += limit
+            const data = await response.json()
+            if (!out) {
+                out = data
+            } else if (data.messages) {
+                out.messages = [...out.messages, ...data.messages]
+            }
+            if (data.error) {
+                out.error = data.error
+            }
+            if (data.hasTimeout) {
+                out.hasTimeout = data.hasTimeout
+            }
+            this.props.onDataFetched(out)
+            if (out.error || out.hasTimeout) {
+                break
+            }
+        }
         this.setState({loadingMessages: false})
     }