简介

Starlette是一个轻量的基于ASGI协议的web框架, 一个简单app如下:

1
2
3
4
5
6
7
8
async def app(scope, recieve, send):
    assert scope["type"] == "http"
    body = b"hello: "
    request = Request(scope, recieve)
    async for chunk in request.stream():
        body += chunk
    response = Response(body)
    await response(scope, recieve, send)

部分模块

aplications

  • Starlette: app class, 设置routes/middleware等,实现call(scope, receive, send)

routing

routing是按导入顺序依次查找,有多个匹配时优先第一个匹配的

  • Route: 路由基本单位,记录了path、endpoint之间的映射关系
    • matches: 校验是否有与path匹配的endpoint
    • ulr_path_for: 通过end_point name找到对应的url
    • handle: call endpoint
  • Mount: 一组带path prefix的Route,类似flask中的blueprint
  • Routers: 所有Route集合,按顺序依次去match path, 另外执行starup/shotdown hook

endpoints

  • HTTPEndpoint: 类方法的形式去handler具体method
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class HTTPEndpoint:
    def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
        assert scope["type"] == "http"
        self.scope = scope
        self.receive = receive
        self.send = send

    def __await__(self) -> typing.Generator:
        return self.dispatch().__await__()

    async def dispatch(self) -> None:
        request = Request(self.scope, receive=self.receive)
        handler_name = "get" if request.method == "HEAD" else request.method.lower()
        handler = getattr(self, handler_name, self.method_not_allowed)
        is_async = asyncio.iscoroutinefunction(handler)
        if is_async:
            response = await handler(request)
        else:
            response = await run_in_threadpool(handler, request)
        await response(self.scope, self.receive, self.send)
  • endpoint func 转app
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def request_response(func: typing.Callable) -> ASGIApp:
    """
    Takes a function or coroutine `func(request) -> response`,
    and returns an ASGI application.
    """
    is_coroutine = asyncio.iscoroutinefunction(func)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        request = Request(scope, receive=receive, send=send)
        if is_coroutine:
            response = await func(request)
        else:
            response = await run_in_threadpool(func, request)
        await response(scope, receive, send)

    return app

middleware

app middlewares是先进后出的栈

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def build_middleware_stack(self) -> ASGIApp:
    middleware = (
        [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug,)]
        + self.user_middleware
        + [
            Middleware(
                ExceptionMiddleware, handlers=exception_handlers, debug=debug,
            )
        ]
    )

    app = self.router
    for cls, options in reversed(middleware):
        app = cls(app=app, **options)
    return app
官方middleware
  • AuthenticationMiddleware:通过识别scope中的如cokkie等信息校验
  • CORSMiddleware: cross origin source share
  • GZipMiddleware: gzip压缩response
  • SessionMiddleware: Set-Cokkie
  • HTTPSRedirectMiddleware: http请求跳转到https
  • TrustedHostMiddleware: 限制host
打印耗时middleware demo
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import time
class Timer:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        start = time.time()
        http_status_code = "Unknown"

        async def send_wrapper(message):
            nonlocal http_status_code
            if message["type"] == "http.response.start":
                http_status_code = message["status"]
            await send(message)

        await self.app(scope, receive, send_wrapper)
        print(f"timer: {scope['method']} - {scope['path']} - {http_status_code} - {time.time() - start}")

requests

Request: 解析scope和receive到的数据

1
2
3
4
5
class Request(HTTPConnection):
    def __init__(
        self, scope: Scope, receive: Receive = empty_receive, send: Send = empty_send
    ):
        pass

responses

设置headers, call时send content

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Response:
    media_type = None
    charset = "utf-8"

    def __init__(
        self,
        content: typing.Any = None,
        status_code: int = 200,
        headers: dict = None,
        media_type: str = None,
        background: BackgroundTask = None,
    ) -> None:
        self.body = self.render(content)
        self.status_code = status_code
        if media_type is not None:
            self.media_type = media_type
        self.background = background
        self.init_headers(headers)
        
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        await send({"type": "http.response.body", "body": self.body})

        if self.background is not None:
            await self.background()
  • Response: 基类,render body, init headers, set cookie, call(scope, receice, send)发送header和body
  • JSONResponse: render body 时使用json.dumps
  • RedirectResponse: header中设置location
  • StreamingResponse: chunked body
  • FileResponse

config

  • Config: 从环境变量和配置文件加载变量,优先使用环境变量

tests

  • TestClient 基于requests.Session做了适配,将异步操作放入用loop.run_until_complete去完成

参考