Module slack_sdk.rtm_v2
A Python module for interacting with Slack's RTM API.
Expand source code
"""A Python module for interacting with Slack's RTM API."""
import inspect
import json
import logging
import time
from concurrent.futures.thread import ThreadPoolExecutor
from logging import Logger
from queue import Queue, Empty
from ssl import SSLContext
from threading import Lock, Event
from typing import Optional, Callable, List, Union
from slack_sdk.errors import SlackApiError, SlackClientError
from slack_sdk.proxy_env_variable_loader import load_http_proxy_from_env
from slack_sdk.socket_mode.builtin.connection import Connection, ConnectionState
from slack_sdk.socket_mode.interval_runner import IntervalRunner
from slack_sdk.web import WebClient
class RTMClient:
# Client for Slack's RTM API. The annotations below declare the per-instance state;
# every one of these attributes is assigned in __init__.
# Token given to the constructor with surrounding whitespace stripped, or None when omitted.
token: Optional[str]
# Bot ID used to skip events generated by this bot; None until connect() fetches it via auth_test().
bot_id: Optional[str]
# The auto-reconnect setting given to the constructor; connect() restores auto_reconnect_enabled from it.
default_auto_reconnect_enabled: bool
# Current auto-reconnect switch, consulted by the close handler and the session monitor.
auto_reconnect_enabled: bool
# SSL context forwarded to the default WebClient.
ssl: Optional[SSLContext]
# Proxy setting; replaced by load_http_proxy_from_env() when unset or blank.
proxy: Optional[str]
# Timeout forwarded to the default WebClient (presumably seconds -- confirm against WebClient).
timeout: int
# Base URL forwarded to the default WebClient.
base_url: str
# Passed to each Connection as ping_interval and also used as the session monitor's interval.
ping_interval: int
# Logger used for all diagnostics emitted by this client.
logger: Logger
# Web API client used for the auth_test() and rtm_connect() calls.
web_client: WebClient
# The current WebSocket connection, or None before the first connect().
current_session: Optional[Connection]
# State object handed to current_session.run_until_completion(); replaced on every connect().
current_session_state: Optional[ConnectionState]
# WSS URL most recently obtained from issue_new_wss_url().
wss_uri: Optional[str]
# Raw incoming messages waiting to be parsed and dispatched.
message_queue: Queue
# Listeners invoked as listener(client, event) for every dequeued message.
message_listeners: List[Callable[["RTMClient", dict], None]]
# Background runner that executes process_messages().
message_processor: IntervalRunner
# Thread pool on which the message listeners are executed.
message_workers: ThreadPoolExecutor
# Set to True by close(); ends the process_messages() loop.
closed: bool
# Held by connect_to_new_endpoint() while it checks the connection and reconnects.
connect_operation_lock: Lock
# Listeners invoked with the raw (unparsed) message string.
on_message_listeners: List[Callable[[str], None]]
# Listeners invoked with the exception reported through run_all_error_listeners().
on_error_listeners: List[Callable[[Exception], None]]
# Listeners invoked with (code, reason) through run_all_close_listeners().
on_close_listeners: List[Callable[[int, Optional[str]], None]]
def __init__(
    self,
    *,
    token: Optional[str] = None,
    web_client: Optional[WebClient] = None,
    auto_reconnect_enabled: bool = True,
    ssl: Optional[SSLContext] = None,
    proxy: Optional[str] = None,
    timeout: int = 30,
    base_url: str = WebClient.BASE_URL,
    headers: Optional[dict] = None,
    ping_interval: int = 5,
    concurrency: int = 10,
    logger: Optional[logging.Logger] = None,
    on_message_listeners: Optional[List[Callable[[str], None]]] = None,
    on_error_listeners: Optional[List[Callable[[Exception], None]]] = None,
    on_close_listeners: Optional[List[Callable[[int, Optional[str]], None]]] = None,
    trace_enabled: bool = False,
    all_message_trace_enabled: bool = False,
    ping_pong_trace_enabled: bool = False,
):
    """Initializes the client state and starts its background runners.

    No WebSocket connection is opened here; call connect() or start() for that.

    Args:
        token: Slack token; surrounding whitespace is stripped
        web_client: WebClient to use instead of one built from the settings below
        auto_reconnect_enabled: default value of the auto-reconnect switch
        ssl: SSL context for the default WebClient
        proxy: proxy setting; the environment is consulted when this is unset or blank
        timeout: timeout for the default WebClient
        base_url: base URL for the default WebClient
        headers: extra headers for the default WebClient
        ping_interval: ping interval for connections and interval of the session monitor
        concurrency: number of worker threads running message listeners
        logger: custom logger; defaults to this module's logger
        on_message_listeners: listeners receiving each raw message string
        on_error_listeners: listeners receiving each reported exception
        on_close_listeners: listeners receiving (code, reason) on close
        trace_enabled: forwarded to each Connection
        all_message_trace_enabled: forwarded to each Connection
        ping_pong_trace_enabled: forwarded to each Connection
    """
    self.token = None if token is None else token.strip()
    self.bot_id = None

    # auto_reconnect_enabled may be turned off temporarily by the application;
    # the default is remembered so that connect() can restore it.
    self.default_auto_reconnect_enabled = auto_reconnect_enabled
    self.auto_reconnect_enabled = self.default_auto_reconnect_enabled

    self.ssl = ssl
    self.proxy = proxy
    self.timeout = timeout
    self.base_url = base_url
    self.headers = headers
    self.ping_interval = ping_interval
    self.logger = logger or logging.getLogger(__name__)

    # Fall back to the proxy configured through environment variables.
    proxy_is_blank = self.proxy is None or len(self.proxy.strip()) == 0
    if proxy_is_blank:
        proxy_from_env = load_http_proxy_from_env(self.logger)
        if proxy_from_env is not None:
            self.proxy = proxy_from_env

    if web_client:
        self.web_client = web_client
    else:
        self.web_client = WebClient(
            token=self.token,
            base_url=self.base_url,
            timeout=self.timeout,
            ssl=self.ssl,
            proxy=self.proxy,
            headers=self.headers,
            logger=logger,
        )

    self.on_message_listeners = on_message_listeners or []
    self.on_error_listeners = on_error_listeners or []
    self.on_close_listeners = on_close_listeners or []

    self.trace_enabled = trace_enabled
    self.all_message_trace_enabled = all_message_trace_enabled
    self.ping_pong_trace_enabled = ping_pong_trace_enabled

    self.message_queue = Queue()

    def reconnect_on_goodbye(_client, event: dict):
        # A "goodbye" event triggers a forced switch to a fresh endpoint.
        if event.get("type") != "goodbye":
            return
        self.logger.info("Got a goodbye message. Reconnecting to the server ...")
        self.connect_to_new_endpoint(force=True)

    self.message_listeners = [reconnect_on_goodbye]
    self.socket_mode_request_listeners = []

    self.current_session = None
    self.current_session_state = ConnectionState()
    session_runner = IntervalRunner(self._run_current_session, 0.1)
    self.current_session_runner = session_runner.start()
    self.wss_uri = None

    # The monitor is created here but only started by the first connect().
    self.current_app_monitor_started = False
    self.current_app_monitor = IntervalRunner(
        self._monitor_current_session,
        self.ping_interval,
    )

    self.closed = False
    self.connect_operation_lock = Lock()

    processor = IntervalRunner(self.process_messages, 0.001)
    self.message_processor = processor.start()
    self.message_workers = ThreadPoolExecutor(max_workers=concurrency)
# --------------------------------------------------------------
# Decorator to register listeners
# --------------------------------------------------------------
def on(self, event_type: str) -> Callable:
    """Registers a new event listener.

    The returned decorator appends a wrapper to ``self.message_listeners`` and
    hands the decorated callable back unchanged. The wrapper calls the listener
    as ``listener(client, event)`` when the event's type matches.

    Args:
        event_type: str representing an event's type (e.g., message, reaction_added),
            or "*" to receive every event

    Returns:
        A decorator taking the listener as its first positional argument.

    Raises:
        SlackClientError: (from the decorator) when the listener is not callable
            or does not accept exactly two positional args
        TypeError: (from the decorator) when the listener's signature cannot be inspected
    """

    def register(*args, **kwargs):
        # Calling the decorator without a listener is a no-op rather than an IndexError.
        func = args[0] if args else None
        if func is None:
            return func

        if not callable(func):
            error = f"The listener '{func}' is not a Callable (actual: {type(func).__name__})"
            raise SlackClientError(error)

        name = (
            func.__name__
            if hasattr(func, "__name__")
            else f"{func.__class__.__module__}.{func.__class__.__name__}"
        )

        # inspect.signature() leaves out an already-bound first parameter, so
        # bound methods and callable objects taking (client, event) are accepted.
        # getfullargspec() counted the bound `self` and rejected them.
        try:
            parameters = inspect.signature(func).parameters.values()
        except ValueError as e:
            # Keep reporting non-introspectable callables as TypeError.
            raise TypeError(f"unsupported callable: {name}") from e
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        arg_names = [p.name for p in parameters if p.kind in positional_kinds]
        if len(arg_names) != 2:
            actual_args = ", ".join(arg_names)
            error = f"The listener '{name}' must accept two args: client, event (actual: {actual_args})"
            raise SlackClientError(error)

        def new_message_listener(_self, event: dict):
            actual_event_type = event.get("type")
            # Skip the events generated by this bot user. While bot_id is still
            # unknown (None) nothing is filtered; otherwise every event lacking
            # a bot_id would compare equal to None and be dropped.
            own_bot_id = self.bot_id
            if own_bot_id is not None and event.get("bot_id") == own_bot_id:
                return
            # https://github.com/slackapi/python-slack-sdk/issues/533
            if event_type == "*" or (actual_event_type is not None and actual_event_type == event_type):
                func(_self, event)

        self.message_listeners.append(new_message_listener)

        # Not to cause modification to the decorated method
        return func

    return register
# --------------------------------------------------------------
# Connections
# --------------------------------------------------------------
def is_connected(self) -> bool:
    """Tells whether a session exists and reports itself as active."""
    if self.current_session is None:
        return False
    return self.current_session.is_active()
def issue_new_wss_url(self) -> str:
    """Fetches a fresh WSS URL by calling the rtm.connect API method.

    When the API answers "ratelimited", this waits for the number of seconds
    given in the Retry-After header (30 when absent) and tries again.

    Returns:
        The "url" value of the rtm.connect response.

    Raises:
        SlackApiError: for any API error other than "ratelimited"
    """
    while True:
        try:
            return self.web_client.rtm_connect()["url"]
        except SlackApiError as e:
            if e.response["error"] != "ratelimited":
                # other errors
                self.logger.error(f"Failed to retrieve WSS URL: {e}")
                raise e
            retry_after = e.response.headers.get("Retry-After", "30")  # Tier1
            delay = int(retry_after)
            self.logger.info(f"Rate limited. Retrying in {delay} seconds...")
            time.sleep(delay)
            # Loop around to issue a new WSS URL again
def connect_to_new_endpoint(self, force: bool = False):
    """Obtains a new WSS URL and connects to it.

    Nothing happens when the client is already connected, unless ``force`` is set.
    The whole operation runs while holding ``connect_operation_lock``.

    Args:
        force: reconnect even if the current session is still connected
    """
    with self.connect_operation_lock:
        if not force and self.is_connected():
            return
        self.logger.info("Connecting to a new endpoint...")
        new_url = self.issue_new_wss_url()
        self.wss_uri = new_url
        self.connect()
        self.logger.info("Connected to a new endpoint...")
def connect(self):
    """Opens a WebSocket connection to the RTM server and makes it the current session.

    The bot ID (via auth_test) and the WSS URL are fetched first if they are not
    known yet. The previous session, if any, is closed only after the new
    connection has been established.
    """
    if self.bot_id is None:
        auth_response = self.web_client.auth_test()
        self.bot_id = auth_response["bot_id"]

    previous_session: Optional[Connection] = self.current_session
    previous_state: ConnectionState = self.current_session_state

    if self.wss_uri is None:
        self.wss_uri = self.issue_new_wss_url()

    session_options = {
        "url": self.wss_uri,
        "logger": self.logger,
        "ping_interval": self.ping_interval,
        "trace_enabled": self.trace_enabled,
        "all_message_trace_enabled": self.all_message_trace_enabled,
        "ping_pong_trace_enabled": self.ping_pong_trace_enabled,
        "receive_buffer_size": 1024,
        "proxy": self.proxy,
        "on_message_listener": self.run_all_message_listeners,
        "on_error_listener": self.run_all_error_listeners,
        "on_close_listener": self.run_all_close_listeners,
        "connection_type_name": "RTM",
    }
    new_session = Connection(**session_options)
    new_session.connect()

    # Retire the previous session now that its replacement is connected.
    if previous_state is not None:
        previous_state.terminated = True
    if previous_session is not None:
        previous_session.close()

    self.current_session = new_session
    self.current_session_state = ConnectionState()
    # A temporary auto-reconnect override ends with every new connection.
    self.auto_reconnect_enabled = self.default_auto_reconnect_enabled

    # The monitor is started once, on the first connection.
    if not self.current_app_monitor_started:
        self.current_app_monitor_started = True
        self.current_app_monitor.start()

    self.logger.info(f"A new session has been established (session id: {self.session_id()})")
def disconnect(self):
    """Disconnects the current session.

    This is a no-op when no session has been established yet
    (``current_session`` is None until the first connect()).
    """
    session = self.current_session
    if session is not None:
        session.disconnect()
def close(self) -> None:
    """
    Closes this instance and cleans up underlying resources.
    After calling this method, this instance is no longer usable.

    Auto-reconnect is switched off first, because both the close handler and
    the session monitor reconnect while ``auto_reconnect_enabled`` is set and
    would otherwise bring a closed client back. Calling this before any
    session exists only marks the instance as closed.
    """
    self.closed = True
    self.auto_reconnect_enabled = False
    session = self.current_session
    if session is None:
        # Never connected: there is nothing to disconnect or close.
        return
    self.disconnect()
    session.close()
def start(self) -> None:
    """Connects to the RTM server, then blocks the calling thread indefinitely."""
    self.connect()
    # Wait on an event that nothing ever sets, so this call never returns.
    never_set = Event()
    never_set.wait()
def send(self, payload: Union[dict, str]) -> None:
    """Sends a payload over the current session.

    Args:
        payload: a str is sent as-is; anything else is serialized with json.dumps;
            None is silently ignored

    Raises:
        SlackClientError: when there is no active session
    """
    if payload is None:
        return
    connected = self.current_session is not None and self.current_session.is_active()
    if not connected:
        raise SlackClientError("The RTM client is not connected to the Slack servers")
    text = payload if isinstance(payload, str) else json.dumps(payload)
    self.current_session.send(text)
# --------------------------------------------------------------
# WS Message Processor
# --------------------------------------------------------------
def enqueue_message(self, message: str):
    """Puts a raw message on the queue for later processing.

    Args:
        message: the raw message string to enqueue
    """
    self.message_queue.put(message)
    if self.logger.level > logging.DEBUG:
        return
    queue_size = self.message_queue.qsize()
    self.logger.debug(f"A new message enqueued (current queue size: {queue_size})")
def process_message(self):
    """Takes one raw message off the queue and schedules the listeners for it.

    Waits up to one second for a message and returns quietly when none arrives.
    A message starting with "{" is parsed as JSON; any other message is
    dispatched as an empty dict.
    """
    try:
        raw = self.message_queue.get(timeout=1)
    except Empty:
        return

    if self.logger.level <= logging.DEBUG:
        self.logger.debug(f"A message dequeued (current queue size: {self.message_queue.qsize()})")

    if raw is None:
        return

    message: dict = json.loads(raw) if raw.startswith("{") else {}
    # The listeners run on the worker pool, not on the thread draining the queue.
    self.message_workers.submit(lambda: self.run_message_listeners(message))
def process_messages(self) -> None:
    """Keeps calling process_message() until this client is marked as closed.

    An exception from a single iteration is logged and the loop carries on.
    """
    while True:
        if self.closed:
            return
        try:
            self.process_message()
        except Exception as error:
            self.logger.exception(f"Failed to process a message: {error}")
def run_message_listeners(self, message: dict) -> None:
    """Calls every registered message listener with (client, message).

    A failing listener is logged and does not stop the remaining listeners.

    Args:
        message: the parsed event payload
    """
    message_type = message.get("type")
    if self.logger.level <= logging.DEBUG:
        self.logger.debug(f"Message processing started (type: {message_type})")
    try:
        for callback in self.message_listeners:
            try:
                callback(self, message)
            except Exception as listener_error:
                self.logger.exception(f"Failed to run a message listener: {listener_error}")
    except Exception as error:
        self.logger.exception(f"Failed to run message listeners: {error}")
    finally:
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"Message processing completed (type: {message_type})")
# --------------------------------------------------------------
# Internals
# --------------------------------------------------------------
def session_id(self) -> Optional[str]:
    """Gives the current session's ID, or None when there is no session."""
    if self.current_session is None:
        return None
    return self.current_session.session_id
def run_all_message_listeners(self, message: str):
    """Handles a raw message reported by the connection.

    The message is enqueued for processing first, then passed to every
    raw-level listener in ``on_message_listeners``.

    Args:
        message: the raw message string
    """
    debug_enabled = self.logger.level <= logging.DEBUG
    if debug_enabled:
        self.logger.debug(f"on_message invoked: (message: {message})")
    self.enqueue_message(message)
    for notify in self.on_message_listeners:
        notify(message)
def run_all_error_listeners(self, error: Exception):
    """Logs an error reported by the connection and passes it to every error listener.

    Args:
        error: the exception that was reported
    """
    # exc_info=error attaches the traceback of the reported exception itself.
    # Logger.exception() takes it from the exception currently being handled,
    # which is not available when this is called outside an `except` block.
    self.logger.error(
        f"on_error invoked (session id: {self.session_id()}, " f"error: {type(error).__name__}, message: {error})",
        exc_info=error,
    )
    for listener in self.on_error_listeners:
        listener(error)
def run_all_close_listeners(self, code: int, reason: Optional[str] = None):
    """Handles a close event reported by the connection.

    Reconnects first when auto-reconnect is enabled, then passes
    (code, reason) to every listener in ``on_close_listeners``.

    Args:
        code: the close code
        reason: the close reason, if any
    """
    log = self.logger
    if log.level <= logging.DEBUG:
        log.debug(f"on_close invoked (session id: {self.session_id()})")
    if self.auto_reconnect_enabled:
        log.info(f"Received CLOSE event. Going to reconnect... (session id: {self.session_id()})")
        self.connect_to_new_endpoint()
    for notify in self.on_close_listeners:
        notify(code, reason)
def _run_current_session(self):
    """Runs the receive loop of the current session, if there is an active one.

    Any exception raised while running the session is logged, not propagated.
    """
    has_active_session = self.current_session is not None and self.current_session.is_active()
    if not has_active_session:
        return

    session_id = self.session_id()
    try:
        self.logger.info(f"Starting to receive messages from a new connection (session id: {session_id})")
        self.current_session_state.terminated = False
        self.current_session.run_until_completion(self.current_session_state)
        self.logger.info(f"Stopped receiving messages from a connection (session id: {session_id})")
    except Exception as error:
        self.logger.exception(
            f"Failed to start or stop the current session (session id: {session_id}, error: {error})"
        )
def _monitor_current_session(self):
    """Checks the current session and reconnects when it is missing or inactive.

    Does nothing until the monitor has been marked as started by connect().
    Reconnection only happens while ``auto_reconnect_enabled`` is set.
    Any failure is logged instead of being raised.
    """
    if not self.current_app_monitor_started:
        return
    try:
        session = self.current_session
        # Only an existing session can be checked. Calling check_state() on None
        # raised AttributeError, which was swallowed below and made the
        # "session is None" reconnect branch unreachable.
        if session is not None:
            session.check_state()

        if self.auto_reconnect_enabled and (self.current_session is None or not self.current_session.is_active()):
            self.logger.info(
                "The session seems to be already closed. Going to reconnect... " f"(session id: {self.session_id()})"
            )
            self.connect_to_new_endpoint()
    except Exception as e:
        self.logger.error(
            "Failed to check the current session or reconnect to the server "
            f"(session id: {self.session_id()}, error: {type(e).__name__}, message: {e})"
        )
Classes
class RTMClient (*, token: Optional[str] = None, web_client: Optional[WebClient] = None, auto_reconnect_enabled: bool = True, ssl: Optional[ssl.SSLContext] = None, proxy: Optional[str] = None, timeout: int = 30, base_url: str = 'https://www.slack.com/api/', headers: Optional[dict] = None, ping_interval: int = 5, concurrency: int = 10, logger: Optional[logging.Logger] = None, on_message_listeners: Optional[List[Callable[[str], None]]] = None, on_error_listeners: Optional[List[Callable[[Exception], None]]] = None, on_close_listeners: Optional[List[Callable[[int, Optional[str]], None]]] = None, trace_enabled: bool = False, all_message_trace_enabled: bool = False, ping_pong_trace_enabled: bool = False)
-
Expand source code
class RTMClient: token: Optional[str] bot_id: Optional[str] default_auto_reconnect_enabled: bool auto_reconnect_enabled: bool ssl: Optional[SSLContext] proxy: Optional[str] timeout: int base_url: str ping_interval: int logger: Logger web_client: WebClient current_session: Optional[Connection] current_session_state: Optional[ConnectionState] wss_uri: Optional[str] message_queue: Queue message_listeners: List[Callable[["RTMClient", dict], None]] message_processor: IntervalRunner message_workers: ThreadPoolExecutor closed: bool connect_operation_lock: Lock on_message_listeners: List[Callable[[str], None]] on_error_listeners: List[Callable[[Exception], None]] on_close_listeners: List[Callable[[int, Optional[str]], None]] def __init__( self, *, token: Optional[str] = None, web_client: Optional[WebClient] = None, auto_reconnect_enabled: bool = True, ssl: Optional[SSLContext] = None, proxy: Optional[str] = None, timeout: int = 30, base_url: str = WebClient.BASE_URL, headers: Optional[dict] = None, ping_interval: int = 5, concurrency: int = 10, logger: Optional[logging.Logger] = None, on_message_listeners: Optional[List[Callable[[str], None]]] = None, on_error_listeners: Optional[List[Callable[[Exception], None]]] = None, on_close_listeners: Optional[List[Callable[[int, Optional[str]], None]]] = None, trace_enabled: bool = False, all_message_trace_enabled: bool = False, ping_pong_trace_enabled: bool = False, ): self.token = token.strip() if token is not None else None self.bot_id = None self.default_auto_reconnect_enabled = auto_reconnect_enabled # You may want temporarily turn off the auto_reconnect as necessary self.auto_reconnect_enabled = self.default_auto_reconnect_enabled self.ssl = ssl self.proxy = proxy self.timeout = timeout self.base_url = base_url self.headers = headers self.ping_interval = ping_interval self.logger = logger or logging.getLogger(__name__) if self.proxy is None or len(self.proxy.strip()) == 0: env_variable = load_http_proxy_from_env(self.logger) 
if env_variable is not None: self.proxy = env_variable self.web_client = web_client or WebClient( token=self.token, base_url=self.base_url, timeout=self.timeout, ssl=self.ssl, proxy=self.proxy, headers=self.headers, logger=logger, ) self.on_message_listeners = on_message_listeners or [] self.on_error_listeners = on_error_listeners or [] self.on_close_listeners = on_close_listeners or [] self.trace_enabled = trace_enabled self.all_message_trace_enabled = all_message_trace_enabled self.ping_pong_trace_enabled = ping_pong_trace_enabled self.message_queue = Queue() def goodbye_listener(_self, event: dict): if event.get("type") == "goodbye": message = "Got a goodbye message. Reconnecting to the server ..." self.logger.info(message) self.connect_to_new_endpoint(force=True) self.message_listeners = [goodbye_listener] self.socket_mode_request_listeners = [] self.current_session = None self.current_session_state = ConnectionState() self.current_session_runner = IntervalRunner(self._run_current_session, 0.1).start() self.wss_uri = None self.current_app_monitor_started = False self.current_app_monitor = IntervalRunner( self._monitor_current_session, self.ping_interval, ) self.closed = False self.connect_operation_lock = Lock() self.message_processor = IntervalRunner(self.process_messages, 0.001).start() self.message_workers = ThreadPoolExecutor(max_workers=concurrency) # -------------------------------------------------------------- # Decorator to register listeners # -------------------------------------------------------------- def on(self, event_type: str) -> Callable: """Registers a new event listener. 
Args: event_type: str representing an event's type (e.g., message, reaction_added) """ def __call__(*args, **kwargs): func = args[0] if func is not None: if isinstance(func, Callable): name = ( func.__name__ if hasattr(func, "__name__") else f"{func.__class__.__module__}.{func.__class__.__name__}" ) inspect_result: inspect.FullArgSpec = inspect.getfullargspec(func) if inspect_result is not None and len(inspect_result.args) != 2: actual_args = ", ".join(inspect_result.args) error = f"The listener '{name}' must accept two args: client, event (actual: {actual_args})" raise SlackClientError(error) def new_message_listener(_self, event: dict): actual_event_type = event.get("type") if event.get("bot_id") == self.bot_id: # SKip the events generated by this bot user return # https://github.com/slackapi/python-slack-sdk/issues/533 if event_type == "*" or (actual_event_type is not None and actual_event_type == event_type): func(_self, event) self.message_listeners.append(new_message_listener) else: error = f"The listener '{func}' is not a Callable (actual: {type(func).__name__})" raise SlackClientError(error) # Not to cause modification to the decorated method return func return __call__ # -------------------------------------------------------------- # Connections # -------------------------------------------------------------- def is_connected(self) -> bool: """Returns True if this client is connected.""" return self.current_session is not None and self.current_session.is_active() def issue_new_wss_url(self) -> str: """Acquires a new WSS URL using rtm.connect API method""" try: api_response = self.web_client.rtm_connect() return api_response["url"] except SlackApiError as e: if e.response["error"] == "ratelimited": delay = int(e.response.headers.get("Retry-After", "30")) # Tier1 self.logger.info(f"Rate limited. 
Retrying in {delay} seconds...") time.sleep(delay) # Retry to issue a new WSS URL return self.issue_new_wss_url() else: # other errors self.logger.error(f"Failed to retrieve WSS URL: {e}") raise e def connect_to_new_endpoint(self, force: bool = False): """Acquires a new WSS URL and tries to connect to the endpoint.""" with self.connect_operation_lock: if force or not self.is_connected(): self.logger.info("Connecting to a new endpoint...") self.wss_uri = self.issue_new_wss_url() self.connect() self.logger.info("Connected to a new endpoint...") def connect(self): """Starts talking to the RTM server through a WebSocket connection""" if self.bot_id is None: self.bot_id = self.web_client.auth_test()["bot_id"] old_session: Optional[Connection] = self.current_session old_current_session_state: ConnectionState = self.current_session_state if self.wss_uri is None: self.wss_uri = self.issue_new_wss_url() current_session = Connection( url=self.wss_uri, logger=self.logger, ping_interval=self.ping_interval, trace_enabled=self.trace_enabled, all_message_trace_enabled=self.all_message_trace_enabled, ping_pong_trace_enabled=self.ping_pong_trace_enabled, receive_buffer_size=1024, proxy=self.proxy, on_message_listener=self.run_all_message_listeners, on_error_listener=self.run_all_error_listeners, on_close_listener=self.run_all_close_listeners, connection_type_name="RTM", ) current_session.connect() if old_current_session_state is not None: old_current_session_state.terminated = True if old_session is not None: old_session.close() self.current_session = current_session self.current_session_state = ConnectionState() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled if not self.current_app_monitor_started: self.current_app_monitor_started = True self.current_app_monitor.start() self.logger.info(f"A new session has been established (session id: {self.session_id()})") def disconnect(self): """Disconnects the current session.""" self.current_session.disconnect() def 
close(self) -> None: """ Closes this instance and cleans up underlying resources. After calling this method, this instance is no longer usable. """ self.closed = True self.disconnect() self.current_session.close() def start(self) -> None: """Establishes an RTM connection and blocks the current thread.""" self.connect() Event().wait() def send(self, payload: Union[dict, str]) -> None: if payload is None: return if self.current_session is None or not self.current_session.is_active(): raise SlackClientError("The RTM client is not connected to the Slack servers") if isinstance(payload, str): self.current_session.send(payload) else: self.current_session.send(json.dumps(payload)) # -------------------------------------------------------------- # WS Message Processor # -------------------------------------------------------------- def enqueue_message(self, message: str): self.message_queue.put(message) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new message enqueued (current queue size: {self.message_queue.qsize()})") def process_message(self): try: raw_message = self.message_queue.get(timeout=1) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A message dequeued (current queue size: {self.message_queue.qsize()})") if raw_message is not None: message: dict = {} if raw_message.startswith("{"): message = json.loads(raw_message) def _run_message_listeners(): self.run_message_listeners(message) self.message_workers.submit(_run_message_listeners) except Empty: pass def process_messages(self) -> None: while not self.closed: try: self.process_message() except Exception as e: self.logger.exception(f"Failed to process a message: {e}") def run_message_listeners(self, message: dict) -> None: type = message.get("type") if self.logger.level <= logging.DEBUG: self.logger.debug(f"Message processing started (type: {type})") try: for listener in self.message_listeners: try: listener(self, message) except Exception as e: self.logger.exception(f"Failed to run a 
message listener: {e}") except Exception as e: self.logger.exception(f"Failed to run message listeners: {e}") finally: if self.logger.level <= logging.DEBUG: self.logger.debug(f"Message processing completed (type: {type})") # -------------------------------------------------------------- # Internals # -------------------------------------------------------------- def session_id(self) -> Optional[str]: if self.current_session is not None: return self.current_session.session_id return None def run_all_message_listeners(self, message: str): if self.logger.level <= logging.DEBUG: self.logger.debug(f"on_message invoked: (message: {message})") self.enqueue_message(message) for listener in self.on_message_listeners: listener(message) def run_all_error_listeners(self, error: Exception): self.logger.exception( f"on_error invoked (session id: {self.session_id()}, " f"error: {type(error).__name__}, message: {error})" ) for listener in self.on_error_listeners: listener(error) def run_all_close_listeners(self, code: int, reason: Optional[str] = None): if self.logger.level <= logging.DEBUG: self.logger.debug(f"on_close invoked (session id: {self.session_id()})") if self.auto_reconnect_enabled: self.logger.info("Received CLOSE event. Going to reconnect... 
" f"(session id: {self.session_id()})") self.connect_to_new_endpoint() for listener in self.on_close_listeners: listener(code, reason) def _run_current_session(self): if self.current_session is not None and self.current_session.is_active(): session_id = self.session_id() try: self.logger.info("Starting to receive messages from a new connection" f" (session id: {session_id})") self.current_session_state.terminated = False self.current_session.run_until_completion(self.current_session_state) self.logger.info("Stopped receiving messages from a connection" f" (session id: {session_id})") except Exception as e: self.logger.exception( "Failed to start or stop the current session" f" (session id: {session_id}, error: {e})" ) def _monitor_current_session(self): if self.current_app_monitor_started: try: self.current_session.check_state() if self.auto_reconnect_enabled and (self.current_session is None or not self.current_session.is_active()): self.logger.info( "The session seems to be already closed. Going to reconnect... " f"(session id: {self.session_id()})" ) self.connect_to_new_endpoint() except Exception as e: self.logger.error( "Failed to check the current session or reconnect to the server " f"(session id: {self.session_id()}, error: {type(e).__name__}, message: {e})" )
Class variables
var auto_reconnect_enabled : bool
var base_url : str
var bot_id : Optional[str]
var closed : bool
var connect_operation_lock :
var current_session : Optional[Connection]
var current_session_state : Optional[ConnectionState]
var default_auto_reconnect_enabled : bool
var logger : logging.Logger
var message_listeners : List[Callable[[RTMClient, dict], None]]
var message_processor : IntervalRunner
var message_queue : queue.Queue
var message_workers : concurrent.futures.thread.ThreadPoolExecutor
var on_close_listeners : List[Callable[[int, Optional[str]], None]]
var on_error_listeners : List[Callable[[Exception], None]]
var on_message_listeners : List[Callable[[str], None]]
var ping_interval : int
var proxy : Optional[str]
var ssl : Optional[ssl.SSLContext]
var timeout : int
var token : Optional[str]
var web_client : WebClient
var wss_uri : Optional[str]
Methods
def close(self) ‑> None
-
Closes this instance and cleans up underlying resources. After calling this method, this instance is no longer usable.
Expand source code
def close(self) -> None: """ Closes this instance and cleans up underlying resources. After calling this method, this instance is no longer usable. """ self.closed = True self.disconnect() self.current_session.close()
def connect(self)
-
Starts talking to the RTM server through a WebSocket connection
Expand source code
def connect(self): """Starts talking to the RTM server through a WebSocket connection""" if self.bot_id is None: self.bot_id = self.web_client.auth_test()["bot_id"] old_session: Optional[Connection] = self.current_session old_current_session_state: ConnectionState = self.current_session_state if self.wss_uri is None: self.wss_uri = self.issue_new_wss_url() current_session = Connection( url=self.wss_uri, logger=self.logger, ping_interval=self.ping_interval, trace_enabled=self.trace_enabled, all_message_trace_enabled=self.all_message_trace_enabled, ping_pong_trace_enabled=self.ping_pong_trace_enabled, receive_buffer_size=1024, proxy=self.proxy, on_message_listener=self.run_all_message_listeners, on_error_listener=self.run_all_error_listeners, on_close_listener=self.run_all_close_listeners, connection_type_name="RTM", ) current_session.connect() if old_current_session_state is not None: old_current_session_state.terminated = True if old_session is not None: old_session.close() self.current_session = current_session self.current_session_state = ConnectionState() self.auto_reconnect_enabled = self.default_auto_reconnect_enabled if not self.current_app_monitor_started: self.current_app_monitor_started = True self.current_app_monitor.start() self.logger.info(f"A new session has been established (session id: {self.session_id()})")
def connect_to_new_endpoint(self, force: bool = False)
-
Acquires a new WSS URL and tries to connect to the endpoint.
Expand source code
def connect_to_new_endpoint(self, force: bool = False): """Acquires a new WSS URL and tries to connect to the endpoint.""" with self.connect_operation_lock: if force or not self.is_connected(): self.logger.info("Connecting to a new endpoint...") self.wss_uri = self.issue_new_wss_url() self.connect() self.logger.info("Connected to a new endpoint...")
def disconnect(self)
-
Disconnects the current session.
Expand source code
def disconnect(self): """Disconnects the current session.""" self.current_session.disconnect()
def enqueue_message(self, message: str)
-
Expand source code
def enqueue_message(self, message: str): self.message_queue.put(message) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new message enqueued (current queue size: {self.message_queue.qsize()})")
def is_connected(self) ‑> bool
-
Returns True if this client is connected.
Expand source code
def is_connected(self) -> bool: """Returns True if this client is connected.""" return self.current_session is not None and self.current_session.is_active()
def issue_new_wss_url(self) ‑> str
-
Acquires a new WSS URL using rtm.connect API method
Expand source code
def issue_new_wss_url(self) -> str: """Acquires a new WSS URL using rtm.connect API method""" try: api_response = self.web_client.rtm_connect() return api_response["url"] except SlackApiError as e: if e.response["error"] == "ratelimited": delay = int(e.response.headers.get("Retry-After", "30")) # Tier1 self.logger.info(f"Rate limited. Retrying in {delay} seconds...") time.sleep(delay) # Retry to issue a new WSS URL return self.issue_new_wss_url() else: # other errors self.logger.error(f"Failed to retrieve WSS URL: {e}") raise e
def on(self, event_type: str) ‑> Callable
-
Registers a new event listener.
Args
event_type
- str representing an event's type (e.g., message, reaction_added)
Expand source code
def on(self, event_type: str) -> Callable: """Registers a new event listener. Args: event_type: str representing an event's type (e.g., message, reaction_added) """ def __call__(*args, **kwargs): func = args[0] if func is not None: if isinstance(func, Callable): name = ( func.__name__ if hasattr(func, "__name__") else f"{func.__class__.__module__}.{func.__class__.__name__}" ) inspect_result: inspect.FullArgSpec = inspect.getfullargspec(func) if inspect_result is not None and len(inspect_result.args) != 2: actual_args = ", ".join(inspect_result.args) error = f"The listener '{name}' must accept two args: client, event (actual: {actual_args})" raise SlackClientError(error) def new_message_listener(_self, event: dict): actual_event_type = event.get("type") if event.get("bot_id") == self.bot_id: # SKip the events generated by this bot user return # https://github.com/slackapi/python-slack-sdk/issues/533 if event_type == "*" or (actual_event_type is not None and actual_event_type == event_type): func(_self, event) self.message_listeners.append(new_message_listener) else: error = f"The listener '{func}' is not a Callable (actual: {type(func).__name__})" raise SlackClientError(error) # Not to cause modification to the decorated method return func return __call__
def process_message(self)
-
Expand source code
def process_message(self): try: raw_message = self.message_queue.get(timeout=1) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A message dequeued (current queue size: {self.message_queue.qsize()})") if raw_message is not None: message: dict = {} if raw_message.startswith("{"): message = json.loads(raw_message) def _run_message_listeners(): self.run_message_listeners(message) self.message_workers.submit(_run_message_listeners) except Empty: pass
def process_messages(self) ‑> None
-
Expand source code
def process_messages(self) -> None: while not self.closed: try: self.process_message() except Exception as e: self.logger.exception(f"Failed to process a message: {e}")
def run_all_close_listeners(self, code: int, reason: Optional[str] = None)
-
Expand source code
def run_all_close_listeners(self, code: int, reason: Optional[str] = None): if self.logger.level <= logging.DEBUG: self.logger.debug(f"on_close invoked (session id: {self.session_id()})") if self.auto_reconnect_enabled: self.logger.info("Received CLOSE event. Going to reconnect... " f"(session id: {self.session_id()})") self.connect_to_new_endpoint() for listener in self.on_close_listeners: listener(code, reason)
def run_all_error_listeners(self, error: Exception)
-
Expand source code
def run_all_error_listeners(self, error: Exception): self.logger.exception( f"on_error invoked (session id: {self.session_id()}, " f"error: {type(error).__name__}, message: {error})" ) for listener in self.on_error_listeners: listener(error)
def run_all_message_listeners(self, message: str)
-
Expand source code
def run_all_message_listeners(self, message: str): if self.logger.level <= logging.DEBUG: self.logger.debug(f"on_message invoked: (message: {message})") self.enqueue_message(message) for listener in self.on_message_listeners: listener(message)
def run_message_listeners(self, message: dict) ‑> None
-
Expand source code
def run_message_listeners(self, message: dict) -> None: type = message.get("type") if self.logger.level <= logging.DEBUG: self.logger.debug(f"Message processing started (type: {type})") try: for listener in self.message_listeners: try: listener(self, message) except Exception as e: self.logger.exception(f"Failed to run a message listener: {e}") except Exception as e: self.logger.exception(f"Failed to run message listeners: {e}") finally: if self.logger.level <= logging.DEBUG: self.logger.debug(f"Message processing completed (type: {type})")
def send(self, payload: Union[dict, str]) ‑> None
-
Expand source code
def send(self, payload: Union[dict, str]) -> None: if payload is None: return if self.current_session is None or not self.current_session.is_active(): raise SlackClientError("The RTM client is not connected to the Slack servers") if isinstance(payload, str): self.current_session.send(payload) else: self.current_session.send(json.dumps(payload))
def session_id(self) ‑> Optional[str]
-
Expand source code
def session_id(self) -> Optional[str]: if self.current_session is not None: return self.current_session.session_id return None
def start(self) ‑> None
-
Establishes an RTM connection and blocks the current thread.
Expand source code
def start(self) -> None: """Establishes an RTM connection and blocks the current thread.""" self.connect() Event().wait()