Skip to content

Commit ec3edad

Browse files
authored
Merge pull request #60 from vinscom/added-event
Added Event
2 parents a77e400 + df16e27 commit ec3edad

File tree

8 files changed

+137
-80
lines changed

8 files changed

+137
-80
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package in.erail.model;
2+
3+
/**
4+
*
5+
* @author vinay
6+
*/
7+
public class Event {
8+
9+
private RequestEvent request;
10+
private ResponseEvent response;
11+
12+
public Event() {
13+
this(new RequestEvent(), new ResponseEvent());
14+
}
15+
16+
public Event(RequestEvent pRequest, ResponseEvent pResponse) {
17+
this.request = pRequest;
18+
this.response = pResponse;
19+
}
20+
21+
public RequestEvent getRequest() {
22+
return request;
23+
}
24+
25+
public Event setRequest(RequestEvent pRequest) {
26+
this.request = pRequest;
27+
return this;
28+
}
29+
30+
public ResponseEvent getResponse() {
31+
return response;
32+
}
33+
34+
public Event setResponse(ResponseEvent pResponse) {
35+
this.response = pResponse;
36+
return this;
37+
}
38+
39+
}

src/main/java/in/erail/service/RESTService.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package in.erail.service;
22

3+
import in.erail.model.Event;
34
import in.erail.model.RequestEvent;
45
import in.erail.model.ResponseEvent;
56
import io.reactivex.Maybe;
6-
import io.reactivex.MaybeSource;
77

88
/**
99
*
@@ -15,7 +15,19 @@ public interface RESTService {
1515

1616
String getServiceUniqueId();
1717

18-
Maybe<ResponseEvent> handleEvent(RequestEvent pRequest);
18+
default Class<? extends RequestEvent> getRequestEventClass() {
19+
return RequestEvent.class;
20+
}
21+
22+
default Class<? extends ResponseEvent> getResponseEventClass() {
23+
return ResponseEvent.class;
24+
}
25+
26+
default Event createEvent(RequestEvent pRequest) throws InstantiationException, IllegalAccessException {
27+
return new Event(pRequest, getResponseEventClass().newInstance());
28+
}
29+
30+
Maybe<Event> handleEvent(Event pEvent);
1931

2032
String getAuthority();
2133

src/main/java/in/erail/service/RESTServiceImpl.java

Lines changed: 44 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.vertx.reactivex.core.Vertx;
77
import org.apache.logging.log4j.Logger;
88
import in.erail.glue.annotation.StartService;
9+
import in.erail.model.Event;
910
import in.erail.model.RequestEvent;
1011
import in.erail.model.ResponseEvent;
1112
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -22,25 +23,27 @@
2223
*
2324
* @author vinay
2425
*/
25-
public abstract class RESTServiceImpl implements RESTService, MaybeTransformer<RequestEvent, ResponseEvent> {
26-
27-
private static final ResponseEvent DEFAULT_REPONSE_EVENT = new ResponseEvent();
26+
public abstract class RESTServiceImpl implements RESTService, MaybeTransformer<Event, Event> {
2827

2928
private String mOperationId;
3029
private String mServiceUniqueId;
3130
private Vertx mVertx;
3231
private boolean mEnable = false;
3332
private Logger mLog;
3433
private Scheduler mScheduler = Schedulers.io();
35-
private ResponseEvent mDefaultResponseEvent = DEFAULT_REPONSE_EVENT;
34+
private Event mDefaultEvent;
3635
private boolean mSecure = false;
3736
private String mAuthority;
3837
private Class<? extends RequestEvent> mRequestEventClass = RequestEvent.class;
39-
private MaybeTransformer<RequestEvent, RequestEvent> mPreProcessProcessors[];
40-
private MaybeTransformer<ResponseEvent, ResponseEvent> mPostProcessProcessors[];
38+
private Class<? extends ResponseEvent> mResponseEventClass = ResponseEvent.class;
39+
private MaybeTransformer<Event, Event> mPreProcessProcessors[];
40+
private MaybeTransformer<Event, Event> mPostProcessProcessors[];
4141

4242
@StartService
43-
public void start() {
43+
public void start() throws InstantiationException, IllegalAccessException {
44+
45+
mDefaultEvent = new Event(getRequestEventClass().newInstance(), getResponseEventClass().newInstance());
46+
4447
if (mEnable) {
4548
getVertx()
4649
.eventBus()
@@ -60,12 +63,13 @@ public Single<JsonObject> handleRequest(Message<JsonObject> pMessage) {
6063
return Single
6164
.just(pMessage)
6265
.map(m -> pMessage.body().mapTo(getRequestEventClass()))
66+
.map(this::createEvent)
6367
.flatMapMaybe(this::handleEvent)
64-
.toSingle(getDefaultResponseEvent())
65-
.map(resp -> JsonObject.mapFrom(resp))
68+
.toSingle(getDefaultEvent())
69+
.map(resp -> JsonObject.mapFrom(resp.getResponse()))
6670
.doOnSuccess(resp -> pMessage.reply(resp))
6771
.doOnError(err -> {
68-
ResponseEvent resp = new ResponseEvent()
72+
ResponseEvent resp = getResponseEventClass().newInstance()
6973
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
7074
.setMediaType(MediaType.PLAIN_TEXT_UTF_8)
7175
.setBody(ExceptionUtils.getMessage(err).getBytes());
@@ -76,43 +80,30 @@ public Single<JsonObject> handleRequest(Message<JsonObject> pMessage) {
7680
}
7781

7882
@Override
79-
public Maybe<ResponseEvent> handleEvent(RequestEvent pRequest) {
80-
83+
public Maybe<Event> handleEvent(Event pEvent) {
8184
return Maybe
82-
.just(pRequest)
83-
.compose(this::preProcess)
85+
.just(pEvent)
86+
.compose(composePipeline(getPreProcessProcessors()))
8487
.compose(this)
85-
.compose(this::postProcess);
88+
.compose(composePipeline(getPostProcessProcessors()));
8689
}
8790

88-
public MaybeSource<RequestEvent> preProcess(Maybe<RequestEvent> pRequest) {
91+
protected MaybeTransformer<Event, Event> composePipeline(MaybeTransformer<Event, Event>[] pProcessors) {
8992

90-
if (getPreProcessProcessors() == null || getPreProcessProcessors().length == 0) {
91-
return pRequest;
93+
if (pProcessors == null || pProcessors.length == 0) {
94+
return (Maybe<Event> pEvent) -> pEvent;
9295
}
9396

94-
return Arrays
95-
.stream(getPreProcessProcessors())
96-
.reduce(pRequest, (acc, p) -> acc.compose(p), (a, b) -> a);
97+
return (Maybe<Event> pEvent) -> Arrays
98+
.stream(pProcessors)
99+
.reduce(pEvent, (acc, p) -> acc.compose(p), (a, b) -> a);
97100
}
98101

99102
@Override
100-
public MaybeSource<ResponseEvent> apply(Maybe<RequestEvent> pRequest) {
103+
public final MaybeSource<Event> apply(Maybe<Event> pRequest) {
101104
return process(pRequest);
102105
}
103-
104-
public abstract MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest);
105-
106-
public Maybe<ResponseEvent> postProcess(Maybe<ResponseEvent> pResponse) {
107-
108-
if (getPostProcessProcessors() == null || getPostProcessProcessors().length == 0) {
109-
return pResponse;
110-
}
111-
112-
return Arrays
113-
.stream(getPostProcessProcessors())
114-
.reduce(pResponse, (acc, p) -> acc.compose(p), (a, b) -> a);
115-
}
106+
public abstract MaybeSource<Event> process(Maybe<Event> pEvent);
116107

117108
@Override
118109
public String getOperationId() {
@@ -164,12 +155,12 @@ public void setScheduler(Scheduler pScheduler) {
164155
this.mScheduler = pScheduler;
165156
}
166157

167-
public ResponseEvent getDefaultResponseEvent() {
168-
return mDefaultResponseEvent;
158+
public Event getDefaultEvent() {
159+
return mDefaultEvent;
169160
}
170161

171-
public void setDefaultResponseEvent(ResponseEvent pDefaultResponseEvent) {
172-
this.mDefaultResponseEvent = pDefaultResponseEvent;
162+
public void setDefaultEvent(Event pDefaultEvent) {
163+
this.mDefaultEvent = pDefaultEvent;
173164
}
174165

175166
@Override
@@ -190,6 +181,7 @@ public void setAuthority(String pAuthority) {
190181
this.mAuthority = pAuthority;
191182
}
192183

184+
@Override
193185
public Class<? extends RequestEvent> getRequestEventClass() {
194186
return mRequestEventClass;
195187
}
@@ -198,19 +190,28 @@ public void setRequestEventClass(Class<? extends RequestEvent> pRequestEventClas
198190
this.mRequestEventClass = pRequestEventClass;
199191
}
200192

201-
public MaybeTransformer<RequestEvent, RequestEvent>[] getPreProcessProcessors() {
193+
@Override
194+
public Class<? extends ResponseEvent> getResponseEventClass() {
195+
return mResponseEventClass;
196+
}
197+
198+
public void setResponseEventClass(Class<? extends ResponseEvent> pResponseEventClass) {
199+
this.mResponseEventClass = pResponseEventClass;
200+
}
201+
202+
public MaybeTransformer<Event, Event>[] getPreProcessProcessors() {
202203
return mPreProcessProcessors;
203204
}
204205

205-
public void setPreProcessProcessors(MaybeTransformer<RequestEvent, RequestEvent>[] pPreProcessProcessors) {
206+
public void setPreProcessProcessors(MaybeTransformer<Event, Event>[] pPreProcessProcessors) {
206207
this.mPreProcessProcessors = pPreProcessProcessors;
207208
}
208209

209-
public MaybeTransformer<ResponseEvent, ResponseEvent>[] getPostProcessProcessors() {
210+
public MaybeTransformer<Event, Event>[] getPostProcessProcessors() {
210211
return mPostProcessProcessors;
211212
}
212213

213-
public void setPostProcessProcessors(MaybeTransformer<ResponseEvent, ResponseEvent>[] pPostProcessProcessors) {
214+
public void setPostProcessProcessors(MaybeTransformer<Event, Event>[] pPostProcessProcessors) {
214215
this.mPostProcessProcessors = pPostProcessProcessors;
215216
}
216217

src/test/java/in/erail/service/BinaryBodyService.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.base.Strings;
44
import com.google.common.net.MediaType;
5+
import in.erail.model.Event;
56

67
import in.erail.model.RequestEvent;
78
import in.erail.model.ResponseEvent;
@@ -19,23 +20,23 @@
1920
public class BinaryBodyService extends RESTServiceImpl {
2021

2122
@Override
22-
public MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest) {
23-
return pRequest.map(this::handle);
23+
public MaybeSource<Event> process(Maybe<Event> pRequest) {
24+
return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
2425
}
2526

26-
protected ResponseEvent handle(RequestEvent pRequest) {
27+
protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
2728
String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME);
2829

2930
if (Strings.isNullOrEmpty(topicName)) {
30-
return new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
31+
pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
32+
return;
3133
}
3234

33-
ResponseEvent response = new ResponseEvent();
34-
response.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
35+
pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
3536

3637
JsonObject jsonBody = new JsonObject(Buffer.buffer(pRequest.getBody()));
3738

3839
String bodyContent = jsonBody.getString("data");
39-
return response.setBody(bodyContent.getBytes());
40+
pRespone.setBody(bodyContent.getBytes());
4041
}
4142
}

src/test/java/in/erail/service/BroadcastService.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.base.Strings;
44
import com.google.common.net.MediaType;
5+
import in.erail.model.Event;
56

67
import in.erail.model.RequestEvent;
78
import in.erail.model.ResponseEvent;
@@ -18,15 +19,16 @@
1819
public class BroadcastService extends RESTServiceImpl {
1920

2021
@Override
21-
public MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest) {
22-
return pRequest.map(this::handle);
22+
public MaybeSource<Event> process(Maybe<Event> pRequest) {
23+
return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
2324
}
2425

25-
protected ResponseEvent handle(RequestEvent pRequest) {
26+
protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
2627
String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME);
2728

2829
if (Strings.isNullOrEmpty(topicName)) {
29-
return new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
30+
pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
31+
return;
3032
}
3133

3234
JsonObject bodyJson = new JsonObject(pRequest.bodyAsString());
@@ -37,10 +39,7 @@ protected ResponseEvent handle(RequestEvent pRequest) {
3739

3840
getLog().debug(() -> String.format("Message[%s] published on [%s]", bodyJson.toString(), topicName));
3941

40-
ResponseEvent response = new ResponseEvent();
41-
response.setBody(TestConstants.Service.Message.successMessage().toString().getBytes());
42-
response.setMediaType(MediaType.JSON_UTF_8);
43-
44-
return response;
42+
pRespone.setBody(TestConstants.Service.Message.successMessage().toString().getBytes());
43+
pRespone.setMediaType(MediaType.JSON_UTF_8);
4544
}
4645
}
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package in.erail.service;
22

33
import com.google.common.net.MediaType;
4+
import in.erail.model.Event;
45

56
import in.erail.model.RequestEvent;
67
import in.erail.model.ResponseEvent;
@@ -13,14 +14,13 @@
1314
*/
1415
public class ProcessorCheckService extends RESTServiceImpl {
1516

16-
@Override
17-
public MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest) {
18-
return pRequest.map(this::handle);
17+
@Override
18+
public MaybeSource<Event> process(Maybe<Event> pRequest) {
19+
return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
1920
}
2021

21-
protected ResponseEvent handle(RequestEvent pRequest) {
22-
ResponseEvent response = new ResponseEvent();
23-
response.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
24-
return response.setBody(pRequest.getSubject().toString().getBytes());
22+
protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
23+
pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
24+
pRespone.setBody(pRequest.getSubject().toString().getBytes());
2525
}
2626
}

src/test/java/in/erail/service/processor/AddHeaderProcessor.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
11
package in.erail.service.processor;
22

3-
import in.erail.model.ResponseEvent;
3+
import java.util.Optional;
4+
5+
import in.erail.model.Event;
46
import io.reactivex.Maybe;
57
import io.reactivex.MaybeSource;
68
import io.reactivex.MaybeTransformer;
7-
import java.util.Optional;
89

910
/**
1011
*
1112
* @author vinay
1213
*/
13-
public class AddHeaderProcessor implements MaybeTransformer<ResponseEvent, ResponseEvent> {
14+
public class AddHeaderProcessor implements MaybeTransformer<Event, Event> {
1415

1516
private String mMessage;
1617

1718
@Override
18-
public MaybeSource<ResponseEvent> apply(Maybe<ResponseEvent> pResponse) {
19-
return pResponse.map(r -> {
20-
String msg = Optional.ofNullable(r.headerValue("ProcessorHeader")).orElse("") + getMessage();
21-
r.removeHeader("ProcessorHeader");
22-
return r.addHeader("ProcessorHeader", msg);
19+
public MaybeSource<Event> apply(Maybe<Event> pEvent) {
20+
return pEvent.map(r -> {
21+
String msg = Optional.ofNullable(r.getResponse().headerValue("ProcessorHeader")).orElse("") + getMessage();
22+
r.getResponse().removeHeader("ProcessorHeader");
23+
r.getResponse().addHeader("ProcessorHeader", msg);
24+
return r;
2325
});
2426
}
2527

0 commit comments

Comments
 (0)