plausible/ecto_ch

Mix task to generate ClickHouse schema hint


Hey folks,

Thanks for the great library! I put together a small mix task to help me get started (this is my first time interacting with ClickHouse).

If you think it might be useful, I'm open to suggestions on how to make it good enough for ecto_ch:

defmodule Mix.Tasks.Ecto.Ch.SchemaHint do
  @moduledoc """
  Provides ClickHouse schema hints based on PostgreSQL table data types.
  """
  use Mix.Task
  require Ecto.Schema
  require Logger
  alias Ecto.Adapters.SQL, as: SQLAdapter

  @with_migration_opt "--with-migration"

  def run([]) do
    IO.puts("Provide the Ecto schema module name and the application repo module name.")
    IO.puts("#{IO.ANSI.yellow()}For example:#{IO.ANSI.reset()}  mix ecto.ch.schema_hint MyApp.Accounts.User MyApp.Repo")
    IO.puts("Pass #{@with_migration_opt} to print a migration hint as well.")
  end

  def run(args) when length(args) in [2, 3] do
    [schema_arg, repo_arg | rest] = args
    schema_module = to_elixir_module(schema_arg)
    repo = to_elixir_module(repo_arg)
    start_ecto_dependencies!(repo)

    types =
      schema_module
      |> get_column_info(repo)
      |> infer_types()

    IO.puts(build_clickhouse_schema(schema_module, types))

    if @with_migration_opt in rest do
      IO.puts(build_clickhouse_migration(schema_module, types))
    end
  end

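  # Builds a list of field specs from the information_schema rows, skipping the
  # primary key, JSON columns, and any type we can't translate automatically.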
  defp infer_types(column_info) do
    column_info
    |> Enum.reduce([], fn
      # The "id" primary key is skipped; handle it separately if you need it.
      {"id", {_type, _nullable}}, acc ->
        acc

      {field_name, {json_type, _nullable}}, acc when json_type in ["json", "jsonb"] ->
        Logger.warning(
          "Ignoring #{json_type} column '#{field_name}'. Consult https://clickhouse.com/docs/en/integrations/data-formats/json"
        )

        acc

      {field_name, {data_type, nullable}}, acc ->
        case map_postgres_type_to_clickhouse(data_type) do
          # Unknown types are logged by the mapper and skipped here.
          nil ->
            acc

          mapped_type ->
            clickhouse_type = if nullable, do: "Nullable(#{mapped_type})", else: mapped_type

            [
              %{field_name: field_name, data_type: mapped_type, clickhouse_type: clickhouse_type, nullable: nullable}
              | acc
            ]
        end
    end)
    |> Enum.sort_by(& &1.field_name)
  end

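  # Reads column name, data type, and nullability for the schema's source table
  # from Postgres' information_schema, returning %{column_name => {data_type, nullable?}}.
  # Note: this matches the table name across all schemas; add a table_schema
  # filter if you need to disambiguate.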
  defp get_column_info(schema_module, repo) do
    table_name = schema_module.__schema__(:source)
    query = "SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = $1"

    case SQLAdapter.query(repo, query, [table_name]) do
      {:ok, result} ->
        Enum.reduce(result.rows, %{}, fn [column_name, data_type, is_nullable], acc ->
          Map.put(acc, column_name, {data_type, is_nullable == "YES"})
        end)

      {:error, reason} ->
        Logger.warning("Failed to read information_schema for '#{table_name}': #{inspect(reason)}")
        %{}
    end
  end

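  # Maps a Postgres data_type (as reported by information_schema) to the
  # closest ClickHouse type. Returns nil for types that can't be translated
  # automatically.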
  defp map_postgres_type_to_clickhouse(data_type) do
    case String.downcase(data_type) do
      "date" ->
        "Date"

      timestamp when timestamp in ["timestamp", "timestamp without time zone"] ->
        "DateTime"

      "real" ->
        "Float32"

      "double precision" ->
        "Float64"

      # Note: ClickHouse Decimal needs an explicit precision and scale,
      # e.g. Decimal(18, 4); fill these in after generation.
      decimal when decimal in ["decimal", "numeric"] ->
        "Decimal"

      "smallint" ->
        "Int16"

      "integer" ->
        "Int32"

      "bigint" ->
        "Int64"

      "serial" ->
        "UInt32"

      "bigserial" ->
        "UInt64"

      string when string in ["text", "char", "character varying"] ->
        "String"

      "boolean" ->
        "Bool"

      "uuid" ->
        "UUID"

      "array" ->
        Logger.warning(
          "Array columns need an explicit element type (Array(T)), which information_schema's data_type doesn't carry. Add it manually."
        )

        nil

      unknown ->
        Logger.warning(
          "Ignoring unknown type '#{unknown}'. Consult https://clickhouse.com/docs/en/sql-reference/data-types"
        )

        nil
    end
  end

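  # Renders an ecto_ch schema block, one line per field,
  # e.g. field :name, Ch, type: "String".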
  defp build_clickhouse_schema(schema_module, types) do
    schema_name = schema_module.__schema__(:source)

    fields_definitions =
      Enum.map(types, fn field_definition ->
        "field :#{field_definition.field_name}, Ch, type: \"#{field_definition.clickhouse_type}\""
      end)

    """
    schema "#{schema_name}" do
      #{Enum.join(fields_definitions, "\n  ")}
    end
    """
  end

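  # Renders a migration hint for the same columns. The engine and options below
  # are a starting point, not a recommendation; pick the engine and sorting key
  # that fit your workload.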
  defp build_clickhouse_migration(schema_module, fields_definitions) do
    schema_name = schema_module.__schema__(:source)

    add_column_entries =
      fields_definitions
      |> Enum.map(fn field_definition ->
        "add :#{field_definition.field_name}, :#{field_definition.data_type}#{if field_definition.nullable, do: ", null: true"}"
      end)
      |> Enum.sort()

    """
    table_options = []
    engine_options = [order_by: "tuple()"] # skip sorting key. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#order_by

    options = table_options ++ engine_options

    create table(:#{schema_name}, primary_key: false, engine: "ReplicatedMergeTree", options: options) do
      #{Enum.join(add_column_entries, "\n  ")}
    end
    """
  end

  defp to_elixir_module(module_name) do
    String.to_existing_atom("Elixir.#{module_name}")
  end

  # https://github.com/elixir-ecto/ecto_sql/blob/b4329a1fe6f2888b5b99b3b5b3316f246a838c3a/lib/ecto/migrator.ex#L149
  defp start_ecto_dependencies!(repo) do
    config = repo.config()
    mode = :permanent

    {:ok, _started} = Application.ensure_all_started(:ecto_sql, mode)
    {:ok, _adapter_started} = repo.__adapter__().ensure_all_started(config, mode)

    case repo.start_link(pool_size: 2) do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
      {:error, _} = error -> raise "error starting repo: #{inspect(error)}"
    end
  end
end
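
For reference, here's roughly what a run prints for a hypothetical users table with a name column (text, not null) and a verified_at column (timestamp, nullable):

mix ecto.ch.schema_hint MyApp.Accounts.User MyApp.Repo --with-migration

schema "users" do
  field :name, Ch, type: "String"
  field :verified_at, Ch, type: "Nullable(DateTime)"
end

table_options = []
engine_options = [order_by: "tuple()"] # skip sorting key. See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#order_by

options = table_options ++ engine_options

create table(:users, primary_key: false, engine: "ReplicatedMergeTree", options: options) do
  add :name, :String
  add :verified_at, :DateTime, null: true
end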