Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Jan 4, 2024
1 parent 37317dc commit e5c5b34
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 7 deletions.
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ csv = "0\n1"
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream
#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -145,10 +145,26 @@ DBConnection.run(pid, fn conn ->
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary"))
|> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
end)
```

#### Select rows as stream

```elixir
{:ok, pid} = Ch.start_link()

DBConnection.run(pid, fn conn ->
Ch.stream(conn, "SELECT * FROM system.numbers LIMIT {limit:UInt64}", %{"limit" => 1_000_000})
|> Stream.each(&IO.inspect/1)
|> Stream.run()
end)

# %Ch.Result{rows: [[0], [1], [2], ...]}
# %Ch.Result{rows: [[112123], [112124], [112125], ...]}
# etc.
```

#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function

```elixir
Expand Down
8 changes: 5 additions & 3 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ defmodule Ch do
DBConnection.execute!(conn, query, params, opts)
end

@doc """
Returns a stream for a query on a connection.

This function only builds the query from `statement` and `opts` and wraps it,
together with `conn` and `params`, in a `Ch.Stream` struct. The struct is then
consumed through its `Enumerable` implementation or filled through its
`Collectable` implementation.
"""
@spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
  query = Query.build(statement, opts)
  %Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
Expand Down
3 changes: 2 additions & 1 deletion lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ defmodule Ch.Connection do

@impl true
def handle_declare(query, params, opts, conn) do
{query_params, extra_headers, body} = params
{query_params, extra_headers} = params
body = query.statement

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)
Expand Down
12 changes: 11 additions & 1 deletion lib/ch/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defimpl DBConnection.Query, for: Ch.Query do

@spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t()
when response: Mint.Types.status() | Mint.Types.headers() | binary
def decode(%Query{command: command}, responses, opts) do
def decode(%Query{command: command}, responses, opts) when is_list(responses) do
[_status, headers | data] = responses
format = get_header(headers, "x-clickhouse-format")
decode = Keyword.get(opts, :decode, true)
Expand All @@ -110,6 +110,16 @@ defimpl DBConnection.Query, for: Ch.Query do
end
end

# Streamed responses arrive tagged as `{:stream, types, responses}`; only the raw
# data chunks are kept, and they are returned undecoded in the result's `data` field.
def decode(%Query{} = query, {:stream, _types, responses}, _opts) do
  chunks = stream_responses(responses)
  %Result{command: query.command, data: chunks}
end

# Collects the payloads of `{:data, ref, data}` tuples, in order, from a list of
# response tuples. `:status` and `:headers` tuples are dropped, and a `{:done, ref}`
# tuple is only accepted as the very last element. Any other shape raises
# `FunctionClauseError`.
defp stream_responses([]), do: []
defp stream_responses([{:done, _ref}]), do: []

defp stream_responses([{tag, _ref, _value} | tail]) when tag in [:status, :headers] do
  stream_responses(tail)
end

defp stream_responses([{:data, _ref, chunk} | tail]) do
  [chunk | stream_responses(tail)]
end

defp get_header(headers, key) do
case List.keyfind(headers, key, 0) do
{_, value} -> value
Expand Down
44 changes: 44 additions & 0 deletions lib/ch/stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Ch.Stream do
  @moduledoc """
  Stream struct returned from stream commands.

  All of its fields are private.
  """

  @derive {Inspect, only: []}
  defstruct [:conn, :query, :params, :opts]

  @type t :: %__MODULE__{
          conn: DBConnection.conn(),
          query: Ch.Query.t(),
          params: Ch.params(),
          opts: [Ch.query_option()]
        }

  defimpl Enumerable do
    # Enumeration is delegated to `DBConnection.reduce/3` by re-wrapping the
    # fields in a `DBConnection.Stream` struct.
    def reduce(stream, acc, fun) do
      %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
      stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
      DBConnection.reduce(stream, acc, fun)
    end

    # `{:error, __MODULE__}` tells `Enum` to fall back to its default,
    # reduce-based algorithms for these operations.
    def member?(_, _), do: {:error, __MODULE__}
    def count(_), do: {:error, __MODULE__}
    def slice(_), do: {:error, __MODULE__}
  end

  defimpl Collectable do
    # Executes the stream's query once up front and then hands the stream struct
    # itself to the collector as the accumulator.
    def into(stream) do
      %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
      DBConnection.execute!(conn, query, params, opts)
      {stream, &collect/2}
    end

    # Each collected element is executed as its own query, with the element used
    # as the statement. The accumulator (the stream struct) is passed through.
    defp collect(%Ch.Stream{conn: conn, query: query} = stream, {:cont, data}) do
      DBConnection.execute!(conn, %{query | statement: data}, [])
      stream
    end

    # The accumulator here is the `Ch.Stream` struct, not a connection. The previous
    # clause called `HTTP.stream_request_body(conn, ref(conn), :eof)`, which did not
    # compile: `ref/1` is undefined and `HTTP` is not aliased in this module.
    # Returning the stream follows the `Collectable` contract for `:done`.
    # TODO(review): if the request body must be terminated with `:eof`, that
    # presumably has to be routed through the connection callbacks -- confirm.
    defp collect(%Ch.Stream{} = stream, :done), do: stream

    # `:halt` means collection was aborted; the return value is ignored by callers.
    defp collect(%Ch.Stream{}, :halt), do: :ok
  end
end
52 changes: 52 additions & 0 deletions test/ch/stream_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Ch.StreamTest do
  use ExUnit.Case

  # Every test gets its own supervised `Ch` pool pointed at the test database.
  setup do
    [pool: start_supervised!({Ch, database: Ch.Test.database()})]
  end

  describe "enumerable" do
    test "works", %{pool: pool} do
      select = "select * from system.numbers limit {limit:UInt32}"

      results =
        DBConnection.run(pool, fn conn ->
          conn
          |> Ch.stream(select, %{"limit" => 100})
          |> Enum.to_list()
        end)

      # The first result carries two raw chunks: a header naming one column
      # ("number" of type "UInt64") and a 6400-bit payload (presumably
      # 100 rows x 8 bytes). The second result carries no data.
      assert [
               %Ch.Result{
                 command: :select,
                 data: [
                   <<1, 6, "number", 6, "UInt64">>,
                   <<_::6400>>
                 ]
               },
               %Ch.Result{command: :select, data: []}
             ] = results
    end
  end

  describe "collectable" do
    test "works", %{pool: pool} do
      Ch.query!(pool, "create table collectable_test(a UInt64, b String) engine Null")

      types = ["UInt64", "String"]
      row = fn -> [0, "0"] end

      # Three RowBinary-encoded chunks of 10_000 identical rows each.
      rows =
        row
        |> Stream.repeatedly()
        |> Stream.chunk_every(10_000)
        |> Stream.map(&Ch.RowBinary.encode_rows(&1, types))
        |> Stream.take(3)

      insert = Ch.stream(conn_placeholder = nil || nil, "") && nil

      result =
        DBConnection.run(pool, fn conn ->
          sink = Ch.stream(conn, "insert into collectable_test(a, b) format RowBinary\n")
          Enum.into(rows, sink)
        end)

      # NOTE(review): `:asdlkfhajsdf` looks like a placeholder expectation --
      # replace it once the value returned by collecting into a stream is settled.
      assert result == :asdlkfhajsdf
    end
  end
end

0 comments on commit e5c5b34

Please sign in to comment.