-
Notifications
You must be signed in to change notification settings - Fork 557
IOLocal
propagation for unsafe access
#3636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
0b88c01
db743e2
716ef32
0a69caf
2775064
270764f
d55489d
2cf72a5
cb3859d
7dce01c
5e171ac
c2f312d
638930d
9174c6a
1987e3a
02a43a6
a7bf748
145fc0e
fa99a5c
6cad03c
bb5d4b1
7517755
8d8e004
3589db4
522677e
6cc4d38
ac88480
49e5c30
925f504
d63a6ff
d4549fb
2502045
535fc8a
d854799
f070552
2cf1d8a
0eec9dd
af84973
1adf368
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* Copyright 2020-2023 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect | ||
|
||
private object IOLocalsConstants { | ||
final val ioLocalPropagation = false | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* Copyright 2020-2023 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect.unsafe; | ||
|
||
// defined in Java since Scala doesn't let us define static fields | ||
final class IOLocalsConstants { | ||
static final boolean ioLocalPropagation = Boolean.getBoolean("cats.effect.ioLocalPropagation"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be very helpful if this was exposed to the end user in some way so that they can check if it's enabled and potentially raise an error if not (for example, we don't want people trying to use an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great point. fa99a5c, wdyt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lgtm, cheers! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. very cool! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a great suggestion. I also wanted to ask something similar.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You mean so that it can be configured by user-code at runtime? The advantage of using system properties with static final fields is that their values are constant at JVM startup, which allows the JIT to optimize those branches. Allowing dynamic configuration would circumvent that. Or if I missed your point, sorry, please explain 😅 note we also have a lot of configuration available in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @armanbilge Oh, I didn't mean to configure it at runtime. I meant that it would be good to have a proper type for setting local propagation instead of having just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Got it, thanks :) Monix also seems to be using just a |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,6 +105,8 @@ private final class IOFiber[A]( | |
@volatile | ||
private[this] var outcome: OutcomeIO[A] = _ | ||
|
||
def getLocalState(): IOLocalState = localState | ||
|
||
override def run(): Unit = { | ||
// insert a read barrier after every async boundary | ||
readBarrier() | ||
|
@@ -250,6 +252,10 @@ private final class IOFiber[A]( | |
pushTracingEvent(cur.event) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
IOLocals.setState(localState) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dumb question: can't we simply do this when we get scheduled on a thread? We know when we're on a thread and we know when we get off of it, so can't we simply set and clear the state respectively at those points? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No we can't, unless we unify how the state is represented. Currently it's a var to an immutable map in the fiber and also in the thread. While the fiber is running its copy of the var may be updated effectually in the runloop so the thread-local copy would need to be kept in sync with that. Or we could drive all updates through the thread-local copy of the var, but then there would be a penalty for accessing it esp. if we are not running on a worker thread. Putting aside technical issues, nobody should be unsafely messing about with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What we can do is set the current fiber in a thread local every time we get scheduled on a thread. Then the unsafe Note this would leave the fiber's |
||
} | ||
|
||
var error: Throwable = null | ||
val r = | ||
try cur.thunk() | ||
|
@@ -260,6 +266,10 @@ private final class IOFiber[A]( | |
onFatalFailure(t) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
localState = IOLocals.getAndClearState() | ||
} | ||
|
||
val next = | ||
if (error == null) succeeded(r, 0) | ||
else failed(error, 0) | ||
|
@@ -324,6 +334,10 @@ private final class IOFiber[A]( | |
pushTracingEvent(delay.event) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
IOLocals.setState(localState) | ||
} | ||
|
||
// this code is inlined in order to avoid two `try` blocks | ||
var error: Throwable = null | ||
val result = | ||
|
@@ -335,6 +349,10 @@ private final class IOFiber[A]( | |
onFatalFailure(t) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
localState = IOLocals.getAndClearState() | ||
} | ||
|
||
val nextIO = if (error == null) succeeded(result, 0) else failed(error, 0) | ||
runLoop(nextIO, nextCancelation - 1, nextAutoCede) | ||
|
||
|
@@ -391,6 +409,10 @@ private final class IOFiber[A]( | |
pushTracingEvent(delay.event) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
IOLocals.setState(localState) | ||
} | ||
|
||
// this code is inlined in order to avoid two `try` blocks | ||
val result = | ||
try f(delay.thunk()) | ||
|
@@ -401,6 +423,10 @@ private final class IOFiber[A]( | |
onFatalFailure(t) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
localState = IOLocals.getAndClearState() | ||
} | ||
|
||
runLoop(result, nextCancelation - 1, nextAutoCede) | ||
|
||
case 3 => | ||
|
@@ -446,6 +472,10 @@ private final class IOFiber[A]( | |
pushTracingEvent(delay.event) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
IOLocals.setState(localState) | ||
} | ||
|
||
// this code is inlined in order to avoid two `try` blocks | ||
var error: Throwable = null | ||
val result = | ||
|
@@ -460,6 +490,10 @@ private final class IOFiber[A]( | |
onFatalFailure(t) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
localState = IOLocals.getAndClearState() | ||
} | ||
|
||
val next = | ||
if (error == null) succeeded(Right(result), 0) else succeeded(Left(error), 0) | ||
runLoop(next, nextCancelation - 1, nextAutoCede) | ||
|
@@ -973,6 +1007,10 @@ private final class IOFiber[A]( | |
if (ec.isInstanceOf[WorkStealingThreadPool[_]]) { | ||
val wstp = ec.asInstanceOf[WorkStealingThreadPool[_]] | ||
if (wstp.canExecuteBlockingCode()) { | ||
if (ioLocalPropagation) { | ||
IOLocals.setState(localState) | ||
} | ||
|
||
var error: Throwable = null | ||
val r = | ||
try { | ||
|
@@ -984,6 +1022,10 @@ private final class IOFiber[A]( | |
onFatalFailure(t) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
localState = IOLocals.getAndClearState() | ||
} | ||
|
||
val next = if (error eq null) succeeded(r, 0) else failed(error, 0) | ||
runLoop(next, nextCancelation, nextAutoCede) | ||
} else { | ||
|
@@ -1390,6 +1432,11 @@ private final class IOFiber[A]( | |
var error: Throwable = null | ||
val cur = resumeIO.asInstanceOf[Blocking[Any]] | ||
resumeIO = null | ||
|
||
if (ioLocalPropagation) { | ||
IOLocals.setState(localState) | ||
} | ||
|
||
val r = | ||
try cur.thunk() | ||
catch { | ||
|
@@ -1399,6 +1446,10 @@ private final class IOFiber[A]( | |
onFatalFailure(t) | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
localState = IOLocals.getAndClearState() | ||
} | ||
|
||
if (isStackTracing) { | ||
// Remove the reference to the fiber monitor handle | ||
objectState.pop().asInstanceOf[WeakBag.Handle].deregister() | ||
|
Uh oh!
There was an error while loading. Please reload this page.