Module slack_sdk.scim.v1.internal_utils

Expand source code
import copy
import logging
import re
import sys
from typing import Dict, Callable
from typing import Union, Optional, Any
from urllib.parse import quote

from .default_arg import DefaultArg, NotGiven
from slack_sdk.web.internal_utils import get_user_agent


def _build_query(params: Optional[Dict[str, Any]]) -> str:
    if params is not None and len(params) > 0:
        return "&".join({f"{quote(str(k))}={quote(str(v))}" for k, v in params.items() if v is not None})
    return ""


def _is_iterable(obj: Union[Optional[Any], DefaultArg]) -> bool:
    return obj is not None and obj is not NotGiven


def _to_dict_without_not_given(obj: Any) -> dict:
    dict_value = {}
    given_dict = obj if isinstance(obj, dict) else vars(obj)
    for key, value in given_dict.items():
        if key == "unknown_fields":
            if value is not None:
                converted = _to_dict_without_not_given(value)
                dict_value.update(converted)
            continue

        dict_key = _to_camel_case_key(key)
        if value is NotGiven:
            continue
        if isinstance(value, list):
            dict_value[dict_key] = [elem.to_dict() if hasattr(elem, "to_dict") else elem for elem in value]
        elif isinstance(value, dict):
            dict_value[dict_key] = _to_dict_without_not_given(value)
        else:
            dict_value[dict_key] = value.to_dict() if hasattr(value, "to_dict") else value
    return dict_value


def _create_copy(original: Any) -> Any:
    if sys.version_info.major == 3 and sys.version_info.minor <= 6:
        return copy.copy(original)
    else:
        return copy.deepcopy(original)


def _to_camel_case_key(key: str) -> str:
    next_to_capital = False
    result = ""
    for c in key:
        if c == "_":
            next_to_capital = True
        elif next_to_capital:
            result += c.upper()
            next_to_capital = False
        else:
            result += c
    return result


def _to_snake_cased(original: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    return _convert_dict_keys(
        original,
        {},
        lambda s: re.sub(
            "^_",
            "",
            "".join(["_" + c.lower() if c.isupper() else c for c in s]),
        ),
    )


def _to_camel_cased(original: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    return _convert_dict_keys(
        original,
        {},
        _to_camel_case_key,
    )


def _convert_dict_keys(
    original_dict: Optional[Dict[str, Any]],
    result_dict: Dict[str, Any],
    convert: Callable[[str], str],
) -> Optional[Dict[str, Any]]:
    if original_dict is None:
        return result_dict

    for original_key, original_value in original_dict.items():
        new_key = convert(original_key)
        if isinstance(original_value, dict):
            result_dict[new_key] = {}
            new_value = _convert_dict_keys(original_value, result_dict[new_key], convert)
            result_dict[new_key] = new_value
        elif isinstance(original_value, list):
            result_dict[new_key] = []
            is_dict = len(original_value) > 0 and isinstance(original_value[0], dict)
            for element in original_value:
                if is_dict:
                    if isinstance(element, dict):
                        new_element = {}
                        for elem_key, elem_value in element.items():
                            new_element[convert(elem_key)] = (
                                _convert_dict_keys(elem_value, {}, convert)
                                if isinstance(elem_value, dict)
                                else _create_copy(elem_value)
                            )
                        result_dict[new_key].append(new_element)
                else:
                    result_dict[new_key].append(_create_copy(original_value))
        else:
            result_dict[new_key] = _create_copy(original_value)
    return result_dict


def _build_request_headers(
    token: str,
    default_headers: Dict[str, str],
    additional_headers: Optional[Dict[str, str]],
) -> Dict[str, str]:
    request_headers = {
        "Content-Type": "application/json;charset=utf-8",
        "Authorization": f"Bearer {token}",
    }
    if default_headers is None or "User-Agent" not in default_headers:
        request_headers["User-Agent"] = get_user_agent()
    if default_headers is not None:
        request_headers.update(default_headers)
    if additional_headers is not None:
        request_headers.update(additional_headers)
    return request_headers


def _debug_log_response(  # type: ignore
    logger,
    resp: "SCIMResponse",  # noqa: F821
) -> None:
    if logger.level <= logging.DEBUG:
        logger.debug(
            "Received the following response - "
            f"status: {resp.status_code}, "
            f"headers: {(dict(resp.headers))}, "
            f"body: {resp.raw_body}"
        )