Skip to content

Commit

Permalink
Draft dummy 1 local machine 2 peers to buy and sell
Browse files Browse the repository at this point in the history
  • Loading branch information
hoang-ho committed Feb 28, 2021
0 parents commit f724702
Show file tree
Hide file tree
Showing 11 changed files with 816 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea/
.gradle/
build/
230 changes: 230 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
# Lab 1

The Service has the following RPC calls

```
service MarketPlace {
rpc lookup(BuyRequest) returns (Ack);
rpc reply(SellerId) returns (Ack);
rpc buy(PeerId) returns (Ack);
}
message Ack {
string message = 1;
}
message BuyRequest {
string productName = 1;
int32 hopCount = 2;
repeated PeerId peer = 3;
}
message SellerId {
string IPAddress = 1;
int32 port = 2;
repeated PeerId peer = 3;
}
message PeerId {
string IPAddress = 1;
int32 port = 2;
}
```

For each RPC call, the caller have the pass in the required parameter and the callee will return an acknowledge message immediately (except for the buy RPC) and then perform further processing

The code for all peers are in main/java/com/p2p/grpc/Peer.java. We have Runner class in Runner.java to simulate the marketplace service

```java
public class Runner {
public static void main(String[] args) {
// Initialize the peer
Peer peer1 = new Peer(1, "localhost",8080);
Peer peer2 = new Peer(2, "localhost",8081);
peer1.setNeighbor(peer2);
peer2.setNeighbor(peer1);

// create two threads to run the peer
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
peer1.run(3, 0, "fish");
}
});

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
peer2.run(-1, 0, "fish");
}
});

t1.start();
t2.start();
}
}
```

The run function for peer is as follow:

```java
public class Peer {
...
public void run(int amountSell, int amountBuy, String product) {
this.stock = amountSell;
this.amountSell = this.stock;
this.amountBuy = amountBuy;
this.product = product;
Server server =
ServerBuilder.forPort(port).addService(new MarketPlaceImpl()).executor(Executors.newFixedThreadPool(10)).build();

try {
server.start();
logger.info("starting a server at " + IPAddress + " " + port);
// start buying if we are buyer
if (this.stock < 0) {
// perform lookup
Thread t = new Thread(new Runnable() {
@Override
public void run() {
ManagedChannel channel = ManagedChannelBuilder.forAddress(neighbors.getIPAddress(),
neighbors.getPort()).usePlaintext().build();
try {
MarketPlaceGrpc.MarketPlaceBlockingStub stub = MarketPlaceGrpc.newBlockingStub(channel);
PeerId peerId = PeerId.newBuilder().setIPAddress(IPAddress).setPort(port).build();
BuyRequest request =
BuyRequest.newBuilder().setProductName(product).setHopCount(1).addPeer(peerId).build();
logger.info(port + " send a buy request to " + neighbors.IPAddress + " " + neighbors.port);
stub.lookup(request);
} finally {
channel.shutdown();
}
}
});
t.start();
}
server.awaitTermination();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
...
}
```

Here, I create a server to listen for RPC, and for the buyer peer (stock < 0), I create a client inside a thread to perform the lookup RPC call. See that both server and client are created with ThreadPool. How the callee will handle each RPC is as follow:

```java
private class MarketPlaceImpl extends MarketPlaceGrpc.MarketPlaceImplBase {
@Override
public void lookup(BuyRequest request, StreamObserver<Ack> streamObserver) {
logger.info("Receive lookup request at " + port);
streamObserver.onNext(Ack.newBuilder().setMessage("Ack").build());
streamObserver.onCompleted();
lookupHelper(request);
}

@Override
public void reply(SellerId message, StreamObserver<Ack> streamObserver) {
logger.info("Receive reply request at " + port);
streamObserver.onNext(Ack.newBuilder().setMessage("Ack").build());
streamObserver.onCompleted();
replyHelper(message);
}

@Override
public void buy(PeerId seller, StreamObserver<Ack> streamObserver) {
logger.info("Receive a buy request at " + port);
buyHelper();
streamObserver.onNext(Ack.newBuilder().setMessage("Ack").build());
streamObserver.onCompleted();
}
}
```

The callee will return an acknowledgement message to the lookup RPC and perform further processing. How that processing works is as follow:

```java
public class Peer {
...
private void lookupHelper(BuyRequest request) {
// if peer is a seller and is selling the product, reply to the caller
if (amountSell > 0 && request.getProductName().equals(product)) {
// RPC reply
int lastIdx = request.getPeerCount() - 1;
String IPAddress = request.getPeer(lastIdx).getIPAddress();
int lastPort = request.getPeer(lastIdx).getPort();
List<PeerId> peers = new ArrayList<>(request.getPeerList());
peers.remove(lastIdx);
SellerId message =
SellerId.newBuilder().setIPAddress(this.IPAddress).setPort(port).addAllPeer(peers).build();

ManagedChannel channel = ManagedChannelBuilder.forAddress(IPAddress, lastPort).usePlaintext().build();
try {
MarketPlaceGrpc.MarketPlaceBlockingStub stub = MarketPlaceGrpc.newBlockingStub(channel);
logger.info("Send reply request at " + port);
Ack acknowledge = stub.reply(message);
} finally {
channel.shutdown();
}
}
// else pass the message to its neighbor, i.e., flooding
}
...
}
```

The lookupHelper will decide whether to reply or to flood. For milestone 1, lookup RPC's callee only need to reply, so its creates a client and perform the RPC reply(SellerId), now it becomes the reply RPC's caller. The reply RPC's callee will immediately return an acknowledgement message and perform further processing via replyHelper():

```java
public class Peer {
private void replyHelper(SellerId message) {
// if the message arrives to the original sender
if (message.getPeerList().size() == 0) {
// open up a thread to send a buy request
String sellerIPAddress = message.getIPAddress();
int sellerPort = message.getPort();
PeerId peer = PeerId.newBuilder().setIPAddress(IPAddress).setPort(port).build();
ManagedChannel channel = ManagedChannelBuilder.forAddress(sellerIPAddress, sellerPort).usePlaintext().build();
try {
MarketPlaceGrpc.MarketPlaceBlockingStub stub = MarketPlaceGrpc.newBlockingStub(channel);
logger.info("Send buy request at " + port);
Ack acknowledge = stub.buy(peer);
// increment your amount buy this need to be synchronized
this.amountBuy += 1;
} finally {
channel.shutdown();
}
}
// else open up a thread pass the message back the path
}
}
```

For milestone 1, we don't need to traverse back the path, so the reply(SellerId) arrives at the original sender, and the sender will further call a buy RPC. For the buyHelper, we in fact just need to decrement the amount in the stock and restock if needed, so no further RPC is needed. Once the buyHelper() finish, the callee return an acknowledgement message to the caller.

To run:
```
$ ./gradlew clean build
```

This is to generate the code for gRPC and to compile the code. Then run the Runner.java file. The log should be as follow:

```
Feb 28, 2021 1:24:45 AM com.p2p.grpc.Peer run
INFO: starting a server at localhost 8080
Feb 28, 2021 1:24:45 AM com.p2p.grpc.Peer run
INFO: starting a server at localhost 8081
Feb 28, 2021 1:24:45 AM com.p2p.grpc.Peer$1 run
INFO: 8081 send a buy request to localhost 8080
Feb 28, 2021 1:24:46 AM com.p2p.grpc.Peer$MarketPlaceImpl lookup
INFO: Receive lookup request at 8080
Feb 28, 2021 1:24:46 AM com.p2p.grpc.Peer lookupHelper
INFO: Send reply request at 8080
Feb 28, 2021 1:24:46 AM com.p2p.grpc.Peer$MarketPlaceImpl reply
INFO: Receive reply request at 8081
Feb 28, 2021 1:24:46 AM com.p2p.grpc.Peer replyHelper
INFO: Send buy request at 8081
Feb 28, 2021 1:24:46 AM com.p2p.grpc.Peer$MarketPlaceImpl buy
INFO: Receive a buy request at 8080
Feb 28, 2021 1:24:46 AM com.p2p.grpc.Peer replyHelper
INFO: Buying fish succeeds.
```
71 changes: 71 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.14'
}
}

plugins {
id 'java'
id "com.google.protobuf" version "0.8.15"
id 'idea'
}

def grpcVersion = '1.36.0' // CURRENT_GRPC_VERSION
def protobufVersion = '3.12.0'
def protocVersion = protobufVersion
def nettyTcNativeVersion = '2.0.26.Final'


group 'org.example'
version '1.0-SNAPSHOT'

repositories {
mavenCentral()
}


dependencies {
compile "io.grpc:grpc-netty-shaded:${grpcVersion}"
compile "io.grpc:grpc-protobuf:${grpcVersion}"
compile "io.grpc:grpc-stub:${grpcVersion}"
compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.14.0'

compile 'com.google.protobuf:protobuf-gradle-plugin:0.8.14'

compileOnly "javax.annotation:javax.annotation-api:1.2"

implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"

runtimeOnly "io.netty:netty-tcnative-boringssl-static:${nettyTcNativeVersion}"
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}


protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
}
generateProtoTasks {
all()*.plugins { grpc {} }
}
}

test {
useJUnitPlatform()
}
Binary file added gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading

0 comments on commit f724702

Please sign in to comment.