Skip to content

Commit

Permalink
Do not join active call (#499)
Browse files Browse the repository at this point in the history
  • Loading branch information
kanat authored Oct 6, 2023
1 parent 844162f commit fd2dffb
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 36 deletions.
14 changes: 6 additions & 8 deletions dogfooding/lib/app/app_content.dart
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
// 🎯 Dart imports:
import 'dart:async';

// 🐦 Flutter imports:
import 'package:firebase_core/firebase_core.dart';
import 'package:firebase_messaging/firebase_messaging.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';

// 📦 Package imports:
import 'package:firebase_messaging/firebase_messaging.dart';
import 'package:flutter_dogfooding/router/routes.dart';
import 'package:google_fonts/google_fonts.dart';
import 'package:stream_chat_flutter/stream_chat_flutter.dart' hide User;
import 'package:stream_video_flutter/stream_video_flutter.dart';
import 'package:uni_links/uni_links.dart';

// 🌎 Project imports:
import 'package:flutter_dogfooding/router/routes.dart';
import '../core/repos/app_preferences.dart';
import '../di/injector.dart';
import '../firebase_options.dart';
Expand Down Expand Up @@ -67,6 +62,7 @@ class _StreamDogFoodingAppContentState
extends State<StreamDogFoodingAppContent> {
late final _userAuthController = locator.get<UserAuthController>();

late final _logger = taggedLogger(tag: 'StreamDogFoodingAppContent');
late final _router = initRouter(_userAuthController);

@override
Expand Down Expand Up @@ -150,6 +146,7 @@ class _StreamDogFoodingAppContentState
}

void _onCallAccept(ActionCallAccept event) async {
_logger.d(() => '[onCallAccept] event: $event');
final streamVideo = locator.get<StreamVideo>();

final uuid = event.data.uuid;
Expand All @@ -163,7 +160,7 @@ class _StreamDogFoodingAppContentState
var acceptResult = await callToJoin.accept();

// Return if cannot accept call
if(acceptResult.isFailure) {
if (acceptResult.isFailure) {
debugPrint('Error accepting call: $call');
return;
}
Expand All @@ -177,6 +174,7 @@ class _StreamDogFoodingAppContentState
}

void _onCallDecline(ActionCallDecline event) async {
_logger.d(() => '[onCallDecline] event: $event');
final streamVideo = locator.get<StreamVideo>();

final uuid = event.data.uuid;
Expand Down
66 changes: 42 additions & 24 deletions packages/stream_video/lib/src/call/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef OnCallPermissionRequest = void Function(
typedef GetCurrentUserId = String? Function();

typedef SetActiveCall = Future<void> Function(Call?);
typedef GetActiveCallCid = StreamCallCid? Function();

const _idState = 1;
const _idUserId = 2;
Expand All @@ -53,6 +54,7 @@ class Call {
required CoordinatorClient coordinatorClient,
required StateEmitter<User?> currentUser,
required SetActiveCall setActiveCall,
required GetActiveCallCid getActiveCallCid,
RetryPolicy? retryPolicy,
SdpPolicy? sdpPolicy,
CallPreferences? preferences,
Expand All @@ -63,6 +65,7 @@ class Call {
coordinatorClient: coordinatorClient,
currentUser: currentUser,
setActiveCall: setActiveCall,
getActiveCallCid: getActiveCallCid,
retryPolicy: retryPolicy,
sdpPolicy: sdpPolicy,
preferences: preferences,
Expand All @@ -77,6 +80,7 @@ class Call {
required CoordinatorClient coordinatorClient,
required StateEmitter<User?> currentUser,
required SetActiveCall setActiveCall,
required GetActiveCallCid getActiveCallCid,
RetryPolicy? retryPolicy,
SdpPolicy? sdpPolicy,
CallPreferences? preferences,
Expand All @@ -87,6 +91,7 @@ class Call {
coordinatorClient: coordinatorClient,
currentUser: currentUser,
setActiveCall: setActiveCall,
getActiveCallCid: getActiveCallCid,
retryPolicy: retryPolicy,
sdpPolicy: sdpPolicy,
preferences: preferences,
Expand All @@ -101,6 +106,7 @@ class Call {
required CoordinatorClient coordinatorClient,
required StateEmitter<User?> currentUser,
required SetActiveCall setActiveCall,
required GetActiveCallCid getActiveCallCid,
RetryPolicy? retryPolicy,
SdpPolicy? sdpPolicy,
CallPreferences? preferences,
Expand All @@ -111,6 +117,7 @@ class Call {
coordinatorClient: coordinatorClient,
currentUser: currentUser,
setActiveCall: setActiveCall,
getActiveCallCid: getActiveCallCid,
retryPolicy: retryPolicy,
sdpPolicy: sdpPolicy,
preferences: preferences,
Expand All @@ -122,6 +129,7 @@ class Call {
required CoordinatorClient coordinatorClient,
required StateEmitter<User?> currentUser,
required SetActiveCall setActiveCall,
required GetActiveCallCid getActiveCallCid,
RetryPolicy? retryPolicy,
SdpPolicy? sdpPolicy,
CallPreferences? preferences,
Expand All @@ -145,6 +153,7 @@ class Call {
coordinatorClient: coordinatorClient,
currentUser: currentUser,
setActiveCall: setActiveCall,
getActiveCallCid: getActiveCallCid,
preferences: finalCallPreferences,
stateManager: stateManager,
credentials: credentials,
Expand All @@ -157,6 +166,7 @@ class Call {
Call._({
required StateEmitter<User?> currentUser,
required SetActiveCall setActiveCall,
required GetActiveCallCid getActiveCallCid,
required CoordinatorClient coordinatorClient,
required CallPreferences preferences,
required CallStateNotifier stateManager,
Expand All @@ -177,6 +187,7 @@ class Call {
.whereNotNull()
.distinct(),
_setActiveCall = setActiveCall,
_getActiveCallCid = getActiveCallCid,
_coordinatorClient = coordinatorClient,
_preferences = preferences,
_retryPolicy = retryPolicy,
Expand All @@ -196,6 +207,7 @@ class Call {
final GetCurrentUserId _getCurrentUserId;
final Stream<String> _currentUserIdUpdates;
final SetActiveCall _setActiveCall;
final GetActiveCallCid _getActiveCallCid;
final CoordinatorClient _coordinatorClient;
final RetryPolicy _retryPolicy;
final CallPreferences _preferences;
Expand Down Expand Up @@ -383,29 +395,35 @@ class Call {

@Deprecated('Lobby view no longer needs joining to coordinator')
Future<Result<None>> joinLobby() async {
_logger.d(() => '[join] no args');
_logger.d(() => '[joinLobby] no args');
_stateManager.lifecycleCallJoining(const CallJoining());
final joinedResult = await _joinIfNeeded();
if (joinedResult is Success<CallCredentials>) {
_logger.v(() => '[join] completed');
_logger.v(() => '[joinLobby] completed');
return const Result.success(none);
} else {
final failedResult = joinedResult as Failure;
_logger.e(() => '[join] failed: $failedResult');
_logger.e(() => '[joinLobby] failed: $failedResult');
final error = failedResult.error;
_stateManager.lifecycleCallConnectFailed(ConnectFailed(error));
return failedResult;
}
}

Future<Result<None>> join() async {
_logger.i(() => '[connect] status: ${_status.value}');
_logger.i(() => '[join] status: ${_status.value}');
if (_status.value == _ConnectionStatus.connected) {
_logger.w(() => '[connect] rejected (connected)');
_logger.w(() => '[join] rejected (connected)');
return const Result.success(none);
}
if (_getActiveCallCid() == callCid) {
_logger.w(
() => '[join] rejected (a call with the same cid is in progress)',
);
return Result.error('a call with the same cid is in progress');
}
if (_status.value == _ConnectionStatus.connecting) {
_logger.v(() => '[connect] await "connecting" change');
_logger.v(() => '[join] await "connecting" change');
final status = await _status.firstWhere(
(it) => it != _ConnectionStatus.connecting,
timeLimit: _preferences.connectTimeout,
Expand All @@ -423,10 +441,10 @@ class Call {
.storeIn(_idConnect, _cancelables)
.valueOrDefault(Result.error('connect cancelled'));
if (result.isSuccess) {
_logger.v(() => '[connect] finished: $result');
_logger.v(() => '[join] finished: $result');
_status.value = _ConnectionStatus.connected;
} else {
_logger.e(() => '[connect] failed: $result');
_logger.e(() => '[join] failed: $result');
await leave();
}
return result;
Expand Down Expand Up @@ -458,26 +476,26 @@ class Call {
}

Future<Result<None>> _connect() async {
_logger.d(() => '[connect] options: $_connectOptions');
_logger.d(() => '[join] options: $_connectOptions');
final validation = await _stateManager.validateUserId(_getCurrentUserId);
if (validation.isFailure) {
_logger.w(() => '[connect] rejected (validation): $validation');
_logger.w(() => '[join] rejected (validation): $validation');
return validation;
}
_logger.v(() => '[connect] validated');
_logger.v(() => '[join] validated');

final state = this.state.value;
final status = state.status;
if (!status.isConnectable) {
_logger.w(() => '[connect] rejected (not Connectable): $status');
_logger.w(() => '[join] rejected (not Connectable): $status');
return Result.error('invalid status: $status');
}
_observeState();
_observeEvents();
_observeUserId();
final result = await _awaitIfNeeded();
if (result.isFailure) {
_logger.e(() => '[connect] waiting failed: $result');
_logger.e(() => '[join] waiting failed: $result');

_stateManager.lifecycleCallTimeout(const CallTimeout());

Expand All @@ -486,28 +504,28 @@ class Call {

_stateManager
.lifecycleCallConnectingAction(CallConnecting(_reconnectAttempt));
_logger.v(() => '[connect] joining to coordinator');
_logger.v(() => '[join] joining to coordinator');
final joinedResult = await _joinIfNeeded();
if (joinedResult is! Success<CallCredentials>) {
_logger.e(() => '[connect] joining failed: $joinedResult');
_logger.e(() => '[join] coordinator joining failed: $joinedResult');
final error = (joinedResult as Failure).error;
_stateManager.lifecycleCallConnectFailed(ConnectFailed(error));
return result;
}

_logger.v(() => '[connect] starting sfu session');
_logger.v(() => '[join] starting sfu session');
final sessionResult = await _startSession(joinedResult.data);
if (sessionResult is! Success<None>) {
_logger.w(() => '[connect] sfu session start failed: $sessionResult');
_logger.w(() => '[join] sfu session start failed: $sessionResult');
final error = (sessionResult as Failure).error;
_stateManager.lifecycleCallConnectFailed(ConnectFailed(error));
return sessionResult;
}
_logger.v(() => '[connect] started session');
_logger.v(() => '[join] started session');
_stateManager.lifecycleCallConnected(const CallConnected());
await _applyConnectOptions();

_logger.v(() => '[connect] completed');
_logger.v(() => '[join] completed');
return const Result.success(none);
}

Expand Down Expand Up @@ -662,19 +680,19 @@ class Call {

Future<Result<None>> leave() async {
final state = this.state.value;
_logger.i(() => '[disconnect] ${_status.value}; state: $state');
_logger.i(() => '[leave] ${_status.value}; state: $state');
if (state.status.isDisconnected) {
_logger.w(() => '[disconnect] rejected (state.status is disconnected)');
_logger.w(() => '[leave] rejected (state.status is disconnected)');
return const Result.success(none);
}
if (_status.value == _ConnectionStatus.disconnected) {
_logger.w(() => '[disconnect] rejected (status is disconnected)');
_logger.w(() => '[leave] rejected (status is disconnected)');
return const Result.success(none);
}
_status.value = _ConnectionStatus.disconnected;
await _clear('disconnect');
await _clear('leave');
_stateManager.lifecycleCallDisconnected(const CallDisconnected());
_logger.v(() => '[disconnect] finished');
_logger.v(() => '[leave] finished');
return const Result.success(none);
}

Expand Down
3 changes: 3 additions & 0 deletions packages/stream_video/lib/src/core/client_state.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import '../call/call.dart';
import '../models/call_cid.dart';
import '../models/user.dart';
import '../shared_emitter.dart';
import '../state_emitter.dart';
Expand Down Expand Up @@ -53,6 +54,8 @@ class MutableClientState implements ClientState {
connection.value = ConnectionState.disconnected(user.value.id);
}

StreamCallCid? getActiveCallCid() => activeCall.valueOrNull!.callCid;

Future<void> setActiveCall(Call? call) async {
final ongoingCall = activeCall.valueOrNull;
if (ongoingCall != null && call != null) {
Expand Down
Loading

0 comments on commit fd2dffb

Please sign in to comment.