Skip to content

Make refactor on ViewerStats functionality #4633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
47 changes: 31 additions & 16 deletions src/main/java/io/antmedia/datastore/db/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
import io.antmedia.datastore.db.types.WebRTCViewerInfo;
import io.antmedia.datastore.db.types.ViewerInfo;
import io.antmedia.muxer.IAntMediaStreamHandler;

public abstract class DataStore {
Expand Down Expand Up @@ -1254,10 +1254,10 @@ public int getTotalWebRTCViewersCount(Map<String, String> broadcastMap, Gson gso
return totalWebRTCViewerCount;
}

protected ArrayList<WebRTCViewerInfo> searchOnWebRTCViewerInfo(ArrayList<WebRTCViewerInfo> list, String search) {
protected ArrayList<ViewerInfo> searchOnWebRTCViewerInfo(ArrayList<ViewerInfo> list, String search) {
if(search != null && !search.isEmpty()) {
for (Iterator<WebRTCViewerInfo> i = list.iterator(); i.hasNext(); ) {
WebRTCViewerInfo item = i.next();
for (Iterator<ViewerInfo> i = list.iterator(); i.hasNext(); ) {
ViewerInfo item = i.next();
if(item.getViewerId() != null && !item.getViewerId().toLowerCase().contains(search.toLowerCase())) {
i.remove();
}
Expand All @@ -1266,7 +1266,7 @@ protected ArrayList<WebRTCViewerInfo> searchOnWebRTCViewerInfo(ArrayList<WebRTCV
return list;
}

protected List<WebRTCViewerInfo> sortAndCropWebRTCViewerInfoList(List<WebRTCViewerInfo> list, int offset, int size, String sortBy, String orderBy) {
protected List<ViewerInfo> sortAndCropWebRTCViewerInfoList(List<ViewerInfo> list, int offset, int size, String sortBy, String orderBy) {
if("viewerId".equals(sortBy))
{
Collections.sort(list, (viewer1, viewer2) -> {
Expand Down Expand Up @@ -1299,30 +1299,45 @@ protected List<WebRTCViewerInfo> sortAndCropWebRTCViewerInfoList(List<WebRTCView
*
* @param info information for the WebRTC Viewer
*/
public abstract void saveViewerInfo(WebRTCViewerInfo info);
public abstract void saveViewerInfo(ViewerInfo info);

/**
* This is used to update WebRTC Viewer Info to datastore
*
* @param info information for the WebRTC Viewer
*/
public abstract boolean updateViewerInfoEndTime(String sessionId, long endTime);

/**
* Get list of webrtc viewers
* Get list of viewers
*
* @param viewerType
* @param offset
* @param size
* @param search
* @param orderBy
* @param sortBy
*
* @return list of webrtc viewers
* @return list of viewers
*/
public abstract List<WebRTCViewerInfo> getWebRTCViewerList(int offset, int size, String sortBy, String orderBy, String search);
public abstract List<ViewerInfo> getViewerList(String viewerType, int offset, int size, String sortBy, String orderBy, String search);

public List<WebRTCViewerInfo> getWebRTCViewerList(Map<String, String> webRTCViewerMap, int offset, int size, String sortBy, String orderBy,
public List<ViewerInfo> getViewerList(Map<String, String> viewerMap, String viewerType, int offset, int size, String sortBy, String orderBy,
String search, Gson gson) {
ArrayList<WebRTCViewerInfo> list = new ArrayList<>();
ArrayList<ViewerInfo> list = new ArrayList<>();
synchronized (this) {

Collection<String> webRTCViewers = webRTCViewerMap.values();
for (String infoString : webRTCViewers) {
WebRTCViewerInfo info = gson.fromJson(infoString, WebRTCViewerInfo.class);
list.add(info);
// Use WebRTC by default
if(viewerType == null) {
viewerType = "webrtc";
}

Collection<String> viewers = viewerMap.values();
for (String infoString : viewers) {
ViewerInfo info = gson.fromJson(infoString, ViewerInfo.class);
if(info.getViewerType().equals(viewerType)) {
list.add(info);
}
}
}
if (search != null && !search.isEmpty()) {
Expand All @@ -1338,7 +1353,7 @@ public List<WebRTCViewerInfo> getWebRTCViewerList(Map<String, String> webRTCView
*
* @param viewerId WebRTC Viewer Id
*/
public abstract boolean deleteWebRTCViewerInfo(String viewerId);
public abstract boolean deleteWebRTCViewerInfo(String sessionId);

/**
* This is used to update meta data for a bradcast
Expand Down
44 changes: 31 additions & 13 deletions src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
import io.antmedia.datastore.db.types.WebRTCViewerInfo;
import io.antmedia.datastore.db.types.ViewerInfo;
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.antmedia.muxer.MuxAdaptor;

Expand All @@ -39,7 +39,7 @@ public class InMemoryDataStore extends DataStore {
private Map<String, Token> tokenMap = new LinkedHashMap<>();
private Map<String, Subscriber> subscriberMap = new LinkedHashMap<>();
private Map<String, ConferenceRoom> roomMap = new LinkedHashMap<>();
private Map<String, WebRTCViewerInfo> webRTCViewerMap = new LinkedHashMap<>();
private Map<String, ViewerInfo> viewerMap = new LinkedHashMap<>();


public InMemoryDataStore(String dbName) {
Expand Down Expand Up @@ -981,21 +981,39 @@ public int getTotalWebRTCViewersCount() {
}

@Override
public void saveViewerInfo(WebRTCViewerInfo info) {
webRTCViewerMap.put(info.getViewerId(), info);
public void saveViewerInfo(ViewerInfo info) {
viewerMap.put(info.getSessionId(), info);
}

@Override
public boolean updateViewerInfoEndTime(String sessionId, long endTime) {
ViewerInfo viewerInfo = viewerMap.get(sessionId);
boolean result = false;
if (viewerInfo != null) {
viewerInfo.setEndTime(endTime);
viewerMap.put(sessionId, viewerInfo);
result = true;
return result;
}
return result;
}

public List<WebRTCViewerInfo> getWebRTCViewerList(int offset, int size, String sortBy, String orderBy,
public List<ViewerInfo> getViewerList(String viewerType, int offset, int size, String sortBy, String orderBy,
String search) {

Collection<WebRTCViewerInfo> values = webRTCViewerMap.values();

ArrayList<WebRTCViewerInfo> list = new ArrayList<>();

Collection<ViewerInfo> values = viewerMap.values();
ArrayList<ViewerInfo> list = new ArrayList<>();

// Use WebRTC by default
if(viewerType != null) {
viewerType = "webrtc";
}

for (WebRTCViewerInfo info : values)
for (ViewerInfo info : values)
{
list.add(info);
if(info.getViewerType().equals(viewerType)) {
list.add(info);
}
}

if(search != null && !search.isEmpty()){
Expand All @@ -1006,8 +1024,8 @@ public List<WebRTCViewerInfo> getWebRTCViewerList(int offset, int size, String s
}

@Override
public boolean deleteWebRTCViewerInfo(String viewerId) {
webRTCViewerMap.remove(viewerId);
public boolean deleteWebRTCViewerInfo(String sessionId) {
viewerMap.remove(sessionId);
return true;
}

Expand Down
41 changes: 33 additions & 8 deletions src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
import io.antmedia.datastore.db.types.WebRTCViewerInfo;
import io.antmedia.datastore.db.types.ViewerInfo;
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.antmedia.muxer.MuxAdaptor;

Expand All @@ -43,7 +43,7 @@ public abstract class MapBasedDataStore extends DataStore {
protected Map<String, String> tokenMap;
protected Map<String, String> subscriberMap;
protected Map<String, String> conferenceRoomMap;
protected Map<String, String> webRTCViewerMap;
protected Map<String, String> viewerMap;

public static final String REPLACE_CHARS_REGEX = "[\n|\r|\t]";

Expand Down Expand Up @@ -992,28 +992,53 @@ public int getTotalWebRTCViewersCount() {
}

@Override
public void saveViewerInfo(WebRTCViewerInfo info) {
public void saveViewerInfo(ViewerInfo info) {
synchronized (this) {
if (info != null) {
try {
webRTCViewerMap.put(info.getViewerId(), gson.toJson(info));
viewerMap.put(info.getSessionId(), gson.toJson(info));
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
}
}
}

@Override
public boolean updateViewerInfoEndTime(String sessionId, long endTime) {
boolean result = false;
synchronized (this) {
if (sessionId != null) {

ViewerInfo viewerInfo = null;

String jsonString = viewerMap.get(sessionId);
if(jsonString != null) {
viewerInfo = gson.fromJson(jsonString, ViewerInfo.class);
}

if(viewerInfo != null) {
viewerInfo.setEndTime(endTime);
String currentViewerInfo = gson.toJson(viewerInfo);
viewerMap.replace(sessionId, currentViewerInfo);
result = true;
return result;
}
}
}
return result;
}

@Override
public List<WebRTCViewerInfo> getWebRTCViewerList(int offset, int size, String sortBy, String orderBy,
public List<ViewerInfo> getViewerList(String viewerType, int offset, int size, String sortBy, String orderBy,
String search) {
return super.getWebRTCViewerList(webRTCViewerMap, offset, size, sortBy, orderBy, search, gson);
return super.getViewerList(viewerMap, viewerType, offset, size, sortBy, orderBy, search, gson);
}

@Override
public boolean deleteWebRTCViewerInfo(String viewerId) {
public boolean deleteWebRTCViewerInfo(String sessionId) {
synchronized (this) {
return webRTCViewerMap.remove(viewerId) != null;
return viewerMap.remove(sessionId) != null;
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/antmedia/datastore/db/MapDBStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.StreamInfo;
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.vertx.core.Vertx;


Expand All @@ -35,7 +33,7 @@ public class MapDBStore extends MapBasedDataStore {
private static final String TOKEN = "TOKEN";
private static final String SUBSCRIBER = "SUBSCRIBER";
private static final String CONFERENCE_ROOM_MAP_NAME = "CONFERENCE_ROOM";
private static final String WEBRTC_VIEWER = "WEBRTC_VIEWER";
private static final String VIEWER = "VIEWER";


public MapDBStore(String dbName, Vertx vertx) {
Expand Down Expand Up @@ -70,7 +68,7 @@ public MapDBStore(String dbName, Vertx vertx) {
conferenceRoomMap = db.treeMap(CONFERENCE_ROOM_MAP_NAME).keySerializer(Serializer.STRING).valueSerializer(Serializer.STRING)
.counterEnable().createOrOpen();

webRTCViewerMap = db.treeMap(WEBRTC_VIEWER).keySerializer(Serializer.STRING).valueSerializer(Serializer.STRING)
viewerMap = db.treeMap(VIEWER).keySerializer(Serializer.STRING).valueSerializer(Serializer.STRING)
.counterEnable().createOrOpen();

timerId = vertx.setPeriodic(5000, id ->
Expand Down Expand Up @@ -135,5 +133,4 @@ public List<StreamInfo> getStreamInfoList(String streamId) {
public void saveStreamInfo(StreamInfo streamInfo) {
//no need to implement this method, it is used in cluster mode
}

}
35 changes: 26 additions & 9 deletions src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import io.antmedia.datastore.db.types.TensorFlowObject;
import io.antmedia.datastore.db.types.Token;
import io.antmedia.datastore.db.types.VoD;
import io.antmedia.datastore.db.types.WebRTCViewerInfo;
import io.antmedia.datastore.db.types.ViewerInfo;
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.antmedia.muxer.MuxAdaptor;

Expand Down Expand Up @@ -1367,30 +1367,47 @@ public int getTotalWebRTCViewersCount()
}

@Override
public void saveViewerInfo(WebRTCViewerInfo info) {
public void saveViewerInfo(ViewerInfo info) {
synchronized(this) {
if (info == null) {
return;
}
datastore.save(info);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean updateViewerInfoEndTime(String sessionId, long endTime) {
synchronized(this) {
try {
Query<ViewerInfo> query = datastore.find(ViewerInfo.class).filter(Filters.eq("sessionId", sessionId));
return query.update(
set("endTime", endTime))
.execute().getMatchedCount() == 1;
} catch (Exception e) {
logger.error(e.getMessage());
return false;
}
}
}

@Override
public List<WebRTCViewerInfo> getWebRTCViewerList(int offset, int size, String sortBy, String orderBy,
public List<ViewerInfo> getViewerList(String viewerType, int offset, int size, String sortBy, String orderBy,
String search) {
synchronized(this) {
Query<WebRTCViewerInfo> query = datastore.find(WebRTCViewerInfo.class);
Query<ViewerInfo> query = datastore.find(ViewerInfo.class).filter(Filters.eq("viewerType", viewerType));

if (size > MAX_ITEM_IN_ONE_LIST) {
size = MAX_ITEM_IN_ONE_LIST;
}

FindOptions findOptions = new FindOptions().skip(offset).limit(size);

if (sortBy != null && orderBy != null && !sortBy.isEmpty() && !orderBy.isEmpty()) {
findOptions.sort(orderBy.equals("desc") ? Sort.descending(sortBy) : Sort.ascending(sortBy));

}
if (search != null && !search.isEmpty()) {
logger.info("Server side search is called for WebRTCViewerInfo = {}", search);
Expand All @@ -1405,10 +1422,10 @@ public List<WebRTCViewerInfo> getWebRTCViewerList(int offset, int size, String s
}

@Override
public boolean deleteWebRTCViewerInfo(String viewerId) {
public boolean deleteWebRTCViewerInfo(String sessionId) {
synchronized(this) {
return datastore.find(WebRTCViewerInfo.class)
.filter(Filters.eq(VIEWER_ID, viewerId))
return datastore.find(ViewerInfo.class)
.filter(Filters.eq("sessionId", sessionId))
.delete()
.getDeletedCount() == 1;
}
Expand Down
Loading