FastAPI Integration
TensorShare provides built-in integration capabilities with FastAPI, a high-performance web framework for building APIs with Python.
By combining the power of TensorShare's tensor-sharing capabilities with FastAPI's intuitive and robust API development, you can easily set up an efficient server-side endpoint to handle tensor computations.
Tip
Also, not using FastAPI in a project that leverages the best of open-source is a crime. Don't be a criminal.
Pre-requisites
This module (obviously) needs fastapi
to be installed:
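If it is not already part of your environment, you can install it with pip:

pip install fastapi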
The pre-built router
TensorShare provides a pre-built router that you can use to quickly integrate tensor-sharing into any existing FastAPI application.
Importing the router
To import the router, use the following import statement:
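from tensorshare import create_tensorshare_router

The asynchronous variant, create_async_tensorshare_router, is importable from the same package (it is used in the async example at the end of this page).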
Adding it to FastAPI
To add the router to your FastAPI application, pass it to the include_router method of your FastAPI instance:
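A minimal sketch (the single url key in server_config assumes the other TensorShareServer fields have sensible defaults; see the full example at the end of this page for a complete configuration):

from fastapi import FastAPI

from tensorshare import create_tensorshare_router

app = FastAPI()

ts_router = create_tensorshare_router(server_config={"url": "http://localhost:8000"})
app.include_router(ts_router)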
You can also pass extra prefix or tags parameters to the include_router method to customize how the router is mounted:
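For example, reusing the ts_router from above (the prefix and tag values are arbitrary):

app.include_router(ts_router, prefix="/v1", tags=["tensorshare"])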
Customize the router
If you want to customize the router, you will have to pass two parameters to the create_tensorshare_router function (a short sketch follows the list below):
- server_config: A Python dictionary containing the configuration of the server, just like the one you would pass to the TensorShareServer constructor.
- custom_operation: The custom computation operation to use for the /receive_tensor endpoint. You will learn more about this in the Customizing the computation operation section.
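For illustration, a minimal sketch (my_operation is a hypothetical placeholder callable; a complete application is shown at the end of this page):

from tensorshare import TensorShare, create_tensorshare_router

def my_operation(tensors: TensorShare) -> dict:
    """Hypothetical operation applied to every received TensorShare object."""
    # Whatever you return must be compatible with the router's response_model.
    return {"message": "tensors received"}

ts_router = create_tensorshare_router(
    server_config={"url": "http://localhost:8000"},
    custom_operation=my_operation,
)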
The ping endpoint
The router comes with a built-in ping
endpoint that you can use to check if the server is up and running.
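Roughly, it looks like this (a sketch rather than the actual TensorShare source: the HTTP method, the decorator arguments and the exact message are assumptions modeled on the receive_tensor endpoint shown below):

@router.get(
    f"{config.ping.path}",
    response_model=DefaultResponse,
    status_code=http_status.HTTP_200_OK,
    tags=["tensorshare"],
)
def ping() -> DefaultResponse:
    """Check that the TensorShare server is up and running."""
    return DefaultResponse(message="The TensorShare router is up and running!")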
Nothing too fancy here: just a simple endpoint that returns a DefaultResponse object with a message.
The config.ping.path
will be the path defined in the TensorShareServer
configuration object you provided when creating the router.
The receive_tensor endpoint
The router also comes with a built-in receive_tensor endpoint that you can use to receive a tensor from a client. The synchronous router defines it like this:
@router.post(
f"{config.receive_tensor.path}",
response_model=config.response_model,
status_code=http_status.HTTP_200_OK,
tags=["tensorshare"],
)
def receive_tensor(
shared_tensor: TensorShare,
operation: Callable = Depends(get_computation_operation),
) -> Any:
"""Synchronous endpoint to handle tensors reception and computation."""
result = operation(shared_tensor)
return result
The asynchronous router (created with create_async_tensorshare_router) awaits the computation operation instead:
@router.post(
f"{config.receive_tensor.path}",
response_model=config.response_model,
status_code=http_status.HTTP_200_OK,
tags=["tensorshare"],
)
async def receive_tensor(
shared_tensor: TensorShare,
operation: Callable[[TensorShare], Awaitable[Any]] = Depends(
get_computation_operation
),
) -> Any:
"""Asynchronous endpoint to handle tensors reception and computation."""
result = await operation(shared_tensor)
return result
Like the ping
endpoint, the endpoint path will be defined in the TensorShareServer
configuration object.
Customizing the computation operation
As you can see, there is an operation
parameter that is a Callable
object.
This parameter is a dependency that will be resolved by the get_computation_operation
function.
It allows you to customize the computation operation applied to the received tensor simply by providing a custom Callable object during the router creation (see Customize the router).
Here is the behavior of the get_computation_operation
function:
target_operation: Callable = custom_operation or default_compute_operation
def get_computation_operation() -> Callable:
"""Dependency that returns the currently set computation function."""
return target_operation
By default, the target_operation variable is set to the library's default_compute_operation function. The asynchronous router follows the exact same pattern, but with an awaitable operation:
target_operation: Callable[[TensorShare], Awaitable[Any]] = (
custom_operation or default_async_compute_operation
)
def get_computation_operation() -> Callable[[TensorShare], Awaitable[Any]]:
"""Dependency that returns the currently set async computation function."""
return target_operation
There, target_operation defaults to the library's default_async_compute_operation function.
Here is an example of a custom computation operation you could provide if you wanted to convert the received tensors to torch.Tensor objects, run inference on them, and return the predictions as lists of floats:
class GoBrrBrr(BaseModel):
"""The new response_model to use for the router."""
predictions: List[List[float]] # List of predictions converted to list of floats
def compute_go_brr_brr(tensors: TensorShare) -> Dict[str, List[List[float]]]:
"""New computation operation to run inference on the received tensor."""
torch_tensors: Dict[str, torch.Tensor] = tensors.to_tensors(backend="torch")
inference_result: List[List[float]] = []
for _, v in torch_tensors.items():
        y_hat = ml_pred(v)  # let's say ml_pred is your own synchronous inference function
inference_result.append(y_hat.tolist())
return {"predictions": inference_result}
Tip
This is an elementary example, but you can do more complex things with this.
You can run async
functions or even run a whole pipeline of operations.
Now during the router creation, you can pass the compute_go_brr_brr
function to the custom_operation
parameter and update the response_model
parameter to match the GoBrrBrr
model:
from typing import Dict, List

import torch
from fastapi import FastAPI
from pydantic import BaseModel

from tensorshare import TensorShare, create_tensorshare_router
class GoBrrBrr(BaseModel):
"""The new response_model to use for the router."""
predictions: List[List[float]] # List of predictions converted to list of floats
def compute_go_brr_brr(tensors: TensorShare) -> Dict[str, List[List[float]]]:
"""New computation operation to run inference on the received tensor."""
torch_tensors: Dict[str, torch.Tensor] = tensors.to_tensors(backend="torch")
inference_result: List[List[float]] = []
for _, v in torch_tensors.items():
        y_hat = ml_pred(v)  # let's say ml_pred is your own synchronous inference function
inference_result.append(y_hat.tolist())
return {"predictions": inference_result}
app = FastAPI() # Your FastAPI application
server_config = {"url": "http://localhost:8000", "response_model": GoBrrBrr}
ts_router = create_tensorshare_router(
server_config=server_config,
custom_operation=compute_go_brr_brr,
)
app.include_router(ts_router)
import asyncio
from typing import Dict, List

import torch
from fastapi import FastAPI
from pydantic import BaseModel

from tensorshare import TensorShare, create_async_tensorshare_router
class GoBrrBrr(BaseModel):
"""The new response_model to use for the router."""
    predictions: List[List[float]]  # List of predictions converted to list of floats
async def compute_go_brr_brr(tensors: TensorShare) -> Dict[str, List[List[float]]]:
"""New computation operation to run inference on the received tensor."""
torch_tensors: Dict[str, torch.Tensor] = tensors.to_tensors(backend="torch")
inference_result: List[List[float]] = []
for _, v in torch_tensors.items():
        y_hat = await asyncio.get_running_loop().run_in_executor(
            None, ml_pred, v
        )  # let's say ml_pred is a synchronous inference function
inference_result.append(y_hat.tolist())
return {"predictions": inference_result}
app = FastAPI() # Your FastAPI application
server_config = {"url": "http://localhost:8000", "response_model": GoBrrBrr}
ts_router = create_async_tensorshare_router(
server_config=server_config,
custom_operation=compute_go_brr_brr,
)
app.include_router(ts_router)
And voilà! 🎉
You now have a fully functional FastAPI application that can receive tensors from clients, run custom operations on them and return the result.
What a breeze! 😎