-
Couldn't load subscription status.
- Fork 1.7k
Deduplicate PhysicalExpr on proto ser/de using Arc pointer addresses #18192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| // Check cache first if an ID is present | ||
| if let Some(id) = proto.id { | ||
| if let Some(cached) = decode_ctx.get_cached_expr(id) { | ||
| return Ok(cached); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a very elegant solution!
It does solve the problem at hand, but I wonder how could this be extended for the case where the producer part of a dynamic filter is deserialized in one machine, and the consumer part in deserialized in other different machine, which is almost always going to be the case in a distributed context.
For example, the idea that powered gabotechs#7, is that users can subscribe to changes to the dynamic filters in the global registry, and send/produce updates over the wire, something like:
let ctx = SessionContext::new();
let registry = ctx
.task_ctx()
.session_config()
.get_extension::<DynamicFiltersRegistry>();
registry.subscribe_to_updates();
registry.push_updates();
let plan: Arc<dyn ExecutionPlan>; // <- plan with dynamic filters
execute_stream(plan, ctx.task_ctx());I really would love to see something like this happening without recurring to a "global place" for reading/writing updates to dynamic filters, but I cannot come up with other ideas.
Do you think there's a chance we can do that with something simpler like what this PR proposes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 your idea for having callbacks in #17370 could be a good alternative, although I wonder how those can be set from the outside in an arbitrary Arc<dyn ExecutionPlan>.
If we had a way of getting all the dynamic filter expressions in a plan (fn(plan: &Arc<dyn ExecutionPlan>) -> Vec<&DynamicPhysicalExpr>) this would indeed be a very nice approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that we set the callbacks when we deserialize in a custom codec or something. We might have to add a function to the codec trait along the lines of visit_physical_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that it can happen that under certain circumstances the producer part of a dynamic filter is never serialized/deserialized, as it might never get sent over the wire, but the consumer part does. I imagine in this scenario we will be left with an un-connected dynamic filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we can inject the hooks when we serialize as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it will also not get serialized at all, any dynamic filter present above the first network boundary (reading from top to bottom) will never suffer any serialization or deserialization
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see your point now. So we still need some way to get a reference to all Arc<dyn PhysicalExpr> outside of serialization 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also played with the option of adding an expressions(&self) -> Vec<&dyn PhysicalExpr> or something similar to the ExecutionPlan trait, like the children() method, but it gets a bit messy as the relationship between expressions and an ExecutionPlan is a bit different.
It was not too bad though, maybe that approach could be revisited.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we may just need to go in that direction. Since it wouldn't be used for serialization it's okay if it's not implemented everywhere. As long as we implement it on the key nodes we know have dynamic filters it should work.
| } | ||
|
|
||
| // Optional ID for caching during deserialization. | ||
| // Set to the Arc pointer address during serialization to enable deduplication. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 I'm not sure if this addresses the fact that serialization will most likely happen on one machine, but deserialization would happen in a different one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What this accomplishes is that if you deserialize both a DataSourceExec and TopK execution node on the same machine at least those two are connected.
Connecting them between machines needs more work, e.g. #18192 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Still feels a bit weird to use a pointer address as identifier where in most cases serialization/deserialization will happen in different machines, but I have to agree it gets the job done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing is we're not really using the addresses as pointers, just as a way to identify which two expressions are the same expression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can change the documentation to something like:
// Optional ID for caching during deserialization. This is used for deduplication,
// so PhysicalExprs with the same ID will be deserialized as Arcs pointing to the
// same address (instead of distinct addresses) on the deserializing machine.
//
// We use the Arc pointer address during serialization as the ID, as this by default
// indicates if a PhysicalExpr is identical to another on the serializing machine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice suggestion, I'll commit it tomorrow :)
|
@Jefffrey any chance you could give some input on this change? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this makes sense to me. Main concern (other than the pointer bits) is the introduction of DecodeContext; I guess it wasn't easily possible to do via codec (I think you mentioned this already)?
| } | ||
|
|
||
| // Optional ID for caching during deserialization. | ||
| // Set to the Arc pointer address during serialization to enable deduplication. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can change the documentation to something like:
// Optional ID for caching during deserialization. This is used for deduplication,
// so PhysicalExprs with the same ID will be deserialized as Arcs pointing to the
// same address (instead of distinct addresses) on the deserializing machine.
//
// We use the Arc pointer address during serialization as the ID, as this by default
// indicates if a PhysicalExpr is identical to another on the serializing machine.
I don't see a better way to add the mutable context necessary for this to work. |
Something like this: https://github.com/pydantic/datafusion/pull/41/files |
Hmm yeah I don't this is is any better, since we might as well go all the way instead of a halfway solution 😅 I'll cc @timsaucer too as they also changed the signatures recently for proto physical plan in #18123 |
|
Thanks for the ping. One thing I was planning to do this weekend was to write up a PR to move from I'm happy to put that PR in, but since you're digging into this bit of the code maybe we can include it? Also +1 on the I think the core idea is a good one. |
|
I'm not clear on what the pros/cons of
Hmm the only reason I see to do that is for backwards compatibility. The complexity and opacity introduced is not worth it otherwise IMO. |
In regards to |
Note that having a It might be worth to at least have a plan on how to do that end to end before committing to introducing an API change that might need to get revisited for having a full solution. |
I can chime in here, |
|
I wonder in which cases decoding protobuf is a bottleneck? Do you have some flame graphs to show, or this might be theoretical bottleneck? |
If we're talking about adding |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to proceed in this direction, making cache disabled by default would make sense, as the benefits of having it on are not really obvious and very use case specific.
Isn't there a benefit for all users of reducing blowup of duplicate expressions? If duplicate expressions aren't a problem we wouldn't be Arcing them in the first place. The cost is miniscule: a hashmap of integers and pointers. |
This is good to know. Then either as this PR or as a follow on, it would be good to move the current |
Most users do not have this problem to start with. It's not issue with performance overhead, issue is with user generated ids, and subtitle bugs it can bring. Also I believe it's trivial to have two implementation one which will cache other which won't and change it as needed |
| let protobuf = protobuf::PhysicalPlanNode::decode(bytes) | ||
| .map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?; | ||
| protobuf.try_into_physical_plan(ctx, extension_codec) | ||
| let decode_ctx = DecodeContext::new(ctx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should decode_ctx be method parameter rather than created here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comet is added for consistency with other public methods expecting DecodeCtx but I'm not sure we should expose &DecodeContext in public methods, explanation in following comment
| .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; | ||
| let extension_codec = DefaultPhysicalExtensionCodec {}; | ||
| back.try_into_physical_plan(&ctx, &extension_codec) | ||
| let decode_ctx = DecodeContext::new(ctx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should decode_ctx be method parameter rather than created here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comet is added for consistency with other public methods expecting DecodeCtx but I'm not sure we should expose &DecodeContext in public methods, explanation in following comment
|
One simple question: if the I believe the current implementation does not prevent the reuse of let task_ctx = ctx.task_ctx();
let decode_ctx = DecodeContext::new(&task_ctx);
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
.try_into_physical_plan(&decode_ctx, codec)
.expect("from proto");Two subsequent encoded plans coming to the same I can't really claim but current approach use of arc address may be safe for referencing expressions within single plan. If |
Yes, my intention was that you create a new |
then public methods should consume (and invalidate) |
|
Yeah that makes sense! I'm not sure how to encode that into the APIs, but making it very explicit in the docs, etc. should be enough? One thing I'm thinking is if there's a way to satisfy all of the input here by making a new high level API. Something along the lines of: pub trait Decoder {
fn decode_plan(&self, plan: PhysicalPlanNode) -> Result<Arc<dyn ExecutionPlan>>;
fn decode_expression(&self, expression: PhysicalExprNode) -> Result<Arc<dyn ExpressionNode>>;
}
pub struct DefaultDecoder {
ctx: TaskContext,
}
impl DefaultDecoder {
pub fn new(ctx: TaskContext) -> Self {
Self { ctx }
}
impl Decoder for DefaultDecoder {
fn decode_plan(decoder: & dyn Decoder, plan: PhysicalPlanNode) -> Result<Arc<dyn ExecutionPlan>> {
// Essentially the code inside of `PhysicalPlanNode::try_from_physical_plan`
// but passing around a reference to ourselves as `&dyn Decoder` so that e.g. if we have to decode
// predicates inside of a plan it calls back into `decode_expression`
// Maybe delegates to
}
fn decode_expression(decoder: &dyn Decoder expression: ExpressionNode, input_schema: &Schema) -> Result<Arc<dyn ExpressionNode>> {
// essentially the code inside of `parse_physical_expr` but again passing around a reference to ourselves
}
}Then it's easy to make a custom Anyway that's a half baked idea and that discussion may be a blocker for this PR but I think it is largely unrelated to "is the deduplication worth doing by default", I'll address that in my next comment. |
I'd argue that most users do have this problem. Consider a query like: SELECT *
FROM 'file.parquet'
WHERE id IN (1, 2, 3, 4, 5...);This PR improves memory usage for this query by avoiding duplicating the
The idea is not to cache expressions across plans, rather within a plan. Here are flame graphs from
Raw data: I generated this by adding the following example to use datafusion::{common::Result, prelude::*};
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
#[tokio::main]
async fn main() -> Result<()> {
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
prof_ctl.activate().unwrap();
let _plan = {
let ctx = SessionContext::new();
let batches = ctx.sql("SELECT c FROM generate_series(1, 1000000) t(c)").await?.collect().await?;
let file = std::fs::File::create("test.parquet")?;
let props = WriterProperties::builder()
// limit batch sizes so that we have useful statistics
.set_max_row_group_size(4096)
.build();
let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props))?;
for batch in &batches {
writer.write(batch)?;
}
writer.close()?;
let mut df = ctx.read_parquet("test.parquet", ParquetReadOptions::default()).await?;
df = df.filter(col("c").in_list((1_000..10_000).map(|v| lit(v)).collect(), false))?;
let plan = df.create_physical_plan().await?;
physical_plan_from_bytes(&physical_plan_to_bytes(plan)?, &ctx.task_ctx())?
};
let pprof = prof_ctl.dump_pprof().unwrap();
std::fs::write("proto_memory.pprof", pprof).unwrap();
Ok(())
}Full diff: It looks like this was able to deduplicate the |
|
5.7 to 4 is around 30% saved, if I'm not mistaken 😀 This optimisation is on the "control plane" not on the "data plane" (30% on data plane would make huge difference, we would not have this discussion in that case). IMHO, small improvements on the "control plane" does not justify additional moving part or increase of interface/protocol complexity. If we put current limitations in the API description we have made it (API) more complex, and provided an avenue to introduce bugs as someone did not read documentation I have never seen decoding having any significant impact, most tasks will take quite more time crunching data, compared to few microseconds saved in decoding (on top of that data has already been moved over the network). |
|
It depends on how you do the math (I did IMO "control plane" vs "data plane" can get a bit blurry, eg in the case of Besides: all of this is to further enable an optimization (dynamic filters) that can make queries 25x faster. We are in fact discussing using DataFusion makes plenty of breaking API changes, I don't even think this is that egregious of one. Is it API changes in this part of code in general that you're opposed to, or mainly the footgun of re-using a cache / context causing collisions? I'm sure the later can be addressed in some way. |
lol, Im not sure you can redefine mats, there are strict rules around it 😂
I'm not sure why this argument is valid in this context 😕 I had sad nothing related to dynamic filters.
That's perfectly fine but let's have some kind of high bar when we do that.
Saving a 1.7MB on a executor which use 8GB does not make huge difference, that like 0.02%, yet for that you have introduced a possibility to shoot your foot and made interface more complex |
|
I think it might be worth to have an end-to-end plan for bringing dynamic filters into distributed contexts before continuing this discussion, as having the full picture on the table can help us better inform decisions. Created a ticket for bringing discussions there: |


This stemmed from wanting to deduplicate
DynamicFilterPhysicalExprwhich is essential for them to have a chance at working.I started by thinking we should add an
idfield toDynamicFilterPhysicalExprspecifically. But then I had the thought: what if I used the pointer address of theArcas the id? This has several advantages:Arc'ed!) and we would currently use multiple times the memory when deserializing anInListexpression because we make a deep copy for eachArc'ed reference.Hashbut not only is it a lot more code to derive it everywhere, it also requires that everything is hashable and could be very expensive to compute (again, imagine a hugeInListexpression).seen: HashSet<usize>on the serialization side and only serialize each expression once.From a high level point of view this also makes sense: if it's the same
Arcon one end it should be on the other end, and this is just a mechanism to achieve that.Use of pointers as numbers is... a bit scary, but this usage seems safe to me:
Maybe there's some issue lurking with threads? Not sure but in any case the ser/de code should all be single threaded, non asynchronous code.
98% of the diff is the change to introduce
DecodeContext. I felt that was cleaner than trying to use some part ofRuntimeEnvor something - that is really a different concern. But I'm open to ideas.