Skip to content

Commit

Permalink
Merge pull request #27 from whereby/geirbakke/cob-119-pair-lessdisrup…
Browse files Browse the repository at this point in the history
…tivereconnecton-with-stopped-media-detection

Adds getUpdatedStats for immediate stats collection
  • Loading branch information
geirbakke authored Oct 27, 2023
2 parents 1fb844d + d1e9665 commit f0da12e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@whereby/jslib-media",
"description": "Media library for Whereby",
"version": "1.3.3",
"version": "1.3.4",
"private": false,
"license": "MIT",
"homepage": "https://github.com/whereby/jslib-media",
Expand Down
48 changes: 34 additions & 14 deletions src/webrtc/stats/StatsMonitor/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ export const getStats = () => {
};

let subscriptions = [];
let stopMonitor = null;
let currentMonitor = null;

export const getUpdatedStats = () => currentMonitor?.getUpdatedStats();

const getOrCreateSsrcMetricsContainer = (time, pcIndex, clientId, trackId, ssrc) => {
let viewStats = statsByView[clientId];
Expand Down Expand Up @@ -180,7 +182,9 @@ function startStatsMonitor({ interval }) {
console.warn("Failed to observe CPU pressure", ex);
}

const collectStats = async () => {
let lastUpdateTime = 0;

const collectStats = async (immediate) => {
try {
// refresh provided clients before each run
const clients = getClients();
Expand All @@ -197,8 +201,17 @@ function startStatsMonitor({ interval }) {
// set pressure to last record received
defaultViewStats.pressure = lastPressureObserverRecord;

// throttle calls to ensure we get a diff from previous stats
const timeSinceLastUpdate = Date.now() - lastUpdateTime;
if (timeSinceLastUpdate < 400) {
if (immediate) return statsByView;
subscriptions.forEach((subscription) => subscription.onUpdatedStats?.(statsByView, clients));
nextTimeout = setTimeout(collectStats, interval || STATS_INTERVAL);
return;
}

// keep track of what has been updated this run
const thisUpdateTime = Date.now();
lastUpdateTime = Date.now();

// loop through current peer connections
(await getPeerConnectionsWithStatsReports()).forEach(([pc, report, pcData]) => {
Expand Down Expand Up @@ -229,11 +242,11 @@ function startStatsMonitor({ interval }) {
const cpId = pcIndex + ":" + currentRtcStats.id;
let cpMetrics = defaultViewStats.candidatePairs[cpId];
if (!cpMetrics) {
cpMetrics = { startTime: thisUpdateTime, id: cpId };
cpMetrics = { startTime: lastUpdateTime, id: cpId };
defaultViewStats.candidatePairs[cpId] = cpMetrics;
}
captureCandidatePairInfoMetrics(cpMetrics, currentRtcStats, prevRtcStats, timeDiff, report);
cpMetrics.lastRtcStatsTime = thisUpdateTime;
cpMetrics.lastRtcStatsTime = lastUpdateTime;
}

if (currentRtcStats.type === "inbound-rtp" || currentRtcStats.type === "outbound-rtp") {
Expand Down Expand Up @@ -281,7 +294,7 @@ function startStatsMonitor({ interval }) {
? currentRtcStats.timestamp - prevRtcStats.timestamp
: STATS_INTERVAL;
const ssrcMetrics = getOrCreateSsrcMetricsContainer(
thisUpdateTime,
lastUpdateTime,
pcIndex,
client.id,
trackId,
Expand Down Expand Up @@ -321,25 +334,29 @@ function startStatsMonitor({ interval }) {
});
});

removeNonUpdatedStats(thisUpdateTime);
removeNonUpdatedStats(lastUpdateTime);

// mark candidatepairs as active/inactive
Object.entries(defaultViewStats.candidatePairs).forEach(([cpKey, cp]) => {
const active = cp.lastRtcStatsTime === thisUpdateTime;
const active = cp.lastRtcStatsTime === lastUpdateTime;
cp.active = active;
if (!active) {
cp.state = "old/inactive";
if (!cp.inactiveFromTime) cp.inactiveFromTime = thisUpdateTime;
if (!cp.inactiveFromTime) cp.inactiveFromTime = lastUpdateTime;
else {
// delete candidate pair after a few secs
if (thisUpdateTime - cp.inactiveFromTime > 4000) {
if (lastUpdateTime - cp.inactiveFromTime > 4000) {
delete defaultViewStats.candidatePairs[cpKey];
}
}
}
});

subscriptions.forEach((subscription) => subscription.onUpdatedStats?.(statsByView, clients));
if (immediate) {
return statsByView;
} else {
subscriptions.forEach((subscription) => subscription.onUpdatedStats?.(statsByView, clients));
}
} catch (ex) {
console.warn(ex);
}
Expand All @@ -355,22 +372,25 @@ function startStatsMonitor({ interval }) {
clearTimeout(nextTimeout);
if (pressureObserver) pressureObserver.unobserve("cpu");
},
getUpdatedStats: () => {
return collectStats(true);
},
};
}

export function subscribeStats(subscription) {
subscriptions.push(subscription);

// start the monitor on first subscription
if (!stopMonitor) stopMonitor = startStatsMonitor({}).stop;
if (!currentMonitor) currentMonitor = startStatsMonitor({});

return {
stop() {
subscriptions = subscriptions.filter((s) => s !== subscription);
if (!subscriptions.length) {
// stop monitor when last subscription is stopped/removed
stopMonitor?.();
stopMonitor = null;
currentMonitor?.stop();
currentMonitor = null;
}
},
};
Expand Down

0 comments on commit f0da12e

Please sign in to comment.