Skip to content

Commit

Permalink
fix: Multi-thread recreate view/table gives error (#16465)
Browse files Browse the repository at this point in the history
* fix: Multi-thread recreate view gives error

* fix test
  • Loading branch information
zhyass authored Sep 19, 2024
1 parent 2010fae commit 047f081
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 14 deletions.
16 changes: 10 additions & 6 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
&req.name_ident.tenant,
*id.data,
*seq_db_id.data,
false,
true,
false,
&mut txn,
)
Expand Down Expand Up @@ -3373,9 +3373,9 @@ async fn construct_drop_table_txn_operations(
return if if_exists {
Ok((0, 0))
} else {
return Err(KVAppError::AppError(AppError::UnknownTable(
Err(KVAppError::AppError(AppError::UnknownTable(
UnknownTable::new(tbname, "drop_table_by_id"),
)));
)))
};
}

Expand All @@ -3390,9 +3390,13 @@ async fn construct_drop_table_txn_operations(
let mut tb_meta = tb_meta.unwrap();
// drop a table with drop_on time
if tb_meta.drop_on.is_some() {
return Err(KVAppError::AppError(AppError::DropTableWithDropTime(
DropTableWithDropTime::new(&dbid_tbname.table_name),
)));
return if if_exists {
Ok((0, 0))
} else {
Err(KVAppError::AppError(AppError::DropTableWithDropTime(
DropTableWithDropTime::new(&dbid_tbname.table_name),
)))
};
}

tb_meta.drop_on = Some(Utc::now());
Expand Down
35 changes: 35 additions & 0 deletions tests/suites/0_stateless/02_ddl/02_0000_create_drop_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python3

import sqlalchemy
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def recreate_view(con):
with con.begin() as c:
c.execute(sqlalchemy.text("DROP VIEW IF EXISTS v_issue_16188"))
with con.begin() as c:
c.execute(sqlalchemy.text("CREATE OR REPLACE VIEW v_issue_16188 as select a,b from t_issue_16188"))

def main():
tcp_port = os.getenv("QUERY_MYSQL_HANDLER_PORT")
if tcp_port is None:
port = "3307"
else:
port = tcp_port

uri = "mysql+pymysql://root:root@localhost:" + port + "/"
con = sqlalchemy.create_engine(uri, future=True)
with con.begin() as c:
c.execute(sqlalchemy.text("DROP TABLE IF EXISTS t_issue_16188"))
c.execute(sqlalchemy.text("CREATE TABLE t_issue_16188 (a int not null, b int not null)"))

with ThreadPoolExecutor(max_workers=64) as executor:
futures = []
for _ in range(10):
futures.append(executor.submit(recreate_view, con))

for future in as_completed(futures):
future.result()

if __name__ == '__main__':
main()
Empty file.
18 changes: 10 additions & 8 deletions tests/suites/1_stateful/09_http_handler/09_0007_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def do_logout(_case_id, session_token):


def do_verify(session_token):
for token in [session_token, 'xxx']:
for token in [session_token, "xxx"]:
print("---- verify token ", token)
response = requests.get(
verify_url,
Expand All @@ -62,7 +62,7 @@ def do_verify(session_token):
print(response.status_code)
print(response.text)

for a in [auth, ('u', 'p')]:
for a in [auth, ("u", "p")]:
print("---- verify password: ", a)
response = requests.post(
verify_url,
Expand Down Expand Up @@ -116,9 +116,12 @@ def fake_expired_token(ty):
"nonce": "",
"sid": "",
}
return "bend-v1-" + ty + '-' + base64.b64encode(
json.dumps(expired_claim).encode("utf-8")
).decode("utf-8")
return (
"bend-v1-"
+ ty
+ "-"
+ base64.b64encode(json.dumps(expired_claim).encode("utf-8")).decode("utf-8")
)


def main():
Expand All @@ -134,7 +137,6 @@ def main():
pprint(query_resp.get("session").get("need_sticky"))
pprint(query_resp.get("session").get("need_refresh"))


# cluster
query_resp = do_query("select count(*) from system.clusters", session_token)
num_nodes = int(query_resp.get("data")[0][0])
Expand All @@ -156,7 +158,7 @@ def main():
# errors
do_query("select 2", "xxx")
do_query("select 3", "bend-v1-s-xxx")
do_query("select 4", fake_expired_token('s'))
do_query("select 4", fake_expired_token("s"))
do_query("select 5", refresh_token)

renew_resp = do_refresh(1, refresh_token, session_token)
Expand All @@ -174,7 +176,7 @@ def main():
# errors
do_refresh(2, "xxx", session_token)
do_refresh(3, "bend-v1-xxx", session_token)
do_refresh(4, fake_expired_token('r'), session_token)
do_refresh(4, fake_expired_token("r"), session_token)
do_refresh(5, session_token, session_token)

# test new_refresh_token works
Expand Down

0 comments on commit 047f081

Please sign in to comment.