Module slack_sdk.socket_mode.builtin.connection
Expand source code
import socket
import ssl
import struct
import time
from logging import Logger
from threading import Lock
from typing import Optional, Callable, Union, List, Tuple, Dict
from urllib.parse import urlparse
from uuid import uuid4
from slack_sdk.errors import SlackClientNotConnectedError, SlackClientConfigurationError
from .frame_header import FrameHeader
from .internals import (
_parse_handshake_response,
_validate_sec_websocket_accept,
_generate_sec_websocket_key,
_to_readable_opcode,
_receive_messages,
_build_data_frame_for_sending,
_parse_text_payload,
_establish_new_socket_connection,
)
class ConnectionState:
    """Mutable termination flag shared between a connection and its owner.

    ``Connection.run_until_completion`` re-reads ``terminated`` on every
    iteration of its receive loop and sets it to ``True`` when the loop ends.
    """

    # The flag supposed to be used for telling SocketModeClient
    # when this connection is no longer available
    terminated: bool

    def __init__(self) -> None:
        # A new state always starts as "still running".
        self.terminated = False
class Connection:
    """A single WebSocket connection built directly on ``socket`` / ``ssl``.

    ``connect()`` performs the HTTP upgrade handshake, ``run_until_completion()``
    is the receive loop that dispatches incoming frames to the listeners, and
    ``check_state()`` sends pings and disconnects a connection that looks stale.
    """

    # WebSocket URL; parsed with urlparse() in connect()
    url: str
    # Logger used for every message emitted by this connection
    logger: Logger
    # Proxy settings; forwarded as-is to _establish_new_socket_connection()
    proxy: Optional[str]
    proxy_headers: Optional[Dict[str, str]]
    # Enables debug-level trace logging of the handshake and data frames
    trace_enabled: bool
    # Additionally enables logging of individual ping/pong frames
    ping_pong_trace_enabled: bool
    # Timestamp parsed from the most recent pong payload carrying this
    # connection's session id; None until such a pong has been received
    last_ping_pong_time: Optional[float]
    # Random UUID string generated in __init__; included in log messages and ping payloads
    session_id: str
    # The connected socket; None before a successful handshake and after disconnect()
    sock: Optional[ssl.SSLSocket]
    # Called with the decoded text of every received text frame
    on_message_listener: Optional[Callable[[str], None]]
    # Called with the exception when connecting or receiving fails
    on_error_listener: Optional[Callable[[Exception], None]]
    # Called with (close code, reason) when a close frame is received
    on_close_listener: Optional[Callable[[int, Optional[str]], None]]
def __init__(
    self,
    url: str,
    logger: Logger,
    proxy: Optional[str] = None,
    proxy_headers: Optional[Dict[str, str]] = None,
    ping_interval: float = 5,  # seconds
    receive_timeout: float = 3,
    receive_buffer_size: int = 1024,
    trace_enabled: bool = False,
    all_message_trace_enabled: bool = False,
    ping_pong_trace_enabled: bool = False,
    on_message_listener: Optional[Callable[[str], None]] = None,
    on_error_listener: Optional[Callable[[Exception], None]] = None,
    on_close_listener: Optional[Callable[[int, Optional[str]], None]] = None,
    connection_type_name: str = "Socket Mode",
    ssl_context: Optional[ssl.SSLContext] = None,
):
    """Store the configuration of a connection; no network I/O happens here.

    Raises:
        SlackClientConfigurationError: if ``receive_buffer_size`` is below 16.
    """
    # --- endpoint and transport settings ---
    self.url, self.logger = url, logger
    self.proxy, self.proxy_headers = proxy, proxy_headers
    self.ping_interval = ping_interval
    self.receive_timeout = receive_timeout
    self.receive_buffer_size = receive_buffer_size
    if receive_buffer_size < 16:
        raise SlackClientConfigurationError("Too small receive_buffer_size detected.")

    # --- identity and runtime state ---
    self.session_id = str(uuid4())
    self.last_ping_pong_time = None
    self.consecutive_check_state_error_count = 0
    self.sock = None

    # --- logging switches ---
    self.trace_enabled = trace_enabled
    self.all_message_trace_enabled = all_message_trace_enabled
    self.ping_pong_trace_enabled = ping_pong_trace_enabled

    # --- synchronization ---
    # To avoid ssl.SSLError: [SSL: BAD_LENGTH] bad length
    self.sock_receive_lock = Lock()
    self.sock_send_lock = Lock()

    # --- callbacks and misc ---
    self.on_message_listener = on_message_listener
    self.on_error_listener = on_error_listener
    self.on_close_listener = on_close_listener
    self.connection_type_name = connection_type_name
    self.ssl_context = ssl_context
def connect(self) -> None:
    """Open a socket to ``self.url`` and perform the WebSocket opening handshake.

    On a ``101 Switching Protocols`` response with a valid
    ``Sec-WebSocket-Accept`` header the socket is stored in ``self.sock`` and an
    initial ping is sent. Any other status is logged as a warning and leaves
    ``self.sock`` unset.

    Exceptions are not propagated to the caller: they are logged, passed to
    ``on_error_listener`` (when set) and followed by ``disconnect()``.

    A socket that was opened but never became ``self.sock`` is always closed
    before returning, so a rejected or failed handshake does not leak it.
    """
    # The socket created by this call. It is reset to None once ownership has
    # been handed over to self.sock; whatever is left here in ``finally`` was
    # never adopted and must be closed by us.
    pending_sock: Optional[Union[ssl.SSLSocket, socket.socket]] = None
    try:
        parsed_url = urlparse(self.url.strip())
        hostname: str = parsed_url.hostname
        port: int = parsed_url.port or (443 if parsed_url.scheme == "wss" else 80)
        if self.trace_enabled:
            self.logger.debug(
                f"Connecting to the address for handshake: {hostname}:{port} " f"(session id: {self.session_id})"
            )
        pending_sock = _establish_new_socket_connection(  # type: ignore
            session_id=self.session_id,
            server_hostname=hostname,
            server_port=port,
            logger=self.logger,
            sock_send_lock=self.sock_send_lock,
            receive_timeout=self.receive_timeout,
            proxy=self.proxy,
            proxy_headers=self.proxy_headers,
            trace_enabled=self.trace_enabled,
            ssl_context=self.ssl_context,
        )

        # WebSocket handshake
        try:
            # The request target must not be empty and "?" only belongs there
            # when a query string actually follows it.
            request_target = parsed_url.path or "/"
            if parsed_url.query:
                request_target = f"{request_target}?{parsed_url.query}"
            sec_websocket_key = _generate_sec_websocket_key()
            # The two trailing empty items produce the blank line ("\r\n\r\n")
            # that terminates the HTTP header section.
            req: str = "\r\n".join(
                [
                    f"GET {request_target} HTTP/1.1",
                    f"Host: {parsed_url.hostname}",
                    "Upgrade: websocket",
                    "Connection: Upgrade",
                    f"Sec-WebSocket-Key: {sec_websocket_key}",
                    "Sec-WebSocket-Version: 13",
                    "",
                    "",
                ]
            )
            if self.trace_enabled:
                self.logger.debug(
                    f"{self.connection_type_name} handshake request (session id: {self.session_id}):\n{req}"
                )
            with self.sock_send_lock:
                pending_sock.send(req.encode("utf-8"))

            status, headers, text = _parse_handshake_response(pending_sock)
            if self.trace_enabled:
                self.logger.debug(
                    f"{self.connection_type_name} handshake response (session id: {self.session_id}):\n{text}"
                )
            # HTTP/1.1 101 Switching Protocols
            if status == 101:
                if not _validate_sec_websocket_accept(sec_websocket_key, headers):
                    raise SlackClientNotConnectedError(
                        f"Invalid response header detected in {self.connection_type_name} handshake response"
                        f" (session id: {self.session_id})"
                    )
                # set this successfully connected socket
                self.sock = pending_sock
                # From here on disconnect() is responsible for closing it.
                pending_sock = None
                self.ping(f"{self.session_id}:{time.time()}")
            else:
                warning_message = (
                    f"Received an unexpected response for handshake "
                    f"(status: {status}, response: {text}, session id: {self.session_id})"
                )
                self.logger.warning(warning_message)

        except socket.error as e:
            # Log the numeric error code when the exception carries one, then
            # let the outer handler do the common error handling.
            code: Optional[int] = None
            if e.args and len(e.args) > 1 and isinstance(e.args[0], int):
                code = e.args[0]
            if code is not None:
                error_message = f"Error code: {code} (session id: {self.session_id}, error: {e})"
                if self.trace_enabled:
                    self.logger.exception(error_message)
                else:
                    self.logger.error(error_message)
            raise

    except Exception as e:
        error_message = f"Failed to establish a connection (session id: {self.session_id}, error: {e})"
        if self.trace_enabled:
            self.logger.exception(error_message)
        else:
            self.logger.error(error_message)

        if self.on_error_listener is not None:
            self.on_error_listener(e)

        self.disconnect()

    finally:
        if pending_sock is not None:
            # The handshake was rejected or failed before self.sock was set,
            # so disconnect() cannot reach this socket; close it here.
            try:
                pending_sock.close()
            except Exception as close_error:
                self.logger.debug(
                    "Failed to close the socket after an unsuccessful handshake"
                    f" (session id: {self.session_id}, error: {close_error})"
                )
def disconnect(self) -> None:
    """Close the underlying socket (if any) and mark this connection inactive.

    Safe to call repeatedly and from several threads: the socket is looked up
    again after both locks are held, so only one caller closes it. ``self.sock``
    is reset to ``None`` even when ``close()`` raises (the exception still
    propagates), so a broken socket is never reused.
    """
    if self.sock is not None:
        with self.sock_send_lock, self.sock_receive_lock:
            # Synchronize before closing this instance's socket
            sock = self.sock
            # Another thread may have finished disconnecting while this one
            # was waiting for the locks.
            if sock is not None:
                try:
                    sock.close()
                finally:
                    self.sock = None
    # After this, all operations using self.sock will be skipped
    self.logger.info(f"The connection has been closed (session id: {self.session_id})")
def is_active(self) -> bool:
    """Tell whether this connection currently holds a socket."""
    has_socket = self.sock is not None
    return has_socket
def close(self) -> None:
    """Close this connection; simply delegates to ``disconnect()``."""
    self.disconnect()
def ping(self, payload: Union[str, bytes] = "") -> None:
    """Send a ping frame carrying ``payload`` if the socket is still available.

    When ``self.sock`` is ``None`` nothing is sent; that is only logged when
    ping/pong tracing is enabled.
    """
    if self.trace_enabled and self.ping_pong_trace_enabled:
        # Decode a copy for the log line only. Enabling tracing must never
        # change what is sent or make an undecodable payload raise here.
        readable_payload = payload.decode("utf-8", errors="replace") if isinstance(payload, bytes) else payload
        self.logger.debug("Sending a ping data frame " f"(session id: {self.session_id}, payload: {readable_payload})")
    # The payload is handed over unchanged (str or bytes), exactly as it
    # already is whenever tracing is disabled.
    data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PING)
    with self.sock_send_lock:
        if self.sock is not None:
            self.sock.send(data)
        elif self.ping_pong_trace_enabled:
            self.logger.debug("Skipped sending a ping message as the underlying socket is no longer available.")
def pong(self, payload: Union[str, bytes] = "") -> None:
    """Send a pong frame carrying ``payload`` if the socket is still available.

    The receive loop calls this with the raw bytes of an incoming ping frame.
    When ``self.sock`` is ``None`` nothing is sent; that is only logged when
    ping/pong tracing is enabled.
    """
    if self.trace_enabled and self.ping_pong_trace_enabled:
        # Decode a copy for the log line only. A ping payload comes from the
        # peer and is not guaranteed to be valid UTF-8; enabling tracing must
        # not turn such a payload into an exception and a missing pong.
        readable_payload = payload.decode("utf-8", errors="replace") if isinstance(payload, bytes) else payload
        self.logger.debug("Sending a pong data frame " f"(session id: {self.session_id}, payload: {readable_payload})")
    # The payload is echoed unchanged (str or bytes), exactly as it already is
    # whenever tracing is disabled.
    data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PONG)
    with self.sock_send_lock:
        if self.sock is not None:
            self.sock.send(data)
        elif self.ping_pong_trace_enabled:
            self.logger.debug("Skipped sending a pong message as the underlying socket is no longer available.")
def send(self, payload: str) -> None:
    """Send ``payload`` as a single text frame.

    Raises:
        SlackClientNotConnectedError: if the socket is already gone or the
            write fails. The underlying exception, when there is one, is
            attached as ``__cause__``.
    """
    if self.trace_enabled:
        if isinstance(payload, bytes):
            payload = payload.decode("utf-8")
        self.logger.debug("Sending a text data frame " f"(session id: {self.session_id}, payload: {payload})")
    data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_TEXT)
    with self.sock_send_lock:
        sock = self.sock
        if sock is None:
            # Report the closed connection explicitly rather than through an
            # incidental AttributeError on None.
            raise SlackClientNotConnectedError(
                f"Failed to send a message as the connection is no longer active "
                f"(session_id: {self.session_id}, error: the underlying socket is not available)"
            )
        try:
            # send() may transmit only part of a large frame, which would
            # corrupt the frame stream; sendall() keeps writing until every
            # byte is out or an error occurs.
            sock.sendall(data)
        except Exception as e:
            # In most cases, we want to retry this operation with a newly established connection.
            # Getting this exception means that this connection has been replaced with a new one
            # and it's no longer usable.
            # The SocketModeClient implementation can do one retry when it gets this exception.
            raise SlackClientNotConnectedError(
                f"Failed to send a message as the connection is no longer active "
                f"(session_id: {self.session_id}, error: {e})"
            ) from e
def check_state(self) -> None:
    """Ping the server and disconnect when the connection looks dead.

    The connection is closed when the ping fails with ``ssl.SSLZeroReturnError``
    or when the last pong is older than four ping intervals. Any other failure
    is logged and counted; the fifth consecutive failure also disconnects.
    A run without failure resets that counter, except on the two early
    disconnect paths, which return before the reset.
    """
    try:
        if self.sock is None:
            self.logger.debug("This connection is already closed." f" (session id: {self.session_id})")
            self.consecutive_check_state_error_count = 0
            return

        try:
            self.ping(f"{self.session_id}:{time.time()}")
        except ssl.SSLZeroReturnError as ping_error:
            self.logger.info(
                "Unable to send a ping message. Closing the connection..."
                f" (session id: {self.session_id}, reason: {ping_error})"
            )
            self.disconnect()
            return

        last_pong_at = self.last_ping_pong_time
        if last_pong_at is not None:
            disconnected_seconds = int(time.time() - last_pong_at)
            if self.trace_enabled and disconnected_seconds > self.ping_interval:
                self.logger.debug(
                    f"{disconnected_seconds} seconds have passed "
                    f"since this client last received a pong response from the server "
                    f"(session id: {self.session_id})"
                )

            # No pong for more than four ping intervals means "stale".
            if disconnected_seconds > self.ping_interval * 4:
                self.logger.info(
                    "The connection seems to be stale. Disconnecting..."
                    f" (session id: {self.session_id},"
                    f" reason: disconnected for {disconnected_seconds}+ seconds)"
                )
                self.disconnect()
                return

        self.consecutive_check_state_error_count = 0
    except Exception as failure:
        error_message = (
            "Failed to check the state of sock "
            f"(session id: {self.session_id}, error: {type(failure).__name__}, message: {failure})"
        )
        log = self.logger.exception if self.trace_enabled else self.logger.error
        log(error_message)
        self.consecutive_check_state_error_count += 1
        if self.consecutive_check_state_error_count >= 5:
            self.disconnect()
def run_until_completion(self, state: ConnectionState) -> None:
    """Receive and dispatch frames until ``state.terminated`` becomes true.

    Each iteration reads a batch of frames through ``_receive_messages`` and
    handles them by opcode:

    * PING  -> reply with ``pong()`` echoing the same payload
    * PONG  -> update ``last_ping_pong_time`` when the payload carries this session id
    * TEXT  -> pass the decoded text to ``on_message_listener``
    * CLOSE -> notify ``on_close_listener``, disconnect and terminate
    * other -> log a warning

    ``socket.timeout`` only causes a short sleep. Any other ``OSError`` ends
    the loop. Other exceptions are reported (listener or log) and the loop
    continues. ``state.terminated`` is set to ``True`` before returning.

    NOTE(review): the nesting below was reconstructed from a flattened listing;
    confirm the placement of the statements marked with NOTE(review).
    """
    # Holds only the most recently seen payload and how many times in a row it
    # was seen (the dict is replaced, not updated, for every frame).
    repeated_messages = {"payload": 0}
    ping_count = 0
    pong_count = 0
    # When individual ping/pong logging is off, one summary line is logged per
    # this many received ping (or pong) frames.
    ping_pong_log_summary_size = 1000
    while not state.terminated:
        try:
            if self.is_active():
                received_messages: List[Tuple[Optional[FrameHeader], bytes]] = _receive_messages(
                    sock=self.sock,
                    sock_receive_lock=self.sock_receive_lock,
                    logger=self.logger,
                    receive_buffer_size=self.receive_buffer_size,
                    all_message_trace_enabled=self.all_message_trace_enabled,
                )
                for message in received_messages:
                    header, data = message
                    # -----------------
                    # trace logging
                    if self.trace_enabled is True:
                        opcode: str = _to_readable_opcode(header.opcode) if header else "-"
                        payload: str = _parse_text_payload(data, self.logger)
                        count: Optional[int] = repeated_messages.get(payload)
                        if count is None:
                            count = 1
                        else:
                            count += 1
                        repeated_messages = {payload: count}
                        if not self.ping_pong_trace_enabled and header is not None and header.opcode is not None:
                            if header.opcode == FrameHeader.OPCODE_PING:
                                ping_count += 1
                                if ping_count % ping_pong_log_summary_size == 0:
                                    self.logger.debug(
                                        f"Received {ping_pong_log_summary_size} ping data frame "
                                        f"(session id: {self.session_id})"
                                    )
                                    ping_count = 0
                            if header.opcode == FrameHeader.OPCODE_PONG:
                                pong_count += 1
                                if pong_count % ping_pong_log_summary_size == 0:
                                    self.logger.debug(
                                        f"Received {ping_pong_log_summary_size} pong data frame "
                                        f"(session id: {self.session_id})"
                                    )
                                    pong_count = 0
                        # True for ping/pong frames while their individual logging is disabled
                        ping_pong_to_skip = (
                            header is not None
                            and header.opcode is not None
                            and (header.opcode == FrameHeader.OPCODE_PING or header.opcode == FrameHeader.OPCODE_PONG)
                            and not self.ping_pong_trace_enabled
                        )
                        if not ping_pong_to_skip and count < 5:
                            # if so many same payloads came in, the trace logging should be skipped.
                            # e.g., after receiving "UNAUTHENTICATED: cache_error", many "opcode: -, payload: "
                            self.logger.debug(
                                "Received a new data frame "
                                f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})"
                            )
                    if header is None:
                        # Skip no header message
                        continue
                    # -----------------
                    # message with opcode
                    if header.opcode == FrameHeader.OPCODE_PING:
                        self.pong(data)
                    elif header.opcode == FrameHeader.OPCODE_PONG:
                        # Pings sent by this class carry "<session id>:<time.time()>";
                        # the pong payload is parsed back in the same form.
                        str_message = data.decode("utf-8")
                        elements = str_message.split(":")
                        if len(elements) >= 2:
                            session_id, ping_time = elements[0], elements[1]
                            # Pongs carrying a different session id are ignored
                            if self.session_id == session_id:
                                try:
                                    self.last_ping_pong_time = float(ping_time)
                                except Exception as e:
                                    # NOTE(review): the log text below lacks its closing ")"
                                    self.logger.debug(
                                        "Failed to parse a pong message " f" (message: {str_message}, error: {e}"
                                    )
                    elif header.opcode == FrameHeader.OPCODE_TEXT:
                        if self.on_message_listener is not None:
                            text = data.decode("utf-8")
                            self.on_message_listener(text)
                    elif header.opcode == FrameHeader.OPCODE_CLOSE:
                        if self.on_close_listener is not None:
                            if len(data) >= 2:
                                # First two bytes: close code as a big-endian unsigned short;
                                # the remainder is the UTF-8 reason text
                                (code,) = struct.unpack("!H", data[:2])
                                reason = data[2:].decode("utf-8")
                                self.on_close_listener(code, reason)
                            else:
                                # Close frame without a status code
                                self.on_close_listener(1005, "")
                        # NOTE(review): assumed to run for every close frame, whether or
                        # not a close listener is set - confirm against the original nesting
                        self.disconnect()
                        state.terminated = True
                    else:
                        # Just warn logging
                        opcode = _to_readable_opcode(header.opcode) if header else "-"
                        payload: Union[bytes, str] = data
                        if header.opcode != FrameHeader.OPCODE_BINARY:
                            try:
                                payload = data.decode("utf-8") if data is not None else ""
                            except Exception as e:
                                self.logger.info(f"Failed to convert the data to text {e}")
                        message = (
                            "Received an unsupported data frame "
                            f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})"
                        )
                        self.logger.warning(message)
            else:
                # No socket at the moment; wait a little before checking again
                time.sleep(0.2)
        except socket.timeout:
            # Listed before OSError so that a timeout is not handled as a fatal
            # error; presumably raised when nothing arrived in time - confirm
            time.sleep(0.01)
        except OSError as e:
            # getting errno.EBADF and the socket is no longer available
            if e.errno == 9 and state.terminated:
                self.logger.debug(
                    "The reason why you got [Errno 9] Bad file descriptor here is " "the socket is no longer available."
                )
            else:
                if self.on_error_listener is not None:
                    self.on_error_listener(e)
                else:
                    error_message = "Got an OSError while receiving data" f" (session id: {self.session_id}, error: {e})"
                    if self.trace_enabled:
                        self.logger.exception(error_message)
                    else:
                        self.logger.error(error_message)
            # As this connection no longer works in any way, terminating it
            # NOTE(review): assumed to run for every OSError, including the
            # errno 9 case above - confirm against the original nesting
            if self.is_active():
                try:
                    self.disconnect()
                except Exception as disconnection_error:
                    error_message = (
                        "Failed to disconnect" f" (session id: {self.session_id}, error: {disconnection_error})"
                    )
                    if self.trace_enabled:
                        self.logger.exception(error_message)
                    else:
                        self.logger.error(error_message)
            state.terminated = True
            break
        except Exception as e:
            # Reported only; the loop keeps running after a non-OSError failure
            if self.on_error_listener is not None:
                self.on_error_listener(e)
            else:
                error_message = "Got an exception while receiving data" f" (session id: {self.session_id}, error: {e})"
                if self.trace_enabled:
                    self.logger.exception(error_message)
                else:
                    self.logger.error(error_message)
    # NOTE(review): assumed to sit after the loop so that the state is marked
    # terminated on every exit path - confirm against the original nesting
    state.terminated = True
Classes
class Connection (url: str, logger: logging.Logger, proxy: Optional[str] = None, proxy_headers: Optional[Dict[str, str]] = None, ping_interval: float = 5, receive_timeout: float = 3, receive_buffer_size: int = 1024, trace_enabled: bool = False, all_message_trace_enabled: bool = False, ping_pong_trace_enabled: bool = False, on_message_listener: Optional[Callable[[str], None]] = None, on_error_listener: Optional[Callable[[Exception], None]] = None, on_close_listener: Optional[Callable[[int, Optional[str]], None]] = None, connection_type_name: str = 'Socket Mode', ssl_context: Optional[ssl.SSLContext] = None)
-
Expand source code
class Connection: url: str logger: Logger proxy: Optional[str] proxy_headers: Optional[Dict[str, str]] trace_enabled: bool ping_pong_trace_enabled: bool last_ping_pong_time: Optional[float] session_id: str sock: Optional[ssl.SSLSocket] on_message_listener: Optional[Callable[[str], None]] on_error_listener: Optional[Callable[[Exception], None]] on_close_listener: Optional[Callable[[int, Optional[str]], None]] def __init__( self, url: str, logger: Logger, proxy: Optional[str] = None, proxy_headers: Optional[Dict[str, str]] = None, ping_interval: float = 5, # seconds receive_timeout: float = 3, receive_buffer_size: int = 1024, trace_enabled: bool = False, all_message_trace_enabled: bool = False, ping_pong_trace_enabled: bool = False, on_message_listener: Optional[Callable[[str], None]] = None, on_error_listener: Optional[Callable[[Exception], None]] = None, on_close_listener: Optional[Callable[[int, Optional[str]], None]] = None, connection_type_name: str = "Socket Mode", ssl_context: Optional[ssl.SSLContext] = None, ): self.url = url self.logger = logger self.proxy = proxy self.proxy_headers = proxy_headers self.ping_interval = ping_interval self.receive_timeout = receive_timeout self.receive_buffer_size = receive_buffer_size if self.receive_buffer_size < 16: raise SlackClientConfigurationError("Too small receive_buffer_size detected.") self.session_id = str(uuid4()) self.trace_enabled = trace_enabled self.all_message_trace_enabled = all_message_trace_enabled self.ping_pong_trace_enabled = ping_pong_trace_enabled self.last_ping_pong_time = None self.consecutive_check_state_error_count = 0 self.sock = None # To avoid ssl.SSLError: [SSL: BAD_LENGTH] bad length self.sock_receive_lock = Lock() self.sock_send_lock = Lock() self.on_message_listener = on_message_listener self.on_error_listener = on_error_listener self.on_close_listener = on_close_listener self.connection_type_name = connection_type_name self.ssl_context = ssl_context def connect(self) -> None: try: 
parsed_url = urlparse(self.url.strip()) hostname: str = parsed_url.hostname port: int = parsed_url.port or (443 if parsed_url.scheme == "wss" else 80) if self.trace_enabled: self.logger.debug( f"Connecting to the address for handshake: {hostname}:{port} " f"(session id: {self.session_id})" ) sock: Union[ssl.SSLSocket, socket] = _establish_new_socket_connection( # type: ignore session_id=self.session_id, server_hostname=hostname, server_port=port, logger=self.logger, sock_send_lock=self.sock_send_lock, receive_timeout=self.receive_timeout, proxy=self.proxy, proxy_headers=self.proxy_headers, trace_enabled=self.trace_enabled, ssl_context=self.ssl_context, ) # WebSocket handshake try: path = f"{parsed_url.path}?{parsed_url.query}" sec_websocket_key = _generate_sec_websocket_key() message = f"""GET {path} HTTP/1.1 Host: {parsed_url.hostname} Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: {sec_websocket_key} Sec-WebSocket-Version: 13 """ req: str = "\r\n".join([line.lstrip() for line in message.split("\n")]) if self.trace_enabled: self.logger.debug( f"{self.connection_type_name} handshake request (session id: {self.session_id}):\n{req}" ) with self.sock_send_lock: sock.send(req.encode("utf-8")) status, headers, text = _parse_handshake_response(sock) if self.trace_enabled: self.logger.debug( f"{self.connection_type_name} handshake response (session id: {self.session_id}):\n{text}" ) # HTTP/1.1 101 Switching Protocols if status == 101: if not _validate_sec_websocket_accept(sec_websocket_key, headers): raise SlackClientNotConnectedError( f"Invalid response header detected in {self.connection_type_name} handshake response" f" (session id: {self.session_id})" ) # set this successfully connected socket self.sock = sock self.ping(f"{self.session_id}:{time.time()}") else: message = ( f"Received an unexpected response for handshake " f"(status: {status}, response: {text}, session id: {self.session_id})" ) self.logger.warning(message) except socket.error as e: code: 
Optional[int] = None if e.args and len(e.args) > 1 and isinstance(e.args[0], int): code = e.args[0] if code is not None: error_message = f"Error code: {code} (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) raise except Exception as e: error_message = f"Failed to establish a connection (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) if self.on_error_listener is not None: self.on_error_listener(e) self.disconnect() def disconnect(self) -> None: if self.sock is not None: with self.sock_send_lock: with self.sock_receive_lock: # Synchronize before closing this instance's socket self.sock.close() self.sock = None # After this, all operations using self.sock will be skipped self.logger.info(f"The connection has been closed (session id: {self.session_id})") def is_active(self) -> bool: return self.sock is not None def close(self) -> None: self.disconnect() def ping(self, payload: Union[str, bytes] = "") -> None: if self.trace_enabled and self.ping_pong_trace_enabled: if isinstance(payload, bytes): payload = payload.decode("utf-8") self.logger.debug("Sending a ping data frame " f"(session id: {self.session_id}, payload: {payload})") data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PING) with self.sock_send_lock: if self.sock is not None: self.sock.send(data) else: if self.ping_pong_trace_enabled: self.logger.debug("Skipped sending a ping message as the underlying socket is no longer available.") def pong(self, payload: Union[str, bytes] = "") -> None: if self.trace_enabled and self.ping_pong_trace_enabled: if isinstance(payload, bytes): payload = payload.decode("utf-8") self.logger.debug("Sending a pong data frame " f"(session id: {self.session_id}, payload: {payload})") data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PONG) with self.sock_send_lock: if 
self.sock is not None: self.sock.send(data) else: if self.ping_pong_trace_enabled: self.logger.debug("Skipped sending a pong message as the underlying socket is no longer available.") def send(self, payload: str) -> None: if self.trace_enabled: if isinstance(payload, bytes): payload = payload.decode("utf-8") self.logger.debug("Sending a text data frame " f"(session id: {self.session_id}, payload: {payload})") data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_TEXT) with self.sock_send_lock: try: self.sock.send(data) except Exception as e: # In most cases, we want to retry this operation with a newly established connection. # Getting this exception means that this connection has been replaced with a new one # and it's no longer usable. # The SocketModeClient implementation can do one retry when it gets this exception. raise SlackClientNotConnectedError( f"Failed to send a message as the connection is no longer active " f"(session_id: {self.session_id}, error: {e})" ) def check_state(self) -> None: try: if self.sock is not None: try: self.ping(f"{self.session_id}:{time.time()}") except ssl.SSLZeroReturnError as e: self.logger.info( "Unable to send a ping message. Closing the connection..." f" (session id: {self.session_id}, reason: {e})" ) self.disconnect() return if self.last_ping_pong_time is not None: disconnected_seconds = int(time.time() - self.last_ping_pong_time) if self.trace_enabled and disconnected_seconds > self.ping_interval: message = ( f"{disconnected_seconds} seconds have passed " f"since this client last received a pong response from the server " f"(session id: {self.session_id})" ) self.logger.debug(message) is_stale = disconnected_seconds > self.ping_interval * 4 if is_stale: self.logger.info( "The connection seems to be stale. Disconnecting..." f" (session id: {self.session_id}," f" reason: disconnected for {disconnected_seconds}+ seconds)" ) self.disconnect() return else: self.logger.debug("This connection is already closed." 
f" (session id: {self.session_id})") self.consecutive_check_state_error_count = 0 except Exception as e: error_message = ( "Failed to check the state of sock " f"(session id: {self.session_id}, error: {type(e).__name__}, message: {e})" ) if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) self.consecutive_check_state_error_count += 1 if self.consecutive_check_state_error_count >= 5: self.disconnect() def run_until_completion(self, state: ConnectionState) -> None: repeated_messages = {"payload": 0} ping_count = 0 pong_count = 0 ping_pong_log_summary_size = 1000 while not state.terminated: try: if self.is_active(): received_messages: List[Tuple[Optional[FrameHeader], bytes]] = _receive_messages( sock=self.sock, sock_receive_lock=self.sock_receive_lock, logger=self.logger, receive_buffer_size=self.receive_buffer_size, all_message_trace_enabled=self.all_message_trace_enabled, ) for message in received_messages: header, data = message # ----------------- # trace logging if self.trace_enabled is True: opcode: str = _to_readable_opcode(header.opcode) if header else "-" payload: str = _parse_text_payload(data, self.logger) count: Optional[int] = repeated_messages.get(payload) if count is None: count = 1 else: count += 1 repeated_messages = {payload: count} if not self.ping_pong_trace_enabled and header is not None and header.opcode is not None: if header.opcode == FrameHeader.OPCODE_PING: ping_count += 1 if ping_count % ping_pong_log_summary_size == 0: self.logger.debug( f"Received {ping_pong_log_summary_size} ping data frame " f"(session id: {self.session_id})" ) ping_count = 0 if header.opcode == FrameHeader.OPCODE_PONG: pong_count += 1 if pong_count % ping_pong_log_summary_size == 0: self.logger.debug( f"Received {ping_pong_log_summary_size} pong data frame " f"(session id: {self.session_id})" ) pong_count = 0 ping_pong_to_skip = ( header is not None and header.opcode is not None and (header.opcode == 
FrameHeader.OPCODE_PING or header.opcode == FrameHeader.OPCODE_PONG) and not self.ping_pong_trace_enabled ) if not ping_pong_to_skip and count < 5: # if so many same payloads came in, the trace logging should be skipped. # e.g., after receiving "UNAUTHENTICATED: cache_error", many "opcode: -, payload: " self.logger.debug( "Received a new data frame " f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})" ) if header is None: # Skip no header message continue # ----------------- # message with opcode if header.opcode == FrameHeader.OPCODE_PING: self.pong(data) elif header.opcode == FrameHeader.OPCODE_PONG: str_message = data.decode("utf-8") elements = str_message.split(":") if len(elements) >= 2: session_id, ping_time = elements[0], elements[1] if self.session_id == session_id: try: self.last_ping_pong_time = float(ping_time) except Exception as e: self.logger.debug( "Failed to parse a pong message " f" (message: {str_message}, error: {e}" ) elif header.opcode == FrameHeader.OPCODE_TEXT: if self.on_message_listener is not None: text = data.decode("utf-8") self.on_message_listener(text) elif header.opcode == FrameHeader.OPCODE_CLOSE: if self.on_close_listener is not None: if len(data) >= 2: (code,) = struct.unpack("!H", data[:2]) reason = data[2:].decode("utf-8") self.on_close_listener(code, reason) else: self.on_close_listener(1005, "") self.disconnect() state.terminated = True else: # Just warn logging opcode = _to_readable_opcode(header.opcode) if header else "-" payload: Union[bytes, str] = data if header.opcode != FrameHeader.OPCODE_BINARY: try: payload = data.decode("utf-8") if data is not None else "" except Exception as e: self.logger.info(f"Failed to convert the data to text {e}") message = ( "Received an unsupported data frame " f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})" ) self.logger.warning(message) else: time.sleep(0.2) except socket.timeout: time.sleep(0.01) except OSError as e: # getting errno.EBADF and 
the socket is no longer available if e.errno == 9 and state.terminated: self.logger.debug( "The reason why you got [Errno 9] Bad file descriptor here is " "the socket is no longer available." ) else: if self.on_error_listener is not None: self.on_error_listener(e) else: error_message = "Got an OSError while receiving data" f" (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) # As this connection no longer works in any way, terminating it if self.is_active(): try: self.disconnect() except Exception as disconnection_error: error_message = ( "Failed to disconnect" f" (session id: {self.session_id}, error: {disconnection_error})" ) if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) state.terminated = True break except Exception as e: if self.on_error_listener is not None: self.on_error_listener(e) else: error_message = "Got an exception while receiving data" f" (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) state.terminated = True
Class variables
var last_ping_pong_time : Optional[float]
var logger : logging.Logger
var on_close_listener : Optional[Callable[[int, Optional[str]], None]]
var on_error_listener : Optional[Callable[[Exception], None]]
var on_message_listener : Optional[Callable[[str], None]]
var ping_pong_trace_enabled : bool
var proxy : Optional[str]
var proxy_headers : Optional[Dict[str, str]]
var session_id : str
var sock : Optional[ssl.SSLSocket]
var trace_enabled : bool
var url : str
Methods
def check_state(self) ‑> None
-
Expand source code
def check_state(self) -> None: try: if self.sock is not None: try: self.ping(f"{self.session_id}:{time.time()}") except ssl.SSLZeroReturnError as e: self.logger.info( "Unable to send a ping message. Closing the connection..." f" (session id: {self.session_id}, reason: {e})" ) self.disconnect() return if self.last_ping_pong_time is not None: disconnected_seconds = int(time.time() - self.last_ping_pong_time) if self.trace_enabled and disconnected_seconds > self.ping_interval: message = ( f"{disconnected_seconds} seconds have passed " f"since this client last received a pong response from the server " f"(session id: {self.session_id})" ) self.logger.debug(message) is_stale = disconnected_seconds > self.ping_interval * 4 if is_stale: self.logger.info( "The connection seems to be stale. Disconnecting..." f" (session id: {self.session_id}," f" reason: disconnected for {disconnected_seconds}+ seconds)" ) self.disconnect() return else: self.logger.debug("This connection is already closed." f" (session id: {self.session_id})") self.consecutive_check_state_error_count = 0 except Exception as e: error_message = ( "Failed to check the state of sock " f"(session id: {self.session_id}, error: {type(e).__name__}, message: {e})" ) if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) self.consecutive_check_state_error_count += 1 if self.consecutive_check_state_error_count >= 5: self.disconnect()
def close(self) ‑> None
-
Expand source code
def close(self) -> None: self.disconnect()
def connect(self) ‑> None
-
Expand source code
def connect(self) -> None: try: parsed_url = urlparse(self.url.strip()) hostname: str = parsed_url.hostname port: int = parsed_url.port or (443 if parsed_url.scheme == "wss" else 80) if self.trace_enabled: self.logger.debug( f"Connecting to the address for handshake: {hostname}:{port} " f"(session id: {self.session_id})" ) sock: Union[ssl.SSLSocket, socket] = _establish_new_socket_connection( # type: ignore session_id=self.session_id, server_hostname=hostname, server_port=port, logger=self.logger, sock_send_lock=self.sock_send_lock, receive_timeout=self.receive_timeout, proxy=self.proxy, proxy_headers=self.proxy_headers, trace_enabled=self.trace_enabled, ssl_context=self.ssl_context, ) # WebSocket handshake try: path = f"{parsed_url.path}?{parsed_url.query}" sec_websocket_key = _generate_sec_websocket_key() message = f"""GET {path} HTTP/1.1 Host: {parsed_url.hostname} Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: {sec_websocket_key} Sec-WebSocket-Version: 13 """ req: str = "\r\n".join([line.lstrip() for line in message.split("\n")]) if self.trace_enabled: self.logger.debug( f"{self.connection_type_name} handshake request (session id: {self.session_id}):\n{req}" ) with self.sock_send_lock: sock.send(req.encode("utf-8")) status, headers, text = _parse_handshake_response(sock) if self.trace_enabled: self.logger.debug( f"{self.connection_type_name} handshake response (session id: {self.session_id}):\n{text}" ) # HTTP/1.1 101 Switching Protocols if status == 101: if not _validate_sec_websocket_accept(sec_websocket_key, headers): raise SlackClientNotConnectedError( f"Invalid response header detected in {self.connection_type_name} handshake response" f" (session id: {self.session_id})" ) # set this successfully connected socket self.sock = sock self.ping(f"{self.session_id}:{time.time()}") else: message = ( f"Received an unexpected response for handshake " f"(status: {status}, response: {text}, session id: {self.session_id})" ) self.logger.warning(message) 
except socket.error as e: code: Optional[int] = None if e.args and len(e.args) > 1 and isinstance(e.args[0], int): code = e.args[0] if code is not None: error_message = f"Error code: {code} (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) raise except Exception as e: error_message = f"Failed to establish a connection (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) if self.on_error_listener is not None: self.on_error_listener(e) self.disconnect()
def disconnect(self) ‑> None
-
Expand source code
def disconnect(self) -> None:
    """Close the underlying socket, if any, and log that the connection is closed.

    The send lock and then the receive lock are held while the socket is
    detached and closed. ``self.sock`` is reset to ``None`` before ``close()``
    is called, so it is cleared even when ``close()`` raises (the exception
    still propagates). Calling this on an already-closed connection only
    emits the log line.
    """
    if self.sock is not None:
        with self.sock_send_lock:
            with self.sock_receive_lock:
                # Synchronize before closing this instance's socket.
                # Re-read self.sock under the locks: another thread may have
                # disconnected while this one was waiting for them.
                sock, self.sock = self.sock, None
                # After this, all operations using self.sock will be skipped
                if sock is not None:
                    sock.close()

    self.logger.info(f"The connection has been closed (session id: {self.session_id})")
def is_active(self) ‑> bool
-
Expand source code
def is_active(self) -> bool:
    """Return ``True`` while this connection still holds a socket object."""
    has_socket = self.sock is not None
    return has_socket
def ping(self, payload: Union[str, bytes] = '') ‑> None
-
Expand source code
def ping(self, payload: Union[str, bytes] = "") -> None:
    """Send a ping data frame carrying ``payload``.

    The frame is built with ``_build_data_frame_for_sending`` and written
    while holding the send lock. When ``self.sock`` is ``None`` nothing is
    sent; that is reported at debug level if ping/pong tracing is enabled.

    Args:
        payload: Data to put into the frame. The frame is always built from
            this value as given; ``bytes`` are decoded only to render the
            trace log message.
    """
    if self.trace_enabled and self.ping_pong_trace_enabled:
        # Decode for the log message only. Undecodable bytes are replaced so
        # that enabling tracing can neither raise nor alter the frame content.
        readable = payload.decode("utf-8", errors="replace") if isinstance(payload, bytes) else payload
        self.logger.debug("Sending a ping data frame " f"(session id: {self.session_id}, payload: {readable})")
    data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PING)
    with self.sock_send_lock:
        if self.sock is not None:
            self.sock.send(data)
        elif self.ping_pong_trace_enabled:
            self.logger.debug("Skipped sending a ping message as the underlying socket is no longer available.")
def pong(self, payload: Union[str, bytes] = '') ‑> None
-
Expand source code
def pong(self, payload: Union[str, bytes] = "") -> None:
    """Send a pong data frame carrying ``payload``.

    The frame is built with ``_build_data_frame_for_sending`` and written
    while holding the send lock. When ``self.sock`` is ``None`` nothing is
    sent; that is reported at debug level if ping/pong tracing is enabled.

    Args:
        payload: Data to put into the frame. The frame is always built from
            this value as given; ``bytes`` are decoded only to render the
            trace log message.
    """
    if self.trace_enabled and self.ping_pong_trace_enabled:
        # Decode for the log message only. Undecodable bytes are replaced so
        # that enabling tracing can neither raise nor alter the frame content.
        readable = payload.decode("utf-8", errors="replace") if isinstance(payload, bytes) else payload
        self.logger.debug("Sending a pong data frame " f"(session id: {self.session_id}, payload: {readable})")
    data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PONG)
    with self.sock_send_lock:
        if self.sock is not None:
            self.sock.send(data)
        elif self.ping_pong_trace_enabled:
            self.logger.debug("Skipped sending a pong message as the underlying socket is no longer available.")
def run_until_completion(self, state: ConnectionState) ‑> None
-
Expand source code
def run_until_completion(self, state: ConnectionState) -> None:
    """Receive and dispatch data frames until ``state.terminated`` becomes true.

    While the connection is active, frames returned by ``_receive_messages``
    are handled by opcode:

    * PING: answered with ``self.pong(data)``.
    * PONG: a payload of the form ``"<session id>:<time>"`` whose session id
      matches this connection updates ``self.last_ping_pong_time``.
    * TEXT: decoded as UTF-8 and passed to ``on_message_listener`` (if set).
    * CLOSE: ``on_close_listener`` (if set) is called with the status code and
      reason, then the connection is disconnected and the state terminated.
    * anything else: logged as a warning.

    While the connection is not active the loop sleeps briefly and retries.
    A ``socket.timeout`` is followed by a short sleep and a retry. Any other
    ``OSError`` is reported, the connection is disconnected and the loop
    ends. Other exceptions are reported (listener or log) and the loop goes
    on. ``state.terminated`` is set to ``True`` when the method returns.

    Args:
        state: Shared flag object; the loop stops once ``state.terminated``
            is true, and this method sets it when the connection ends.
    """
    # Remembers the most recent traced payload and how many times in a row it
    # was seen, so that repeated identical frames are not logged forever.
    repeated_messages = {"payload": 0}
    ping_count = 0
    pong_count = 0
    # With ping/pong tracing off, one summary line is logged per this many frames.
    ping_pong_log_summary_size = 1000
    while not state.terminated:
        try:
            if self.is_active():
                received_messages: List[Tuple[Optional[FrameHeader], bytes]] = _receive_messages(
                    sock=self.sock,
                    sock_receive_lock=self.sock_receive_lock,
                    logger=self.logger,
                    receive_buffer_size=self.receive_buffer_size,
                    all_message_trace_enabled=self.all_message_trace_enabled,
                )
                for message in received_messages:
                    header, data = message

                    # -----------------
                    # trace logging

                    if self.trace_enabled is True:
                        opcode: str = _to_readable_opcode(header.opcode) if header else "-"
                        payload: str = _parse_text_payload(data, self.logger)
                        count: Optional[int] = repeated_messages.get(payload)
                        if count is None:
                            count = 1
                        else:
                            count += 1
                        # Only the latest payload is kept; a different payload resets the count.
                        repeated_messages = {payload: count}
                        if not self.ping_pong_trace_enabled and header is not None and header.opcode is not None:
                            if header.opcode == FrameHeader.OPCODE_PING:
                                ping_count += 1
                                if ping_count % ping_pong_log_summary_size == 0:
                                    self.logger.debug(
                                        f"Received {ping_pong_log_summary_size} ping data frame "
                                        f"(session id: {self.session_id})"
                                    )
                                    ping_count = 0
                            if header.opcode == FrameHeader.OPCODE_PONG:
                                pong_count += 1
                                if pong_count % ping_pong_log_summary_size == 0:
                                    self.logger.debug(
                                        f"Received {ping_pong_log_summary_size} pong data frame "
                                        f"(session id: {self.session_id})"
                                    )
                                    pong_count = 0

                        # Individual ping/pong frames are only traced when explicitly enabled.
                        ping_pong_to_skip = (
                            header is not None
                            and header.opcode is not None
                            and (header.opcode == FrameHeader.OPCODE_PING or header.opcode == FrameHeader.OPCODE_PONG)
                            and not self.ping_pong_trace_enabled
                        )
                        if not ping_pong_to_skip and count < 5:
                            # if so many same payloads came in, the trace logging should be skipped.
                            # e.g., after receiving "UNAUTHENTICATED: cache_error", many "opcode: -, payload: "
                            self.logger.debug(
                                "Received a new data frame "
                                f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})"
                            )

                    if header is None:
                        # Skip no header message
                        continue

                    # -----------------
                    # message with opcode

                    if header.opcode == FrameHeader.OPCODE_PING:
                        self.pong(data)
                    elif header.opcode == FrameHeader.OPCODE_PONG:
                        # The payload is expected to look like "<session id>:<time>",
                        # which is the format connect() uses for its ping.
                        str_message = data.decode("utf-8")
                        elements = str_message.split(":")
                        if len(elements) >= 2:
                            session_id, ping_time = elements[0], elements[1]
                            if self.session_id == session_id:
                                try:
                                    self.last_ping_pong_time = float(ping_time)
                                except Exception as e:
                                    # NOTE(review): this message has a double space and
                                    # lacks its closing parenthesis.
                                    self.logger.debug(
                                        "Failed to parse a pong message " f" (message: {str_message}, error: {e}"
                                    )
                    elif header.opcode == FrameHeader.OPCODE_TEXT:
                        if self.on_message_listener is not None:
                            text = data.decode("utf-8")
                            self.on_message_listener(text)
                    elif header.opcode == FrameHeader.OPCODE_CLOSE:
                        if self.on_close_listener is not None:
                            if len(data) >= 2:
                                # First two bytes: status code as an unsigned big-endian
                                # short; the remaining bytes are the reason text.
                                (code,) = struct.unpack("!H", data[:2])
                                reason = data[2:].decode("utf-8")
                                self.on_close_listener(code, reason)
                            else:
                                # No status code in the frame; 1005 is presumably the
                                # WebSocket "no status received" code - TODO confirm.
                                self.on_close_listener(1005, "")
                        self.disconnect()
                        state.terminated = True
                    else:
                        # Just warn logging
                        opcode = _to_readable_opcode(header.opcode) if header else "-"
                        payload: Union[bytes, str] = data
                        if header.opcode != FrameHeader.OPCODE_BINARY:
                            try:
                                payload = data.decode("utf-8") if data is not None else ""
                            except Exception as e:
                                self.logger.info(f"Failed to convert the data to text {e}")
                        message = (
                            "Received an unsupported data frame "
                            f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})"
                        )
                        self.logger.warning(message)
            else:
                # No socket at the moment; wait a little before checking again.
                time.sleep(0.2)
        except socket.timeout:
            # Nothing arrived in time; pause briefly and try again.
            time.sleep(0.01)
        except OSError as e:
            # getting errno.EBADF and the socket is no longer available
            if e.errno == 9 and state.terminated:
                self.logger.debug(
                    "The reason why you got [Errno 9] Bad file descriptor here is "
                    "the socket is no longer available."
                )
            else:
                if self.on_error_listener is not None:
                    self.on_error_listener(e)
                else:
                    error_message = "Got an OSError while receiving data" f" (session id: {self.session_id}, error: {e})"
                    if self.trace_enabled:
                        self.logger.exception(error_message)
                    else:
                        self.logger.error(error_message)
            # As this connection no longer works in any way, terminating it
            if self.is_active():
                try:
                    self.disconnect()
                except Exception as disconnection_error:
                    error_message = (
                        "Failed to disconnect" f" (session id: {self.session_id}, error: {disconnection_error})"
                    )
                    if self.trace_enabled:
                        self.logger.exception(error_message)
                    else:
                        self.logger.error(error_message)
            state.terminated = True
            break
        except Exception as e:
            # Any other failure is reported, and the loop keeps running.
            if self.on_error_listener is not None:
                self.on_error_listener(e)
            else:
                error_message = "Got an exception while receiving data" f" (session id: {self.session_id}, error: {e})"
                if self.trace_enabled:
                    self.logger.exception(error_message)
                else:
                    self.logger.error(error_message)

    # Whatever ended the loop, mark the state as terminated.
    state.terminated = True
def send(self, payload: str) ‑> None
-
Expand source code
def send(self, payload: str) -> None:
    """Send ``payload`` as a text data frame.

    Args:
        payload: The message to send. ``bytes`` are tolerated as well; they
            are decoded only to render the trace log message, and the frame
            is always built from the value as given.

    Raises:
        SlackClientNotConnectedError: If writing to the socket fails for any
            reason, including ``self.sock`` being ``None``. The original
            exception is attached as the cause.
    """
    if self.trace_enabled:
        # Decode for the log message only. Undecodable bytes are replaced so
        # that enabling tracing can neither raise nor alter the frame content.
        readable = payload.decode("utf-8", errors="replace") if isinstance(payload, bytes) else payload
        self.logger.debug("Sending a text data frame " f"(session id: {self.session_id}, payload: {readable})")
    data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_TEXT)
    with self.sock_send_lock:
        try:
            self.sock.send(data)
        except Exception as e:
            # In most cases, we want to retry this operation with a newly established connection.
            # Getting this exception means that this connection has been replaced with a new one
            # and it's no longer usable.
            # The SocketModeClient implementation can do one retry when it gets this exception.
            raise SlackClientNotConnectedError(
                f"Failed to send a message as the connection is no longer active "
                f"(session_id: {self.session_id}, error: {e})"
            ) from e
class ConnectionState
-
Expand source code
class ConnectionState:
    """Mutable holder for a connection's ``terminated`` flag."""

    # SocketModeClient is supposed to read this flag to learn that
    # the connection is no longer available.
    terminated: bool

    def __init__(self):
        # A new state always starts out as not terminated.
        self.terminated = False
Class variables
var terminated : bool