Module slack_sdk.scim.v1.async_client
Expand source code
import json
import logging
from ssl import SSLContext
from typing import Any, Union, List
from typing import Dict, Optional
from urllib.parse import quote
import aiohttp
from aiohttp import BasicAuth, ClientSession
from .internal_utils import (
_build_request_headers,
_debug_log_response,
get_user_agent,
_to_dict_without_not_given,
_build_query,
)
from .response import (
SCIMResponse,
SearchUsersResponse,
ReadUserResponse,
SearchGroupsResponse,
ReadGroupResponse,
UserCreateResponse,
UserPatchResponse,
UserUpdateResponse,
UserDeleteResponse,
GroupCreateResponse,
GroupPatchResponse,
GroupUpdateResponse,
GroupDeleteResponse,
)
from .user import User
from .group import Group
from ...proxy_env_variable_loader import load_http_proxy_from_env
from slack_sdk.http_retry.async_handler import AsyncRetryHandler
from slack_sdk.http_retry.builtin_async_handlers import async_default_handlers
from slack_sdk.http_retry.request import HttpRequest as RetryHttpRequest
from slack_sdk.http_retry.response import HttpResponse as RetryHttpResponse
from slack_sdk.http_retry.state import RetryState
class AsyncSCIMClient:
BASE_URL = "https://api.slack.com/scim/v1/"
token: str
timeout: int
ssl: Optional[SSLContext]
proxy: Optional[str]
base_url: str
session: Optional[ClientSession]
trust_env_in_session: bool
auth: Optional[BasicAuth]
default_headers: Dict[str, str]
logger: logging.Logger
retry_handlers: List[AsyncRetryHandler]
def __init__(
self,
token: str,
timeout: int = 30,
ssl: Optional[SSLContext] = None,
proxy: Optional[str] = None,
base_url: str = BASE_URL,
session: Optional[ClientSession] = None,
trust_env_in_session: bool = False,
auth: Optional[BasicAuth] = None,
default_headers: Optional[Dict[str, str]] = None,
user_agent_prefix: Optional[str] = None,
user_agent_suffix: Optional[str] = None,
logger: Optional[logging.Logger] = None,
retry_handlers: Optional[List[AsyncRetryHandler]] = None,
):
"""API client for SCIM API
See https://api.slack.com/scim for more details
Args:
token: An admin user's token, which starts with `xoxp-`
timeout: Request timeout (in seconds)
ssl: `ssl.SSLContext` to use for requests
proxy: Proxy URL (e.g., `localhost:9000`, `http://localhost:9000`)
base_url: The base URL for API calls
session: `aiohttp.ClientSession` instance
trust_env_in_session: True/False for `aiohttp.ClientSession`
auth: Basic auth info for `aiohttp.ClientSession`
default_headers: Request headers to add to all requests
user_agent_prefix: Prefix for User-Agent header value
user_agent_suffix: Suffix for User-Agent header value
logger: Custom logger
retry_handlers: Retry handlers
"""
self.token = token
self.timeout = timeout
self.ssl = ssl
self.proxy = proxy
self.base_url = base_url
self.session = session
self.trust_env_in_session = trust_env_in_session
self.auth = auth
self.default_headers = default_headers if default_headers else {}
self.default_headers["User-Agent"] = get_user_agent(user_agent_prefix, user_agent_suffix)
self.logger = logger if logger is not None else logging.getLogger(__name__)
self.retry_handlers = retry_handlers if retry_handlers is not None else async_default_handlers()
if self.proxy is None or len(self.proxy.strip()) == 0:
env_variable = load_http_proxy_from_env(self.logger)
if env_variable is not None:
self.proxy = env_variable
# -------------------------
# Users
# -------------------------
async def search_users(
self,
*,
# Pagination required as of August 30, 2019.
count: int,
start_index: int,
filter: Optional[str] = None,
) -> SearchUsersResponse:
return SearchUsersResponse(
await self.api_call(
http_verb="GET",
path="Users",
query_params={
"filter": filter,
"count": count,
"startIndex": start_index,
},
)
)
async def read_user(self, id: str) -> ReadUserResponse:
return ReadUserResponse(await self.api_call(http_verb="GET", path=f"Users/{quote(id)}"))
async def create_user(self, user: Union[Dict[str, Any], User]) -> UserCreateResponse:
return UserCreateResponse(
await self.api_call(
http_verb="POST",
path="Users",
body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user),
)
)
async def patch_user(self, id: str, partial_user: Union[Dict[str, Any], User]) -> UserPatchResponse:
return UserPatchResponse(
await self.api_call(
http_verb="PATCH",
path=f"Users/{quote(id)}",
body_params=partial_user.to_dict()
if isinstance(partial_user, User)
else _to_dict_without_not_given(partial_user),
)
)
async def update_user(self, user: Union[Dict[str, Any], User]) -> UserUpdateResponse:
user_id = user.id if isinstance(user, User) else user["id"]
return UserUpdateResponse(
await self.api_call(
http_verb="PUT",
path=f"Users/{quote(user_id)}",
body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user),
)
)
async def delete_user(self, id: str) -> UserDeleteResponse:
return UserDeleteResponse(
await self.api_call(
http_verb="DELETE",
path=f"Users/{quote(id)}",
)
)
# -------------------------
# Groups
# -------------------------
async def search_groups(
self,
*,
# Pagination required as of August 30, 2019.
count: int,
start_index: int,
filter: Optional[str] = None,
) -> SearchGroupsResponse:
return SearchGroupsResponse(
await self.api_call(
http_verb="GET",
path="Groups",
query_params={
"filter": filter,
"count": count,
"startIndex": start_index,
},
)
)
async def read_group(self, id: str) -> ReadGroupResponse:
return ReadGroupResponse(await self.api_call(http_verb="GET", path=f"Groups/{quote(id)}"))
async def create_group(self, group: Union[Dict[str, Any], Group]) -> GroupCreateResponse:
return GroupCreateResponse(
await self.api_call(
http_verb="POST",
path="Groups",
body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group),
)
)
async def patch_group(self, id: str, partial_group: Union[Dict[str, Any], Group]) -> GroupPatchResponse:
return GroupPatchResponse(
await self.api_call(
http_verb="PATCH",
path=f"Groups/{quote(id)}",
body_params=partial_group.to_dict()
if isinstance(partial_group, Group)
else _to_dict_without_not_given(partial_group),
)
)
async def update_group(self, group: Union[Dict[str, Any], Group]) -> GroupUpdateResponse:
group_id = group.id if isinstance(group, Group) else group["id"]
return GroupUpdateResponse(
await self.api_call(
http_verb="PUT",
path=f"Groups/{quote(group_id)}",
body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group),
)
)
async def delete_group(self, id: str) -> GroupDeleteResponse:
return GroupDeleteResponse(
await self.api_call(
http_verb="DELETE",
path=f"Groups/{quote(id)}",
)
)
# -------------------------
async def api_call(
self,
*,
http_verb: str,
path: str,
query_params: Optional[Dict[str, Any]] = None,
body_params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> SCIMResponse:
url = f"{self.base_url}{path}"
query = _build_query(query_params)
if len(query) > 0:
url += f"?{query}"
return await self._perform_http_request(
http_verb=http_verb,
url=url,
body_params=body_params,
headers=_build_request_headers(
token=self.token,
default_headers=self.default_headers,
additional_headers=headers,
),
)
async def _perform_http_request(
self,
*,
http_verb: str,
url: str,
body_params: Optional[Dict[str, Any]],
headers: Dict[str, str],
) -> SCIMResponse:
if body_params is not None:
if body_params.get("schemas") is None:
body_params["schemas"] = ["urn:scim:schemas:core:1.0"]
body_params = json.dumps(body_params)
headers["Content-Type"] = "application/json;charset=utf-8"
session: Optional[ClientSession] = None
use_running_session = self.session and not self.session.closed
if use_running_session:
session = self.session
else:
session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.timeout),
auth=self.auth,
trust_env=self.trust_env_in_session,
)
last_error: Optional[Exception] = None
resp: Optional[SCIMResponse] = None
try:
request_kwargs = {
"headers": headers,
"data": body_params,
"ssl": self.ssl,
"proxy": self.proxy,
}
retry_request = RetryHttpRequest(
method=http_verb,
url=url,
headers=headers,
body_params=body_params,
)
retry_state = RetryState()
counter_for_safety = 0
while counter_for_safety < 100:
counter_for_safety += 1
# If this is a retry, the next try started here. We can reset the flag.
retry_state.next_attempt_requested = False
retry_response: Optional[RetryHttpResponse] = None
response_body = ""
if self.logger.level <= logging.DEBUG:
headers_for_logging = {
k: "(redacted)" if k.lower() == "authorization" else v for k, v in headers.items()
}
self.logger.debug(
f"Sending a request - url: {url}, params: {body_params}, headers: {headers_for_logging}"
)
try:
async with session.request(http_verb, url, **request_kwargs) as res:
try:
response_body = await res.text()
retry_response = RetryHttpResponse(
status_code=res.status,
headers=res.headers,
data=response_body.encode("utf-8") if response_body is not None else None,
)
except aiohttp.ContentTypeError:
self.logger.debug(f"No response data returned from the following API call: {url}.")
retry_response = RetryHttpResponse(
status_code=res.status,
headers=res.headers,
)
if res.status == 429:
for handler in self.retry_handlers:
if await handler.can_retry_async(
state=retry_state,
request=retry_request,
response=retry_response,
):
if self.logger.level <= logging.DEBUG:
self.logger.info(
f"A retry handler found: {type(handler).__name__} "
f"for {http_verb} {url} - rate_limited"
)
await handler.prepare_for_next_attempt_async(
state=retry_state,
request=retry_request,
response=retry_response,
)
break
if retry_state.next_attempt_requested is False:
resp = SCIMResponse(
url=url,
status_code=res.status,
raw_body=response_body,
headers=res.headers,
)
_debug_log_response(self.logger, resp)
return resp
except Exception as e:
last_error = e
for handler in self.retry_handlers:
if await handler.can_retry_async(
state=retry_state,
request=retry_request,
response=retry_response,
error=e,
):
if self.logger.level <= logging.DEBUG:
self.logger.info(
f"A retry handler found: {type(handler).__name__} " f"for {http_verb} {url} - {e}"
)
await handler.prepare_for_next_attempt_async(
state=retry_state,
request=retry_request,
response=retry_response,
error=e,
)
break
if retry_state.next_attempt_requested is False:
raise last_error
if resp is not None:
return resp
raise last_error
finally:
if not use_running_session:
await session.close()
return resp
Classes
class AsyncSCIMClient (token: str, timeout: int = 30, ssl: Optional[ssl.SSLContext] = None, proxy: Optional[str] = None, base_url: str = 'https://api.slack.com/scim/v1/', session: Optional[aiohttp.client.ClientSession] = None, trust_env_in_session: bool = False, auth: Optional[aiohttp.helpers.BasicAuth] = None, default_headers: Optional[Dict[str, str]] = None, user_agent_prefix: Optional[str] = None, user_agent_suffix: Optional[str] = None, logger: Optional[logging.Logger] = None, retry_handlers: Optional[List[slack_sdk.http_retry.async_handler.AsyncRetryHandler]] = None)
-
API client for SCIM API See https://api.slack.com/scim for more details
Args
token
- An admin user's token, which starts with
xoxp-
timeout
- Request timeout (in seconds)
ssl
ssl.SSLContext
to use for requestsproxy
- Proxy URL (e.g.,
localhost:9000
,http://localhost:9000
) base_url
- The base URL for API calls
session
aiohttp.ClientSession
instancetrust_env_in_session
- True/False for
aiohttp.ClientSession
auth
- Basic auth info for
aiohttp.ClientSession
default_headers
- Request headers to add to all requests
user_agent_prefix
- Prefix for User-Agent header value
user_agent_suffix
- Suffix for User-Agent header value
logger
- Custom logger
retry_handlers
- Retry handlers
Expand source code
class AsyncSCIMClient: BASE_URL = "https://api.slack.com/scim/v1/" token: str timeout: int ssl: Optional[SSLContext] proxy: Optional[str] base_url: str session: Optional[ClientSession] trust_env_in_session: bool auth: Optional[BasicAuth] default_headers: Dict[str, str] logger: logging.Logger retry_handlers: List[AsyncRetryHandler] def __init__( self, token: str, timeout: int = 30, ssl: Optional[SSLContext] = None, proxy: Optional[str] = None, base_url: str = BASE_URL, session: Optional[ClientSession] = None, trust_env_in_session: bool = False, auth: Optional[BasicAuth] = None, default_headers: Optional[Dict[str, str]] = None, user_agent_prefix: Optional[str] = None, user_agent_suffix: Optional[str] = None, logger: Optional[logging.Logger] = None, retry_handlers: Optional[List[AsyncRetryHandler]] = None, ): """API client for SCIM API See https://api.slack.com/scim for more details Args: token: An admin user's token, which starts with `xoxp-` timeout: Request timeout (in seconds) ssl: `ssl.SSLContext` to use for requests proxy: Proxy URL (e.g., `localhost:9000`, `http://localhost:9000`) base_url: The base URL for API calls session: `aiohttp.ClientSession` instance trust_env_in_session: True/False for `aiohttp.ClientSession` auth: Basic auth info for `aiohttp.ClientSession` default_headers: Request headers to add to all requests user_agent_prefix: Prefix for User-Agent header value user_agent_suffix: Suffix for User-Agent header value logger: Custom logger retry_handlers: Retry handlers """ self.token = token self.timeout = timeout self.ssl = ssl self.proxy = proxy self.base_url = base_url self.session = session self.trust_env_in_session = trust_env_in_session self.auth = auth self.default_headers = default_headers if default_headers else {} self.default_headers["User-Agent"] = get_user_agent(user_agent_prefix, user_agent_suffix) self.logger = logger if logger is not None else logging.getLogger(__name__) self.retry_handlers = retry_handlers if retry_handlers is not None else async_default_handlers() if self.proxy is None or len(self.proxy.strip()) == 0: env_variable = load_http_proxy_from_env(self.logger) if env_variable is not None: self.proxy = env_variable # ------------------------- # Users # ------------------------- async def search_users( self, *, # Pagination required as of August 30, 2019. count: int, start_index: int, filter: Optional[str] = None, ) -> SearchUsersResponse: return SearchUsersResponse( await self.api_call( http_verb="GET", path="Users", query_params={ "filter": filter, "count": count, "startIndex": start_index, }, ) ) async def read_user(self, id: str) -> ReadUserResponse: return ReadUserResponse(await self.api_call(http_verb="GET", path=f"Users/{quote(id)}")) async def create_user(self, user: Union[Dict[str, Any], User]) -> UserCreateResponse: return UserCreateResponse( await self.api_call( http_verb="POST", path="Users", body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user), ) ) async def patch_user(self, id: str, partial_user: Union[Dict[str, Any], User]) -> UserPatchResponse: return UserPatchResponse( await self.api_call( http_verb="PATCH", path=f"Users/{quote(id)}", body_params=partial_user.to_dict() if isinstance(partial_user, User) else _to_dict_without_not_given(partial_user), ) ) async def update_user(self, user: Union[Dict[str, Any], User]) -> UserUpdateResponse: user_id = user.id if isinstance(user, User) else user["id"] return UserUpdateResponse( await self.api_call( http_verb="PUT", path=f"Users/{quote(user_id)}", body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user), ) ) async def delete_user(self, id: str) -> UserDeleteResponse: return UserDeleteResponse( await self.api_call( http_verb="DELETE", path=f"Users/{quote(id)}", ) ) # ------------------------- # Groups # ------------------------- async def search_groups( self, *, # Pagination required as of August 30, 2019. count: int, start_index: int, filter: Optional[str] = None, ) -> SearchGroupsResponse: return SearchGroupsResponse( await self.api_call( http_verb="GET", path="Groups", query_params={ "filter": filter, "count": count, "startIndex": start_index, }, ) ) async def read_group(self, id: str) -> ReadGroupResponse: return ReadGroupResponse(await self.api_call(http_verb="GET", path=f"Groups/{quote(id)}")) async def create_group(self, group: Union[Dict[str, Any], Group]) -> GroupCreateResponse: return GroupCreateResponse( await self.api_call( http_verb="POST", path="Groups", body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group), ) ) async def patch_group(self, id: str, partial_group: Union[Dict[str, Any], Group]) -> GroupPatchResponse: return GroupPatchResponse( await self.api_call( http_verb="PATCH", path=f"Groups/{quote(id)}", body_params=partial_group.to_dict() if isinstance(partial_group, Group) else _to_dict_without_not_given(partial_group), ) ) async def update_group(self, group: Union[Dict[str, Any], Group]) -> GroupUpdateResponse: group_id = group.id if isinstance(group, Group) else group["id"] return GroupUpdateResponse( await self.api_call( http_verb="PUT", path=f"Groups/{quote(group_id)}", body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group), ) ) async def delete_group(self, id: str) -> GroupDeleteResponse: return GroupDeleteResponse( await self.api_call( http_verb="DELETE", path=f"Groups/{quote(id)}", ) ) # ------------------------- async def api_call( self, *, http_verb: str, path: str, query_params: Optional[Dict[str, Any]] = None, body_params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> SCIMResponse: url = f"{self.base_url}{path}" query = _build_query(query_params) if len(query) > 0: url += f"?{query}" return await self._perform_http_request( http_verb=http_verb, url=url, body_params=body_params, headers=_build_request_headers( token=self.token, default_headers=self.default_headers, additional_headers=headers, ), ) async def _perform_http_request( self, *, http_verb: str, url: str, body_params: Optional[Dict[str, Any]], headers: Dict[str, str], ) -> SCIMResponse: if body_params is not None: if body_params.get("schemas") is None: body_params["schemas"] = ["urn:scim:schemas:core:1.0"] body_params = json.dumps(body_params) headers["Content-Type"] = "application/json;charset=utf-8" session: Optional[ClientSession] = None use_running_session = self.session and not self.session.closed if use_running_session: session = self.session else: session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.timeout), auth=self.auth, trust_env=self.trust_env_in_session, ) last_error: Optional[Exception] = None resp: Optional[SCIMResponse] = None try: request_kwargs = { "headers": headers, "data": body_params, "ssl": self.ssl, "proxy": self.proxy, } retry_request = RetryHttpRequest( method=http_verb, url=url, headers=headers, body_params=body_params, ) retry_state = RetryState() counter_for_safety = 0 while counter_for_safety < 100: counter_for_safety += 1 # If this is a retry, the next try started here. We can reset the flag. retry_state.next_attempt_requested = False retry_response: Optional[RetryHttpResponse] = None response_body = "" if self.logger.level <= logging.DEBUG: headers_for_logging = { k: "(redacted)" if k.lower() == "authorization" else v for k, v in headers.items() } self.logger.debug( f"Sending a request - url: {url}, params: {body_params}, headers: {headers_for_logging}" ) try: async with session.request(http_verb, url, **request_kwargs) as res: try: response_body = await res.text() retry_response = RetryHttpResponse( status_code=res.status, headers=res.headers, data=response_body.encode("utf-8") if response_body is not None else None, ) except aiohttp.ContentTypeError: self.logger.debug(f"No response data returned from the following API call: {url}.") retry_response = RetryHttpResponse( status_code=res.status, headers=res.headers, ) if res.status == 429: for handler in self.retry_handlers: if await handler.can_retry_async( state=retry_state, request=retry_request, response=retry_response, ): if self.logger.level <= logging.DEBUG: self.logger.info( f"A retry handler found: {type(handler).__name__} " f"for {http_verb} {url} - rate_limited" ) await handler.prepare_for_next_attempt_async( state=retry_state, request=retry_request, response=retry_response, ) break if retry_state.next_attempt_requested is False: resp = SCIMResponse( url=url, status_code=res.status, raw_body=response_body, headers=res.headers, ) _debug_log_response(self.logger, resp) return resp except Exception as e: last_error = e for handler in self.retry_handlers: if await handler.can_retry_async( state=retry_state, request=retry_request, response=retry_response, error=e, ): if self.logger.level <= logging.DEBUG: self.logger.info( f"A retry handler found: {type(handler).__name__} " f"for {http_verb} {url} - {e}" ) await handler.prepare_for_next_attempt_async( state=retry_state, request=retry_request, response=retry_response, error=e, ) break if retry_state.next_attempt_requested is False: raise last_error if resp is not None: return resp raise last_error finally: if not use_running_session: await session.close() return resp
Class variables
var BASE_URL
var auth : Optional[aiohttp.helpers.BasicAuth]
var base_url : str
var default_headers : Dict[str, str]
var logger : logging.Logger
var proxy : Optional[str]
var retry_handlers : List[slack_sdk.http_retry.async_handler.AsyncRetryHandler]
var session : Optional[aiohttp.client.ClientSession]
var ssl : Optional[ssl.SSLContext]
var timeout : int
var token : str
var trust_env_in_session : bool
Methods
async def api_call(self, *, http_verb: str, path: str, query_params: Optional[Dict[str, Any]] = None, body_params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None) ‑> SCIMResponse
-
Expand source code
async def api_call( self, *, http_verb: str, path: str, query_params: Optional[Dict[str, Any]] = None, body_params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> SCIMResponse: url = f"{self.base_url}{path}" query = _build_query(query_params) if len(query) > 0: url += f"?{query}" return await self._perform_http_request( http_verb=http_verb, url=url, body_params=body_params, headers=_build_request_headers( token=self.token, default_headers=self.default_headers, additional_headers=headers, ), )
async def create_group(self, group: Union[Dict[str, Any], Group]) ‑> GroupCreateResponse
-
Expand source code
async def create_group(self, group: Union[Dict[str, Any], Group]) -> GroupCreateResponse: return GroupCreateResponse( await self.api_call( http_verb="POST", path="Groups", body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group), ) )
async def create_user(self, user: Union[Dict[str, Any], User]) ‑> UserCreateResponse
-
Expand source code
async def create_user(self, user: Union[Dict[str, Any], User]) -> UserCreateResponse: return UserCreateResponse( await self.api_call( http_verb="POST", path="Users", body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user), ) )
async def delete_group(self, id: str) ‑> GroupDeleteResponse
-
Expand source code
async def delete_group(self, id: str) -> GroupDeleteResponse: return GroupDeleteResponse( await self.api_call( http_verb="DELETE", path=f"Groups/{quote(id)}", ) )
async def delete_user(self, id: str) ‑> UserDeleteResponse
-
Expand source code
async def delete_user(self, id: str) -> UserDeleteResponse: return UserDeleteResponse( await self.api_call( http_verb="DELETE", path=f"Users/{quote(id)}", ) )
async def patch_group(self, id: str, partial_group: Union[Dict[str, Any], Group]) ‑> GroupPatchResponse
-
Expand source code
async def patch_group(self, id: str, partial_group: Union[Dict[str, Any], Group]) -> GroupPatchResponse: return GroupPatchResponse( await self.api_call( http_verb="PATCH", path=f"Groups/{quote(id)}", body_params=partial_group.to_dict() if isinstance(partial_group, Group) else _to_dict_without_not_given(partial_group), ) )
async def patch_user(self, id: str, partial_user: Union[Dict[str, Any], User]) ‑> UserPatchResponse
-
Expand source code
async def patch_user(self, id: str, partial_user: Union[Dict[str, Any], User]) -> UserPatchResponse: return UserPatchResponse( await self.api_call( http_verb="PATCH", path=f"Users/{quote(id)}", body_params=partial_user.to_dict() if isinstance(partial_user, User) else _to_dict_without_not_given(partial_user), ) )
async def read_group(self, id: str) ‑> ReadGroupResponse
-
Expand source code
async def read_group(self, id: str) -> ReadGroupResponse: return ReadGroupResponse(await self.api_call(http_verb="GET", path=f"Groups/{quote(id)}"))
async def read_user(self, id: str) ‑> ReadUserResponse
-
Expand source code
async def read_user(self, id: str) -> ReadUserResponse: return ReadUserResponse(await self.api_call(http_verb="GET", path=f"Users/{quote(id)}"))
async def search_groups(self, *, count: int, start_index: int, filter: Optional[str] = None) ‑> SearchGroupsResponse
-
Expand source code
async def search_groups( self, *, # Pagination required as of August 30, 2019. count: int, start_index: int, filter: Optional[str] = None, ) -> SearchGroupsResponse: return SearchGroupsResponse( await self.api_call( http_verb="GET", path="Groups", query_params={ "filter": filter, "count": count, "startIndex": start_index, }, ) )
async def search_users(self, *, count: int, start_index: int, filter: Optional[str] = None) ‑> SearchUsersResponse
-
Expand source code
async def search_users( self, *, # Pagination required as of August 30, 2019. count: int, start_index: int, filter: Optional[str] = None, ) -> SearchUsersResponse: return SearchUsersResponse( await self.api_call( http_verb="GET", path="Users", query_params={ "filter": filter, "count": count, "startIndex": start_index, }, ) )
async def update_group(self, group: Union[Dict[str, Any], Group]) ‑> GroupUpdateResponse
-
Expand source code
async def update_group(self, group: Union[Dict[str, Any], Group]) -> GroupUpdateResponse: group_id = group.id if isinstance(group, Group) else group["id"] return GroupUpdateResponse( await self.api_call( http_verb="PUT", path=f"Groups/{quote(group_id)}", body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group), ) )
async def update_user(self, user: Union[Dict[str, Any], User]) ‑> UserUpdateResponse
-
Expand source code
async def update_user(self, user: Union[Dict[str, Any], User]) -> UserUpdateResponse: user_id = user.id if isinstance(user, User) else user["id"] return UserUpdateResponse( await self.api_call( http_verb="PUT", path=f"Users/{quote(user_id)}", body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user), ) )