plausible/ch

mix ecto.ch.schema fails for columns with type `AggregateFunction`

Zarathustra2 opened this issue · 30 comments

Getting this error:

** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
    lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(argMin, Decimal(18, 4), DateTime)", [])
    lib/ch/types.ex:328: Ch.Types.decode/1
    lib/mix/tasks/schema.ex:77: Mix.Tasks.Ecto.Ch.Schema.build_field/2
    lib/mix/tasks/schema.ex:46: anonymous fn/1 in Mix.Tasks.Ecto.Ch.Schema.run/1
    (elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
    (elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
    lib/mix/tasks/schema.ex:46: Mix.Tasks.Ecto.Ch.Schema.run/1
    (mix 1.14.4) lib/mix/task.ex:421: anonymous fn/3 in Mix.Task.run_task/4

when running `mix ecto.ch.schema default.<TABLE>`

The table has the columns:

`foo` AggregateFunction(argMin, Decimal(18, 4), DateTime),
`bar` AggregateFunction(argMax, Decimal(18, 4), DateTime),

How would I define a schema for those fields as well?

** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
    (ch 0.1.11) lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(avgWeighted, Decimal(18, 4), Int64)", [])
    (ch 0.1.11) lib/ch/types.ex:328: Ch.Types.decode/1
    (ecto 3.10.1) lib/ecto/parameterized_type.ex:190: Ecto.ParameterizedType.init/2
    (ecto 3.10.1) lib/ecto/schema.ex:1916: Ecto.Schema.__field__/4
    lib/core/clickhouse/schemas/chain_aggregates.ex:34: (module)

and just using `Decimal(18, 4)` won't work, because then on insert we get:

 Type of 'foo' must be AggregateFunction(argMin, Decimal(18, 4), DateTime), not Decimal(18, 4)

I haven't used AggregateFunction types so I'm not sure how to properly decode them (for some reason many adapters don't support them). However, we could add something like an `:as` option:

# uses `AggregateFunction(argMin, Decimal(18, 4), DateTime)` in the header
# uses `Decimal(18, 4)` for encoding and decoding
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"

@Zarathustra2 what do you think?

It would also make it possible to replace a custom type in Plausible:

field :is_bounce, Ch, type: "UInt8", as: :boolean

PR: #91

or maybe it could be a `:name` option, with the logic reversed:

field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
field :is_bounce, Ch, type: :boolean, name: "UInt8"

It's funny, for me the intuitive naming convention would be the other way around:

field :is_bounce, Ch, type: :boolean, as: "UInt8"

I suppose it depends on which side you approach it from. Looking at it from the ClickHouse side, it's a UInt8 which is treated as a boolean in the app. But looking at it from the app side, it's a boolean which is serialized as a UInt8 to ClickHouse.

Since Ecto lives in the context of our application, it's more intuitive for me that the type would represent how we treat it in-app.

@ruslandoga can you also add a test for inserting the data? I am wondering if we will run into the same issue as ClickHouse/clickhouse-java#1232

I personally prefer `type` & `name`; maybe even go with `ch_name` instead of `name`.

@Zarathustra2 what kind of test do you have in mind?

Something like this?

  test "insert AggregateFunction", %{conn: conn} do
    Ch.query!(conn, """
    CREATE TABLE test_insert_aggregate_function (
      uid Int16,
      updated SimpleAggregateFunction(max, DateTime),
      name AggregateFunction(argMax, String, DateTime)
    ) ENGINE AggregatingMergeTree ORDER BY uid
    """)

    rows = [
      [1, ~N[2020-01-02 00:00:00], "b"],
      [1, ~N[2020-01-01 00:00:00], "a"]
    ]

    assert %{num_rows: 2} =
             Ch.query!(
               conn,
               """
               INSERT INTO test_insert_aggregate_function
                 SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
                 FROM input('uid Int16, updated DateTime, name String')
                 FORMAT RowBinary\
               """,
               rows,
               types: ["Int16", "DateTime", "String"]
             )

    assert Ch.query!(conn, """
           SELECT uid, max(updated) AS updated, argMaxMerge(name)
           FROM test_insert_aggregate_function
           GROUP BY uid
           """).rows == [[1, ~N[2020-01-02 00:00:00], "b"]]
  end

But note that `INSERT INTO ... SELECT FROM input(...) FORMAT ...` won't be supported automatically in an Ecto adapter, and this test doesn't exercise inserting AggregateFunction values directly.

Sorry, bad phrasing. What I mean is: how would I even insert into this column with Ecto?

defmodule SomeSchema do
  use Ecto.Schema

  @primary_key false
  schema "table" do
    field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
  end
end

How would I insert into this column, given that I need to insert not a `Decimal(18, 4)` but an `argMinState(Decimal(18, 4), DateTime)`?

How would you insert into that column with clickhouse-client?

for my example above?

Yes. Since I haven't used AggregateFunction myself, I don't know how inserting would work, but I can help you translate a clickhouse-client query into ecto_ch :)

create table table_test_agg (foo AggregateFunction(argMin, UInt8, DateTime)) Engine = Memory;
insert into table_test_agg (foo) values (arrayReduce('argMinState', [1], [now()]));

on a different note: AggregateFunction & SimpleAggregateFunction are super cool, I use them a lot! :)

This query won't work in "streaming" formats like Native or RowBinary, since they don't do any evaluation of the rows.

You can use the same query with `Repo.query` or `Ch.query`, though.
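
For example, a minimal sketch, assuming `conn` is an open Ch connection and the table_test_agg table from above exists:

# Sent as plain SQL with no FORMAT clause, so the server itself evaluates
# arrayReduce('argMinState', ...) and now()
Ch.query!(conn, """
INSERT INTO table_test_agg (foo) VALUES (arrayReduce('argMinState', [1], [now()]))
""")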

I am wondering whether the data can be encoded in a different way, because the Java issue states:

On the other hand, in order to support more AggregateFunction types, since there's no document about the data structures for read and write, we have to dig into ClickHouse code to figure it out one by one, which is going to take a while. Would be great if someone from the server team can document all the details, so that not only Java but all other clients will benefit from that.

So it seems serializing the state of an aggregate function is possible, but it's not documented? Maybe we should ping someone on the ClickHouse Slack?

We probably won't be supporting undocumented APIs in this driver. However, it would be possible to support them in a separate library since Ch has a very liberal API:

encoded = YourRowBinaryEncoder.encode_rows(...)
Ch.query(conn, "insert into ... format RowBinary", encoded, encode: false)
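
To make that concrete, a hypothetical sketch: `YourRowBinaryEncoder` is a stand-in for such a separate library (it doesn't exist), and `encode: false` tells Ch to pass the iodata through untouched:

# Hypothetical: a separate library encodes the aggregate-function states
# into RowBinary iodata; Ch only transports the result.
encoded =
  YourRowBinaryEncoder.encode_rows(rows, ["AggregateFunction(argMin, UInt8, DateTime)"])

# encode: false skips Ch's own RowBinary encoding and sends `encoded` verbatim
Ch.query(conn, "INSERT INTO table_test_agg FORMAT RowBinary", encoded, encode: false)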

It seems to me right now that we won't be needing the `:as` option in Ch since it doesn't accomplish much.

field :is_bounce, Ch, type: :boolean, as: "UInt8"

can already be covered with a custom Ecto type, which is more typing but more conventional and possibly easier to understand.
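
For illustration, a minimal sketch of such a custom type. The module name is mine, and I'm assuming the underlying type can delegate to the `Ch` parameterized type (via `Ecto.ParameterizedType.init/2`, the same call visible in the stack trace above) so the column stays UInt8:

# A sketch of the custom-type alternative; MyApp.Ch.Bool is a hypothetical name.
defmodule MyApp.Ch.Bool do
  use Ecto.Type

  # delegate the storage type to the Ch parameterized type
  @impl true
  def type, do: Ecto.ParameterizedType.init(Ch, type: "UInt8")

  @impl true
  def cast(value) when is_boolean(value), do: {:ok, value}
  def cast(_), do: :error

  # booleans in the app, 0/1 towards ClickHouse
  @impl true
  def dump(true), do: {:ok, 1}
  def dump(false), do: {:ok, 0}
  def dump(_), do: :error

  @impl true
  def load(1), do: {:ok, true}
  def load(0), do: {:ok, false}
  def load(_), do: :error
end

# usage: field :is_bounce, MyApp.Ch.Bool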

And

field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"

doesn't seem to be possible, since we can't just take a decimal and insert it into an aggregate function type.

Yeah, I don't think `:as` is needed right now.


Do you think creating insert queries such as

"""
 INSERT INTO test_insert_aggregate_function
                 SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
                 FROM input('uid Int16, updated DateTime, name String')
                 FORMAT RowBinary
"""

would be possible in ecto_ch with a macro or something based on a given schema?

EDIT: Actually this is kinda hard, as we will never catch all edge cases (some may insert states computed from values that are not fields of the table).

I'll add tests that use the approaches shown in https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/. It seems like all of them can already be supported with the current functionality.

yeah, all of them can be used via `arrayReduce` + `input`

would be possible in ecto_ch with a macro or something based on a given schema?

That SQL statement can be constructed without macros. One sec.

Something like this:

defmodule Schema do
  use Ecto.Schema
  
  @primary_key false
  schema "test_insert_aggregate_function" do
    field :uid, Ch, type: "Int16"
    field :updated, :naive_datetime
    field :name, :string
  end
end

import Ecto.Query

table = Schema.__schema__(:source)
fields = Schema.__schema__(:fields)
types = Enum.map(fields, fn field -> Schema.__schema__(:type, field) |> Ecto.Type.type() end)
structure = Enum.zip(fields, types) |> Enum.map(fn {f, t} -> "#{f} #{t}" end) |> Enum.join(", ")

select =
  from i in fragment("input(?)", literal(^structure)),
    select: %{uid: i.uid, updated: i.updated, name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)}

{select, _no_params = []} = Repo.to_sql(:all, select)

Repo.query!(["insert into ", table, ?\s, select, " format RowBinary"], rows, types: types)

Oh that is sick, thanks @ruslandoga <3


Should this maybe be added to the README of ecto_ch? I bet someone else will run into this as well :D

I'll try to find a way to "automate" `fragment("input(?)", ...)` in the tests for https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/.


I'm thinking about doing something like

import Ecto.Adapters.ClickHouse.API, only: [input: 1]

input =
  from i in input(uid: "Int16", updated: "DateTime", name: "String"), # or input(Schema)
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

rows = [
  [uid: 1231, updated: ~N[2020-01-02 00:00:00], name: "Jane"],
  [uid: 1231, updated: ~N[2020-01-01 00:00:00], name: "John"]
]

TestRepo.insert_all("users", rows, input: input)

WIP: plausible/ecto_ch#79

@Zarathustra2 I've merged plausible/ecto_ch#79 and I wonder if it solves your use-case.

def deps do
  [
-   {:ecto_ch, "~> 0.1.0"},
+   {:ecto_ch, github: "plausible/ecto_ch"}
  ]
end

I'm not releasing it yet since it might need some more work depending on your test-drive.

Oh that is sick!!! Let me test that today/tomorrow and I will ping you, but just from reading over the code it looks incredibly easy to work with! <3

likely not getting to it today but should be getting to it tomorrow, sorry about that

Currently getting `** (Ch.Error) Code: 477. DB::Exception: FORMAT must be specified for function input(). (INVALID_USAGE_OF_INPUT) (version 23.2.4.12 (official build))`. Let me see if I can debug this one :D (can't really share the schema since it's confidential :( )

Works now! I had `TestRepo.insert_all("users", input, input: rows)` instead of `TestRepo.insert_all("users", rows, input: input)`, lol

Super awesome! @ruslandoga <3

Ah, yes. I still don't know what the API should be. I considered both of your approaches (and picked the current one since we are still inserting rows, but "via" an input). I also thought about `Repo.insert_input(table, input_query, rows, opts)`.

I mean, I made a silly mistake, so that's that :D

`insert_input` doesn't sound too bad IMO. We could then add dedicated docs for that function, which might make it easier for other users to spot on hexdocs when reading through the different functions.