[EPIC] Catalog refactor
dmetasoul01 opened this issue · 0 comments
Overview
An overall refactor of the catalog implementation, to address some known issues and pave the way for future development:
The catalog interaction code is currently tightly coupled with Spark. We need a separate 'clean' implementation that is independent of compute engines. This enables further and easier integration with query engines such as Flink, Presto, etc.
Problems
- Users may not have a Cassandra deployment or may not be familiar with Cassandra;
- Some complex multi-partition/multi-table transaction scenarios are hard to implement on top of Cassandra's LWT. We would like to extend the catalog to support more databases, such as PostgreSQL with its dialect.
- More complex merge semantics and concurrency control semantics require more metadata in the catalog, and we need to encapsulate the details within an independent catalog implementation.
Proposal
Design Goals
Catalog metadata management refers to the management of metadata such as all tables, schemas, partitions, and data directories or files in a data warehouse. Similar to Hive Metastore, metadata management needs to be connected to the compute engine's Catalog interface to implement SQL query resolution for LakeSoul tables.
LakeSoul's metadata management aims to achieve the following goals:
- High performance and scalability. Metadata management systems such as Hive have performance problems: for example, when Hive queries partitions it has to join the PARTITIONS and PARTITION_KEY_VALS tables in MySQL, which becomes slow once a table has thousands of partitions. LakeSoul's metadata reads and writes can be served through primary key indexes, preventing full table scans.
- Atomicity. LakeSoul's metadata atomicity provides atomicity for data commit operations, i.e. partitioned micro-batch writes: a batch is either fully committed or not visible at all, and readers never see data in an intermediate state.
- Consistency. LakeSoul implements tunable consistency through Cassandra, and the default is eventual consistency.
- Isolation. LakeSoul separates read and write versions through a multi-version mechanism, and supports version backtracking based on timestamps (i.e. time travel).
- Multiple concurrent writes. LakeSoul supports multiple concurrent writes, and determines whether data conflicts occur by detecting write conditions. For conflict-free writes (Append/Upsert), concurrent updates are allowed. For conflicting writes (Update), the retry logic is performed by the compute framework.
- Other extension points. Metadata can also provide table-level special semantic control (CDC, SCD), file-level statistics, and data distribution (bucketing, ordering).
Meta Operations
1. Data Format Definition
1. Table information. Stores the table name, path, schema, properties, and partition names and types (Range, Hash)
CREATE TABLE table_info (
table_id text,
table_name text,
table_path text,
table_schema text,
properties json, -- entity corresponds to JSONObject
partitions text,
PRIMARY KEY (table_id)
)
CREATE TABLE table_name_id (
table_name text,
table_id text,
PRIMARY KEY (table_name)
)
CREATE TABLE table_path_id (
table_path text,
table_id text,
PRIMARY KEY (table_path)
)
References: PostgreSQL 14 documentation, Chapter 8 "Data Types" and Section 8.15 "Arrays".
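As an illustration, resolving a table by name is a two-step lookup (table name and id below are hypothetical):
SELECT table_id FROM table_name_id WHERE table_name = 'my_table';
-- assuming the query above returned 'tbl_001', fetch the full table information
SELECT table_name, table_path, table_schema, properties, partitions
FROM table_info WHERE table_id = 'tbl_001';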
2. File information. Stores file names and file operations (file_op), such as add and delete
3. Commit. A commit corresponds to a set of file information and records the type of the current commit (commit_op). Commits and file information are stored in the same table
CREATE TYPE data_file_op AS (
path text,
file_op text,
size bigint,
file_exist_cols text
);
CREATE TABLE data_commit_info (
table_id text,
partition_desc text,
commit_id UUID, -- a timestamp could be used instead
file_ops data_file_op[],
commit_op text,
timestamp bigint,
PRIMARY KEY (table_id, partition_desc, commit_id)
)
References: PostgreSQL 14 documentation, Section 8.16 "Composite Types"; https://www.postgresql.org/docs/current/datatype-uuid.html
The commit_op types are update, compaction, append, and merge. These four types determine the semantic relationship between multiple commits in a snapshot.
4. Snapshots.
A snapshot contains a commit sequence, organized in chronological order. From each snapshot we can reconstruct which files need to be read, which are ignored (deleted or invalidated after an update), which need to be merged (merge on read), and the ordering relationship between them.
5. Partition information.
Stores all historical snapshots of a partition together with a corresponding version number (used for read-write isolation, multi-versioning, etc.). Here partition_desc is a combined description generated from the multi-level partition values and uniquely identifies a leaf partition of a table.
CREATE TABLE partition_info (
table_id text,
partition_desc text, -- e.g. year=2022,month=04,day=20,bucket=000
version int, -- continuous, monotonically increasing
commit_op text,
snapshot UUID[], -- organized in commit time order
expression text, -- entity corresponds to JSONArray
PRIMARY KEY (table_id, partition_desc, version)
)
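For illustration, with hypothetical commit ids c1..c4, the history of one partition could evolve as follows (one partition_info row per version):
version 0: commit_op = append, snapshot = [c1]
version 1: commit_op = append, snapshot = [c1, c2]
version 2: commit_op = compaction, snapshot = [c3] (c3 is the compacted rewrite of c1 + c2)
version 3: commit_op = merge, snapshot = [c3, c4] (c4 is a delta file merged on read)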
Write logic
Each time the data of a table partition is updated, the overall process is as follows
- A list of DataFileOp file operation pairs is generated by the compute framework; each pair is a tuple of (file name, operation), where the operation is add or delete
- Generate a UUID as the commit id, and write the file operation pair list and the current commit_op into the data_commit_info table (a timestamp could be used in place of the UUID)
- At the same time, generate the corresponding partition_info data according to the generated data_commit_info
- Get the current version and snapshot of each partition: current_version, current_snapshot
- Calculate the new version and snapshot of each partition:
- new_version = current_version + 1
- new_snapshot =
1. append/merge: current_snapshot.append(commit_id)
2. update, compaction: [commit_id]
- Ideally, the new version can be written to the table in a single operation. Since concurrent writers may cause version conflicts, a conflict detection mechanism is needed. Here the pg transaction mechanism is used to ensure that conflicting writes can be safely rolled back, and each kind of operation then handles conflicts differently (a sketch of the commit flow follows).
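A minimal SQL sketch of this commit flow for an append, assuming hypothetical table, partition and commit id values (not the actual LakeSoul implementation):
BEGIN;
-- 1. record the commit and its file operations
INSERT INTO data_commit_info
  (table_id, partition_desc, commit_id, file_ops, commit_op, timestamp)
VALUES
  ('tbl_001', 'year=2022,month=04,day=20',
   'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11',
   ARRAY[ROW('part-00000.parquet', 'add', 10240, 'id,name')::data_file_op],
   'append', 1650000000000);
-- 2. write the new partition version: version = current_version + 1,
--    snapshot = current_snapshot plus the new commit id (append/merge case).
--    The primary key (table_id, partition_desc, version) makes a concurrent
--    writer of the same version fail, which triggers conflict resolution.
INSERT INTO partition_info
  (table_id, partition_desc, version, commit_op, snapshot, expression)
SELECT table_id, partition_desc, version + 1, 'append',
       snapshot || 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, expression
FROM partition_info
WHERE table_id = 'tbl_001'
  AND partition_desc = 'year=2022,month=04,day=20'
ORDER BY version DESC LIMIT 1;
COMMIT;
(The very first commit of a partition would insert version 0 with snapshot = [commit_id] directly.)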
- Fine-grained concurrent conflict resolution:
In some cases a conflict is resolvable. For example, two concurrent append operations do not conflict; the second writer can simply retry, fetch the newest commit sequence and make a new snapshot. However, in cases such as two concurrent updates, the conflict cannot be resolved and the second operation should fail.
This op \ Concurrent op | append | compaction | update | merge
---|---|---|---|---
append | Retry commit | Reorder as compaction + append sequence | ✕ | ✕
compaction | Reorder as compaction + append sequence | Discard the later one | Keep update only | Reorder as compaction + merge sequence
update | ✕ | Keep update only | ✕ | ✕
merge | ✕ | Reorder as compaction + merge sequence | ✕ | ✕
Conflict resolving steps:
- If there is a conflict, re-fetch the current partition information (cur_1) and, depending on the operation of this submission, proceed as follows:
1. If this is an append operation: conflicts with a concurrent update/merge are not allowed, so the append fails directly. If it conflicts with compaction/append, then snapshot = cur_1.snapshot + partition_info.snapshot;
2. If this is a compaction operation: when it conflicts with an update, the update prevails; this operation is abandoned and true is returned directly. If it conflicts with another compaction, this operation becomes a no-op and returns true. If it conflicts with append/merge, then snapshot = partition_info.snapshot.append(cur_1.snapshot - current.snapshot);
3. If this is an update operation: conflicts with concurrent append/merge/update are not allowed, so the operation fails. If it conflicts with compaction, just overwrite the snapshot, i.e. snapshot = partition_info.snapshot;
4. If this is a merge operation: conflicts with concurrent append/merge/update are not allowed, so the operation fails. If it conflicts with compaction, snapshot = cur_1.snapshot + partition_info.snapshot.
- The resubmitted version = cur_1.version + 1
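As a hedged sketch of the retry path (values hypothetical): after the rollback, re-read the latest partition row written by the concurrent writer and recompute the snapshot with the rules above, e.g. for an append that conflicted with a compaction/append:
SELECT version, commit_op, snapshot
FROM partition_info
WHERE table_id = 'tbl_001'
  AND partition_desc = 'year=2022,month=04,day=20'
ORDER BY version DESC LIMIT 1;
-- cur_1 = the row above; for this case:
--   new snapshot = cur_1.snapshot + this commit's snapshot
--   new version  = cur_1.version + 1
-- then re-run the partition_info INSERT from the write logic sketch.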
Read logic
1. Normal reading process
- Get the latest partition version number and snapshot of each partition
Conceptually:
SELECT max(version), snapshot FROM partition_info
WHERE table_id = id and partition_desc in (descs...);
The actual query:
select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression
from (
select table_id, partition_desc, max(version)
from partition_info
where table_id = 'tableId' and partition_desc in ('partitionlist')
group by table_id, partition_desc
) t
left join partition_info m
on t.table_id = m.table_id and t.partition_desc = m.partition_desc and t.max = m.version
Note: since version is part of the composite primary key, pg automatically uses the btree index, so this query requires no table scan and is actually very fast.
- Create a read logical plan through the commit_id list in the snapshot
- For each commit id, read the list of DataFileOp from data_commit_info (see the query sketch after this list)
- According to the commit order, decide the Scan plan
1. If there is a merge commit, you need to create a MergeScan
2. In the rest of the cases, create ParquetScan directly
There is actually an implicit requirement here: after an Update, only one file (set) is retained in each partition, while merge can have any number of delta files. For example, if a partition has been updated several times, only one file may remain in this partition; therefore, we can actually keep only the latest commit id after an update.
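A sketch of resolving the snapshot of the latest version into its ordered file operation lists (table and partition values are hypothetical):
SELECT c.commit_id, c.commit_op, c.file_ops
FROM partition_info p
CROSS JOIN LATERAL unnest(p.snapshot) WITH ORDINALITY AS s(commit_id, ord)
JOIN data_commit_info c
  ON c.table_id = p.table_id
 AND c.partition_desc = p.partition_desc
 AND c.commit_id = s.commit_id
WHERE p.table_id = 'tbl_001'
  AND p.partition_desc = 'year=2022,month=04,day=20'
  AND p.version = (SELECT max(version) FROM partition_info
                   WHERE table_id = p.table_id AND partition_desc = p.partition_desc)
ORDER BY s.ord;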
2. Time Travel reading process
Time Travel supports reading at a given version or at a given timestamp. When a version is given, that version is looked up directly and the rest of the logic is the same as the normal reading process above. When a timestamp is given, each partition needs to walk the versions' commit timestamps to find the latest version whose commit timestamp is <= the given timestamp.
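One possible query sketch for the timestamp case (table, partition and timestamp values are hypothetical): pick the highest version all of whose commits were written at or before the given timestamp:
SELECT p.version, p.snapshot
FROM partition_info p
WHERE p.table_id = 'tbl_001'
  AND p.partition_desc = 'year=2022,month=04,day=20'
  AND NOT EXISTS (
        SELECT 1 FROM data_commit_info c
        WHERE c.table_id = p.table_id
          AND c.partition_desc = p.partition_desc
          AND c.commit_id = ANY(p.snapshot)
          AND c.timestamp > 1650000000000)
ORDER BY p.version DESC LIMIT 1;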
Exception handling logic
Possible exceptions when submitting file updates:
- After writing the file, the writing job fails and no commit record is created.
- After the commit record is created, the update partition version fails.
Logically, both failure cases can be ignored because the final version number is not modified and reads are unaffected. But they leave invalid data in storage and in metadata that needs to be cleaned up.
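A hedged sketch (per table, values hypothetical) of finding such leftover metadata: commit records that are not referenced by any snapshot of their partition can be cleaned up, together with the files listed in their file_ops:
SELECT c.commit_id, c.file_ops
FROM data_commit_info c
WHERE c.table_id = 'tbl_001'
  AND NOT EXISTS (
        SELECT 1 FROM partition_info p
        WHERE p.table_id = c.table_id
          AND p.partition_desc = c.partition_desc
          AND c.commit_id = ANY(p.snapshot));
-- in practice one would also filter on c.timestamp to avoid touching in-flight commits
(Files written before the commit record was created have no metadata at all, so they would have to be found by comparing storage listings against the recorded file_ops.)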
TTL processing logic
TTL is a common requirement for data warehouses.
- For tables with a TTL set, apply the same TTL to partition_info and commit records, and set the TTL of the file storage at the same time.
- For file storage where TTL is not available, such as HDFS, cleanup can be performed asynchronously by listening to metadata TTL events (via CDC).
Schema Evolution (DDL)
The sections above do not take schema evolution into account. For schema evolution, the main problem that needs to be dealt with is what to do when the schema is changed (adding, deleting, or modifying several columns).