Skip to content
This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

Update EventSinkListener.java #65

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/

package com.orange.cepheus.cep;

import com.espertech.esper.client.*;
import com.orange.cepheus.cep.model.*;
import com.orange.cepheus.cep.model.Configuration;
Expand Down Expand Up @@ -38,6 +37,7 @@ public class EventSinkListener implements StatementAwareUpdateListener {
private EventMapper eventMapper;

private Configuration configuration;


/**
* All outgoing outgoingEvents accessible by type
Expand All @@ -50,22 +50,43 @@ public class EventSinkListener implements StatementAwareUpdateListener {
*/
@Override
public void update(EventBean[] added, EventBean[] removed, EPStatement epStatement, EPServiceProvider epServiceProvider) {

// ignore updates for removed events

/* contribution by CAMPIE project developers at Cairo university
Ahmad Essam
Mahmoud Samir
Karem Fathy
*/

//getting the provider URI containing the service and the servicePath through EPServiceProvider interface
//looping to get the ServicePath from the URI
String[] tenant = epServiceProvider.getURI().split("/");
String service=tenant[0];
String servicePath="";
for(int i=1;i<tenant.length;i++){
servicePath += "/"+tenant[i];

}


// ignore updates for removed events
if (added == null) {
return;
}

for (EventBean eventBean : added) {

// Debug some information about the event
if (logger.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
for (String propertyName : eventBean.getEventType().getPropertyNames()) {
sb.append(" / ").append(propertyName).append(':').append(eventBean.get(propertyName));
}

logger.info("EventOut: {}{} from {}", eventBean.getEventType().getName(), sb.toString(), epStatement.getText());
}



}

// Send updateContext requests to each broker
final String type = eventBean.getEventType().getName();
Expand All @@ -77,7 +98,12 @@ public void update(EventBean[] added, EventBean[] removed, EPStatement epStateme
if (updateContext != null) {
for (Broker broker : eventTypeOut.getBrokers()) {
assert broker != null;
HttpHeaders httpHeaders = getHeadersForBroker(broker);
// HttpHeaders httpHeaders = getHeadersForBroker(broker);
HttpHeaders httpHeaders = ngsiClient.getRequestHeaders(broker.getUrl());

//adding the service and servicePath to the headers of the out event request
httpHeaders.add("Fiware-Service",service);
httpHeaders.add("Fiware-ServicePath",servicePath);
ngsiClient.updateContext(broker.getUrl(), httpHeaders, updateContext).addCallback(
updateContextResponse ->
logger.debug("UpdateContext completed for {}", broker.getUrl()),
Expand Down Expand Up @@ -131,6 +157,7 @@ private UpdateContext buildUpdateContextRequest(EventBean eventBean, EventTypeOu
*/
private HttpHeaders getHeadersForBroker(Broker broker) {
HttpHeaders httpHeaders = ngsiClient.getRequestHeaders(broker.getUrl());

if (broker.getServiceName() != null) {
httpHeaders.add("Fiware-Service", broker.getServiceName());
} else if (configuration.getService() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void noUpdateOnRemovedEventBeans() {
attributes.add(new ContextAttribute("id", "string", null)); // null => when statement does not indicate id
attributes.add(new ContextAttribute("avgTemp", "double", 10.25));
EventBean[]beans = {buildEventBean("TempSensorAvg", attributes)};
eventSinkListener.update(null, beans, statement, provider);
eventSinkListener.update( null, beans, statement, provider);

verify(ngsiClient, never()).updateContext(any(), any(), any());
}
Expand Down