
Support joins over nested fields with indexes on nested fields #381

Open

andrei-ionescu wants to merge 5 commits into master from nested_fields_filter_hybrid

Conversation

@andrei-ionescu (Contributor) commented on Mar 11, 2021

What is the context for this pull request?

What changes were proposed in this pull request?

This PR adds support for hybrid scans when filtering over nested fields using indexes created over nested fields.

Given the nestedDataset dataset with the following schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the following data

+---+-----+-----------------+
|id |name |nested           |
+---+-----+-----------------+
|2  |name2|[va2, [wa2, wb2]]|
|1  |name1|[va1, [wa1, wb1]]|
+---+-----+-----------------+
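
For illustration, such a dataset could be built roughly as follows. This is a sketch, not code from this PR: the case classes and the /tmp/tableN2 write path are assumptions made for the example.

import org.apache.spark.sql.SparkSession

// Illustrative case classes mirroring the schema above.
case class Nst(field1: String, field2: String)
case class Nested(field1: String, nst: Nst)
case class Record(id: Int, name: String, nested: Nested)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Write the sample rows to Parquet and read them back, so the dataset is
// file-backed (Hyperspace builds indexes over file-based sources).
Seq(
  Record(2, "name2", Nested("va2", Nst("wa2", "wb2"))),
  Record(1, "name1", Nested("va1", Nst("wa1", "wb1")))
).toDF().write.mode("overwrite").parquet("/tmp/tableN2")   // hypothetical path

val nestedDataset = spark.read.parquet("/tmp/tableN2")
nestedDataset.printSchema()
nestedDataset.show(false)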

The transformed plans for hybrid scans need to accommodate the union between the latest files arriving in the dataset, which have the schema above, and the index, which has the schema below

root
 |-- __hs_nested.nested.nst.field1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- __hs_nested.nested.nst.field2: string (nullable = true)
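
An index with that layout would be created along these lines using the Hyperspace API. This is a sketch: nested columns are referenced by their dotted path, and Hyperspace stores them under the __hs_nested prefix shown above.

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

val hs = new Hyperspace(spark)

// Key the index on the nested leaf field used in the filter, and include the
// columns projected by the query so the index can cover it.
hs.createIndex(
  nestedDataset,
  IndexConfig(
    "idx_nested",
    indexedColumns = Seq("nested.nst.field1"),
    includedColumns = Seq("id", "name", "nested.nst.field2")))

// Enable Hyperspace's optimizer rules for subsequent queries.
spark.enableHyperspace()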

Files Appended

The optimized plan

Union
:- Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
:  +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
:     +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
:        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- Relation[id#100,name#101,nested#102] parquet

The Spark plan

Union
:- Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
:  +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
:     +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
:        Batched: false, Format: Parquet, 
:        Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
:        PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
:        struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- FileScan parquet [id#100,name#101,nested#102] 
         Batched: false, Format: Parquet, 
         Location: InMemoryFileIndex[file:/..../tableN2/appended_file.parquet], 
         PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: 
         struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>
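
To reach the plan above, hybrid scan must be enabled and a new file appended to the source location after the index was built. A hedged sketch follows; the appended row, the path, and the config key (Hyperspace's hybrid scan setting) are assumptions and should be checked against the version in use.

// Allow the index to be used even though the source now has appended files;
// the appended data is unioned in from the original format.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")

// Append one more file to the source location (hypothetical data and path).
Seq(Record(3, "name3", Nested("va3", Nst("wa1", "wb3"))))
  .toDF().write.mode("append").parquet("/tmp/tableN2")

val df = spark.read.parquet("/tmp/tableN2")
val query = df
  .filter(df("nested.nst.field1") === "wa1")
  .select("id", "name", "nested.nst.field2")

// The optimized plan is the Union shown above: the index relation on one side
// and a scan restricted to the appended Parquet file on the other.
query.explain(true)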

Files Deleted

The optimized plan

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Project [__hs_nested.nested.nst.field1#0, id#1, name#2, __hs_nested.nested.nst.field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3,_data_file_id#368L] 
            Hyperspace(Type: CI, Name: indexWithLineage, LogVersion: 1)

The Spark plan

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Project [__hs_nested.nested.nst.field1#0, id#1, name#2, __hs_nested.nested.nst.field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
            Batched: false, Format: Parquet, 
            Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
            PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
            struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>
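
The deleted-files case additionally requires the index to carry lineage, so that rows originating from removed source files can be excluded via the _data_file_id column seen in the plan. A sketch under the same assumptions; the lineage config key should be verified against the Hyperspace version in use.

// Build a lineage-enabled index: each index row records the source file it
// came from in the _data_file_id column.
spark.conf.set("spark.hyperspace.index.lineage.enabled", "true")
hs.createIndex(
  nestedDataset,
  IndexConfig(
    "indexWithLineage",
    indexedColumns = Seq("nested.nst.field1"),
    includedColumns = Seq("id", "name", "nested.nst.field2")))

// After a source file is deleted, the same filter/projection query can still
// be answered from the index alone: Hyperspace adds
//   Filter NOT (_data_file_id = <id of the deleted file>)
// on top of the index scan, as in the plan above.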

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Existing unit and integration tests, to verify that existing functionality is not broken.
  • New unit and integration tests added for the new functionality.

@andrei-ionescu (Contributor, Author) commented:

@sezruby, @imback82 Here is the third PR, which completes support for nested fields in filters.

@sezruby self-assigned this on Mar 11, 2021
@sezruby assigned andrei-ionescu and unassigned sezruby on Mar 11, 2021
@andrei-ionescu force-pushed the nested_fields_filter_hybrid branch 5 times, most recently from e4e656a to fc87035 on March 18, 2021
@andrei-ionescu force-pushed the nested_fields_filter_hybrid branch 4 times, most recently from e082fc8 to ba8a684 on March 24, 2021
Development

Successfully merging this pull request may close these issues:

[PROPOSAL]: Index nested fields