25
25
import org .apache .paimon .fs .Path ;
26
26
import org .apache .paimon .operation .Lock ;
27
27
import org .apache .paimon .options .CatalogOptions ;
28
- import org .apache .paimon .options .Options ;
29
28
import org .apache .paimon .schema .Schema ;
30
29
import org .apache .paimon .schema .SchemaChange ;
31
30
import org .apache .paimon .schema .SchemaManager ;
46
45
import java .sql .ResultSet ;
47
46
import java .sql .SQLException ;
48
47
import java .util .AbstractMap ;
48
+ import java .util .HashMap ;
49
49
import java .util .List ;
50
50
import java .util .Map ;
51
51
import java .util .Optional ;
@@ -70,19 +70,23 @@ public class JdbcCatalog extends AbstractCatalog {
70
70
private Map <String , String > configuration ;
71
71
private final String warehouse ;
72
72
73
- protected JdbcCatalog (FileIO fileIO , String catalogName , Options options , String warehouse ) {
73
+ protected JdbcCatalog (
74
+ FileIO fileIO , String catalogName , Map <String , String > config , String warehouse ) {
74
75
super (fileIO );
75
76
if (!StringUtils .isBlank (catalogName )) {
76
77
this .catalogName = catalogName ;
77
78
}
78
- this .configuration = options . toMap () ;
79
+ this .configuration = config ;
79
80
this .warehouse = warehouse ;
80
81
Preconditions .checkNotNull (configuration , "Invalid catalog properties: null" );
81
82
this .connections =
82
83
new JdbcClientPool (
83
- options .get (CatalogOptions .CLIENT_POOL_SIZE ),
84
- options .get (CatalogOptions .URI ),
85
- this .configuration );
84
+ Integer .parseInt (
85
+ config .getOrDefault (
86
+ CatalogOptions .CLIENT_POOL_SIZE .key (),
87
+ CatalogOptions .CLIENT_POOL_SIZE .defaultValue ().toString ())),
88
+ configuration .get (CatalogOptions .URI .key ()),
89
+ configuration );
86
90
try {
87
91
initializeCatalogTablesIfNeed ();
88
92
} catch (SQLException e ) {
@@ -97,7 +101,7 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc
97
101
boolean initializeCatalogTables =
98
102
Boolean .parseBoolean (
99
103
configuration .getOrDefault (INITIALIZE_CATALOG_TABLES .key (), "false" ));
100
- String uri = configuration .get (CatalogOptions .URI );
104
+ String uri = configuration .get (CatalogOptions .URI . key () );
101
105
Preconditions .checkNotNull (uri , "JDBC connection URI is required" );
102
106
if (initializeCatalogTables ) {
103
107
// Check and create catalog table.
@@ -133,6 +137,24 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc
133
137
return conn .prepareStatement (JdbcUtils .CREATE_DATABASE_PROPERTIES_TABLE )
134
138
.execute ();
135
139
});
140
+
141
+ // Check and create distributed lock table.
142
+ connections .run (
143
+ conn -> {
144
+ DatabaseMetaData dbMeta = conn .getMetaData ();
145
+ ResultSet tableExists =
146
+ dbMeta .getTables (
147
+ null /* catalog name */ ,
148
+ null /* schemaPattern */ ,
149
+ JdbcUtils
150
+ .DISTRIBUTED_LOCKS_TABLE_NAME /* tableNamePattern */ ,
151
+ null /* types */ );
152
+ if (tableExists .next ()) {
153
+ return true ;
154
+ }
155
+ return conn .prepareStatement (JdbcUtils .CREATE_DISTRIBUTED_LOCK_TABLE_SQL )
156
+ .execute ();
157
+ });
136
158
}
137
159
}
138
160
@@ -149,6 +171,12 @@ public List<String> listDatabases() {
149
171
row -> row .getString (JdbcUtils .TABLE_DATABASE ),
150
172
JdbcUtils .LIST_ALL_TABLE_DATABASES_SQL ,
151
173
catalogName ));
174
+
175
+ namespaces .addAll (
176
+ fetch (
177
+ row -> row .getString (JdbcUtils .DATABASE_NAME ),
178
+ JdbcUtils .LIST_ALL_PROPERTY_DATABASES_SQL ,
179
+ catalogName ));
152
180
return namespaces ;
153
181
}
154
182
@@ -177,15 +205,10 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
177
205
throw new RuntimeException (String .format ("Database already exists: %s" , name ));
178
206
}
179
207
180
- Map <String , String > createProps ;
181
- if (properties == null || properties .isEmpty ()) {
182
- createProps = ImmutableMap .of (DATABASE_EXISTS_PROPERTY , "true" );
183
- } else {
184
- createProps =
185
- ImmutableMap .<String , String >builder ()
186
- .putAll (properties )
187
- .put (DATABASE_EXISTS_PROPERTY , "true" )
188
- .build ();
208
+ Map <String , String > createProps = new HashMap <>();
209
+ createProps .put (DATABASE_EXISTS_PROPERTY , "true" );
210
+ if (properties != null && !properties .isEmpty ()) {
211
+ createProps .putAll (properties );
189
212
}
190
213
191
214
if (!createProps .containsKey (DB_LOCATION_PROP )) {
@@ -197,21 +220,9 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
197
220
198
221
@ Override
199
222
protected void dropDatabaseImpl (String name ) {
200
- if (!databaseExists (name )) {
201
- return ;
202
- }
203
- try {
204
- List <String > tablesInDatabase = listTables (name );
205
- // Database cannot be deleted when a table exists
206
- if (tablesInDatabase != null && !tablesInDatabase .isEmpty ()) {
207
- throw new RuntimeException (
208
- String .format (
209
- "Database %s is not empty. %s tables exist." ,
210
- name , tablesInDatabase .size ()));
211
- }
212
- } catch (DatabaseNotExistException e ) {
213
- throw new RuntimeException (e );
214
- }
223
+ // delete table
224
+ execute (connections , JdbcUtils .DELETE_TABLES_SQL , catalogName , name );
225
+ // delete properties
215
226
execute (connections , JdbcUtils .DELETE_ALL_DATABASE_PROPERTIES_SQL , catalogName , name );
216
227
}
217
228
@@ -315,11 +326,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
315
326
}
316
327
// Update table metadata
317
328
updateTableMetadataLocation (
318
- connections ,
319
- catalogName ,
320
- fromTable ,
321
- toPath .toString (),
322
- fromPath .toString ());
329
+ connections , catalogName , toTable , toPath .toString (), fromPath .toString ());
323
330
}
324
331
} catch (Exception e ) {
325
332
throw new RuntimeException ("Failed to rename table " + fromTable .getFullName (), e );
@@ -380,6 +387,11 @@ public boolean tableExists(Identifier identifier) {
380
387
connections , catalogName , identifier .getDatabaseName (), identifier .getObjectName ());
381
388
}
382
389
390
+ @ Override
391
+ public boolean caseSensitive () {
392
+ return false ;
393
+ }
394
+
383
395
@ Override
384
396
public Optional <CatalogLock .Factory > lockFactory () {
385
397
return lockEnabled ()
0 commit comments