1
1
package io .github .jbellis .jvector .util ;
2
2
3
+ import java .util .Objects ;
3
4
import java .util .concurrent .LinkedBlockingQueue ;
4
5
import java .util .concurrent .atomic .AtomicInteger ;
5
6
import java .util .function .Supplier ;
14
15
public abstract class PoolingSupport <T > {
15
16
16
17
/**
17
- * Creates a pool of objects intended to be used by a fixed thread pool.
18
- * The pool size will the processor count .
18
+ * Creates a pool of objects intended to be used by a thread pool.
19
+ * This is a replacement for ThreadLocal .
19
20
* @param initialValue allows creation of new instances for the pool
20
21
*/
21
22
public static <T > PoolingSupport <T > newThreadBased (Supplier <T > initialValue ) {
22
23
return new ThreadPooling <>(initialValue );
23
24
}
24
25
25
- /**
26
- * Creates a pool intended to be used by a fixed thread pool
27
- * @param threadLimit the specific number of threads to be sharing the pooled objects
28
- * @param initialValue allows creation of new instances for the pool
29
- */
30
- public static <T > PoolingSupport <T > newThreadBased (int threadLimit , Supplier <T > initialValue ) {
31
- return new ThreadPooling <>(threadLimit , initialValue );
32
- }
33
-
34
26
/**
35
27
* Special case of not actually needing a pool (when other times you do)
36
28
*
@@ -41,6 +33,16 @@ public static <T> PoolingSupport<T> newNoPooling(T fixedValue) {
41
33
}
42
34
43
35
36
+ /**
37
+ * Recycling of objects using a MPMC queue
38
+ *
39
+ * @param limit the specific number of threads to be sharing the pooled objects
40
+ * @param initialValue allows creation of new instances for the pool
41
+ */
42
+ public static <T > PoolingSupport <T > newQueuePooling (int limit , Supplier <T > initialValue ) {
43
+ return new QueuedPooling <>(limit , initialValue );
44
+ }
45
+
44
46
private PoolingSupport () {
45
47
}
46
48
@@ -62,7 +64,7 @@ private PoolingSupport() {
62
64
* Internal call used when pooled item is returned
63
65
* @param value
64
66
*/
65
- protected abstract void onClosed (T value );
67
+ protected abstract void onClosed (Pooled < T > value );
66
68
67
69
/**
68
70
* Wrapper class for items in the pool
@@ -71,7 +73,7 @@ private PoolingSupport() {
71
73
* in a try-with-resources statement.
72
74
* @param <T>
73
75
*/
74
- public static class Pooled <T > implements AutoCloseable {
76
+ public final static class Pooled <T > implements AutoCloseable {
75
77
private final T value ;
76
78
private final PoolingSupport <T > owner ;
77
79
private Pooled (PoolingSupport <T > owner , T value ) {
@@ -85,56 +87,45 @@ public T get() {
85
87
86
88
@ Override
87
89
public void close () {
88
- owner .onClosed (this . value );
90
+ owner .onClosed (this );
89
91
}
90
92
}
91
93
92
94
93
- static class ThreadPooling <T > extends PoolingSupport <T >
95
+ final static class ThreadPooling <T > extends PoolingSupport <T >
94
96
{
95
- private final int limit ;
96
- private final AtomicInteger created ;
97
- private final LinkedBlockingQueue <T > queue ;
97
+ private final ThreadLocal <Pooled <T >> threadLocal ;
98
98
private final Supplier <T > initialValue ;
99
99
100
100
private ThreadPooling (Supplier <T > initialValue ) {
101
- //+1 for main thread
102
- this (Runtime .getRuntime ().availableProcessors () + 1 , initialValue );
103
- }
104
-
105
- private ThreadPooling (int threadLimit , Supplier <T > initialValue ) {
106
- this .limit = threadLimit ;
107
- this .created = new AtomicInteger (0 );
108
- this .queue = new LinkedBlockingQueue <>(threadLimit );
109
101
this .initialValue = initialValue ;
102
+ this .threadLocal = new ThreadLocal <>();
110
103
}
111
104
105
+ @ Override
112
106
public Pooled <T > get () {
113
- T t = queue . poll ();
114
- if (t != null )
115
- return new Pooled <>( this , t ) ;
107
+ Pooled < T > val = threadLocal . get ();
108
+ if (val != null )
109
+ return val ;
116
110
117
- if (created .incrementAndGet () > limit ) {
118
- created .decrementAndGet ();
119
- throw new IllegalStateException ("Number of outstanding pooled objects has gone beyond the limit of " + limit );
120
- }
121
- return new Pooled <>(this , initialValue .get ());
111
+ val = new Pooled <>(this , initialValue .get ());
112
+ threadLocal .set (val );
113
+ return val ;
122
114
}
123
115
116
+ @ Override
124
117
public Stream <T > stream () {
125
- if (queue .size () < created .get ())
126
- throw new IllegalStateException ("close() was not called on all pooled objects yet" );
127
-
128
- return queue .stream ();
118
+ throw new UnsupportedOperationException ();
129
119
}
130
120
131
- protected void onClosed (T value ) {
132
- queue .offer (value );
121
+ @ Override
122
+ protected void onClosed (Pooled <T > value ) {
123
+
133
124
}
134
125
}
135
126
136
127
137
- static class NoPooling <T > extends PoolingSupport <T > {
128
+ final static class NoPooling <T > extends PoolingSupport <T > {
138
129
private final T value ;
139
130
private final Pooled <T > staticPooled ;
140
131
private NoPooling (T value ) {
@@ -153,7 +144,48 @@ public Stream<T> stream() {
153
144
}
154
145
155
146
@ Override
156
- protected void onClosed (T value ) {
147
+ protected void onClosed (Pooled <T > value ) {
148
+ }
149
+ }
150
+
151
+
152
+ final static class QueuedPooling <T > extends PoolingSupport <T > {
153
+ private final int limit ;
154
+ private final AtomicInteger created ;
155
+ private final LinkedBlockingQueue <Pooled <T >> queue ;
156
+ private final Supplier <T > initialValue ;
157
+
158
+ private QueuedPooling (int limit , Supplier <T > initialValue ) {
159
+ this .limit = limit ;
160
+ this .created = new AtomicInteger (0 );
161
+ this .queue = new LinkedBlockingQueue <>(limit );
162
+ this .initialValue = initialValue ;
163
+ }
164
+
165
+ @ Override
166
+ public Pooled <T > get () {
167
+ Pooled <T > t = queue .poll ();
168
+ if (t != null )
169
+ return t ;
170
+
171
+ if (created .incrementAndGet () > limit ) {
172
+ created .decrementAndGet ();
173
+ throw new IllegalStateException ("Number of outstanding pooled objects has gone beyond the limit of " + limit );
174
+ }
175
+ return new Pooled <>(this , initialValue .get ());
176
+ }
177
+
178
+ @ Override
179
+ public Stream <T > stream () {
180
+ if (queue .size () < created .get ())
181
+ throw new IllegalStateException ("close() was not called on all pooled objects yet" );
182
+
183
+ return queue .stream ().filter (Objects ::nonNull ).map (Pooled ::get );
184
+ }
185
+
186
+ @ Override
187
+ protected void onClosed (Pooled <T > value ) {
188
+ queue .offer (value );
157
189
}
158
190
}
159
191
}
0 commit comments