Skip to content

Commit

Permalink
Remove ProducerCompletionCallback. Remove timer, see https://stackove…
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardladenthin committed Mar 19, 2024
1 parent d98c024 commit a3d074c
Show file tree
Hide file tree
Showing 19 changed files with 284 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@ public abstract class AbstractProducer implements Producer {
protected final Consumer consumer;
protected final KeyUtility keyUtility;
protected final SecretFactory secretFactory;
protected final ProducerCompletionCallback producerCompletionCallback;

protected volatile ProducerState state = ProducerState.UNINITIALIZED;

protected final AtomicBoolean shouldRun = new AtomicBoolean(true);

public AbstractProducer(CProducer cProducer, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory, ProducerCompletionCallback producerCompletionCallback) {
public AbstractProducer(CProducer cProducer, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory) {
this.cProducer = cProducer;
this.consumer = consumer;
this.keyUtility = keyUtility;
this.secretFactory = secretFactory;
this.producerCompletionCallback = producerCompletionCallback;
}

@Override
Expand All @@ -70,7 +68,6 @@ public void run() {
}
}
state = ProducerState.NOT_RUNNING;
producerCompletionCallback.producerFinished();
}

@Override
Expand Down
39 changes: 19 additions & 20 deletions src/main/java/net/ladenthin/bitcoinaddressfinder/ConsumerJava.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
Expand All @@ -51,7 +51,8 @@ public class ConsumerJava implements Consumer {
* We assume a queue might be empty after this amount of time.
* If not, some keys in the queue are not checked before shutdow.
*/
static final Duration DURATION_WAIT_QUEUE_EMPTY = Duration.ofMinutes(1);
@VisibleForTesting
static Duration AWAIT_DURATION_QUEUE_EMPTY = Duration.ofMinutes(1);

/**
* The duration for a cyclic check to test the keys queue is empty.
Expand All @@ -73,7 +74,8 @@ public class ConsumerJava implements Consumer {
protected long startTime = 0;

protected final CConsumerJava consumerJava;
protected final Timer timer = new Timer();
@VisibleForTesting
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

protected Persistence persistence;
private final PersistenceUtils persistenceUtils;
Expand All @@ -85,7 +87,8 @@ public class ConsumerJava implements Consumer {
protected final AtomicLong vanityHits = new AtomicLong();
private final Pattern vanityPattern;

private final AtomicBoolean shouldRun = new AtomicBoolean(true);
@VisibleForTesting
final AtomicBoolean shouldRun = new AtomicBoolean(true);

protected ConsumerJava(CConsumerJava consumerJava, KeyUtility keyUtility, PersistenceUtils persistenceUtils) {
this.consumerJava = consumerJava;
Expand Down Expand Up @@ -113,25 +116,22 @@ protected void initLMDB() {
}

protected void startStatisticsTimer() {
long period = consumerJava.printStatisticsEveryNSeconds * Statistics.ONE_SECOND_IN_MILLISECONDS;
long period = consumerJava.printStatisticsEveryNSeconds;
if (period <= 0) {
throw new IllegalArgumentException("period must be greater than 0.");
}

startTime = System.currentTimeMillis();

timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
// get transient information
long uptime = Math.max(System.currentTimeMillis() - startTime, 1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
// get transient information
long uptime = Math.max(System.currentTimeMillis() - startTime, 1);

String message = new Statistics().createStatisticsMessage(uptime, checkedKeys.get(), checkedKeysSumOfTimeToCheckContains.get(), emptyConsumer.get(), keysQueue.size(), hits.get());
String message = new Statistics().createStatisticsMessage(uptime, checkedKeys.get(), checkedKeysSumOfTimeToCheckContains.get(), emptyConsumer.get(), keysQueue.size(), hits.get());

// log the information
logger.info(message);
}
}, period, period);
// log the information
logger.info(message);
}, period, period, TimeUnit.SECONDS);
}

@Override
Expand Down Expand Up @@ -324,8 +324,7 @@ public void consumeKeys(PublicKeyBytes[] publicKeyBytes) throws InterruptedExcep
* Returns if the consume was finished or.
* @return {@code true} if the keys queue is empty, otherwise {@code false}.
*/
@VisibleForTesting
boolean waitTillKeysQueueEmpty(Duration maxWait) throws InterruptedException {
public boolean awaitKeysQueueEmpty(Duration maxWait) throws InterruptedException {
final long startTime = System.currentTimeMillis();
do {
if (keysQueue.isEmpty()) {
Expand All @@ -340,11 +339,11 @@ boolean waitTillKeysQueueEmpty(Duration maxWait) throws InterruptedException {
public void interrupt() {
try {
// the result does not matter, just try to wait some seconds to empty the queue
waitTillKeysQueueEmpty(DURATION_WAIT_QUEUE_EMPTY);
awaitKeysQueueEmpty(AWAIT_DURATION_QUEUE_EMPTY);
} catch (InterruptedException ex) {
// do nothing, it is no problem
}
shouldRun.set(false);
timer.cancel();
scheduledExecutorService.shutdown();
}
}
82 changes: 35 additions & 47 deletions src/main/java/net/ladenthin/bitcoinaddressfinder/Finder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import net.ladenthin.bitcoinaddressfinder.configuration.CProducerJava;
Expand All @@ -43,17 +41,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Finder implements Interruptable, ProducerCompletionCallback, SecretFactory {
public class Finder implements Interruptable, SecretFactory {

/**
* Wait for 100 thousand years is enough.
* We must define a maximum time to wait for terminate. Wait for 100 thousand years is enough.
*/
private static final long MAXIMUM_RUNTIME = 365L * 100_000L;
@VisibleForTesting
static Duration AWAIT_DURATION_TERMINATE = Duration.ofDays(365L * 1000L);

protected Logger logger = LoggerFactory.getLogger(this.getClass());

private final CFinder finder;
private final Shutdown shutdown;

private final List<ProducerOpenCL> openCLProducers = new ArrayList<>();
private final List<ProducerJava> javaProducers = new ArrayList<>();
Expand All @@ -64,9 +62,8 @@ public class Finder implements Interruptable, ProducerCompletionCallback, Secret
*/
private final Random random;

private final ExecutorService producerExecutorService = Executors.newCachedThreadPool();
@VisibleForTesting
final Map<Producer, Future<?>> producerFuture = new HashMap<>();
final ExecutorService producerExecutorService = Executors.newCachedThreadPool();

private final NetworkParameters networkParameters = MainNetParams.get();
private final KeyUtility keyUtility = new KeyUtility(networkParameters, new ByteBufferUtility(false));
Expand All @@ -75,9 +72,8 @@ public class Finder implements Interruptable, ProducerCompletionCallback, Secret
@Nullable
private ConsumerJava consumerJava;

public Finder(CFinder finder, Shutdown shutdown) {
public Finder(CFinder finder) {
this.finder = finder;
this.shutdown = shutdown;
try {
random = SecureRandom.getInstanceStrong();
} catch (NoSuchAlgorithmException e) {
Expand All @@ -98,23 +94,23 @@ public void configureProducer() {
if (finder.producerJava != null) {
for (CProducerJava cProducerJava : finder.producerJava) {
cProducerJava.assertGridNumBitsCorrect();
ProducerJava producerJava = new ProducerJava(cProducerJava, consumerJava, keyUtility, this, this);
ProducerJava producerJava = new ProducerJava(cProducerJava, consumerJava, keyUtility, this);
javaProducers.add(producerJava);
}
}

if (finder.producerJavaSecretsFiles != null) {
for (CProducerJavaSecretsFiles cProducerJavaSecretsFiles : finder.producerJavaSecretsFiles) {
cProducerJavaSecretsFiles.assertGridNumBitsCorrect();
ProducerJavaSecretsFiles producerJavaSecretsFiles = new ProducerJavaSecretsFiles(cProducerJavaSecretsFiles, consumerJava, keyUtility, this, this);
ProducerJavaSecretsFiles producerJavaSecretsFiles = new ProducerJavaSecretsFiles(cProducerJavaSecretsFiles, consumerJava, keyUtility, this);
javaProducersSecretsFiles.add(producerJavaSecretsFiles);
}
}

if (finder.producerOpenCL != null) {
for (CProducerOpenCL cProducerOpenCL : finder.producerOpenCL) {
cProducerOpenCL.assertGridNumBitsCorrect();
ProducerOpenCL producerOpenCL = new ProducerOpenCL(cProducerOpenCL, consumerJava, keyUtility, this, this);
ProducerOpenCL producerOpenCL = new ProducerOpenCL(cProducerOpenCL, consumerJava, keyUtility, this);
openCLProducers.add(producerOpenCL);
}
}
Expand All @@ -128,54 +124,38 @@ public void initProducer() {

public void startProducer() {
for (Producer producer : getAllProducers()) {
Future<?> future = producerExecutorService.submit(producer);
producerFuture.put(producer, future);
producerExecutorService.submit(producer);
}
}

public void awaitTermination() {
public void shutdownAndAwaitTermination() {
try {
producerExecutorService.awaitTermination(MAXIMUM_RUNTIME, TimeUnit.DAYS);
producerExecutorService.shutdown();
producerExecutorService.awaitTermination(AWAIT_DURATION_TERMINATE.get(ChronoUnit.SECONDS), TimeUnit.SECONDS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}

// no producers are running anymore, the consumer can be interrupted
if (consumerJava != null) {
logger.info("Interrupt: " + consumerJava);
consumerJava.interrupt();
consumerJava = null;
}
logger.info("consumerJava released.");
}

@Override
public void interrupt() {
logger.info("Shut down, please wait for remaining tasks.");

logger.info("interrupt called: delegate interrupt to all producer");
for (Producer producer : getAllProducers()) {
logger.info("Interrupt: " + producer);
producer.interrupt();
producer.waitTillProducerNotRunning();
producer.releaseProducer();
}

if (consumerJava != null) {
consumerJava.interrupt();
}

logger.info("All producers released.");
}

@Override
public void producerFinished() {
// a signal that a producer finished its work
for (Producer producer : getAllProducers()) {
if (producer.getState() == ProducerState.RUNNING ){
break;
}
}
// no producers are running anymore
try {
if (consumerJava != null) {
consumerJava.waitTillKeysQueueEmpty(Duration.ofSeconds(30L));
}
} catch (InterruptedException ex) {
logger.warn("InterruptedException during waitTillKeysQueueEmpty", ex);
}
producerExecutorService.shutdown();
shutdown.shutdown();
freeAllProducers();
logger.info("All producers released and freed.");
}

public List<Producer> getAllProducers() {
Expand All @@ -186,9 +166,17 @@ public List<Producer> getAllProducers() {
return producers;
}

public void freeAllProducers() {
javaProducers.clear();
javaProducersSecretsFiles.clear();
openCLProducers.clear();
}

public List<Consumer> getAllConsumers() {
List<Consumer> consumers = new ArrayList<>();
consumers.add(consumerJava);
if (consumerJava != null) {
consumers.add(consumerJava);
}
return consumers;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class ProducerJava extends AbstractProducer {

protected final CProducerJava producerJava;

public ProducerJava(CProducerJava producerJava, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory, ProducerCompletionCallback producerCompletionCallback) {
super(producerJava, consumer, keyUtility, secretFactory, producerCompletionCallback);
public ProducerJava(CProducerJava producerJava, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory) {
super(producerJava, consumer, keyUtility, secretFactory);
this.producerJava = producerJava;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public class ProducerJavaSecretsFiles extends ProducerJava {
@NonNull
AtomicReference<SecretsFile> currentSecretsFile = new AtomicReference<>();

public ProducerJavaSecretsFiles(CProducerJavaSecretsFiles producerJavaSecretsFiles, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory, ProducerCompletionCallback producerCompletionCallback) {
super(producerJavaSecretsFiles, consumer, keyUtility, secretFactory, producerCompletionCallback);
public ProducerJavaSecretsFiles(CProducerJavaSecretsFiles producerJavaSecretsFiles, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory) {
super(producerJavaSecretsFiles, consumer, keyUtility, secretFactory);
this.producerJavaSecretsFiles = producerJavaSecretsFiles;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ProducerOpenCL extends AbstractProducer {
@Nullable
OpenCLContext openCLContext;

public ProducerOpenCL(CProducerOpenCL producerOpenCL, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory, ProducerCompletionCallback producerCompletionCallback) {
super(producerOpenCL, consumer, keyUtility, secretFactory, producerCompletionCallback);
public ProducerOpenCL(CProducerOpenCL producerOpenCL, Consumer consumer, KeyUtility keyUtility, SecretFactory secretFactory) {
super(producerOpenCL, consumer, keyUtility, secretFactory);
this.producerOpenCL = producerOpenCL;
this.resultReaderThreadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(producerOpenCL.maxResultReaderThreads);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

public class Statistics {

@Deprecated
public static final int ONE_SECOND_IN_MILLISECONDS = 1000;

String createStatisticsMessage(long uptime, long keys, long keysSumOfTimeToCheckContains, long emptyConsumer, long keysQueueSize, long hits) {
Expand Down
Loading

0 comments on commit a3d074c

Please sign in to comment.