mix ecto.ch.schema fails for columns with type `AggregateFunction`
Zarathustra2 opened this issue · 30 comments
Getting this error:
** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(argMin, Decimal(18, 4), DateTime)", [])
lib/ch/types.ex:328: Ch.Types.decode/1
lib/mix/tasks/schema.ex:77: Mix.Tasks.Ecto.Ch.Schema.build_field/2
lib/mix/tasks/schema.ex:46: anonymous fn/1 in Mix.Tasks.Ecto.Ch.Schema.run/1
(elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
(elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
lib/mix/tasks/schema.ex:46: Mix.Tasks.Ecto.Ch.Schema.run/1
(mix 1.14.4) lib/mix/task.ex:421: anonymous fn/3 in Mix.Task.run_task/4
when running mix ecto.ch.schema default.<TABLE>
The table has the columns:
`foo` AggregateFunction(argMin, Decimal(18, 4), DateTime),
`bar` AggregateFunction(argMax, Decimal(18, 4), DateTime),
How would I define a schema for those fields as well?
** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
(ch 0.1.11) lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(avgWeighted, Decimal(18, 4), Int64)", [])
(ch 0.1.11) lib/ch/types.ex:328: Ch.Types.decode/1
(ecto 3.10.1) lib/ecto/parameterized_type.ex:190: Ecto.ParameterizedType.init/2
(ecto 3.10.1) lib/ecto/schema.ex:1916: Ecto.Schema.__field__/4
lib/core/clickhouse/schemas/chain_aggregates.ex:34: (module)
and just using `Decimal(18, 4)` won't work because then on insert we will get:
Type of 'foo' must be AggregateFunction(argMin, Decimal(18, 4), DateTime), not Decimal(18, 4)
I haven't used `AggregateFunction` types so I'm not sure how to properly decode them (for some reason many adapters don't support them). However, we can add something like an `:as` option:
# uses `AggregateFunction(argMin, Decimal(18, 4), DateTime)` in the header
# uses `Decimal(18, 4)` for encoding and decoding
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"
@Zarathustra2 what do you think?
It could also replace a custom type in Plausible:
field :is_bounce, Ch, type: "UInt8", as: :boolean
PR: #91
Or maybe it can be a `:name` option, with the logic reversed:
field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
field :is_bounce, Ch, type: :boolean, name: "UInt8"
It's funny, for me the intuitive naming convention would be the other way around:
field :is_bounce, Ch, type: :boolean, as: "UInt8"
I suppose it depends on which side you approach it from. Looking at it from the ClickHouse side, it's a `UInt8` which is treated as a `boolean` in the app. But looking from the app side, it is a `boolean` which is serialized as a `UInt8` to ClickHouse.
Since Ecto lives in the context of our application, it's more intuitive for me that the `type` would represent how we treat it in-app.
@ruslandoga can you also add a test for inserting the data? I am wondering if we will run into the same issue as ClickHouse/clickhouse-java#1232
I personally prefer `type` & `name`, maybe even go with `ch_name` instead of `name`.
@Zarathustra2 what kind of test do you have in mind?
Something like this?
test "insert AggregateFunction", %{conn: conn} do
Ch.query!(conn, """
CREATE TABLE test_insert_aggregate_function (
uid Int16,
updated SimpleAggregateFunction(max, DateTime),
name AggregateFunction(argMax, String, DateTime)
) ENGINE AggregatingMergeTree ORDER BY uid
""")
rows = [
[1, ~N[2020-01-02 00:00:00], "b"],
[1, ~N[2020-01-01 00:00:00], "a"]
]
assert %{num_rows: 2} =
Ch.query!(
conn,
"""
INSERT INTO test_insert_aggregate_function
SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
FROM input('uid Int16, updated DateTime, name String')
FORMAT RowBinary\
""",
rows,
types: ["Int16", "DateTime", "String"]
)
assert Ch.query!(conn, """
SELECT uid, max(updated) AS updated, argMaxMerge(name)
FROM test_insert_aggregate_function
GROUP BY uid
""").rows == [[1, ~N[2020-01-02 00:00:00], "b"]]
end
But note that it doesn't test inserting via Ecto: `insert into ... select from input(...) format ...` and `AggregateFunction` types won't be supported automatically in an Ecto adapter.
Sorry, bad phrasing. More like: how would I even insert into this column with Ecto?
defmodule SomeSchema do
  use Ecto.Schema

  @primary_key false
  schema "table" do
    field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
  end
end
How would I insert into this column, since I need to insert not a `Decimal(18, 4)` but an `argMinState(Decimal(18, 4), DateTime)`?
How would you insert into that column with `clickhouse-client`?
For my example above?
Yes, since I haven't used `AggregateFunction` myself I don't know how inserting would work, but I can help you translate a `clickhouse-client` query into `ecto_ch` :)
create table table_test_agg (foo AggregateFunction(argMin, UInt8, DateTime)) Engine = Memory;
insert into table_test_agg (foo) Values (arrayReduce('argMinState', [1], [now()]));
On a different note: `AggregateFunction` & `SimpleAggregateFunction` are super cool, I use them a lot! :)
This query won't work in "streaming" formats like Native or RowBinary, since they don't do any evaluation of the rows. You can use the same query with `Repo.query` or `Ch.query` though.
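For example, the `VALUES` insert above could run as a plain query (a sketch, assuming an `ecto_ch`-backed `Repo` and the `table_test_agg` table from the earlier snippet):

```elixir
# VALUES is evaluated server-side, so the aggregate state
# can be created with arrayReduce
Repo.query!(
  "INSERT INTO table_test_agg (foo) VALUES (arrayReduce('argMinState', [1], [now()]))"
)
```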
I am wondering whether the data can be encoded in a different way, because the Java issue states:
> On the other hand, in order to support more AggregateFunction types, since there's no document about the data structures for read and write, we have to dig into ClickHouse code to figure it out one by one, which is going to take a while. Would be great if someone from the server team can document all the details, so that not only Java but all other clients will benefit from that.
So it seems serializing a state of an aggregate function is possible, but it is not documented? Maybe we have to ping someone on Slack?
We probably won't be supporting undocumented APIs in this driver. However, it would be possible to support them in a separate library since `Ch` has a very liberal API:
encoded = YourRowBinaryEncoder.encode_rows(...)
Ch.query(conn, "insert into ... format RowBinary", encoded, encode: false)
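To illustrate the `encode: false` escape hatch with something simple (a toy single-`UInt8`-column table; `my_table` and the rows are made up for the example, and this is not the undocumented aggregate-state format): in RowBinary, values are simply concatenated in their binary form, so a hand-rolled encoder can be as small as:

```elixir
rows = [[0], [1], [1]]

# one byte per UInt8 value, rows concatenated back to back
encoded = for [v] <- rows, into: <<>>, do: <<v::8>>

Ch.query(conn, "INSERT INTO my_table FORMAT RowBinary", encoded, encode: false)
```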
It seems to me right now that we won't be needing the `:as` option in `Ch` since it doesn't accomplish much.
field :is_bounce, Ch, type: :boolean, as: "UInt8"
can already be covered with a custom Ecto type, which is more typing but more conventional and possibly easier to understand.
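For reference, a minimal sketch of what such a custom type could look like (the module name is illustrative; `:integer` stands in as the base Ecto type while the actual `UInt8` comes from the column DDL):

```elixir
defmodule MyApp.Types.BoolUInt8 do
  @moduledoc "A boolean in the app, stored as 0/1 in a ClickHouse UInt8 column."
  use Ecto.Type

  @impl true
  def type, do: :integer

  @impl true
  def cast(value) when is_boolean(value), do: {:ok, value}
  def cast(_), do: :error

  @impl true
  def dump(true), do: {:ok, 1}
  def dump(false), do: {:ok, 0}
  def dump(_), do: :error

  @impl true
  def load(1), do: {:ok, true}
  def load(0), do: {:ok, false}
  def load(_), do: :error
end
```

and in the schema: `field :is_bounce, MyApp.Types.BoolUInt8`.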
And
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"
doesn't seem to be possible, since we can't just take a decimal and insert it into an aggregate function type.
Yeah, I don't think `:as` is needed right now.
Do you think creating insert queries such as
"""
INSERT INTO test_insert_aggregate_function
SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
FROM input('uid Int16, updated DateTime, name String')
FORMAT RowBinary
"""
would be possible in `ecto_ch` with a macro or something based on a given schema?
EDIT: Actually this is kinda hard, as we will never catch all edge cases (some may insert states built from values that aren't fields of the table).
I'll add tests that use the approaches shown in https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/. It seems like all of them can already be supported with the current functionality.
Yeah, all of them can be used via `arrayReduce` + `input`.
> would be possible in `ecto_ch` with a macro or something based on a given schema?
That SQL statement can be constructed without macros. One sec.
Something like this:
defmodule Schema do
  use Ecto.Schema

  @primary_key false
  schema "test_insert_aggregate_function" do
    field :uid, Ch, type: "Int16"
    field :updated, :naive_datetime
    field :name, :string
  end
end

import Ecto.Query

# derive the input() structure from the schema
table = Schema.__schema__(:source)
fields = Schema.__schema__(:fields)
types = Enum.map(fields, fn field -> Schema.__schema__(:type, field) |> Ecto.Type.type() end)
structure = Enum.zip(fields, types) |> Enum.map(fn {f, t} -> "#{f} #{t}" end) |> Enum.join(", ")

select =
  from i in fragment("input(?)", literal(^structure)),
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

# `rows` as in the test above
{select, _no_params = []} = Repo.to_sql(:all, select)
Repo.query!(["insert into ", table, ?\s, select, " format RowBinary"], rows, types: types)
Oh that is sick, thanks @ruslandoga <3
Should this maybe be added to the README of `ecto_ch`? I bet someone else will run into this as well :D
I'll try to find a way to "automate" `fragment("input(?)", ...)` in the tests for https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/.
I'm thinking about doing something like
import Ecto.Adapters.ClickHouse.API, only: [input: 1]

input =
  from i in input(uid: "Int16", updated: "DateTime", name: "String"), # or input(Schema)
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

rows = [
  [uid: 1231, updated: ~N[2020-01-02 00:00:00], name: "Jane"],
  [uid: 1231, updated: ~N[2020-01-01 00:00:00], name: "John"]
]

TestRepo.insert_all("users", rows, input: input)
WIP: plausible/ecto_ch#79
@Zarathustra2 I've merged plausible/ecto_ch#79 and I wonder if it solves your use-case.
def deps do
  [
-   {:ecto_ch, "~> 0.1.0"},
+   {:ecto_ch, github: "plausible/ecto_ch"}
  ]
end
I'm not releasing it yet since it might need some more work depending on your test-drive.
Oh that is sick!!! Let me test that today/tomorrow, I will ping you, but just from reading over the code it looks incredibly easy to work with! <3
Likely not getting to it today but should get to it tomorrow, sorry about that.
Currently getting ** (Ch.Error) Code: 477. DB::Exception: FORMAT must be specified for function input(). (INVALID_USAGE_OF_INPUT) (version 23.2.4.12 (official build))
Let me see if I can debug this one :D (can't really share the schema since it's confidential :( )
You might be able to use the tests as a guide: https://github.com/plausible/ecto_ch/blob/fb995abad0207b1751898c237a9d89c93a2154e7/test/ecto/integration/aggregate_function_type_test.exs#L78-L152
Works now, I had `TestRepo.insert_all("users", input, input: rows)` instead of `TestRepo.insert_all("users", rows, input: input)`, lol.
Super awesome! @ruslandoga <3
Ah, yes. I still don't know what the API should be. I considered both argument orders (and picked the current one since we are still inserting rows, just "via" an input). And I also thought about `Repo.insert_input(table, input_query, rows, opts)`.
I mean, I made a silly mistake, so that is that :D
`insert_input` doesn't sound too bad IMO. You could then add dedicated docs for that function, which may make it easier for other users to spot on HexDocs when reading through the different functions.