NAME

EV::ClickHouse - Async ClickHouse client using EV

SYNOPSIS

use EV;
use EV::ClickHouse;

my $ch = EV::ClickHouse->new(
    host       => '127.0.0.1',
    port       => 8123,
    protocol   => 'http',       # or 'native'
    user       => 'default',
    password   => '',
    database   => 'default',
    settings   => { max_threads => 4 },  # connection-level defaults
    on_connect => sub { print "connected\n" },
    on_error   => sub { warn "error: $_[0]\n" },
);

# simple query
$ch->query("select * from system.one", sub {
    my ($rows, $err) = @_;
    if ($err) { warn $err; return }
    for my $row (@$rows) {
        print join(", ", @$row), "\n";
    }
});

# query with per-query settings
$ch->query("select 1", { max_execution_time => 30 }, sub {
    my ($rows, $err) = @_;
});

# insert data (TSV string)
$ch->insert("my_table", "1\tfoo\n2\tbar\n", sub {
    my (undef, $err) = @_;
    warn "insert error: $err" if $err;
});

# insert data (arrayref — no escaping needed)
$ch->insert("my_table", [
    [1, "foo"],
    [2, "bar"],
], sub {
    my (undef, $err) = @_;
    warn "insert error: $err" if $err;
});

# insert with async_insert
$ch->insert("my_table", [[1, "foo"]], { async_insert => 1 }, sub {
    my (undef, $err) = @_;
});

# raw mode — get response body as-is (HTTP only)
$ch->query("SELECT * FROM my_table FORMAT CSV", { raw => 1 }, sub {
    my ($body, $err) = @_;
    print $body;  # raw CSV text
});

EV::run;

DESCRIPTION

EV::ClickHouse is an asynchronous ClickHouse client that integrates with the EV event loop. It supports both the HTTP (port 8123) and native TCP (port 9000) protocols, implemented directly in XS without external ClickHouse client libraries.

Key features:

  • HTTP protocol with queued request delivery

  • Native TCP protocol with binary column-oriented data

  • Gzip compression (HTTP) and LZ4 compression (native)

  • TLS/SSL support via OpenSSL (with skip-verify option)

  • TabSeparated format parsing

  • INSERT from TabSeparated strings or Perl arrayrefs

  • Session management (HTTP)

  • Query/connect timeouts and auto-reconnect

  • Query cancellation

  • Streaming results via on_data callback

  • Opt-in decode of Date/DateTime, Decimal, Enum columns

  • Named rows (hashref) mode

CONSTRUCTOR

new

my $ch = EV::ClickHouse->new(%args);

Connection parameters:

host => $hostname

Server hostname. Default: 127.0.0.1.

Note: DNS resolution is currently blocking. For fully asynchronous behavior, use an IP address or a local caching resolver.

port => $port

Server port. Default: 8123 for HTTP, 9000 for native.

protocol => 'http' | 'native'

Protocol to use. Default: http.

user => $username

Username. Default: default.

password => $password

Password. Default: empty.

database => $dbname

Default database. Default: default. The key db is accepted as an alias.

tls => 0 | 1

Enable TLS. Default: 0.

tls_ca_file => $path

Path to a CA certificate file for TLS verification. If provided, it will be used in addition to system default CA paths.

tls_skip_verify => 0 | 1

Skip TLS certificate verification. Default: 0. Useful for self-signed certificates in development.

Callbacks:

on_connect => sub { }

Called when the connection is established.

on_error => sub { my ($message) = @_ }

Called on connection-level errors. Default: sub { die @_ }.

on_progress => sub { my ($rows, $bytes, $total_rows, $written_rows, $written_bytes) = @_ }

Called on native protocol progress packets. Not fired for HTTP.

on_disconnect => sub { }

Called when the connection is closed (either by finish(), server disconnect, or error). Useful for reconnect logic or cleanup.

on_trace => sub { my ($message) = @_ }

Debug trace callback. Called with internal state machine messages (e.g. query dispatch). Useful for debugging protocol issues.

Options:

compress => 0 | 1

Enable compression: gzip for HTTP, LZ4 for the native protocol. Default: 0.

session_id => $id

HTTP session ID for stateful operations.

connect_timeout => $seconds

Connection timeout in seconds.

query_timeout => $seconds

Default query timeout applied to all queries. Can be overridden per-query via the query_timeout key in the settings hashref.

auto_reconnect => 0 | 1

Automatically reconnect on connection loss. Default: 0. When enabled, queued (unsent) queries are preserved across reconnects; in-flight queries receive an error callback.

settings => \%hash

Connection-level ClickHouse settings applied to every query and insert. Per-query settings (see "query", "insert") override these defaults.

settings => { async_insert => 1, max_threads => 4 }

keepalive => $seconds

Send periodic native protocol ping packets to keep the connection alive during idle periods. Set to 0 (default) to disable. Only effective with the native protocol.

reconnect_delay => $seconds

Initial delay for reconnect backoff when auto_reconnect is enabled. The delay doubles after each failed attempt, up to reconnect_max_delay. Set to 0 (default) for immediate reconnect (no backoff).

reconnect_max_delay => $seconds

Maximum reconnect delay. Default: 0 (no cap).
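
The backoff rule above can be illustrated with a small pure-Perl function. This is a sketch that assumes the first reconnect attempt waits exactly reconnect_delay; reconnect_delay_for is a hypothetical helper, not part of EV::ClickHouse.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of EV::ClickHouse): delay before reconnect
# attempt number $attempt (1-based). Starts at reconnect_delay, doubles after
# each failed attempt, and is capped at reconnect_max_delay (0 = no cap).
sub reconnect_delay_for {
    my ($attempt, $delay, $max_delay) = @_;
    return 0 if $delay <= 0;                 # 0 = immediate reconnect, no backoff
    my $d = $delay * 2 ** ($attempt - 1);    # doubles after each failure
    $d = $max_delay if $max_delay > 0 && $d > $max_delay;
    return $d;
}

my @schedule = map { reconnect_delay_for($_, 0.5, 5) } 1 .. 6;
print "@schedule\n";   # 0.5 1 2 4 5 5
```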

Decode options (native protocol only):

These options control how column values are formatted when returned from the native protocol. All are opt-in and default to 0 (returning raw numeric values for backward compatibility).

decode_datetime => 0 | 1

Return Date, Date32, DateTime, and DateTime64 columns as formatted strings (e.g. "2024-01-15", "2024-01-15 10:30:00") instead of raw integer values. Uses UTC by default; if the column has an explicit timezone (e.g. DateTime('America/New_York')), values are converted to that timezone.
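
For comparison, the raw values returned when decode_datetime is off can be formatted by hand. A minimal sketch for UTC Date and DateTime columns only, using POSIX::strftime; it ignores column timezones and DateTime64 precision, and the helper names are hypothetical.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helper: Date raw value is days since 1970-01-01 (UTC only).
sub date_to_string {
    my ($days) = @_;
    return strftime('%Y-%m-%d', gmtime($days * 86400));
}

# Hypothetical helper: DateTime raw value is a Unix timestamp in seconds (UTC only).
sub datetime_to_string {
    my ($ts) = @_;
    return strftime('%Y-%m-%d %H:%M:%S', gmtime($ts));
}

print date_to_string(19737), "\n";            # 2024-01-15
print datetime_to_string(1705314600), "\n";   # 2024-01-15 10:30:00
```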

decode_decimal => 0 | 1

Return Decimal32/Decimal64/Decimal128 columns as scaled floating-point numbers instead of unscaled integers.
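
Conceptually, scaling means dividing the unscaled integer by 10 to the power of the column's scale S in Decimal(P, S). The sketch below shows that division, plus a string-based alternative that keeps every digit, which may matter for Decimal64/Decimal128 values that a double cannot represent exactly. Both helpers are hypothetical illustrations, not module functions.

```perl
use strict;
use warnings;

# Hypothetical helper: floating-point scaling (may lose precision for large values).
sub scale_decimal {
    my ($unscaled, $scale) = @_;
    return $unscaled / 10 ** $scale;
}

# Hypothetical helper: exact scaling by inserting the decimal point into the digits.
sub scale_decimal_str {
    my ($unscaled, $scale) = @_;
    my $neg = ($unscaled =~ s/^-//);          # strip and remember the sign
    if (length($unscaled) <= $scale) {
        # left-pad with zeros so at least one digit precedes the point
        $unscaled = ('0' x ($scale + 1 - length($unscaled))) . $unscaled;
    }
    substr($unscaled, -$scale, 0, '.') if $scale > 0;
    return ($neg ? '-' : '') . $unscaled;
}

# Decimal(10, 2): the raw value 12345 represents 123.45
print scale_decimal(12345, 2), "\n";       # 123.45
print scale_decimal_str(-5, 2), "\n";      # -0.05
```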

decode_enum => 0 | 1

Return Enum8/Enum16 columns as string labels instead of numeric codes.
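
If decode_enum is left off, the numeric codes can still be mapped to labels using the type string from column_types (e.g. Enum8('active' = 1, 'deleted' = 2)). A rough sketch, assuming labels contain no escaped quotes; enum_map is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: build a code => label map from an Enum type string.
# Assumes labels contain no escaped single quotes.
sub enum_map {
    my ($type) = @_;
    my %map;
    while ($type =~ /'([^']*)'\s*=\s*(-?\d+)/g) {
        $map{$2} = $1;
    }
    return \%map;
}

my $map = enum_map("Enum8('active' = 1, 'deleted' = 2)");
print $map->{2}, "\n";   # deleted
```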

named_rows => 0 | 1

Return each row as a hashref (keyed by column name) instead of an arrayref.

my $ch = EV::ClickHouse->new(named_rows => 1, ...);
$ch->query("SELECT 1 as n", sub {
    my ($rows, $err) = @_;
    print $rows->[0]{n};  # 1
});
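
The hashref shape can also be produced from arrayref rows and column_names. This sketch uses a hash slice; rows_to_hashes and the sample data are hypothetical, and it assumes column names are unique.

```perl
use strict;
use warnings;

# Hypothetical helper: convert arrayref rows to hashrefs keyed by column name.
# In real use $names would come from $ch->column_names.
sub rows_to_hashes {
    my ($names, $rows) = @_;
    my @out;
    for my $row (@$rows) {
        my %h;
        @h{@$names} = @$row;     # hash slice: column name => value
        push @out, \%h;
    }
    return \@out;
}

my $hashes = rows_to_hashes(['n', 'label'], [[1, 'a'], [2, 'b']]);
print $hashes->[1]{label}, "\n";   # b
```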

METHODS

query

$ch->query($sql, sub { my ($rows, $err) = @_ });
$ch->query($sql, \%settings, sub { my ($rows, $err) = @_ });

Executes a SQL query. For SELECT, the callback receives ($rows), an arrayref of rows (each row an arrayref, or a hashref when named_rows is enabled). For DDL/DML, it receives (undef) on success. On error, it receives (undef, $error_message).

The optional \%settings hashref passes per-query ClickHouse settings (e.g. max_execution_time, max_threads). These override any connection-level defaults. Special keys (not sent to the server):

  • query_id: sets the query identifier (a protocol-level field, not a server setting).

  • raw: HTTP only. When true, the callback receives the raw response body as a scalar string instead of parsed rows. Use this with an explicit FORMAT clause (CSV, JSONEachRow, Parquet, etc.):

$ch->query("SELECT * FROM t FORMAT CSV", { raw => 1 }, sub {
    my ($body, $err) = @_;
    # $body is the raw CSV text
});

Not supported with the native protocol (croaks).

  • query_timeout: per-query timeout in seconds, overriding the connection-level query_timeout.

  • on_data: native protocol only. A code ref called with each data block as it arrives. This enables streaming: rows are delivered incrementally instead of being accumulated, so the final callback receives no rows.
$ch->query("SELECT * FROM big_table",
    { on_data => sub { my ($rows) = @_; process_batch($rows) } },
    sub { my (undef, $err) = @_; ... }  # final callback
);
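
Because on_data delivers rows in batches, any per-row state has to live outside the callback. A minimal sketch that keeps a running count and sum, assuming each batch is an arrayref of arrayref rows with the summed value in column 0; the sample batches stand in for real data blocks.

```perl
use strict;
use warnings;

# Running aggregates kept outside the callback; rows are never stored.
my ($count, $sum) = (0, 0);
my $on_data = sub {
    my ($rows) = @_;             # assumed: arrayref of arrayref rows
    for my $row (@$rows) {
        $count++;
        $sum += $row->[0];       # assumed: value to sum is in column 0
    }
};

# In real use: $ch->query($sql, { on_data => $on_data }, sub { ... });
# Here two synthetic batches are fed in by hand.
$on_data->($_) for [[1], [2]], [[3]];
print "$count rows, sum $sum\n";   # 3 rows, sum 6
```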

Native protocol type notes: With the native protocol, column values are returned as typed Perl scalars by default. Date and DateTime columns return integer values (days since epoch and Unix timestamps); enable decode_datetime for formatted strings. Enum columns return numeric codes; enable decode_enum for string labels. Decimal columns return unscaled integers; enable decode_decimal for scaled floats. SimpleAggregateFunction columns are transparently decoded as their inner type. Nested columns are decoded as arrays of tuples. LowCardinality columns work across multi-block results with shared dictionaries.

insert

$ch->insert($table, $data, sub { my (undef, $err) = @_ });
$ch->insert($table, $data, \%settings, sub { my (undef, $err) = @_ });

$data can be either:

  • A string in TabSeparated format (tab-separated columns, newline-separated rows)

  • An arrayref of arrayrefs: [ [$col1, $col2, ...], ... ]

When using arrayrefs, values are encoded directly without TSV escaping: undef maps to NULL, strings may contain tabs and newlines freely, arrayrefs encode Array/Tuple columns, and hashrefs encode Map columns.

# TSV string
$ch->insert("my_table", "1\thello\n2\tworld\n", sub { ... });

# Arrayref: no escaping needed
$ch->insert("my_table", [
    [1, "hello\tworld"],      # embedded tab
    [2, undef],               # NULL
    [3, [10, 20]],            # Array column
], sub { ... });

The optional \%settings hashref works the same as in "query".
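
When building the TSV string form yourself, field values must follow ClickHouse's TabSeparated escaping: backslash, tab, and newline are written as \\, \t, and \n, and NULL as \N. A sketch covering scalar columns only (the helper names are hypothetical); the arrayref form avoids this escaping altogether.

```perl
use strict;
use warnings;

# Escape sequences for characters that would break TSV framing.
my %ESC = ("\\" => "\\\\", "\t" => "\\t", "\n" => "\\n", "\r" => "\\r");

# Hypothetical helper: escape one scalar field; undef becomes \N (NULL).
sub tsv_field {
    my ($v) = @_;
    return "\\N" unless defined $v;
    $v =~ s/([\\\t\n\r])/$ESC{$1}/g;
    return $v;
}

# Hypothetical helper: rows (arrayref of arrayrefs) to a TabSeparated string.
sub to_tsv {
    my ($rows) = @_;
    return join '', map { join("\t", map { tsv_field($_) } @$_) . "\n" } @$rows;
}

my $tsv = to_tsv([[1, "hello\tworld"], [2, undef]]);
# $tsv is "1\thello\\tworld\n2\t\\N\n"; pass it as $data to $ch->insert
```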

ping

$ch->ping(sub { my ($result, $err) = @_ });

Checks if the connection is alive. On success $result is a true value and $err is undef. On error: (undef, $error_message).

finish

$ch->finish;

Closes the connection and cancels all pending operations.

reset

$ch->reset;

Disconnects and reconnects using original parameters.

drain

$ch->drain(sub { ... });

Registers a callback to be invoked when all pending queries have completed. If no queries are pending, the callback fires immediately (synchronously). Useful for graceful shutdown: queue your final queries, then call drain with a callback that calls finish.

$ch->query("SELECT 1", sub { ... });
$ch->query("SELECT 2", sub { ... });
$ch->drain(sub {
    print "all done\n";
    $ch->finish;
});

cancel

$ch->cancel;

Cancels the currently running query. For the native protocol, sends a CLIENT_CANCEL packet. For HTTP, closes the connection. Pending callbacks receive an error.

skip_pending

$ch->skip_pending;

Cancels all pending operations. Each pending callback is invoked with (undef, $error_message). If a request is currently in flight, the connection is closed (subsequent queries require reconnection via reset).

ACCESSORS

is_connected

Returns true if the connection is established.

pending_count

Number of pending (queued + in-flight) operations.

server_info

Full server identification string (e.g. "ClickHouse 24.1.0 (revision 54429)"). Only available with the native protocol (populated from ServerHello). Returns undef for HTTP connections.

server_version

Server version string (e.g. "24.1.0"). Only available with the native protocol. Returns undef for HTTP connections.
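
A version string like this can gate features that need a newer server. A sketch comparing dot-separated numeric parts; version_at_least is a hypothetical helper, and callers should handle the undef returned for HTTP connections.

```perl
use strict;
use warnings;

# Hypothetical helper: true if version $have >= $want, comparing
# dot-separated numeric parts left to right (missing parts count as 0).
sub version_at_least {
    my ($have, $want) = @_;
    my @h = split /\./, $have;
    my @w = split /\./, $want;
    for my $i (0 .. ($#h > $#w ? $#h : $#w)) {
        my ($h_part, $w_part) = ($h[$i] // 0, $w[$i] // 0);
        return $h_part > $w_part ? 1 : 0 if $h_part != $w_part;
    }
    return 1;   # versions are equal
}

# In real use:
#   my $v = $ch->server_version;
#   if (defined $v && version_at_least($v, '23.8')) { ... }
```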

server_timezone

Server timezone string (e.g. "UTC", "Europe/Moscow"). Only available with the native protocol. Returns undef for HTTP connections.

column_names

Returns an arrayref of column names from the last native protocol query result, or undef if no query has been executed yet.

$ch->query("SELECT 1 as foo, 2 as bar", sub {
    my $names = $ch->column_names;  # ['foo', 'bar']
});

column_types

Returns an arrayref of ClickHouse type strings from the last native protocol query result (e.g. ['UInt32', 'String', 'Nullable(DateTime)']), or undef if no query has been executed yet.

last_query_id

Returns the query_id of the last dispatched query, or undef if none. Set via { query_id => 'my-id' } in the settings hash of query or insert.

last_error_code

Returns the ClickHouse error code (integer) from the last server error, or 0 if no error. Useful for distinguishing retryable errors (e.g. 202 = TOO_MANY_SIMULTANEOUS_QUERIES) from permanent ones (e.g. 60 = UNKNOWN_TABLE).
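
A sketch of how last_error_code might drive a retry decision. Only code 202 comes from the text above; the retryable set, the attempt limit, and the should_retry helper are illustrative assumptions.

```perl
use strict;
use warnings;

# Assumed set of retryable server error codes; extend for your workload.
my %RETRYABLE = (202 => 'TOO_MANY_SIMULTANEOUS_QUERIES');

# Hypothetical helper: retry only known-transient codes, up to a limit.
sub should_retry {
    my ($code, $attempt, $max_attempts) = @_;
    return 0 if !$code;                    # 0 means no server error code
    return 0 if $attempt >= $max_attempts;
    return exists $RETRYABLE{$code} ? 1 : 0;
}

# In a query callback (hypothetical):
#   if ($err && should_retry($ch->last_error_code, $attempt, 3)) { ...requeue... }
```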

last_totals

Returns an arrayref of totals rows from the last native protocol query that used WITH TOTALS, or undef if none.

last_extremes

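Returns an arrayref of extremes rows from the last native protocol query, or undef if none. ClickHouse only sends extremes when the extremes setting is enabled (e.g. { extremes => 1 } in the settings hashref).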

profile_rows_before_limit

Number of rows that would have been returned without LIMIT, from the last query's profile info. Useful for pagination.
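
For a LIMIT/OFFSET pager, the page count follows from dividing by the page size and rounding up. A sketch with a hypothetical page_count helper that treats 0 or undef as no rows.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper: number of pages for a LIMIT/OFFSET pager.
sub page_count {
    my ($rows_before_limit, $page_size) = @_;
    return 0 unless $rows_before_limit;    # 0 or undef: nothing to page
    return ceil($rows_before_limit / $page_size);
}

# e.g. after "SELECT ... LIMIT 20 OFFSET 40":
#   my $pages = page_count($ch->profile_rows_before_limit, 20);
```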

profile_rows

Total rows processed by the last query.

profile_bytes

Total bytes processed by the last query.

ALIASES

q          -> query
reconnect  -> reset
disconnect -> finish

SEE ALSO

EV, EV::Pg, EV::MariaDB

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
