Skip to content

Flink sql 执行报错 #245

@gbguanbo

Description

@gbguanbo

from pyalink.alink import *
#ExecutionEnvironment、BatchTableEnvironment、StreamExecutionEnvironment 和 StreamTableEnvironment。
benv, btenv, senv, stenv = useRemoteEnv("localhost", 8081, 1)

jars = "file:///home/guanbo/flink-1.13.3/lib/flink-csv-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-dist_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-json-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-shaded-zookeeper-3.4.14.jar"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-blink_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/alink_core_flink-1.13_2.11-1.6.2.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/alink_python_flink-1.13_2.11-1.6.2.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-1.2-api-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-api-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-core-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-slf4j-impl-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-connector-base-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-streaming-java_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-api-java-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-api-java-bridge_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-common-1.13.3.jar"
stenv.get_config().get_configuration().set_string("pipeline.jars", jars)
source_ddl = """
CREATE TABLE source_table(
id STRING,
price DOUBLE,
ts BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = '172.20.8.230:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""

sink_ddl = """
CREATE TABLE sink_table(
id STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink_topic',
'properties.bootstrap.servers' = '172.20.8.230:9092',
'format' = 'json'
)
"""

stenv.execute_sql(source_ddl)
stenv.execute_sql(sink_ddl)

stenv.sql_query("SELECT id FROM source_table")
.execute_insert("sink_table")
StreamOperator.execute()

Py4JJavaError: An error occurred while calling o4729.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:147)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:151)
at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:92)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:45)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:176)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:114)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:95)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:83)
at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:142)
... 15 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=kafka
format=json
properties.bootstrap.servers=172.20.8.230:9092
properties.group.id=test_3
scan.startup.mode=latest-offset
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=id
schema.1.data-type=DOUBLE
schema.1.name=price
schema.2.data-type=BIGINT
schema.2.name=ts
topic=test-topic

The following factories have been considered:
com.alibaba.flink.ml.operator.ops.table.MLTableSourceFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:300)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:178)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:139)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:93)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
... 41 more

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions