Skip to content

Commit

Permalink
Remove expired checks
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 22, 2023
1 parent 447ae24 commit b9abe3a
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 56 deletions.
14 changes: 0 additions & 14 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -638,20 +638,6 @@ class ExpirationTimeDoGetScenario : public Scenario {
}
}
}
// Re-reads after expired
for (const auto& endpoint : info->endpoints()) {
if (!endpoint.expiration_time.has_value()) {
continue;
}
const auto& expiration_time = endpoint.expiration_time.value();
if (expiration_time > Timestamp::clock::now()) {
std::this_thread::sleep_for(expiration_time - Timestamp::clock::now());
}
auto reader = client->DoGet(endpoint.ticket);
if (reader.ok()) {
return Status::Invalid("Expired data shouldn't be readable");
}
}
ARROW_ASSIGN_OR_RAISE(auto table, ConcatenateTables(tables));

// Build expected table
Expand Down
23 changes: 0 additions & 23 deletions go/arrow/internal/flight_integration/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,29 +1010,6 @@ func (tester *expirationTimeDoGetScenarioTester) RunClient(addr string, opts ...
return rdr.Err()
}
}
// Re-reads after expired
for _, ep := range info.Endpoint {
if ep.ExpirationTime == nil {
continue
}

expirationTime := ep.ExpirationTime.AsTime()
avalilableDuration := time.Until(expirationTime)
if avalilableDuration > 0 {
time.Sleep(avalilableDuration)
}

stream, err := client.DoGet(ctx, ep.Ticket)
if err != nil {
return err
}

rdr, err := flight.NewRecordReader(stream)
if err == nil {
rdr.Release()
return fmt.Errorf("expired data shouldn't be readable")
}
}

// Build expected records
mem := memory.DefaultAllocator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,6 @@ public void client(BufferAllocator allocator, Location location, FlightClient cl
}
}

// No re-read after expiration
for (FlightEndpoint endpoint : info.getEndpoints()) {
Optional<Instant> maybeExpiration = endpoint.getExpirationTime();
if (!maybeExpiration.isPresent()) {
continue;
}
Instant now = Instant.now();
Instant expiration = maybeExpiration.get();
if (expiration.isAfter(now)) {
Thread.sleep(ChronoUnit.MILLIS.between(now, expiration) + 1);
}
IntegrationAssertions.assertThrows(FlightRuntimeException.class, () -> {
try (FlightStream stream = client.getStream(endpoint.getTicket())) {
while (stream.next()) {
}
}
});
}

// Check data
IntegrationAssertions.assertEquals(5, batches.size());
try (final VectorSchemaRoot root = VectorSchemaRoot.create(ExpirationTimeProducer.SCHEMA, allocator)) {
Expand Down

0 comments on commit b9abe3a

Please sign in to comment.