Module slack_sdk.webhook.async_client

Expand source code
import json
import logging
from ssl import SSLContext
from typing import Dict, Union, Optional, Any, Sequence, List

import aiohttp
from aiohttp import BasicAuth, ClientSession

from slack_sdk.models.attachments import Attachment
from slack_sdk.models.blocks import Block
from .internal_utils import (
    _debug_log_response,
    _build_request_headers,
    _build_body,
    get_user_agent,
)
from .webhook_response import WebhookResponse
from ..proxy_env_variable_loader import load_http_proxy_from_env

from slack_sdk.http_retry.async_handler import AsyncRetryHandler
from slack_sdk.http_retry.builtin_async_handlers import async_default_handlers
from slack_sdk.http_retry.request import HttpRequest as RetryHttpRequest
from slack_sdk.http_retry.response import HttpResponse as RetryHttpResponse
from slack_sdk.http_retry.state import RetryState


class AsyncWebhookClient:
    """Asynchronous (aiohttp-based) client that POSTs JSON payloads to a webhook URL."""

    url: str
    timeout: int
    ssl: Optional[SSLContext]
    proxy: Optional[str]
    session: Optional[ClientSession]
    trust_env_in_session: bool
    auth: Optional[BasicAuth]
    default_headers: Dict[str, str]
    logger: logging.Logger
    retry_handlers: List[AsyncRetryHandler]

    def __init__(
        self,
        url: str,
        timeout: int = 30,
        ssl: Optional[SSLContext] = None,
        proxy: Optional[str] = None,
        session: Optional[ClientSession] = None,
        trust_env_in_session: bool = False,
        auth: Optional[BasicAuth] = None,
        default_headers: Optional[Dict[str, str]] = None,
        user_agent_prefix: Optional[str] = None,
        user_agent_suffix: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
        retry_handlers: Optional[List[AsyncRetryHandler]] = None,
    ):
        """API client for Incoming Webhooks and `response_url`

        https://api.slack.com/messaging/webhooks

        Args:
            url: Complete URL to send data (e.g., `https://hooks.slack.com/XXX`)
            timeout: Request timeout (in seconds)
            ssl: `ssl.SSLContext` to use for requests
            proxy: Proxy URL (e.g., `localhost:9000`, `http://localhost:9000`)
            session: `aiohttp.ClientSession` instance
            trust_env_in_session: Value passed as `trust_env` when this client
                creates its own `aiohttp.ClientSession`
            auth: Basic auth info for `aiohttp.ClientSession`
            default_headers: Request headers to add to all requests
            user_agent_prefix: Prefix for User-Agent header value
            user_agent_suffix: Suffix for User-Agent header value
            logger: Custom logger
            retry_handlers: Retry handlers consulted on HTTP 429 responses and on
                request errors (defaults to `async_default_handlers()`)
        """
        self.url = url
        self.timeout = timeout
        self.ssl = ssl
        self.proxy = proxy
        self.trust_env_in_session = trust_env_in_session
        self.session = session
        self.auth = auth
        # Copy the given headers so that adding User-Agent below never mutates the caller's dict.
        self.default_headers = dict(default_headers) if default_headers else {}
        self.default_headers["User-Agent"] = get_user_agent(user_agent_prefix, user_agent_suffix)
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.retry_handlers = retry_handlers if retry_handlers is not None else async_default_handlers()

        # Fall back to the proxy from environment variables only when none (or a blank one) was given.
        if self.proxy is None or len(self.proxy.strip()) == 0:
            env_variable = load_http_proxy_from_env(self.logger)
            if env_variable is not None:
                self.proxy = env_variable

    async def send(
        self,
        *,
        text: Optional[str] = None,
        attachments: Optional[Sequence[Union[Dict[str, Any], Attachment]]] = None,
        blocks: Optional[Sequence[Union[Dict[str, Any], Block]]] = None,
        response_type: Optional[str] = None,
        replace_original: Optional[bool] = None,
        delete_original: Optional[bool] = None,
        unfurl_links: Optional[bool] = None,
        unfurl_media: Optional[bool] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> WebhookResponse:
        """Performs a Slack API request and returns the result.

        Args:
            text: The text message (even when having blocks, setting this as well is recommended as it works as fallback)
            attachments: A collection of attachments
            blocks: A collection of Block Kit UI components
            response_type: The type of message (either 'in_channel' or 'ephemeral')
            replace_original: True if you use this option for response_url requests
            delete_original: True if you use this option for response_url requests
            unfurl_links: Option to indicate whether text url should unfurl
            unfurl_media: Option to indicate whether media url should unfurl
            metadata: Metadata attached to the message
            headers: Request headers to append only for this request

        Returns:
            Webhook response
        """
        return await self.send_dict(
            # It's fine to have None value elements here
            # because _build_body() filters them out when constructing the actual body data
            body={
                "text": text,
                "attachments": attachments,
                "blocks": blocks,
                "response_type": response_type,
                "replace_original": replace_original,
                "delete_original": delete_original,
                "unfurl_links": unfurl_links,
                "unfurl_media": unfurl_media,
                "metadata": metadata,
            },
            headers=headers,
        )

    async def send_dict(self, body: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> WebhookResponse:
        """Performs a Slack API request and returns the result.

        Args:
            body: JSON data structure (it's still a dict at this point);
                it is passed through `_build_body()` before being sent
            headers: Request headers to append only for this request
        Returns:
            Webhook response
        """
        return await self._perform_http_request(
            body=_build_body(body),
            headers=_build_request_headers(self.default_headers, headers),
        )

    async def _perform_http_request(self, *, body: Dict[str, Any], headers: Dict[str, str]) -> WebhookResponse:
        """POSTs `body` as JSON to `self.url`, consulting `self.retry_handlers` on 429s and errors.

        Args:
            body: Payload to serialize with `json.dumps`
            headers: Request headers; `Content-Type` is set on this dict in place

        Returns:
            Webhook response for the first attempt that no retry handler asked to repeat,
            or the most recent response if the retry safety limit is reached

        Raises:
            Exception: The error raised by the request when no retry handler requests another attempt
        """
        str_body: str = json.dumps(body)
        headers["Content-Type"] = "application/json;charset=utf-8"

        session: Optional[ClientSession] = None
        # Reuse the session given to the constructor only while it is still open;
        # otherwise use a temporary session that is closed in the `finally` below.
        use_running_session = self.session is not None and not self.session.closed
        if use_running_session:
            session = self.session
        else:
            session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                auth=self.auth,
                trust_env=self.trust_env_in_session,
            )

        last_error: Optional[Exception] = None
        resp: Optional[WebhookResponse] = None
        try:
            request_kwargs = {
                "headers": headers,
                "data": str_body,
                "ssl": self.ssl,
                "proxy": self.proxy,
            }
            retry_request = RetryHttpRequest(
                method="POST",
                url=self.url,
                headers=headers,
                body_params=body,
            )

            retry_state = RetryState()
            counter_for_safety = 0
            # Hard upper bound on attempts, regardless of what the retry handlers decide.
            while counter_for_safety < 100:
                counter_for_safety += 1
                # If this is a retry, the next try started here. We can reset the flag.
                retry_state.next_attempt_requested = False
                retry_response: Optional[RetryHttpResponse] = None
                response_body = ""

                # isEnabledFor() respects the effective level, so the message is only
                # formatted when DEBUG output would actually be emitted.
                if self.logger.isEnabledFor(logging.DEBUG):
                    self.logger.debug(f"Sending a request - url: {self.url}, body: {str_body}, headers: {headers}")

                try:
                    async with session.request("POST", self.url, **request_kwargs) as res:
                        try:
                            response_body = await res.text()
                            retry_response = RetryHttpResponse(
                                status_code=res.status,
                                headers=res.headers,
                                data=response_body.encode("utf-8") if response_body is not None else None,
                            )
                        except aiohttp.ContentTypeError:
                            self.logger.debug(f"No response data returned from the following API call: {self.url}")
                            retry_response = RetryHttpResponse(
                                status_code=res.status,
                                headers=res.headers,
                            )

                        # Remember the latest response so that it can still be returned
                        # if the safety limit is hit while handlers keep requesting retries.
                        resp = WebhookResponse(
                            url=self.url,
                            status_code=res.status,
                            body=response_body,
                            headers=res.headers,
                        )

                        if res.status == 429:
                            # Only the first handler that accepts the retry prepares the next attempt.
                            for handler in self.retry_handlers:
                                if await handler.can_retry_async(
                                    state=retry_state,
                                    request=retry_request,
                                    response=retry_response,
                                ):
                                    # NOTE(review): an INFO message behind a DEBUG-level guard on
                                    # `logger.level`; kept as is to preserve existing log output.
                                    if self.logger.level <= logging.DEBUG:
                                        self.logger.info(
                                            f"A retry handler found: {type(handler).__name__} "
                                            f"for POST {self.url} - rate_limited"
                                        )
                                    await handler.prepare_for_next_attempt_async(
                                        state=retry_state,
                                        request=retry_request,
                                        response=retry_response,
                                    )
                                    break

                        if retry_state.next_attempt_requested is False:
                            _debug_log_response(self.logger, resp)
                            return resp

                except Exception as e:
                    last_error = e
                    for handler in self.retry_handlers:
                        if await handler.can_retry_async(
                            state=retry_state,
                            request=retry_request,
                            response=retry_response,
                            error=e,
                        ):
                            # NOTE(review): same INFO-behind-DEBUG guard as above; kept as is.
                            if self.logger.level <= logging.DEBUG:
                                self.logger.info(
                                    f"A retry handler found: {type(handler).__name__} " f"for POST {self.url} - {e}"
                                )
                            await handler.prepare_for_next_attempt_async(
                                state=retry_state,
                                request=retry_request,
                                response=retry_response,
                                error=e,
                            )
                            break

                    if retry_state.next_attempt_requested is False:
                        raise last_error

            # The safety limit was reached: prefer the most recent response; otherwise
            # every attempt failed with an error, so re-raise the last one.
            if resp is not None:
                return resp
            raise last_error

        finally:
            if not use_running_session:
                await session.close()

Classes

class AsyncWebhookClient (url: str, timeout: int = 30, ssl: Optional[ssl.SSLContext] = None, proxy: Optional[str] = None, session: Optional[aiohttp.client.ClientSession] = None, trust_env_in_session: bool = False, auth: Optional[aiohttp.helpers.BasicAuth] = None, default_headers: Optional[Dict[str, str]] = None, user_agent_prefix: Optional[str] = None, user_agent_suffix: Optional[str] = None, logger: Optional[logging.Logger] = None, retry_handlers: Optional[List[slack_sdk.http_retry.async_handler.AsyncRetryHandler]] = None)

API client for Incoming Webhooks and response_url

https://api.slack.com/messaging/webhooks

Args

url
Complete URL to send data (e.g., https://hooks.slack.com/XXX)
timeout
Request timeout (in seconds)
ssl
ssl.SSLContext to use for requests
proxy
Proxy URL (e.g., localhost:9000, http://localhost:9000)
session
aiohttp.ClientSession instance
trust_env_in_session
Value passed as trust_env when the client creates its own aiohttp.ClientSession
auth
Basic auth info for aiohttp.ClientSession
default_headers
Request headers to add to all requests
user_agent_prefix
Prefix for User-Agent header value
user_agent_suffix
Suffix for User-Agent header value
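logger
Custom logger
retry_handlers
Retry handlers consulted on HTTP 429 responses and on request errors (defaults to async_default_handlers())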
Expand source code
class AsyncWebhookClient:
    """Asynchronous (aiohttp-based) client that POSTs JSON payloads to a webhook URL."""

    url: str
    timeout: int
    ssl: Optional[SSLContext]
    proxy: Optional[str]
    session: Optional[ClientSession]
    trust_env_in_session: bool
    auth: Optional[BasicAuth]
    default_headers: Dict[str, str]
    logger: logging.Logger
    retry_handlers: List[AsyncRetryHandler]

    def __init__(
        self,
        url: str,
        timeout: int = 30,
        ssl: Optional[SSLContext] = None,
        proxy: Optional[str] = None,
        session: Optional[ClientSession] = None,
        trust_env_in_session: bool = False,
        auth: Optional[BasicAuth] = None,
        default_headers: Optional[Dict[str, str]] = None,
        user_agent_prefix: Optional[str] = None,
        user_agent_suffix: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
        retry_handlers: Optional[List[AsyncRetryHandler]] = None,
    ):
        """API client for Incoming Webhooks and `response_url`

        https://api.slack.com/messaging/webhooks

        Args:
            url: Complete URL to send data (e.g., `https://hooks.slack.com/XXX`)
            timeout: Request timeout (in seconds)
            ssl: `ssl.SSLContext` to use for requests
            proxy: Proxy URL (e.g., `localhost:9000`, `http://localhost:9000`)
            session: `aiohttp.ClientSession` instance
            trust_env_in_session: Value passed as `trust_env` when this client
                creates its own `aiohttp.ClientSession`
            auth: Basic auth info for `aiohttp.ClientSession`
            default_headers: Request headers to add to all requests
            user_agent_prefix: Prefix for User-Agent header value
            user_agent_suffix: Suffix for User-Agent header value
            logger: Custom logger
            retry_handlers: Retry handlers consulted on HTTP 429 responses and on
                request errors (defaults to `async_default_handlers()`)
        """
        self.url = url
        self.timeout = timeout
        self.ssl = ssl
        self.proxy = proxy
        self.trust_env_in_session = trust_env_in_session
        self.session = session
        self.auth = auth
        # Copy the given headers so that adding User-Agent below never mutates the caller's dict.
        self.default_headers = dict(default_headers) if default_headers else {}
        self.default_headers["User-Agent"] = get_user_agent(user_agent_prefix, user_agent_suffix)
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.retry_handlers = retry_handlers if retry_handlers is not None else async_default_handlers()

        # Fall back to the proxy from environment variables only when none (or a blank one) was given.
        if self.proxy is None or len(self.proxy.strip()) == 0:
            env_variable = load_http_proxy_from_env(self.logger)
            if env_variable is not None:
                self.proxy = env_variable

    async def send(
        self,
        *,
        text: Optional[str] = None,
        attachments: Optional[Sequence[Union[Dict[str, Any], Attachment]]] = None,
        blocks: Optional[Sequence[Union[Dict[str, Any], Block]]] = None,
        response_type: Optional[str] = None,
        replace_original: Optional[bool] = None,
        delete_original: Optional[bool] = None,
        unfurl_links: Optional[bool] = None,
        unfurl_media: Optional[bool] = None,
        metadata: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> WebhookResponse:
        """Performs a Slack API request and returns the result.

        Args:
            text: The text message (even when having blocks, setting this as well is recommended as it works as fallback)
            attachments: A collection of attachments
            blocks: A collection of Block Kit UI components
            response_type: The type of message (either 'in_channel' or 'ephemeral')
            replace_original: True if you use this option for response_url requests
            delete_original: True if you use this option for response_url requests
            unfurl_links: Option to indicate whether text url should unfurl
            unfurl_media: Option to indicate whether media url should unfurl
            metadata: Metadata attached to the message
            headers: Request headers to append only for this request

        Returns:
            Webhook response
        """
        return await self.send_dict(
            # It's fine to have None value elements here
            # because _build_body() filters them out when constructing the actual body data
            body={
                "text": text,
                "attachments": attachments,
                "blocks": blocks,
                "response_type": response_type,
                "replace_original": replace_original,
                "delete_original": delete_original,
                "unfurl_links": unfurl_links,
                "unfurl_media": unfurl_media,
                "metadata": metadata,
            },
            headers=headers,
        )

    async def send_dict(self, body: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> WebhookResponse:
        """Performs a Slack API request and returns the result.

        Args:
            body: JSON data structure (it's still a dict at this point);
                it is passed through `_build_body()` before being sent
            headers: Request headers to append only for this request
        Returns:
            Webhook response
        """
        return await self._perform_http_request(
            body=_build_body(body),
            headers=_build_request_headers(self.default_headers, headers),
        )

    async def _perform_http_request(self, *, body: Dict[str, Any], headers: Dict[str, str]) -> WebhookResponse:
        """POSTs `body` as JSON to `self.url`, consulting `self.retry_handlers` on 429s and errors.

        Args:
            body: Payload to serialize with `json.dumps`
            headers: Request headers; `Content-Type` is set on this dict in place

        Returns:
            Webhook response for the first attempt that no retry handler asked to repeat,
            or the most recent response if the retry safety limit is reached

        Raises:
            Exception: The error raised by the request when no retry handler requests another attempt
        """
        str_body: str = json.dumps(body)
        headers["Content-Type"] = "application/json;charset=utf-8"

        session: Optional[ClientSession] = None
        # Reuse the session given to the constructor only while it is still open;
        # otherwise use a temporary session that is closed in the `finally` below.
        use_running_session = self.session is not None and not self.session.closed
        if use_running_session:
            session = self.session
        else:
            session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                auth=self.auth,
                trust_env=self.trust_env_in_session,
            )

        last_error: Optional[Exception] = None
        resp: Optional[WebhookResponse] = None
        try:
            request_kwargs = {
                "headers": headers,
                "data": str_body,
                "ssl": self.ssl,
                "proxy": self.proxy,
            }
            retry_request = RetryHttpRequest(
                method="POST",
                url=self.url,
                headers=headers,
                body_params=body,
            )

            retry_state = RetryState()
            counter_for_safety = 0
            # Hard upper bound on attempts, regardless of what the retry handlers decide.
            while counter_for_safety < 100:
                counter_for_safety += 1
                # If this is a retry, the next try started here. We can reset the flag.
                retry_state.next_attempt_requested = False
                retry_response: Optional[RetryHttpResponse] = None
                response_body = ""

                # isEnabledFor() respects the effective level, so the message is only
                # formatted when DEBUG output would actually be emitted.
                if self.logger.isEnabledFor(logging.DEBUG):
                    self.logger.debug(f"Sending a request - url: {self.url}, body: {str_body}, headers: {headers}")

                try:
                    async with session.request("POST", self.url, **request_kwargs) as res:
                        try:
                            response_body = await res.text()
                            retry_response = RetryHttpResponse(
                                status_code=res.status,
                                headers=res.headers,
                                data=response_body.encode("utf-8") if response_body is not None else None,
                            )
                        except aiohttp.ContentTypeError:
                            self.logger.debug(f"No response data returned from the following API call: {self.url}")
                            retry_response = RetryHttpResponse(
                                status_code=res.status,
                                headers=res.headers,
                            )

                        # Remember the latest response so that it can still be returned
                        # if the safety limit is hit while handlers keep requesting retries.
                        resp = WebhookResponse(
                            url=self.url,
                            status_code=res.status,
                            body=response_body,
                            headers=res.headers,
                        )

                        if res.status == 429:
                            # Only the first handler that accepts the retry prepares the next attempt.
                            for handler in self.retry_handlers:
                                if await handler.can_retry_async(
                                    state=retry_state,
                                    request=retry_request,
                                    response=retry_response,
                                ):
                                    # NOTE(review): an INFO message behind a DEBUG-level guard on
                                    # `logger.level`; kept as is to preserve existing log output.
                                    if self.logger.level <= logging.DEBUG:
                                        self.logger.info(
                                            f"A retry handler found: {type(handler).__name__} "
                                            f"for POST {self.url} - rate_limited"
                                        )
                                    await handler.prepare_for_next_attempt_async(
                                        state=retry_state,
                                        request=retry_request,
                                        response=retry_response,
                                    )
                                    break

                        if retry_state.next_attempt_requested is False:
                            _debug_log_response(self.logger, resp)
                            return resp

                except Exception as e:
                    last_error = e
                    for handler in self.retry_handlers:
                        if await handler.can_retry_async(
                            state=retry_state,
                            request=retry_request,
                            response=retry_response,
                            error=e,
                        ):
                            # NOTE(review): same INFO-behind-DEBUG guard as above; kept as is.
                            if self.logger.level <= logging.DEBUG:
                                self.logger.info(
                                    f"A retry handler found: {type(handler).__name__} " f"for POST {self.url} - {e}"
                                )
                            await handler.prepare_for_next_attempt_async(
                                state=retry_state,
                                request=retry_request,
                                response=retry_response,
                                error=e,
                            )
                            break

                    if retry_state.next_attempt_requested is False:
                        raise last_error

            # The safety limit was reached: prefer the most recent response; otherwise
            # every attempt failed with an error, so re-raise the last one.
            if resp is not None:
                return resp
            raise last_error

        finally:
            if not use_running_session:
                await session.close()

Class variables

var auth : Optional[aiohttp.helpers.BasicAuth]
var default_headers : Dict[str, str]
var logger : logging.Logger
var proxy : Optional[str]
var retry_handlers : List[slack_sdk.http_retry.async_handler.AsyncRetryHandler]
var session : Optional[aiohttp.client.ClientSession]
var ssl : Optional[ssl.SSLContext]
var timeout : int
var trust_env_in_session : bool
var url : str

Methods

async def send(self, *, text: Optional[str] = None, attachments: Optional[Sequence[Union[Dict[str, Any], Attachment]]] = None, blocks: Optional[Sequence[Union[Dict[str, Any], Block]]] = None, response_type: Optional[str] = None, replace_original: Optional[bool] = None, delete_original: Optional[bool] = None, unfurl_links: Optional[bool] = None, unfurl_media: Optional[bool] = None, metadata: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None) ‑> WebhookResponse

Performs a Slack API request and returns the result.

Args

text
The text message (even when having blocks, setting this as well is recommended as it works as fallback)
attachments
A collection of attachments
blocks
A collection of Block Kit UI components
response_type
The type of message (either 'in_channel' or 'ephemeral')
replace_original
True if you use this option for response_url requests
delete_original
True if you use this option for response_url requests
unfurl_links
Option to indicate whether text url should unfurl
unfurl_media
Option to indicate whether media url should unfurl
metadata
Metadata attached to the message
headers
Request headers to append only for this request

Returns

Webhook response

Expand source code
async def send(
    self,
    *,
    text: Optional[str] = None,
    attachments: Optional[Sequence[Union[Dict[str, Any], Attachment]]] = None,
    blocks: Optional[Sequence[Union[Dict[str, Any], Block]]] = None,
    response_type: Optional[str] = None,
    replace_original: Optional[bool] = None,
    delete_original: Optional[bool] = None,
    unfurl_links: Optional[bool] = None,
    unfurl_media: Optional[bool] = None,
    metadata: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> WebhookResponse:
    """Collects the keyword arguments into a message payload and sends it via `send_dict()`.

    Args:
        text: The text message (recommended even with blocks, as it works as a fallback)
        attachments: A collection of attachments
        blocks: A collection of Block Kit UI components
        response_type: The type of message (either 'in_channel' or 'ephemeral')
        replace_original: True if you use this option for response_url requests
        delete_original: True if you use this option for response_url requests
        unfurl_links: Option to indicate whether text url should unfurl
        unfurl_media: Option to indicate whether media url should unfurl
        metadata: Metadata attached to the message
        headers: Request headers to append only for this request

    Returns:
        Webhook response
    """
    # Entries left as None are acceptable here:
    # _build_body() filters them out when constructing the actual body data.
    payload = {
        "text": text,
        "attachments": attachments,
        "blocks": blocks,
        "response_type": response_type,
        "replace_original": replace_original,
        "delete_original": delete_original,
        "unfurl_links": unfurl_links,
        "unfurl_media": unfurl_media,
        "metadata": metadata,
    }
    return await self.send_dict(body=payload, headers=headers)
async def send_dict(self, body: Dict[str, Any], headers: Optional[Dict[str, str]] = None) ‑> WebhookResponse

Performs a Slack API request and returns the result.

Args

body
JSON data structure (it's still a dict at this point); it is passed through _build_body() before being sent
headers
Request headers to append only for this request

Returns

Webhook response

Expand source code
async def send_dict(self, body: Dict[str, Any], headers: Optional[Dict[str, str]] = None) -> WebhookResponse:
    """Sends an already-assembled payload dict to the webhook URL.

    Args:
        body: JSON data structure (still a dict at this point); it is passed
            through `_build_body()` before the request is performed
        headers: Request headers to append only for this request
    Returns:
        Webhook response
    """
    request_body = _build_body(body)
    # Per-request headers are combined with the client's default headers.
    request_headers = _build_request_headers(self.default_headers, headers)
    return await self._perform_http_request(body=request_body, headers=request_headers)