Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td>Boolean</td>
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
</tr>
<tr>
<td><h5>add-column-before-partition</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, when adding a new column without specifying a position, the column will be placed before the first partition column instead of at the end of the schema. This only takes effect for partitioned tables.</td>
</tr>
<tr>
<td><h5>alter-column-null-to-not-null.disabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,16 @@ public InlineElement getDescription() {
"If true, it disables explicit type casting. For ex: it disables converting LONG type to INT type. "
+ "Users can enable this option to disable explicit type casting");

/**
 * Controls where a newly added column lands when the ALTER TABLE statement gives no explicit
 * position: before the first partition column when {@code true}, appended at the end of the
 * schema when {@code false} (the default). Has no effect on unpartitioned tables.
 */
public static final ConfigOption<Boolean> ADD_COLUMN_BEFORE_PARTITION =
        ConfigOptions.key("add-column-before-partition")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If true, when adding a new column without specifying a position, "
                                + "the column will be placed before the first partition column "
                                + "instead of at the end of the schema. "
                                + "This only takes effect for partitioned tables.");

public static final ConfigOption<Long> COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT =
ConfigOptions.key("commit.strict-mode.last-safe-snapshot")
.longType()
Expand Down Expand Up @@ -2955,6 +2965,10 @@ public boolean disableExplicitTypeCasting() {
return options.get(DISABLE_EXPLICIT_TYPE_CASTING);
}

/**
 * Returns whether columns added without an explicit position should be inserted before the
 * first partition column (value of {@code add-column-before-partition}).
 */
public boolean addColumnBeforePartition() {
    return options.get(ADD_COLUMN_BEFORE_PARTITION);
}

/** Returns the value of the "index file in data file dir" option. */
public boolean indexFileInDataFileDir() {
    return options.get(INDEX_FILE_IN_DATA_FILE_DIR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ public static TableSchema generateTableSchema(
CoreOptions.DISABLE_EXPLICIT_TYPE_CASTING
.defaultValue()
.toString()));

boolean addColumnBeforePartition =
Boolean.parseBoolean(
oldOptions.getOrDefault(
CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(),
CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString()));
List<String> partitionKeys = oldTableSchema.partitionKeys();

List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
Expand Down Expand Up @@ -368,6 +376,17 @@ protected void updateLastColumn(
throw new UnsupportedOperationException(
"Unsupported move type: " + move.type());
}
} else if (addColumnBeforePartition
&& !partitionKeys.isEmpty()
&& addColumn.fieldNames().length == 1) {
int insertIndex = newFields.size();
for (int i = 0; i < newFields.size(); i++) {
if (partitionKeys.contains(newFields.get(i).name())) {
insertIndex = i;
break;
}
}
newFields.add(insertIndex, dataField);
} else {
newFields.add(dataField);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1578,4 +1578,202 @@ public void testDisableExplicitTypeCasting(String formatType) {
sql("ALTER TABLE T MODIFY v INT");
assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20));
}

// With 'add-column-before-partition' enabled, a column added without an explicit position
// must be inserted before the first partition column (dt) rather than appended, and rows
// written before the schema change must read the new column as null.
@Test
public void testAddColumnBeforePartitionEnabled() {
    sql(
            "CREATE TABLE T_PART (\n"
                    + " user_id BIGINT,\n"
                    + " item_id BIGINT,\n"
                    + " behavior STRING,\n"
                    + " dt STRING,\n"
                    + " hh STRING\n"
                    + ") PARTITIONED BY (dt, hh) WITH (\n"
                    + " 'add-column-before-partition' = 'true'\n"
                    + ")");

    sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')");

    // Add column without specifying position
    sql("ALTER TABLE T_PART ADD score DOUBLE");

    List<Row> result = sql("SHOW CREATE TABLE T_PART");
    // 'score' should sit between the non-partition columns and the partition columns.
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT,\n"
                            + " `item_id` BIGINT,\n"
                            + " `behavior` VARCHAR(2147483647),\n"
                            + " `score` DOUBLE,\n"
                            + " `dt` VARCHAR(2147483647),\n"
                            + " `hh` VARCHAR(2147483647)");

    // New inserts supply 'score' at its physical position; the pre-existing row reads null.
    sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')");
    result = sql("SELECT * FROM T_PART");
    assertThat(result)
            .containsExactlyInAnyOrder(
                    Row.of(1L, 100L, "buy", null, "2024-01-01", "10"),
                    Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11"));
}

// With the option left at its default (false), a column added without an explicit position
// is appended at the end of the schema, after the partition column.
@Test
public void testAddColumnBeforePartitionDisabledByDefault() {
    sql(
            "CREATE TABLE T_PART_DEFAULT (\n"
                    + " user_id BIGINT,\n"
                    + " item_id BIGINT,\n"
                    + " dt STRING\n"
                    + ") PARTITIONED BY (dt)");

    // Add column without specifying position (default behavior)
    sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE");

    List<Row> result = sql("SHOW CREATE TABLE T_PART_DEFAULT");
    // score should be appended at the end
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT,\n"
                            + " `item_id` BIGINT,\n"
                            + " `dt` VARCHAR(2147483647),\n"
                            + " `score` DOUBLE");
}

// An explicit position clause (FIRST) must take precedence over the
// 'add-column-before-partition' option.
@Test
public void testAddColumnBeforePartitionWithExplicitPosition() {
    sql(
            "CREATE TABLE T_PART_POS (\n"
                    + " user_id BIGINT,\n"
                    + " item_id BIGINT,\n"
                    + " dt STRING\n"
                    + ") PARTITIONED BY (dt) WITH (\n"
                    + " 'add-column-before-partition' = 'true'\n"
                    + ")");

    // Add column with explicit FIRST position, should respect explicit position
    sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST");

    List<Row> result = sql("SHOW CREATE TABLE T_PART_POS");
    assertThat(result.toString())
            .contains(
                    "`score` DOUBLE,\n"
                            + " `user_id` BIGINT,\n"
                            + " `item_id` BIGINT,\n"
                            + " `dt` VARCHAR(2147483647)");
}

// The option can be turned on after table creation via ALTER TABLE SET: columns added
// before the SET are appended, columns added after it go before the partition column.
@Test
public void testAddColumnBeforePartitionViaAlterOption() {
    sql(
            "CREATE TABLE T_PART_ALTER (\n"
                    + " user_id BIGINT,\n"
                    + " item_id BIGINT,\n"
                    + " dt STRING\n"
                    + ") PARTITIONED BY (dt)");

    // First add column without config (default: append at end)
    sql("ALTER TABLE T_PART_ALTER ADD col1 INT");
    List<Row> result = sql("SHOW CREATE TABLE T_PART_ALTER");
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT,\n"
                            + " `item_id` BIGINT,\n"
                            + " `dt` VARCHAR(2147483647),\n"
                            + " `col1` INT");

    // Enable config via ALTER TABLE SET
    sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')");

    // Now add another column, should go before partition column dt
    sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE");
    result = sql("SHOW CREATE TABLE T_PART_ALTER");
    // col1 stays where the earlier (appended) add put it, i.e. after dt.
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT,\n"
                            + " `col2` DOUBLE,\n"
                            + " `dt` VARCHAR(2147483647),\n"
                            + " `col1` INT");
}

// Columns added across separate ALTER statements — including a multi-column ADD — must all
// land before the first partition column, preserving their relative addition order.
@Test
public void testAddMultipleColumnsBeforePartition() {
    sql(
            "CREATE TABLE T_PART_MULTI (\n"
                    + " user_id BIGINT,\n"
                    + " item_id BIGINT,\n"
                    + " dt STRING,\n"
                    + " hh STRING\n"
                    + ") PARTITIONED BY (dt, hh) WITH (\n"
                    + " 'add-column-before-partition' = 'true'\n"
                    + ")");

    // Add first column
    sql("ALTER TABLE T_PART_MULTI ADD col1 INT");
    // Add two more columns in a single statement
    sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )");

    List<Row> result = sql("SHOW CREATE TABLE T_PART_MULTI");
    // All three new columns should be before partition columns dt and hh
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT,\n"
                            + " `item_id` BIGINT,\n"
                            + " `col1` INT,\n"
                            + " `col2` INT,\n"
                            + " `col3` DOUBLE,\n"
                            + " `dt` VARCHAR(2147483647),\n"
                            + " `hh` VARCHAR(2147483647)");
}

// Same placement behavior on a primary-key table whose PK includes the partition columns:
// added columns are inserted before the partition columns, old rows read them as null,
// and reads/writes still work after two schema changes.
@Test
public void testAddColumnBeforePartitionOnPrimaryKeyTable() {
    sql(
            "CREATE TABLE T_PK_PART (\n"
                    + " user_id BIGINT,\n"
                    + " item_id BIGINT,\n"
                    + " behavior STRING,\n"
                    + " dt STRING,\n"
                    + " hh STRING,\n"
                    + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
                    + ") PARTITIONED BY (dt, hh) WITH (\n"
                    + " 'add-column-before-partition' = 'true'\n"
                    + ")");

    sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')");

    // Add single column
    sql("ALTER TABLE T_PK_PART ADD score DOUBLE");

    List<Row> result = sql("SHOW CREATE TABLE T_PK_PART");
    // PK columns (user_id, dt, hh) keep their NOT NULL constraint; 'score' precedes dt.
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT NOT NULL,\n"
                            + " `item_id` BIGINT,\n"
                            + " `behavior` VARCHAR(2147483647),\n"
                            + " `score` DOUBLE,\n"
                            + " `dt` VARCHAR(2147483647) NOT NULL,\n"
                            + " `hh` VARCHAR(2147483647) NOT NULL");

    // Add multiple columns
    sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )");

    result = sql("SHOW CREATE TABLE T_PK_PART");
    assertThat(result.toString())
            .contains(
                    "`user_id` BIGINT NOT NULL,\n"
                            + " `item_id` BIGINT,\n"
                            + " `behavior` VARCHAR(2147483647),\n"
                            + " `score` DOUBLE,\n"
                            + " `col1` INT,\n"
                            + " `col2` DOUBLE,\n"
                            + " `dt` VARCHAR(2147483647) NOT NULL,\n"
                            + " `hh` VARCHAR(2147483647) NOT NULL");

    // Verify data read/write still works
    sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')");
    result = sql("SELECT * FROM T_PK_PART");
    assertThat(result)
            .containsExactlyInAnyOrder(
                    Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"),
                    Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11"));
}
}
2 changes: 2 additions & 0 deletions paimon-python/pypaimon/common/options/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
from .config_option import ConfigOption, Description
from .config_options import ConfigOptions
from .options import Options
from .core_options import CoreOptions

__all__ = [
'ConfigOption',
'Description',
'ConfigOptions',
'Options',
'CoreOptions'
]
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,16 @@ class CoreOptions:
.with_description("Read batch size for any file format if it supports.")
)

# Python mirror of Java CoreOptions.ADD_COLUMN_BEFORE_PARTITION — key, default and
# description are kept in sync with the Java definition and the generated HTML docs.
# The original Python description misstated the behavior (it omitted the "without
# specifying a position" condition and the partitioned-table-only caveat).
ADD_COLUMN_BEFORE_PARTITION: ConfigOption[bool] = (
    ConfigOptions.key("add-column-before-partition")
    .boolean_type()
    .default_value(False)
    .with_description(
        "If true, when adding a new column without specifying a position, "
        "the column will be placed before the first partition column "
        "instead of at the end of the schema. "
        "This only takes effect for partitioned tables."
    )
)

def __init__(self, options: Options):
self.options = options

Expand Down Expand Up @@ -540,3 +550,6 @@ def global_index_thread_num(self) -> Optional[int]:

def read_batch_size(self, default=None) -> int:
return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)

def add_column_before_partition(self) -> bool:
    """Whether columns added without an explicit position are inserted before the
    first partition column (value of ``add-column-before-partition``, default False)."""
    return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)
29 changes: 26 additions & 3 deletions paimon-python/pypaimon/schema/schema_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pypaimon.catalog.catalog_exception import ColumnAlreadyExistException, ColumnNotExistException
from pypaimon.common.file_io import FileIO
from pypaimon.common.json_util import JSON
from pypaimon.common.options import Options, CoreOptions
from pypaimon.schema.data_types import AtomicInteger, DataField
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import (
Expand Down Expand Up @@ -181,7 +182,11 @@ def _apply_move(fields: List[DataField], new_field: Optional[DataField], move):


def _handle_add_column(
change: AddColumn, new_fields: List[DataField], highest_field_id: AtomicInteger
change: AddColumn,
new_fields: List[DataField],
highest_field_id: AtomicInteger,
partition_keys: List[str],
add_column_before_partition: bool
):
if not change.data_type.nullable:
raise ValueError(
Expand All @@ -194,6 +199,17 @@ def _handle_add_column(
new_field = DataField(field_id, field_name, change.data_type, change.comment)
if change.move:
_apply_move(new_fields, new_field, change.move)
elif (
add_column_before_partition
and partition_keys
and len(change.field_names) == 1
):
insert_index = len(new_fields)
for i, field in enumerate(new_fields):
if field.name in partition_keys:
insert_index = i
break
new_fields.insert(insert_index, new_field)
else:
new_fields.append(new_field)

Expand Down Expand Up @@ -278,7 +294,7 @@ def commit_changes(self, changes: List[SchemaChange]) -> TableSchema:
f"Table schema does not exist at path: {self.table_path}. "
"This may happen if the table was deleted concurrently."
)

new_table_schema = self._generate_table_schema(old_table_schema, changes)
try:
success = self.commit(new_table_schema)
Expand Down Expand Up @@ -306,6 +322,10 @@ def _generate_table_schema(
highest_field_id = AtomicInteger(old_table_schema.highest_field_id)
new_comment = old_table_schema.comment

# Get add_column_before_partition option
add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition()
partition_keys = old_table_schema.partition_keys

for change in changes:
if isinstance(change, SetOption):
new_options[change.key] = change.value
Expand All @@ -314,7 +334,10 @@ def _generate_table_schema(
elif isinstance(change, UpdateComment):
new_comment = change.comment
elif isinstance(change, AddColumn):
_handle_add_column(change, new_fields, highest_field_id)
_handle_add_column(
change, new_fields, highest_field_id,
partition_keys, add_column_before_partition
)
elif isinstance(change, RenameColumn):
_assert_not_updating_partition_keys(
old_table_schema, change.field_names, "rename"
Expand Down
Loading
Loading