-
Notifications
You must be signed in to change notification settings - Fork 426
Block schema field drop if it is reference by an active partition or sort field #2410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
d4928bd to
81732d4
Compare
Anton-Tarazi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, left a minor comment :)
81732d4 to
7e47009
Compare
|
@Anton-Tarazi thanks for the review (this PR went off my radar) I have rebased and applied recommended changes |
geruh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some suggestions to get this moving for the 0.11 release. These fix the two bugs inverted allow_missing_fields logic, and string interpolation, also, we simplify the code by using existing schema._lazy_id_to_parent
pyiceberg/partitioning.py
Outdated
| def check_compatible(self, schema: Schema, allow_missing_fields: bool = False) -> None: | ||
| # if the underlying field is dropped, we cannot check they are compatible -- continue | ||
| schema_fields = schema._lazy_id_to_field | ||
| parents = _index_parents(schema) | ||
|
|
||
| def validate_parents_are_structs(field_id: int) -> None: | ||
| parent_id = parents.get(field_id) | ||
| while parent_id: | ||
| parent_type = schema.find_type(parent_id) | ||
| if not parent_type.is_struct: | ||
| raise ValidationError("Invalid partition field parent: %s", parent_type) | ||
| parent_id = parents.get(parent_id) | ||
|
|
||
| for field in self.fields: | ||
| source_field = schema_fields.get(field.source_id) | ||
| if allow_missing_fields and source_field: | ||
| continue | ||
|
|
||
| if not isinstance(field.transform, VoidTransform): | ||
| if source_field: | ||
| source_type = source_field.field_type | ||
| if not source_type.is_primitive: | ||
| raise ValidationError(f"Cannot partition by non-primitive source field: {source_type}") | ||
| if not field.transform.can_transform(source_type): | ||
| raise ValidationError(f"Invalid source type {source_type} for transform: {field.transform}") | ||
| # The only valid parent types for a PartitionField are StructTypes. This must be checked recursively | ||
| validate_parents_are_structs(field.source_id) | ||
| else: | ||
| raise ValidationError(f"Cannot find source column for partition field: {field}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can simplify the logic here:
- compute the id to fiel logic once instead of indexing for each field.
- align the
allow_missing_fieldslogic with java implementation
| def check_compatible(self, schema: Schema, allow_missing_fields: bool = False) -> None: | |
| # if the underlying field is dropped, we cannot check they are compatible -- continue | |
| schema_fields = schema._lazy_id_to_field | |
| parents = _index_parents(schema) | |
| def validate_parents_are_structs(field_id: int) -> None: | |
| parent_id = parents.get(field_id) | |
| while parent_id: | |
| parent_type = schema.find_type(parent_id) | |
| if not parent_type.is_struct: | |
| raise ValidationError("Invalid partition field parent: %s", parent_type) | |
| parent_id = parents.get(parent_id) | |
| for field in self.fields: | |
| source_field = schema_fields.get(field.source_id) | |
| if allow_missing_fields and source_field: | |
| continue | |
| if not isinstance(field.transform, VoidTransform): | |
| if source_field: | |
| source_type = source_field.field_type | |
| if not source_type.is_primitive: | |
| raise ValidationError(f"Cannot partition by non-primitive source field: {source_type}") | |
| if not field.transform.can_transform(source_type): | |
| raise ValidationError(f"Invalid source type {source_type} for transform: {field.transform}") | |
| # The only valid parent types for a PartitionField are StructTypes. This must be checked recursively | |
| validate_parents_are_structs(field.source_id) | |
| else: | |
| raise ValidationError(f"Cannot find source column for partition field: {field}") | |
| def check_compatible(self, schema: Schema, allow_missing_fields: bool = False) -> None: | |
| for field in self.fields: | |
| if isinstance(field.transform, VoidTransform): | |
| continue | |
| source_field = schema._lazy_id_to_field.get(field.source_id) | |
| if source_field is None: | |
| if allow_missing_fields: | |
| continue | |
| raise ValidationError(f"Cannot find source column for partition field: {field}") | |
| source_type = source_field.field_type | |
| if not source_type.is_primitive: | |
| raise ValidationError(f"Cannot partition by non-primitive source field: {source_type}") | |
| if not field.transform.can_transform(source_type): | |
| raise ValidationError(f"Invalid source type {source_type} for transform: {field.transform}") | |
| parent_id = schema._lazy_id_to_parent.get(field.source_id) | |
| while parent_id is not None: | |
| parent_type = schema.find_type(parent_id) | |
| if not parent_type.is_struct: | |
| raise ValidationError(f"Cannot partition by field within non-struct parent: {parent_type}") | |
| parent_id = schema._lazy_id_to_parent.get(parent_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compute the id to field logic once instead of indexing for each field.
IIUC It is a cached property, so only would be indexed once throughout its lifetime
7e47009 to
0a7a39e
Compare
|
Thanks for the review @geruh applied the comments |
geruh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @gabeiglio this LGTM!!
kevinjqliu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! thanks for the pr
couple of nits but not a blocker
| # Check correctness of partition spec, and sort order | ||
| new_metadata.spec().check_compatible(new_metadata.schema()) | ||
|
|
||
| if sort_order := new_metadata.sort_order_by_id(new_metadata.default_sort_order_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i think we can add a sort_order() function to TableMetadata class, i notice its only available in Table. but we can refactor later
then we can do
new_metadata.sort_order().check_compatible(new_metadata.schema())
| path = "/".join([field_str + "=" + value_str for field_str, value_str in zip(field_strs, value_strs, strict=True)]) | ||
| return path | ||
|
|
||
| def check_compatible(self, schema: Schema, allow_missing_fields: bool = False) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: doesnt look like allow_missing_fields is used anywhere, should we still keep it?
| if not source_field.field_type.is_primitive: | ||
| raise ValidationError(f"Cannot sort by non-primitive source field: {source_field}") | ||
| if not field.transform.can_transform(source_field.field_type): | ||
| raise ValidationError(f"Invalid source type {source_field.field_type} for transform: {field.transform}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we include source_field in the msg here? or do you think theres enough context already?
Closes #2166
Rationale for this change
We should block when an user wants to drop a column if that column is being referenced by either a active partition spec or sort order field.
Are these changes tested?
Yes, I added unit tests for every incompatible schema change in partitions and sort orders. Also added two new integration tests in
test_catalogto test for this scenarioAre there any user-facing changes?
No