Skip to content

Add cluster mode for ClickHouse schema migrations #1221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions database/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@
* ClickHouse cluster mode is not officially supported, since it is not currently tested, but you can try enabling `schema_migrations` table replication by specifying an `x-cluster-name`:
* When `x-cluster-name` is specified, `x-migrations-table-engine` should also be specified. See the docs regarding [replicated table engines](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replication/#table_engines-replication).
* When `x-cluster-name` is specified, only the `schema_migrations` table is replicated across the cluster. You still need to write your migrations so that the application tables are replicated within the cluster.
* When `x-cluster-name` is specified and `x-distributed` is set to "true", a distributed table engine is used, allowing you to run commands from any host in the cluster. Note that this only works in cluster mode, so an `x-cluster-name` must be specified.
* If you want to create a database inside a migration, be aware that the table which manages migrations (the `schema-migrations table`) will be in the `default` database, so you can't use `USE <database_name>` inside the migration. In this case you may leave the database out of the connection string (you can find an example [here](examples/migrations/003_create_database.up.sql)).
53 changes: 46 additions & 7 deletions database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
type Config struct {
DatabaseName string
ClusterName string
IsDistributed bool
MigrationsTable string
MigrationsTableEngine string
MultiStatementEnabled bool
Expand Down Expand Up @@ -99,6 +100,7 @@ func (ch *ClickHouse) Open(dsn string) (database.Driver, error) {
MigrationsTableEngine: migrationsTableEngine,
DatabaseName: purl.Query().Get("database"),
ClusterName: purl.Query().Get("x-cluster-name"),
IsDistributed: purl.Query().Get("x-distributed") == "true",
MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true",
MultiStatementMaxSize: multiStatementMaxSize,
},
Expand Down Expand Up @@ -130,7 +132,14 @@ func (ch *ClickHouse) init() error {
ch.config.MigrationsTableEngine = DefaultMigrationsTableEngine
}

return ch.ensureVersionTable()
if err := ch.ensureVersionTable(); err != nil {
return err
}
if ch.config.IsDistributed {
ch.config.MigrationsTableEngine = "ReplicatedMergeTree" // base table must be replicated to supported a distributed table
return ch.ensureDistributedTable()
}
return nil
}

func (ch *ClickHouse) Run(r io.Reader) error {
Expand Down Expand Up @@ -192,7 +201,11 @@ func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
return err
}

query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)"
query := "INSERT INTO " + ch.config.DatabaseName + "." + ch.config.MigrationsTable + " (version, dirty, sequence) "
if ch.config.IsDistributed {
query = query + "SETTINGS distributed_foreground_insert = 1 "
}
query = query + "VALUES (?, ?, ?)"
if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
Expand Down Expand Up @@ -233,19 +246,30 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {

// if not, create the empty migration table
if len(ch.config.ClusterName) > 0 {
query = fmt.Sprintf(`
CREATE TABLE %s ON CLUSTER %s (

if ch.config.IsDistributed { // we will rename the underlying table
baseTableName := ch.config.MigrationsTable + "_local"
query = fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s Primary Key (sequence)`, ch.config.DatabaseName, baseTableName, ch.config.ClusterName, ch.config.MigrationsTableEngine)
} else { // cluster mode without the distributed table
query = fmt.Sprintf(`
CREATE TABLE %s.%s ON CLUSTER %s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine)
) Engine=%s`, ch.config.DatabaseName, ch.config.MigrationsTable, ch.config.ClusterName, ch.config.MigrationsTableEngine)
}
} else {
query = fmt.Sprintf(`
CREATE TABLE %s (
CREATE TABLE %s.%s (
version Int64,
dirty UInt8,
sequence UInt64
) Engine=%s`, ch.config.MigrationsTable, ch.config.MigrationsTableEngine)
) Engine=%s`, ch.config.DatabaseName, ch.config.MigrationsTable, ch.config.MigrationsTableEngine)
}

if strings.HasSuffix(ch.config.MigrationsTableEngine, "Tree") {
Expand All @@ -258,6 +282,17 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
return nil
}

// ensureDistributedTable issues a CREATE TABLE IF NOT EXISTS ... ON CLUSTER
// statement for the migrations table, declared with the Distributed engine.
// The new table copies its structure from the "<MigrationsTable>_local" table
// in the same database and points at that table, using the sequence column as
// the sharding expression and enabling the fsync_after_insert setting.
//
// Any error returned by the connection is wrapped in a *database.Error that
// carries the executed statement.
func (ch *ClickHouse) ensureDistributedTable() error {
	dbName := ch.config.DatabaseName
	cluster := ch.config.ClusterName
	distributedTable := ch.config.MigrationsTable
	// The distributed table sits on top of the per-node "_local" table.
	localTable := distributedTable + "_local"

	stmt := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s ON CLUSTER %s AS %s.%s Engine=Distributed(%s, %s, %s, sequence) SETTINGS fsync_after_insert = 1`,
		dbName, distributedTable, cluster,
		dbName, localTable,
		cluster, dbName, localTable,
	)

	_, err := ch.conn.Exec(stmt)
	if err == nil {
		return nil
	}
	return &database.Error{OrigErr: err, Query: []byte(stmt)}
}

func (ch *ClickHouse) Drop() (err error) {
query := "SHOW TABLES FROM " + quoteIdentifier(ch.config.DatabaseName)
tables, err := ch.conn.Query(query)
Expand All @@ -279,6 +314,10 @@ func (ch *ClickHouse) Drop() (err error) {

query = "DROP TABLE IF EXISTS " + quoteIdentifier(ch.config.DatabaseName) + "." + quoteIdentifier(table)

if len(ch.config.ClusterName) > 0 {
query = query + " ON CLUSTER " + ch.config.ClusterName
}

if _, err := ch.conn.Exec(query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
Expand Down
Loading