Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block resuming a partition merge schema change #4936

Open
wants to merge 1 commit into
base: 8.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions db/osqlblockproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1367,8 +1367,9 @@ void *bplog_commit_timepart_resuming_sc(void *p)
} else {
logmsg(LOGMSG_ERROR, "%s: shard '%s', rc %d\n", __func__,
sc->tablename, sc->sc_rc);
sc_set_running(&iq, sc, sc->tablename, 0, NULL, 0, 0, __func__,
__LINE__);
if (sc->set_running)
sc_set_running(&iq, sc, sc->tablename, 0, NULL, 0, 0, __func__,
__LINE__);
free_schema_change_type(sc);
error = 1;
}
Expand Down
3 changes: 0 additions & 3 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -5991,13 +5991,10 @@ static int _process_partitioned_table_merge(struct ireq *iq)
*/
sc->nothrevent = 1; /* we need do_alter_table to run first */
sc->finalize = 0;
enum comdb2_partition_type tt = sc->partition.type;
sc->partition.type = PARTITION_NONE;

strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename));

rc = start_schema_change_tran(iq, NULL);
sc->partition.type = tt;
iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;
if (rc != SC_COMMIT_PENDING) {
Expand Down
18 changes: 11 additions & 7 deletions schemachange/sc_alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,17 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
struct scinfo scinfo;
struct errstat err = {0};

if (s->partition.type == PARTITION_MERGE)
db = get_dbtable_by_name(s->tablename);
if (db == NULL) {
sc_errf(s, "Table not found:%s\n", s->tablename);
return SC_TABLE_DOESNOT_EXIST;
}


/* note for a partition merge, if we reuse the first shard (i.e. it is aliased),
* we need to alter it here instead of running do_merge_table
*/
if (s->partition.type == PARTITION_MERGE && !db->sqlaliasname)
return do_merge_table(iq, s, tran);

#ifdef DEBUG_SC
Expand All @@ -414,12 +424,6 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
gbl_use_plan = 1;
gbl_sc_last_writer_time = 0;

db = get_dbtable_by_name(s->tablename);
if (db == NULL) {
sc_errf(s, "Table not found:%s\n", s->tablename);
return SC_TABLE_DOESNOT_EXIST;
}

if (s->resume == SC_PREEMPT_RESUME) {
newdb = db->sc_to;
changed = s->schema_change;
Expand Down
82 changes: 64 additions & 18 deletions schemachange/sc_logic.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,15 +940,59 @@ static int verify_sc_resumed_for_shard(const char *shardname,
static int verify_sc_resumed_for_all_shards(void *obj, void *arg)
{
struct timepart_sc_resuming *tpt_sc = (struct timepart_sc_resuming *)obj;
int rc = 0;

/* corner case, shard merging: all shard schema changes need
* to share the same destination table newdb;
* at this point we will block resuming a shard merging, give
* the complexity of the changes involved
*/
struct schema_change_type *sc = tpt_sc->s;
while (sc) {
if (sc->partition.type == PARTITION_MERGE) {
break;
}
sc = sc->sc_next;
}
if (sc) {
/* merge operation, resume not supported */
logmsg(LOGMSG_ERROR, "%s partition merging detected %s, aborting\n", __func__,
tpt_sc->s->tablename);
sc = tpt_sc->s;
while (sc) {
mark_schemachange_over(sc->tablename);
struct dbtable *tbl = get_dbtable_by_name(sc->tablename);
if (tbl) {
sc_del_unused_files(tbl);
}
sc->sc_rc = SC_ABORTED;
sc = sc->sc_next;
}
return 0;
}

/* we need to start all the shards already in progress */
sc = tpt_sc->s;
while (sc) {
rc = start_schema_change(sc);
if (rc != SC_OK && rc != SC_ASYNC) {
logmsg(LOGMSG_ERROR,
"%s: failed to resume schema change for table '%s' rc %d\n",
__func__, sc->tablename, rc);
sc->sc_rc = SC_ABORTED;
return -1;
}
sc = sc->sc_next;
}

/* all shards resumed, including the next shard if any */
/* are all shards resumed, including the next shard if any */
if (tpt_sc->nshards > timepart_get_num_shards(tpt_sc->viewname))
return 0;

/* start new sc for shards that were not resumed */
timepart_sc_arg_t sc_arg = {0};
sc_arg.view_name = tpt_sc->viewname;
sc_arg.s = tpt_sc->s;
/* start new sc for shards that were not resumed */
sc_arg.check_extra_shard = 1;
timepart_foreach_shard(tpt_sc->viewname, verify_sc_resumed_for_shard, &sc_arg);
assert(sc_arg.s != tpt_sc->s);
Expand Down Expand Up @@ -1105,19 +1149,7 @@ int resume_schema_change(void)

MEMORY_SYNC;

/* start the schema change back up */
rc = start_schema_change(s);
if (rc != SC_OK && rc != SC_ASYNC) {
logmsg(
LOGMSG_ERROR,
"%s: failed to resume schema change for table '%s' rc %d\n",
__func__, s->tablename, rc);
/* start_schema_change will free if this fails */
/*
free_schema_change_type(s);
*/
continue;
} else if (s->timepartition_name) {
if (s->timepartition_name) {
struct timepart_sc_resuming *tpt_sc = NULL;
tpt_sc = hash_find(tpt_sc_hash, &s->timepartition_name);
if (tpt_sc == NULL) {
Expand All @@ -1138,9 +1170,23 @@ int resume_schema_change(void)
tpt_sc->s = s;
tpt_sc->nshards++;
}
} else if (s->finalize == 0 && s->kind != SC_ALTERTABLE_PENDING) {
s->sc_next = sc_resuming;
sc_resuming = s;
} else {
/* start the schema change back up */
rc = start_schema_change(s);
if (rc != SC_OK && rc != SC_ASYNC) {
logmsg(
LOGMSG_ERROR,
"%s: failed to resume schema change for table '%s' rc %d\n",
__func__, s->tablename, rc);
/* start_schema_change will free if this fails */
/*
free_schema_change_type(s);
*/
continue;
} else if (s->finalize == 0 && s->kind != SC_ALTERTABLE_PENDING) {
s->sc_next = sc_resuming;
sc_resuming = s;
}
}
}
}
Expand Down