Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
canBeShutdownIndividually: canEventLoopBeShutdownIndividually,
metricsDelegate: metricsDelegate
)
threadSpecificEventLoop.currentValue = loop
Self.threadSpecificEventLoop.currentValue = loop
defer {
threadSpecificEventLoop.currentValue = nil
Self.threadSpecificEventLoop.currentValue = nil
}
callback(loop)
try loop.run()
Expand Down
56 changes: 37 additions & 19 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
private let promiseCreationStoreLock = NIOLock()
private var _promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:]

private let metricsDelegate: (any NIOEventLoopMetricsDelegate)?

private var lastTickEndTime: NIODeadline
private var metricsDelegateState: MetricsDelegateState?

@usableFromInline
internal func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) {
Expand Down Expand Up @@ -261,7 +259,9 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
canBeShutdownIndividually: Bool,
metricsDelegate: NIOEventLoopMetricsDelegate?
) {
self.metricsDelegate = metricsDelegate
self.metricsDelegateState = metricsDelegate.map { delegate in
MetricsDelegateState(metricsDelegate: delegate, lastTickEndime: .now())
}
self._uniqueID = uniqueID
self._parentGroup = parentGroup
self._selector = selector
Expand All @@ -270,7 +270,6 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
self.msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
self.tasksCopy.reserveCapacity(Self.tasksCopyBatchSize)
self.canBeShutdownIndividually = canBeShutdownIndividually
self.lastTickEndTime = .now()
// note: We are creating a reference cycle here that we'll break when shutting the SelectableEventLoop down.
// note: We have to create the promise and complete it because otherwise we'll hit a loop in `makeSucceededFuture`. This is
// fairly dumb, but it's the only option we have.
Expand Down Expand Up @@ -747,21 +746,35 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
return nextDeadline
}

private func runLoop(selfIdentifier: ObjectIdentifier) -> NIODeadline? {
let tickStartTime: NIODeadline = .now()
let sleepTime: TimeAmount = tickStartTime - self.lastTickEndTime
private func runOneLoopTick(selfIdentifier: ObjectIdentifier) -> NIODeadline? {
let tickStartInfo:
(
metricsDelegate: any NIOEventLoopMetricsDelegate,
tickStartTime: NIODeadline,
sleepTime: TimeAmount
)? = self.metricsDelegateState.map { metricsDelegateState in
let tickStartTime = NIODeadline.now() // Potentially expensive, only if delegate set.
return (
metricsDelegate: metricsDelegateState.metricsDelegate,
tickStartTime: tickStartTime,
sleepTime: tickStartTime - metricsDelegateState.lastTickEndime
)
}

var tasksProcessedInTick = 0
defer {
let tickEndTime: NIODeadline = .now()
let tickInfo = NIOEventLoopTickInfo(
eventLoopID: selfIdentifier,
numberOfTasks: tasksProcessedInTick,
sleepTime: sleepTime,
startTime: tickStartTime,
endTime: tickEndTime
)
self.metricsDelegate?.processedTick(info: tickInfo)
self.lastTickEndTime = tickEndTime
if let tickStartInfo = tickStartInfo {
let tickEndTime = NIODeadline.now() // Potentially expensive, only if delegate set.
let tickInfo = NIOEventLoopTickInfo(
eventLoopID: selfIdentifier,
numberOfTasks: tasksProcessedInTick,
sleepTime: tickStartInfo.sleepTime,
startTime: tickStartInfo.tickStartTime,
endTime: tickEndTime
)
tickStartInfo.metricsDelegate.processedTick(info: tickInfo)
self.metricsDelegateState?.lastTickEndime = tickEndTime
}
}
while true {
let nextReadyDeadline = self._tasksLock.withLock { () -> NIODeadline? in
Expand Down Expand Up @@ -905,7 +918,7 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
}
}
}
nextReadyDeadline = runLoop(selfIdentifier: selfIdentifier)
nextReadyDeadline = self.runOneLoopTick(selfIdentifier: selfIdentifier)
}

// This EventLoop was closed so also close the underlying selector.
Expand Down Expand Up @@ -1036,6 +1049,11 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
}
}

struct MetricsDelegateState {
var metricsDelegate: any NIOEventLoopMetricsDelegate
var lastTickEndime: NIODeadline
}

extension SelectableEventLoop: CustomStringConvertible, CustomDebugStringConvertible {
@usableFromInline
var description: String {
Expand Down
Loading