Module slack_sdk.socket_mode.aiohttp
aiohttp based Socket Mode client
Classes
class SocketModeClient (app_token: str, logger: Optional[logging.Logger] = None, web_client: Optional[AsyncWebClient] = None, proxy: Optional[str] = None, auto_reconnect_enabled: bool = True, ping_interval: float = 5, trace_enabled: bool = False, on_message_listeners: Optional[List[Callable[[aiohttp.http_websocket.WSMessage], Awaitable[None]]]] = None, on_error_listeners: Optional[List[Callable[[aiohttp.http_websocket.WSMessage], Awaitable[None]]]] = None, on_close_listeners: Optional[List[Callable[[aiohttp.http_websocket.WSMessage], Awaitable[None]]]] = None)
-
Socket Mode client
Args
app_token
- App-level token
logger
- Custom logger
web_client
- Web API client
auto_reconnect_enabled
- True if automatic reconnection is enabled (default: True)
ping_interval
- interval for ping-pong with Slack servers (seconds)
trace_enabled
- True if more verbose logs to see what's happening under the hood
proxy
- the HTTP proxy URL
on_message_listeners
- listener functions for on_message
on_error_listeners
- listener functions for on_error
on_close_listeners
- listener functions for on_close
Expand source code
class SocketModeClient(AsyncBaseSocketModeClient): logger: Logger web_client: AsyncWebClient app_token: str wss_uri: Optional[str] auto_reconnect_enabled: bool message_queue: Queue message_listeners: List[ Union[ AsyncWebSocketMessageListener, Callable[["AsyncBaseSocketModeClient", dict, Optional[str]], Awaitable[None]], ] ] socket_mode_request_listeners: List[ Union[ AsyncSocketModeRequestListener, Callable[["AsyncBaseSocketModeClient", SocketModeRequest], Awaitable[None]], ] ] message_receiver: Optional[Future] message_processor: Future proxy: Optional[str] ping_interval: float trace_enabled: bool last_ping_pong_time: Optional[float] current_session: Optional[ClientWebSocketResponse] current_session_monitor: Optional[Future] auto_reconnect_enabled: bool default_auto_reconnect_enabled: bool closed: bool stale: bool connect_operation_lock: Lock on_message_listeners: List[Callable[[WSMessage], Awaitable[None]]] on_error_listeners: List[Callable[[WSMessage], Awaitable[None]]] on_close_listeners: List[Callable[[WSMessage], Awaitable[None]]] def __init__( self, app_token: str, logger: Optional[Logger] = None, web_client: Optional[AsyncWebClient] = None, proxy: Optional[str] = None, auto_reconnect_enabled: bool = True, ping_interval: float = 5, trace_enabled: bool = False, on_message_listeners: Optional[List[Callable[[WSMessage], Awaitable[None]]]] = None, on_error_listeners: Optional[List[Callable[[WSMessage], Awaitable[None]]]] = None, on_close_listeners: Optional[List[Callable[[WSMessage], Awaitable[None]]]] = None, ): """Socket Mode client Args: app_token: App-level token logger: Custom logger web_client: Web API client auto_reconnect_enabled: True if automatic reconnection is enabled (default: True) ping_interval: interval for ping-pong with Slack servers (seconds) trace_enabled: True if more verbose logs to see what's happening under the hood proxy: the HTTP proxy URL on_message_listeners: listener functions for on_message on_error_listeners: listener functions for on_error on_close_listeners: listener functions for on_close """ self.app_token = app_token self.logger = logger or logging.getLogger(__name__) self.web_client = web_client or AsyncWebClient() self.closed = False self.stale = False self.connect_operation_lock = Lock() self.proxy = proxy if self.proxy is None or len(self.proxy.strip()) == 0: env_variable = load_http_proxy_from_env(self.logger) if env_variable is not None: self.proxy = env_variable self.default_auto_reconnect_enabled = auto_reconnect_enabled self.auto_reconnect_enabled = self.default_auto_reconnect_enabled self.ping_interval = ping_interval self.trace_enabled = trace_enabled self.last_ping_pong_time = None self.wss_uri = None self.message_queue = Queue() self.message_listeners = [] self.socket_mode_request_listeners = [] self.current_session = None self.current_session_monitor = None # https://docs.aiohttp.org/en/stable/client_reference.html # Unless you are connecting to a large, unknown number of different servers # over the lifetime of your application, # it is suggested you use a single session for the lifetime of your application # to benefit from connection pooling. self.aiohttp_client_session = aiohttp.ClientSession() self.on_message_listeners = on_message_listeners or [] self.on_error_listeners = on_error_listeners or [] self.on_close_listeners = on_close_listeners or [] self.message_receiver = None self.message_processor = asyncio.ensure_future(self.process_messages()) async def monitor_current_session(self) -> None: # In the asyncio runtime, accessing a shared object (self.current_session here) from # multiple tasks can cause race conditions and errors. # To avoid such, we access only the session that is active when this loop starts. session: ClientWebSocketResponse = self.current_session session_id: str = self.build_session_id(session) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new monitor_current_session() execution loop for {session_id} started") try: logging_interval = 100 counter_for_logging = 0 while not self.closed: if session != self.current_session: if self.logger.level <= logging.DEBUG: self.logger.debug(f"The monitor_current_session task for {session_id} is now cancelled") break try: if self.trace_enabled and self.logger.level <= logging.DEBUG: # The logging here is for detailed investigation on potential issues in this client. # If you don't see this log for a while, it means that # this receive_messages execution is no longer working for some reason. counter_for_logging += 1 if counter_for_logging >= logging_interval: counter_for_logging = 0 log_message = ( "#monitor_current_session method has been verifying if this session is active " f"(session: {session_id}, logging interval: {logging_interval})" ) self.logger.debug(log_message) await asyncio.sleep(self.ping_interval) if session is not None and session.closed is False: t = time.time() if self.last_ping_pong_time is None: self.last_ping_pong_time = float(t) try: await session.ping(f"sdk-ping-pong:{t}") except Exception as e: # The ping() method can fail for some reason. # To establish a new connection even in this scenario, # we ignore the exception here. self.logger.warning(f"Failed to send a ping message ({session_id}): {e}") if self.auto_reconnect_enabled: should_reconnect = False if session is None or session.closed: self.logger.info(f"The session ({session_id}) seems to be already closed. Reconnecting...") should_reconnect = True if await self.is_ping_pong_failing(): disconnected_seconds = int(time.time() - self.last_ping_pong_time) self.logger.info( f"The session ({session_id}) seems to be stale. Reconnecting..." f" reason: disconnected for {disconnected_seconds}+ seconds)" ) self.stale = True self.last_ping_pong_time = None should_reconnect = True if should_reconnect is True or not await self.is_connected(): await self.connect_to_new_endpoint() except Exception as e: self.logger.error( f"Failed to check the current session ({session_id}) or reconnect to the server " f"(error: {type(e).__name__}, message: {e})" ) except asyncio.CancelledError: if self.logger.level <= logging.DEBUG: self.logger.debug(f"The monitor_current_session task for {session_id} is now cancelled") raise async def receive_messages(self) -> None: # In the asyncio runtime, accessing a shared object (self.current_session here) from # multiple tasks can cause race conditions and errors. # To avoid such, we access only the session that is active when this loop starts. session = self.current_session session_id = self.build_session_id(session) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new receive_messages() execution loop with {session_id} started") try: consecutive_error_count = 0 logging_interval = 100 counter_for_logging = 0 while not self.closed: if session != self.current_session: if self.logger.level <= logging.DEBUG: self.logger.debug(f"The running receive_messages task for {session_id} is now cancelled") break try: message: WSMessage = await session.receive() # just in case, checking if the value is not None if message is not None: if self.logger.level <= logging.DEBUG: # The following logging prints every single received message # except empty message data ones. m_type = WSMsgType(message.type) message_type = m_type.name if m_type is not None else message.type message_data = message.data if isinstance(message_data, bytes): message_data = message_data.decode("utf-8") if message_data is not None and len(message_data) > 0: # To skip the empty message that Slack server-side often sends self.logger.debug( f"Received message " f"(type: {message_type}, " f"data: {message_data}, " f"extra: {message.extra}, " f"session: {session_id})" ) if self.trace_enabled: # The logging here is for detailed trouble shooting of potential issues in this client. # If you don't see this log for a while, it can mean that # this receive_messages execution is no longer working for some reason. counter_for_logging += 1 if counter_for_logging >= logging_interval: counter_for_logging = 0 log_message = ( "#receive_messages method has been working without any issues " f"(session: {session_id}, logging interval: {logging_interval})" ) self.logger.debug(log_message) if message.type == WSMsgType.TEXT: message_data = message.data await self.enqueue_message(message_data) for listener in self.on_message_listeners: await listener(message) elif message.type == WSMsgType.CLOSE: if self.auto_reconnect_enabled: self.logger.info(f"Received CLOSE event from {session_id}. Reconnecting...") await self.connect_to_new_endpoint() for listener in self.on_close_listeners: await listener(message) elif message.type == WSMsgType.ERROR: for listener in self.on_error_listeners: await listener(message) elif message.type == WSMsgType.CLOSED: await asyncio.sleep(self.ping_interval) continue elif message.type == WSMsgType.PING: await session.pong(message.data) continue elif message.type == WSMsgType.PONG: if message.data is not None: str_message_data = message.data.decode("utf-8") elements = str_message_data.split(":") if len(elements) == 2 and elements[0] == "sdk-ping-pong": try: self.last_ping_pong_time = float(elements[1]) except Exception as e: self.logger.warning( f"Failed to parse the last_ping_pong_time value from {str_message_data}" f" - error : {e}, session: {session_id}" ) continue consecutive_error_count = 0 except Exception as e: consecutive_error_count += 1 self.logger.error(f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})") if isinstance(e, ClientConnectionError): await asyncio.sleep(self.ping_interval) else: await asyncio.sleep(consecutive_error_count) except asyncio.CancelledError: if self.logger.level <= logging.DEBUG: self.logger.debug(f"The running receive_messages task for {session_id} is now cancelled") raise async def is_ping_pong_failing(self) -> bool: if self.last_ping_pong_time is None: return False disconnected_seconds = int(time.time() - self.last_ping_pong_time) return disconnected_seconds >= (self.ping_interval * 4) async def is_connected(self) -> bool: connected: bool = ( not self.closed and not self.stale and self.current_session is not None and not self.current_session.closed and not await self.is_ping_pong_failing() ) if self.logger.level <= logging.DEBUG and connected is False: # Prints more detailed information about the inactive connection is_ping_pong_failing = await self.is_ping_pong_failing() session_id = await self.session_id() self.logger.debug( "Inactive connection detected (" f"session_id: {session_id}, " f"closed: {self.closed}, " f"stale: {self.stale}, " f"current_session.closed: {self.current_session and self.current_session.closed}, " f"is_ping_pong_failing: {is_ping_pong_failing}" ")" ) return connected async def session_id(self) -> str: return self.build_session_id(self.current_session) async def connect(self): # This loop is used to ensure when a new session is created, # a new monitor and a new message receiver are also created. # If a new session is created but we failed to create the new # monitor or the new message, we should try it. while True: try: old_session: Optional[ClientWebSocketResponse] = ( None if self.current_session is None else self.current_session ) # If the old session is broken (e.g. reset by peer), it might fail to close it. # We don't want to retry when this kind of cases happen. try: # We should close old session before create a new one. Because when disconnect # reason is `too_many_websockets`, we need to close the old one first to # to decrease the number of connections. self.auto_reconnect_enabled = False if old_session is not None: await old_session.close() old_session_id = self.build_session_id(old_session) self.logger.info(f"The old session ({old_session_id}) has been abandoned") except Exception as e: self.logger.exception(f"Failed to close the old session : {e}") if self.wss_uri is None: # If the underlying WSS URL does not exist, # acquiring a new active WSS URL from the server-side first self.wss_uri = await self.issue_new_wss_url() self.current_session = await self.aiohttp_client_session.ws_connect( self.wss_uri, autoping=False, heartbeat=self.ping_interval, proxy=self.proxy, ssl=self.web_client.ssl, ) session_id: str = await self.session_id() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled self.stale = False self.logger.info(f"A new session ({session_id}) has been established") # The first ping from the new connection if self.logger.level <= logging.DEBUG: self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...") t = time.time() await self.current_session.ping(f"sdk-ping-pong:{t}") if self.current_session_monitor is not None: self.current_session_monitor.cancel() self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session()) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}") if self.message_receiver is not None: self.message_receiver.cancel() self.message_receiver = asyncio.ensure_future(self.receive_messages()) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}") break except Exception as e: self.logger.exception(f"Failed to connect (error: {e}); Retrying...") await asyncio.sleep(self.ping_interval) async def disconnect(self): if self.current_session is not None: await self.current_session.close() session_id = await self.session_id() self.logger.info(f"The current session ({session_id}) has been abandoned by disconnect() method call") async def send_message(self, message: str): session_id = await self.session_id() if self.logger.level <= logging.DEBUG: self.logger.debug(f"Sending a message: {message} from session: {session_id}") try: await self.current_session.send_str(message) except ConnectionError as e: # We rarely get this exception while replacing the underlying WebSocket connections. # We can do one more try here as the self.current_session should be ready now. if self.logger.level <= logging.DEBUG: self.logger.debug( f"Failed to send a message (error: {e}, message: {message}, session: {session_id})" " as the underlying connection was replaced. Retrying the same request only one time..." ) # Although acquiring self.connect_operation_lock also for the first method call is the safest way, # we avoid synchronizing a lot for better performance. That's why we are doing a retry here. try: await self.connect_operation_lock.acquire() if await self.is_connected(): await self.current_session.send_str(message) else: self.logger.warning( f"The current session ({session_id}) is no longer active. " "Failed to send a message" ) raise e finally: if self.connect_operation_lock.locked() is True: self.connect_operation_lock.release() async def close(self): self.closed = True self.auto_reconnect_enabled = False await self.disconnect() if self.message_processor is not None: self.message_processor.cancel() if self.current_session_monitor is not None: self.current_session_monitor.cancel() if self.message_receiver is not None: self.message_receiver.cancel() if self.aiohttp_client_session is not None: await self.aiohttp_client_session.close() @classmethod def build_session_id(cls, session: ClientWebSocketResponse) -> str: if session is None: return "" return "s_" + str(hash(session))
Ancestors
Class variables
var app_token : str
var auto_reconnect_enabled : bool
var closed : bool
var connect_operation_lock : asyncio.locks.Lock
var current_session : Optional[aiohttp.client_ws.ClientWebSocketResponse]
var current_session_monitor : Optional[_asyncio.Future]
var default_auto_reconnect_enabled : bool
var last_ping_pong_time : Optional[float]
var logger : logging.Logger
var message_listeners : List[Union[AsyncWebSocketMessageListener, Callable[[AsyncBaseSocketModeClient, dict, Optional[str]], Awaitable[None]]]]
var message_processor : _asyncio.Future
var message_queue : asyncio.queues.Queue
var message_receiver : Optional[_asyncio.Future]
var on_close_listeners : List[Callable[[aiohttp.http_websocket.WSMessage], Awaitable[None]]]
var on_error_listeners : List[Callable[[aiohttp.http_websocket.WSMessage], Awaitable[None]]]
var on_message_listeners : List[Callable[[aiohttp.http_websocket.WSMessage], Awaitable[None]]]
var ping_interval : float
var proxy : Optional[str]
var socket_mode_request_listeners : List[Union[AsyncSocketModeRequestListener, Callable[[AsyncBaseSocketModeClient, SocketModeRequest], Awaitable[None]]]]
var stale : bool
var trace_enabled : bool
var web_client : AsyncWebClient
var wss_uri : Optional[str]
Static methods
def build_session_id(session: aiohttp.client_ws.ClientWebSocketResponse) ‑> str
Methods
async def close(self)
async def connect(self)
async def disconnect(self)
async def is_connected(self) ‑> bool
async def is_ping_pong_failing(self) ‑> bool
async def monitor_current_session(self) ‑> None
async def receive_messages(self) ‑> None
async def send_message(self, message: str)
async def session_id(self) ‑> str