工程项目说明


       本Intelij Idea工程是针对TBDS组件认证而开发,包含的demo程序如下:

  1. HBaseDemo:简单的HBase二次开发程序,传入表名,如果不存在,则创建;然后列出所有的表
  2. HDFSDemo:列出根目录/下的所有文件与文件夹
  3. HiveDemo:连接hiveserver,并列出所有的数据库
  4. KafkaProducerDemo:向指定topic发送数据
  5. KafkaConsumerDemo:消费指定topic的数据
  6. MapReduceDemo:wordCount程序
  7. SparkDemo: spark版本的wordCount,使用spark-submit提交
  8. SparkLauncherDemo: spark版本的wordCount,使用java直接提交
  9. ReadHiveTableDemo:使用spark读取hive表的数据
  10. WriteHiveTableDemo:使用spark输出数据到hive表
  11. FlinkKafkaDemo:flink消费kafka数据
  12. Spark输入到Hbase:spark往Hbase写数据的3种方式

编译出的jar包如何运行

       使用maven编译出 dev-demo-.jar,使用java命令直接执行,各功能模块执行方式如下。


运行 HBaseDemo

假定在dev-demo-.jar所在目录执行

java -cp dev-demo-<version>.jar:/usr/hdp/2.2.0.0-2041/hbase/lib/* com.tencent.tbds.demo.hbase.HBaseDemo --auth-id <id> --auth-key <key> --zk-host <host1,host2> --table-name <table name>

参数解释:
auth-id: 认证ID
auth-key: 认证key
zk-host: zookeeper主机列表
table-name: hbase表名


运行 HDFSDemo

假定在dev-demo-.jar所在目录执行

hadoop jar dev-demo-<version>.jar com.tencent.tbds.demo.hdfs.HDFSDemo --auth-user <username> --auth-id <id> --auth-key <key>

参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key


运行 HiveDemo

准备
代码中默认采用高可用连接方式,因此在运行程序时需要传入zk地址、用户名、密码

运行
假定在dev-demo-.jar所在目录执行

java -cp dev-demo-<version>.jar:$(echo /usr/hdp/2.2.0.0-2041/hive/lib/*.jar | tr ' ' ':'):/usr/hdp/2.2.0.0-2041/hadoop/hadoop-common.jar com.tencent.tbds.demo.hive.HiveDemo --zk-list <host1:port1,host2:port2> --user <user name> --password <password>

参数解释:
zk-list: zookeeper地址列表
user: 连接hive的用户名
password: 连接hive的密码


运行 KafkaProducerDemo

假定在dev-demo-.jar所在目录执行

 java -cp dev-demo-<version>.jar:/usr/hdp/2.2.0.0-2041/kafka/libs/* com.tencent.tbds.demo.kafka.KafkaProducerDemo --auth-id <id> --auth-key <key> --kafka-brokers <broker1:6667,broker2:6667> --topic <topic name>

参数解释:
auth-id: 认证ID
auth-key: 认证key
kafka-brokers: kafka brokers列表
topic: 指定数据发送到哪个topic


运行 KafkaConsumerDemo

假定在dev-demo-.jar所在目录执行

 java -cp dev-demo-<version>.jar:/usr/hdp/2.2.0.0-2041/kafka/libs/* com.tencent.tbds.demo.kafka.KafkaConsumerDemo --auth-id <id> --auth-key <key> --kafka-brokers <broker1:6667,broker2:6667> --topic <topic name>

参数解释:
auth-id: 认证ID
auth-key: 认证key
kafka-brokers: kafka brokers列表
topic: 指定消费哪个topic的数据
offset-reset: 可选参数,默认值是latest


运行 MapReduceDemo

假定在dev-demo-.jar所在目录执行

hadoop jar dev-demo-<version>.jar com.tencent.tbds.demo.mapreduce.MapReduceDemo --auth-user <username> --auth-id <id> --auth-key <key> --input <input path> --output <output path>

参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
input: 数据输入目录
output: 数据输出目录


运行 SparkDemo

准备
运行spark程序当前仅支持export认证的方式,不支持在代码中直接注入认证信息

  1. 在运行前请export 认证信息:
     export hadoop_security_authentication_tbds_secureid=g9q06icsbwYWjQ4i2wbjz3MWNpo8DXqAZxzZ
     export hadoop_security_authentication_tbds_securekey=qbQyCiWaCJ0HmgiVpc5qofcKd8kVsJgj
     export hadoop_security_authentication_tbds_username=bhchen
    
    即从路径/opt/cluster_conf/hadoop/ 中读取配置信息,所以请从集群中获取该配置文件并放置到对应的路径中

运行
假定在dev-demo-.jar所在目录执行

  1. 运行方式1:采用spark-submit方式
     /usr/hdp/2.2.0.0-2041/spark/bin/spark-submit --master yarn-cluster --executor-memory 3g --driver-memory 1g --num-executors 2 --executor-cores 2 --class com.tencent.tbds.demo.spark.SparkDemo dev-demo-<version>.jar /tmp/pyspark_demo/pyspark_demo.csv /tmp/spark_wordcount/
    
    其中 /tmp/wordcount/input/readme.txt 为输入数据,/tmp/wordcount/output为输出目录
  2. 运行方式2:采用java的方式,该方式需要使用spark Launcher进程来把主任务调度起来: 原理
    SparkAppHandle handler = new SparkLauncher()
                    .setAppName("spark-wordcount")
                    .setSparkHome("/usr/hdp/2.2.0.0-2041/spark ")
                    .setMaster("yarn")
                    .setConf("spark.driver.memory", "1g")
                    .setConf("spark.executor.memory", "3g")
                    .setConf("spark.executor.cores", "2")
                    .setConf("spark.executor.instances", "2")
                    .setAppResource("dev-demo-<version>.jar")
                    .setMainClass("com.tencent.tbds.demo.spark.SparkDemo")
                    .addAppArgs(args[0], args[1])
                    .setDeployMode("cluster")
    
    执行
    java -Djava.ext.dirs=/usr/hdp/2.2.0.0-2041/spark/jars:/usr/hdp/2.2.0.0-2041/hadoop -cp dev-demo-<version>.jar com.tencent.tbds.demo.spark.SparkLauncherDemo /usr/hdp/2.2.0.0-2041/spark cluster /tmp/pyspark_demo/pyspark_demo.csv /tmp/spark_wordcount/
    

运行 SparkReadHiveTableDemo

这是一个使用spark读取hive表的示例程序

运行步骤:

  1. 使用maven命令打包项目:mvn clean compile package
  2. 将target下的zip包上传到服务器
  3. 解压zip包,进入解压目录
  4. 执行
./bin/spark_read_hive_table_demo.sh --auth-user <user name> --auth-id <secure id> --auth-key <secure key> --hive-metastore-uris <hive metastore address> --hive-db <hive database> --hive-table <hive table name>

参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
hive-metastore-uris: hive metastore的地址
hive-db: hive数据库
hive-table: hive表


运行 SparkWriteHiveTableDemo

这是一个使用spark往hive表写数据的示例程序

运行步骤:

  1. 使用maven命令打包项目:mvn clean compile package
  2. 将target下的zip包上传到服务器
  3. 解压zip包,进入解压目录
  4. 执行
./bin/spark_write_hive_table_demo.sh --auth-user <user name> --auth-id <secure id> --auth-key <secure key> --hive-metastore-uris <hive metastore address> --hive-db <hive database> --hive-table <hive table name> --hdfs-path <hdfs path>

参数解释:
auth-user: 认证用户
auth-id: 认证ID
auth-key: 认证key
hive-metastore-uris: hive metastore的地址
hive-db: hive数据库
hive-table: hive表
hdfs-path: 读取数据的路径


运行 FlinkKafkaDemo

这是一个使用flink实时消费kafka数据的示例程序,需要上传到oceanus运行

参数解释:
kafka-brokers: kafka broker地址列表
group-id: 消费者组ID
topic: kafka topic名称
auth-id: 认证ID
auth-key: 认证key


运行 FlinkHDFSSinkDemo

这是一个使用flink实时消费kafka数据,并输出到HDFS的示例程序,需要上传到oceanus运行

参数解释:
kafka-brokers: kafka broker地址列表
group-id: 消费者组ID
topic: kafka topic名称
auth-id: 认证ID
auth-key: 认证key
hdfs-path: 数据输出到HDFS的路径


运行 SparkStreamKafkaDemo

这是一个使用Spark Streaming实时消费kafka数据的示例程序

 spark-submit --class com.tencent.tbds.demo.spark.SparkStreamKafkaDemo --master yarn --deploy-mode client dev-demo-<version>.jar --kafka-brokers <kafka brokers> --group-id <group id> --auth-id <auth id> --auth-key <auth key> --topic <topic name>

参数解释:
kafka-brokers: kafka broker地址列表
group-id: 消费者组ID
auth-id: 认证ID
auth-key: 认证key
topic: kafka topic名称


Spark输出到HBase

Spark输出到HBase总体来说有3种方式,分别对应不同的应用场景

  1. 通过常规HBase API,这种方式吞吐量低,适用于数据量较小的场景,如流计算。
  2. 调用saveAsNewAPIHadoopDataset,底层实现方式是批量+异步,吞吐量高,但容易对region server造成太大压力,适用于中等规模数据的场景
  3. 生成HFile文件,再将此文件导入HBase,适用于大规模数据场景

这3种方式的例子请分别参考:SparkWriteHBaseDirectDemo、SparkWriteHBaseBatchDemo、SparkWriteHBaseBulkLoadDemo,其运行的命令如下:

 spark-submit --class com.tencent.tbds.demo.spark.SparkWriteHBaseDirectDemo --jars $(echo /usr/hdp/2.2.0.0-2041/hbase/lib/*.jar | tr ' ' ',') dev-demo-<version>.jar --auth-id <auth id> --auth-key <auth key> --zk-host <host1,host2...> --table-name <tableName>