Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.spark.shuffle.sort.io;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;

Expand Down Expand Up @@ -59,7 +59,7 @@ public void initializeExecutor(String appId, String execId, Map<String, String>
}
blockResolver =
new IndexShuffleBlockResolver(
sparkConf, blockManager, Collections.emptyMap() /* Shouldn't be accessed */
sparkConf, blockManager, new ConcurrentHashMap<>() /* Shouldn't be accessed */
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files
import java.util.{Collections, Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -58,19 +58,19 @@ private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
// var for testing
var _blockManager: BlockManager,
val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]])
val taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the type be the ConcurrentMap interface?

extends ShuffleBlockResolver
with Logging with MigratableResolver {

def this(conf: SparkConf) = {
this(conf, null, Collections.emptyMap())
this(conf, null, new ConcurrentHashMap[Int, OpenHashSet[Long]]())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    class EmptyConcurrentMap<K, V> implements ConcurrentMap<K, V> {
        private EmptyConcurrentMap() {
        }

        public V putIfAbsent(K key, V value) {
            return null;
        }

        public boolean remove(Object key, Object value) {
            return false;
        }

        public boolean replace(K key, V oldValue, V newValue) {
            return false;
        }

        public V replace(K key, V value) {
            return null;
        }

        public int size() {
            return 0;
        }

        public boolean isEmpty() {
            return true;
        }

        public boolean containsKey(Object key) {
            return false;
        }

        public boolean containsValue(Object value) {
            return false;
        }

        public V get(Object key) {
            return null;
        }

        public V put(K key, V value) {
            return null;
        }

        public V remove(Object key) {
            return null;
        }

        public void putAll(Map<? extends K, ? extends V> m) {
        }

        public void clear() {
        }

        public Set<K> keySet() {
            return Collections.emptySet();
        }

        public Collection<V> values() {
            return Collections.emptySet();
        }

        public Set<Map.Entry<K, V>> entrySet() {
            return Collections.emptySet();
        }
    }

Is it possible to define a similar EmptyConcurrentMap and also use a singleton pattern for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it seems to be overkill to me.

}

def this(conf: SparkConf, _blockManager: BlockManager) = {
this(conf, _blockManager, Collections.emptyMap())
this(conf, _blockManager, new ConcurrentHashMap[Int, OpenHashSet[Long]]())
}

def this(conf: SparkConf, taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]]) = {
def this(conf: SparkConf, taskIdMapsForShuffle: ConcurrentHashMap[Int, OpenHashSet[Long]]) = {
this(conf, null, taskIdMapsForShuffle)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
mapTaskIds.iterator.foreach { mapTaskId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
mapTaskIds.synchronized {
mapTaskIds.iterator.foreach { mapTaskId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
}
}
}
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.*;
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
Expand Down Expand Up @@ -325,7 +326,7 @@ public void writeWithoutSpilling() throws Exception {
@Test
public void writeChecksumFileWithoutSpill() throws Exception {
IndexShuffleBlockResolver blockResolver =
new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
new IndexShuffleBlockResolver(conf, blockManager, new ConcurrentHashMap<>());
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
Expand Down Expand Up @@ -356,7 +357,7 @@ public void writeChecksumFileWithoutSpill() throws Exception {
@Test
public void writeChecksumFileWithSpill() throws Exception {
IndexShuffleBlockResolver blockResolver =
new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
new IndexShuffleBlockResolver(conf, blockManager, new ConcurrentHashMap<>());
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
Expand Down