/cdp-workshop

Cloudera CDP Workshop

Primary LanguageJupyter Notebook

Cloudera CDP / CDF Workshop

Hands on概要:

Cloudera Data Platformを利⽤して、NiFi / Hive / Impala / Spark を体験できるハンズオンを実施いたします。 データの収集からデータ蓄積、データ分析を通してデータの可視化まで、データ基盤としての基礎となるデータフローを体験することができます。

  • 1) NiFi でテストデータを集めて、HDFSにデータを保存

  • 2) データETL変換

  • 3) HDFSに保存したデータをHUE+Hiveで分析

  • 4) HDFSに保存したデータをCML+Sparkで分析

本日のサーバー、アカウント情報

自分のアカウント・サーバーを取得。

サーバーアドレス一覧:Google Docs参照。

皆さんが自分利用しているサーバーに、名前を入力してください。

それぞれAWSで実行している違うVM機です。

1) データ収集

1.1) FTPサーバーから、データを収集

FTPからデータを取得し、HDFSの指定したフォルダに保存

step 1: FTPのフォルダから差分ファイルを取得

1.2) AWS S3ストレージから、データを収集

AWS S3からデータを取得し、HDFSの指定したフォルダに保存

AWS Access Key、Secret Access KeyはGoogle Docs参照

2) データETL変換

CREATE EXTERNAL TABLE customer_csv (
  customerid INT,
  name STRING,
  namekana STRING,
  zipcode STRING,
  address STRING,
  phonenumber STRING,
  emailaddress STRING,
  sex STRING,
  age INT,
  frequency INT,
  visitedday STRING
) ROW FORMAT
  DELIMITED FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  LOCATION '/tmp/ftp_input/customer'
;
CREATE TABLE customer STORED AS PARQUET
  AS SELECT * FROM customer_csv;
CREATE EXTERNAL TABLE webaccess_log_ext (
  host STRING,
  ident STRING,
  usr STRING,
  access_time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING,
  agent STRING,
  cid INT
  ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
  WITH SERDEPROPERTIES (
    'input.regex' = '([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))? \"([0-9]*)\"'
  )
  LOCATION '/tmp/s3_input/webaccess'
;
CREATE TABLE webaccess_log STORED AS PARQUET
  AS SELECT * FROM webaccess_log_ext;

3) HDFSに保存したデータをHUE+Hiveで分析

Step 1 構造化データ

  • customerテーブルから住所が千葉県で訪問日時が’2018/06’のデータを検索するSQLです。

  • LIMIT句で検索件数を制御することができます。

SELECT
  c.customerid,
  c.name,
  c.age,
  c.address,
  c.frequency,
  c.visitedday
FROM
  customer c
WHERE
  (c.age >= 30 and c.age < 40)
  AND c.visitedday LIKE '2018-06%'
ORDER BY
  c.frequency DESC
LIMIT
  10
;

Step 2 非構造化データ

webaccess_logテーブルから2018-07にアクセスがあったデータを取得しています。 毎日のアクセス数を統計します。

SELECT
  day,
  count(day) as access_count
  FROM(
    SELECT
      substr(access_time,2,10) as day
    FROM
      webaccess_log
  ) AS r
  WHERE
    day LIKE '2018-06%'
  GROUP BY day
  ORDER BY day
;

Step 3 構造化と非構造化データに対するクエリ処理

JOIN Query

SELECT
    customerid,
    frequency,
    (unix_timestamp("2018-07-17", 'yyyy-MM-dd') - unix_timestamp(visitedday, 'yyyy-MM-dd')) / (24 * 60 * 60) as recency,
    webrecency
FROM
    customer ct
        JOIN
    (
        SELECT
            cid,
            MIN(unix_timestamp("2018-07-17 +0900", 'yyyy-MM-dd Z') - unix_timestamp(access_time, '[yyyy-MM-dd HH:mm:ss Z]')) / (24 * 60 * 60) AS webrecency
        FROM
            webaccess_log
        GROUP BY
            cid
        ORDER BY
            cid
    ) AS wl
    ON
     wl.cid = ct.customerid

4) HDFSに保存したデータをCML+Sparkで分析

// load data
val customer_raw = spark.read.csv("/tmp/ftp_input/customer/*.csv");

customer_raw.show();

val customer = customer_raw.select($"_c0".alias("customerid"), $"_c1".alias("name"), $"_c2".alias("namekana"), $"_c3".alias("zipcode"), $"_c4".alias("address")) ;

customer.show();

// filter : 301-0で始まる行
val filtered = customer.filter ($"zipcode".contains("301-0"));
filtered.show();

// output
filtered.write.parquet("/tmp/filtered_customer");