@zhuqi-lucas zhuqi-lucas commented Jan 30, 2026

Upstream:
apache#19924

PR Description

Summary

This PR introduces a high-performance streaming architecture for reading JSON array format files ([{...}, {...}, ...]) in DataFusion. The new design processes arbitrarily large JSON array files with constant memory usage (~32MB) instead of loading the entire file into memory.

Motivation

The previous implementation had critical limitations:

  1. Memory explosion: Reading a 33GB JSON array file required 100GB+ of memory (full load + serde_json parsing + intermediate strings)
  2. Triple parsing overhead: Content was parsed 3 times (serde_json → String → arrow-json)
  3. No streaming support: Couldn't process files larger than available memory

Solution Architecture

The new design uses a streaming character substitution approach that converts JSON array format to NDJSON on-the-fly:

┌─────────────────────────────────────────────────────────────────────────┐
│                         S3 / Object Store (async)                       │
│                    (Large JSON Array File, e.g. 33GB)                   │
└─────────────────────────────────────────────────────────────────────────┘
                                │
                                ▼ async stream (Bytes chunks)
┌─────────────────────────────────────────────────────────────────────────┐
│                      Async Task (tokio runtime)                         │
│         Reads chunks from object store, sends to sync channel           │
└─────────────────────────────────────────────────────────────────────────┘
                                │
                                ▼ std::sync::mpsc::sync_channel<Bytes>
                                │   (bounded, ~16MB buffer)
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                   Blocking Task (spawn_blocking)                        │
│  ┌──────────────┐   ┌────────────────────────┐   ┌──────────────────┐  │
│  │ChannelReader │ → │JsonArrayToNdjsonReader │ → │ Arrow JsonReader │  │
│  │   (Read)     │   │  [{},...] → {}\n{}     │   │  (RecordBatch)   │  │
│  └──────────────┘   └────────────────────────┘   └──────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘
                                │
                                ▼ tokio::sync::mpsc::channel<RecordBatch>
┌─────────────────────────────────────────────────────────────────────────┐
│                      ReceiverStream (async)                             │
│                   → DataFusion execution engine                         │
└─────────────────────────────────────────────────────────────────────────┘
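The async-to-sync handoff in the diagram can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: it assumes a plain producer thread in place of the tokio task, `Vec<u8>` in place of `Bytes`, and a hypothetical `ChannelReader` and `read_all_via_channel` written for this example. The bounded `sync_channel` is what caps buffered data: `send` blocks once the channel is full.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for the PR's ChannelReader: exposes a channel of
/// byte chunks as a blocking `std::io::Read`.
pub struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    current: Vec<u8>, // chunk currently being drained
    pos: usize,       // read offset into `current`
}

impl ChannelReader {
    pub fn new(rx: Receiver<Vec<u8>>) -> Self {
        Self { rx, current: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if out.is_empty() {
            return Ok(0);
        }
        // Block for the next non-empty chunk once the current one is drained.
        while self.pos == self.current.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                // All senders dropped: report end of file.
                Err(_) => return Ok(0),
            }
        }
        let n = out.len().min(self.current.len() - self.pos);
        out[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Sends `chunks` through a bounded channel from a producer thread and
/// reads them back through `ChannelReader` on the calling thread.
pub fn read_all_via_channel(chunks: Vec<Vec<u8>>, capacity: usize) -> io::Result<Vec<u8>> {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let producer = thread::spawn(move || {
        for chunk in chunks {
            // `send` blocks while the channel is full, which gives backpressure.
            if tx.send(chunk).is_err() {
                break; // reader went away; stop producing
            }
        }
    });
    let mut out = Vec::new();
    ChannelReader::new(rx).read_to_end(&mut out)?;
    producer.join().expect("producer thread panicked");
    Ok(out)
}
```

In the real pipeline the reader side would sit inside `spawn_blocking` and feed the converter and Arrow reader rather than collecting into a `Vec`.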

Key Components

  1. JsonArrayToNdjsonReader: A streaming Read + BufRead adapter that performs character-level transformation:

    • Skips leading [ and trailing ]
    • Converts top-level , to \n
    • Tracks nesting depth to preserve nested arrays/objects
    • Handles JSON strings correctly (including escaped quotes)
  2. ChannelReader: Bridges async-to-sync boundary by receiving Bytes chunks from a channel and implementing std::io::Read

  3. JsonArrayStream: Custom stream wrapper that holds SpawnedTask handles for proper cancel-safety
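The character-level transformation listed for JsonArrayToNdjsonReader can be illustrated with a small in-memory function. This is a sketch under simplifying assumptions: it works on a whole byte slice rather than as a streaming `Read` adapter, it performs no validation, and the name `json_array_to_ndjson` is hypothetical. It does show the three pieces of state the description mentions: nesting depth, an in-string flag, and an escape flag.

```rust
/// Converts a JSON array document (`[{...}, {...}]`) into NDJSON bytes.
/// Best-effort: malformed input is passed through without error.
pub fn json_array_to_ndjson(input: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(input.len());
    let mut depth: usize = 0; // 1 means "directly inside the outer array"
    let mut in_string = false; // currently inside a JSON string literal
    let mut escaped = false; // previous byte was a backslash inside a string

    for &b in input {
        if in_string {
            // Inside a string every byte is copied; only track where it ends.
            out.push(b);
            if escaped {
                escaped = false;
            } else if b == b'\\' {
                escaped = true;
            } else if b == b'"' {
                in_string = false;
            }
            continue;
        }
        match b {
            b'"' => {
                in_string = true;
                out.push(b);
            }
            b'[' | b'{' => {
                // The outermost '[' is skipped; nested openers are kept.
                if depth > 0 {
                    out.push(b);
                }
                depth += 1;
            }
            b']' | b'}' => {
                depth = depth.saturating_sub(1);
                // The outermost ']' is skipped; nested closers are kept.
                if depth > 0 {
                    out.push(b);
                }
            }
            // A comma between top-level elements becomes a record separator.
            b',' if depth == 1 => out.push(b'\n'),
            // Whitespace outside any element is dropped.
            _ if depth <= 1 && b.is_ascii_whitespace() => {}
            _ => out.push(b),
        }
    }
    out
}
```

For example, `[{"a":1}, {"b":[1,2]}]` becomes two lines, `{"a":1}` and `{"b":[1,2]}`, with the comma inside the nested array left untouched.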

Memory Budget (~32MB total)

| Component | Memory |
|---|---|
| sync_channel buffer (128 chunks × ~128KB) | ~16MB |
| JsonArrayToNdjsonReader (2 × 2MB buffers) | ~4MB |
| Arrow JsonReader internal | ~8MB |
| Miscellaneous | ~4MB |
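The budget arithmetic can be checked with a few constants. The constant names below are made up for this illustration and are not taken from the PR; the figures are the approximate ones from the table, treating 1MB as 1024 × 1024 bytes.

```rust
const KIB: usize = 1024;
const MIB: usize = 1024 * KIB;

// Hypothetical names; values are the approximate figures from the table.
pub const CHANNEL_CHUNKS: usize = 128;
pub const APPROX_CHUNK_BYTES: usize = 128 * KIB;
pub const CHANNEL_BYTES: usize = CHANNEL_CHUNKS * APPROX_CHUNK_BYTES; // 16 MiB
pub const CONVERTER_BYTES: usize = 2 * (2 * MIB); // two 2 MiB buffers
pub const ARROW_READER_BYTES: usize = 8 * MIB;
pub const MISC_BYTES: usize = 4 * MIB;

/// Sum of all components: 16 + 4 + 8 + 4 = 32 MiB.
pub const TOTAL_BYTES: usize =
    CHANNEL_BYTES + CONVERTER_BYTES + ARROW_READER_BYTES + MISC_BYTES;
```

The channel dominates the budget, so its chunk count is the main knob for trading memory against read-ahead.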

API Changes

  • Renamed format_array option to newline_delimited (inverted semantics for clarity)
    • newline_delimited: true (default) → NDJSON format
    • newline_delimited: false → JSON array format
  • Renamed NdJsonReadOptions to JsonReadOptions (with deprecation alias)
  • Removed unused compression_level field from JsonOptions
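The rename and the inverted flag can be pictured with a stripped-down model. Everything below is a hypothetical stand-in, not DataFusion's actual definitions: the struct has a single field, and `from_format_array` is an invented helper that only exists to show how an old `format_array` value maps onto `newline_delimited`.

```rust
/// Hypothetical, simplified options struct; the real type has more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct JsonReadOptions {
    /// true = NDJSON (one object per line), false = JSON array format.
    pub newline_delimited: bool,
}

impl Default for JsonReadOptions {
    fn default() -> Self {
        // NDJSON stays the default, matching the description above.
        Self { newline_delimited: true }
    }
}

impl JsonReadOptions {
    /// Builder-style setter for the new flag.
    pub fn newline_delimited(mut self, value: bool) -> Self {
        self.newline_delimited = value;
        self
    }

    /// Invented migration helper: the old flag has the opposite meaning.
    pub fn from_format_array(format_array: bool) -> Self {
        Self::default().newline_delimited(!format_array)
    }
}

/// One way a deprecation alias can be expressed so old code keeps compiling.
#[deprecated(note = "use JsonReadOptions instead")]
pub type NdJsonReadOptions = JsonReadOptions;
```

With an alias like this, existing callers get a compiler warning rather than a build break.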

Testing

  • Added comprehensive unit tests for JsonArrayToNdjsonReader including:
    • Nested objects and arrays
    • Escaped quotes in strings
    • Buffer boundary handling
    • Empty arrays
    • Trailing content validation
  • Added integration tests for JsonOpener with JSON array format
  • Updated existing tests to use new API

Limitations

  • JSON array format does not support range-based file scanning (returns a clear error)
  • Validation of JSON structure is best-effort (malformed JSON may produce partial results)

Copilot AI review requested due to automatic review settings January 30, 2026 09:19
Copilot AI left a comment

Pull request overview

This PR fixes a buffer boundary bug in JsonArrayToNdjsonReader where data could be lost when processing large JSON arrays that exceed the internal buffer size. The fix changes the implementation to use BufReader's fill_buf()/consume() pattern instead of Read::read(), which allows precise control over byte consumption and prevents data loss.

Changes:

  • Wrapped the inner reader in BufReader<R> for precise byte consumption control
  • Refactored fill_internal_buffer() to use fill_buf()/consume() pattern instead of read()
  • Added comprehensive tests to verify the fix with data larger than the 64KB buffer size
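The fill_buf()/consume() pattern mentioned above can be shown in isolation. This sketch is not the PR's `fill_internal_buffer()`; `take_exact` is a hypothetical helper that only demonstrates the property the review relies on: `fill_buf` lends out buffered bytes without consuming them, and `consume` is told exactly how many were used, so anything that did not fit stays available for the next call.

```rust
use std::io::{self, BufRead};

/// Copies as many buffered bytes as fit into `out` and consumes exactly
/// that many from `src`. Returns 0 at end of input.
pub fn take_exact<R: BufRead>(src: &mut R, out: &mut [u8]) -> io::Result<usize> {
    // Borrow the reader's internal buffer; nothing is consumed yet.
    let available = src.fill_buf()?;
    let n = available.len().min(out.len());
    out[..n].copy_from_slice(&available[..n]);
    // Mark only the bytes actually copied as consumed; the rest remain
    // in the reader for the next call.
    src.consume(n);
    Ok(n)
}
```

By contrast, a plain `Read::read` call hands bytes over unconditionally, so a caller that cannot use all of them has to stash the remainder itself.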

Reviewed changes

Copilot reviewed 2 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| datafusion/datasource-json/src/utils.rs | Core fix: Changed JsonArrayToNdjsonReader to use BufReader wrapper and refactored buffer filling logic to prevent data loss at buffer boundaries; added comprehensive tests |
| datafusion/datasource-json/Cargo.toml | Added serde_json dependency for test validation |
| Cargo.lock | Updated lock file with new dependency |

@zhuqi-lucas zhuqi-lucas changed the title from "Fix fix_buffer_boundary_bug" to "Redesign json array streaming for datafusion" Jan 31, 2026
@zhuqi-lucas zhuqi-lucas force-pushed the fix_buffer_boundary_bug branch from 365c8fc to 8d4fcd2 Compare February 1, 2026 07:15
@zhuqi-lucas zhuqi-lucas force-pushed the fix_buffer_boundary_bug branch from 8d4fcd2 to cc9e0ed Compare February 1, 2026 07:30
@zhuqi-lucas zhuqi-lucas force-pushed the fix_buffer_boundary_bug branch from bb29301 to 981ec98 Compare February 2, 2026 03:39
@zhuqi-lucas zhuqi-lucas merged commit 9522508 into branch-51 Feb 3, 2026
58 of 59 checks passed