Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Multi-thread recreate view/table gives error #16465

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
&req.name_ident.tenant,
*id.data,
*seq_db_id.data,
false,
true,
false,
&mut txn,
)
Expand Down Expand Up @@ -3373,9 +3373,9 @@ async fn construct_drop_table_txn_operations(
return if if_exists {
Ok((0, 0))
} else {
return Err(KVAppError::AppError(AppError::UnknownTable(
Err(KVAppError::AppError(AppError::UnknownTable(
UnknownTable::new(tbname, "drop_table_by_id"),
)));
)))
};
}

Expand All @@ -3390,9 +3390,13 @@ async fn construct_drop_table_txn_operations(
let mut tb_meta = tb_meta.unwrap();
// drop a table with drop_on time
if tb_meta.drop_on.is_some() {
return Err(KVAppError::AppError(AppError::DropTableWithDropTime(
DropTableWithDropTime::new(&dbid_tbname.table_name),
)));
return if if_exists {
Ok((0, 0))
} else {
Err(KVAppError::AppError(AppError::DropTableWithDropTime(
DropTableWithDropTime::new(&dbid_tbname.table_name),
)))
};
}

tb_meta.drop_on = Some(Utc::now());
Expand Down
35 changes: 35 additions & 0 deletions tests/suites/0_stateless/02_ddl/02_0000_create_drop_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python3

import sqlalchemy
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def recreate_view(con):
with con.begin() as c:
c.execute(sqlalchemy.text("DROP VIEW IF EXISTS v_issue_16188"))
with con.begin() as c:
c.execute(sqlalchemy.text("CREATE OR REPLACE VIEW v_issue_16188 as select a,b from t_issue_16188"))

def main():
tcp_port = os.getenv("QUERY_MYSQL_HANDLER_PORT")
if tcp_port is None:
port = "3307"
else:
port = tcp_port

uri = "mysql+pymysql://root:root@localhost:" + port + "/"
con = sqlalchemy.create_engine(uri, future=True)
with con.begin() as c:
c.execute(sqlalchemy.text("DROP TABLE IF EXISTS t_issue_16188"))
c.execute(sqlalchemy.text("CREATE TABLE t_issue_16188 (a int not null, b int not null)"))

with ThreadPoolExecutor(max_workers=64) as executor:
futures = []
for _ in range(10):
futures.append(executor.submit(recreate_view, con))

for future in as_completed(futures):
future.result()

if __name__ == '__main__':
main()
Empty file.
18 changes: 10 additions & 8 deletions tests/suites/1_stateful/09_http_handler/09_0007_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def do_logout(_case_id, session_token):


def do_verify(session_token):
for token in [session_token, 'xxx']:
for token in [session_token, "xxx"]:
print("---- verify token ", token)
response = requests.get(
verify_url,
Expand All @@ -62,7 +62,7 @@ def do_verify(session_token):
print(response.status_code)
print(response.text)

for a in [auth, ('u', 'p')]:
for a in [auth, ("u", "p")]:
print("---- verify password: ", a)
response = requests.post(
verify_url,
Expand Down Expand Up @@ -116,9 +116,12 @@ def fake_expired_token(ty):
"nonce": "",
"sid": "",
}
return "bend-v1-" + ty + '-' + base64.b64encode(
json.dumps(expired_claim).encode("utf-8")
).decode("utf-8")
return (
"bend-v1-"
+ ty
+ "-"
+ base64.b64encode(json.dumps(expired_claim).encode("utf-8")).decode("utf-8")
)


def main():
Expand All @@ -134,7 +137,6 @@ def main():
pprint(query_resp.get("session").get("need_sticky"))
pprint(query_resp.get("session").get("need_refresh"))


# cluster
query_resp = do_query("select count(*) from system.clusters", session_token)
num_nodes = int(query_resp.get("data")[0][0])
Expand All @@ -156,7 +158,7 @@ def main():
# errors
do_query("select 2", "xxx")
do_query("select 3", "bend-v1-s-xxx")
do_query("select 4", fake_expired_token('s'))
do_query("select 4", fake_expired_token("s"))
do_query("select 5", refresh_token)

renew_resp = do_refresh(1, refresh_token, session_token)
Expand All @@ -174,7 +176,7 @@ def main():
# errors
do_refresh(2, "xxx", session_token)
do_refresh(3, "bend-v1-xxx", session_token)
do_refresh(4, fake_expired_token('r'), session_token)
do_refresh(4, fake_expired_token("r"), session_token)
do_refresh(5, session_token, session_token)

# test new_refresh_token works
Expand Down
Loading