Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions edge_config_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Example of constructing an edge endpoint configuration programmatically."""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary, I'll get rid of this before merging.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting rid of it is too easy. Instead, grab the most important snippets and put them in the docs as a Markdown file — we auto-push those to a webpage.


from groundlight import Groundlight
from groundlight.edge import DEFAULT, EDGE_WITH_ESCALATION, NO_CLOUD, EdgeInferenceConfig, RootEdgeConfig

# Fetch the detectors we want to configure for edge inference.
client = Groundlight()
edge_only_det, escalating_det, default_det, custom_det, colliding_det = (
    client.get_detector(det_id)
    for det_id in [
        "det_2z41nK0CyoFdWF6tEoB7DN5qwAx",
        "det_2z41rs0Fo12LAk0oOZg0r4wR9Fn",
        "det_2tYVTZrz8VLZhe94tjuPRl5rDeG",
        "det_2sDfBz5xp6ZysB82kK7LfNYYSXx",
        "det_2sDfGUP8cBt9Wrq0YFVLjVZhoI5",
    ]
)

config = RootEdgeConfig()

# Attach the built-in presets to detectors.
config.add_detector(edge_only_det, NO_CLOUD)
config.add_detector(escalating_det, EDGE_WITH_ESCALATION)
config.add_detector(default_det, DEFAULT)

# Custom configs work alongside presets
custom_inference_config = EdgeInferenceConfig(
    name="my_custom_config",
    always_return_edge_prediction=True,
    min_time_between_escalations=0.5,
)
# add_detector accepts either a Detector object or its ID string.
config.add_detector(custom_det.id, custom_inference_config)

# Cannot reuse names on EdgeInferenceConfig
colliding_config = EdgeInferenceConfig(name='default')
try:
    config.add_detector(colliding_det, colliding_config)
except ValueError as err:
    print(err)

# Frozen -- mutation raises an error
try:
    NO_CLOUD.enabled = False
except Exception as err:
    print(err)

print(config.model_dump_json(indent=2))
21 changes: 21 additions & 0 deletions src/groundlight/edge/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from .config import (
DEFAULT,
DISABLED,
EDGE_WITH_ESCALATION,
NO_CLOUD,
DetectorConfig,
EdgeInferenceConfig,
GlobalConfig,
RootEdgeConfig,
)

# Explicit public API of the edge-configuration package.
__all__ = [
    "DEFAULT",
    "DISABLED",
    "EDGE_WITH_ESCALATION",
    "NO_CLOUD",
    "DetectorConfig",
    "EdgeInferenceConfig",
    "GlobalConfig",
    "RootEdgeConfig",
]
134 changes: 134 additions & 0 deletions src/groundlight/edge/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from typing import Optional, Union

from model import Detector
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typing_extensions import Self


class GlobalConfig(BaseModel):
    """Endpoint-wide settings that apply to all detectors (not per-detector)."""

    # Model-update polling interval, in seconds.
    refresh_rate: float = Field(
        default=60.0,
        description="The interval (in seconds) at which the inference server checks for a new model binary update.",
    )
    # Probability that a confident edge prediction is escalated for cloud auditing.
    confident_audit_rate: float = Field(
        default=1e-5,  # A detector running at 1 FPS = ~100,000 IQ/day, so 1e-5 is ~1 confident IQ/day audited
        description="The probability that any given confident prediction will be sent to the cloud for auditing.",
    )


class EdgeInferenceConfig(BaseModel):
    """
    Configuration for edge inference on a specific detector.

    Instances are frozen (pydantic ``frozen=True``), so a config cannot be
    mutated after creation. ``name`` uniquely identifies the config and is
    excluded from serialized output.
    """

    model_config = ConfigDict(frozen=True)

    name: str = Field(..., exclude=True, description="A unique name for this inference config preset.")
    enabled: bool = Field(  # TODO investigate and update the functionality of this option
        default=True, description="Whether the edge endpoint should accept image queries for this detector."
    )
    # `Optional[str]` rather than `str | None`: pydantic evaluates annotations at
    # class-definition time, so PEP 604 syntax would break on Python < 3.10; this
    # also matches the `Union[...]` style used elsewhere in this module.
    api_token: Optional[str] = Field(
        default=None, description="API token used to fetch the inference model for this detector."
    )
    always_return_edge_prediction: bool = Field(
        default=False,
        description=(
            "Indicates if the edge-endpoint should always provide edge ML predictions, regardless of confidence. "
            "When this setting is true, whether or not the edge-endpoint should escalate low-confidence predictions "
            "to the cloud is determined by `disable_cloud_escalation`."
        ),
    )
    disable_cloud_escalation: bool = Field(
        default=False,
        description=(
            "Never escalate ImageQueries from the edge-endpoint to the cloud. "
            "Requires `always_return_edge_prediction=True`."
        ),
    )
    min_time_between_escalations: float = Field(
        default=2.0,
        description=(
            "The minimum time (in seconds) to wait between cloud escalations for a given detector. "
            "Cannot be less than 0.0. "
            "Only applies when `always_return_edge_prediction=True` and `disable_cloud_escalation=False`."
        ),
    )

    @model_validator(mode="after")
    def validate_configuration(self) -> Self:
        """Enforce cross-field constraints that per-field validation cannot express.

        Raises:
            ValueError: if `disable_cloud_escalation` is set without
                `always_return_edge_prediction`, or if
                `min_time_between_escalations` is negative.
        """
        # Disabling escalation only makes sense when the edge always answers.
        if self.disable_cloud_escalation and not self.always_return_edge_prediction:
            raise ValueError(
                "The `disable_cloud_escalation` flag is only valid when `always_return_edge_prediction` is set to True."
            )
        if self.min_time_between_escalations < 0.0:
            raise ValueError("`min_time_between_escalations` cannot be less than 0.0.")
        return self


class DetectorConfig(BaseModel):
    """
    Configuration for a specific detector.
    """

    detector_id: str = Field(..., description="Detector ID")
    # Name (key) of an EdgeInferenceConfig registered in
    # RootEdgeConfig.edge_inference_configs -- not an inline config object.
    edge_inference_config: str = Field(..., description="Config for edge inference.")


class RootEdgeConfig(BaseModel):
    """
    Root configuration for edge inference.

    Holds a registry of named `EdgeInferenceConfig` presets and the list of
    detectors, each of which references a registered config by name.
    """

    global_config: GlobalConfig = Field(default_factory=GlobalConfig)
    edge_inference_configs: dict[str, EdgeInferenceConfig] = Field(default_factory=dict)
    detectors: list[DetectorConfig] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_inference_configs(self):
        """Ensure every detector references a config name that is registered."""
        registered_names = self.edge_inference_configs
        for entry in self.detectors:
            if entry.edge_inference_config not in registered_names:
                raise ValueError(f"Edge inference config '{entry.edge_inference_config}' not defined.")
        return self

    def add_detector(
        self, detector: Union[str, Detector], edge_inference_config: Union[str, EdgeInferenceConfig]
    ) -> None:
        """Register a detector under the given inference config.

        Args:
            detector: A `Detector` object or its ID string.
            edge_inference_config: An `EdgeInferenceConfig` object (auto-registered
                by name if new) or the name of an already-registered config.

        Raises:
            ValueError: if the detector ID is already present, the config name
                is taken by a different config object, or the named config is
                not registered.
        """
        if isinstance(detector, Detector):
            detector_id = detector.id
        else:
            detector_id = detector

        # Reject duplicate detector registrations up front.
        for entry in self.detectors:
            if entry.detector_id == detector_id:
                raise ValueError(f"A detector with ID '{detector_id}' already exists.")

        if isinstance(edge_inference_config, EdgeInferenceConfig):
            config_name = edge_inference_config.name
            registered = self.edge_inference_configs.get(config_name)
            if registered is None:
                self.edge_inference_configs[config_name] = edge_inference_config
            elif registered is not edge_inference_config:
                # Identity check (not equality): each name maps to one object.
                raise ValueError(f"A different inference config named '{config_name}' is already registered.")
        else:
            config_name = edge_inference_config

        if config_name not in self.edge_inference_configs:
            raise ValueError(
                f"Edge inference config '{config_name}' not defined. "
                f"Available configs: {list(self.edge_inference_configs.keys())}"
            )

        self.detectors.append(
            DetectorConfig(
                detector_id=detector_id,
                edge_inference_config=config_name,
            )
        )


# Preset inference configs matching the standard edge-endpoint defaults.
# All presets are frozen; pass them directly to RootEdgeConfig.add_detector().

# Default behavior: edge answers are not forced (always_return_edge_prediction=False).
DEFAULT = EdgeInferenceConfig(name="default")
# Always answer from the edge model; low-confidence results may still be
# escalated to the cloud, at most once every 2 seconds per detector.
EDGE_WITH_ESCALATION = EdgeInferenceConfig(
    name="edge_with_escalation",
    always_return_edge_prediction=True,
    min_time_between_escalations=2.0,
)
# Edge-only: always answer from the edge model and never escalate to the cloud.
NO_CLOUD = EdgeInferenceConfig(
    name="no_cloud",
    always_return_edge_prediction=True,
    disable_cloud_escalation=True,
)
# Do not accept image queries for this detector at the edge endpoint.
DISABLED = EdgeInferenceConfig(name="disabled", enabled=False)