Skip to content

Commit

Permalink
[ADH-5175] Improve CopyScheduler logging
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed Oct 25, 2024
1 parent 6aae4f2 commit d6cc744
Showing 1 changed file with 38 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public CopyScheduler(SmartContext context, MetaStore metaStore) {
try {
this.numFileDiffUseless.addAndGet(metaStore.getUselessFileDiffNum());
} catch (MetaStoreException e) {
LOG.error("Failed to get num of useless file diffs!");
LOG.error("Failed to get num of useless file diffs!", e);
}
this.fileDiffArchive = new CopyOnWriteArrayList<>();
this.fileEqualityStrategy = FileEqualityStrategy.from(conf);
Expand All @@ -204,20 +204,24 @@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo,
long diffId = fileDiffChains.get(path).getHead();
if (diffId == -1) {
// FileChain is already empty
LOG.warn("File chain not found for path {}", path);
return ScheduleResult.FAIL;
}
FileDiff fileDiff = fileDiffCache.get(diffId);
if (fileDiff == null) {
LOG.warn("File diff cache entry not found for path {}", path);
return ScheduleResult.FAIL;
}
if (fileDiff.getState() != FileDiffState.PENDING) {
// If file diff is applied or failed
doOnFileChain(path, ScheduleTask.FileChain::removeHead);
fileLocks.remove(path);
LOG.warn("File diff is not PENDING for path {}", path);
return ScheduleResult.FAIL;
}
// wait dependent file diff
if (requireWait(fileDiff)) {
LOG.debug("File {} is locked by another action, it will be processed later", path);
return ScheduleResult.RETRY;
}

Expand Down Expand Up @@ -250,9 +254,7 @@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo,
int appendLen = (int) (Long.parseLong(strLen) >> 20);
if (appendLen > 0) {
if (!rateLimiter.tryAcquire(appendLen)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cancel Scheduling COPY action {} due to throttling.", actionInfo);
}
LOG.debug("Cancel Scheduling COPY action {} due to throttling.", actionInfo);
return ScheduleResult.RETRY;
}
}
Expand Down Expand Up @@ -300,18 +302,27 @@ public List<String> getSupportedActions() {
}

private boolean isFileLocked(String path) {
if (fileLocks.isEmpty()) {
LOG.debug("File Lock is empty. Current path = {}", path);
if (fileLocks.contains(path)) {
LOG.debug("File {} is locked: it's in fileLocks", path);
return true;
}

if (initialSyncQueue.containsKey(path)) {
LOG.debug("File {} is locked: it's in initial sync queue", path);
return true;
}

// File is locked
return fileLocks.contains(path)
// File is in base sync queue
|| initialSyncQueue.containsKey(path)
// File Chain is not ready
|| !fileDiffChains.containsKey(path)
// File Chain is empty
|| fileDiffChains.get(path).isEmpty();
if (!fileDiffChains.containsKey(path)) {
LOG.debug("File {} is locked: no file diff chain found for this path", path);
return true;
}

if (!fileDiffChains.get(path).isEmpty()) {
LOG.debug("File {} is locked: file diff chain for this path is empty", path);
return true;
}

return false;
}

private boolean requireWait(FileDiff fileDiff) {
Expand Down Expand Up @@ -350,7 +361,7 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
@Override
public void onActionFinished(CmdletInfo cmdletInfo, ActionInfo actionInfo) {
// Remove lock
FileDiff fileDiff = null;
FileDiff fileDiff;
if (actionInfo.isFinished()) {
try {
long did = actionDiffMap.get(actionInfo.getActionId());
Expand Down Expand Up @@ -602,17 +613,17 @@ private FileStatus getFileStatus(String path) {
* add fileDiff to Cache, if diff is already in cache, then print error log
*/
private void addDiffToCache(FileDiff fileDiff) {
LOG.debug("Add FileDiff Cache into file_diff cache");
LOG.debug("Adding {} to file diff cache", fileDiff);
if (fileDiffCache.containsKey(fileDiff.getDiffId())) {
LOG.error("FileDiff {} already in cache!", fileDiff);
LOG.error("File diff {} is already in cache!", fileDiff);
return;
}
fileDiffCache.put(fileDiff.getDiffId(), fileDiff);
}

private synchronized void updateFileDiffInCache(Long diffId,
FileDiffState fileDiffState) throws MetaStoreException {
LOG.debug("Update FileDiff");
private synchronized void updateFileDiffInCache(
Long diffId, FileDiffState fileDiffState) throws MetaStoreException {
LOG.debug("Update file diff wit id {}: new state {}", diffId, fileDiffState);
if (!fileDiffCache.containsKey(diffId)) {
return;
}
Expand Down Expand Up @@ -642,7 +653,7 @@ private synchronized void updateFileDiffArchive(long diffId, FileDiffState state
* delete cache and remove file lock if necessary
*/
private void deleteDiffInCache(Long diffId) {
LOG.debug("Delete FileDiff in cache");
LOG.debug("Remove file diff wit id {} from cache", diffId);
if (fileDiffCache.containsKey(diffId)) {
FileDiff fileDiff = fileDiffCache.get(diffId);
fileDiffCache.remove(diffId);
Expand All @@ -666,7 +677,7 @@ private synchronized void pushCacheToDB() throws MetaStoreException {

// Push cache to metastore
if (!updatedFileDiffs.isEmpty()) {
LOG.debug("Push FileDiff from cache to metastore");
LOG.debug("Inserting file diff cache to metastore");
metaStore.updateFileDiff(updatedFileDiffs);
}
// Remove file diffs in cache and file lock
Expand Down Expand Up @@ -716,7 +727,7 @@ private void processPendingDiffs(
addToFileDiffArchive(fileDiff);
}

LOG.debug("Size of Pending diffs {}", fileDiffs.size());
LOG.debug("Start processing pending diffs of size {}", fileDiffs.size());
if (fileDiffs.isEmpty() && initialSyncQueue.isEmpty()) {
LOG.debug("All Backup directories are synced");
return;
Expand Down Expand Up @@ -884,7 +895,7 @@ void mergeAppend() throws MetaStoreException {
if (fileLocks.contains(filePath)) {
return;
}
LOG.debug("Append Merge Triggered!");
LOG.debug("Append chain merge triggered for path {}", filePath);
// Lock file to avoid File Chain being processed
fileLocks.add(filePath);
try {
Expand Down Expand Up @@ -928,7 +939,7 @@ void mergeAppend() throws MetaStoreException {
}

void mergeDelete(FileDiff fileDiff) throws MetaStoreException {
LOG.debug("Delete Merge Triggered!");
LOG.debug("Delete chain merge triggered for path {}", filePath);
for (FileDiff archiveDiff : fileDiffArchive) {
if (archiveDiff.getDiffId() == fileDiff.getDiffId()) {
break;
Expand All @@ -950,7 +961,7 @@ void mergeRename(FileDiff fileDiff) throws MetaStoreException {
if (fileLocks.contains(filePath)) {
return;
}
LOG.debug("Rename Merge Triggered!");
LOG.debug("Rename chain merge triggered for path {}", filePath);
// Lock file to avoid File Chain being processed
fileLocks.add(filePath);
try {
Expand Down Expand Up @@ -1082,7 +1093,7 @@ public void run() {
try {
numFileDiffUseless.addAndGet(-metaStore.deleteUselessFileDiff(maxNumRecords));
} catch (MetaStoreException e) {
LOG.error("Error occurs when delete useless file diff!");
LOG.error("Error occurs when delete useless file diff!", e);
}
}
}
Expand Down

0 comments on commit d6cc744

Please sign in to comment.