Skip to content

Commit

Permalink
Changing VirtualDisk to use Websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
DaNussi committed Mar 27, 2024
1 parent 049553e commit e3268a6
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ public void onStart() {
return;
}

boolean initRabbitMqSuccess = initRabbitmq();
if (!initRabbitMqSuccess) {
onStop();
return;
}

LOGGER.warn("Started virtual drive client " + channel);
}

Expand All @@ -73,10 +67,6 @@ private boolean initRedis() {
}
}

private boolean initRabbitmq() {
return true;
}


public String getChannel() {
return channel;
Expand All @@ -97,176 +87,21 @@ public boolean isPreferredStorageFor(AEKey what, IActionSource source) {

@Override
public long insert(AEKey what, long amount, Actionable mode, IActionSource source) {
try {
InsertRequest action = new InsertRequest(what, amount, mode);

String replyQueueName = rabbitmq.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(action.getId()).replyTo(replyQueueName).build();


CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();
responseFuture.completeOnTimeout(new InsertResponse("", false, 0), 5, TimeUnit.SECONDS);
DeliverCallback insertCallback = ((consumerTag, delivery) -> {
if (!delivery.getProperties().getCorrelationId().equals(action.getId())) return;
InsertResponse response = new InsertResponse(action.getId(), false, 0);
try {
response = new InsertResponse(delivery.getBody());
} catch (Exception e) {
LOGGER.error("Failed to pares response");
e.printStackTrace();
}
responseFuture.complete(response);
});

String ctag = rabbitmq.basicConsume(replyQueueName, true, insertCallback, consumerTag -> {});

// LOGGER.info("Calling action " + action);
rabbitmq.basicPublish("", channel + "/insert", props, action.toBytes());

InsertResponse response = responseFuture.get();
rabbitmq.basicCancel(ctag);
// LOGGER.info("Callback for action " + response);
if (response.isSuccess()) {
return response.data;
} else {
LOGGER.error("Request was unsuccessful " + response);
return 0;
}

} catch (Exception e) {
e.printStackTrace();
return 0;
}
return 0;
}

@Override
public long extract(AEKey what, long amount, Actionable mode, IActionSource source) {
try {
ExtractRequest action = new ExtractRequest(what, amount, mode);

String replyQueueName = rabbitmq.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(action.getId()).replyTo(replyQueueName).build();

CompletableFuture<ExtractResponse> responseFuture = new CompletableFuture<>();
responseFuture.completeOnTimeout(new ExtractResponse("", false, 0), 5, TimeUnit.SECONDS);
DeliverCallback insertCallback = ((consumerTag, delivery) -> {
if (!delivery.getProperties().getCorrelationId().equals(action.getId())) return;
ExtractResponse response = new ExtractResponse(action.getId(), false, 0);
try {
response = new ExtractResponse(delivery.getBody());
} catch (Exception e) {
LOGGER.error("Failed to pares response");
e.printStackTrace();
}
responseFuture.complete(response);
});

String ctag = rabbitmq.basicConsume(replyQueueName, true, insertCallback, consumerTag -> {
});

// LOGGER.info("Calling action " + action);
rabbitmq.basicPublish("", channel + "/extract", props, action.toBytes());

ExtractResponse response = responseFuture.get();
rabbitmq.basicCancel(ctag);
// LOGGER.info("Callback for action " + response);
if (response.isSuccess()) {
return response.data;
} else {
LOGGER.error("Request was unsuccessful " + response);
return 0;
}

} catch (Exception e) {
e.printStackTrace();
return 0;
}
return 0;
}

@Override
public void getAvailableStacks(KeyCounter out) {
try {
AvailableStacksRequest action = new AvailableStacksRequest();

String replyQueueName = rabbitmq.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(action.getId()).replyTo(replyQueueName).build();

CompletableFuture<AvailableStacksResponse> responseFuture = new CompletableFuture<>();
responseFuture.completeOnTimeout(new AvailableStacksResponse("", false), 5, TimeUnit.SECONDS);
DeliverCallback insertCallback = ((consumerTag, delivery) -> {
if (!delivery.getProperties().getCorrelationId().equals(action.getId())) return;
try {
AvailableStacksResponse response = new AvailableStacksResponse(delivery.getBody());
responseFuture.complete(response);
} catch (Exception e) {
LOGGER.error("Failed to pares response");
e.printStackTrace();
AvailableStacksResponse response = new AvailableStacksResponse(action.getId(), false);
responseFuture.complete(response);
}
});

String ctag = rabbitmq.basicConsume(replyQueueName, true, insertCallback, consumerTag -> {
});

// LOGGER.info("Calling action " + action);
rabbitmq.basicPublish("", channel + "/getAvailableStacks", props, action.toBytes());

AvailableStacksResponse response = responseFuture.get();
rabbitmq.basicCancel(ctag);
// LOGGER.info("Callback for action " + response);
if (response.isSuccess()) {
response.getAvailableStacks(out);
} else {
LOGGER.error("Request was unsuccessful " + response);
}

} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public Component getDescription() {
try {
DescriptionRequest action = new DescriptionRequest();

String replyQueueName = rabbitmq.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(action.getId()).replyTo(replyQueueName).build();

CompletableFuture<DescriptionResponse> responseFuture = new CompletableFuture<>();
responseFuture.completeOnTimeout(new DescriptionResponse("", false, ""), 5, TimeUnit.SECONDS);
DeliverCallback insertCallback = ((consumerTag, delivery) -> {
if (!delivery.getProperties().getCorrelationId().equals(action.getId())) return;
DescriptionResponse response = new DescriptionResponse(action.getId(), false, "");
try {
response = new DescriptionResponse(delivery.getBody());
} catch (Exception e) {
LOGGER.error("Failed to pares response");
e.printStackTrace();
}
responseFuture.complete(response);
});

String ctag = rabbitmq.basicConsume(replyQueueName, true, insertCallback, consumerTag -> {
});

// LOGGER.info("Calling action " + action);
rabbitmq.basicPublish("", channel + "/extract", props, action.toBytes());

DescriptionResponse response = responseFuture.get();
rabbitmq.basicCancel(ctag);
// LOGGER.info("Callback for action " + response);
if (response.isSuccess()) {
return Component.literal(response.getDescription());
} else {
LOGGER.error("Request was unsuccessful " + response);
return Component.literal("DAE2 encountered an error!");
}

} catch (Exception e) {
e.printStackTrace();
return Component.literal("DAE2 encountered an error!");
}
return Component.literal("DAE2 encountered an error!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class VirtualDiskHost {
private InterDimensionalInterfaceBlockEntity instance;

private Jedis redis;
private Channel rabbitmq;
private String channel;

public MEStorage storage;
Expand All @@ -52,9 +51,7 @@ public VirtualDiskHost(MEStorage storage, int priority, InterDimensionalInterfac

public void onStart() {
this.redis = instance.getRedis();
this.rabbitmq = instance.getRabbitmq();

this.initRabbitMQ();
this.updateRedis();

LOGGER.warn("Started virtual drive host " + channel);
Expand All @@ -65,107 +62,6 @@ public void onStop() {
LOGGER.warn("Stopped virtual drive host " + channel);
}

private void initRabbitMQ() {
try {
rabbitmq.queueDeclare(channel + "/insert", false, false, false, null);
rabbitmq.queueDeclare(channel + "/extract", false, false, false, null);
rabbitmq.queueDeclare(channel + "/isPreferredStorageFor", false, false, false, null);
rabbitmq.queueDeclare(channel + "/getAvailableStacks", false, false, false, null);
rabbitmq.queueDeclare(channel + "/getDescription", false, false, false, null);


DeliverCallback insertCallback = ((consumerTag, delivery) -> {
var response = new InsertResponse(delivery.getProperties().getCorrelationId(), false, 0);
try {
var request = new InsertRequest(delivery.getBody());
// LOGGER.info("Incomming request " + request);
var pair = new InsertPair(request);
var callback = pair.getResponseFuture();
insertQueue.add(pair);
response = callback.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
// LOGGER.info("Outgoin response " + response);

AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();
rabbitmq.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.toBytes());
rabbitmq.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
rabbitmq.basicConsume(channel + "/insert", false, insertCallback, (consumerTag -> {
}));


DeliverCallback extractCallback = ((consumerTag, delivery) -> {
var response = new ExtractResponse(delivery.getProperties().getCorrelationId(), false, 0);
try {
var request = new ExtractRequest(delivery.getBody());
// LOGGER.info("Incomming request " + request);
var pair = new ExtractPair(request);
var callback = pair.getResponseFuture();
extractQueue.add(pair);
response = callback.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
// LOGGER.info("Outgoin response " + response);

AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();
rabbitmq.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.toBytes());
rabbitmq.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
rabbitmq.basicConsume(channel + "/extract", false, extractCallback, (consumerTag -> {
}));


DeliverCallback availableStacksCallback = ((consumerTag, delivery) -> {
var response = new AvailableStacksResponse(delivery.getProperties().getCorrelationId(), false);
try {
var request = new AvailableStacksRequest(delivery.getBody());
// LOGGER.info("Incomming request " + request);
var pair = new AvailableStacksPair(request);
var callback = pair.getResponseFuture();
availableStacksQueue.add(pair);
response = callback.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
// LOGGER.info("Outgoin response " + response);

AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();
rabbitmq.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.toBytes());
rabbitmq.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
rabbitmq.basicConsume(channel + "/getAvailableStacks", false, availableStacksCallback, (consumerTag -> {}));


DeliverCallback descriptionCallback = ((consumerTag, delivery) -> {
var response = new DescriptionResponse(delivery.getProperties().getCorrelationId(), false, "");
try {
var request = new DescriptionRequest(delivery.getBody());
// LOGGER.info("Incomming request " + request);
var pair = new DescriptionPair(request);
var callback = pair.getResponseFuture();
descriptionQueue.add(pair);
response = callback.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
// LOGGER.info("Outgoin response " + response);

AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();
rabbitmq.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.toBytes());
rabbitmq.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
rabbitmq.basicConsume(channel + "/getDescription", false, descriptionCallback, (consumerTag -> {
}));

} catch (Exception e) {
LOGGER.warn("Failed to declare rabbit mq queue for " + channel);
e.printStackTrace();
}
}

private void updateRedis() {
this.redis.hset(VirtualDiskInfo.REDIS_PATH, channel, this.info.toString());
}
Expand Down

0 comments on commit e3268a6

Please sign in to comment.