Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
495 changes: 495 additions & 0 deletions .github/workflows/cloudberry-backup-ci.yml

Large diffs are not rendered by default.

175 changes: 175 additions & 0 deletions .github/workflows/scale-tests-cloudberry-ci.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/bin/bash

# ------------------------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed
# with this work for additional information regarding copyright
# ownership. The ASF licenses this file to You under the Apache
# License, Version 2.0 (the "License"); you may not use this file
# except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ------------------------------------------------------------------------------
# Non-perf scale tests for GitHub Actions Cloudberry demo cluster.
# This focuses on backup/restore correctness under moderate object/data scale.
# ------------------------------------------------------------------------------

# Abort on any command failure, on use of an unset variable, and on a
# failure in any stage of a pipeline (gpbackup/gprestore run through tee).
set -euo pipefail

# Working directories; both can be overridden from the environment.
BACKUP_DIR="${BACKUP_DIR:-/tmp/scale_backup}"   # gpbackup/gprestore artifacts
LOG_DIR="${LOG_DIR:-/tmp/scale-test-logs}"      # per-case teed logs

mkdir -p "${BACKUP_DIR}" "${LOG_DIR}"

# extract_timestamp LOG_FILE
# Parse the 14-digit gpbackup timestamp (YYYYMMDDHHMMSS) out of LOG_FILE.
# If the teed log does not contain it, fall back to the newest gpbackup
# log under ~/gpAdminLogs.
# Outputs: the timestamp on stdout.
# Returns: 0 on success, 1 if no timestamp could be found.
extract_timestamp() {
  local log_file="$1"
  local ts
  ts="$(grep -E "Backup Timestamp[[:space:]]*=" "${log_file}" | grep -Eo "[[:digit:]]{14}" | head -n 1 || true)"
  if [ -z "${ts}" ]; then
    # Fallback: newest gpbackup log. Parsing ls output is normally an
    # anti-pattern, but these filenames are generated without spaces.
    local latest_gpbackup_log
    latest_gpbackup_log="$(ls -1t "${HOME}/gpAdminLogs"/gpbackup_*.log 2>/dev/null | head -n 1 || true)"
    if [ -n "${latest_gpbackup_log}" ]; then
      ts="$(grep -E "Backup Timestamp[[:space:]]*=" "${latest_gpbackup_log}" | grep -Eo "[[:digit:]]{14}" | head -n 1 || true)"
    fi
  fi
  if [ -z "${ts}" ]; then
    # Diagnostics must go to stderr: callers capture stdout via $(...)
    # and would otherwise mistake the error text for a timestamp.
    echo "Could not parse backup timestamp from ${log_file}" >&2
    return 1
  fi
  echo "${ts}"
}

# validate_datascaledb_restore RESTORE_DB
# Compare RESTORE_DB against the datascaledb source: the public-schema
# heap table count and the tbl_big row count must both match.
# Returns: 0 when both match, 1 (with diagnostics on stderr) otherwise.
validate_datascaledb_restore() {
  local restore_db="$1"
  local src_tables dst_tables src_big dst_big

  src_tables="$(psql -X -d datascaledb -Atc "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace WHERE n.nspname='public' AND c.relkind='r'")"
  dst_tables="$(psql -X -d "${restore_db}" -Atc "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace WHERE n.nspname='public' AND c.relkind='r'")"
  src_big="$(psql -X -d datascaledb -Atc "SELECT count(*) FROM tbl_big")"
  dst_big="$(psql -X -d "${restore_db}" -Atc "SELECT count(*) FROM tbl_big")"

  if [ "${src_tables}" != "${dst_tables}" ] || [ "${src_big}" != "${dst_big}" ]; then
    # Diagnostics go to stderr so they survive any stdout redirection.
    {
      echo "Data scale restore validation failed for ${restore_db}"
      echo "source tables=${src_tables}, restored tables=${dst_tables}"
      echo "source tbl_big=${src_big}, restored tbl_big=${dst_big}"
    } >&2
    return 1
  fi
}

echo "## Preparing copy queue scale database ##"
psql -X -d postgres -qc "DROP DATABASE IF EXISTS copyqueuedb"
createdb copyqueuedb
# Build the 300 small tables in a single psql session instead of 600
# separate client connections. ON_ERROR_STOP is required: a piped psql
# otherwise exits 0 even when individual statements fail.
for j in $(seq 1 300); do
  printf 'CREATE TABLE tbl_1k_%d(i int) DISTRIBUTED BY (i);\n' "${j}"
  printf 'INSERT INTO tbl_1k_%d SELECT generate_series(1,1000);\n' "${j}"
done | psql -X -q -v ON_ERROR_STOP=1 -d copyqueuedb

echo "## Copy queue backup/restore matrix ##"
# Exercise gpbackup/gprestore --copy-queue-size at several queue depths;
# each case restores into its own database and checks the table count.
for q in 2 4 8; do
  b_log="${LOG_DIR}/copyqueue_backup_q${q}.log"
  echo "Running gpbackup copy queue size ${q}"
  gpbackup --dbname copyqueuedb --backup-dir "${BACKUP_DIR}" --single-data-file --no-compression --copy-queue-size "${q}" \
    2>&1 | tee "${b_log}"
  timestamp="$(extract_timestamp "${b_log}")"
  restore_db="copyqueue_restore_q${q}"
  psql -X -d postgres -qc "DROP DATABASE IF EXISTS ${restore_db}"
  gprestore --timestamp "${timestamp}" --backup-dir "${BACKUP_DIR}" --create-db --redirect-db "${restore_db}" --copy-queue-size "${q}" \
    2>&1 | tee "${LOG_DIR}/copyqueue_restore_q${q}.log"
  src_tbl_count="$(psql -X -d copyqueuedb -Atc "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace WHERE n.nspname='public' AND c.relkind='r'")"
  dst_tbl_count="$(psql -X -d "${restore_db}" -Atc "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON n.oid=c.relnamespace WHERE n.nspname='public' AND c.relkind='r'")"
  if [ "${src_tbl_count}" != "${dst_tbl_count}" ]; then
    echo "Copy queue restore validation failed for ${restore_db}: source tables=${src_tbl_count}, restored tables=${dst_tbl_count}" >&2
    exit 1
  fi
done

echo "## Preparing data scale database ##"
psql -X -d postgres -qc "DROP DATABASE IF EXISTS datascaledb"
createdb datascaledb
# Populate everything in one psql session (instead of ~430 separate
# connections): 200 small tables, one ~2.5M-row table, and a 100-leaf
# range-partitioned table doubled 8 times (100 * 2^8 = 25600 rows).
# ON_ERROR_STOP makes any SQL failure abort the pipeline; a piped psql
# would otherwise exit 0 and hide a broken setup.
{
  for j in $(seq 1 200); do
    printf 'CREATE TABLE tbl_1k_%d(i int) DISTRIBUTED BY (i);\n' "${j}"
    printf 'INSERT INTO tbl_1k_%d SELECT generate_series(1,1000);\n' "${j}"
  done
  printf 'CREATE TABLE tbl_big(i int) DISTRIBUTED BY (i);\n'
  for j in $(seq 1 25); do
    printf 'INSERT INTO tbl_big SELECT generate_series(1,100000);\n'
  done
  printf 'CREATE TABLE big_partition(a int, b int, c int) DISTRIBUTED BY (a) PARTITION BY RANGE (b) (START (1) END (101) EVERY (1));\n'
  printf 'INSERT INTO big_partition SELECT i, i, i FROM generate_series(1,100) i;\n'
  for j in $(seq 1 8); do
    printf 'INSERT INTO big_partition SELECT * FROM big_partition;\n'
  done
} | psql -X -q -v ON_ERROR_STOP=1 -d datascaledb

echo "## Running data scale backup/restore matrix ##"

# run_data_scale_case CASE_NAME BACKUP_FLAGS RESTORE_DB JOBS
# Back up datascaledb with the extra gpbackup flags in BACKUP_FLAGS,
# restore into RESTORE_DB with gprestore --jobs JOBS, then validate the
# restored copy. BACKUP_FLAGS is a single whitespace-separated string
# (as passed by the call sites); it is split on whitespace only.
run_data_scale_case() {
  local case_name="$1"
  local backup_flags="$2"
  local restore_db="$3"
  local jobs="$4"
  local b_log="${LOG_DIR}/datascale_${case_name}_backup.log"
  local r_log="${LOG_DIR}/datascale_${case_name}_restore.log"
  local -a flag_arr=()

  # read -a splits on $IFS without pathname expansion; a bare unquoted
  # ${backup_flags} would also glob-expand any '*' or '?' in a flag.
  read -r -a flag_arr <<< "${backup_flags}"

  gpbackup --dbname datascaledb --backup-dir "${BACKUP_DIR}" "${flag_arr[@]}" 2>&1 | tee "${b_log}"
  local ts
  ts="$(extract_timestamp "${b_log}")"
  psql -X -d postgres -qc "DROP DATABASE IF EXISTS ${restore_db}"
  gprestore --timestamp "${ts}" --backup-dir "${BACKUP_DIR}" --create-db --redirect-db "${restore_db}" --jobs "${jobs}" \
    2>&1 | tee "${r_log}"
  validate_datascaledb_restore "${restore_db}"
}

# Case matrix, run in order: name | extra gpbackup flags | restore DB | jobs.
# The heredoc is read on fd 3 so commands inside the loop cannot consume it.
while IFS='|' read -r -u 3 scale_case scale_flags scale_db scale_jobs; do
  run_data_scale_case "${scale_case}" "${scale_flags}" "${scale_db}" "${scale_jobs}"
done 3<<'EOF'
multi_data_file|--leaf-partition-data|datascale_restore_multi|4
multi_data_file_zstd|--leaf-partition-data --compression-type zstd|datascale_restore_multi_zstd|4
single_data_file|--leaf-partition-data --single-data-file|datascale_restore_single|1
single_data_file_zstd|--leaf-partition-data --single-data-file --compression-type zstd|datascale_restore_single_zstd|1
EOF

echo "## Preparing metadata scale database ##"
psql -X -d postgres -qc "DROP DATABASE IF EXISTS metadatascaledb"
createdb metadatascaledb

# 80 schemas, each with one table and one dependent view, to exercise
# metadata-only backup at moderate object counts. ON_ERROR_STOP is
# required: without it psql exits 0 even if the DO block fails, and the
# later count checks would compare against a half-built source database.
psql -X -v ON_ERROR_STOP=1 -d metadatascaledb <<'SQL'
DO $$
DECLARE
i int;
BEGIN
FOR i IN 1..80 LOOP
EXECUTE format('CREATE SCHEMA IF NOT EXISTS s_%s', i);
EXECUTE format('CREATE TABLE s_%s.t_%s(id int, val text) DISTRIBUTED BY (id)', i, i);
EXECUTE format('CREATE VIEW s_%s.v_%s AS SELECT * FROM s_%s.t_%s', i, i, i, i);
END LOOP;
END$$;
SQL

echo "## Running metadata-only backup/restore ##"
meta_backup_log="${LOG_DIR}/metadata_backup.log"
meta_restore_log="${LOG_DIR}/metadata_restore.log"
gpbackup --dbname metadatascaledb --backup-dir "${BACKUP_DIR}" --metadata-only --verbose 2>&1 | tee "${meta_backup_log}"
meta_ts="$(extract_timestamp "${meta_backup_log}")"
psql -X -d postgres -qc "DROP DATABASE IF EXISTS metadatascaledb_res"
gprestore --timestamp "${meta_ts}" --backup-dir "${BACKUP_DIR}" --redirect-db metadatascaledb_res --jobs 4 --create-db \
  2>&1 | tee "${meta_restore_log}"

echo "## Minimal correctness checks ##"
# Schema count is a cheap proxy for "all 80 schemas (and their contents)
# survived a metadata-only round trip".
src_schema_count="$(psql -X -d metadatascaledb -Atc "SELECT count(*) FROM pg_namespace WHERE nspname LIKE 's_%'")"
dst_schema_count="$(psql -X -d metadatascaledb_res -Atc "SELECT count(*) FROM pg_namespace WHERE nspname LIKE 's_%'")"
if [ "${src_schema_count}" != "${dst_schema_count}" ]; then
  # Diagnostic goes to stderr so CI log scrapers see it as an error.
  echo "Metadata restore schema count mismatch: src=${src_schema_count} dst=${dst_schema_count}" >&2
  exit 1
fi

echo "Scale tests completed successfully"
2 changes: 1 addition & 1 deletion plugins/generate_minio_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ options:
endpoint: http://localhost:9000/
aws_access_key_id: minioadmin
aws_secret_access_key: minioadmin
bucket: gpbackup-s3-test
bucket: cloudberry-backup-s3-test
folder: test/backup
backup_max_concurrent_requests: 2
backup_multipart_chunksize: 5MB
Expand Down
4 changes: 2 additions & 2 deletions plugins/plugin_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ testdatalarge="$testdir/testdatalarge_$time_second.txt"
logdir="/tmp/test_bench_logs"

text="this is some text"
data=`LC_ALL=C tr -dc 'A-Za-z0-9' </dev/urandom | head -c 1000 ; echo`
data_large=`LC_ALL=C tr -dc 'A-Za-z0-9' </dev/urandom | head -c 1000000 ; echo`
data=`LC_ALL=C tr -dc 'A-Za-z0-9' </dev/urandom 2>/dev/null | head -c 1000 ; echo`
data_large=`LC_ALL=C tr -dc 'A-Za-z0-9' </dev/urandom 2>/dev/null | head -c 1000000 ; echo`
mkdir -p $testdir
mkdir -p $logdir
echo $text > $testfile
Expand Down
9 changes: 9 additions & 0 deletions plugins/s3plugin/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func SetupPluginForBackup(c *cli.Context) error {
return err
}

// VersionCommand prints the plugin's build version and the backup plugin
// API version it implements. A binary built without ldflags has an empty
// Version; it is replaced (globally, matching the original behavior) with
// the sentinel "dev" so the output is never blank.
func VersionCommand(c *cli.Context) error {
	if len(Version) == 0 {
		// fallback for local builds without ldflags
		Version = "dev"
	}
	fmt.Printf("gpbackup_s3_plugin version %s\n", Version)
	fmt.Printf("gpbackup_s3_plugin api version %s\n", apiVersion)
	return nil
}

func BackupFile(c *cli.Context) error {
config, sess, err := readConfigAndStartSession(c)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion plugins/s3plugin/s3plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"gopkg.in/yaml.v2"
)

var Version string
// version receives the build version at link time, e.g.
//   go build -ldflags "-X <module>/s3plugin.version=<ver>"
// Version is the exported copy read by the rest of the plugin.
//
// NOTE(review): Version is initialized by copying `version` at package
// init. If any build script still injects -X into the exported `Version`
// directly (the name this package exposed before this change), that
// initializer will overwrite the injected value with the empty string —
// confirm all ldflags target the lowercase `version`.
var (
	version string // This will be set by ldflags
	Version = version // Public alias
)

const apiVersion = "0.5.0"
const Mebibyte = 1024 * 1024
Expand Down
7 changes: 4 additions & 3 deletions utils/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper
defer helperMutex.Unlock()

gphomePath := operating.System.Getenv("GPHOME")
envSourceCommand := SourceClusterEnvCommand(gphomePath)
pluginStr := ""
if pluginConfigFile != "" {
_, configFilename := path.Split(pluginConfigFile)
Expand Down Expand Up @@ -188,12 +189,12 @@ func StartGpbackupHelpers(c *cluster.Cluster, fpInfo filepath.FilePathInfo, oper
// we run these commands in sequence to ensure that any failure is critical; the last command ensures the agent process was successfully started
return fmt.Sprintf(`cat << HEREDOC > %[1]s && chmod +x %[1]s && ( nohup %[1]s &> /dev/null &)
#!/bin/bash
source %[2]s/greenplum_path.sh
%[2]s/bin/%s
%[3]s
%[2]s/bin/%[4]s

HEREDOC

`, scriptFile, gphomePath, helperCmdStr)
`, scriptFile, gphomePath, envSourceCommand, helperCmdStr)
})
c.CheckClusterError(remoteOutput, "Error starting gpbackup_helper agent", func(contentID int) string {
return "Error starting gpbackup_helper agent"
Expand Down
20 changes: 10 additions & 10 deletions utils/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (plugin *PluginConfig) CheckPluginExistsOnAllHosts(c *cluster.Cluster) stri
}

func (plugin *PluginConfig) checkPluginAPIVersion(c *cluster.Cluster) {
command := fmt.Sprintf("source %s/greenplum_path.sh && %s plugin_api_version",
operating.System.Getenv("GPHOME"), plugin.ExecutablePath)
command := fmt.Sprintf("%s && %s plugin_api_version",
SourceClusterEnvCommand(operating.System.Getenv("GPHOME")), plugin.ExecutablePath)
remoteOutput := c.GenerateAndExecuteCommand(
"Checking plugin api version on all hosts",
cluster.ON_HOSTS&cluster.INCLUDE_COORDINATOR,
Expand Down Expand Up @@ -159,8 +159,8 @@ func (plugin *PluginConfig) checkPluginAPIVersion(c *cluster.Cluster) {
}

func (plugin *PluginConfig) getPluginNativeVersion(c *cluster.Cluster) string {
command := fmt.Sprintf("source %s/greenplum_path.sh && %s --version",
operating.System.Getenv("GPHOME"), plugin.ExecutablePath)
command := fmt.Sprintf("%s && %s --version",
SourceClusterEnvCommand(operating.System.Getenv("GPHOME")), plugin.ExecutablePath)
remoteOutput := c.GenerateAndExecuteCommand(
"Checking plugin version on all hosts",
cluster.ON_HOSTS|cluster.INCLUDE_COORDINATOR,
Expand Down Expand Up @@ -284,8 +284,8 @@ func (plugin *PluginConfig) buildHookString(command string,
}

backupDir := fpInfo.GetDirForContent(contentID)
return fmt.Sprintf("source %s/greenplum_path.sh && %s %s %s %s %s %s",
operating.System.Getenv("GPHOME"), plugin.ExecutablePath, command,
return fmt.Sprintf("%s && %s %s %s %s %s %s",
SourceClusterEnvCommand(operating.System.Getenv("GPHOME")), plugin.ExecutablePath, command,
plugin.ConfigPath, backupDir, scope, contentIDStr)
}

Expand Down Expand Up @@ -426,8 +426,8 @@ func (plugin *PluginConfig) BackupSegmentTOCs(c *cluster.Cluster, fpInfo filepat
remoteOutput = c.GenerateAndExecuteCommand("Processing segment TOC files with plugin", cluster.ON_SEGMENTS,
func(contentID int) string {
tocFile := fpInfo.GetSegmentTOCFilePath(contentID)
return fmt.Sprintf("source %s/greenplum_path.sh && %s backup_file %s %s && "+
"chmod 0755 %s", operating.System.Getenv("GPHOME"), plugin.ExecutablePath, plugin.ConfigPath, tocFile, tocFile)
return fmt.Sprintf("%s && %s backup_file %s %s && "+
"chmod 0755 %s", SourceClusterEnvCommand(operating.System.Getenv("GPHOME")), plugin.ExecutablePath, plugin.ConfigPath, tocFile, tocFile)
})
c.CheckClusterError(remoteOutput, "Unable to process segment TOC files using plugin", func(contentID int) string {
return "See gpAdminLog for gpbackup_helper on segment host for details: Error occurred with plugin"
Expand All @@ -445,8 +445,8 @@ func (plugin *PluginConfig) RestoreSegmentTOCs(c *cluster.Cluster, fpInfo filepa
tocFile := fpInfo.GetSegmentTOCFilePath(contentID)
// Restore the filename with the origin content to the directory with the destination content
tocFile = strings.ReplaceAll(tocFile, fmt.Sprintf("gpbackup_%d", contentID), fmt.Sprintf("gpbackup_%d", origContent))
command = fmt.Sprintf("mkdir -p %s && source %s/greenplum_path.sh && %s restore_file %s %s",
fpInfo.GetDirForContent(contentID), operating.System.Getenv("GPHOME"),
command = fmt.Sprintf("mkdir -p %s && %s && %s restore_file %s %s",
fpInfo.GetDirForContent(contentID), SourceClusterEnvCommand(operating.System.Getenv("GPHOME")),
plugin.ExecutablePath, plugin.ConfigPath, tocFile)
return command
})
Expand Down
19 changes: 19 additions & 0 deletions utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ func CommandExists(cmd string) bool {
return err == nil
}

// ClusterEnvScriptPath returns the path of the cluster environment script
// under gphome. Greenplum's greenplum_path.sh is preferred; Cloudberry's
// cloudberry-env.sh is used when only it exists. When neither file is
// present, the Greenplum path is still returned so that upstream errors
// mention the historically expected filename.
func ClusterEnvScriptPath(gphome string) string {
	for _, script := range []string{"greenplum_path.sh", "cloudberry-env.sh"} {
		candidate := path.Join(gphome, script)
		if FileExists(candidate) {
			return candidate
		}
	}

	// Preserve previous behavior as fallback for clearer error messages upstream.
	return path.Join(gphome, "greenplum_path.sh")
}

func SourceClusterEnvCommand(gphome string) string {
return fmt.Sprintf("source %s", ClusterEnvScriptPath(gphome))
}

func FileExists(filename string) bool {
_, err := os.Stat(filename)
return err == nil
Expand Down
Loading