Skip to content

Connecting to Direct API Backends

This guide describes how to connect the Gateway to existing OpenAI-compatible backend APIs.

Endpoint Configuration

You can simply reuse the DirectAPIEndpoint endpoint adaptor and add your entries to the fixtures/endpoints.json file. Each entry should respect the following the data structure:

{
    "model": "resource_server_async.endpoint",
    "pk": 1,
    "fields": {
        "endpoint_slug": "your-cluster-api-your-model-70b",
        "cluster": "your-cluster",
        "framework": "api",
        "model": "Your-Model-70B",
        "endpoint_adapter": "resource_server_async.endpoints.direct_api.DirectAPIEndpoint",
        "config": {
            "api_url": "https://your-targetted-api.com/v1/your-model/chat/completions",
            "api_key_env_name": "YOUR_MODEL_70B_API_KEY"
        }
    }
}

Here, YOUR_MODEL_70B_API_KEY is an environment variable that includes the actual API key. Such variable can have arbitrary names.

Make sure that endpoint_slug has the following format: cluster-framework-model (with no / or . character, all lower case). For example, the meta-llama/Meta-Llama-3.1-70B-Instruct model hosted on my-cluster and served with my-framework should have the following slug: my-cluster-my-framework-meta-llamameta-llama-31-70b-instruct. You can also use the Django slugify tool.

from django.utils.text import slugify
endpoint_slug = slugify(" ".join([cluster, framework, model.lower()]))

If you need to incorporate additional logics, you can create an extention adaptor that inherits from the DirectAPIEndpoint class. Make sure that you change the endpoint_adapter path in fixtures/endpoints.json to point to your new adaptor class. In the function re-definitions, you can modify the input data, make additional checks, modify the API URL (via the self.set_api_url(your_new_url) function), ect. Below is an example of how an adaptor extention can be built:

from resource_server_async.endpoints.endpoint import BaseEndpoint, SubmitTaskResponse

class CustomEndpoint(DirectAPIEndpoint):
    """Custom endpoint implementation of DirectAPIEndpoint."""

    # Class initialization
    def __init__(self,
        id: str,
        endpoint_slug: str,
        cluster: str,
        framework: str,
        model: str,
        endpoint_adapter: str,
        allowed_globus_groups: List[str] = None,
        allowed_domains: List[str] = None,
        config: dict = None
    ):
        super().__init__(
            id,
            endpoint_slug,
            cluster,
            framework,
            model,
            endpoint_adapter,
            allowed_globus_groups,
            allowed_domains,
            config
        )

    # Inject custom logics to required submit_task function
    async def submit_task(self, data: dict) -> SubmitTaskResponse:
        """Add custom logic before calling the parent submit_task function."""

        # Do some checks with model status [recommended to avoid overloading the backend API]
        response = await self.get_endpoint_status()
        if response.error_message:
            return SubmitStreamingTaskResponse(
                error_message=response.error_message,
                error_code=response.error_code
            )

        # Modify input data to be compliant with the backend API
        api_request_data = {**data["model_params"]}
        api_request_data["stream"] = False

        # Additional logging
        log.info(f"Making API call to model {self.model}")

        # Call sumbit_task function of the parent DirectAPIEndpoint class
        return await super().submit_task(api_request_data)

Cluster Configuration

A cluster adaptor that inherits from the BaseCluster class must be created in order to add the get_jobs function logic, which is designed to list the state (e.g., running) of each model hosted in the backend. Entries in the fixtures/clusters.json file should respect the following the data structure:

{
    "model": "resource_server_async.cluster",
    "pk": 1,
    "fields": {
        "cluster_name": "your-cluster",
        "frameworks": [
            "vllm"
        ],
        "openai_endpoints": [
            "chat/completions"
            "completions"
        ],
        "cluster_adapter": "resource_server_async.clusters.your_cluster.YourCluster",
        "config": {}
    }
}

Below is an example of how such cluster adaptor can be defined:

from resource_server_async.clusters.cluster import BaseCluster, GetJobsResponse

class CustomCluster(BaseCluster):
    """Custom implementation of BaseCluster."""

    # Class initialization
    def __init__(self,
        id: str,
        cluster_name: str,
        cluster_adapter: str,
        frameworks: List[str],
        openai_endpoints: List[str],
        allowed_globus_groups: List[str] = [],
        allowed_domains: List[str] = [],
        config: Dict = None
    ):
        # [Optional] Do something with custom config if needed
        self.config = config

        # Initialize the rest of the common attributes
        super().__init__(
            id,
            cluster_name,
            cluster_adapter,
            frameworks,
            openai_endpoints,
            allowed_globus_groups,
            allowed_domains
        )

    # [Required function]
    async def get_jobs(self) -> GetJobsResponse:
        """Provides a status of the cluster as a whole, including which models are running."""

        # Get cluster status
        cluster_status = await some_utils.fetch_status()

        # Format and return model status
        try:
            return GetJobsResponse(jobs=cluster_status)
        except Exception as e:
            return GetJobsResponse(
                error_message=f"Error: Could not generate GetJobsResponse: {e}", 
                error_code=500
            )