Module slack_sdk.oauth.installation_store.amazon_s3

Expand source code
import json
import logging
from logging import Logger
from typing import Optional

from botocore.client import BaseClient

from slack_sdk.errors import SlackClientConfigurationError
from slack_sdk.oauth.installation_store.async_installation_store import (
    AsyncInstallationStore,
)
from slack_sdk.oauth.installation_store.installation_store import InstallationStore
from slack_sdk.oauth.installation_store.models.bot import Bot
from slack_sdk.oauth.installation_store.models.installation import Installation


class AmazonS3InstallationStore(InstallationStore, AsyncInstallationStore):
    def __init__(
        self,
        *,
        s3_client: BaseClient,
        bucket_name: str,
        client_id: str,
        historical_data_enabled: bool = True,
        logger: Logger = logging.getLogger(__name__),
    ):
        self.s3_client = s3_client
        self.bucket_name = bucket_name
        self.historical_data_enabled = historical_data_enabled
        self.client_id = client_id
        self._logger = logger

    @property
    def logger(self) -> Logger:
        if self._logger is None:
            self._logger = logging.getLogger(__name__)
        return self._logger

    async def async_save(self, installation: Installation):
        return self.save(installation)

    async def async_save_bot(self, bot: Bot):
        return self.save_bot(bot)

    def save(self, installation: Installation):
        none = "none"
        e_id = installation.enterprise_id or none
        t_id = installation.team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"

        self.save_bot(installation.to_bot())

        if self.historical_data_enabled:
            history_version: str = str(installation.installed_at)

            # per workspace
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{history_version}",
            )
            self.logger.debug(f"S3 put_object response: {response}")

            # per workspace per user
            u_id = installation.user_id or none
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{u_id}-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{u_id}-{history_version}",
            )
            self.logger.debug(f"S3 put_object response: {response}")

        else:
            # per workspace
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")

            # per workspace per user
            u_id = installation.user_id or none
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{u_id}-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")

    def save_bot(self, bot: Bot):
        if bot.bot_token is None:
            self.logger.debug("Skipped saving a new row because of the absense of bot token in it")
            return

        none = "none"
        e_id = bot.enterprise_id or none
        t_id = bot.team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"

        if self.historical_data_enabled:
            history_version: str = str(bot.installed_at)
            entity: str = json.dumps(bot.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/bot-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/bot-{history_version}",
            )
            self.logger.debug(f"S3 put_object response: {response}")

        else:
            entity: str = json.dumps(bot.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/bot-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")

    async def async_find_bot(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Bot]:
        return self.find_bot(
            enterprise_id=enterprise_id,
            team_id=team_id,
            is_enterprise_install=is_enterprise_install,
        )

    def find_bot(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Bot]:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        if is_enterprise_install:
            t_id = none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        try:
            fetch_response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=f"{workspace_path}/bot-latest",
            )
            self.logger.debug(f"S3 get_object response: {fetch_response}")
            body = fetch_response["Body"].read().decode("utf-8")
            data = json.loads(body)
            return Bot(**data)
        except Exception as e:  # skipcq: PYL-W0703
            message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
            self.logger.warning(message)
            return None

    async def async_find_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Installation]:
        return self.find_installation(
            enterprise_id=enterprise_id,
            team_id=team_id,
            user_id=user_id,
            is_enterprise_install=is_enterprise_install,
        )

    def find_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Installation]:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        if is_enterprise_install:
            t_id = none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        try:
            key = f"{workspace_path}/installer-{user_id}-latest" if user_id else f"{workspace_path}/installer-latest"
            fetch_response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=key,
            )
            self.logger.debug(f"S3 get_object response: {fetch_response}")
            body = fetch_response["Body"].read().decode("utf-8")
            data = json.loads(body)
            installation = Installation(**data)

            has_user_installation = user_id is not None and installation is not None
            no_bot_token_installation = installation is not None and installation.bot_token is None
            should_find_bot_installation = has_user_installation or no_bot_token_installation
            if should_find_bot_installation:
                # Retrieve the latest bot token, just in case
                # See also: https://github.com/slackapi/bolt-python/issues/664
                latest_bot_installation = self.find_bot(
                    enterprise_id=enterprise_id,
                    team_id=team_id,
                    is_enterprise_install=is_enterprise_install,
                )
                if latest_bot_installation is not None and installation.bot_token != latest_bot_installation.bot_token:
                    # NOTE: this logic is based on the assumption that every single installation has bot scopes
                    # If you need to installation patterns without bot scopes in the same S3 bucket,
                    # please fork this code and implement your own logic.
                    installation.bot_id = latest_bot_installation.bot_id
                    installation.bot_user_id = latest_bot_installation.bot_user_id
                    installation.bot_token = latest_bot_installation.bot_token
                    installation.bot_scopes = latest_bot_installation.bot_scopes
                    installation.bot_refresh_token = latest_bot_installation.bot_refresh_token
                    installation.bot_token_expires_at = latest_bot_installation.bot_token_expires_at

            return installation

        except Exception as e:  # skipcq: PYL-W0703
            message = f"Failed to find an installation data for enterprise: {e_id}, team: {t_id}: {e}"
            self.logger.warning(message)
            return None

    async def async_delete_bot(self, *, enterprise_id: Optional[str], team_id: Optional[str]) -> None:
        return self.delete_bot(
            enterprise_id=enterprise_id,
            team_id=team_id,
        )

    def delete_bot(self, *, enterprise_id: Optional[str], team_id: Optional[str]) -> None:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        objects = self.s3_client.list_objects(
            Bucket=self.bucket_name,
            Prefix=f"{workspace_path}/bot-",
        )
        for content in objects.get("Contents", []):
            key = content.get("Key")
            if key is not None:
                self.logger.info(f"Going to delete bot installation ({key})")
                try:
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name,
                        Key=content.get("Key"),
                    )
                except Exception as e:  # skipcq: PYL-W0703
                    message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                    raise SlackClientConfigurationError(message)

    async def async_delete_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
    ) -> None:
        return self.delete_installation(
            enterprise_id=enterprise_id,
            team_id=team_id,
            user_id=user_id,
        )

    def delete_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
    ) -> None:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        objects = self.s3_client.list_objects(
            Bucket=self.bucket_name,
            Prefix=f"{workspace_path}/installer-{user_id or ''}",
        )
        deleted_keys = []
        for content in objects.get("Contents", []):
            key = content.get("Key")
            if key is not None:
                self.logger.info(f"Going to delete installation ({key})")
                try:
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name,
                        Key=key,
                    )
                    deleted_keys.append(key)
                except Exception as e:  # skipcq: PYL-W0703
                    message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                    raise SlackClientConfigurationError(message)

                try:
                    no_user_id_key = key.replace(f"-{user_id}", "")
                    if not no_user_id_key.endswith("installer-latest"):
                        self.s3_client.delete_object(
                            Bucket=self.bucket_name,
                            Key=no_user_id_key,
                        )
                        deleted_keys.append(no_user_id_key)
                except Exception as e:  # skipcq: PYL-W0703
                    message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                    raise SlackClientConfigurationError(message)

        # Check the remaining installation data
        objects = self.s3_client.list_objects(
            Bucket=self.bucket_name,
            Prefix=f"{workspace_path}/installer-",
            MaxKeys=10,  # the small number would be enough for this purpose
        )
        keys = [c.get("Key") for c in objects.get("Contents", []) if c.get("Key") not in deleted_keys]
        # If only installer-latest remains, we should delete the one as well
        if len(keys) == 1 and keys[0].endswith("installer-latest"):
            content = objects.get("Contents", [])[0]
            try:
                self.s3_client.delete_object(
                    Bucket=self.bucket_name,
                    Key=content.get("Key"),
                )
            except Exception as e:  # skipcq: PYL-W0703
                message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                raise SlackClientConfigurationError(message)

Classes

class AmazonS3InstallationStore (*, s3_client: botocore.client.BaseClient, bucket_name: str, client_id: str, historical_data_enabled: bool = True, logger: logging.Logger = <Logger slack_sdk.oauth.installation_store.amazon_s3 (WARNING)>)

The installation store interface.

The minimum required methods are:

  • save(installation)
  • find_installation(enterprise_id, team_id, user_id, is_enterprise_install)

If you would like to properly handle app uninstallations and token revocations, the following methods should be implemented.

  • delete_installation(enterprise_id, team_id, user_id)
  • delete_all(enterprise_id, team_id)

If your app needs only bot scope installations, the simpler way to implement would be:

  • save(installation)
  • find_bot(enterprise_id, team_id, is_enterprise_install)
  • delete_bot(enterprise_id, team_id)
  • delete_all(enterprise_id, team_id)
Expand source code
class AmazonS3InstallationStore(InstallationStore, AsyncInstallationStore):
    def __init__(
        self,
        *,
        s3_client: BaseClient,
        bucket_name: str,
        client_id: str,
        historical_data_enabled: bool = True,
        logger: Logger = logging.getLogger(__name__),
    ):
        self.s3_client = s3_client
        self.bucket_name = bucket_name
        self.historical_data_enabled = historical_data_enabled
        self.client_id = client_id
        self._logger = logger

    @property
    def logger(self) -> Logger:
        if self._logger is None:
            self._logger = logging.getLogger(__name__)
        return self._logger

    async def async_save(self, installation: Installation):
        return self.save(installation)

    async def async_save_bot(self, bot: Bot):
        return self.save_bot(bot)

    def save(self, installation: Installation):
        none = "none"
        e_id = installation.enterprise_id or none
        t_id = installation.team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"

        self.save_bot(installation.to_bot())

        if self.historical_data_enabled:
            history_version: str = str(installation.installed_at)

            # per workspace
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{history_version}",
            )
            self.logger.debug(f"S3 put_object response: {response}")

            # per workspace per user
            u_id = installation.user_id or none
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{u_id}-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{u_id}-{history_version}",
            )
            self.logger.debug(f"S3 put_object response: {response}")

        else:
            # per workspace
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")

            # per workspace per user
            u_id = installation.user_id or none
            entity: str = json.dumps(installation.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/installer-{u_id}-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")

    def save_bot(self, bot: Bot):
        if bot.bot_token is None:
            self.logger.debug("Skipped saving a new row because of the absense of bot token in it")
            return

        none = "none"
        e_id = bot.enterprise_id or none
        t_id = bot.team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"

        if self.historical_data_enabled:
            history_version: str = str(bot.installed_at)
            entity: str = json.dumps(bot.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/bot-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/bot-{history_version}",
            )
            self.logger.debug(f"S3 put_object response: {response}")

        else:
            entity: str = json.dumps(bot.__dict__)
            response = self.s3_client.put_object(
                Bucket=self.bucket_name,
                Body=entity,
                Key=f"{workspace_path}/bot-latest",
            )
            self.logger.debug(f"S3 put_object response: {response}")

    async def async_find_bot(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Bot]:
        return self.find_bot(
            enterprise_id=enterprise_id,
            team_id=team_id,
            is_enterprise_install=is_enterprise_install,
        )

    def find_bot(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Bot]:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        if is_enterprise_install:
            t_id = none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        try:
            fetch_response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=f"{workspace_path}/bot-latest",
            )
            self.logger.debug(f"S3 get_object response: {fetch_response}")
            body = fetch_response["Body"].read().decode("utf-8")
            data = json.loads(body)
            return Bot(**data)
        except Exception as e:  # skipcq: PYL-W0703
            message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
            self.logger.warning(message)
            return None

    async def async_find_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Installation]:
        return self.find_installation(
            enterprise_id=enterprise_id,
            team_id=team_id,
            user_id=user_id,
            is_enterprise_install=is_enterprise_install,
        )

    def find_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
        is_enterprise_install: Optional[bool] = False,
    ) -> Optional[Installation]:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        if is_enterprise_install:
            t_id = none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        try:
            key = f"{workspace_path}/installer-{user_id}-latest" if user_id else f"{workspace_path}/installer-latest"
            fetch_response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=key,
            )
            self.logger.debug(f"S3 get_object response: {fetch_response}")
            body = fetch_response["Body"].read().decode("utf-8")
            data = json.loads(body)
            installation = Installation(**data)

            has_user_installation = user_id is not None and installation is not None
            no_bot_token_installation = installation is not None and installation.bot_token is None
            should_find_bot_installation = has_user_installation or no_bot_token_installation
            if should_find_bot_installation:
                # Retrieve the latest bot token, just in case
                # See also: https://github.com/slackapi/bolt-python/issues/664
                latest_bot_installation = self.find_bot(
                    enterprise_id=enterprise_id,
                    team_id=team_id,
                    is_enterprise_install=is_enterprise_install,
                )
                if latest_bot_installation is not None and installation.bot_token != latest_bot_installation.bot_token:
                    # NOTE: this logic is based on the assumption that every single installation has bot scopes
                    # If you need to installation patterns without bot scopes in the same S3 bucket,
                    # please fork this code and implement your own logic.
                    installation.bot_id = latest_bot_installation.bot_id
                    installation.bot_user_id = latest_bot_installation.bot_user_id
                    installation.bot_token = latest_bot_installation.bot_token
                    installation.bot_scopes = latest_bot_installation.bot_scopes
                    installation.bot_refresh_token = latest_bot_installation.bot_refresh_token
                    installation.bot_token_expires_at = latest_bot_installation.bot_token_expires_at

            return installation

        except Exception as e:  # skipcq: PYL-W0703
            message = f"Failed to find an installation data for enterprise: {e_id}, team: {t_id}: {e}"
            self.logger.warning(message)
            return None

    async def async_delete_bot(self, *, enterprise_id: Optional[str], team_id: Optional[str]) -> None:
        return self.delete_bot(
            enterprise_id=enterprise_id,
            team_id=team_id,
        )

    def delete_bot(self, *, enterprise_id: Optional[str], team_id: Optional[str]) -> None:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        objects = self.s3_client.list_objects(
            Bucket=self.bucket_name,
            Prefix=f"{workspace_path}/bot-",
        )
        for content in objects.get("Contents", []):
            key = content.get("Key")
            if key is not None:
                self.logger.info(f"Going to delete bot installation ({key})")
                try:
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name,
                        Key=content.get("Key"),
                    )
                except Exception as e:  # skipcq: PYL-W0703
                    message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                    raise SlackClientConfigurationError(message)

    async def async_delete_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
    ) -> None:
        return self.delete_installation(
            enterprise_id=enterprise_id,
            team_id=team_id,
            user_id=user_id,
        )

    def delete_installation(
        self,
        *,
        enterprise_id: Optional[str],
        team_id: Optional[str],
        user_id: Optional[str] = None,
    ) -> None:
        none = "none"
        e_id = enterprise_id or none
        t_id = team_id or none
        workspace_path = f"{self.client_id}/{e_id}-{t_id}"
        objects = self.s3_client.list_objects(
            Bucket=self.bucket_name,
            Prefix=f"{workspace_path}/installer-{user_id or ''}",
        )
        deleted_keys = []
        for content in objects.get("Contents", []):
            key = content.get("Key")
            if key is not None:
                self.logger.info(f"Going to delete installation ({key})")
                try:
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name,
                        Key=key,
                    )
                    deleted_keys.append(key)
                except Exception as e:  # skipcq: PYL-W0703
                    message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                    raise SlackClientConfigurationError(message)

                try:
                    no_user_id_key = key.replace(f"-{user_id}", "")
                    if not no_user_id_key.endswith("installer-latest"):
                        self.s3_client.delete_object(
                            Bucket=self.bucket_name,
                            Key=no_user_id_key,
                        )
                        deleted_keys.append(no_user_id_key)
                except Exception as e:  # skipcq: PYL-W0703
                    message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                    raise SlackClientConfigurationError(message)

        # Check the remaining installation data
        objects = self.s3_client.list_objects(
            Bucket=self.bucket_name,
            Prefix=f"{workspace_path}/installer-",
            MaxKeys=10,  # the small number would be enough for this purpose
        )
        keys = [c.get("Key") for c in objects.get("Contents", []) if c.get("Key") not in deleted_keys]
        # If only installer-latest remains, we should delete the one as well
        if len(keys) == 1 and keys[0].endswith("installer-latest"):
            content = objects.get("Contents", [])[0]
            try:
                self.s3_client.delete_object(
                    Bucket=self.bucket_name,
                    Key=content.get("Key"),
                )
            except Exception as e:  # skipcq: PYL-W0703
                message = f"Failed to find bot installation data for enterprise: {e_id}, team: {t_id}: {e}"
                raise SlackClientConfigurationError(message)

Ancestors

Instance variables

var logger : logging.Logger
Expand source code
@property
def logger(self) -> Logger:
    if self._logger is None:
        self._logger = logging.getLogger(__name__)
    return self._logger

Inherited members