Skip to content

Commit

Permalink
Merge pull request #119 from jacomago/phoebus-pvaccess
Browse files Browse the repository at this point in the history
Phoebus pvaccess
  • Loading branch information
shroffk authored Jan 11, 2024
2 parents b88a307 + a1df17e commit 7c2155b
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 255 deletions.
4 changes: 4 additions & 0 deletions docker-compose-integrationtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ volumes:
networks:
channelfinder-net:
driver: bridge
enable_ipv6: true
ipam:
config:
- subnet: 2001:0DB8::/112
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ volumes:
networks:
channelfinder-net:
driver: bridge
enable_ipv6: true
ipam:
config:
- subnet: 2001:0DB8::/112
8 changes: 3 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,9 @@
</dependency>

<dependency>
<groupId>org.epics</groupId>
<artifactId>epics-core</artifactId>
<type>pom</type>
<version>7.0.8</version>
<scope>compile</scope>
<groupId>org.phoebus</groupId>
<artifactId>core-pva</artifactId>
<version>4.7.2</version>
</dependency>

<dependency>
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/org/phoebus/channelfinder/MetricsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,17 @@ public class MetricsService {

private final ChannelRepository channelRepository;
private final PropertyRepository propertyRepository;
private final TagRepository tagRepository;
private final MeterRegistry meterRegistry;

@Autowired
public MetricsService(final ChannelRepository channelRepository, final PropertyRepository propertyRepository, final TagRepository tagRepository,
final MeterRegistry meterRegistry) {
public MetricsService(final ChannelRepository channelRepository, final PropertyRepository propertyRepository, final TagRepository tagRepository, final MeterRegistry meterRegistry) {
this.channelRepository = channelRepository;
this.propertyRepository = propertyRepository;
this.tagRepository = tagRepository;
this.meterRegistry = meterRegistry;
registerGaugeMetrics();
registerGaugeMetrics(meterRegistry, tagRepository);
}

MultiGauge propertyCounts;
MultiGauge tagCounts;

private void registerGaugeMetrics(){
private void registerGaugeMetrics(MeterRegistry meterRegistry, TagRepository tagRepository){
Gauge.builder(METRIC_NAME_CHANNEL_COUNT, () -> channelRepository.count(new LinkedMultiValueMap<>()))
.description(METRIC_DESCRIPTION_CHANNEL_COUNT)
.register(meterRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,43 @@
package org.phoebus.channelfinder.epics;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.epics.nt.NTTable;
import org.epics.nt.NTTableBuilder;
import org.epics.nt.NTURI;
import org.epics.pvaccess.PVAException;
import org.epics.pvaccess.server.rpc.RPCResponseCallback;
import org.epics.pvaccess.server.rpc.RPCServer;
import org.epics.pvaccess.server.rpc.RPCServiceAsync;
import org.epics.pvdata.factory.StatusFactory;
import org.epics.pvdata.pv.PVBooleanArray;
import org.epics.pvdata.pv.PVString;
import org.epics.pvdata.pv.PVStringArray;
import org.epics.pvdata.pv.PVStructure;
import org.epics.pvdata.pv.ScalarType;
import org.epics.pva.data.PVABoolArray;
import org.epics.pva.data.PVAStringArray;
import org.epics.pva.data.PVAStructure;
import org.epics.pva.data.nt.MustBeArrayException;
import org.epics.pva.data.nt.NotValueException;
import org.epics.pva.data.nt.PVATable;
import org.epics.pva.data.nt.PVAURI;
import org.epics.pva.server.RPCService;
import org.phoebus.channelfinder.entity.Channel;
import org.phoebus.channelfinder.ChannelRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.epics.pva.server.PVAServer;
import org.epics.pva.server.ServerPV;

/**
* A pva RPC service for channelfinder
*
* <p>
* Request:
*
* <p>
* The client requests a query as a NTURI pvStructure.
*
* <p>
* Result:
*
* <p>
* The service returns the result as an NTTable pvStructure.
*
* @author Kunal Shroff
Expand All @@ -57,108 +49,97 @@ public class ChannelFinderEpicsService {

private static final Logger logger = Logger.getLogger(ChannelFinderEpicsService.class.getName());

private final ExecutorService pool = Executors.newScheduledThreadPool(1);

public static final String SERVICE_DESC = "cfService:query";

public static final String COLUMN_CHANNEL_NAME = "channelName";
public static final String COLUMN_OWNER = "owner";

@Autowired
ChannelRepository repository;

RPCServer server;

private ChannelFinderServiceImpl service;
PVAServer server;
ServerPV serverPV;

@PostConstruct
public void init() {
public void init() throws Exception {

logger.log(Level.INFO, "Launching the epics rpc channelfinder service: " + SERVICE_DESC);
server = new RPCServer();

server = new PVAServer();

logger.log(Level.INFO, SERVICE_DESC + " initializing...");
service = new ChannelFinderServiceImpl(repository);
server.registerService(SERVICE_DESC, service);
server.printInfo();
ChannelFinderServiceImpl service = new ChannelFinderServiceImpl(repository);
serverPV = server.createPV(SERVICE_DESC, service);
logger.log(Level.INFO, SERVICE_DESC + " is operational.");

pool.submit(() -> {
try {
server.run(0);
} catch (PVAException e) {
logger.log(Level.SEVERE, "Failed to start the epics rpc channelfinder service", e);
}
});

}

@PreDestroy
public void onDestroy() throws Exception {
public void onDestroy() {
logger.log(Level.INFO, "Shutting down service " + SERVICE_DESC);
try {
service.shutdown();
server.destroy();
logger.log(Level.INFO, SERVICE_DESC + " Shutdown complete.");
} catch (PVAException e) {
logger.log(Level.SEVERE, "Failed to close service : " + SERVICE_DESC, e);
}
logger.info("Shutting down service " + SERVICE_DESC);
serverPV.close();
server.close();
logger.info(SERVICE_DESC + " Shutdown complete.");
}

private static class ChannelFinderServiceImpl implements RPCServiceAsync {
private static class ChannelFinderServiceImpl implements RPCService {


private ChannelRepository repository;
private final ChannelRepository repository;

public ChannelFinderServiceImpl(ChannelRepository repository) {
this.repository = repository;
logger.log(Level.INFO, "start");
}

private final ExecutorService pool = Executors.newScheduledThreadPool(50);

@Override
public void request(PVStructure args, RPCResponseCallback call) {
logger.log(Level.FINE, () -> args.toString());
HandlerQuery query = new HandlerQuery(args, call, repository);
query.run();
public PVAStructure call(PVAStructure args) throws Exception {
logger.log(Level.FINE, args::toString);
HandlerQuery query = new HandlerQuery(args, repository);
return query.run();
}

private static class HandlerQuery implements Runnable {
private static class HandlerQuery {

private final RPCResponseCallback callback;
private final PVStructure args;
private final PVAStructure args;
private final ChannelRepository channelRepository;

public HandlerQuery(PVStructure args, RPCResponseCallback callback, ChannelRepository channelRepository) {
this.callback = callback;
public HandlerQuery(PVAStructure args, ChannelRepository channelRepository) {
this.args = args;
this.channelRepository = channelRepository;
}

@Override
public void run() {

final Set<String> filteredColumns = Collections.emptySet();
public PVAStructure run() throws MustBeArrayException {

MultiValueMap<String, String> searchParameters = new LinkedMultiValueMap<>();
NTURI uri = NTURI.wrap(args);
String[] query = uri.getQueryNames();
for (String parameter : query) {
String value = uri.getQueryField(PVString.class, parameter).get();
PVAURI uri = PVAURI.fromStructure(args);
Map<String, String> query;
try {
query = uri.getQuery();
} catch (NotValueException e) {
logger.log(Level.WARNING, () -> "Query " + uri + " not valid." + e.getMessage());
throw new UnsupportedOperationException("The requested operation is not supported.");
}
for (String parameter : query.keySet()) {
String value = query.get(parameter);
if (value != null && !value.isEmpty()) {
switch (parameter) {
case "_name":
searchParameters.put("~name", Arrays.asList(value));
searchParameters.put("~name", List.of(value));
break;
case "_tag":
searchParameters.put("~tag", Arrays.asList(value));
searchParameters.put("~tag", List.of(value));
break;
case "_size":
searchParameters.put("~size", Arrays.asList(value));
searchParameters.put("~size", List.of(value));
break;
case "_from":
searchParameters.put("~from", Arrays.asList(value));
searchParameters.put("~from", List.of(value));
break;
default:
searchParameters.put(parameter, Arrays.asList(value));
searchParameters.put(parameter, List.of(value));
break;
}
}
Expand All @@ -169,89 +150,48 @@ public void run() {
final Map<String, List<String>> channelTable = new HashMap<>();
final Map<String, List<String>> channelPropertyTable = new HashMap<>();
final Map<String, boolean[]> channelTagTable = new HashMap<>();
channelTable.put("channelName", Arrays.asList(new String[result.size()]));
channelTable.put("owner", Arrays.asList(new String[result.size()]));
channelTable.put(COLUMN_CHANNEL_NAME, Arrays.asList(new String[result.size()]));
channelTable.put(COLUMN_OWNER, Arrays.asList(new String[result.size()]));

AtomicInteger counter = new AtomicInteger(0);

result.forEach(ch -> {

int index = counter.getAndIncrement();

channelTable.get("channelName").set(index, ch.getName());
channelTable.get("owner").set(index, ch.getOwner());

if (!filteredColumns.contains("ALL")) {
ch.getTags().stream().filter(tag ->
filteredColumns.isEmpty() || filteredColumns.contains(tag.getName())
).forEach(t -> {
if (!channelTagTable.containsKey(t.getName())) {
channelTagTable.put(t.getName(), new boolean[result.size()]);
}
channelTagTable.get(t.getName())[index] = true;
});

ch.getProperties().stream().filter(prop ->
filteredColumns.isEmpty() || filteredColumns.contains(prop.getName())
).forEach(prop -> {
if (!channelPropertyTable.containsKey(prop.getName())) {
channelPropertyTable.put(prop.getName(), Arrays.asList(new String[result.size()]));
}
channelPropertyTable.get(prop.getName()).set(index, prop.getValue());
});
}
channelTable.get(COLUMN_CHANNEL_NAME).set(index, ch.getName());
channelTable.get(COLUMN_OWNER).set(index, ch.getOwner());

ch.getTags().forEach(t -> {
if (!channelTagTable.containsKey(t.getName())) {
channelTagTable.put(t.getName(), new boolean[result.size()]);
}
channelTagTable.get(t.getName())[index] = true;
});

ch.getProperties().forEach(prop -> {
if (!channelPropertyTable.containsKey(prop.getName())) {
channelPropertyTable.put(prop.getName(), Arrays.asList(new String[result.size()]));
}
channelPropertyTable.get(prop.getName()).set(index, prop.getValue());
});
});
NTTableBuilder ntTableBuilder = NTTable.createBuilder();
PVATable.PVATableBuilder ntTableBuilder = PVATable.PVATableBuilder.aPVATable().name(SERVICE_DESC);

channelTable.keySet().forEach(name ->
ntTableBuilder.addColumn(name, ScalarType.pvString)
ntTableBuilder.addColumn(new PVAStringArray(name, channelTable.get(name).toArray(String[]::new)))
);
channelPropertyTable.keySet().forEach(name ->
ntTableBuilder.addColumn(name, ScalarType.pvString)
ntTableBuilder.addColumn(new PVAStringArray(name, channelPropertyTable.get(name).toArray(String[]::new)))
);
channelTagTable.keySet().forEach(name ->
ntTableBuilder.addColumn(name, ScalarType.pvBoolean)
);
NTTable ntTable = ntTableBuilder.create();

channelTable.entrySet().stream().forEach(col ->
ntTable.getColumn(PVStringArray.class, col.getKey()).put(0, col.getValue().size(),
col.getValue().stream().toArray(String[]::new), 0)
);

channelPropertyTable.entrySet().stream().forEach(col ->
ntTable.getColumn(PVStringArray.class, col.getKey()).put(0, col.getValue().size(),
col.getValue().stream().toArray(String[]::new), 0)
);

channelTagTable.entrySet().stream().forEach(col ->
ntTable.getColumn(PVBooleanArray.class, col.getKey()).put(0, col.getValue().length,
col.getValue(), 0)
ntTableBuilder.addColumn(new PVABoolArray(name, channelTagTable.get(name)))
);

logger.log(Level.FINE, () -> ntTable.toString());
this.callback.requestDone(StatusFactory.getStatusCreate().getStatusOK(), ntTable.getPVStructure());
logger.log(Level.FINE, ntTableBuilder::toString);
return ntTableBuilder.build();
}
}

public void shutdown() {
logger.log(Level.INFO, "shutting down service.");
pool.shutdown();
// Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
logger.log(Level.INFO, "completed shut down.");
}

}
}
Loading

0 comments on commit 7c2155b

Please sign in to comment.