cockroachdb/replicator

Prefer changefeed queries to have key_in_value option set

Closed this issue · 6 comments

As presently implemented, we perform a bit of work when parsing changefeed query payloads to create a replication key based on columns in the target table (https://github.com/cockroachdb/replicator/blob/master/internal/source/cdc/primary_key.go#L26). Due to cockroachdb/cockroach#131856 we may not have the information necessary to do so when processing deletes, when the target primary key requires a projected value.

If changefeed queries can support the key_in_value option, this back-parsing would be unnecessary. The source changefeed would simply provide a replica identity (the PK of the row that triggered the message). Given the changes to support REPLICA IDENTITY FULL, the replica key does not require any relationship to the target PK.

Hi Bob, just want to make sure I understand the ask here clearly. I did a quick search on the changefeed queries docs and found this (link):
[Image: screenshot of the changefeed queries documentation]

The snippet above seems to imply that if we are using a webhook sink, the key is already passed by default. Is the ask here then to unmarshal the primary key array into an appropriate format (maybe *ident.Map[int]) and use that in lieu of the current getPrimaryKey? From an initial look at the codebase, it looks like I need to update the query_payload processors to pull the key out of the source CDC message when key_in_value is specified.

Also, what would be the easiest way to dump the incoming raw JSON that's coming in from the changefeed message?

Made some progress here: I'm at least able to dump the JSON received in the request body of the webhook. I see that the data now shows the key in the message:

Received JSON: map[length:1 payload:[map[after:map[id:1.204560795e+09 t:yo] before:<nil> key:[1.204560795e+09] updated:1728948380857547000.0000000000]]]
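For anyone else wanting to inspect the raw payload, here is a minimal sketch of one way to dump it, assuming a plain net/http handler. dumpBody and demo are hypothetical helpers for illustration, not part of replicator. The body is read once and then restored so that later decoding still works.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// dumpBody is a hypothetical helper (not part of replicator). It reads the
// whole request body, returns the raw bytes, and replaces r.Body so that
// later handlers can still decode the payload.
func dumpBody(r *http.Request) ([]byte, error) {
	raw, err := io.ReadAll(r.Body)
	if err != nil {
		return nil, err
	}
	_ = r.Body.Close()
	r.Body = io.NopCloser(bytes.NewReader(raw))
	return raw, nil
}

// demo builds a throwaway request, dumps it, and then reads the body again
// to show that the dump did not consume it.
func demo(body string) (dumped, remaining string, err error) {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
	raw, err := dumpBody(req)
	if err != nil {
		return "", "", err
	}
	rest, err := io.ReadAll(req.Body)
	if err != nil {
		return "", "", err
	}
	return string(raw), string(rest), nil
}

func main() {
	// A payload shaped like the message in this comment (fields abbreviated).
	body := `{"payload":[{"after":{"id":1204560795,"t":"yo"},"before":null,"key":[1204560795]}],"length":1}`
	dumped, remaining, err := demo(body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Received JSON: %s\n", dumped)
	fmt.Println("body still readable:", remaining == dumped)
}
```

Printing the raw bytes also avoids the 1.204560795e+09 rendering seen above, which comes from decoding JSON numbers into float64 when unmarshaling into a generic map.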

This is from an INSERT into tbl1, which has a simple schema:

root@127.0.0.1:26257/molt> SHOW CREATE TABLE tbl1;                                                                  
  table_name |               create_statement
-------------+------------------------------------------------
  tbl1       | CREATE TABLE public.tbl1 (
             |     id INT8 NOT NULL,
             |     t STRING NULL,
             |     CONSTRAINT tbl1_pkey PRIMARY KEY (id ASC)
             | )
(1 row)

Problem
One thing to note here is that the key array carries the values of the primary key columns, not their names. Given that getPrimaryKey builds a map from column ident to integer position, the webhook payload alone doesn't seem to give us what we need. We'd still need to introspect the table to get the column names.

I've tried this with both 23.2 and 24.1, and also with composite PKs:

Received JSON: map[length:1 payload:[map[after:map[id:3 id2:302 t:<nil>] before:<nil> key:[3 302] updated:1728950170607052977.0000000000]]]

Assuming we get the correct data from the webhook (I can check whether another setting makes it emit column names), and given that we modify the values of req.keys via reference, I'm thinking we can embed the logic here to key off of the "key" field and get that data into the ident.Map. Alternatively, we can do this after getPrimaryKey is called in the webhook handler.

The documentation is not correct when it comes to changefeed queries; it seems to be necessary to request key_in_value explicitly. The column names don't matter. All we need is a unique replication identity for the mutation. That's currently provided by getPrimaryKey(), but it doesn't need to be in this case.

Yep, you're right, let me bring this docs inconsistency to the CDC team's attention. I tested locally and needed to pass in key_in_value explicitly.

OK, got it: if all you need is a unique replication identity for the mutation, then we can get that from the "key" field in the map. But I'm curious how getPrimaryKey provides that right now, since it seems to map each column name to its index rather than return the actual PK value.

Where does this get used downstream and who needs this unique ID actually?

The output map from getPrimaryKey() winds up being used here when the payload line is decoded:

// Extract PK values.
q.keyValues = make([]json.RawMessage, q.keys.Len())
for k, pos := range q.keys.All() {
	v, ok := msg.Get(k)
	if !ok {
		return errors.Errorf("missing primary key: %s", k)
	}
	q.keyValues[pos] = v
}
return nil

The types.Mutation.Key field is used when it's necessary to identify if two mutations should be considered to affect the same record, be it a tabular row, document id, etc.
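To make the "same record" idea concrete, here is a rough sketch of one illustrative use of such a key: keeping only the latest mutation per record. The mutation struct and lastByKey are hypothetical stand-ins, not the actual replicator types, and the sketch assumes that keys are compared byte-for-byte, so both sides must encode the key identically.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mutation is a simplified, hypothetical stand-in for types.Mutation.
type mutation struct {
	Key  json.RawMessage // replication identity, e.g. [3,302]
	Data json.RawMessage // row contents
}

// lastByKey keeps only the most recent mutation for each key, preserving the
// order in which keys were first seen. Keys are compared as raw bytes, which
// assumes a consistent encoding ("[3,302]" and "[3, 302]" would differ).
func lastByKey(muts []mutation) []mutation {
	index := make(map[string]int)
	var out []mutation
	for _, m := range muts {
		k := string(m.Key)
		if i, ok := index[k]; ok {
			out[i] = m // same record: the later mutation replaces the earlier one
			continue
		}
		index[k] = len(out)
		out = append(out, m)
	}
	return out
}

func main() {
	muts := []mutation{
		{Key: json.RawMessage(`[3,302]`), Data: json.RawMessage(`{"t":"first"}`)},
		{Key: json.RawMessage(`[4,100]`), Data: json.RawMessage(`{"t":"other"}`)},
		{Key: json.RawMessage(`[3,302]`), Data: json.RawMessage(`{"t":"second"}`)},
	}
	for _, m := range lastByKey(muts) {
		fmt.Printf("%s => %s\n", m.Key, m.Data)
	}
}
```

Nothing in this sketch looks at column names. Only the key bytes matter, which is why the key does not have to be derived from the target table's PK columns.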


OK yeah, I get what you're saying now. The PK columns are a means to the end of getting the unique ID for a mutation. But we can skip all that when the source changefeed gives us that data directly.

So we just need to use the provided key values for the ID when the changefeed supplies them (that is, when key_in_value is specified). Otherwise, if the payload has no key array, we fall back to the existing back-processing as the default case.
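A minimal sketch of that decision, assuming a payload line shaped like the ones dumped earlier in this thread. payloadLine, replicationKey, and the pkCols parameter are hypothetical names for illustration. The real change would live in the query payload decoding and may look quite different.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// payloadLine models one element of the webhook "payload" array.
type payloadLine struct {
	After  map[string]json.RawMessage `json:"after"`
	Before map[string]json.RawMessage `json:"before"`
	Key    json.RawMessage            `json:"key"`
}

// replicationKey is a hypothetical helper. It prefers the "key" array sent by
// the changefeed and only falls back to extracting PK column values, in the
// order given by pkCols, when no key is present.
func replicationKey(line []byte, pkCols []string) (json.RawMessage, error) {
	var p payloadLine
	if err := json.Unmarshal(line, &p); err != nil {
		return nil, err
	}
	// Preferred path: the source already told us the replica identity.
	if len(p.Key) > 0 && string(p.Key) != "null" {
		return p.Key, nil
	}
	// Fallback: back-parse the key from the row data.
	row := p.After
	if row == nil {
		row = p.Before
	}
	if row == nil {
		return nil, errors.New("payload has no key, after, or before data")
	}
	vals := make([]json.RawMessage, len(pkCols))
	for i, col := range pkCols {
		v, ok := row[col]
		if !ok {
			return nil, fmt.Errorf("missing primary key: %s", col)
		}
		vals[i] = v
	}
	return json.Marshal(vals)
}

func main() {
	withKey := []byte(`{"after":{"id":3,"id2":302,"t":null},"before":null,"key":[3,302]}`)
	withoutKey := []byte(`{"after":{"id":3,"id2":302,"t":null},"before":null}`)

	k1, err := replicationKey(withKey, nil)
	fmt.Println(string(k1), err)

	k2, err := replicationKey(withoutKey, []string{"id", "id2"})
	fmt.Println(string(k2), err)
}
```

In this sketch the fallback only succeeds when the row data contains every PK column, while the preferred path needs nothing from the target table at all.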