Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public Result<Boolean> connectDataSource(@Parameter(hidden = true) @RequestAttri
*
* @param loginUser login user
* @param id data source id
* @return connect result code
* @return A Result object containing true if the connection is successful, false otherwise.
*/
@Operation(summary = "connectionTest", description = "CONNECT_DATA_SOURCE_TEST_NOTES")
@Parameters({
Expand All @@ -236,7 +236,7 @@ public Result<Boolean> connectDataSource(@Parameter(hidden = true) @RequestAttri
@ApiException(CONNECTION_TEST_FAILURE)
public Result<Boolean> connectionTest(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("id") int id) {
dataSourceService.connectionTest(id);
dataSourceService.connectionTest(loginUser, id);
return Result.success(true);
}

Expand Down Expand Up @@ -337,6 +337,34 @@ public Result<Object> getKerberosStartupState(@Parameter(hidden = true) @Request
return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState());
}

/**
* Retrieves the list of databases available in a specific data source.
*
* @param loginUser the current logged-in user (injected from session)
* @param datasourceId the unique identifier of the data source
* @return a list of database names/options accessible to the user
*/
@Operation(summary = "databases", description = "GET_DATASOURCE_DATABASE_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
})
@GetMapping(value = "/databases")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_DATABASES_ERROR)
public Result<Object> getDatabases(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("datasourceId") Integer datasourceId) {
List<ParamsOptions> options = dataSourceService.getDatabases(loginUser, datasourceId);
return Result.success(options);
}

/**
* Retrieves the list of tables within a specific database of a data source.
*
* @param loginUser the current logged-in user (injected from session)
* @param datasourceId the unique identifier of the data source
* @param database the name of the database to query
* @return a list of table names/options accessible to the user
*/
@Operation(summary = "tables", description = "GET_DATASOURCE_TABLES_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
Expand All @@ -345,37 +373,37 @@ public Result<Object> getKerberosStartupState(@Parameter(hidden = true) @Request
@GetMapping(value = "/tables")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_TABLES_ERROR)
public Result<Object> getTables(@RequestParam("datasourceId") Integer datasourceId,
@RequestParam(value = "database") String database) {
List<ParamsOptions> options = dataSourceService.getTables(datasourceId, database);
public Result<Object> getTables(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("datasourceId") Integer datasourceId,
@RequestParam("database") String database) {
List<ParamsOptions> options = dataSourceService.getTables(loginUser, datasourceId, database);
return Result.success(options);
}

/**
* Retrieves the column details (schema) for a specific table.
*
* @param loginUser the current logged-in user (injected from session)
* @param datasourceId the unique identifier of the data source
* @param database the name of the database containing the table
* @param tableName the name of the table to query columns for
* @return a list of column definitions (name, type, etc.) for the specified table
*/
@Operation(summary = "tableColumns", description = "GET_DATASOURCE_TABLE_COLUMNS_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test")),
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test")),
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test"))
})
@GetMapping(value = "/tableColumns")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_TABLE_COLUMNS_ERROR)
public Result<Object> getTableColumns(@RequestParam("datasourceId") Integer datasourceId,
@RequestParam("tableName") String tableName,
@RequestParam(value = "database") String database) {
List<ParamsOptions> options = dataSourceService.getTableColumns(datasourceId, database, tableName);
public Result<Object> getTableColumns(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("datasourceId") Integer datasourceId,
@RequestParam("database") String database,
@RequestParam("tableName") String tableName) {
List<ParamsOptions> options = dataSourceService.getTableColumns(loginUser, datasourceId, database, tableName);
return Result.success(options);
}

@Operation(summary = "databases", description = "GET_DATASOURCE_DATABASE_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
})
@GetMapping(value = "/databases")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_DATABASES_ERROR)
public Result<Object> getDatabases(@RequestParam("datasourceId") Integer datasourceId) {
List<ParamsOptions> options = dataSourceService.getDatabases(datasourceId);
return Result.success(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.User;
Expand Down Expand Up @@ -96,12 +97,13 @@ public interface DataSourceService {
void checkConnection(DbType type, ConnectionParam parameter);

/**
* test connection
* Tests the connectivity of a specific data source.
*
* @param id datasource id
* @return connect result code
* @param loginUser the current logged-in user (required for permission check)
* @param id the unique identifier of the data source to test
* @throws ServiceException if the resource doesn't exist, permission is denied, or connection fails
*/
void connectionTest(int id);
void connectionTest(User loginUser, int id);

/**
* delete datasource
Expand Down Expand Up @@ -131,26 +133,36 @@ public interface DataSourceService {
List<DataSource> authedDatasource(User loginUser, Integer userId);

/**
* get tables
* @param datasourceId
* @param database
* @return
* Retrieves the list of databases (or schemas) available in a specific data source.
*
* @param loginUser current logged-in user
* @param datasourceId ID of the data source
* @return list of {@link ParamsOptions} representing database/schema names
* @throws ServiceException if permission denied, resource not found, or connection fails
*/
List<ParamsOptions> getTables(Integer datasourceId, String database);
List<ParamsOptions> getDatabases(User loginUser, Integer datasourceId);

/**
* get table columns
* @param datasourceId
* @param database
* @param tableName
* @return
* Retrieves the list of tables from a specific database within a data source.
*
* @param loginUser the current logged-in user (required for permission check)
* @param datasourceId the unique identifier of the data source
* @param database the specific database/schema name to query (nullable for some DB types like SQLite)
* @return a list of {@link ParamsOptions} containing table names and optional metadata (e.g., comments)
* @throws ServiceException if permission denied, resource not found, or connection fails
*/
List<ParamsOptions> getTableColumns(Integer datasourceId, String database, String tableName);
List<ParamsOptions> getTables(User loginUser, Integer datasourceId, String database);

/**
* get databases
* @param datasourceId
* @return
* Retrieves the list of columns for a specific table in a data source.
*
* @param loginUser current logged-in user
* @param datasourceId ID of the data source
* @param database database/schema name
* @param tableName table name to query
* @return list of {@link ParamsOptions} representing column names and types
* @throws ServiceException if permission denied, resource not found, or connection fails
*/
List<ParamsOptions> getDatabases(Integer datasourceId);
List<ParamsOptions> getTableColumns(User loginUser, Integer datasourceId, String database, String tableName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,19 @@
throw new ServiceException(Status.CONNECTION_TEST_FAILURE);
}

/**
* test connection
*
* @param id datasource id
* @return connect result code
*/
@Override
public void connectionTest(int id) {
public void connectionTest(User loginUser, int id) {
DataSource dataSource = dataSourceMapper.selectById(id);

if (dataSource == null) {
throw new ServiceException(Status.RESOURCE_NOT_EXIST);
}

if (!canOperatorPermissions(loginUser, new Object[]{id}, AuthorizationType.DATASOURCE,
ApiFuncIdentificationConstant.DATASOURCE)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}

checkConnection(dataSource.getType(),
DataSourceUtils.buildConnectionParams(dataSource.getType(), dataSource.getConnectionParams()));
}
Expand Down Expand Up @@ -417,9 +418,72 @@
}

@Override
public List<ParamsOptions> getTables(Integer datasourceId, String database) {
public List<ParamsOptions> getDatabases(User loginUser, Integer datasourceId) {

DataSource dataSource = dataSourceMapper.selectById(datasourceId);

if (dataSource == null) {
throw new ServiceException(Status.QUERY_DATASOURCE_ERROR);
}

if (!canOperatorPermissions(loginUser, new Object[]{datasourceId}, AuthorizationType.DATASOURCE,
ApiFuncIdentificationConstant.DATASOURCE)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}

List<String> tableList;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
dataSource.getType(),
dataSource.getConnectionParams());

if (null == connectionParam) {
throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED);
}

Connection connection =
DataSourceUtils.getConnection(dataSource.getType(), connectionParam);
ResultSet rs = null;

try {
if (null == connection) {
throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED);
}
if (dataSource.getType() == DbType.POSTGRESQL) {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY_PG);
} else {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY);
}
tableList = new ArrayList<>();
while (rs.next()) {
String name = rs.getString(1);
tableList.add(name);
}
} catch (Exception e) {
log.error("Get databases error, datasourceId:{}.", datasourceId, e);
throw new ServiceException(Status.GET_DATASOURCE_TABLES_ERROR);
} finally {
closeResult(rs);
releaseConnection(connection);
}

List<ParamsOptions> options = getParamsOptions(tableList);
return options;
}

@Override
public List<ParamsOptions> getTables(User loginUser, Integer datasourceId, String database) {
DataSource dataSource = dataSourceMapper.selectById(datasourceId);

if (dataSource == null) {
throw new ServiceException(Status.QUERY_DATASOURCE_ERROR);
}

if (!canOperatorPermissions(loginUser, new Object[]{datasourceId}, AuthorizationType.DATASOURCE,
ApiFuncIdentificationConstant.DATASOURCE)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}

List<String> tableList;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
Expand Down Expand Up @@ -477,8 +541,19 @@
}

@Override
public List<ParamsOptions> getTableColumns(Integer datasourceId, String database, String tableName) {
public List<ParamsOptions> getTableColumns(User loginUser, Integer datasourceId, String database,
String tableName) {
DataSource dataSource = dataSourceMapper.selectById(datasourceId);

if (dataSource == null) {
throw new ServiceException(Status.QUERY_DATASOURCE_ERROR);
}

if (!canOperatorPermissions(loginUser, new Object[]{datasourceId}, AuthorizationType.DATASOURCE,
ApiFuncIdentificationConstant.DATASOURCE)) {

Check warning

Code scanning / CodeQL

Potential database resource leak Warning

This Statement is not always closed on method exit.
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}

Check warning

Code scanning / CodeQL

Potential database resource leak Warning

This Statement is not always closed on method exit.

BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
dataSource.getType(),
Expand Down Expand Up @@ -522,55 +597,6 @@
return options;
}

@Override
public List<ParamsOptions> getDatabases(Integer datasourceId) {

DataSource dataSource = dataSourceMapper.selectById(datasourceId);

if (dataSource == null) {
throw new ServiceException(Status.QUERY_DATASOURCE_ERROR);
}

List<String> tableList;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
dataSource.getType(),
dataSource.getConnectionParams());

if (null == connectionParam) {
throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED);
}

Connection connection =
DataSourceUtils.getConnection(dataSource.getType(), connectionParam);
ResultSet rs = null;

try {
if (null == connection) {
throw new ServiceException(Status.DATASOURCE_CONNECT_FAILED);
}
if (dataSource.getType() == DbType.POSTGRESQL) {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY_PG);
} else {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY);
}
tableList = new ArrayList<>();
while (rs.next()) {
String name = rs.getString(1);
tableList.add(name);
}
} catch (Exception e) {
log.error("Get databases error, datasourceId:{}.", datasourceId, e);
throw new ServiceException(Status.GET_DATASOURCE_TABLES_ERROR);
} finally {
closeResult(rs);
releaseConnection(connection);
}

List<ParamsOptions> options = getParamsOptions(tableList);
return options;
}

private List<ParamsOptions> getParamsOptions(List<String> columnList) {
List<ParamsOptions> options = null;
if (CollectionUtils.isNotEmpty(columnList)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ DELETE_DATA_SOURCE_NOTES=delete data source
VERIFY_DATA_SOURCE_NOTES=verify data source
UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source
AUTHORIZED_DATA_SOURCE_NOTES=authorized data source
GET_DATASOURCE_DATABASE_NOTES=get datasource databases
GET_DATASOURCE_TABLES_NOTES=get datasource tables
GET_DATASOURCE_TABLE_COLUMNS_NOTES=get datasource table columns
DELETE_SCHEDULE_NOTES=delete schedule by id
QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_NOTES= query authorized and user created project
Expand All @@ -234,8 +237,6 @@ QUERY_WORKFLOW_DEFINITION_VERSIONS_NOTES=query workflow definition versions
SWITCH_WORKFLOW_DEFINITION_VERSION_NOTES=switch workflow definition version
VERSION=version
STATE=state
GET_DATASOURCE_TABLES_NOTES=get datasource table
GET_DATASOURCE_TABLE_COLUMNS_NOTES=get datasource table columns
TABLE_NAME=table name
AUDIT_LOG_TAG=audit log related operation
TASK_DEFINITION_TAG=task definition related operation
Expand Down
Loading
Loading