perspective.handlers.starlette
#  ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
#  ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
#  ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
#  ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
#  ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
#  ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
#  ┃ Copyright (c) 2017, the Perspective Authors. ┃
#  ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
#  ┃ This file is part of the Perspective library, distributed under the terms ┃
#  ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
#  ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

import asyncio
import perspective


class PerspectiveStarletteHandler(object):
    """`PerspectiveStarletteHandler` is a drop-in implementation of Perspective.

    It opens one session on a Perspective server for a single WebSocket
    connection, feeds every received binary frame to that session, and sends
    the session's responses back over the same WebSocket.

    All constructor arguments are keyword-only:

    - ``websocket`` (required): the connection to serve. ``run`` calls its
      ``accept``, ``receive``, ``send_bytes`` and ``_raise_on_disconnect``.
    - ``perspective_server``: object providing ``new_session(callback)``;
      defaults to ``perspective.GLOBAL_SERVER``.
    - ``executor``: optional object with ``submit(fn, *args)``. When given,
      each request is submitted to it instead of being handled inline.
    - ``loop``: event loop on which outgoing sends are scheduled; defaults to
      ``asyncio.get_event_loop()`` at construction time.

    # Examples

    >>> server = Server()
    >>> client = server.client()
    >>> client.table(pd.read_csv("superstore.csv"), name="data_source_one")
    >>> app = FastAPI()
    >>> async def endpoint(websocket: WebSocket):
    ...     handler = PerspectiveStarletteHandler(
    ...         perspective_server=server, websocket=websocket
    ...     )
    ...     await handler.run()
    ...
    >>> app.add_api_websocket_route('/websocket', endpoint)
    """

    def __init__(self, **kwargs):
        """Store the collaborators; see the class docstring for the keywords.

        Raises:
            KeyError: if ``websocket`` is not supplied.
            TypeError: if unknown keyword arguments are left over.
        """
        # Resolve the defaults lazily so that neither the global server nor
        # the ambient event loop is touched when the caller supplies its own.
        server = kwargs.pop("perspective_server", None)
        if server is None:
            server = perspective.GLOBAL_SERVER
        self._server = server
        self._websocket = kwargs.pop("websocket")
        self._executor = kwargs.pop("executor", None)
        loop = kwargs.pop("loop", None)
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        super().__init__(**kwargs)

    async def run(self) -> None:
        """Serve the WebSocket until it disconnects or an error occurs.

        Whatever ``_raise_on_disconnect`` (or any other call) raises is
        propagated to the caller after the session has been closed.
        """

        def inner(msg):
            # Response callback handed to the session. With an executor the
            # request runs on another thread, and `loop.create_task` is not
            # thread-safe, so hand the send over with the thread-safe API
            # unless we are already running on the handler's loop.
            send = self._websocket.send_bytes(msg)
            try:
                on_loop_thread = asyncio.get_running_loop() is self._loop
            except RuntimeError:
                on_loop_thread = False
            if on_loop_thread:
                self._loop.create_task(send)
            else:
                asyncio.run_coroutine_threadsafe(send, self._loop)

        self.session = self._server.new_session(inner)

        try:
            await self._websocket.accept()
            while True:
                message = await self._websocket.receive()
                self._websocket._raise_on_disconnect(message)
                if self._executor is not None:
                    self._executor.submit(self.session.handle_request, message["bytes"])
                else:
                    self.session.handle_request(message["bytes"])
        finally:
            # Always release the session, including on disconnect.
            self.session.close()
class
PerspectiveStarletteHandler:
class PerspectiveStarletteHandler(object):
    """`PerspectiveStarletteHandler` is a drop-in implementation of Perspective.

    It opens one session on a Perspective server for a single WebSocket
    connection, feeds every received binary frame to that session, and sends
    the session's responses back over the same WebSocket.

    All constructor arguments are keyword-only:

    - ``websocket`` (required): the connection to serve. ``run`` calls its
      ``accept``, ``receive``, ``send_bytes`` and ``_raise_on_disconnect``.
    - ``perspective_server``: object providing ``new_session(callback)``;
      defaults to ``perspective.GLOBAL_SERVER``.
    - ``executor``: optional object with ``submit(fn, *args)``. When given,
      each request is submitted to it instead of being handled inline.
    - ``loop``: event loop on which outgoing sends are scheduled; defaults to
      ``asyncio.get_event_loop()`` at construction time.

    # Examples

    >>> server = Server()
    >>> client = server.client()
    >>> client.table(pd.read_csv("superstore.csv"), name="data_source_one")
    >>> app = FastAPI()
    >>> async def endpoint(websocket: WebSocket):
    ...     handler = PerspectiveStarletteHandler(
    ...         perspective_server=server, websocket=websocket
    ...     )
    ...     await handler.run()
    ...
    >>> app.add_api_websocket_route('/websocket', endpoint)
    """

    def __init__(self, **kwargs):
        """Store the collaborators; see the class docstring for the keywords.

        Raises:
            KeyError: if ``websocket`` is not supplied.
            TypeError: if unknown keyword arguments are left over.
        """
        # Resolve the defaults lazily so that neither the global server nor
        # the ambient event loop is touched when the caller supplies its own.
        server = kwargs.pop("perspective_server", None)
        if server is None:
            server = perspective.GLOBAL_SERVER
        self._server = server
        self._websocket = kwargs.pop("websocket")
        self._executor = kwargs.pop("executor", None)
        loop = kwargs.pop("loop", None)
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        super().__init__(**kwargs)

    async def run(self) -> None:
        """Serve the WebSocket until it disconnects or an error occurs.

        Whatever ``_raise_on_disconnect`` (or any other call) raises is
        propagated to the caller after the session has been closed.
        """

        def inner(msg):
            # Response callback handed to the session. With an executor the
            # request runs on another thread, and `loop.create_task` is not
            # thread-safe, so hand the send over with the thread-safe API
            # unless we are already running on the handler's loop.
            send = self._websocket.send_bytes(msg)
            try:
                on_loop_thread = asyncio.get_running_loop() is self._loop
            except RuntimeError:
                on_loop_thread = False
            if on_loop_thread:
                self._loop.create_task(send)
            else:
                asyncio.run_coroutine_threadsafe(send, self._loop)

        self.session = self._server.new_session(inner)

        try:
            await self._websocket.accept()
            while True:
                message = await self._websocket.receive()
                self._websocket._raise_on_disconnect(message)
                if self._executor is not None:
                    self._executor.submit(self.session.handle_request, message["bytes"])
                else:
                    self.session.handle_request(message["bytes"])
        finally:
            # Always release the session, including on disconnect.
            self.session.close()
PerspectiveStarletteHandler
is a drop-in implementation of Perspective.
Examples
>>> server = Server()
>>> client = server.client()
>>> client.table(pd.read_csv("superstore.csv"), name="data_source_one")
>>> app = FastAPI()
>>> async def endpoint(websocket: WebSocket):
...     handler = PerspectiveStarletteHandler(perspective_server=server, websocket=websocket)
...     await handler.run()
...
>>> app.add_api_websocket_route('/websocket', endpoint)
async def
run(self) -> None:
async def run(self) -> None:
    """Serve the WebSocket until it disconnects or an error occurs.

    Opens a session on ``self._server``, accepts the WebSocket, then feeds
    the ``"bytes"`` payload of every received message to the session,
    either inline or through ``self._executor`` when one is set. Whatever
    ``_raise_on_disconnect`` (or any other call) raises is propagated to
    the caller after the session has been closed.
    """

    def inner(msg):
        # Response callback handed to the session. With an executor the
        # request runs on another thread, and `loop.create_task` is not
        # thread-safe, so hand the send over with the thread-safe API
        # unless we are already running on the handler's loop.
        send = self._websocket.send_bytes(msg)
        try:
            on_loop_thread = asyncio.get_running_loop() is self._loop
        except RuntimeError:
            on_loop_thread = False
        if on_loop_thread:
            self._loop.create_task(send)
        else:
            asyncio.run_coroutine_threadsafe(send, self._loop)

    self.session = self._server.new_session(inner)

    try:
        await self._websocket.accept()
        while True:
            message = await self._websocket.receive()
            self._websocket._raise_on_disconnect(message)
            if self._executor is not None:
                self._executor.submit(self.session.handle_request, message["bytes"])
            else:
                self.session.handle_request(message["bytes"])
    finally:
        # Always release the session, including on disconnect.
        self.session.close()