duckdb/duckdb_iceberg

Hidden partition pruning

Fokko opened this issue · 0 comments

Fokko commented

Iceberg has support for hidden partitioning. Data written to a partitioned table is split up based on the source column and the applied transform.

A partitioned table can tremendously speed up queries, since the manifest list keeps a summary of the partition range for each manifest. This way, whole groups of files can already be pruned at a high level.

Important to note here is that partitions can evolve over time with Iceberg. For example, when the partitioning is updated from daily to hourly due to increased data volume, new data will be written using the new partitioning strategy. The old data will not change and will remain partitioned by day until it is rewritten.
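
To make the transform concrete: a day transform stores the partition value as the number of days since the Unix epoch. A minimal sketch of the idea (illustrative only, not PyIceberg's implementation):

    from datetime import date, datetime

    def day_transform(ts: datetime) -> int:
        # Illustrative: a day-partitioned value is stored as days since the Unix epoch.
        return (ts.date() - date(1970, 1, 1)).days

    # 2021-04-29 maps to partition value 18746, the value that shows up in the
    # manifest entry further down.
    assert day_transform(datetime(2021, 4, 29, 13, 37)) == 18746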

Pruning of manifest files

Manifest list:

{
	"manifest_path": "s3://warehouse/nyc/taxis/metadata/65085354-ca41-4ae0-b454-04bc1a669f19-m0.avro",
	"manifest_length": 16797,
	"partition_spec_id": 0,  # field-id: 502
	"content": 0,
	"sequence_number": 3,
	"min_sequence_number": 3,
	"added_snapshot_id": 864191924052698687,
	"added_data_files_count": 42,
	"existing_data_files_count": 0,
	"deleted_data_files_count": 0,
	"added_rows_count": 3627882,
	"existing_rows_count": 0,
	"deleted_rows_count": 0,
	"partitions": {  # field-id: 507
		"array": [{
			"contains_null": false,
			"contains_nan": {
				"boolean": false
			},
			"lower_bound": {
				"bytes": "¤7\u0000\u0000"
			},
			"upper_bound": {
				"bytes": "·J\u0000\u0000"
			}
		}]
	}
}
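
The lower_bound and upper_bound of each partition summary hold the serialized partition value; for an int/date partition field this is a 4-byte little-endian integer under Iceberg's single-value binary serialization. A sketch of decoding such a bound (the byte string is the lower bound above, assuming the escaped string maps one character to one byte):

    import struct
    from datetime import date, timedelta

    def decode_day_bound(raw: bytes) -> date:
        # int/date bounds are 4-byte little-endian; for a day partition the
        # integer is again days since the Unix epoch.
        days, = struct.unpack("<i", raw)
        return date(1970, 1, 1) + timedelta(days=days)

    print(decode_day_bound(b"\xa4\x37\x00\x00"))  # 2008-12-31 for these example bytes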

Since there can be many different partition strategies over time, the evaluators are constructed lazily in PyIceberg. Based on partition_spec_id (field-id 502) we construct an evaluator:

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
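
KeyDefaultDict is a small helper: a dict whose default factory receives the missing key, so one evaluator is built per spec id on first use. Roughly equivalent to this sketch (not PyIceberg's actual class):

    from typing import Callable, Dict, TypeVar

    K = TypeVar("K")
    V = TypeVar("V")

    class KeyDefaultDict(Dict[K, V]):
        # Like collections.defaultdict, except the factory is called with the missing key.
        def __init__(self, default_factory: Callable[[K], V]):
            super().__init__()
            self.default_factory = default_factory

        def __missing__(self, key: K) -> V:
            value = self.default_factory(key)
            self[key] = value
            return value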

Building that evaluator starts by looking up the partition spec and projecting the row filter onto it:

    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
        return project(self.row_filter)

This requires the original schema, since the filter references the original fields. Consider the following schema:

schema {
  1: id int
}

For example, take the filter id = 10 with bucket(id) partitioning on the table. The spec then looks like this:

[
     1000: id: bucket[22](1)
]

The PartitionSpec has a single field; partition field-ids start at 1000. It references source field-id 1, which corresponds to id. This way the filtering is decoupled from the naming.
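
To make the projection concrete: the predicate on the source column is rewritten into a predicate on the partition column by applying the same transform to the literal. A rough sketch for the id = 10 example (bucket22 and the id_bucket name are placeholders, not Iceberg's Murmur3-based bucket transform):

    def bucket22(value: int) -> int:
        # Placeholder for Iceberg's bucket[22] transform (which hashes with Murmur3);
        # any deterministic function illustrates the idea.
        return hash(value) % 22

    # Row filter on the source column:   id = 10
    # Projected partition filter:        id_bucket = bucket22(10)
    # A manifest whose id_bucket summary range cannot contain bucket22(10) is skipped.
    row_filter = ("id", "==", 10)
    partition_filter = ("id_bucket", "==", bucket22(10))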

The most important part is where the predicate is projected onto the partition fields:

class InclusiveProjection(ProjectionEvaluator):
    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
        parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)

        result: BooleanExpression = AlwaysTrue()
        for part in parts:
            # consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
            # projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
            # any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
            #
            # similarly, if partitioning by day(ts) and hour(ts), the more restrictive
            # projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
            # hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
            incl_projection = part.transform.project(name=part.name, pred=predicate)
            if incl_projection is not None:
                result = And(result, incl_projection)

        return result

Here we start with AlwaysTrue(), so a manifest is included until there is evidence that it is not relevant. inclusive_projection returns a callable that is then applied to the row_filter:

    def project(self, expr: BooleanExpression) -> BooleanExpression:
        #  projections assume that there are no NOT nodes in the expression tree. to ensure that this
        #  is the case, the expression is rewritten to push all NOT nodes down to the expression
        #  leaf nodes.
        #  this is necessary to ensure that the default expression returned when a predicate can't be
        #  projected is correct.
        return visit(bind(self.schema, rewrite_not(expr), self.case_sensitive), self)
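
The rewrite_not step matters because the inclusive projection only widens predicates; a NOT left above an already-widened child would make the result incorrect. A small sketch of what the rewrite does, assuming PyIceberg's public expression classes (the import path is an assumption and may differ between versions):

    from pyiceberg.expressions import And, EqualTo, GreaterThan, Not
    from pyiceberg.expressions.visitors import rewrite_not  # import path assumed

    expr = Not(And(EqualTo("id", 10), GreaterThan("id", 5)))
    # De Morgan: the NOT nodes are pushed down to the leaves, so no NOT remains
    # above a predicate by the time the projection visits the tree.
    print(rewrite_not(expr))  # roughly Or(NotEqualTo("id", 10), LessThanOrEqual("id", 5))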

Pruning of data files

Manifest file:

When a manifest is considered relevant based on the partition range, we look at the data files it lists:

[{
  "status": 1,
  "snapshot_id": {
    "long": 4554453935356538000
  },
  "data_file": {
    "file_path": "s3://warehouse/nyc/taxis/data/tpep_pickup_datetime_day=2021-04-29/00003-10-989122fb-e6e6-449e-8290-57871f57028f-00001.parquet",
    "file_format": "PARQUET",
    "partition": {
      "tpep_pickup_datetime_day": {
        "int": 18746 // Partition: Thursday, April 29, 2021
      }
    },
...
}
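
As a worked example with the value from this entry (the filter itself is hypothetical): a row filter such as tpep_pickup_datetime >= '2021-05-01' projects onto the day partition as tpep_pickup_datetime_day >= 18748, and since this file's partition value is 18746, the file can be skipped without ever being opened:

    from datetime import date

    def days_since_epoch(d: date) -> int:
        return (d - date(1970, 1, 1)).days

    file_partition_value = 18746                                 # from the manifest entry above
    projected_lower_bound = days_since_epoch(date(2021, 5, 1))   # 18748, from the hypothetical filter
    print(file_partition_value >= projected_lower_bound)         # False -> the data file is pruned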

Evaluating those data files re-uses the same partition projection:

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        return KeyDefaultDict(self._build_partition_projection)

And the meat is in this function:

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        spec = self.table.specs()[spec_id]
        partition_type = spec.partition_type(self.table.schema())
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        evaluator = visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)
        return lambda data_file: evaluator(data_file.partition)

We create a schema from the partition type of the spec (a struct<int> in this example), bind the projected filter to it, and build an evaluator on top of it that tells us, per data file, whether the partition might contain matching rows.
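
Putting both levels together, the whole pruning flow boils down to something like the following sketch (names and parameters are illustrative, not the exact PyIceberg code):

    from typing import Any, Callable, Dict, Iterable, List

    def plan_files(
        manifest_list: Iterable[Any],                    # ManifestFile entries from the manifest list
        read_manifest: Callable[[Any], Iterable[Any]],   # yields the entries of one manifest file
        manifest_evaluators: Dict[int, Callable[[Any], bool]],
        partition_evaluators: Dict[int, Callable[[Any], bool]],
    ) -> List[Any]:
        # First prune whole manifests using the partition summaries in the manifest list,
        # then prune individual data files using the partition tuple stored per file.
        data_files = []
        for manifest in manifest_list:
            if not manifest_evaluators[manifest.partition_spec_id](manifest):
                continue  # every data file in this manifest falls outside the filter
            for entry in read_manifest(manifest):
                if partition_evaluators[manifest.partition_spec_id](entry.data_file):
                    data_files.append(entry.data_file)
        return data_files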

I hope this helps. I'm happy to provide more context, and when I get the time, I'll see if I can brush up on my cpp skills.