perspective.handlers.starlette

#  ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
#  ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
#  ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
#  ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
#  ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
#  ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
#  ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
#  ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
#  ┃ This file is part of the Perspective library, distributed under the terms ┃
#  ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
#  ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

import asyncio
import perspective


class PerspectiveStarletteHandler(object):
    """`PerspectiveStarletteHandler` is a drop-in WebSocket handler that
    connects a Starlette or FastAPI WebSocket endpoint to a Perspective server.

    All constructor arguments are passed by keyword.

    # Examples

    >>> server = Server()
    >>> client = server.client()
    >>> client.table(pd.read_csv("superstore.csv"), name="data_source_one")
    >>> app = FastAPI()
    >>> async def endpoint(websocket: WebSocket):
    ...     handler = PerspectiveStarletteHandler(perspective_server=server, websocket=websocket)
    ...     await handler.run()
    ...
    >>> app.add_api_websocket_route('/websocket', endpoint)
    """

    def __init__(self, **kwargs):
        self._server = kwargs.pop("perspective_server", perspective.GLOBAL_SERVER)
        self._websocket = kwargs.pop("websocket")
        self._executor = kwargs.pop("executor", None)
        self._loop = kwargs.pop("loop", asyncio.get_event_loop())
        super().__init__(**kwargs)

    async def run(self) -> None:
        def inner(msg):
            self._loop.create_task(self._websocket.send_bytes(msg))

        self.session = self._server.new_session(inner)

        try:
            await self._websocket.accept()
            while True:
                message = await self._websocket.receive()
                self._websocket._raise_on_disconnect(message)
                if self._executor is not None:
                    self._executor.submit(self.session.handle_request, message["bytes"])
                else:
                    self.session.handle_request(message["bytes"])
        finally:
            self.session.close()
class PerspectiveStarletteHandler:

PerspectiveStarletteHandler is a drop-in WebSocket handler that connects a Starlette or FastAPI WebSocket endpoint to a Perspective server. All constructor arguments are passed by keyword.

Examples

>>> server = Server()
>>> client = server.client()
>>> client.table(pd.read_csv("superstore.csv"), name="data_source_one")
>>> app = FastAPI()
>>> async def endpoint(websocket: WebSocket):
...     handler = PerspectiveStarletteHandler(perspective_server=server, websocket=websocket)
...     await handler.run()
...
>>> app.add_api_websocket_route('/websocket', endpoint)
PerspectiveStarletteHandler(**kwargs)
    def __init__(self, **kwargs):
        self._server = kwargs.pop("perspective_server", perspective.GLOBAL_SERVER)
        self._websocket = kwargs.pop("websocket")
        self._executor = kwargs.pop("executor", None)
        self._loop = kwargs.pop("loop", asyncio.get_event_loop())
        super().__init__(**kwargs)
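The `executor` keyword means request handling may run on a worker thread rather than on the event loop thread. The sketch below is a general asyncio illustration of scheduling an async send from such a thread with `asyncio.run_coroutine_threadsafe`; it is not a description of how Perspective invokes its callback. The names `send_bytes` and `handle_request` here are hypothetical stand-ins, not the library's objects.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    sent = []

    async def send_bytes(data):
        # Hypothetical stand-in for websocket.send_bytes; records the thread it ran on.
        sent.append((data, threading.get_ident()))

    def handle_request(data):
        # Runs on a worker thread, so hand the coroutine to the loop in a
        # thread-safe way and wait until the send has completed.
        future = asyncio.run_coroutine_threadsafe(send_bytes(b"reply:" + data), loop)
        future.result(timeout=5)
        return threading.get_ident()

    with ThreadPoolExecutor(max_workers=1) as executor:
        worker_future = executor.submit(handle_request, b"ping")
        # Await the worker without blocking the event loop.
        worker_thread = await asyncio.wrap_future(worker_future)

    return sent, worker_thread, threading.get_ident()


sent, worker_thread, loop_thread = asyncio.run(main())
```

In this sketch the send always executes on the loop thread, even though the request was handled on the worker.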
async def run(self) -> None:
    async def run(self) -> None:
        def inner(msg):
            self._loop.create_task(self._websocket.send_bytes(msg))

        self.session = self._server.new_session(inner)

        try:
            await self._websocket.accept()
            while True:
                message = await self._websocket.receive()
                self._websocket._raise_on_disconnect(message)
                if self._executor is not None:
                    self._executor.submit(self.session.handle_request, message["bytes"])
                else:
                    self.session.handle_request(message["bytes"])
        finally:
            self.session.close()
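The control flow of `run` (accept, receive in a loop, forward each payload to the session, close the session on exit) can be tried without a server using stand-ins. This is a minimal sketch under stated assumptions: `FakeWebSocket`, `FakeDisconnect`, and `EchoSession` are hypothetical classes written for this example and are not part of Starlette or Perspective. Unlike the handler, the sketch keeps references to the tasks it creates so it can wait for the sends to finish.

```python
import asyncio


class FakeDisconnect(Exception):
    """Stand-in for the error raised when the client disconnects."""


class FakeWebSocket:
    """Hypothetical stand-in for a Starlette WebSocket; not the real class."""

    def __init__(self, incoming):
        self._incoming = list(incoming)
        self.sent = []

    async def accept(self):
        pass

    async def receive(self):
        if not self._incoming:
            raise FakeDisconnect()
        return {"bytes": self._incoming.pop(0)}

    async def send_bytes(self, data):
        self.sent.append(data)


class EchoSession:
    """Hypothetical stand-in for a session: replies once to each request."""

    def __init__(self, callback):
        self._callback = callback
        self.closed = False

    def handle_request(self, data):
        self._callback(b"reply:" + data)

    def close(self):
        self.closed = True


async def serve(websocket):
    loop = asyncio.get_running_loop()
    pending = []

    def inner(msg):
        # Schedule the async send from synchronous callback code.
        pending.append(loop.create_task(websocket.send_bytes(msg)))

    session = EchoSession(inner)
    try:
        await websocket.accept()
        while True:
            message = await websocket.receive()
            session.handle_request(message["bytes"])
    except FakeDisconnect:
        pass
    finally:
        # Mirrors the handler: the session is closed however the loop ends.
        session.close()
    # Let the scheduled sends finish before returning.
    await asyncio.gather(*pending)
    return session


ws = FakeWebSocket([b"a", b"b"])
session = asyncio.run(serve(ws))
```

After the run, `ws.sent` holds one reply per request and `session.closed` is true, which is the cleanup that the `finally` block in `run` provides.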