Building Custom Adaptors
This guide describes how to create new adaptors from scratch so that you can fully customize how your backends are integrated.
Custom Endpoint Adaptors
Each endpoint adaptor must inherit from the BaseEndpoint class:
from typing import List, Optional

from resource_server_async.endpoints.endpoint import BaseEndpoint


class CustomEndpoint(BaseEndpoint):
    """Custom endpoint implementation of BaseEndpoint.

    All constructor arguments except ``config`` are forwarded unchanged to
    ``BaseEndpoint.__init__``. ``config`` is stored on ``self.config`` so the
    adaptor can read its own settings.
    """

    def __init__(
        self,
        id: str,
        endpoint_slug: str,
        cluster: str,
        framework: str,
        model: str,
        endpoint_adapter: str,
        allowed_globus_groups: Optional[List[str]] = None,
        allowed_domains: Optional[List[str]] = None,
        config: Optional[dict] = None,
    ):
        # Adaptor-specific settings, taken from the "config" field of this
        # endpoint's entry in endpoints.json.
        self.config = config

        # Let BaseEndpoint initialize the attributes common to all adaptors.
        super().__init__(
            id,
            endpoint_slug,
            cluster,
            framework,
            model,
            endpoint_adapter,
            allowed_globus_groups,
            allowed_domains,
        )
Required Functions
Each endpoint adaptor must implement the following methods:
from resource_server_async.endpoints.endpoint import (
    SubmitStreamingTaskResponse,
    SubmitTaskResponse,
)


async def submit_task(self, data: dict) -> SubmitTaskResponse:
    """Send one interactive (non-streaming) task to the compute resource.

    Define this on your endpoint adaptor class. Replace this placeholder
    with the call to your backend and return a ``SubmitTaskResponse``.
    """


async def submit_streaming_task(
    self, data: dict, request_log_id: str
) -> SubmitStreamingTaskResponse:
    """Send one interactive task to the compute resource with streaming on.

    Define this on your endpoint adaptor class. Replace this placeholder
    with the call to your backend and return a
    ``SubmitStreamingTaskResponse``.
    """
These functions can contain any logic you need, but they must return an object of the type shown below so that the result integrates properly with the rest of the Gateway API code.
from typing import Optional

from django.http import StreamingHttpResponse
from pydantic import BaseModel


# Return type of submit_task(). Every field is Optional but has no default,
# so (under pydantic v2) each one must be passed explicitly; use None for
# fields that do not apply.
class SubmitTaskResponse(BaseModel):
    result: Optional[str]
    task_id: Optional[str]
    error_message: Optional[str]
    error_code: Optional[int]


# Return type of submit_streaming_task(). Same field rules as above.
class SubmitStreamingTaskResponse(BaseModel):
    # StreamingHttpResponse is not a type pydantic can build a schema for;
    # without this setting, defining the model raises a schema error.
    model_config = {"arbitrary_types_allowed": True}

    response: Optional[StreamingHttpResponse]
    task_id: Optional[str]
    error_message: Optional[str]
    error_code: Optional[int]
Optional Functions
Batch mode is optional and disabled by default. To enable it, override the following methods:
from resource_server_async.endpoints.endpoint import (
    GetBatchStatusResponse,
    SubmitBatchResponse,
)

# NOTE: ``BatchLog`` is not imported in this snippet. It is presumably the
# Gateway's batch-log record (confirm), so import it from the module that
# defines it in your project. Its annotation below is quoted so that
# defining the method does not raise NameError when the name is absent.


def has_batch_enabled(self) -> bool:
    """Return True if batch can be used for this endpoint, False otherwise.

    Unlike the two methods below, this is a regular (non-async) method.
    """
    pass


async def submit_batch(self, batch_data: dict, username: str) -> SubmitBatchResponse:
    """Submit a batch job to the compute resource."""
    pass


async def get_batch_status(self, batch: "BatchLog") -> GetBatchStatusResponse:
    """Get the status and results of a batch job."""
    pass
As with the required functions, these methods can contain any logic you need, but they must return an object of the type shown below so that the result integrates properly with the rest of the Gateway API code.
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class BatchStatusEnum(str, Enum):
    """Status values reported for a batch job."""

    pending = 'pending'
    running = 'running'
    failed = 'failed'
    completed = 'completed'


# Return type of submit_batch(). Optional fields have no default, so (under
# pydantic v2) each one must be passed explicitly; use None when unused.
class SubmitBatchResponse(BaseModel):
    batch_id: Optional[str]
    # NOTE(review): typed as a single string despite the plural name;
    # confirm the expected format against the Gateway's BaseEndpoint.
    task_ids: Optional[str]
    status: Optional[BatchStatusEnum]
    error_message: Optional[str]
    error_code: Optional[int]


# Return type of get_batch_status(). Same field rules as above.
class GetBatchStatusResponse(BaseModel):
    status: Optional[BatchStatusEnum]
    result: Optional[str]
    error_message: Optional[str]
    error_code: Optional[int]
Custom Cluster Adaptors
Each cluster adaptor must inherit from the BaseCluster class:
from typing import List, Optional

from resource_server_async.clusters.cluster import BaseCluster


class CustomCluster(BaseCluster):
    """Custom implementation of BaseCluster.

    All constructor arguments except ``config`` are forwarded to
    ``BaseCluster.__init__``. ``config`` is stored on ``self.config`` so the
    adaptor can read its own settings.
    """

    def __init__(
        self,
        id: str,
        cluster_name: str,
        cluster_adapter: str,
        frameworks: List[str],
        openai_endpoints: List[str],
        allowed_globus_groups: Optional[List[str]] = None,
        allowed_domains: Optional[List[str]] = None,
        config: Optional[dict] = None,
    ):
        # Adaptor-specific settings, taken from the "config" field of this
        # cluster's entry in the fixtures file.
        self.config = config

        # A literal ``[]`` default is created once and shared by every
        # instance, so build a fresh empty list per call instead. BaseCluster
        # still receives an empty list when the argument is omitted.
        if allowed_globus_groups is None:
            allowed_globus_groups = []
        if allowed_domains is None:
            allowed_domains = []

        # Let BaseCluster initialize the attributes common to all adaptors.
        super().__init__(
            id,
            cluster_name,
            cluster_adapter,
            frameworks,
            openai_endpoints,
            allowed_globus_groups,
            allowed_domains,
        )
Required Functions
Each cluster adaptor must implement the following method:
from resource_server_async.clusters.cluster import GetJobsResponse


async def get_jobs(self) -> GetJobsResponse:
    """Report the status of the whole cluster, including which models run.

    Define this on your cluster adaptor class. Replace this placeholder
    with a query to your backend and return a ``GetJobsResponse``.
    """
This method queries the backend and reports the status of each model, so that users can see which models are ready to use. Below is an example of the expected response data (taken from the ALCF Metis cluster):
{
"running": [
{
"Models": "gpt-oss-120b-131072",
"Framework": "api",
"Cluster": "metis",
"Model Status": "running",
"Description": "gpt-oss-120b-131072 - gpt oss 131K",
"Model Version": 1
},
{
"Models": "Llama-4-Maverick-17B-128E-Instruct",
"Framework": "api",
"Cluster": "metis",
"Model Status": "running",
"Description": "Llama-4-Maverick-17B-128E-Instruct - maverick",
"Model Version": 4
}
],
"queued": [],
"stopped": [],
"cluster_status": {
"cluster": "metis",
"total_models": 2,
"live_models": 2,
"stopped_models": 0
}
}
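You have some flexibility in what the response displays (for example, JobInfo accepts extra fields such as "Model Status" or "Description"), but the response must include the structure below. Note that the Jobs model also requires the others, private_batch_running, and private_batch_queued lists, even though they are omitted from the example above; return empty lists if they do not apply: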
from typing import List, Optional

from pydantic import BaseModel


# One model entry in a cluster's job listing. Only the three fields below
# are mandatory.
class JobInfo(BaseModel):
    Models: str
    Framework: str
    Cluster: str

    # Open model: keep any additional fields (e.g. "Model Status",
    # "Description", "Model Version") instead of rejecting them.
    model_config = {"extra": "allow"}


# Model entries grouped by state. None of these fields has a default, so
# every list (and cluster_status) must be supplied; empty lists are valid.
class Jobs(BaseModel):
    running: List[JobInfo]
    queued: List[JobInfo]
    stopped: List[JobInfo]
    others: List[JobInfo]
    private_batch_running: List[JobInfo]
    private_batch_queued: List[JobInfo]
    cluster_status: dict


# Return type of get_jobs(). The fields are Optional but have no default; if
# this is pydantic v2 (as the model_config dict above suggests), each one
# must still be passed explicitly, with None where it does not apply.
class GetJobsResponse(BaseModel):
    jobs: Optional[Jobs]
    error_message: Optional[str]
    error_code: Optional[int]
Paths to Your Adaptors
Once you have your adaptors ready, make sure you point to them in the fixtures/endpoints.json and fixtures/clusters.json files. If, for example, your endpoint and cluster adaptors are located at my_app/custom_endpoint.py and my_app/custom_cluster.py, the adaptor paths would be: