/HDP_Tuning_Unofficial

Collection of HDP Tuning Tricks & Tips (unofficial guide)

Primary LanguagePython

Unofficial Guide for Tuning HDP Services

This repo contains settings, links, recommendations, and tips that I have collected by working with many customers on a variety of Hadoop clusters. I will continually update this as I learn more and figure out what works (and what doesn't work). Feel free to use this guide as you optimize your cluster and the associated services.

Includes: Apache Hive, Apache Spark, Apache HBase


  Tuning Tips and Tricks

Hive Ports: (Reference)
  • Hive Server: 10000
  • Hive Web UI: 9999
  • Hive Metastore: 9083


General Recommendations:
  • Enable Tez (hive.execution.engine=tez)
  • Use Vectorization (hive.vectorized.execution.enabled, hive.vectorized.execution.reduce.enabled)
  • Use CBO (hive.cbo.enable, hive.compute.query.using.stats, compute table and column stats)
  • Store as ORC (Use Zlib or Snappy compression)
  • Check SQL syntax
  • Use Tez View (within Ambari) for troublshooting
  • Look at number of reduces & mappers (how many are running in parallel, what are the runtimes)
  • Check the HDFS Hive file sizes (~1GB each). 100/200 MB is small.
  • vCores (~3 CPUs less than total CPUs... Or 80% of VCPUs if overclocked)
  • 2GB Tez container size is a good starting point (hive.tez.container.size=2250)
  • 1GB Yarn container size is a good starting point (yarn.nodemanager.resource.memory-mb=1028)
  • Consider using INSTR() instead of IN
  • Partition (evenly) on moderate cardinality variable (500, 1000 partitions). Useful for where queries.
  • Bucketing (cluster by, sorted by XX buckets - this can be applied to granular data) Useful for joins.


Configuration Suggestions:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.created.files=1000000; --default=100000
set hive.exec.max.dynamic.partitions.pernode=100000; --default=2000
set hive.exec.max.dynamic.partitions=100000; --default=5000
set hive.exec.parallel.thread.number=16; --default=8
set hive.exec.parallel=true; --default=false
set hive.exec.reducers.bytes.per.reducer=1000000000; --default=67108864 (64 MB)
set hive.exec.reducers.max=2000; --default=1009
set hive.execution.engine=tez;
set hive.optimize.reducededuplication.min.reducer=1; --default=4
set hive.optimize.sort.dynamic.partition=true; --default=false
set hive.stats.autogather=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set hive.tez.auto.reducer.parallelism=true;
set hive.server2.tez.initialize.default.sessions=true; --default=true/false
set hive.prewarm.enabled=true; --default=false, Use prewarm carefully.. it could add runtime
set hive.prewarm.numcontainers=3; --default=3, Use prewarm carefully.. it could add runtime
set hive.tez.container.size = same as or a small multiple (1 or 2 times that) of YARN container size yarn.scheduler.minimum-allocation-mb but NEVER more than yarn.scheduler.maximum-allocation-mb
set hive.tez.exec.print.summary=true;
--set hive.tez.java.opts=-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/;
set hive.tez.max.partition.factor=2.0;
set hive.tez.min.partition.factor=0.25;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true; --default=could be true or false
set mapred.job.reduce.input.buffer.percent=0.0;
--set mapred.map.tasks=6;
set mapred.reduce.tasks=-1;
set mapreduce.input.fileinputformat.split.minsize=240000000; -- 240 MB
set mapreduce.input.fileinputformat.split.maxsize=1000000000; -- 1 GB
set mapreduce.input.fileinputformat.split.minsize.per.node=240000000; -- 240 MB
set mapreduce.input.fileinputformat.split.minsize.per.rack=240000000; -- 240 MB
set tez.am.container.reuse.enabled=true;
set tez.am.container.idle.release-timeout-min.millis=30000; default=10000
set tez.am.container.idle.release-timeout-max.millis=90000; default=20000
set tez.grouping.max-size=1073741824; -- 1GB
set tez.grouping.min-size=16777216; -- 16 MB
set tez.shuffle-vertex-manager.min-src-fraction=0.25; --default=0.2
set tez.shuffle-vertex-manager.max-src-fraction=0.75; --default=0.4
--set tez.queue.name=default;
--beeline -n hive -p hive -u "jdbc:hive2://localhost:10000?tez.queue.name=ztest"
set yarn.nodemanager.resource.memory-mb = Usually between 75% - 87.5% RAM
set yarn.scheduler.maximum-allocation-mb = yarn.nodemanager.resource.memory-mb
set yarn.scheduler.minimum-allocation-mb = Memory per processor (or less)


Run the YARN Utility Script to help determine configuration settings.
wget http://public-repo-1.hortonworks.com/HDP/tools/2.5.3.0/hdp_manual_install_rpm_helper_files-2.5.3.0.37.tar.gz tar zxvf hdp_manual_install_rpm_helper_files-2.5.3.0.37.tar.gz

python hdp-configuration-utils.py -c 16 -m 64 -d 4 -k True


Hive Syntax: (Cheatsheet)

DESCRIBE FORMATTED mytable;

CREATE TABLE a_orc STORED AS ORC AS SELECT * FROM A;

CREATE TABLE myTable (employee_id int, item string, price float) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);

INSERT INTO TABLE myTable SELECT * FROM staging_table;

When partitioning you will use 1 or more “virtual” columns (xdate and state are virtual):
CREATE TABLE mytable (employee_id int, item string, price float) PARTITIONED BY (xdate STRING, state STRING) CLUSTERED BY (employee_id) INTO 256 BUCKETS;

When loading data into partitioned table, at least one virtual column must be specified.
INSERT INTO mytable (xdate='2017-02-11', state='NC') AS SELECT * FROM staging_table where xdate='2017-02-11' AND state = 'NC';

All partitions can be loaded at once (as dynamic partitions):
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
INSERT INTO sale (xdate, state) SELECT * FROM staging_table;

Create table and column-level stats (Required for CBO):
ANALYZE TABLE myTable COMPUTE STATISTICS;
ANALYZE TABLE myTable COMPUTE STATISTICS for columns;
ANALYZE TABLE myTable partition (col1, col2, col3) COMPUTE STATISTICS;
ANALYZE TABLE myTable partition (col1, col2, col3) COMPUTE STATISTICS for columns;


Diagram - Hive/Tez Tuning:


Diagram - Hive Partitioning and Bucketing


Hive References:
Hortonworks - Apache Hive Tuning for High Performance
Apache Hive on Tez - Tuning Best Practices (Part 1)
Apache Hive on Tez - Tuning Best Practices (Part 2)
Capacity Scheduler Queues - Allocate cluster resources among users and groups
5 Ways to Improve Hive Performance
Hortonworks Companion Scripts (yarn-utils.py)
Apache Hive Concurrent Sessions
YARN Preemption - Capacity Scheduling / Concurrency




Tuning Tips and Tricks

Tuning Recommendations: (Full Parameter List)

Additional Recommendations:

1. Caching:
  • MEMORY_ONLY: (default/recommended) Store RDD as deserialized objects in JVM Heap
  • MEMORY_ONLY_SER: (2nd option) Store RDD as serialized Kryo objects. Trade CPU time for memory savings
  • MEMORY_AND_DISK: Spill to disk if can’t fit in memory
  • MEMORY_AND_DISK_SER: Spill serialized RDD to disk if it can’t fit in memory

2. Data Serialization Performance:
  • Reduces data size, so less data transfer
  • Use Kyro over Java (Kyro is up to 10x faster)
  • conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
  • sparkConf.set("spark.sql.tungsten.enabled", "true")
  • sparkConf.set("spark.io.compression.codec", "snappy")
  • sparkConf.set("spark.rdd.compress", "true")

3. Memory and Garbage Collection Tuning:
  • GC is a problem for Spark apps which churn RDDs
  • Measure time spent in GC by logging: -verbose:gc –XX:+PrintGCDetails –XX:+PrintGCTimeStamps
  • If there’s excessive GC per task, use the MEMORY_ONLY_SER storage level to limit just one object per RDD
    partition (one byte array) and reduce the spark.storage.memoryFraction value from 0.6 to 0.5 or less.

4. Set Correct Level of Parallelism:
  • set spark.default.parallelism = 2-3 tasks per CPU core in your cluster
  • Normally 3 - 6 executors per node is a reasonable, depends on the CPU cores and memory size per executor
  • sparkConf.set("spark.cores.max", "4")
  • 5 or less cores per executor (per node) (ie. 24-core node could run 24/4cores = 6 executors)
  • set num-executors = (4 nodes * 6 executors = 24 executors - 1 = 23 total executors)
  • set executor-memory = 256GB-2GB RAM / 6 executors = 42GB - 2GB = 40GB

Configuration Suggestions:

Spark References:
Apache Spark (latest) General Tuning
Apache Spark (latest) Properties / Configuration Settings
Hortonworks - Apache Spark Tuning Guide
Tuning Java Garbage Collection
Apache Spark 2.0 Tuning Guide
How to tune Spark executor number cores and executor memory
Apache Spark Config




  Tuning Tips and Tricks

HBase Ports: (Reference)
  • HMaster: 16000
  • HMaster Info Web UI: 16010
  • Region Server (slave nodes): 16020/16320
  • HBase Thrift Server: 9090/9095

General Recommendations:
  • zookeeper.session.timeout = 1 minute (default: 3 minutes)
  • dfs.datanode.failed.volumes.tolerated = half the amount of your available disks
  • HBase currently does not do well with anything above two or three column families
  • Reduce Hotspotting:
      • Salting (add random prefix to the rowkey, causes different sorting)
      • Hashing
      • Reversing the Key (reverse a fixed-width or numeric row key)
  • hbase.regionserver.handler.count = cores (for concurrency, threads kept open to answer incoming requests)
  • hbase.master.balancer.maxRitPercent = 0.25 (25% regions in transition when balancing, and cluster’s availability is at least 75% when balancing)
  • hbase.balancer.period = 30000 (Period, in ms, at which the region balancer runs in the Master)
  • Use 150-175 regions per RegionServer
  • Use 10GB region size
  • if Regionserver.Server.percentFilesLocal less than 70%, then time for compaction.

HBase and Phoenix References:
HBase Reference Guide
Apache Phoenix