OK, I thought about this some more, and here is a very contrived solution that doesn't seem ideal to me but does simplify things a bit. I create a list of PointInPolygon records up front and process it as a single Multi. This solution doesn't really take full advantage of Mutiny, and I would prefer to source the data from Multi streams. Thank you for any thoughts you may have on a better way to do this!

Untested Code:

import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import io.smallrye.mutiny.Multi;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.prep.PreparedPolygon;

record PointInPolygon(
    PreparedPolygon polygon,
    Point point,
    AtomicBoolean isPointInPolygon
) {}

void process() {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(16);
    var polygonList = new ArrayList<PreparedPolygon>(); // Pretend this is populated with data
    var pointList = new ArrayList<Point>(); // Pretend this is populated with data
    var pointInPolygonList = new ArrayList<PointInPolygon>();
    // Create a very long list of PointInPolygon to process
    for (var polygon : polygonList) {
        for (var point : pointList) {
            pointInPolygonList.add(new PointInPolygon(polygon, point, new AtomicBoolean(false)));
        }
    }
    Multi.createFrom().iterable(pointInPolygonList).onItem().invoke(pip -> {
        // Record components are read through their accessor methods
        if (pip.polygon().contains(pip.point())) { // Blocking call
            pip.isPointInPolygon().set(true);
        }
    }).runSubscriptionOn(executor)
      .subscribe().with(
          item -> System.out.println("Item: " + item),
          Throwable::printStackTrace);
}
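
As a point of comparison for the code above, here is a minimal sketch that uses plain java.util.concurrent rather than Mutiny. It submits each contains call to the fixed pool as its own task and returns an immutable result record instead of flipping an AtomicBoolean. As far as I understand it, runSubscriptionOn by itself only moves the subscription onto a pool thread and does not spread items across all 16 threads, which is why the fan-out is explicit here. The class name PairwiseContains, the record PairResult, and the use of java.awt.Polygon and java.awt.Point in place of the JTS types are all assumptions made so the sketch runs without extra dependencies.

```java
import java.awt.Point;
import java.awt.Polygon;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PairwiseContains {
    /** Immutable result: the pair plus the outcome of the containment test. */
    record PairResult(Polygon polygon, Point point, boolean inside) {}

    static List<PairResult> checkAll(List<Polygon> polygons, List<Point> points, int threads) {
        // java.awt.Polygon caches its bounding box lazily; compute it before sharing across threads.
        polygons.forEach(Polygon::getBounds);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<PairResult>> futures = new ArrayList<>();
            for (Polygon polygon : polygons) {
                for (Point point : points) {
                    // Each blocking contains() call becomes its own task on the pool.
                    futures.add(CompletableFuture.supplyAsync(
                            () -> new PairResult(polygon, point, polygon.contains(point)),
                            executor));
                }
            }
            List<PairResult> results = new ArrayList<>();
            for (CompletableFuture<PairResult> future : futures) {
                results.add(future.join()); // wait for every task to finish
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample data: one 10x10 square and three points.
        Polygon square = new Polygon(new int[] {0, 10, 10, 0}, new int[] {0, 0, 10, 10}, 4);
        List<Point> points = List.of(new Point(5, 5), new Point(20, 20), new Point(1, 9));
        long inside = checkAll(List.of(square), points, 4).stream()
                .filter(PairResult::inside)
                .count();
        System.out.println(inside); // prints 2
    }
}
```

One task per pair is the simplest shape, but for a very large cross product the per-task overhead may matter, so batching several points per task could be worth trying.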
I could use some guidance on how I should implement a complex scenario to process a large amount of data, potentially using two Multi streams.

For my use case, I have very CPU-intensive work that I need to parallelize. I plan to run a Point in Polygon GIS function. I expect to spread this load across 16 CPU cores. The specific call that I plan to make is PreparedGeometry.contains (https://locationtech.github.io/jts/javadoc/org/locationtech/jts/geom/prep/PreparedGeometry.html#contains-org.locationtech.jts.geom.Geometry-), which is a blocking call.

If I were to just implement some basic imperative code for this scenario, it might look like this:
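
The snippet itself is not reproduced here. A minimal stand-in for such a nested loop might look like the following. It assumes java.awt.Polygon and java.awt.Point as placeholders for the JTS types, which is not what this post uses, and the class name SequentialPointInPolygon and the sample data are hypothetical.

```java
import java.awt.Point;
import java.awt.Polygon;
import java.util.List;

public class SequentialPointInPolygon {
    /** Counts the (polygon, point) pairs where the polygon contains the point. */
    static long countHits(List<Polygon> polygons, List<Point> points) {
        long hits = 0;
        for (Polygon polygon : polygons) {
            for (Point point : points) {
                if (polygon.contains(point)) { // blocking, CPU-bound call
                    hits++;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: one 10x10 square and three points.
        Polygon square = new Polygon(new int[] {0, 10, 10, 0}, new int[] {0, 0, 10, 10}, 4);
        List<Point> points = List.of(new Point(5, 5), new Point(20, 20), new Point(1, 9));
        System.out.println(countHits(List.of(square), points)); // prints 2
    }
}
```

Everything runs on the calling thread, so the work grows with the number of polygons times the number of points and only one core is used.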
My first thought on my implementation is to use a Vert.x Verticle scaled to 16 instances with executeBlocking, ordered being false, and a dedicated thread pool of size 16. This would allow me to create a loop structure (to process all Point in all Polygon) and make non-blocking calls around this blocking call (such as updating a threadsafe cache).

Now, I am thinking this should all be possible with Mutiny's Multi. I would have a Multi<Point> stream and a Multi<Polygon> stream. I would need to iterate across both Multi streams so I process each and every point for each and every polygon. I would also need to make the blocking calls to PreparedGeometry.contains as the stream is processed.

I did read https://smallrye.io/smallrye-mutiny/2.0.0/guides/emit-on-vs-run-subscription-on/ and I am still confused about whether emitOn or runSubscriptionOn with a dedicated ThreadPoolExecutor would be the best choice for my use case, which is to allow a large number of parallel blocking calls while processing the Multi. Once again, the order of processing does not matter.

What guidance might you have for me to implement this complex scenario?
Multi<Point> against all Multi<Polygon>?
runSubscriptionOn or emitOn?
ThreadPoolExecutor?
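
To make the dedicated-pool idea concrete, here is a minimal sketch using plain java.util.concurrent, with Vert.x and Mutiny left out. It runs one task per polygon on a fixed pool, does not care about completion order, and records hits in a thread-safe set that stands in for the cache mentioned above. The class name PerPolygonTasks and the use of java.awt.Polygon and java.awt.Point in place of the JTS types are assumptions made for illustration.

```java
import java.awt.Point;
import java.awt.Polygon;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerPolygonTasks {
    /** Returns every point that lies inside at least one polygon. */
    static Set<Point> pointsInAnyPolygon(List<Polygon> polygons, List<Point> points, int threads) {
        Set<Point> hits = ConcurrentHashMap.newKeySet(); // thread-safe result cache
        // java.awt.Polygon caches its bounding box lazily; compute it before sharing across threads.
        polygons.forEach(Polygon::getBounds);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // One coarse-grained task per polygon; tasks may finish in any order.
            CompletableFuture<?>[] futures = polygons.stream()
                    .map(polygon -> CompletableFuture.runAsync(() -> {
                        for (Point point : points) {
                            if (polygon.contains(point)) { // blocking, CPU-bound call
                                hits.add(point);
                            }
                        }
                    }, executor))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join(); // wait until all polygons are processed
            return hits;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample data: two squares and three points.
        Polygon a = new Polygon(new int[] {0, 10, 10, 0}, new int[] {0, 0, 10, 10}, 4);
        Polygon b = new Polygon(new int[] {15, 25, 25, 15}, new int[] {15, 15, 25, 25}, 4);
        List<Point> points = List.of(new Point(5, 5), new Point(20, 20), new Point(50, 50));
        System.out.println(pointsInAnyPolygon(List.of(a, b), points, 16).size()); // prints 2
    }
}
```

With only a handful of polygons, per-polygon tasks would leave most of the 16 threads idle, so splitting the point list into chunks per task may balance the load better.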