hadoop cluster tuning plan
- Hadoop Performance Tuning by Impetus
Per-job metrics to record (see the sketch after this list for one way to pull most of them):
- script name
- total map input size (GB)
- total map output size (GB)
- total reduce output size (GB)
- per-task map input (GB)
- per-task map output (GB)
- per-task reduce output (GB)
- map function (e.g. FILTER)
- reduce function (e.g. DISTINCT)
- total job duration
- 10/50/90/max map end times
- 10/50/90/max map run durations
- 10/50/90/max reduce end times
- 10/50/90/max reduce run durations
- total reduce slots
- total reduce tasks
- total map slots
- total map tasks
- non-data-local map tasks
- reduce slots per machine
- map slots per machine
- input path names
- input filesystem
- output filesystem
- machine size
- count of machines
- map child heap size (GB)
- reduce child heap size (GB)
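One way to fill in most of these after a run is the job history summary, which reports task counts, start/end times, durations, and byte counters per job. A minimal sketch, assuming the Hadoop 0.20/1.x hadoop job CLI and a placeholder output path:
# summary: task counts, best/worst/average task times, job counters
hadoop job -history /path/to/job/output
# per-task detail, including failed and killed attempts
hadoop job -history all /path/to/job/output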
Write
# simple dd benchmark of a local data drive: sequential write, read, and copy
size=4096   # number of 1 MB blocks => 4 GB per file
src_file=/mnt/tmp/junk-src-$size
out_file=/mnt/tmp/junk-out-$size
#
time dd if=/dev/zero of=$src_file bs=1024k count=$size   # write test (also creates the source file for the next two)
time dd if=$src_file of=/dev/null bs=1024k count=$size   # read test
time dd if=$src_file of=$out_file bs=1024k count=$size   # read+write (copy) test
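If each worker has several data drives (see the mapred.local.dir striping note below), it can be worth repeating the same measurement per mount. A minimal sketch, assuming hypothetical mount points /mnt and /mnt2:
for d in /mnt /mnt2; do
  mkdir -p $d/tmp
  echo "== $d =="
  time dd if=/dev/zero of=$d/tmp/junk-src-$size bs=1024k count=$size   # write
  time dd if=$d/tmp/junk-src-$size of=/dev/null bs=1024k count=$size   # read
  rm -f $d/tmp/junk-src-$size
done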
from Job Configuration:
- mapred.job.shuffle.merge.percent (float) -- The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.
- io.sort.factor (int) -- Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during the merge. If the number of files exceeds this limit, the merge will proceed in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting this limit is unlikely there.
- mapred.inmem.merge.threshold (int) -- The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds, this does not define a unit of partition, but a trigger. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk. This threshold influences only the frequency of in-memory merges during the shuffle.
- mapred.job.shuffle.input.buffer.percent (float) -- The percentage of memory, relative to the maximum heap size as typically specified in mapred.child.java.opts, that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
- mapred.job.reduce.input.buffer.percent (float) -- The percentage of memory, relative to the maximum heap size, in which map outputs may be retained during the reduce. When the reduce begins, map outputs will be merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins, to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.
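To make the buffer percentages concrete: a rough sketch of the reduce-side shuffle memory for the 2400m reduce child heap used on the test cluster below (the framework reserves a little of this for itself, so treat the numbers as approximate):
awk 'BEGIN {
  heap_mb    = 2400              # reduce child heap (-Xmx2400m)
  shuffle_mb = heap_mb * 0.70    # mapred.job.shuffle.input.buffer.percent
  merge_at   = shuffle_mb * 0.66 # mapred.job.shuffle.merge.percent
  printf "shuffle buffer ~%d MB; in-memory merge starts around %d MB\n", shuffle_mb, merge_at
}'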
- number of map tasks: determined by mapred.max.split.size, mapred.min.split.size, the size of the file, and the HDFS block size (dfs.block.size). See the sketch below.
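A minimal sketch of how those values combine; the newer FileInputFormat computes splitSize = max(minSplitSize, min(maxSplitSize, blockSize)), and the max-split value here is illustrative rather than from a real run:
block_size=$((128 * 1024 * 1024))           # dfs.block.size (128 MB, as used here)
min_split=1                                 # mapred.min.split.size (default)
max_split=$((256 * 1024 * 1024))            # mapred.max.split.size (illustrative)
input_bytes=$((360 * 1024 * 1024 * 1024))   # ~360 GB of input, as in the runs below

split_size=$block_size
[ "$max_split" -lt "$split_size" ] && split_size=$max_split
[ "$min_split" -gt "$split_size" ] && split_size=$min_split
echo "split size: $split_size bytes"
echo "map tasks:  ~$(( (input_bytes + split_size - 1) / split_size ))"   # ~2880 here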
- spills
  - io.sort.mb -- size of the map output sort buffer. Increase it if your map output size exceeds its input size.
  - io.sort.spill.percent and io.sort.record.percent -- tune these for skinny rows, which need a larger share of the buffer for per-record accounting. At io.sort.mb = 250 and io.sort.record.percent = 0.15, 85% = 212.5 MB holds serialized data and the remaining 15% tracks up to 2,457,600 records at 16 bytes each. Spilling starts at 80% (io.sort.spill.percent) of the 212.5 MB, i.e. 170 MB. If a map task emits more data than that, or more records than the record buffer holds, it will spill.
  - for a 128 MB split (with map output roughly the size of its input), io.sort.mb should be at least 190 MB; see the sketch after this list.
  - mapred.local.dir -- stripe across many local drives.
  - Turn up the block size for large files. Note that this will lead to spills unless io.sort.mb grows along with it.
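A rough sketch of that io.sort.mb sizing rule, assuming map output about the size of its input, the default io.sort.spill.percent of 0.80, and the 85% data fraction from the example above:
awk 'BEGIN {
  split_mb  = 128    # map input split size (MB)
  spill_pct = 0.80   # io.sort.spill.percent
  data_frac = 0.85   # share of io.sort.mb that holds serialized data
  printf "io.sort.mb >= %.0f MB to avoid spilling\n", split_mb / spill_pct / data_frac
}'
# prints ~188 MB, hence the "at least 190 MB" rule above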
ACMurthy: "Applications should ensure that each reduce should process at least 1-2 GB of data, and at most 5-10GB of data, in most scenarios"
- other
  - mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution
  - mapred.job.reuse.jvm.num.tasks = -1 -- set to the number of times to reuse a JVM for map or reduce tasks of the same job, or to -1 to reuse without limit. This cuts JVM startup/teardown time. See the sketch below for a per-job override.
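These are all job-level settings, so they can be overridden per run instead of cluster-wide. A minimal sketch, assuming a Pig build that passes -D properties through to the job conf (the script name is a placeholder, and -D flags must precede it):
pig -Dmapred.job.reuse.jvm.num.tasks=-1 my_script.pig
# the two speculative.execution flags can be passed the same way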
- map-side merge
  - io.sort.factor -- number of streams (spill segments) to merge at once; rough pass-count sketch below
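Roughly, the number of on-disk merge passes grows with log base io.sort.factor of the number of spill segments; Hadoop's merge planner is a little cleverer about the first pass, so treat this as an estimate with illustrative numbers:
awk 'BEGIN {
  spills = 100; factor = 10   # illustrative: 100 spill files, io.sort.factor = 10
  passes = int(log(spills) / log(factor) + 0.999999)
  printf "~%d merge pass(es) over %d segments at io.sort.factor=%d\n", passes, spills, factor
}'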
copy
-
mapred.compress.map.output
andmapred.map.output.compression.codec
-
tracker.http.threads
> (# reducers) * (mapred.reduce.parallel.copies) -
dfs.namenode.handler.count
-
mapred.job.tracker.handler.count
-
dfs.datanode.handler.count
-
dfs.datanode.max.xceivers
/dfs.datanode.max.xcievers
-- limit on the number of files. Increase to 4096. It's misspelled, so we set both. -
dfs.datanode.socket.write.timeout
-
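Applying that tasktracker.http.threads rule of thumb to the small test cluster below (5 workers x 2 reduce slots = 10 reducers, mapred.reduce.parallel.copies = 30 as in the second config block):
echo $(( 10 * 30 ))   # 300 -- lower bound for tasktracker.http.threads by the rule above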
- merge
  - mapred.reduce.parallel.copies -- number of threads used to copy inputs (map outputs)
  - mapred.job.shuffle.input.buffer.percent
  - merge starts when mapred.job.shuffle.merge.percent OR mapred.inmem.merge.threshold is exceeded
  - mapred.job.reduce.input.buffer.percent -- keep sorted input in memory
  - mapred.reduce.copy.backoff
- mapred.tasktracker.map.tasks.maximum
- mapred.tasktracker.reduce.tasks.maximum
- mapred.child.java.opts, mapred.map.child.java.opts, mapred.reduce.child.java.opts
Test cluster: 1x m1.xlarge master, 5x m1.xlarge workers; per worker, 6 map slots (1200m heap) and 2 reduce slots (2400m heap)
java: -Xmx2400m -Xss128k -XX:+UseCompressedOops -XX:MaxNewSize=200m -server
daemon heap 1000 MB, jobtracker heap 3072 MB
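A quick sanity check of that slot/heap layout against the roughly 15 GB of RAM on an m1.xlarge (the 15 GB figure and the assumption that the 1000 MB daemon heap applies to both the datanode and the tasktracker are mine, not from the runs):
echo $(( 6*1200 + 2*2400 + 2*1000 ))   # 14000 MB of heap per worker, leaving ~1 GB for the OS and page cache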
Used 360 GB of input data; emitted an X/60 sample of the data (X is the numeric suffix of each output below):
twitter_out_04 20742179859 19.3 GB
twitter_out_05 26011764400 24.2 GB
twitter_out_07 36689819407 34.2 GB
twitter_out_10 54037083442 50.3 GB
twitter_out_12 65942075457 61.4 GB
twitter_out_15 83782104433 78.0 GB
twitter_out_18 101422765493 94.5 GB
twitter_out_20 113325576268 105.5 GB
twitter_out_30 173331942369 161.4 GB
io.sort.mb 250
io.sort.factor 25
mapred.inmem.merge.threshold 0
mapred.job.shuffle.merge.percent 0.66
mapred.job.shuffle.input.buffer.percent 0.7
mapred.job.reduce.input.buffer.percent 0.0
mapred.job.reduce.markreset.buffer.percent 0.3
- ex 1: 0005 19 GB DISTINCT, mapred.job.reduce.input.buffer.percent 0
- ex 2: 0007 106 GB DISTINCT, mapred.job.reduce.input.buffer.percent 0.5
- ex 3: 0008 106 GB DISTINCT, mapred.job.reduce.input.buffer.percent 0.7
- ex 4: 0009 19 GB GROUP/FLATTEN
- ex 5: 0010 7 GB GROUP/FLATTEN
- ex 6: 0011 50 GB GROUP/FLATTEN
- ex 7: 0012 50 GB DISTINCT
- ex 8: 0013 50 GB DISTINCT
- ex 9: 0014 106 GB GROUP/FLATTEN
- ex10: 0015 106 GB DISTINCT
- ex11: 0016 333 GB DISTINCT
io.sort.mb 200
io.sort.factor 10
mapred.inmem.merge.threshold 0
mapred.job.shuffle.merge.percent 0.66
mapred.job.shuffle.input.buffer.percent 0.70
mapred.job.reduce.input.buffer.percent 0.70
mapred.reduce.parallel.copies 30
Ratio of map tasks to reduce tasks => how many map spills
http://old.nabble.com/HDFS-read-write-speeds,-and-read-optimization-td22980463.html
For comparison, on a 1400 node cluster, I can checksum 100 TB in
around 10 minutes, which means I'm seeing read averages of roughly 166
GB/sec. For writes with replication of 3, I see roughly 40-50 minutes
to write 100TB, so roughly 33 GB/sec average. Of course the peaks are
much higher. Each node has 4 SATA disks, dual quad core, and 8 GB of ram.
-- Owen
pig.tmpfilecompression true
pig.tmpfilecompression.codec org.apache.hadoop.io.compress.SnappyCodec
# Specifies the size, in bytes, of data to be processed by a single map. Smaller files are combined until this size is reached.
# Set to 64MB (1/2 our HDFS block size)
pig.maxCombinedSplitSize 67108864
# Should Pig combine small files so that they are processed as a single map? Processing input (either user input or intermediate input) from multiple small files can be inefficient because a separate map has to be created for each file.
pig.splitCombination
If neither "set default parallel" nor the PARALLEL clause is used, Pig sets the number of reducers using a heuristic based on the size of the input data. You can set the values for these properties:
# Defines the number of input bytes per reduce; default value is 1000*1000*1000 (1GB).
pig.exec.reducers.bytes.per.reducer
# Defines the upper bound on the number of reducers; default is 999.
pig.exec.reducers.max
# The formula, shown below, is very simple and will improve over time. The
# computed value takes all inputs within the script into account and applies the
# computed value to all the jobs within Pig script.
#
# #reducers = MIN (pig.exec.reducers.max, total input size (in bytes) / bytes per reducer)
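A worked example of that formula against the ~360 GB input used above, and of nudging it toward the 1-2 GB (at most 5-10 GB) per-reduce guideline quoted earlier; the numbers are illustrative:
awk 'BEGIN {
  input_bytes       = 360 * 1024^3     # ~360 GB of input
  bytes_per_reducer = 1000*1000*1000   # pig.exec.reducers.bytes.per.reducer default (1 GB)
  max_reducers      = 999              # pig.exec.reducers.max default
  n = input_bytes / bytes_per_reducer; if (n > max_reducers) n = max_reducers
  printf "default heuristic: ~%d reducers (about 1 GB each)\n", n
  # raising bytes_per_reducer to 3-4 GB (or using PARALLEL / set default_parallel)
  # would bring this down to roughly 100-130 reducers
}'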