Skip to content

Commit

Permalink
Adding gRPC Kubernetes sample (#75)
Browse files Browse the repository at this point in the history
* Initialize gRPC and kubernetes example.

* Adding sample tests to the workflows.

* Feedback: Updating logback-classing version + scalaVersion.

* Reverting logback-classic to 1.2.x

* Reformatting following scalafmt.
  • Loading branch information
samueleresca authored Oct 2, 2023
1 parent 32260d2 commit 6da8087
Show file tree
Hide file tree
Showing 17 changed files with 512 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ jobs:

- name: Test pekko-sample-fsm-scala
run: cd pekko-sample-fsm-scala && sbt test

- name: Test pekko-sample-grpc-kubernetes-scala
run: cd pekko-sample-grpc-kubernetes-scala && ./scripts/setup-minikube-for-linux.sh && ./scripts/test.sh

- name: Test pekko-sample-persistence-java
run: cd pekko-sample-persistence-java && mvn test
Expand Down
86 changes: 86 additions & 0 deletions pekko-sample-grpc-kubernetes-scala/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Apache Pekko gRPC Kubernetes

This is an example of an Apache Pekko HTTP service communicating with a gRPC service. Both services are deployed to Kubernetes. The HTTP service uses [Kubernetes API discovery mechanism](https://pekko.apache.org/docs/pekko-management/current/discovery/index.html#discovery-method-kubernetes-api) to find the gRPC service and send messages to the gRPC service.

The Apache Pekko HTTP application discovers the gRPC application using [Akka Discovery](https://developer.lightbend.com/docs/akka-management/current/discovery.html).
It uses the `pekko-dns` mechanism which relies on the `SRV` records created by Kubernetes.
All the technologies used in this example are open source.

## Usage

### Prerequisites

Install the following:

* [Docker](https://docs.docker.com/install/)
* [Kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)
* [Minikube](https://github.com/kubernetes/minikube)
* [Sbt](https://www.scala-sbt.org/)

### Publish images

You can publish the Docker images of the `grpcservice` and `httptogrpc` applications. You can use the `sbt docker:publishLocal` command to publish the images to your local Docker registry.

### Running

Once minikube is running and ingress enabled with `minikube addons enable ingress`, the two applications can be deployed using:

`kubectl apply -f kubernetes/grpcservice.yml`

and

`kubectl apply -f kubernetes/httptogrpc.yml`

Verify the deployments:

```
$ kubectl get deployments
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
grpcservice-v0-1-0-snapshot 1 1 1 1 40s
httptogrpc-v0-1-0-snapshot 1 1 1 1 40s
```

There are services for both:
```
$ kubectl get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
grpcservice ClusterIP 10.106.188.203 <none> 8080/TCP 1m
httptogrpc ClusterIP 10.103.134.197 <none> 8080/TCP 1m
```

Ingress just for the HTTP app:

```
$ kubectl get ingress
NAME HOSTS ADDRESS PORTS AGE
httptogrpc superservice.com 80 2m
```

The HTTP application periodically hits the gRPC application. You can view the logs of the HTTP application by running
```
$ kubectl logs service/httptogrpc
10/01 18:18:10 INFO [HttpToGrpc-pekko.actor.default-dispatcher-34] o.a.p.a.ActorSystemImpl - Scheduled say hello to chris
10/01 18:18:10 INFO [HttpToGrpc-pekko.actor.default-dispatcher-28] o.a.p.a.ActorSystemImpl - Scheduled say hello response Success(HelloReply(Hello, Christopher,UnknownFieldSet(Map())))
```

And you can send a HTTP request via `Ingress` to the `httptogrpc` application:

```
$ curl -v --header 'Host: superservice.com' $(minikube ip)/hello/donkey
> GET /hello/donkey HTTP/1.1
> Host: superservice.com
> User-Agent: curl/7.59.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Server: nginx/1.13.12
< Date: Wed, 15 Aug 2018 07:03:56 GMT
< Content-Type: text/plain; charset=UTF-8
< Content-Length: 13
< Connection: keep-alive
<
* Connection #0 to host 192.168.99.100 left intact
Hello, donkey%
```

The `Host` header needs to be set as that is how minikube [Ingress](https://github.com/kubernetes/ingress-nginx) routes requests to services.
40 changes: 40 additions & 0 deletions pekko-sample-grpc-kubernetes-scala/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name := "pekko-grpc-kubernetes"
scalaVersion := "2.13.11"

lazy val pekkoVersion = "1.0.1"
lazy val discoveryVersion = "1.0.0"
lazy val pekkoHttpVersion = "1.0.0"

lazy val root = (project in file("."))
.aggregate(httpToGrpc, grpcService)

// HTTP service that calls out to a gRPC back end
lazy val httpToGrpc = (project in file("http-to-grpc"))
.enablePlugins(PekkoGrpcPlugin, DockerPlugin, JavaAppPackaging)
.settings(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,
"org.apache.pekko" %% "pekko-discovery" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-parsing" % pekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-core" % pekkoHttpVersion,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion,
"org.apache.pekko" %% "pekko-discovery-kubernetes-api" % discoveryVersion,
"ch.qos.logback" % "logback-classic" % "1.2.12"),
dockerExposedPorts := Seq(8080))

// gRPC back end that echoes back messages
lazy val grpcService = (project in file("grpc-service"))
.enablePlugins(PekkoGrpcPlugin, DockerPlugin, JavaAppPackaging)
.settings(
dockerExposedPorts := Seq(8080),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-discovery" % pekkoVersion,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion,
"ch.qos.logback" % "logback-classic" % "1.2.12"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "sample.grpckubernetes";
option java_outer_classname = "HelloWorldProto";

service GreeterService {
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloToAll (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pekko {
loglevel = "DEBUG"
http.server.preview.enable-http2 = on
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sample.grpckubernetes

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
import org.apache.pekko.http.scaladsl.{ Http, HttpConnectionContext }
import org.apache.pekko.stream.{ ActorMaterializer, Materializer }

import scala.concurrent.{ ExecutionContext, Future }

object GreeterServer {

def main(args: Array[String]): Unit = {
val system: ActorSystem = ActorSystem("GreeterServer")
new GreeterServer(system).run()
}
}

class GreeterServer(system: ActorSystem) {

def run(): Future[Http.ServerBinding] = {
implicit val sys: ActorSystem = system
implicit val mat: Materializer = Materializer(sys)
implicit val ec: ExecutionContext = sys.dispatcher

val service: HttpRequest => Future[HttpResponse] =
GreeterServiceHandler(new GreeterServiceImpl(mat, system.log))

val bound = Http().newServerAt("0.0.0.0", 8080).bind(service)

bound.foreach { binding =>
sys.log.info("gRPC server bound to: {}", binding.localAddress)
}

bound
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sample.grpckubernetes

import scala.concurrent.Future
import org.apache.pekko.NotUsed
import org.apache.pekko.event.LoggingAdapter
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.BroadcastHub
import org.apache.pekko.stream.scaladsl.Keep
import org.apache.pekko.stream.scaladsl.MergeHub
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source

class GreeterServiceImpl(materializer: Materializer, log: LoggingAdapter) extends GreeterService {

private implicit val mat: Materializer = materializer

val (inboundHub: Sink[HelloRequest, NotUsed], outboundHub: Source[HelloReply, NotUsed]) =
MergeHub.source[HelloRequest]
.map(request => HelloReply(s"Hello, ${request.name}"))
.toMat(BroadcastHub.sink[HelloReply])(Keep.both)
.run()

override def sayHello(request: HelloRequest): Future[HelloReply] = {
log.info("sayHello {}", request)
Future.successful(HelloReply(s"Hello, ${request.name}"))
}

override def sayHelloToAll(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
log.info("sayHelloToAll")
in.runWith(inboundHub)
outboundHub
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "sample.grpckubernetes";
option java_outer_classname = "HelloWorldProto";

service GreeterService {
rpc SayHello (HelloRequest) returns (HelloReply) {}

rpc SayHelloToAll (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pekko.grpc.client {
"helloworld.GreeterService" {
service-discovery {
mechanism = "pekko-dns"
service-name = "grpcservice.default.svc.cluster.local"
protocol = "tcp"
port-name = "http"
}
use-tls = false
}
}

pekko {
loglevel = DEBUG
discovery.method = pekko-dns
io.dns.resolver = async-dns
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %m%n%xException</pattern>
</encoder>
</appender>

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<neverBlock>true</neverBlock>
<appender-ref ref="CONSOLE" />
</appender>

<root level="INFO">
<appender-ref ref="ASYNC"/>
</root>

</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sample.grpckubernetes

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.event.LoggingAdapter
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.stream.{ ActorMaterializer, Materializer }

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }

object HttpToGrpc {

def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("HttpToGrpc")
implicit val mat: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher
val log: LoggingAdapter = system.log

val settings = GrpcClientSettings.fromConfig("helloworld.GreeterService")
val client = GreeterServiceClient(settings)

system.scheduler.scheduleAtFixedRate(5.seconds, 5.seconds)(() => {
log.info("Scheduled say hello to chris")
val response: Future[HelloReply] = client.sayHello(HelloRequest("Christopher"))
response.onComplete { r =>
log.info("Scheduled say hello response {}", r)
}
})

val route =
path("hello" / Segment) { name =>
get {
log.info("hello request")
onComplete(client.sayHello(HelloRequest(name))) {
case Success(reply) => complete(reply.message)
case Failure(t) =>
log.error(t, "Request failed")
complete(StatusCodes.InternalServerError, t.getMessage)
}
}
}

val bindingFuture = Http().newServerAt("0.0.0.0", 8080).bindFlow(route)
bindingFuture.onComplete {
case Success(sb) =>
log.info("Bound: {}", sb)
case Failure(t) =>
log.error(t, "Failed to bind. Shutting down")
system.terminate()
}

}
}
44 changes: 44 additions & 0 deletions pekko-sample-grpc-kubernetes-scala/kubernetes/grpcservice.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
apiVersion: "apps/v1"
kind: Deployment
metadata:
name: "grpcservice-v0-1-0-snapshot"
labels:
appName: grpcservice
appNameVersion: "grpcservice-v0-1-0-snapshot"
spec:
replicas: 1
selector:
matchLabels:
appNameVersion: "grpcservice-v0-1-0-snapshot"
template:
metadata:
labels:
appName: grpcservice
appNameVersion: "grpcservice-v0-1-0-snapshot"
spec:
restartPolicy: Always
containers:
- name: grpcservice
image: "grpcservice:0.1.0-SNAPSHOT"
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8080
name: http
volumeMounts: []
volumes: []
---
apiVersion: v1
kind: Service
metadata:
labels:
appName: grpcservice
name: grpcservice
spec:
ports:
- name: http
port: 8080
protocol: TCP
targetPort: 8080
selector:
appName: grpcservice
Loading

0 comments on commit 6da8087

Please sign in to comment.