This chapter shows how to use PySpark to read from ClickHouse and write the result into a ClickHouse distributed table.
- To match the online cluster environment, keep the local Spark version identical to the cluster's. Since our CDH cluster runs Spark 2.4.0, download that release from the Spark website, extract it to a convenient location, and point SPARK_HOME at it.
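A quick way to confirm the local install matches the cluster version (a minimal sketch):

import pyspark
print(pyspark.__version__)  # should print 2.4.0 to match the CDH cluster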
Download the ClickHouse JDBC driver so that PySpark can read ClickHouse over JDBC from PyCharm:
- Download the driver from https://github.com/ClickHouse/clickhouse-jdbc
- Move the driver jar into the jars directory of the Spark you downloaded earlier: spark-2.4.0-bin-hadoop2.6/jars
- Configure the driver classname com.clickhouse.jdbc.ClickHouseDriver, i.e. ("driver", "com.clickhouse.jdbc.ClickHouseDriver"); a quick way to verify the jar is visible is shown below.
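To verify that Spark actually picks the driver up from the classpath, one option is to resolve the class through py4j (a sketch; spark._jvm is an internal handle, not a public API):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
# Raises a py4j error if the jar is missing from the driver classpath
spark._jvm.java.lang.Class.forName("com.clickhouse.jdbc.ClickHouseDriver")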
Configure PyCharm (the original screenshots illustrate Spark 2.4.2): add the pyspark and py4j libraries to the project interpreter. An equivalent in-code setup is shown below.
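If you prefer not to click through PyCharm's settings, the same effect can be achieved by putting Spark's Python bindings on sys.path (a sketch; the py4j zip name varies by Spark release, so check $SPARK_HOME/python/lib):

import os
import sys

spark_home = "/Users/neo/Downloads/spark-2.4.0-bin-hadoop2.6"
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.10.7-src.zip"))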
-- Replicated local table
CREATE TABLE data_report.dwd_test_replica ON CLUSTER hgj_clickhouse_2shards_2replicas
(
    `customer_id` String COMMENT 'company ID',
    `company_name` Nullable(String) COMMENT 'company name',
    `sell_name` Nullable(String) COMMENT 'sales rep',
    `date_time` Nullable(Date) DEFAULT NULL COMMENT 'date',
    `aci_send_num` Nullable(Int32) COMMENT 'orders sent'
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/data_report/dwd_test_replica',
                             '{replica}')
ORDER BY customer_id
SETTINGS index_granularity = 8192;
-- Distributed table
CREATE TABLE IF NOT EXISTS data_report.dwd_test ON CLUSTER hgj_clickhouse_2shards_2replicas
AS data_report.dwd_test_replica
ENGINE = Distributed(hgj_clickhouse_2shards_2replicas,
                     data_report,
                     dwd_test_replica,
                     cityHash64(customer_id));
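Writes always target the distributed table data_report.dwd_test; ClickHouse routes each row to one of the two shards by cityHash64(customer_id), and ReplicatedMergeTree replicates it within the shard. A quick sanity check that the distributed table is queryable (a sketch; the host and credentials are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
count_df = spark.read \
    .format("jdbc") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("url", "jdbc:clickhouse://<your-clickhouse-address>/default") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .option("query", "select count() as cnt from data_report.dwd_test") \
    .load()
count_df.show()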
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @Project -> File:spark_demo -> spark_clickhouse.py
# @Author :wangxintian
# @Date :2022/6/27 14:34
# @Desc :read from / write to ClickHouse

# Spark reads ClickHouse
import os
import sys
from os.path import abspath

from pyspark.sql import SparkSession
warehouse_location = abspath('spark-warehouse')

# Method 1: point the script at the local Spark install via environment variables
os.environ["SPARK_HOME"] = "/Users/neo/Downloads/spark-2.4.0-bin-hadoop2.6"
os.environ["PYSPARK_PYTHON"] = sys.executable  # interpreter used by Spark workers
def spark_write_clickhouse(df, table_name):
    # Write a DataFrame into ClickHouse over JDBC
    df.write \
        .format("jdbc") \
        .option("url", "jdbc:clickhouse://<your-clickhouse-address>/default") \
        .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
        .option("user", "<user>") \
        .option("password", "<password>") \
        .option("dbtable", table_name) \
        .option("batchSize", "100000") \
        .option("isolationLevel", "NONE") \
        .mode("append") \
        .save()
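# Notes on the write options above:
# - batchSize=100000 sends rows in large INSERT batches, which suits ClickHouse's
#   preference for few, big inserts over many small ones.
# - isolationLevel=NONE skips transaction handling; ClickHouse does not support
#   JDBC transactions, so any other isolation level would fail.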
# Read from ClickHouse
class SparkClickHouse:
    def __init__(self, spark_session):
        self.spark = spark_session

    # Read from ClickHouse and return a DataFrame
    def read_clickhouse(self, sql):
        dataframe = self.spark.read \
            .format("jdbc") \
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
            .option("url", "jdbc:clickhouse://<your-clickhouse-address>/default") \
            .option("user", "<user>") \
            .option("password", "<password>") \
            .option("query", sql) \
            .load()
        return dataframe
if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("spark_read_clickhouse") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("INFO")  # set the log level as you like

    spark_clickhouse = SparkClickHouse(spark)
    sql = "select * from ocean_shipping.dwd_ka_aci"
    df = spark_clickhouse.read_clickhouse(sql)
    df.show()

    # Write to the ClickHouse distributed table
    spark_write_clickhouse(df, "data_report.dwd_test")

    # Stop Spark and exit
    spark.stop()
    sys.exit(0)
Possible issues
'TypeError: an integer is required (got type bytes)'
This usually means your Python 3.8 is incompatible with PySpark 2.4.x; downgrade to Python 3.7. A quick way to check which interpreter the project runs on is shown below.
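A minimal interpreter check:

import sys
print(sys.version)  # PySpark 2.4.x supports Python up to 3.7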