Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ft/cachetable/cachetable-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ struct cachefile {
void (*end_checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // after checkpointing cachefiles call this function.
void (*note_pin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
void (*note_unpin_by_checkpoint)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory

void (*note_pin_by_backup)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
void (*note_unpin_by_backup)(CACHEFILE cf, void *userdata); // add a reference to the userdata to prevent it from being removed from memory
BACKGROUND_JOB_MANAGER bjm;
};

Expand Down Expand Up @@ -413,6 +416,8 @@ class checkpointer {
void add_background_job();
void remove_background_job();
void end_checkpoint(void (*testcallback_f)(void*), void* testextra);
void begin_backup();
void end_backup();
TOKULOGGER get_logger();
// used during begin_checkpoint
void increment_num_txns();
Expand Down
37 changes: 36 additions & 1 deletion ft/cachetable/cachetable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2793,6 +2793,37 @@ void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger),
cp->end_checkpoint(testcallback_f, testextra);
}

// in_backup begin

struct iterate_note_pin_backup {
static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
assert(cf->note_pin_by_backup);
cf->note_pin_by_backup(cf, cf->userdata);
return 0;
}
};

struct iterate_note_unpin_backup {
static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
assert(cf->note_unpin_by_backup);
cf->note_unpin_by_backup(cf, cf->userdata);
return 0;
}
};
void toku_cachetable_begin_backup(CACHETABLE ct)
{
ct->cf_list.read_lock();
ct->cf_list.m_active_fileid.iterate<void *, iterate_note_pin_backup::fn>(nullptr);
ct->cf_list.read_unlock();
}

void toku_cachetable_end_backup(CACHETABLE ct)
{
ct->cf_list.m_active_fileid.iterate<void *, iterate_note_unpin_backup::fn>(nullptr);
}

// in_backup end

TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
return cf->cachetable->cp.get_logger();
}
Expand Down Expand Up @@ -2909,7 +2940,9 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
void (*note_pin_by_checkpoint)(CACHEFILE, void*),
void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
void (*note_unpin_by_checkpoint)(CACHEFILE, void*),
void (*note_pin_by_backup)(CACHEFILE, void*),
void (*note_unpin_by_backup)(CACHEFILE, void*)) {
cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->close_userdata = close_userdata;
Expand All @@ -2919,6 +2952,8 @@ toku_cachefile_set_userdata (CACHEFILE cf,
cf->end_checkpoint_userdata = end_checkpoint_userdata;
cf->note_pin_by_checkpoint = note_pin_by_checkpoint;
cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint;
cf->note_pin_by_backup = note_pin_by_backup;
cf->note_unpin_by_backup = note_unpin_by_backup;
}

void *toku_cachefile_get_userdata(CACHEFILE cf) {
Expand Down
6 changes: 5 additions & 1 deletion ft/cachetable/cachetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, struct tokulogger *logge
void toku_cachetable_end_checkpoint(CHECKPOINTER cp, struct tokulogger *logger,
void (*testcallback_f)(void*), void * testextra);

void toku_cachetable_begin_backup(CACHETABLE ct);
void toku_cachetable_end_backup(CACHETABLE ct);

// Shuts down checkpoint thread
// Requires no locks be held that are taken by the checkpoint function
Expand Down Expand Up @@ -285,7 +287,9 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*begin_checkpoint_userdata)(LSN, void*),
void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
void (*note_pin_by_checkpoint)(CACHEFILE, void*),
void (*note_unpin_by_checkpoint)(CACHEFILE, void*));
void (*note_unpin_by_checkpoint)(CACHEFILE, void*),
void (*note_pin_by_backup)(CACHEFILE, void*),
void (*note_unpin_by_backup)(CACHEFILE, void*));
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
// Before starting a checkpoint, we call checkpoint_prepare_userdata().
// When the cachefile needs to be checkpointed, we call checkpoint_userdata().
Expand Down
4 changes: 3 additions & 1 deletion ft/ft-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ struct ft {
uint32_t num_txns;
// A checkpoint is running. If true, then keep this header around for checkpoint, like a transaction
bool pinned_by_checkpoint;

// Number of backups that are using this FT. If it is nonzero, keep this header around until backup
// is completd.
uint32_t num_backups;
// is this ft a blackhole? if so, all messages are dropped.
bool blackhole;

Expand Down
84 changes: 55 additions & 29 deletions ft/ft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,42 @@ static void unpin_by_checkpoint_callback(FT ft, void *extra) {
}

// maps to cf->note_unpin_by_checkpoint
//Must be protected by ydb lock.
//Called by end_checkpoint, which grabs ydb lock around note_unpin
// Must be protected by ydb lock.
// Called by end_checkpoint, which grabs ydb lock around note_unpin
static void ft_note_unpin_by_checkpoint (CACHEFILE UU(cachefile), void *header_v) {
FT ft = (FT) header_v;
toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_checkpoint_callback, NULL);
}


// maps to cf->note_pin_by_backup
// Must be protected by ydb lock.
// Is only called by backup begin, which holds it
static void ft_note_pin_by_backup (CACHEFILE UU(cachefile), void *header_v) {
// Note: open_close lock is held by checkpoint begin
FT ft = (FT) header_v;
toku_ft_grab_reflock(ft);
assert(toku_ft_needed_unlocked(ft));
ft->num_backups++;
toku_ft_release_reflock(ft);
}

// Requires: the reflock is held.
static void unpin_by_backup_callback(FT ft, void *extra) {
invariant(extra == NULL);
invariant(ft->num_backups > 0);
ft->num_backups--;
}

// maps to cf->note_unpin_by_backup
// Must be protected by ydb lock.
// Called by end_backup, which grabs ydb lock around note_unpin
static void ft_note_unpin_by_backup (CACHEFILE UU(cachefile), void *header_v) {
FT ft = (FT) header_v;
toku_ft_remove_reference(ft, false, ZERO_LSN, unpin_by_backup_callback, NULL);
}


//
// End of Functions that are callbacks to the cachefile
/////////////////////////////////////////////////////////////////////////
Expand All @@ -332,38 +361,33 @@ static void setup_initial_ft_root_node(FT ft, BLOCKNUM blocknum) {
}

static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
// fake, prevent unnecessary upgrade logic
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
ft->checkpoint_header = NULL;
// fake, prevent unnecessary upgrade logic
ft->layout_version_read_from_disk = FT_LAYOUT_VERSION;
ft->checkpoint_header = NULL;

toku_list_init(&ft->live_ft_handles);
toku_list_init(&ft->live_ft_handles);

// intuitively, the comparator points to the FT's cmp descriptor
ft->cmp.create(options->compare_fun, &ft->cmp_descriptor, options->memcmp_magic);
ft->update_fun = options->update_fun;
// intuitively, the comparator points to the FT's cmp descriptor
ft->cmp.create(options->compare_fun, &ft->cmp_descriptor,
options->memcmp_magic);
ft->update_fun = options->update_fun;

if (ft->cf != NULL) {
assert(ft->cf == cf);
}
ft->cf = cf;
ft->in_memory_stats = ZEROSTATS;
if (ft->cf != NULL) {
assert(ft->cf == cf);
}
ft->cf = cf;
ft->in_memory_stats = ZEROSTATS;

setup_initial_ft_root_node(ft, ft->h->root_blocknum);
toku_cachefile_set_userdata(ft->cf,
ft,
ft_log_fassociate_during_checkpoint,
ft_close,
ft_free,
ft_checkpoint,
ft_begin_checkpoint,
ft_end_checkpoint,
ft_note_pin_by_checkpoint,
ft_note_unpin_by_checkpoint);
setup_initial_ft_root_node(ft, ft->h->root_blocknum);
toku_cachefile_set_userdata(
ft->cf, ft, ft_log_fassociate_during_checkpoint, ft_close, ft_free,
ft_checkpoint, ft_begin_checkpoint, ft_end_checkpoint,
ft_note_pin_by_checkpoint, ft_note_unpin_by_checkpoint,
ft_note_pin_by_backup, ft_note_unpin_by_backup);

ft->blocktable.verify_no_free_blocknums();
ft->blocktable.verify_no_free_blocknums();
}


static FT_HEADER
ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that_created)
{
Expand Down Expand Up @@ -455,7 +479,9 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE ft_handle, CACHEFILE cf, LSN
ft_begin_checkpoint,
ft_end_checkpoint,
ft_note_pin_by_checkpoint,
ft_note_unpin_by_checkpoint);
ft_note_unpin_by_checkpoint,
ft_note_pin_by_backup,
ft_note_unpin_by_backup);
*header = ft;
return 0;
}
Expand All @@ -475,7 +501,7 @@ static int
ft_get_reference_count(FT ft) {
uint32_t pinned_by_checkpoint = ft->pinned_by_checkpoint ? 1 : 0;
int num_handles = toku_list_num_elements_est(&ft->live_ft_handles);
return pinned_by_checkpoint + ft->num_txns + num_handles;
return pinned_by_checkpoint + ft->num_txns + ft-> num_backups + num_handles;
}

// a ft is needed in memory iff its reference count is non-zero
Expand Down
20 changes: 7 additions & 13 deletions ft/tests/cachetable-pin-checkpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,21 +358,15 @@ cachetable_test (void) {
toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR | O_CREAT,
S_IRWXU | S_IRWXG | S_IRWXO);
assert(r == 0);

toku_cachefile_set_userdata(
f1,
NULL,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
&test_begin_checkpoint,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);

f1, NULL, &dummy_log_fassociate, &dummy_close_usr, &dummy_free_usr,
&dummy_chckpnt_usr, &test_begin_checkpoint, &dummy_end, &dummy_note_pin,
&dummy_note_unpin, &dummy_note_pin, &dummy_note_unpin);

toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid;
toku_pthread_t move_tid[NUM_MOVER_THREADS];
Expand Down
16 changes: 6 additions & 10 deletions ft/tests/cachetable-put-checkpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,20 +487,16 @@ cachetable_test (void) {
toku_cachetable_create(&ct, test_limit, ZERO_LSN, nullptr);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR | O_CREAT,
S_IRWXU | S_IRWXG | S_IRWXO);
assert(r == 0);

toku_cachefile_set_userdata(
f1,
NULL,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
f1, NULL, &dummy_log_fassociate, &dummy_close_usr, &dummy_free_usr,
&dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);
&dummy_end, &dummy_note_pin, &dummy_note_unpin, &dummy_note_pin,
&dummy_note_unpin);

toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid;
Expand Down
16 changes: 4 additions & 12 deletions ft/tests/cachetable-simple-close.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,10 @@ static void free_usr(CACHEFILE UU(cf), void* UU(p)) {
}

static void set_cf_userdata(CACHEFILE f1) {
toku_cachefile_set_userdata(
f1,
NULL,
&dummy_log_fassociate,
&close_usr,
&free_usr,
&dummy_chckpnt_usr,
&dummy_begin,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);
toku_cachefile_set_userdata(f1, NULL, &dummy_log_fassociate, &close_usr,
&free_usr, &dummy_chckpnt_usr, &dummy_begin,
&dummy_end, &dummy_note_pin, &dummy_note_unpin,
&dummy_note_pin, &dummy_note_unpin);
}

bool keep_me;
Expand Down
16 changes: 5 additions & 11 deletions ft/tests/cachetable-test.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,9 @@ static void dummy_note_unpin(CACHEFILE UU(cf), void* UU(p)) { }
static UU() void
create_dummy_functions(CACHEFILE cf)
{
void *ud = NULL;
toku_cachefile_set_userdata(cf,
ud,
&dummy_log_fassociate,
&dummy_close_usr,
&dummy_free_usr,
&dummy_chckpnt_usr,
&dummy_begin,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin);
void *ud = NULL;
toku_cachefile_set_userdata(cf, ud, &dummy_log_fassociate, &dummy_close_usr,
&dummy_free_usr, &dummy_chckpnt_usr, &dummy_begin,
&dummy_end, &dummy_note_pin, &dummy_note_unpin,
&dummy_note_pin, &dummy_note_unpin);
};
12 changes: 7 additions & 5 deletions ft/tests/fifo-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,17 @@ test_enqueue(int n) {
if (i == 0) {
xids = toku_xids_get_root_xids();
} else {
int r = toku_xids_create_child(toku_xids_get_root_xids(), &xids, (TXNID)i);
assert_zero(r);
int r = toku_xids_create_child(toku_xids_get_root_xids(), &xids,
(TXNID)i);
assert_zero(r);
}
MSN msn = next_dummymsn();
if (startmsn.msn == ZERO_MSN.msn)
startmsn = msn;
enum ft_msg_type type = (enum ft_msg_type) i;
startmsn = msn;
enum ft_msg_type type = (enum ft_msg_type)i;
DBT k, v;
ft_msg msg(toku_fill_dbt(&k, thekey, thekeylen), toku_fill_dbt(&v, theval, thevallen), type, msn, xids);
ft_msg msg(toku_fill_dbt(&k, thekey, thekeylen),
toku_fill_dbt(&v, theval, thevallen), type, msn, xids);
msg_buffer.enqueue(msg, true, nullptr);
toku_xids_destroy(&xids);
toku_free(thekey);
Expand Down
Loading