Module slack_bolt.listener.async_listener_error_handler
Expand source code
from abc import ABCMeta, abstractmethod
from logging import Logger
from typing import Callable, Dict, Any, Awaitable, Optional
from slack_bolt.kwargs_injection.async_utils import build_async_required_kwargs
from slack_bolt.request.async_request import AsyncBoltRequest
from slack_bolt.response import BoltResponse
from slack_bolt.util.utils import get_arg_names_of_callable
class AsyncListenerErrorHandler(metaclass=ABCMeta):
@abstractmethod
async def handle(
self,
error: Exception,
request: AsyncBoltRequest,
response: Optional[BoltResponse],
) -> None:
"""Handles an unhandled exception.
Args:
error: The raised exception.
request: The request.
response: The response.
"""
raise NotImplementedError()
class AsyncCustomListenerErrorHandler(AsyncListenerErrorHandler):
def __init__(self, logger: Logger, func: Callable[..., Awaitable[Optional[BoltResponse]]]):
self.func = func
self.logger = logger
self.arg_names = get_arg_names_of_callable(func)
async def handle(
self,
error: Exception,
request: AsyncBoltRequest,
response: Optional[BoltResponse],
) -> None:
kwargs: Dict[str, Any] = build_async_required_kwargs(
required_arg_names=self.arg_names,
logger=self.logger,
error=error,
request=request,
response=response,
next_keys_required=False,
)
returned_response = await self.func(**kwargs)
if returned_response is not None and isinstance(returned_response, BoltResponse):
response.status = returned_response.status
response.headers = returned_response.headers
response.body = returned_response.body
class AsyncDefaultListenerErrorHandler(AsyncListenerErrorHandler):
def __init__(self, logger: Logger):
self.logger = logger
async def handle(
self,
error: Exception,
request: AsyncBoltRequest,
response: Optional[BoltResponse],
):
message = f"Failed to run listener function (error: {error})"
self.logger.exception(message)
Classes
class AsyncCustomListenerErrorHandler (logger: logging.Logger, func: Callable[..., Awaitable[Optional[BoltResponse]]])
-
Expand source code
class AsyncCustomListenerErrorHandler(AsyncListenerErrorHandler): def __init__(self, logger: Logger, func: Callable[..., Awaitable[Optional[BoltResponse]]]): self.func = func self.logger = logger self.arg_names = get_arg_names_of_callable(func) async def handle( self, error: Exception, request: AsyncBoltRequest, response: Optional[BoltResponse], ) -> None: kwargs: Dict[str, Any] = build_async_required_kwargs( required_arg_names=self.arg_names, logger=self.logger, error=error, request=request, response=response, next_keys_required=False, ) returned_response = await self.func(**kwargs) if returned_response is not None and isinstance(returned_response, BoltResponse): response.status = returned_response.status response.headers = returned_response.headers response.body = returned_response.body
Ancestors
Inherited members
class AsyncDefaultListenerErrorHandler (logger: logging.Logger)
-
Expand source code
class AsyncDefaultListenerErrorHandler(AsyncListenerErrorHandler): def __init__(self, logger: Logger): self.logger = logger async def handle( self, error: Exception, request: AsyncBoltRequest, response: Optional[BoltResponse], ): message = f"Failed to run listener function (error: {error})" self.logger.exception(message)
Ancestors
Inherited members
class AsyncListenerErrorHandler
-
Expand source code
class AsyncListenerErrorHandler(metaclass=ABCMeta): @abstractmethod async def handle( self, error: Exception, request: AsyncBoltRequest, response: Optional[BoltResponse], ) -> None: """Handles an unhandled exception. Args: error: The raised exception. request: The request. response: The response. """ raise NotImplementedError()
Subclasses
Methods
async def handle(self, error: Exception, request: AsyncBoltRequest, response: Optional[BoltResponse]) ‑> None
-
Handles an unhandled exception.
Args
error
- The raised exception.
request
- The request.
response
- The response.
Expand source code
@abstractmethod async def handle( self, error: Exception, request: AsyncBoltRequest, response: Optional[BoltResponse], ) -> None: """Handles an unhandled exception. Args: error: The raised exception. request: The request. response: The response. """ raise NotImplementedError()