export code
yolo_model = "yoloe-11s-seg.pt"
model = YOLOE(yolo_model)
# Define visual prompts using bounding boxes and their corresponding class IDs.
# Each box highlights an example of the object you want the model to detect.
visual_prompts = dict(
bboxes=np.array(
[
[221.52, 405.8, 344.98, 857.54], # Box enclosing person
[120, 425, 160, 445], # Box enclosing glasses
],
),
cls=np.array(
[
0, # ID to be assigned for person
1, # ID to be assigned for glassses
]
),
)
# Run inference on an image, using the provided visual prompts as guidance
results = model.predict(
"ultralytics/assets/bus.jpg",
visual_prompts=visual_prompts,
predictor=YOLOEVPSegPredictor,
)
exported_path = model.export(
format="engine",
int8=True,
)
inference code
model = YOLOE("yoloe-11s-seg.engine")
# Run inference on an image, using the provided visual prompts as guidance
results = model(
"ultralytics/assets/bus.jpg",
verbose=False,
)
in the flamegraph, the warmup and forward take a long time, about 500ms
I also wrote a tensorrt api wrapper but the output is all zero
class TRTWrapper:
"""
TensorRT 10+ wrapper for YOLOE engine.
Handles preprocessing, execution, and postprocessing.
"""
def __init__(self, engine_path: str):
if not os.path.exists(engine_path):
raise FileNotFoundError(f"Engine file not found: {engine_path}")
self.logger = trt.Logger(trt.Logger.WARNING)
# Load engine
with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
self.engine = runtime.deserialize_cuda_engine(f.read())
self.context = self.engine.create_execution_context()
# Allocate device memory
self.inputs, self.outputs, self.bindings = [], [], []
self.input_tensor_name = None
self.output_tensor_name = None
self.input_shape = None
# Iterate over bindings
for binding_name in self.engine: # engine is iterable over tensor names
shape = self.engine.get_tensor_shape(binding_name)
dtype = trt.nptype(self.engine.get_tensor_dtype(binding_name))
size = trt.volume(shape)
device_mem = cuda.mem_alloc(size * dtype().nbytes)
self.bindings.append(int(device_mem))
if self.engine.get_tensor_mode(binding_name) == trt.TensorIOMode.INPUT:
self.inputs.append(device_mem)
self.input_tensor_name = binding_name
self.input_shape = shape
else:
self.outputs.append(device_mem)
self.output_tensor_name = binding_name
if self.input_shape is None or self.input_tensor_name is None:
raise RuntimeError("No input binding found in engine.")
if self.output_tensor_name is None:
raise RuntimeError("No output binding found in engine.")
# CUDA stream
self.stream = cuda.Stream()
def preprocess(self, img: np.ndarray) -> np.ndarray:
"""Resize, BGR->RGB, normalize, CHW."""
_, h, w = self.input_shape[-3:]
# img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (w, h))
# img = img.astype(np.float32) / 255.0
img = np.transpose(img, (2, 0, 1)) # HWC -> CHW
return np.ascontiguousarray(img)
def postprocess(self, output: np.ndarray, conf_thresh: float, original_shape):
"""
Converts network output to masks, bounding boxes, and class IDs.
Args:
output: np.ndarray, shape (batch, channels, H, W)
conf_thresh: float, threshold for confidence
original_shape: tuple, (H_orig, W_orig, C)
Returns:
List of dicts per mask with keys: 'bbox', 'conf', 'class_id', 'mask'
"""
h_orig, w_orig = original_shape[:2]
batch, num_channels, H, W = output.shape
results = []
for b in range(batch):
pred = output[b] # shape: (num_channels, H, W)
# Assume:
# channels 0..N-2 = feature/mask channels
# last channel (or some channel) = confidence map
# You can adjust depending on your model
conf_map = pred[4] # shape (H, W), example confidence channel
if conf_map.max() < conf_thresh:
continue # skip low-confidence maps
# Example: generate mask(s)
mask_channels = pred[5:] # remaining channels are masks
for i, mask_map in enumerate(mask_channels):
mask = (mask_map >= 0.5).astype(np.uint8) # threshold mask
if mask.sum() == 0:
continue # skip empty masks
# Compute bounding box from mask
ys, xs = np.where(mask)
y1, y2 = ys.min(), ys.max()
x1, x2 = xs.min(), xs.max()
# Rescale bbox to original image
scale_y = h_orig / H
scale_x = w_orig / W
bbox = [
int(x1 * scale_x),
int(y1 * scale_y),
int(x2 * scale_x),
int(y2 * scale_y),
]
# Aggregate confidence for the mask
conf_value = conf_map[y1:y2+1, x1:x2+1].mean()
results.append({
"bbox": bbox,
"conf": float(conf_value),
"class_id": i, # or assign proper class
"mask": cv2.resize(mask, (w_orig, h_orig), interpolation=cv2.INTER_NEAREST)
})
return results
def __call__(self, image: np.ndarray, conf: float = 0.1, verbose: bool = False):
# Preprocess
img = self.preprocess(image)
if len(self.input_shape) == 4:
img = np.expand_dims(img, axis=0)
# Copy input to device
cuda.memcpy_htod_async(self.inputs[0], img, self.stream)
# Allocate output buffers
output_buffers = {}
for name in [self.output_tensor_name]:
shape = tuple(self.engine.get_tensor_shape(name)) # <-- convert Dims to tuple
dtype = trt.nptype(self.engine.get_tensor_dtype(name))
host_arr = cuda.pagelocked_empty(shape, dtype=dtype)
device_arr = cuda.mem_alloc(host_arr.nbytes)
self.context.set_tensor_address(name, int(device_arr))
output_buffers[name] = (host_arr, device_arr)
# Run inference
self.context.execute_async_v3(stream_handle=self.stream.handle)
# Copy outputs to host
for name, (host_arr, device_arr) in output_buffers.items():
cuda.memcpy_dtoh_async(host_arr, device_arr, self.stream)
# Synchronize
self.stream.synchronize()
# Postprocess
output_name = self.output_tensor_name
output_arr = output_buffers[output_name][0].reshape(tuple(self.engine.get_tensor_shape(output_name)))
return self.postprocess(output_arr, conf_thresh=conf, original_shape=image.shape)
I am using TensorRT 10.13.2, cuda 11.4