Tracking: Ad-hoc(batch) ingestion #18583

Open
4 of 28 tasks
st1page opened this issue Sep 18, 2024 · 10 comments
@st1page
Contributor

st1page commented Sep 18, 2024

We will enhance the ad-hoc ingestion capability in subsequent releases, with the expectation that users will eventually be able to run ad-hoc reads on data persisted in external systems.

Streaming storage

For streaming storage, predicate pushdown on the "offset" is required (a sketch follows the list below).

  • kafka
    • select from source
    • TVF
  • pulsar
    • select from source
    • TVF
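As a rough illustration of the offset-pushdown requirement, a batch read over a Kafka source could look like the sketch below; the source name and the hidden offset column name are assumptions for illustration, not a committed interface.

```sql
-- Minimal sketch (assumed names): an ad-hoc batch read from a Kafka source.
-- Instead of scanning the whole topic, the planner should push the offset
-- predicate down to the connector so only the requested range is fetched.
SELECT payload
FROM my_kafka_source
WHERE _rw_kafka_offset BETWEEN 10000 AND 20000;
```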

Lake

File source (object store)

  • select from source
  • TVF
    • only S3 is supported currently
  • optimization (a sketch follows this list)
    • column pruning
    • predicate pushdown
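A minimal sketch of what these optimizations buy for an ad-hoc TVF read; the `read_parquet` name and the S3 path follow the proposal later in this thread and are not a finalized signature.

```sql
-- Sketch: ad-hoc TVF read over an object store.
-- Column pruning: only user_id and event_time need to be fetched from the files.
-- Predicate pushdown: the filter on event_time can skip whole files / row groups.
SELECT user_id, event_time
FROM read_parquet('s3://my-bucket/events/*.parquet')
WHERE event_time >= '2024-09-01';
```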

Database

Currently we only support CREATE TABLE with a primary key on the CDC connector. To support ad-hoc ingestion, we need to design and introduce new syntax for CREATE SOURCE with a CDC connector; such a source could only be queried ad hoc. A rough sketch follows.
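As a loose illustration only (none of this syntax exists or is finalized; the property values and the DuckDB-style `postgres_query` TVF are assumptions):

```sql
-- Hypothetical: a CDC source created without a backing table; it would only
-- be usable for ad-hoc batch reads of the upstream tables.
CREATE SOURCE pg_mydb WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'root',
    password = '123456',
    database.name = 'mydb'
);

-- One possible ad-hoc read path, borrowing DuckDB's postgres_query shape:
SELECT * FROM postgres_query('pg_mydb', 'SELECT * FROM orders LIMIT 10');
```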

Misc

@github-actions github-actions bot added this to the release-2.1 milestone Sep 18, 2024
@kwannoel
Contributor

Hi, I will help with this issue, starting with TVFs.

@xxchan
Member

xxchan commented Sep 27, 2024

Have we reached consensus on supporting TVFs? To me, their use cases duplicate those of Sources, so they seem unnecessary.

I'd like to see rationales and examples where they are more useful than sources before we add them.

@st1page
Contributor Author

st1page commented Sep 27, 2024

> Have we reached consensus on supporting TVFs? To me, their use cases duplicate those of Sources, so they seem unnecessary.
>
> I'd like to see rationales and examples where they are more useful than sources before we add them.

Additionally, one case is ad-hoc ingestion from databases. Currently we only support the CDC table and cannot create a source on an external database's table, so a TVF is the only clearly defined way to do ad-hoc ingestion from databases. We can refer to DuckDB's grammar for these cases:
duckdb.org/docs/extensions/postgres.html#the-postgres_query-table-function
duckdb.org/docs/extensions/mysql#the-mysql_query-table-function

@xxchan
Member

xxchan commented Sep 27, 2024

Thanks for the explanation!

> Currently we only support the CDC table and cannot create a source on an external database's table.

Makes me wonder whether this is also related to other shared sources, e.g. Kafka?

> We can refer to DuckDB's grammar for these cases

Compared with DuckDB:

  • They don't have sources at all, so it might be a little different.
  • Their syntax contains an ATTACH, which looks like the CREATE CONNECTION we might have in the future. So maybe we should design that first.
ATTACH 'dbname=postgresscanner' AS postgres_db (TYPE POSTGRES);
SELECT * FROM postgres_query('postgres_db', 'SELECT * FROM cars LIMIT 3');

@st1page
Contributor Author

st1page commented Sep 27, 2024

> Currently we only support the CDC table and cannot create a source on an external database's table.
>
> Makes me wonder whether this is also related to other shared sources, e.g. Kafka?

The issue is not related to "shared"; it is because the CDC source contains multiple tables' changes. Actually, that is a "CONNECTION".

@st1page
Contributor Author

st1page commented Sep 27, 2024

> Compared with DuckDB:
>
>   • They don't have sources at all, so it might be a little different.
>   • Their syntax contains an ATTACH, which looks like the CREATE CONNECTION we might have in the future. So maybe we should design that first.
>
> ATTACH 'dbname=postgresscanner' AS postgres_db (TYPE POSTGRES);
> SELECT * FROM postgres_query('postgres_db', 'SELECT * FROM cars LIMIT 3');

Agree with that. cc @chenzl25, do we have a plan to simplify the syntax of the TVF with a connection?

@chenzl25
Contributor

chenzl25 commented Sep 27, 2024

After connections are supported, in my mind a connection can be used in a TVF directly, like:

  • read_parquet(s3_connection, 's3://bucket/path/xxxx.parquet')
  • read_csv(s3_connection, 's3://bucket/path/xxxx.csv')
  • read_json(s3_connection, 's3://bucket/path/xxxx.json')
  • iceberg_scan(iceberg_connection, 'database_name.table_name')
  • postgres_query(pg_connection, 'select * from t')
  • mysql_query(my_connection, 'select * from t')

Connections contain the necessary information to allow a TVF to query the external system.
I think @tabVersion will support Connection this quarter.
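To make the proposal above concrete, here is a minimal sketch; the CREATE CONNECTION syntax and property names are assumptions, since connections had not been designed at this point:

```sql
-- Hypothetical CREATE CONNECTION holding the reusable S3 properties.
CREATE CONNECTION s3_conn WITH (
    type = 's3',
    region = 'us-east-1',
    access_key = 'xxx',
    secret_key = 'yyy'
);

-- The connection is then passed to the TVF instead of repeating credentials.
SELECT * FROM read_parquet(s3_conn, 's3://bucket/path/data.parquet');
```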

@tabVersion
Contributor

> After connections are supported, in my mind a connection can be used in a TVF directly, like:
>
>   • read_parquet(s3_connection, 's3://bucket/path/xxxx.parquet')
>   • read_csv(s3_connection, 's3://bucket/path/xxxx.csv')
>   • read_json(s3_connection, 's3://bucket/path/xxxx.json')
>   • iceberg_scan(iceberg_connection, 'database_name.table_name')
>   • postgres_query(pg_connection, 'select * from t')
>   • mysql_query(my_connection, 'select * from t')
>
> Connections contain the necessary information to allow a TVF to query the external system. I think @tabVersion will support Connection this quarter.

Yes for s3_connection and iceberg_connection.
I still have some concerns about the *-cdc connectors. The CDC source is conceptually a CONNECTION; it contains nearly the same info as a CONNECTION:

CREATE SOURCE pg_mydb WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '8306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    slot.name = 'mydb_slot',
    debezium.schema.history.internal.skip.unparseable.ddl = 'true'
);

@tabVersion
Contributor

@st1page

> Additionally, one case is ad-hoc ingestion from databases. Currently we only support the CDC table and cannot create a source on an external database's table, so a TVF is the only clearly defined way to do ad-hoc ingestion from databases. We can refer to DuckDB's grammar for these cases:
> duckdb.org/docs/extensions/postgres.html#the-postgres_query-table-function
> duckdb.org/docs/extensions/mysql#the-mysql_query-table-function

The main idea for CONNECTION is minimizing the user's effort when creating new sources/tables. It stores some props and applies them to all sources/sinks/tables created from the CONNECTION.

Things get a little different here because MQs have looser ACL control than file systems, e.g. S3. So I'd propose we must define BUCKET in fs CONNECTIONs.

From my perspective, we can draw a line here (sketched below):

  • Ad-hoc ingest from MQs, e.g. Kafka → select * from source
  • Ad-hoc ingest from FS, e.g. Iceberg and S3 → TVF
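A minimal sketch of that split, with purely illustrative source names and paths; the TVF name follows the proposal earlier in this thread rather than a shipped API:

```sql
-- 1) Ad-hoc ingest from an MQ (e.g. Kafka): batch-read the existing source.
SELECT * FROM my_kafka_source LIMIT 100;

-- 2) Ad-hoc ingest from a file system / lake (e.g. S3): use a TVF.
SELECT * FROM read_parquet('s3://my-bucket/path/data.parquet');
```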

@wcy-fdu
Contributor

wcy-fdu commented Oct 14, 2024

> So I'd propose we must define BUCKET in fs CONNECTIONs.

+1 for this. We need the bucket name to validate that RisingWave can read from the specific bucket or data directory. Here the bucket in fs_connection is like db_name in db_connection.
