Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions src/qs_codec/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,19 @@ def _interpret_numeric_entities(value: str) -> str:
return re.sub(r"&#(\d+);", lambda match: chr(int(match.group(1))), value)


def _parse_array_value(value: t.Any, options: DecodeOptions) -> t.Any:
def _parse_array_value(value: t.Any, options: DecodeOptions, current_list_length: int) -> t.Any:
if isinstance(value, str) and value and options.comma and "," in value:
return value.split(",")
split_val: t.List[str] = value.split(",")
if options.raise_on_limit_exceeded and len(split_val) > options.list_limit:
raise ValueError(
f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list."
)
return split_val

if options.raise_on_limit_exceeded and current_list_length >= options.list_limit:
raise ValueError(
f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list."
)

return value

Expand All @@ -61,11 +71,26 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str
clean_str: str = value.replace("?", "", 1) if options.ignore_query_prefix else value
clean_str = clean_str.replace("%5B", "[").replace("%5b", "[").replace("%5D", "]").replace("%5d", "]")
limit: t.Optional[int] = None if isinf(options.parameter_limit) else options.parameter_limit # type: ignore [assignment]

if limit is not None and limit <= 0:
raise ValueError("Parameter limit must be a positive integer.")

parts: t.List[str]
if isinstance(options.delimiter, re.Pattern):
parts = re.split(options.delimiter, clean_str) if not limit else re.split(options.delimiter, clean_str)[:limit]
parts = (
re.split(options.delimiter, clean_str)
if (limit is None) or not limit
else re.split(options.delimiter, clean_str)[: (limit + 1 if options.raise_on_limit_exceeded else limit)]
)
else:
parts = clean_str.split(options.delimiter) if not limit else clean_str.split(options.delimiter)[:limit]
parts = (
clean_str.split(options.delimiter)
if (limit is None) or not limit
else clean_str.split(options.delimiter)[: (limit + 1 if options.raise_on_limit_exceeded else limit)]
)

if options.raise_on_limit_exceeded and (limit is not None) and len(parts) > limit:
raise ValueError(f"Parameter limit exceeded: Only {limit} parameter{'' if limit == 1 else 's'} allowed.")

skip_index: int = -1 # Keep track of where the utf8 sentinel was found
i: int
Expand Down Expand Up @@ -98,7 +123,11 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str
else:
key = options.decoder(part[:pos], charset)
val = Utils.apply(
_parse_array_value(part[pos + 1 :], options),
_parse_array_value(
part[pos + 1 :],
options,
len(obj[key]) if key in obj and isinstance(obj[key], (list, tuple)) else 0,
),
lambda v: options.decoder(v, charset),
)

Expand All @@ -123,7 +152,20 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str
def _parse_object(
chain: t.Union[t.List[str], t.Tuple[str, ...]], val: t.Any, options: DecodeOptions, values_parsed: bool
) -> t.Any:
leaf: t.Any = val if values_parsed else _parse_array_value(val, options)
current_list_length: int = 0

if bool(chain) and chain[-1] == "[]":
parent_key: t.Optional[int]

try:
parent_key = int("".join(chain[0:-1]))
except ValueError:
parent_key = None

if parent_key is not None and isinstance(val, (list, tuple)) and parent_key in dict(enumerate(val)):
current_list_length = len(val[parent_key])

leaf: t.Any = val if values_parsed else _parse_array_value(val, options, current_list_length)

i: int
for i in reversed(range(len(chain))):
Expand Down
7 changes: 5 additions & 2 deletions src/qs_codec/models/decode_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ class DecodeOptions:
"""To disable ``list`` parsing entirely, set ``parse_lists`` to ``False``."""

strict_depth: bool = False
"""Set to ``True`` to throw an error when the input exceeds the ``depth`` limit."""
"""Set to ``True`` to raise an error when the input exceeds the ``depth`` limit."""

strict_null_handling: bool = False
"""Set to true to decode values without ``=`` to ``None``."""
"""Set to ``True`` to decode values without ``=`` to ``None``."""

raise_on_limit_exceeded: bool = False
"""Set to ``True`` to raise an error when the input contains more parameters than the ``list_limit``."""

decoder: t.Callable[[t.Optional[str], t.Optional[Charset]], t.Any] = DecodeUtils.decode
"""Set a ``Callable`` to affect the decoding of the input."""
Expand Down
15 changes: 4 additions & 11 deletions src/qs_codec/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def merge(
else:
target_[len(target_)] = source

target = list(filter(lambda el: not isinstance(el, Undefined), target_.values()))
if any(isinstance(value, Undefined) for value in target_.values()):
target = {str(i): target_[i] for i in target_ if not isinstance(target_[i], Undefined)}
else:
target = list(filter(lambda el: not isinstance(el, Undefined), target_.values()))
else:
if isinstance(source, (list, tuple)):
if all((isinstance(el, t.Mapping) or isinstance(el, Undefined)) for el in target) and all(
Expand Down Expand Up @@ -123,20 +126,10 @@ def compact(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
queue.append({"obj": obj, "prop": key})
refs.append(val)

Utils._compact_queue(queue)
Utils._remove_undefined_from_map(value)

return value

@staticmethod
def _compact_queue(queue: t.List[t.Dict]) -> None:
while len(queue) > 1:
item = queue.pop()
obj = item["obj"][item["prop"]]

if isinstance(obj, (list, tuple)):
item["obj"][item["prop"]] = list(filter(lambda el: not isinstance(el, Undefined), obj))

@staticmethod
def _remove_undefined_from_list(value: t.List) -> None:
i: int = len(value) - 1
Expand Down
66 changes: 64 additions & 2 deletions tests/unit/decode_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_parses_a_mix_of_simple_and_explicit_lists(self) -> None:
assert decode("a[0]=b&a=c") == {"a": ["b", "c"]}
assert decode("a=b&a[0]=c") == {"a": ["b", "c"]}

assert decode("a[1]=b&a=c", DecodeOptions(list_limit=20)) == {"a": ["b", "c"]}
assert decode("a[1]=b&a=c", DecodeOptions(list_limit=20)) == {"a": {"1": "b", "2": "c"}}
assert decode("a[]=b&a=c", DecodeOptions(list_limit=0)) == {"a": ["b", "c"]}
assert decode("a[]=b&a=c") == {"a": ["b", "c"]}

Expand Down Expand Up @@ -292,7 +292,9 @@ def test_allows_for_empty_strings_in_lists(self) -> None:
assert decode("a[]=&a[]=b&a[]=c") == {"a": ["", "b", "c"]}

def test_compacts_sparse_lists(self) -> None:
assert decode("a[10]=1&a[2]=2", DecodeOptions(list_limit=20)) == {"a": ["2", "1"]}
decoded = decode("a[10]=1&a[2]=2", DecodeOptions(list_limit=20))
assert decoded == {"a": {"10": "1", "2": "2"}}
assert decoded != {"a": ["2", "1"]}
assert decode("a[1][b][2][c]=1", DecodeOptions(list_limit=20)) == {"a": [{"b": [{"c": "1"}]}]}
assert decode("a[1][2][3][c]=1", DecodeOptions(list_limit=20)) == {"a": [[[{"c": "1"}]]]}
assert decode("a[1][2][3][c][1]=1", DecodeOptions(list_limit=20)) == {"a": [[[{"c": ["1"]}]]]}
Expand Down Expand Up @@ -684,3 +686,63 @@ def test_decodes_successfully_when_depth_is_within_the_limit_with_strict_depth_f

def test_does_not_throw_when_depth_is_exactly_at_the_limit_with_strict_depth_true(self) -> None:
assert decode("a[b][c]=d", DecodeOptions(depth=2, strict_depth=True)) == {"a": {"b": {"c": "d"}}}


class TestParameterList:
def test_does_not_raise_error_when_within_parameter_limit(self) -> None:
assert decode("a=1&b=2&c=3", DecodeOptions(parameter_limit=5, raise_on_limit_exceeded=True)) == {
"a": "1",
"b": "2",
"c": "3",
}

def test_raises_error_when_parameter_limit_exceeded(self) -> None:
with pytest.raises(ValueError):
decode("a=1&b=2&c=3&d=4&e=5&f=6", DecodeOptions(parameter_limit=3, raise_on_limit_exceeded=True))

def test_silently_truncates_when_throw_on_limit_exceeded_is_not_given(self) -> None:
assert decode("a=1&b=2&c=3&d=4&e=5", DecodeOptions(parameter_limit=3)) == {"a": "1", "b": "2", "c": "3"}

def test_silently_truncates_when_parameter_limit_exceeded_without_error(self) -> None:
assert decode("a=1&b=2&c=3&d=4&e=5", DecodeOptions(parameter_limit=3, raise_on_limit_exceeded=False)) == {
"a": "1",
"b": "2",
"c": "3",
}

def test_allows_unlimited_parameters_when_parameter_limit_set_to_infinity(self) -> None:
assert decode("a=1&b=2&c=3&d=4&e=5&f=6", DecodeOptions(parameter_limit=float("inf"))) == {
"a": "1",
"b": "2",
"c": "3",
"d": "4",
"e": "5",
"f": "6",
}


class TestListLimit:
def test_does_not_raise_error_when_within_list_limit(self) -> None:
assert decode("a[]=1&a[]=2&a[]=3", DecodeOptions(list_limit=5, raise_on_limit_exceeded=True)) == {
"a": ["1", "2", "3"],
}

def test_raises_error_when_list_limit_exceeded(self) -> None:
with pytest.raises(ValueError):
decode("a[]=1&a[]=2&a[]=3&a[]=4", DecodeOptions(list_limit=3, raise_on_limit_exceeded=True))

def test_converts_list_to_map_if_length_is_greater_than_limit(self) -> None:
assert decode("a[1]=1&a[2]=2&a[3]=3&a[4]=4&a[5]=5&a[6]=6", DecodeOptions(list_limit=5)) == {
"a": {"1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6"}
}

def test_handles_list_limit_of_zero_correctly(self) -> None:
assert decode("a[]=1&a[]=2", DecodeOptions(list_limit=0)) == {"a": ["1", "2"]}

def test_handles_negative_list_limit_correctly(self) -> None:
with pytest.raises(ValueError):
decode("a[]=1&a[]=2", DecodeOptions(list_limit=-1, raise_on_limit_exceeded=True))

def test_applies_list_limit_to_nested_lists(self) -> None:
with pytest.raises(ValueError):
decode("a[0][]=1&a[0][]=2&a[0][]=3&a[0][]=4", DecodeOptions(list_limit=3, raise_on_limit_exceeded=True))
2 changes: 1 addition & 1 deletion tests/unit/example_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_lists(self):
# Note that the only difference between an index in a `list` and a key in a `dict` is that the value between the
# brackets must be a number to create a `list`. When creating `list`s with specific indices, **qs_codec** will compact
# a sparse `list` to only the existing values preserving their order:
assert qs_codec.decode("a[1]=b&a[15]=c") == {"a": ["b", "c"]}
assert qs_codec.decode("a[1]=b&a[15]=c") == {"a": {"1": "b", "15": "c"}}

# Note that an empty string is also a value, and will be preserved:
assert qs_codec.decode("a[]=&a[]=b") == {"a": ["", "b"]}
Expand Down