diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/executing/TestExecutor.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/executing/TestExecutor.kt index 50604d1629..0a0c4e792e 100644 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/executing/TestExecutor.kt +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/executing/TestExecutor.kt @@ -6,7 +6,6 @@ import com.avito.instrumentation.reservation.client.ReservationClient import com.avito.instrumentation.reservation.client.ReservationClientFactory import com.avito.instrumentation.reservation.request.Reservation import com.avito.instrumentation.suite.model.TestWithTarget -import com.avito.instrumentation.util.launchGroupedCoroutines import com.avito.runner.scheduler.TestsRunnerClient import com.avito.runner.scheduler.args.Arguments import com.avito.runner.scheduler.runner.model.TestRunRequest @@ -15,8 +14,7 @@ import com.avito.runner.service.worker.device.Serial import com.avito.runner.service.worker.device.model.DeviceConfiguration import com.avito.utils.logging.CILogger import com.avito.utils.logging.commonLogger -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.toList +import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.runBlocking import java.io.File @@ -155,36 +153,26 @@ interface TestExecutor { client: ReservationClient, configurationName: String, reservations: Collection, - action: (devices: Channel) -> Unit + action: (devices: ReceiveChannel) -> Unit ) { - val reservationDeployments = Channel(reservations.size) try { - val serialsChannel = Channel(Channel.UNLIMITED) - - launchGroupedCoroutines { - launch(blocking = false) { - logger.info("Devices: Starting reservation job for configuration: $configurationName...") - client.claim( - reservations = reservations, - serialsChannel = serialsChannel, - reservationDeployments = reservationDeployments - ) - logger.info("Devices: Reservation job completed for configuration: $configurationName") - } - launch { - logger.info("Devices: Starting action for configuration: $configurationName...") - action(serialsChannel) - logger.info("Devices: Action completed for configuration: $configurationName") - } + runBlocking { + logger.info("Devices: Starting reservation job for configuration: $configurationName...") + val result = client.claim( + reservations = reservations + ) + logger.info("Devices: Reservation job completed for configuration: $configurationName") + logger.info("Devices: Starting action for configuration: $configurationName...") + action(result.serials) + logger.info("Devices: Action completed for configuration: $configurationName") } } catch (e: Throwable) { logger.critical("Error during starting reservation job", e) } finally { - reservationDeployments.close() logger.info("Devices: Starting releasing devices for configuration: $configurationName...") runBlocking { - client.release(reservationDeployments = reservationDeployments.toList()) + client.release() } logger.info("Devices: Devices released for configuration: $configurationName") } diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/adb/Device.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/adb/Device.kt index 8bf8ccd693..23522f408c 100644 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/adb/Device.kt +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/adb/Device.kt @@ -8,7 +8,7 @@ import org.funktionale.tries.Try import java.io.File abstract class Device( - protected val serial: Serial, + val serial: Serial, protected val logger: (String) -> Unit = {} ) { private val androidHome: String? = System.getenv("ANDROID_HOME") diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClient.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClient.kt index 113aa41e19..0c81761599 100644 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClient.kt +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClient.kt @@ -1,15 +1,18 @@ package com.avito.instrumentation.reservation.client -import com.avito.runner.service.worker.device.Serial import com.avito.instrumentation.reservation.request.Reservation -import kotlinx.coroutines.channels.SendChannel +import com.avito.runner.service.worker.device.Serial +import kotlinx.coroutines.channels.ReceiveChannel interface ReservationClient { - suspend fun claim( - reservations: Collection, - serialsChannel: SendChannel, - reservationDeployments: SendChannel + + class ClaimResult( + val serials: ReceiveChannel ) - suspend fun release(reservationDeployments: Collection) + suspend fun claim( + reservations: Collection + ): ClaimResult + + suspend fun release() } diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClientFactory.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClientFactory.kt index 0c0d0c8583..0a33dfebd8 100644 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClientFactory.kt +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/ReservationClientFactory.kt @@ -5,6 +5,7 @@ import com.avito.instrumentation.executing.ExecutionParameters import com.avito.instrumentation.reservation.adb.AndroidDebugBridge import com.avito.instrumentation.reservation.adb.EmulatorsLogsReporter import com.avito.instrumentation.reservation.client.kubernetes.KubernetesReservationClient +import com.avito.instrumentation.reservation.client.kubernetes.ReservationDeploymentFactory import com.avito.instrumentation.reservation.client.local.LocalReservationClient import com.avito.runner.service.worker.device.adb.AdbDevicesManager import com.avito.utils.gradle.KubernetesCredentials @@ -57,13 +58,16 @@ interface ReservationClientFactory { kubernetesCredentials = kubernetesCredentials, namespace = executionParameters.namespace ), - configurationName = configuration.name, - projectName = projectName, + reservationDeploymentFactory = ReservationDeploymentFactory( + configurationName = configuration.name, + projectName = projectName, + buildId = buildId, + buildType = buildType, + registry = registry, + logger = logger + ), logger = logger, - buildId = buildId, - buildType = buildType, - emulatorsLogsReporter = emulatorsLogsReporter, - registry = registry + emulatorsLogsReporter = emulatorsLogsReporter ) } } diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/KubernetesReservationClient.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/KubernetesReservationClient.kt index afa9c4764f..e1b636da3f 100644 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/KubernetesReservationClient.kt +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/KubernetesReservationClient.kt @@ -4,41 +4,24 @@ import com.avito.instrumentation.reservation.adb.AndroidDebugBridge import com.avito.instrumentation.reservation.adb.EmulatorsLogsReporter import com.avito.instrumentation.reservation.adb.RemoteDevice import com.avito.instrumentation.reservation.client.ReservationClient -import com.avito.instrumentation.reservation.request.Device import com.avito.instrumentation.reservation.request.Reservation -import com.avito.instrumentation.util.forEachAsync -import com.avito.instrumentation.util.iterateInParallel import com.avito.instrumentation.util.waitForCondition import com.avito.runner.service.worker.device.Serial import com.avito.utils.logging.CILogger -import com.fkorotkov.kubernetes.apps.metadata -import com.fkorotkov.kubernetes.apps.newDeployment -import com.fkorotkov.kubernetes.apps.selector -import com.fkorotkov.kubernetes.apps.spec -import com.fkorotkov.kubernetes.apps.template -import com.fkorotkov.kubernetes.metadata -import com.fkorotkov.kubernetes.newContainer -import com.fkorotkov.kubernetes.newEnvVar -import com.fkorotkov.kubernetes.newHostPathVolumeSource -import com.fkorotkov.kubernetes.newToleration -import com.fkorotkov.kubernetes.newVolume -import com.fkorotkov.kubernetes.newVolumeMount -import com.fkorotkov.kubernetes.resources -import com.fkorotkov.kubernetes.securityContext -import com.fkorotkov.kubernetes.spec import io.fabric8.kubernetes.api.model.Pod -import io.fabric8.kubernetes.api.model.PodSpec -import io.fabric8.kubernetes.api.model.Quantity import io.fabric8.kubernetes.api.model.apps.Deployment import io.fabric8.kubernetes.client.KubernetesClient -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.distinctBy import kotlinx.coroutines.channels.filter +import kotlinx.coroutines.channels.toList import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import java.util.UUID import java.util.concurrent.TimeUnit @Suppress("EXPERIMENTAL_API_USAGE") @@ -46,52 +29,33 @@ class KubernetesReservationClient( private val androidDebugBridge: AndroidDebugBridge, private val kubernetesClient: KubernetesClient, private val emulatorsLogsReporter: EmulatorsLogsReporter, - private val configurationName: String, - private val projectName: String, - private val buildId: String, - private val buildType: String, private val logger: CILogger, - private val registry: String + private val reservationDeploymentFactory: ReservationDeploymentFactory ) : ReservationClient { - private var state: State = State.Idling + private val scope = CoroutineScope(Dispatchers.IO + Job()) - init { - require(buildId.isNotBlank()) { "buildId is blank, client can't distinguish reservations" } - } + private var state: State = State.Idling override suspend fun claim( - reservations: Collection, - serialsChannel: SendChannel, - reservationDeployments: SendChannel // TODO: make this state internal - ) { + reservations: Collection + ): ReservationClient.ClaimResult { if (state !is State.Idling) { throw IllegalStateException("Unable to start reservation job. Already started") } - logger.debug("Starting deployments for configuration: $configurationName...") val podsChannel = Channel() - state = State.Reserving(pods = podsChannel) + val serialsChannel = Channel(Channel.UNLIMITED) + val deploymentsChannel = Channel(reservations.size) + state = State.Reserving(pods = podsChannel, deployments = deploymentsChannel) reservations.forEach { reservation -> - val deploymentName = generateDeploymentName() - reservationDeployments.send(deploymentName) - - logger.debug("Starting deployment: $deploymentName") - when (reservation.device) { - is Device.Phone -> getDeviceDeployment( - count = reservation.count, - phone = reservation.device, - deploymentName = deploymentName - ) - is Device.LocalEmulator -> throw IllegalStateException( - "Local emulator ${reservation.device} is unsupported in kubernetes reservation" - ) - is Device.CloudEmulator -> getCloudEmulatorDeployment( - emulator = reservation.device, - deploymentName = deploymentName, - count = reservation.count - ) - }.create() + val deployment = reservationDeploymentFactory.createDeployment( + namespace = kubernetesClient.namespace, + reservation = reservation + ) + val deploymentName = deployment.metadata.name + deploymentsChannel.send(deploymentName) + deployment.create() logger.debug("Deployment created: $deploymentName") listenPodsFromDeployment( @@ -101,113 +65,111 @@ class KubernetesReservationClient( } //todo use Flow - @Suppress("DEPRECATION") - podsChannel - .filter { it.status.phase == POD_STATUS_RUNNING } - .distinctBy { it.metadata.name } - .forEachAsync { pod -> - logger.debug("Found new pod: ${pod.metadata.name}") - requireNotNull(pod.status.podIP) { "pod has ip after deployment" } - - val serial = emulatorSerialName( - name = pod.status.podIP - ) - val device = androidDebugBridge.getDevice( - serial = serial - ) - check(device is RemoteDevice) - val isReady = device.waitForBoot() - if (isReady) { - emulatorsLogsReporter.redirectLogcat( - emulatorName = serial, - device = device - ) - serialsChannel.send(serial) - - logger.debug("Pod ${pod.metadata.name} sent outside for further usage") - } else { - logger.warn("Pod ${pod.metadata.name} can't load device. Disconnect and delete") - val isDisconnected = device.disconnect().isSuccess() - logger.warn("Disconnect device $serial: $isDisconnected. Can't boot it.") - val isDeleted = kubernetesClient.pods().withName(pod.metadata.name).delete() - logger.warn("Pod ${pod.metadata.name} is deleted: $isDeleted") + scope.launch { + for (pod in podsChannel + .filter { it.status.phase == POD_STATUS_RUNNING } + .distinctBy { it.metadata.name }) { + scope.launch { + val podName = pod.metadata.name + logger.debug("Found new pod: $podName") + val device = getDevice(pod) + val serial = device.serial + val isReady = device.waitForBoot() + if (isReady) { + emulatorsLogsReporter.redirectLogcat( + emulatorName = serial, + device = device + ) + serialsChannel.send(serial) + + logger.debug("Pod $podName sent outside for further usage") + } else { + logger.warn("Pod $podName can't load device. Disconnect and delete") + val isDisconnected = device.disconnect().isSuccess() + logger.warn("Disconnect device $serial: $isDisconnected. Can't boot it.") + val isDeleted = kubernetesClient.pods().withName(podName).delete() + logger.warn("Pod $podName is deleted: $isDeleted") + } } } - } - - override suspend fun release( - reservationDeployments: Collection - ) { - if (state !is State.Reserving) { - // TODO: check on client side beforehand - // TODO this leads to deployment leak - throw RuntimeException("Unable to stop reservation job. Hasn't started yet") } - (state as State.Reserving).pods.close() - - logger.debug("Releasing devices for configuration: $configurationName...") - - reservationDeployments - .iterateInParallel { _, deploymentName -> - - val runningPods = podsFromDeployment( - deploymentName = deploymentName - ).filter { it.status.phase == POD_STATUS_RUNNING } - - if (runningPods.isNotEmpty()) { - logger.debug("Save emulators logs for deployment: $deploymentName") - runningPods - .iterateInParallel { _, pod -> - val podName = pod.metadata.name - requireNotNull(pod.status.podIP) { "pod has ip before removal" } - val serial = emulatorSerialName( - name = pod.status.podIP - ) - val device = androidDebugBridge.getDevice( - serial = serial - ) - check(device is RemoteDevice) - - logger.debug("Saving emulator logs for pod: $podName with serial: $serial...") - try { - val podLogs = kubernetesClient.pods().withName(podName).log - logger.debug("Emulators logs saved for pod: $podName with serial: $serial") + return ReservationClient.ClaimResult( + serials = serialsChannel + ) + } - logger.debug("Saving logcat for pod: $podName with serial: $serial...") - emulatorsLogsReporter.reportEmulatorLogs( - emulatorName = serial, - log = podLogs - ) - logger.debug("Logcat saved for pod: $podName with serial: $serial") - } catch (throwable: Throwable) { - // TODO must be fixed after adding affinity to POD - val podDescription = getPodDescription(podName) - logger.critical( - "Get logs from emulator failed; pod=$podName; podDescription=$podDescription; container serial=$serial", - throwable - ) + override suspend fun release() { + try { + val state = state + if (state !is State.Reserving) { + // TODO: check on client side beforehand + // TODO this leads to deployment leak + throw RuntimeException("Unable to stop reservation job. Hasn't started yet") + } else { + state.pods.close() + state.deployments.close() + for (deploymentName in state.deployments.toList()) { + scope.launch { + val runningPods = podsFromDeployment( + deploymentName = deploymentName + ).filter { it.status.phase == POD_STATUS_RUNNING } + + if (runningPods.isNotEmpty()) { + logger.debug("Save emulators logs for deployment: $deploymentName") + for (pod in runningPods) { + launch { + val podName = pod.metadata.name + val device = getDevice(pod) + val serial = device.serial + try { + logger.debug("Saving emulator logs for pod: $podName with serial: $serial...") + val podLogs = kubernetesClient.pods().withName(podName).log + logger.debug("Emulators logs saved for pod: $podName with serial: $serial") + + logger.debug("Saving logcat for pod: $podName with serial: $serial...") + emulatorsLogsReporter.reportEmulatorLogs( + emulatorName = serial, + log = podLogs + ) + logger.debug("Logcat saved for pod: $podName with serial: $serial") + } catch (throwable: Throwable) { + // TODO must be fixed after adding affinity to POD + val podDescription = getPodDescription(podName) + logger.warn( + "Get logs from emulator failed; pod=$podName; podDescription=$podDescription; container serial=$serial", + throwable + ) + } + + logger.debug("Disconnecting device: $serial") + device.disconnect().fold( + { logger.debug("Disconnecting device: $serial successfully completed") }, + { logger.warn("Failed to disconnect device: $serial") } + ) + } } - - logger.debug("Disconnecting device: $serial") - device.disconnect().fold( - { logger.debug("Disconnecting device: $serial successfully completed") }, - { logger.warn("Failed to disconnect device: $serial") } - ) } - logger.debug("Emulators logs saved for deployment: $deploymentName") + } } - - logger.debug("Deleting deployment: $deploymentName...") - removeEmulatorsDeployment( - deploymentName = deploymentName - ) - logger.debug("Deployment: $deploymentName deleted") + this.state = State.Idling } + } finally { + scope.cancel() + } + } - state = State.Idling + private fun getDevice(pod: Pod): RemoteDevice { + requireNotNull(pod.status.podIP) { "Pod: ${pod.metadata.name} must has an IP" } - logger.debug("Devices released for configuration: $configurationName") + val serial = emulatorSerialName( + name = pod.status.podIP + ) + val device = androidDebugBridge.getDevice( + serial = serial + ) + check(device is RemoteDevice) + return device } private fun getPodDescription(podName: String?): String { @@ -228,163 +190,14 @@ class KubernetesReservationClient( deploymentName: String ) { try { + logger.debug("Deleting deployment: $deploymentName") kubernetesClient.apps().deployments().withName(deploymentName).delete() + logger.debug("Deployment: $deploymentName deleted") } catch (t: Throwable) { logger.warn("Failed to delete deployment $deploymentName", t) } } - private fun getDeviceDeployment( - count: Int, - phone: Device.Phone, - deploymentName: String, - kubernetesNodeName: String = "avi-training06" //temporary node, remove later - ): Deployment { - return deviceDeployment( - deploymentMatchLabels = deviceMatchLabels(phone), - deploymentName = deploymentName, - count = count - ) { - containers = listOf( - newContainer { - name = phone.name.kubernetesName() - image = "$registry/${phone.proxyImage}" - - securityContext { - privileged = true - } - resources { - limits = mapOf( - "android/device" to Quantity("1") - ) - requests = mapOf( - "android/device" to Quantity("1") - ) - } - } - ) - dnsPolicy = "ClusterFirst" - nodeName = kubernetesNodeName - tolerations = listOf( - newToleration { - operator = "Exists" - effect = "NoSchedule" - } - ) - } - } - - private fun getCloudEmulatorDeployment( - emulator: Device.CloudEmulator, - deploymentName: String, - count: Int - ): Deployment { - return deviceDeployment( - deploymentMatchLabels = deviceMatchLabels(emulator), - deploymentName = deploymentName, - count = count - ) { - containers = listOf( - newContainer { - name = emulator.name.kubernetesName() - image = "$registry/${emulator.image}" - - securityContext { - privileged = true - } - - resources { - limits = mapOf( - "cpu" to Quantity(emulator.cpuCoresLimit), - "memory" to Quantity(emulator.memoryLimit) - ) - requests = mapOf( - "cpu" to Quantity(emulator.cpuCoresRequest) - ) - } - - if (emulator.gpu) { - volumeMounts = listOf( - newVolumeMount { - name = "x-11" - mountPath = "/tmp/.X11-unix" - readOnly = true - } - ) - - env = listOf( - newEnvVar { - name = "GPU_ENABLED" - value = "true" - }, - newEnvVar { - name = "SNAPSHOT_DISABLED" - value = "true" - } - ) - } - } - ) - - if (emulator.gpu) { - volumes = listOf( - newVolume { - name = "x-11" - - hostPath = newHostPathVolumeSource { - path = "/tmp/.X11-unix" - type = "Directory" - } - } - ) - } - - tolerations = listOf( - newToleration { - key = "dedicated" - operator = "Equal" - value = "android" - effect = "NoSchedule" - } - ) - } - } - - private fun deviceDeployment( - deploymentMatchLabels: Map, - deploymentName: String, - count: Int, - block: PodSpec.() -> Unit - ): Deployment { - val deploymentSpecificationsMatchLabels = deploymentMatchLabels - .plus("deploymentName" to deploymentName) - - return newDeployment { - apiVersion = "extensions/v1beta1" - metadata { - name = deploymentName - labels = deploymentMatchLabels - finalizers = listOf( - // Remove all dependencies (replicas) in foreground after removing deployment - "foregroundDeletion" - ) - } - spec { - replicas = count - selector { - matchLabels = deploymentSpecificationsMatchLabels - } - - template { - metadata { - labels = deploymentSpecificationsMatchLabels - } - spec(block) - } - } - } - } - private suspend fun Deployment.create() { kubernetesClient.apps().deployments().create(this) waitForDeploymentCreationDone(metadata.name, spec.replicas) @@ -428,8 +241,7 @@ class KubernetesReservationClient( deploymentName: String, podsChannel: SendChannel ) { - // TODO: Don't use global scope. Unconfined coroutines lead to leaks - GlobalScope.launch { + scope.launch { logger.debug("Start listening devices for $deploymentName") var pods = podsFromDeployment(deploymentName) @@ -446,29 +258,12 @@ class KubernetesReservationClient( } } - private fun generateDeploymentName(): String = - "${kubernetesClient.namespace}-${UUID.randomUUID()}" - .kubernetesName() - - private fun deviceMatchLabels( - device: Device - ): Map { - return mapOf( - "type" to buildType, // teamcity or local - "id" to buildId, // teamcity_build_id or local synthetic - "project" to projectName, - "instrumentationConfiguration" to configurationName, - "device" to device.description - ) - } - private fun emulatorSerialName(name: String): Serial = Serial.Remote("$name:$ADB_DEFAULT_PORT") - private fun String.kubernetesName(): String = replace("_", "-").toLowerCase() - - sealed class State { + private sealed class State { class Reserving( - val pods: Channel + val pods: Channel, + val deployments: Channel ) : State() object Idling : State() diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/ReservationDeploymentFactory.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/ReservationDeploymentFactory.kt new file mode 100644 index 0000000000..cf49000903 --- /dev/null +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/kubernetes/ReservationDeploymentFactory.kt @@ -0,0 +1,229 @@ +package com.avito.instrumentation.reservation.client.kubernetes + +import com.avito.instrumentation.reservation.request.Device +import com.avito.instrumentation.reservation.request.Reservation +import com.avito.utils.logging.CILogger +import com.fkorotkov.kubernetes.apps.metadata +import com.fkorotkov.kubernetes.apps.newDeployment +import com.fkorotkov.kubernetes.apps.selector +import com.fkorotkov.kubernetes.apps.spec +import com.fkorotkov.kubernetes.apps.template +import com.fkorotkov.kubernetes.metadata +import com.fkorotkov.kubernetes.newContainer +import com.fkorotkov.kubernetes.newEnvVar +import com.fkorotkov.kubernetes.newHostPathVolumeSource +import com.fkorotkov.kubernetes.newToleration +import com.fkorotkov.kubernetes.newVolume +import com.fkorotkov.kubernetes.newVolumeMount +import com.fkorotkov.kubernetes.resources +import com.fkorotkov.kubernetes.securityContext +import com.fkorotkov.kubernetes.spec +import io.fabric8.kubernetes.api.model.PodSpec +import io.fabric8.kubernetes.api.model.Quantity +import io.fabric8.kubernetes.api.model.apps.Deployment +import java.util.UUID + +class ReservationDeploymentFactory( + private val configurationName: String, + private val projectName: String, + private val buildId: String, + private val buildType: String, + private val registry: String, + private val logger: CILogger +) { + + init { + require(buildId.isNotBlank()) { "buildId is blank, client can't distinguish reservations" } + } + + fun createDeployment(namespace: String, reservation: Reservation.Data): Deployment { + logger.debug("Creating deployment for configuration: $configurationName") + val deploymentName = generateDeploymentName(namespace) + return when (reservation.device) { + is Device.LocalEmulator -> throw IllegalStateException( + "Local emulator ${reservation.device} is unsupported in kubernetes reservation" + ) + is Device.Phone -> getDeviceDeployment( + count = reservation.count, + phone = reservation.device, + deploymentName = deploymentName + ) + is Device.CloudEmulator -> getCloudEmulatorDeployment( + emulator = reservation.device, + deploymentName = deploymentName, + count = reservation.count + ) + } + } + + private fun getDeviceDeployment( + count: Int, + phone: Device.Phone, + deploymentName: String, + kubernetesNodeName: String = "avi-training06" //temporary node, remove later + ): Deployment { + return deviceDeployment( + deploymentMatchLabels = deviceMatchLabels(phone), + deploymentName = deploymentName, + count = count + ) { + containers = listOf( + newContainer { + name = phone.name.kubernetesName() + image = "$registry/${phone.proxyImage}" + + securityContext { + privileged = true + } + resources { + limits = mapOf( + "android/device" to Quantity("1") + ) + requests = mapOf( + "android/device" to Quantity("1") + ) + } + } + ) + dnsPolicy = "ClusterFirst" + nodeName = kubernetesNodeName + tolerations = listOf( + newToleration { + operator = "Exists" + effect = "NoSchedule" + } + ) + } + } + + private fun getCloudEmulatorDeployment( + emulator: Device.CloudEmulator, + deploymentName: String, + count: Int + ): Deployment { + return deviceDeployment( + deploymentMatchLabels = deviceMatchLabels(emulator), + deploymentName = deploymentName, + count = count + ) { + containers = listOf( + newContainer { + name = emulator.name.kubernetesName() + image = "$registry/${emulator.image}" + + securityContext { + privileged = true + } + + resources { + limits = mapOf( + "cpu" to Quantity(emulator.cpuCoresLimit), + "memory" to Quantity(emulator.memoryLimit) + ) + requests = mapOf( + "cpu" to Quantity(emulator.cpuCoresRequest) + ) + } + + if (emulator.gpu) { + volumeMounts = listOf( + newVolumeMount { + name = "x-11" + mountPath = "/tmp/.X11-unix" + readOnly = true + } + ) + + env = listOf( + newEnvVar { + name = "GPU_ENABLED" + value = "true" + }, + newEnvVar { + name = "SNAPSHOT_DISABLED" + value = "true" + } + ) + } + } + ) + + if (emulator.gpu) { + volumes = listOf( + newVolume { + name = "x-11" + + hostPath = newHostPathVolumeSource { + path = "/tmp/.X11-unix" + type = "Directory" + } + } + ) + } + + tolerations = listOf( + newToleration { + key = "dedicated" + operator = "Equal" + value = "android" + effect = "NoSchedule" + } + ) + } + } + + private fun deviceDeployment( + deploymentMatchLabels: Map, + deploymentName: String, + count: Int, + block: PodSpec.() -> Unit + ): Deployment { + val deploymentSpecificationsMatchLabels = deploymentMatchLabels + .plus("deploymentName" to deploymentName) + + return newDeployment { + apiVersion = "extensions/v1beta1" + metadata { + name = deploymentName + labels = deploymentMatchLabels + finalizers = listOf( + // Remove all dependencies (replicas) in foreground after removing deployment + "foregroundDeletion" + ) + } + spec { + replicas = count + selector { + matchLabels = deploymentSpecificationsMatchLabels + } + + template { + metadata { + labels = deploymentSpecificationsMatchLabels + } + spec(block) + } + } + } + } + + private fun deviceMatchLabels( + device: Device + ): Map { + return mapOf( + "type" to buildType, // teamcity or local + "id" to buildId, // teamcity_build_id or local synthetic + "project" to projectName, + "instrumentationConfiguration" to configurationName, + "device" to device.description + ) + } + + + + private fun generateDeploymentName(namespace: String): String = + "${namespace}-${UUID.randomUUID()}" + .kubernetesName() + + private fun String.kubernetesName(): String = replace("_", "-").toLowerCase() +} diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/local/LocalReservationClient.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/local/LocalReservationClient.kt index e61cf123e0..fde9613719 100644 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/local/LocalReservationClient.kt +++ b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/reservation/client/local/LocalReservationClient.kt @@ -2,22 +2,24 @@ package com.avito.instrumentation.reservation.client.local import com.avito.instrumentation.reservation.adb.AndroidDebugBridge import com.avito.instrumentation.reservation.adb.EmulatorsLogsReporter -import com.avito.runner.service.worker.device.Serial import com.avito.instrumentation.reservation.client.ReservationClient -import com.avito.instrumentation.reservation.request.Device as RequestedDevice -import com.avito.runner.service.worker.device.Device as WorkerDevice import com.avito.instrumentation.reservation.request.Reservation -import com.avito.instrumentation.util.forEachAsync import com.avito.runner.service.worker.device.Device.DeviceStatus +import com.avito.runner.service.worker.device.Serial import com.avito.runner.service.worker.device.adb.AdbDevicesManager import com.avito.utils.logging.CILogger -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.distinctBy import kotlinx.coroutines.delay import kotlinx.coroutines.launch import java.util.concurrent.TimeUnit +import com.avito.instrumentation.reservation.request.Device as RequestedDevice +import com.avito.runner.service.worker.device.Device as WorkerDevice @Suppress("EXPERIMENTAL_API_USAGE") internal class LocalReservationClient( @@ -28,79 +30,79 @@ internal class LocalReservationClient( private val logger: CILogger ) : ReservationClient { + private val scope = CoroutineScope(Dispatchers.IO + Job()) + private var state: State = State.Idling override suspend fun claim( - reservations: Collection, - serialsChannel: SendChannel, - reservationDeployments: SendChannel // TODO: make this state internal - ) { + reservations: Collection + ): ReservationClient.ClaimResult { if (state !is State.Idling) { val error = RuntimeException("Unable to start reservation job. Already started") logger.critical(error.message.orEmpty()) throw error } logger.debug("Starting reservations for the configuration: $configurationName...") - + val serialsChannel = Channel(Channel.UNLIMITED) val devicesChannel = Channel(Channel.UNLIMITED) state = State.Reserving(devices = devicesChannel) reservations.forEach { reservation -> - val fakeDeploymentName = "local-stub" - reservationDeployments.send(fakeDeploymentName) - - logger.debug("Starting deployment: $fakeDeploymentName") check(reservation.device is RequestedDevice.LocalEmulator) { "Non-local emulator ${reservation.device} is unsupported in local reservation" } - logger.debug("Deployment created: $fakeDeploymentName") - listenEmulators(reservation, devicesChannel) } - //todo use Flow - @Suppress("DEPRECATION") - devicesChannel - .distinctBy { it.id } - .forEachAsync { workerDevice -> - logger.info("Found new emulator: ${workerDevice.id}") - - val serial = workerDevice.id - val device = androidDebugBridge.getDevice(serial) - val isReady = device.waitForBoot() - if (isReady) { - emulatorsLogsReporter.redirectLogcat( - emulatorName = serial, - device = device - ) - serialsChannel.send(serial) - - logger.info("Device $serial is reserved for further usage") - } else { - logger.info("Device $serial can't be used") + scope.launch { + for (workerDevice in devicesChannel + .distinctBy { it.id }) { + scope.launch { + logger.info("Found new emulator: ${workerDevice.id}") + + val serial = workerDevice.id + val device = androidDebugBridge.getDevice(serial) + val isReady = device.waitForBoot() + if (isReady) { + emulatorsLogsReporter.redirectLogcat( + emulatorName = serial, + device = device + ) + serialsChannel.send(serial) + + logger.info("Device $serial is reserved for further usage") + } else { + logger.info("Device $serial can't be used") + } } } + } + return ReservationClient.ClaimResult( + serials = serialsChannel + ) } - override suspend fun release( - reservationDeployments: Collection - ) { - if (state !is State.Reserving) { - // TODO: check the state on client side beforehand - val error = RuntimeException("Unable to stop reservation job. Hasn't started yet") - logger.critical(error.message.orEmpty()) - throw error - } - (state as State.Reserving).devices.close() + override suspend fun release() { + try { + if (state !is State.Reserving) { + // TODO: check the state on client side beforehand + val error = RuntimeException("Unable to stop reservation job. Hasn't started yet") + logger.warn(error.message.orEmpty()) + throw error + } + (state as State.Reserving).devices.close() - state = State.Idling + state = State.Idling - logger.info("Devices released for configuration: $configurationName") + logger.info("Devices released for configuration: $configurationName") + } finally { + scope.cancel() + } } private fun listenEmulators(reservation: Reservation.Data, devices: SendChannel) { // TODO: Don't use global scope. Unconfined coroutines lead to leaks - GlobalScope.launch { + scope.launch { logger.debug("Start listening devices for $reservation") // TODO: prevent reusing the same device in different reservations val reservedDevices = mutableSetOf() diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/Collections.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/Collections.kt deleted file mode 100644 index 11e00354b5..0000000000 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/Collections.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.avito.instrumentation.util - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.async -import kotlinx.coroutines.runBlocking -import java.util.concurrent.Executors - -fun Collection.iterateInParallel(action: suspend (index: Int, item: T) -> R): Collection { - val scope = CoroutineScope( - Executors.newFixedThreadPool(size).asCoroutineDispatcher() - ) - - return this - .mapIndexed { index, item -> - scope.async { - action(index, item) - } - } - .map { - runBlocking { - it.await() - } - } -} diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/Coroutines.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/Coroutines.kt deleted file mode 100644 index 8d2dff8a11..0000000000 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/Coroutines.kt +++ /dev/null @@ -1,15 +0,0 @@ -package com.avito.instrumentation.util - -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.launch - -// TODO: Don't use global scope. Unconfined coroutines lead to leaks -// Client should be responsible for parallelization or it should provide the scope -suspend fun ReceiveChannel.forEachAsync(action: suspend (T) -> Unit) { - for (item in this) { - GlobalScope.launch { - action(item) - } - } -} diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/FutureValue.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/FutureValue.kt deleted file mode 100644 index 1373eff106..0000000000 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/FutureValue.kt +++ /dev/null @@ -1,30 +0,0 @@ -package com.avito.instrumentation.util - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicBoolean - -internal interface FutureValue { - fun get(): T -} - -internal class SettableFutureValue : FutureValue { - private val latch = CountDownLatch(1) - private val isSet = AtomicBoolean() - private var slot: T? = null - - override fun get(): T { - latch.await() - return slot!! - } - - fun set(value: T): Boolean { - if (!isSet.get() && isSet.compareAndSet(false, true)) { - slot = value - latch.countDown() - - return true - } - - return false - } -} diff --git a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/GroupedCoroutinesExecution.kt b/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/GroupedCoroutinesExecution.kt deleted file mode 100644 index f58cb4deda..0000000000 --- a/subprojects/gradle/instrumentation-tests/src/main/kotlin/com/avito/instrumentation/util/GroupedCoroutinesExecution.kt +++ /dev/null @@ -1,103 +0,0 @@ -package com.avito.instrumentation.util - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.selects.select -import java.util.concurrent.CancellationException - -internal class GroupedCoroutinesExecution { - - private val coroutines: MutableList = mutableListOf() - - @OptIn(ExperimentalCoroutinesApi::class) - private val activeCoroutines: List - get() = coroutines - .filter { !it.completionChannel.isClosedForReceive } - - private val activeBlockingCoroutines: List - get() = activeCoroutines - .filter { it.blocking } - - fun launch(blocking: Boolean = true, action: suspend () -> T): FutureValue { - val completionChannel = Channel() - - val future = SettableFutureValue() - - val job = GlobalScope.launch { - try { - future.set(action()) - completionChannel.send( - CompletionAwareCoroutine.CoroutineCompletion.SuccessCompletion - ) - } catch (t: Throwable) { - completionChannel.send( - CompletionAwareCoroutine.CoroutineCompletion.ErrorCompletion( - reason = t - ) - ) - } - } - - coroutines.add( - CompletionAwareCoroutine( - job = job, - blocking = blocking, - completionChannel = completionChannel - ) - ) - - return future - } - - fun join() { - runBlocking { - while (true) { - if (activeBlockingCoroutines.isEmpty()) { - activeCoroutines.forEach { - it.job.cancel() - Unit - } - break - } - - select { - activeCoroutines.forEach { coroutine -> - coroutine.completionChannel.onReceive { completionReason -> - when (completionReason) { - is CompletionAwareCoroutine.CoroutineCompletion.ErrorCompletion -> { - throw completionReason.reason - } - - is CompletionAwareCoroutine.CoroutineCompletion.SuccessCompletion -> { - coroutine.completionChannel.close() - } - } - } - } - } - } - } - } - - private class CompletionAwareCoroutine( - val job: Job, - val blocking: Boolean, - val completionChannel: Channel - ) { - sealed class CoroutineCompletion { - class ErrorCompletion(val reason: Throwable) : CoroutineCompletion() - object SuccessCompletion : CoroutineCompletion() - } - } -} - -internal fun launchGroupedCoroutines(action: GroupedCoroutinesExecution.() -> Unit) { - GroupedCoroutinesExecution().apply { - action() - } - .join() -} diff --git a/subprojects/gradle/runner/client/src/main/kotlin/com/avito/runner/scheduler/args/Arguments.kt b/subprojects/gradle/runner/client/src/main/kotlin/com/avito/runner/scheduler/args/Arguments.kt index 278d6d36d2..5b61e6685f 100644 --- a/subprojects/gradle/runner/client/src/main/kotlin/com/avito/runner/scheduler/args/Arguments.kt +++ b/subprojects/gradle/runner/client/src/main/kotlin/com/avito/runner/scheduler/args/Arguments.kt @@ -4,13 +4,13 @@ import com.avito.logger.Logger import com.avito.runner.scheduler.listener.TestLifecycleListener import com.avito.runner.scheduler.runner.model.TestRunRequest import com.avito.runner.service.worker.device.Serial -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel import java.io.File data class Arguments( val outputDirectory: File, val requests: List, - val devices: Channel, + val devices: ReceiveChannel, val logger: Logger, val listener: TestLifecycleListener? = null ) diff --git a/subprojects/gradle/runner/service/src/main/kotlin/com/avito/runner/service/worker/device/observer/ExternalIntentionBasedDevicesObserver.kt b/subprojects/gradle/runner/service/src/main/kotlin/com/avito/runner/service/worker/device/observer/ExternalIntentionBasedDevicesObserver.kt index ab9097b596..186f234e29 100644 --- a/subprojects/gradle/runner/service/src/main/kotlin/com/avito/runner/service/worker/device/observer/ExternalIntentionBasedDevicesObserver.kt +++ b/subprojects/gradle/runner/service/src/main/kotlin/com/avito/runner/service/worker/device/observer/ExternalIntentionBasedDevicesObserver.kt @@ -4,13 +4,12 @@ import com.avito.logger.Logger import com.avito.runner.service.worker.device.Device import com.avito.runner.service.worker.device.DevicesManager import com.avito.runner.service.worker.device.Serial -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.map class ExternalIntentionBasedDevicesObserver( private val devicesManager: DevicesManager, - private val externalIntentionOfSerials: Channel, + private val externalIntentionOfSerials: ReceiveChannel, private val logger: Logger ) : DevicesObserver {