-
Useful link
-
Data engineering basic idea
-
Amazon Kinesis
- Use Kinesis to process streaming data and build a data lake
- You can think of the shards in a Kinesis Data Stream as ports: the more ports, the higher the throughput. A Firehose delivery stream is the ship that carries the cargo (data) to its destination
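A minimal producer sketch with boto3 (the stream name, payload, and partition key are hypothetical); the partition key is hashed to pick a shard, so adding shards ("ports") raises throughput:
import json
import boto3

kinesis = boto3.client("kinesis")  # assumes AWS credentials/region are already configured
kinesis.put_record(
    StreamName="clickstream",                                            # hypothetical stream
    Data=json.dumps({"user": "u1", "event": "page_view"}).encode("utf-8"),
    PartitionKey="u1",                                                   # hashed to choose a shard
)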
-
Airflow
-
Pyspark
-
-
BigQuery: provides large-scale data storage, lets you query the stored data with structured query language (SQL), and supports joins between data tables.
- Note
- BigQuery supports a few external data sources: you may query these sources directly from BigQuery even though the data itself isn't stored in BQ. An external table is a table that acts like a standard BQ table. The table metadata (such as the schema) is stored in BQ storage, but the data itself is external, and BQ will figure out the table schema and the datatypes based on the contents of the files. (Be aware that BQ cannot determine processing costs of external tables.) A sketch follows the metadata list below.
📑 Common metadata:
• Table definition (what the columns are, e.g. sale_id)
• The data type of each column
• The order in which each column actually appears in the original data
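A hedged sketch of the idea above using the google-cloud-bigquery Python client (project, dataset, table, and bucket names are hypothetical): the table definition lives in BQ, the data stays in GCS, and autodetect infers the schema from the files.
from google.cloud import bigquery

client = bigquery.Client()
table = bigquery.Table("my_project.my_dataset.sales_external")   # hypothetical table id

external_config = bigquery.ExternalConfig("CSV")
external_config.source_uris = ["gs://my-bucket/sales/*.csv"]      # data stays in GCS
external_config.autodetect = True                                 # BQ infers schema and datatypes
table.external_data_configuration = external_config
client.create_table(table)

# Queried like a standard BQ table, but the files are read from GCS at query time
rows = client.query("SELECT sale_id FROM `my_project.my_dataset.sales_external`").result()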
- Tip
- Optimize your join patterns. Best practice: for queries that join data from multiple tables, start with the largest table.
- Use external data sources appropriately. Constantly reading data from a bucket may incur additional costs and gives worse performance.
- Use clustered and/or partitioned tables if possible.
- Avoid SELECT * (the BigQuery engine utilizes a columnar storage format to scan only the columns required to run the query. One of the best practices to control costs is to query only the columns that you need.)
- Note
-
pyspark
-
overview
- Spark includes a driver and multiple workers (on different nodes) in a master-slave architecture: the master is the driver, and the slaves are the workers
- Spark clusters (often containing multiple computers) are managed by a master. A driver that wants to execute a Spark job sends the job to the master, which in turn divides the work among the cluster's workers. If any worker fails or goes offline for any reason, the master reassigns its tasks to another worker.
-
Spark Cluster Topology
Very short summary: Driver (submits) -> Master (manages) -> Worker/Executor (pulls data and processes it)
In a Spark cluster there are a number of executors, and each executor processes one file at a time. If we have one big file, it can be handled by only one executor while the rest of the executors sit idle. However, if there are too many partitions, there is excessive overhead in managing lots of small tasks.
Therefore, it is important to understand best practices for distributing the workload across executors. This is done through partitioning (see the sketch below).
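A minimal sketch of checking and changing a dataframe's partitioning (the file path and partition count are hypothetical):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-demo").getOrCreate()

# One big file may land in only a few partitions, leaving most executors idle
df = spark.read.csv("data/big_file.csv", header=True)
print(df.rdd.getNumPartitions())

# Redistribute the rows so the work is spread across more executors (triggers a full shuffle)
df = df.repartition(16)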
-
Jobs, stages, tasks
An Application consists of a Driver and several Jobs, and a Job consists of multiple Stages
When executing an Application, the Driver applies for resources from the cluster manager, starts the Executor processes that run the Application, and sends the application code and files to the Executors; the Executors then execute the Tasks
After the work is completed, the execution results are returned to the Driver
Action -> Job -> Job Stages -> Tasks
- whenever you invoke an action, the SparkContext (Spark's entry point, analogous to an application's main function) creates a job and runs the job scheduler to divide it into stages --> pipelineable
- tasks are created for every job stage and scheduled to the executors
-
what is RDD
- In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine. Each partition holds a unique subset of the entries in the list. Spark calls datasets that it stores "Resilient Distributed Datasets" (RDDs).
- RDD properties
immutable: every RDD is immutable; to "update" one, you can only build another RDD from the existing one
resilient: if a node fails and the RDDs stored on it are lost, Spark can re-run the chain of "transformation" instructions to produce the output data again. Suppose we apply a series of transformations to an RDD, e.g. line ▶ badLines ▶ OtherRDD1 ▶ OtherRDD2 ▶ ...; because every RDD is immutable and the operations that created it are recorded (somewhat like a log), the badLines RDD can always be derived from the line RDD. So even if the node(s) holding badLines fail, badLines can be rebuilt as long as the nodes holding line are still available
-
RDD operations
- ✅ Transformation: operates on one or more RDDs and produces a new RDD
- ✅ Action: returns results to the Driver, or performs some operation on the RDD's elements, but does not produce a new RDD
- Applying operations to an RDD yields a new RDD, but Spark defers when these "transformations" actually run. They are not executed immediately; only after an Action is executed does Spark evaluate the transformations based on the whole RDD lineage. e.g. rdd.collect()
-
code example
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("111") > 0)
c = b.collect()
(1) variable a will be saved as an RDD variable containing the expected txt file content
❗❗❗ Not really. The line just describes what will happen after you execute an action, i.e. the RDD variable does not contain the expected txt file content.
(2) The driver node breaks up the work into tasks and each task contains information about the split of the data it will operate on. Now these Tasks are assigned to worker nodes.
(3) when the collect action (i.e. collect() in our case) is invoked, the results are returned from the different nodes to the driver and saved as a local variable c.
-
-
pyspark SQL
-
pyspark performance related issue
-
Adding checkpoint(pyspark.sql.DataFrame.checkpoint)
- Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially.(When the query plan starts to be huge, the performance decreases dramatically)
- After checkpointing the data frame, you don't need to recalculate all of the previous transformations applied to it; the checkpointed data stays on disk until you delete it
- When I checkpoint a dataframe and reuse it, is the data automatically read back from the directory the files were written to? Yes, it should be read automatically
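A minimal checkpoint sketch (the checkpoint directory and the iterative transformations are made up for illustration):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint-demo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   # required before checkpointing

df = spark.range(1_000_000)
for i in range(10):                      # iterative transformations keep growing the query plan
    df = df.withColumn(f"col_{i}", df["id"] + i)

df = df.checkpoint()                     # materializes to the checkpoint dir and truncates the lineage
df.count()                               # later actions start from the checkpointed data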
-
Spark’s Skew Problem
- Sometimes a lot of data ends up on a single executor because the same key is assigned to many rows of our data, and this might even result in an OOM error (when doing a groupby or join transformation, rows with the same key must stay in the same partition, and some keys may be far more frequent than others, which leads to the skew). The skewed partition takes longer to process and increases the overall job execution time (all other tasks just wait for it to complete) - how to solve (see the salting sketch below)
- Salting data skew in apache spark
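A hedged salting sketch for a skewed groupBy (the dataframe and the hot key column "user_id" are hypothetical): a random salt splits each hot key across N sub-keys so no single task receives all of its rows.
from pyspark.sql import functions as F

N = 10
salted = (df
          .withColumn("salt", (F.rand() * N).cast("int"))
          .withColumn("salted_key",
                      F.concat_ws("_", F.col("user_id").cast("string"), F.col("salt").cast("string"))))

# Aggregate per salted key first, then merge the partial results per original key
partial = salted.groupBy("salted_key", "user_id").agg(F.count("*").alias("cnt"))
result = partial.groupBy("user_id").agg(F.sum("cnt").alias("cnt"))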
-
Avoid using UDFs
- When we execute a DataFrame transformation using native or SQL functions, each of those transformations happens inside the JVM itself, which is where the implementation of the function resides. But if we do the same thing using Python UDFs, something very different happens.
- First of all, the code cannot be executed in the JVM; it has to run in the Python runtime. To make this possible, each row of the DataFrame is serialized, sent to the Python runtime and returned to the JVM. As you can imagine, this is far from optimal (see the sketch below).
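A small comparison sketch (hypothetical dataframe with an integer column "x"):
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Python UDF: every row is serialized, shipped to the Python runtime and back
plus_one = F.udf(lambda v: v + 1, IntegerType())
df.withColumn("y", plus_one(F.col("x")))

# Native column expression: evaluated inside the JVM, no serialization round trip
df.withColumn("y", F.col("x") + 1)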
-
Use toPandas with pyArrow
- using pyarrow to efficiently transfer data between JVM and Python processes
pip install pyarrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-
pyspark bucketing and Partitioning
-
Partitioning
- Mastering PySpark Partitioning: repartition vs partitionBy
repartition() is about how the data is distributed across partitions in memory during computation, while partitionBy() is about how the data is partitioned on disk when writing out to a file system.
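A short sketch of the difference (the dataframe, column name, and output path are hypothetical):
# repartition(): in-memory distribution during computation (triggers a shuffle)
df = df.repartition(8, "country")

# partitionBy(): on-disk layout at write time, one sub-directory per country value
df.write.partitionBy("country").parquet("out/sales")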
-
Bucketing
- Use hash(x) mod n to assign each record to a bucket
- If you join a big dataframe multiple times throughout your pyspark application, save that table as a bucketed table and read it back in pyspark as a dataframe. This way you can avoid multiple shuffles during the joins, since the data is already pre-shuffled
- If you want to use bucketing, spark.conf.get("spark.sql.sources.bucketing.enabled") should return True
- With bucketing, we can shuffle the data in advance and save it in this pre-shuffled state(reduce shuffle during join operation)
- Bucket for optimized filtering is available in Spark 2.4+. If we use a filter on the field by which the table is bucketed, Spark will scan files only from the corresponding bucket and avoid a scan of the whole table
- Check if the table bucketed: spark.sql("DESCRIBE EXTENDED table_name").show()
- ❗❗❗ make sure that the columns for joining have same datatype for two tables
- ❗❗❗ it is ideal to have the same number of buckets on both sides of the join; however, if the tables have different bucket counts, just enable spark.sql.bucketing.coalesceBucketsInJoin.enabled so the two tables are treated as having the same number of buckets (see the sketch below)
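A hedged bucketing sketch (table names, column name, and bucket count are hypothetical):
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

(df.write
   .bucketBy(16, "customer_id")        # hash(customer_id) mod 16 picks the bucket
   .sortBy("customer_id")
   .mode("overwrite")
   .saveAsTable("sales_bucketed"))     # bucketing only works with saveAsTable

bucketed = spark.table("sales_bucketed")
# Joining on the bucketed column with a table bucketed the same way can skip the shuffle
joined = bucketed.join(spark.table("customers_bucketed"), "customer_id")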
-
-
-
pyspark streaming
-
overview
- Spark Streaming first takes live input data streams and then divides them into batches. After this, the Spark engine processes those streams and generates the final stream results in batches.
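A minimal Structured Streaming sketch (the socket host/port are placeholders): read a stream, apply a transformation, and write each micro-batch result to the console.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("streaming-demo").getOrCreate()

lines = (spark.readStream.format("socket")
         .option("host", "localhost").option("port", 9999).load())
words = lines.select(F.explode(F.split(lines["value"], " ")).alias("word"))
counts = words.groupBy("word").count()

query = counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()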
-
learning resources
- Apache Spark Structured Streaming — First Streaming Example (1 of 6)
- Apache Spark Structured Streaming — Input Sources (2 of 6)
- Apache Spark Structured Streaming — Output Sinks (3 of 6)
- Apache Spark Structured Streaming — Checkpoints and Triggers (4 of 6)
- Apache Spark Structured Streaming — Operations (5 of 6)
- Apache Spark Structured Streaming — Watermarking (6 of 6)
- Spark streaming output modes
-
-
Kafka
-
useful links
-
basic structure:
- Broker
- A kafka server is a broker. A cluster consists of multiple brokers. A broker can hold multiple topics
- ZooKeeper is responsible for the overall management of Kafka cluster. It monitors the Kafka brokers and notifies Kafka if any broker or partition goes down, or if a new broker or partition goes up
- Topic
- A topic is a stream of messages; you may consider it as a table in a database
- the word topic refers to a category name used to store and publish a particular stream of data
- Partition
- In order to achieve scalability, a very large topic can be distributed across multiple brokers (i.e. servers): a topic can be divided into multiple partitions, and each partition is an ordered queue
- Each message in a partition is assigned an ordered id (offset). Kafka only guarantees that messages are delivered to consumers in order within a partition, and does not guarantee ordering across the topic as a whole (between multiple partitions)
- That is to say, a topic can have multiple partitions in the cluster, so what is the partitioning strategy? There are two basic strategies for choosing which partition a message is sent to: one is the key hash algorithm, the other is the round-robin algorithm
- Each of the partitions can have replicas, which are identical copies. This helps avoid a single point of failure for a partition (fault tolerance).
- Replication
- When a partition is replicated across multiple brokers, one of the brokers becomes the leader for that specific partition. The leader handles the message and writes it to its partition log. The partition log is then replicated to other brokers, which contain replicas for that partition. Replica partitions should contain the same messages as leader partitions.
- If a broker which contains a leader partition dies, another broker becomes the leader and picks up where the dead broker left off, thus guaranteeing that both producers and consumers can keep posting and reading messages.
- Producer : The message producer is the client that sends messages to the kafka broker
flowchart LR
    p(producer)
    k{{kafka broker}}
    subgraph logs[logs for topic 'abc']
        m1[message 1]
        m2[message 2]
        m3[message 3]
    end
    p-->|1. Declare topic 'abc'|k
    p-->|2. Send messages 1,2,3|k
    k-->|3. Write messages 1,2,3|logs
    k-.->|4. ack|p
- Consumer : message consumer, client that fetches messages from kafka broker
flowchart LR
    c(consumer)
    k{{kafka broker}}
    subgraph logs[logs for topic 'abc']
        m1[message 1]
        m2[message 2]
        m3[message 3]
    end
    c-->|1. Subscribe to topic 'abc'|k
    k<-->|2. Check messages|logs
    k-->|3. Send unread messages|c
    c-.->|4. ack|k
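A minimal sketch of the produce/consume flows above, assuming the kafka-python client (broker address, topic name, and messages are placeholders):
from kafka import KafkaProducer, KafkaConsumer

# Producer: send messages to topic 'abc'; the key hash decides the partition
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("abc", key=b"user-1", value=b"message 1")
producer.flush()

# Consumer: subscribe to the topic and read messages at its own pace
consumer = KafkaConsumer("abc", bootstrap_servers="localhost:9092", auto_offset_reset="earliest")
for msg in consumer:
    print(msg.partition, msg.offset, msg.value)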
- A producer writes messages to the topic and a consumer reads them from the topic. This way we are decoupling them since the producer can write messages to the topic without waiting for the consumer.
The consumer can then consume messages at its own pace.
This is known as the publish-subscribe pattern.
- Retention of records
- One thing that separates Kafka from other messaging systems is the fact that the records are not removed from the topic once they are consumed. This allows multiple consumers to consume the same record and it also allows the same consumer to read the records again (and again)
- Records are removed after a certain period of time. By default, Kafka will retain records in the topic for 7 days. Retention can be configured per topic
- Note on Config
- When a producer does not receive an acknowledgement for some time (defined by the property request.timeout.ms), it resends the message (after the time defined by the property retry.backoff.ms). It keeps resending failed messages up to the number of times defined by the property retries
- batch.size, linger.ms: when these two parameters are both set, a batch is sent as soon as either condition is met. For example, if batch.size is set to 16 KB and linger.ms is set to 50 ms, then the buffered messages are sent as soon as they reach 16 KB; if the total message size is < 16 KB, the batch is sent 50 ms after the first message arrived (see the sketch below)
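The same knobs expressed as kafka-python producer parameters (an illustrative sketch; values are examples, not recommendations):
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    retries=5,              # how many times a failed message is resent
    retry_backoff_ms=100,   # wait between resends
    batch_size=16384,       # send once the buffered batch reaches 16 KB ...
    linger_ms=50,           # ... or 50 ms after the first message arrives, whichever comes first
)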
-
Schema Registry
-
-
Linux v.s. Shell Scripting basic
- note
- 簡明 Linux Shell Script 入門教學
- Parameter indirection
- Command substitution
- Command substitution is a feature of the shell, which helps save the output generated by a command in a variable
- myfoo=$(echo foo) : echo will write foo to standard out and the shell will save it in the variable called myfoo
- $
- ${variable} : returns the value stored in the variable
- $# : is the number of positional parameters
- $0 : is the name of the shell or shell script
- $1, $2, $3, ... are the positional parameters
-
RDBMS
Normalization example (from CS50 web course)
So far, we’ve only been working with one table at a time, but many databases in practice are populated by a number of tables that all relate to each other in some way. In our flights example, let’s imagine we also want to add an airport code to go with the city. The way our table is currently set up, we would have to add two more columns to go with each row. We would also be repeating information, as we would have to write in multiple places that city X is associated with code Y.
One way we can solve this problem is by deciding to have one table that keeps track of flights, and then another table keeping track of airports. The second table might look something like this
Now that we have a table relating codes and cities, rather than storing an entire city name in our flights table, we can save storage space by storing just the id of the corresponding airport. Therefore, we should rewrite the flights table accordingly. Since we're using the id column of the airports table to populate origin_id and destination_id, we call those values Foreign Keys
- SQL
-
Cassandra
-
GCP
- GCP Service Account: the purpose of the Service Account is to provide the permissions required for the application to execute on GCP.
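A small sketch of authenticating a client with a service account key file (the key path is hypothetical), so the application runs with exactly the permissions granted to that service account:
from google.cloud import bigquery

client = bigquery.Client.from_service_account_json("/path/to/service-account-key.json")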
-
DBT
-
Difference between a data lake and a data warehouse?
- A data lake holds data of all structure types, including raw and unprocessed data; a data warehouse stores data that has been treated and transformed with a specific purpose in mind, which can then be used for analytic or operational reporting.