Module slack_bolt.adapter.tornado.async_handler
Expand source code
from tornado.httputil import HTTPServerRequest
from tornado.web import RequestHandler
from slack_bolt.async_app import AsyncApp
from slack_bolt.oauth.async_oauth_flow import AsyncOAuthFlow
from slack_bolt.request.async_request import AsyncBoltRequest
from slack_bolt.response import BoltResponse
from .handler import set_response
class AsyncSlackEventsHandler(RequestHandler):
def initialize(self, app: AsyncApp): # type: ignore
self.app = app
async def post(self):
bolt_resp: BoltResponse = await self.app.async_dispatch(to_async_bolt_request(self.request))
set_response(self, bolt_resp)
return
class AsyncSlackOAuthHandler(RequestHandler):
def initialize(self, app: AsyncApp): # type: ignore
self.app = app
async def get(self):
if self.app.oauth_flow is not None: # type: ignore
oauth_flow: AsyncOAuthFlow = self.app.oauth_flow # type: ignore
if self.request.path == oauth_flow.install_path:
bolt_resp = await oauth_flow.handle_installation(to_async_bolt_request(self.request))
set_response(self, bolt_resp)
return
elif self.request.path == oauth_flow.redirect_uri_path:
bolt_resp = await oauth_flow.handle_callback(to_async_bolt_request(self.request))
set_response(self, bolt_resp)
return
self.set_status(404)
def to_async_bolt_request(req: HTTPServerRequest) -> AsyncBoltRequest:
return AsyncBoltRequest(
body=req.body.decode("utf-8") if req.body else "",
query=req.query,
headers=req.headers,
)
Functions
def to_async_bolt_request(req: tornado.httputil.HTTPServerRequest) ‑> AsyncBoltRequest
-
Expand source code
def to_async_bolt_request(req: HTTPServerRequest) -> AsyncBoltRequest: return AsyncBoltRequest( body=req.body.decode("utf-8") if req.body else "", query=req.query, headers=req.headers, )
Classes
class AsyncSlackEventsHandler (application: Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)
-
Base class for HTTP request handlers.
Subclasses must define at least one of the methods defined in the "Entry points" section below.
Applications should not construct
RequestHandler
objects directly and subclasses should not override__init__
(override~RequestHandler.initialize
instead).Expand source code
class AsyncSlackEventsHandler(RequestHandler): def initialize(self, app: AsyncApp): # type: ignore self.app = app async def post(self): bolt_resp: BoltResponse = await self.app.async_dispatch(to_async_bolt_request(self.request)) set_response(self, bolt_resp) return
Ancestors
- tornado.web.RequestHandler
Methods
def initialize(self, app: AsyncApp)
-
Expand source code
def initialize(self, app: AsyncApp): # type: ignore self.app = app
async def post(self)
-
Expand source code
async def post(self): bolt_resp: BoltResponse = await self.app.async_dispatch(to_async_bolt_request(self.request)) set_response(self, bolt_resp) return
class AsyncSlackOAuthHandler (application: Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)
-
Base class for HTTP request handlers.
Subclasses must define at least one of the methods defined in the "Entry points" section below.
Applications should not construct
RequestHandler
objects directly and subclasses should not override__init__
(override~RequestHandler.initialize
instead).Expand source code
class AsyncSlackOAuthHandler(RequestHandler): def initialize(self, app: AsyncApp): # type: ignore self.app = app async def get(self): if self.app.oauth_flow is not None: # type: ignore oauth_flow: AsyncOAuthFlow = self.app.oauth_flow # type: ignore if self.request.path == oauth_flow.install_path: bolt_resp = await oauth_flow.handle_installation(to_async_bolt_request(self.request)) set_response(self, bolt_resp) return elif self.request.path == oauth_flow.redirect_uri_path: bolt_resp = await oauth_flow.handle_callback(to_async_bolt_request(self.request)) set_response(self, bolt_resp) return self.set_status(404)
Ancestors
- tornado.web.RequestHandler
Methods
async def get(self)
-
Expand source code
async def get(self): if self.app.oauth_flow is not None: # type: ignore oauth_flow: AsyncOAuthFlow = self.app.oauth_flow # type: ignore if self.request.path == oauth_flow.install_path: bolt_resp = await oauth_flow.handle_installation(to_async_bolt_request(self.request)) set_response(self, bolt_resp) return elif self.request.path == oauth_flow.redirect_uri_path: bolt_resp = await oauth_flow.handle_callback(to_async_bolt_request(self.request)) set_response(self, bolt_resp) return self.set_status(404)
def initialize(self, app: AsyncApp)
-
Expand source code
def initialize(self, app: AsyncApp): # type: ignore self.app = app