Skip to content
This repository has been archived by the owner on Jul 18, 2022. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
spimminger committed Oct 21, 2015
2 parents ee288bd + 07e3ab0 commit a8cc1b0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

# WORKER ----------------------------------------------------------------------
# Worker AppId and AppSecret
backmeup.worker.id = backmeup-worker
backmeup.worker.secret = REPLACE-WORKER
backmeup.worker.appId = backmeup-worker
backmeup.worker.appSecret = REPLACE-WORKER
# If property is not set, hostname will be used.
#backmeup.worker.name = worker-dev
# Set the maximum no of jobs being executed concurrently. Note: For each parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

# WORKER ----------------------------------------------------------------------
# If property is not set, a random UUID will be created at startup
#backmeup.worker.id =
#backmeup.worker.appId =
# If property is not set, hostname will be used.
#backmeup.worker.name =
backmeup.worker.appId = backmeup-worker
backmeup.worker.appSecret = REPLACE-WORKER
# Set the maximum no of jobs being executed concurrently. Note: For each parallel
# job a thread is created.
backmeup.worker.maxParallelJobs = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

# WORKER ----------------------------------------------------------------------
# If property is not set, a random UUID will be created at startup
#backmeup.worker.id =
#backmeup.worker.appId =
# If property is not set, hostname will be used.
#backmeup.worker.name =
backmeup.worker.appId = backmeup-worker
backmeup.worker.appSecret = REPLACE-WORKER
# Set the maximum no of jobs being executed concurrently. Note: For each parallel
# job a thread is created.
backmeup.worker.maxParallelJobs = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class WorkerCore {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerCore.class);
private static final int WORKER_CONFIG_TIMEOUT_SECONDS = 60;

private final String workerId;
private final String workerSecret;
private String workerName;
Expand All @@ -49,10 +49,10 @@ public class WorkerCore {
private final AtomicInteger noOfFetchedJobs;
private final AtomicInteger noOfFinishedJobs;
private final AtomicInteger noOfFaildJobs;
@SuppressWarnings({"unused", "PMD.SingularField"})

@SuppressWarnings({ "unused", "PMD.SingularField" })
private final NumberGauge maxJobsGauge;
@SuppressWarnings({"unused", "PMD.SingularField"})
@SuppressWarnings({ "unused", "PMD.SingularField" })
private final NumberGauge noOfRunningJobsGauge;

private PluginManager pluginManager;
Expand All @@ -67,14 +67,14 @@ public class WorkerCore {
// Constructor ------------------------------------------------------------

public WorkerCore() {
this.workerId = Configuration.getProperty("backmeup.worker.id");
this.workerId = Configuration.getProperty("backmeup.worker.appId");
if (StringUtils.isEmpty(this.workerId)) {
throw new WorkerException("Worker id is not set");
}
this.workerSecret = Configuration.getProperty("backmeup.worker.secret");
if (StringUtils.isEmpty(workerSecret)) {
throw new WorkerException("Worker id is not set");

this.workerSecret = Configuration.getProperty("backmeup.worker.appSecret");
if (StringUtils.isEmpty(this.workerSecret)) {
throw new WorkerException("Worker secret is not set");
}

String wName = Configuration.getProperty("backmeup.worker.name");
Expand All @@ -90,10 +90,10 @@ public WorkerCore() {
}

this.maxWorkerThreads = Integer.parseInt(Configuration.getProperty("backmeup.worker.maxParallelJobs"));
this.maxJobsGauge = new NumberGauge(MonitorConfig.builder("maxJobs").build(), maxWorkerThreads);
this.maxJobsGauge = new NumberGauge(MonitorConfig.builder("maxJobs").build(), this.maxWorkerThreads);

this.noOfRunningJobs = new AtomicInteger(0);
this.noOfRunningJobsGauge = new NumberGauge(MonitorConfig.builder("runningJobs").build(), noOfRunningJobs);
this.noOfRunningJobsGauge = new NumberGauge(MonitorConfig.builder("runningJobs").build(), this.noOfRunningJobs);
this.noOfFetchedJobs = new AtomicInteger(0);
this.noOfFinishedJobs = new AtomicInteger(0);
this.noOfFaildJobs = new AtomicInteger(0);
Expand All @@ -105,10 +105,10 @@ public WorkerCore() {

BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<>(this.maxWorkerThreads);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
this.executorPool = new ObservableThreadPoolExecutor(this.maxWorkerThreads, this.maxWorkerThreads, 10,
TimeUnit.SECONDS, jobQueue, threadFactory);
Monitors.registerObject(workerId.toString(), this);
this.executorPool = new ObservableThreadPoolExecutor(this.maxWorkerThreads, this.maxWorkerThreads, 10, TimeUnit.SECONDS, jobQueue,
threadFactory);

Monitors.registerObject(this.workerId.toString(), this);

this.initialized = false;
setCurrentState(WorkerState.OFFLINE);
Expand Down Expand Up @@ -148,14 +148,14 @@ public int getNoOfFailedJobs() {

public void initialize() {
LOGGER.info("Authenticate backmeup-worker");
bmuServiceClient.authenticateWorker(this.workerId, this.workerSecret);
this.bmuServiceClient.authenticateWorker(this.workerId, this.workerSecret);

LOGGER.info("Initializing backmeup-worker");
boolean errorsDuringInit = false;

WorkerInfoDTO workerInfo = getWorkerInfo();
WorkerConfigDTO resp = getWorkerConfig(workerInfo, WORKER_CONFIG_TIMEOUT_SECONDS, TimeUnit.SECONDS);

try {
this.backupName = resp.getBackupNameTemplate();

Expand All @@ -166,13 +166,12 @@ public void initialize() {

this.jobReceiver = new RabbitMQJobReceiver(mqHost, mqName, 500);
this.jobReceiver.initialize();
this.jobReceiver
.addJobReceivedListener(new JobReceivedListener() {
@Override
public void jobReceived(JobReceivedEvent jre) {
executeBackupJob(jre);
}
});
this.jobReceiver.addJobReceivedListener(new JobReceivedListener() {
@Override
public void jobReceived(JobReceivedEvent jre) {
executeBackupJob(jre);
}
});
} else {
// DistributionMechanism not supported
errorsDuringInit = true;
Expand Down Expand Up @@ -219,9 +218,9 @@ public void afterExecute(Runnable r, Throwable t) {
jobThreadAterExecute(r, t);
}
});

if (Boolean.parseBoolean(Configuration.getProperty("backmeup.worker.publishMetrics", "false"))) {
PerformanceMonitor.initialize(bmuServiceClient);
PerformanceMonitor.initialize(this.bmuServiceClient);
} else {
PerformanceMonitor.initialize();
}
Expand All @@ -236,7 +235,7 @@ public void start() {
if (!this.initialized) {
throw new WorkerException("Worker not initialized");
}

PerformanceMonitor.startPublishing();

this.jobReceiver.start();
Expand All @@ -259,8 +258,8 @@ private void executeBackupJob(JobReceivedEvent jre) {
}

Long jobId = jre.getJobId();
Runnable backupJobWorker = new BackupJobWorkerThread(jobId, this.pluginManager, this.bmuServiceClient,
this.jobTempDir, this.backupName);
Runnable backupJobWorker = new BackupJobWorkerThread(jobId, this.pluginManager, this.bmuServiceClient, this.jobTempDir,
this.backupName);
this.executorPool.execute(backupJobWorker);
}

Expand Down Expand Up @@ -298,16 +297,16 @@ private WorkerInfoDTO getWorkerInfo() {

return workerInfo;
}

private WorkerConfigDTO getWorkerConfig(WorkerInfoDTO workerInfo, long timeout, TimeUnit timeUnit) {
LOGGER.info("Obtaining worker config");

int retries = 0;
final int sleepTime = 1000;
final long startTime = System.currentTimeMillis();
final long abortTime = timeUnit.toMillis(timeout);
WorkerConfigDTO config = null;

while ((config == null) && ((System.currentTimeMillis() - startTime) < abortTime)) {
try {
if (retries != 0) {
Expand All @@ -334,7 +333,6 @@ private WorkerConfigDTO getWorkerConfig(WorkerInfoDTO workerInfo, long timeout,
throw new WorkerException("Failed obtaining worker configuration");
}
}


// Nested classes and enums -----------------------------------------------

Expand Down

0 comments on commit a8cc1b0

Please sign in to comment.