gabledata/recap

Explore a Flink catalog based on Recap

gunnarmorling opened this issue · 3 comments

In Apache Flink, catalogs "provide metadata, such as databases, tables, partitions, views, and functions and information needed to access data stored in a database or other external systems".

There is an in-memory implementation which keeps any information only in the context of a specific session. A persistent implementation is provided in the form of HiveCatalog, which uses Apache Hive (more precisely, the Hive Metastore) as the underlying storage layer.

The purpose of this issue is to explore the feasibility of creating a persistent Flink catalog on top of Recap (or, specifically, its schema registry). This could be interesting to Flink users looking for an alternative to the Hive-based catalog implementation. Note the Catalog contract has some facets which probably are not supported in Recap, e.g. the ability to store metadata about functions and statistics. I'm not sure whether Recap's core design is extensible so that it would allow for storing this kind of additional information?

Ok, so I sniffed around the Flink catalog stuff a bit.

tl;dr This looks doable, but with some (potentially significant) caveats.

A few notes/questions/thoughts:

  1. Can the catalog impl be read-only, or is read-write required?
  2. It's unclear to me what the catalog -> db -> table data model in Flink should map to in Recap's registry paths. After reading FLIP-125, I can't determine how the CFLT registry proposal handles this, but it has a similar problem (it only has subjects). Any ideas on how the catalog/db/table (or ObjectPath?) should map to Recap's path?
  3. Right now, Recap stores all schemas as Recap schemas. Conversion from/to other formats (avsc, JSON schema, proto) could be slightly lossy. The registry could be easily modified to a) receive and return schemas in any of the above formats, and b) persist the exact file for the format that was posted, so no coercion/loss would occur when reading a schema in its native format (i.e. putting and then getting an Avro schema would yield the exact same document). Would need to write some code, though.
  4. As you said, Recap only models schemas, not partitions, stats, or functions. Recap does allow arbitrary attributes to be attached to schemas, though; it just ignores them and passes them along. Recap's HiveMetastoreClient already does this with HMS stats. In the registry, though, it gets a little more dangerous, since every mutation of a schema must re-persist the stats/function/partition info. Technically, I think you could treat functions as a schema in the Recap registry. You'd just have to denote that the schema is for a function, and interpret it as such in Flink's RecapCatalog. This is only an issue if we're talking about supporting write operations (see (1) above).
  5. Recap catalog is a Python process. It ships as a Docker container. Not sure if you care about this at all, but wanted to call it out.
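To illustrate (3), here's a minimal sketch of format-preserving storage. All names here (e.g. `FormatPreservingRegistry`) are hypothetical, not Recap's actual API: the idea is simply to persist the raw posted document alongside its declared format, so a native-format read returns exactly what was written.

```python
# Hypothetical sketch of format-preserving schema storage (point 3).
# Class and method names are illustrative, not Recap's real API.

class FormatPreservingRegistry:
    def __init__(self):
        # path -> (format, raw schema text)
        self._store = {}

    def put(self, path: str, fmt: str, raw: str) -> None:
        # Persist the exact document that was posted, keyed by its format,
        # so no conversion loss occurs on a later native-format read.
        self._store[path] = (fmt, raw)

    def get(self, path: str, fmt: str) -> str:
        stored_fmt, raw = self._store[path]
        if stored_fmt == fmt:
            # Native-format read: return the exact document that was written.
            return raw
        # A cross-format read would convert via Recap's internal model
        # (potentially lossy); that conversion is omitted in this sketch.
        raise NotImplementedError(f"conversion {stored_fmt} -> {fmt} not sketched")


registry = FormatPreservingRegistry()
avro = '{"type": "record", "name": "User", "fields": []}'
registry.put("postgresql://localhost:5432/db/public/users", "avro", avro)
assert registry.get("postgresql://localhost:5432/db/public/users", "avro") == avro
```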

Can the catalog impl be read-only, or is read-write required?

Read-write would definitely be required: the idea is to use Recap as a persistent catalog when adding tables (e.g. representing a CDC source or an Elasticsearch sink connection) in an interactive Flink SQL session, making them available again after starting a new session.

It's unclear to me what the catalog -> db -> table data model in Flink should map to in Recap's registry paths

Good question; how do you model this for Postgres today? There, you have a notion of "databases" (of which there can be multiple on one PG host), "schemas" (logical namespaces within a database), and "tables" (actual tables within a schema). What do Recap paths look like for that?

Recap catalog is a Python process. It ships as a Docker container

I don't think that's a problem whatsoever. It does expose a remote (REST) API to which Flink could talk, right?

Good question; how do you model this for Postgres today?

The registry does not impose any structure. The API just takes a "path" (foo/bar/baz). The path structure is up to the writer. That said, for Recap tooling, the path for PG is always:

postgresql://host:port/db/schema/table

So, I think the mapping should be: catalog -> "recap", db -> "postgresql://host:port/db", table -> "schema.table" (or "schema/table")
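A sketch of that mapping, assuming the "schema.table" variant (the function name and the exact convention are illustrative, not settled API):

```python
def to_recap_path(db: str, table: str) -> str:
    """Map a Flink (database, table) pair to a Recap registry path.

    Assumes the convention proposed above: the Flink "database" carries
    the PG connection prefix, and the table name is "schema.table".
    """
    schema, table_name = table.split(".", 1)
    return f"{db}/{schema}/{table_name}"


assert (
    to_recap_path("postgresql://host:5432/db", "public.users")
    == "postgresql://host:5432/db/public/users"
)
```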

I'm also unsure why the API has ObjectPath for the write APIs and just string for the read APIs. That seems odd to me. Using ObjectPath for the read APIs would work better for Recap as you could do ObjectPath("schema", "table").
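To illustrate why ObjectPath would be friendlier on the read side: a structured pair hands the catalog its two components directly, while a bare string forces a parsing convention. A Python analogue of Flink's Java ObjectPath (names mirror the Java class, but this is just a sketch):

```python
from typing import NamedTuple


class ObjectPath(NamedTuple):
    # Python analogue of Flink's org.apache.flink.table.catalog.ObjectPath
    database_name: str
    object_name: str


# With a structured pair, the Recap catalog gets both parts directly,
# no parsing needed:
p = ObjectPath("schema", "table")
assert p.database_name == "schema" and p.object_name == "table"

# With a bare string, the catalog must pick a delimiter convention,
# which is ambiguous if names themselves contain the delimiter:
raw = "schema.table"
assert tuple(raw.split(".", 1)) == ("schema", "table")
```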

It does expose a remote (REST) API to which Flink could talk, right?

Correct.

Lastly, for (4), I want to make sure: this is still useful even if Recap doesn't store function/partition/stats info, right?