'''
Utils
=====
.. autofunction:: check
.. autofunction:: dict_clean
.. autofunction:: dict_flatten
.. autofunction:: dict_merge
.. autofunction:: force_case
.. autofunction:: trunc
.. autofunction:: url_validator
'''
from collections.abc import MutableMapping
from typing import List, Optional, Any, Dict
from urllib.parse import urlparse
from copy import copy
import re
import arrow
from requests import Response
from box import Box, BoxList
from .errors import UnexpectedValueError
def format_json_response(response: Response,
box_attrs: Optional[Dict] = None,
conv_json: bool = True,
conv_box: bool = True,
):
'''
A simple utility to handle formatting the response object into either a
Box object or a Python native object from the JSON response. The function
will prefer box over python native if both flags are set to true. If none
of the flags are true, or if the content-type header reports as something
other than "applicagion/json", then the response object is instead
returned.
Args:
response:
The response object that will be checked against.
box_attrs:
The optional box attributed to pass as part of instantiation.
conv_json:
A flag handling if we should run the JSON conversion to python
native datatypes.
conv_box:
A flaghandling if we should convert the data to a Box object.
Returns:
box.Box:
If the conv_box flag is True, and the response is a single object,
then the response is a Box obj.
box.BoxList:
If the conv_box flag is True, and the response is a list of
objects, then the response is a BoxList obj.
dict:
If the conv_json flag is True and the conv_box is False, and the
response is a single object, then the response is a dict obj.
list:
If the conv_json flag is True and conv_box is False, and the
response is a list of objects, then the response is a list obj.
requests.Response:
If neither flag is True, or if the response isn't JSON data, then
a response object is returned (pass-through).
'''
content_type = response.headers.get('content-type', 'application/json')
if ((conv_json or conv_box)
and 'application/json' in content_type.lower()
and len(response.text) > 0
): # noqa: E124
if conv_box:
data = response.json()
if isinstance(data, list):
return BoxList(data, **box_attrs)
elif isinstance(data, dict):
return Box(data, **box_attrs)
elif conv_json:
return response.json()
return response
[docs]def url_validator(
url: str,
validate: Optional[List[str]] = None
) -> bool:
'''
Validates that the required URL Parts exist within the URL string.
Args:
url (string):
The URL to process.
validate (list[str], optional):
The URL parts to validate are non-empty.
Examples:
>>> url_validator('https://google.com') # Returns True
>>> url_validator('google.com') #Returns False
>>> url_validator(
... 'https://httpbin.com/404',
... validate=['scheme', 'netloc', 'path'])
# Returns True
'''
if not validate:
validate = ['scheme', 'netloc']
resp = urlparse(url)._asdict()
for val in validate:
if val not in resp or resp[val] == '':
return False
return True
[docs]def dict_flatten(
dct: dict,
parent_key: Optional[str] = '',
sep: Optional[str] = '.'
) -> dict:
'''
Flattens a nested dict.
Args:
d (dict):
The dictionary to flatten
sep (str, optional):
The separation character. If left unspecified, the default is '.'.
Examples:
>>> x = {'a': 1, 'b': {'c': 2}}
>>> dict_flatten(x)
{'a': 1, 'b.c': 2}
Shamelessly ripped from `this <https://stackoverflow.com/a/6027615>`_
Stackoverflow answer.
'''
items = []
for key, val in dct.items():
new_key = parent_key + sep + key if parent_key else key
if isinstance(val, MutableMapping):
items.extend(flatten(val, new_key, sep=sep).items())
else:
items.append((new_key, val))
return dict(items)
# backwards compat with pre 1.2.
flatten = dict_flatten
[docs]def dict_clean(dct: dict) -> dict:
'''
Recursively removes dictionary keys where the value is None
Args:
d (dict):
The dictionary to clean
Returns:
:obj:`dict`:
The cleaned dictionary
Examples:
>>> x = {'a': 1, 'b': {'c': 2, 'd': None}, 'e': None}
>>> clean_dict(x)
{'a': 1, 'b': {'c': 2}}
'''
clean = {}
for key, value in dct.items():
# if the value is a dictionary, then we will recursively clean.
if isinstance(value, dict):
new_value = dict_clean(value)
if len(new_value.keys()) > 0:
clean[key] = new_value
# if the value is a list, we will check for any dictionaries within
# the list and recursively clean.
elif isinstance(value, list):
new_value = []
for item in value:
if isinstance(item, dict):
new_item = dict_clean(item)
if len(new_item.keys()) > 0:
new_value.append(new_item)
else:
new_value.append(item)
clean[key] = new_value
# if the value isn't None, then store the value under the key.
elif value is not None:
clean[key] = value
return clean
[docs]def dict_merge(
master: dict,
*updates: dict
) -> dict:
'''
Merge many dictionaries together The updates dictionaries will be merged
into sthe master, adding/updating any values as needed.
Args:
master (dict):
The master dictionary to be used as the base.
*updates (list[dict]):
The dictionaries that will overload the values in the master.
Returns:
:obj:`dict`:
The merged dictionary
Examples:
>>> a = {'one': 1, 'two': 2, 'three': {'four': 4}}
>>> b = {'a': 'a', 'three': {'b': 'b'}}
>>> dict_merge(a, b)
{'a': 'a', 'one': 1, 'two': 2, 'three': {'b': b, 'four': 4}}
'''
for update in updates:
for key in update:
if (key in master and isinstance(master[key], dict)
and isinstance(update[key], dict)):
master[key] = dict_merge(master[key], update[key])
else:
master[key] = update[key]
return master
[docs]def force_case(obj: Any, case: str) -> Any:
'''
A simple case enforcement function.
Args:
obj (Object): object to attempt to enforce the case upon.
Returns:
:obj:`obj`:
The modified object
Examples:
A list of mixed types:
>>> a = ['a', 'list', 'of', 'strings', 'with', 'a', 1]
>>> force_Case(a, 'upper')
['A', 'LIST', 'OF', 'STRINGS', 'WITH', 'A', 1]
A simple string:
>>> force_case('This is a TEST', 'lower')
'this is a test'
A non-string item that'll pass through:
>>> force_case(1, 'upper')
1
'''
if case == 'lower':
if isinstance(obj, list): # noqa: PLR1705
return [i.lower() for i in obj if isinstance(i, str)]
elif isinstance(obj, str):
return obj.lower()
elif case == 'upper':
if isinstance(obj, list): # noqa: PLR1705
return [i.upper() for i in obj if isinstance(i, str)]
elif isinstance(obj, str):
return obj.upper()
return obj
def redact_values(
obj: dict,
keys: Optional[list] = None,
value: str = 'REDACTED'
) -> dict:
'''
Redacts the values of the keys specified. Useful in logging so that
sensitive fields are not presented to the logs.
Args:
obj (dict):
The object upon which redaction will happen.
keys (list[str], optional):
The list of key names that should be redacted.
value (str, optional):
The redacted value to use in place of the sensitive information.
Returns:
:obj:`obj`:
The modified object.
'''
if not keys:
keys = []
new = copy(obj)
for key in new:
if isinstance(new[key], dict):
new[key] = redact_values(new[key], keys=keys)
elif key in keys:
new[key] = value
return new
[docs]def trunc(
text: str,
limit: int,
suffix: Optional[str] = '...'
) -> str:
'''
Truncates a string to a given number of characters. If a string extends
beyond the limit, then truncate and add an ellipses after the truncation.
Args:
text (str): The string to truncate
limit (int): The maximum limit that the string can be.
suffix (str):
What suffix should be appended to the truncated string when we
truncate? If left unspecified, it will default to ``...``.
Returns:
:obj:`str`:
The truncated string
Examples:
A simple truncation:
>>> trunc('this is a test', 6)
'thi...'
Truncating with no suffix:
>>> trunc('this is a test', 6, suffix=None)
'this i'
Truncating with a custom suffix:
>>> trunc('this is a test', 6, suffix='->')
'this->'
'''
if len(text) >= limit:
if isinstance(suffix, str): # noqa: PLR1705
# If we have a suffix, then reduce the text string length further
# by the length of the suffix and then concatenate both the text
# and suffix together.
return f'{text[:limit - len(suffix)]}{suffix}'
else:
# If no suffix, then simply reduce the string size.
return text[:limit]
return text
[docs]def check( # noqa: C901
name: str,
obj: Any,
expected_type: Any,
**kwargs
) -> Any:
'''
Check function for validating that inputs we are receiving are of the right
type, have the expected values, and can handle defaults as necessary.
Args:
name (str): The name of the object (for exception reporting)
obj (obj): The object that we will be checking
expected_type (type):
The expected type of object that we will check against.
choices (list, optional):
if the object is only expected to have a finite number of values
then we can check to make sure that our input is one of these
values.
default (obj, optional):
if we want to return a default setting if the object is None,
we can set one here.
case (str, optional):
if we want to force the object values to be upper or lower case,
then we will want to set this to either ``upper`` or ``lower``
depending on the desired outcome. The returned object will then
also be in the specified case.
pattern (str, optional):
Specify a regex pattern from the pattern map variable.
pattern_map (dict, optional):
Any additional items to add to the pattern mapping.
regex (str, optional):
Validate that the value of the object matches this pattern.
items_type (type, optional):
If the expected type is an iterable, and if all of the items
within that iterable are expected to be a given type, then
specifying the type here will enable checking each item within
the iterable.
NOTE: this will traverse the iterable and return a list object.
softcheck (bool, optional):
If the variable is a string type
Returns:
:obj:`Object`:
Either the object or the default object depending.
Examples:
Ensure that the value is an integer type:
>>> check('example', val, int)
Ensure that the value of val is within 0 and 100:
>>> check('example', val, int, choices=list(range(100)))
'''
def validate_regex_pattern(regex, obj):
if (isinstance(obj, str)
and len(re.findall(regex, str(obj))) <= 0):
raise UnexpectedValueError(
f'{name} has value of {obj}. Does not match pattern {regex}'
)
def validate_choice_list(choices, obj):
if obj not in choices:
raise UnexpectedValueError((
f'{name} has value of {obj}. Expected one of '
f'{",".join([str(i) for i in choices])}'
))
def validate_expected_type(expected, obj, softcheck=True):
# We need to conditionally set the expected nametype local var based
# on if the expected type has a __name__ attribute.
if hasattr(expected, '__name__'):
exp = expected_type.__name__
else:
exp = expected
if isinstance(obj, expected): # noqa: PLR1705
# if everything matches, then just return the object
return obj
elif expected == arrow.Arrow:
return arrow.get(obj)
elif ((softcheck and isinstance(obj, str)
and expected not in [list, tuple])):
# if the expected type is not a list or tuple and it is a
# string type, then we will attempt to recast the object
# to be the expected type.
try:
new_obj = expected(obj)
except Exception:
# if the recasting fails, then just pass through.
raise TypeError(( # noqa: PLW0707
f'{name} is of type {obj.__class__.__name__}. '
f'Expected {exp}'
))
else:
if expected == bool:
# if the expected type was boolean, then we will
# want to ensure that the string is one of the
# allowed values. From there we will set the
# object to be either True or False. in either case
# we will also want to make sure to set the
# type_pass flag to ensure we don't raise a
# TypeError later on.
if obj.lower() in ['true', 'false', 'yes', 'no']:
return obj.lower() in ['true', 'yes']
else:
# In every other case, just set the object to be the
# recasted object and set the type_pass flag.
return new_obj
raise TypeError((
f'{name} is of type {obj.__class__.__name__}. Expected {exp}'
))
def validate_normalized(obj, func, arg):
if isinstance(obj, (list, tuple)):
# If the object is a list or tuple type, then lets ensure that
# all of the items within the obj .
for item in obj:
func(arg, item)
else:
func(arg, obj)
pmap = dict_merge({
'uuid': (r'^[0-9a-f]{8}-'
r'[0-9a-f]{4}-'
r'[0-9a-f]{4}-'
r'[0-9a-f]{4}-'
r'[0-9a-f]{12}$'
),
'email': r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$',
'hex': r'^[a-fA-f0-9]+$',
'url': (r'^(https?:\/\/)?'
r'([\da-z\.-]+)\.'
r'([a-z\.]{2,6})([\/\w \.-]*)*\/?$'
),
'ipv4': r'^([0-9]{1,3}\.){3}[0-9]{1,3}(\/([0-9]|[1-2][0-9]|3[0-2]))?$',
'ipv6': (r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|'
r'([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:'
r'[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}'
r'(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}'
r'(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}'
r'(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}'
r'(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:'
r'((:[0-9a-fA-F]{1,4}){1,6})|:'
r'((:[0-9a-fA-F]{1,4}){1,7}|:)|'
r'fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::'
r'(ffff(:0{1,4}){0,1}:){0,1}'
r'((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
r'([0-9a-fA-F]{1,4}:){1,4}:'
r'((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
),
}, kwargs.get('pattern_map', {}))
# We have a simple function to convert the case of string values so that
# we can ensure correct output.
# Convert the case of the inputs.
obj = force_case(obj, kwargs.get('case'))
kwargs['choices'] = force_case(kwargs.get('choices'), kwargs.get('case'))
kwargs['default'] = force_case(kwargs.get('default'), kwargs.get('case'))
# If the object sent to us has a None value, then we will return None.
# If a default was set, then we will return the default value.
allow_none = kwargs.get('allow_none', True)
if obj is None and allow_none: # noqa: PLR1705
return kwargs.get('default')
# If the allow_none keyword was passed and set to False, we should raise an
# unexpected value error if none was seen.
elif obj is None and not allow_none:
raise UnexpectedValueError(f'{name} has no value.')
# If the object is none of the right types then we want to raise a
# TypeError as it was something we weren't expecting.
obj = validate_expected_type(
expected_type, obj, kwargs.get('softcheck', True))
if kwargs.get('items_type'):
# If the items within the list should also be of a specific type,
# we can check those as well
lobj = []
for item in obj:
lobj.append(validate_expected_type(
kwargs.get('items_type'), item, kwargs.get('softcheck', True)))
obj = lobj
# if the object is only expected to have one of a finite set of values,
# we should check against that and raise an exception if the the actual
# value is outside of what we expect.
if kwargs.get('choices'):
validate_normalized(obj, validate_choice_list, kwargs.get('choices'))
# If a pattern was specified, then we will want to pull the pattern from
# the pattern map and validate that the
if kwargs.get('pattern') and kwargs.get('pattern') in pmap:
validate_normalized(obj, validate_regex_pattern,
pmap[kwargs.get('pattern')])
# If there wasn't a pattern matching that identifier, then throw an
# IndexError
elif kwargs.get('pattern') and kwargs.get('pattern') not in pmap.keys():
raise IndexError(
f'pattern name {kwargs.get("pattern")} not found in map'
)
# If a raw regex pattern was provided instead, then we will pass that over
# and validate
elif kwargs.get('regex'):
validate_normalized(obj, validate_regex_pattern, kwargs.get('regex'))
# if we made it this gauntlet without an exception being raised, then
# assume everything is good to go and return the object passed to us
# initially.
return obj