|
from pathlib import Path |
|
from typing import Dict, List, Optional |
|
|
|
from yolo.config.config import Config |
|
from yolo.model.yolo import YOLO |
|
from yolo.utils.logger import logger |
|
|
|
|
|
class ModelExporter:
    """Export a YOLO model to ONNX, TFLite or Core ML.

    The default output file name is ``<stem>.<extension>`` where ``<stem>`` is
    the stem of the configured weight file (or the model name when no weight
    path is configured) and ``<extension>`` is the format name, except that
    ``"coreml"`` maps to ``"mlpackage"``.
    """

    def __init__(self, cfg: Config, model: YOLO, format: str, model_path: Optional[str] = None):
        """Store the model and configuration and resolve the export path.

        Args:
            cfg: Project configuration. Reads ``cfg.dataset.class_num``,
                ``cfg.model.name`` and ``cfg.weight``. When ``cfg.weight`` is
                ``True`` it is replaced *in place* by
                ``weights/<cfg.model.name>.pt``.
            model: The model instance to export.
            format: Export format name; also used as the default file
                extension (``"coreml"`` becomes ``"mlpackage"``).
            model_path: Explicit output path. When falsy, a default is derived
                as described in the class docstring.
        """
        self.model = model
        self.cfg = cfg
        self.class_num = cfg.dataset.class_num
        self.format = format

        # ``weight: True`` is shorthand for the default weight file location.
        if cfg.weight is True:
            cfg.weight = Path("weights") / f"{cfg.model.name}.pt"

        if model_path:
            self.model_path = model_path
        else:
            extension = "mlpackage" if self.format == "coreml" else self.format
            # ``Path(None)`` raises TypeError, so fall back to the model name
            # when no weight file is configured.
            stem = Path(self.cfg.weight).stem if self.cfg.weight else self.cfg.model.name
            self.model_path = f"{stem}.{extension}"

        # Names passed to ``torch.onnx.export`` for the model outputs.
        self.output_names: List[str] = [
            "1_class_scores_small",
            "2_box_features_small",
            "3_bbox_deltas_small",
            "4_class_scores_medium",
            "5_box_features_medium",
            "6_bbox_deltas_medium",
            "7_class_scores_large",
            "8_box_features_large",
            "9_bbox_deltas_large",
        ]

    def export_onnx(self, dynamic_axes: Optional[Dict[str, Dict[int, str]]] = None, model_path: Optional[str] = None):
        """Export the model with ``torch.onnx.export``.

        Args:
            dynamic_axes: Forwarded unchanged to ``torch.onnx.export``.
            model_path: Output path; defaults to ``self.model_path`` when falsy.

        Returns:
            The path the ONNX model was written to.
        """
        logger.info(":package: Exporting model to onnx format")
        # Imported lazily so that torch is only needed when exporting.
        import torch

        dummy_input = torch.ones((1, 3, *self.cfg.image_size))
        onnx_model_path = model_path or self.model_path

        torch.onnx.export(
            self.model,
            dummy_input,
            onnx_model_path,
            input_names=["input"],
            output_names=self.output_names,
            dynamic_axes=dynamic_axes,
        )

        logger.info(f":inbox_tray: ONNX model saved to {onnx_model_path}")
        return onnx_model_path

    def export_tflite(self):
        """Convert the model with ``ai_edge_torch`` and write it to ``self.model_path``."""
        logger.info(":package: Exporting model to tflite format")

        # Optional heavy dependencies are imported only when this export runs.
        import torch
        import ai_edge_torch

        self.model.eval()
        example_inputs = (torch.rand(1, 3, *self.cfg.image_size),)

        edge_model = ai_edge_torch.convert(self.model, example_inputs)
        edge_model.export(self.model_path)

        logger.info(":white_check_mark: Model exported to tflite format")

    def export_coreml(self, nms_model_path: str = "v9-s-x.mlpackage"):
        """Export the model to Core ML, plus a second package with NMS appended.

        The model is wrapped in ``CoremlPostProcess``, traced, converted to an
        ``mlprogram`` (image input named ``"image"``, FP16, iOS15 target) and
        saved to ``self.model_path``. A pipeline combining that model with a
        non-maximum-suppression stage is then saved to ``nms_model_path``.

        Args:
            nms_model_path: Output path of the combined model+NMS pipeline.
                Defaults to the previously hard-coded file name.
        """
        logger.info(":package: Exporting model to coreml format")

        # Optional heavy dependencies are imported only when this export runs.
        import logging

        import torch
        import coremltools as ct

        self.model.eval()
        example_inputs = (torch.rand(1, 3, *self.cfg.image_size),)

        model = CoremlPostProcess(self.model, example_inputs[0])

        # Eager output is kept because its shapes are needed for the NMS spec.
        model_output = model(example_inputs[0])
        exported_program = torch.jit.trace(model, example_inputs, strict=True)

        # Convert to Core ML program using the Unified Conversion API.
        # NOTE(review): this disables the coremltools logger for the rest of
        # the process, not only for this conversion.
        logging.getLogger("coremltools").disabled = True
        model_from_export = ct.convert(
            exported_program,
            inputs=[ct.ImageType("image", shape=example_inputs[0].shape, scale=1 / 255.0, bias=[0, 0, 0])],
            convert_to="mlprogram",
            minimum_deployment_target=ct.target.iOS15,
            compute_precision=ct.precision.FLOAT16,
        )

        model_from_export.save(self.model_path)
        logger.info(f":white_check_mark: Model exported to coreml format {self.model_path}")

        # Append an NMS stage and export the combined pipeline.
        model_spec = model_from_export.get_spec()
        nms_spec = createNmsModelSpec(model_spec, model_output)

        # Use the spatial size actually exported instead of a fixed 640x640.
        test_size = tuple(example_inputs[0].shape[-2:])
        combineModelsAndExport(model_from_export, model_spec, nms_spec, nms_model_path, test_size)
|
|
|
import torch |
|
from torch import nn |
|
from yolo.utils.bounding_box_utils import transform_bbox |
|
import coremltools as ct |
|
|
|
class CoremlPostProcess(nn.Module):
    """Wrap a detection model so its output is (class scores, normalized boxes).

    The box coordinates are multiplied by ``[1/w, 1/h, 1/w, 1/h]``, where
    ``h`` and ``w`` are the last two dimensions of the example input.
    """

    def __init__(self, model, im):
        """Store the wrapped model and precompute the box normalization factors.

        Args:
            model: Module whose output is post-processed in ``forward``.
            im: Example input with four dimensions; only its last two
                dimensions (height, width) are read.
        """
        _, _, h, w = im.shape
        super().__init__()
        self.model = model
        # Registered as a buffer so ``.to()`` / ``.cuda()`` / ``.half()`` on the
        # wrapper also move it; non-persistent keeps ``state_dict`` unchanged.
        self.register_buffer(
            "normalize",
            torch.tensor([1.0 / w, 1.0 / h, 1.0 / w, 1.0 / h]),
            persistent=False,
        )

    def forward(self, x):
        """Run the wrapped model and return ``(scores, boxes)`` for the first item.

        The first three prediction entries are unpacked as
        (class logits, <unused>, boxes); an optional fourth entry is used as a
        multiplier on the sigmoid class scores.
        NOTE(review): index ``[0]`` presumably selects the first batch element
        - confirm against the wrapped model's output layout.
        """
        prediction = self.model(x)
        pred_class, _, pred_bbox = prediction[:3]
        pred_conf = prediction[3] if len(prediction) == 4 else None
        pred_class = pred_class.sigmoid() * (1 if pred_conf is None else pred_conf)
        pred_bbox = transform_bbox(pred_bbox, "xyxy -> xycwh")
        return pred_class[0], pred_bbox[0] * self.normalize
|
|
|
# Class labels attached to the NMS stage (one string per class index).
classLabels = (
    'Person', 'Bicycle', 'Car', 'Motorcycle', 'Airplane', 'Bus', 'Train', 'Truck',
    'Boat', 'Traffic light', 'Fire hydrant', 'Stop sign', 'Parking meter', 'Bench',
    'Bird', 'Cat', 'Dog', 'Horse', 'Sheep', 'Cow', 'Elephant', 'Bear', 'Zebra',
    'Giraffe', 'Backpack', 'Umbrella', 'Handbag', 'Tie', 'Suitcase', 'Frisbee',
    'Skis', 'Snowboard', 'Sports ball', 'Kite', 'Baseball bat', 'Baseball glove',
    'Skateboard', 'Surfboard', 'Tennis racket', 'Bottle', 'Wine glass', 'Cup',
    'Fork', 'Knife', 'Spoon', 'Bowl', 'Banana', 'Apple', 'Sandwich', 'Orange',
    'Broccoli', 'Carrot', 'Hot dog', 'Pizza', 'Donut', 'Cake', 'Chair', 'Couch',
    'Potted plant', 'Bed', 'Dining table', 'Toilet', 'Tv', 'Laptop', 'Mouse',
    'Remote', 'Keyboard', 'Cell phone', 'Microwave', 'Oven', 'Toaster', 'Sink',
    'Refrigerator', 'Book', 'Clock', 'Vase', 'Scissors', 'Teddy bear',
    'Hair drier', 'Toothbrush',
)

# Default NMS thresholds written into the exported model.
iouThreshold = 0.6
confidenceThreshold = 0.1

specificationVersion = 7

# Derived from the label tuple so the two can never drift apart (currently 80).
numberOfClassLabels = len(classLabels)

# Used as the short description in the exported pipeline's metadata.
model_name = "yolo-v9-s"
|
|
|
def createNmsModelSpec(modelSpec, model_output):
    """Build a Core ML non-maximum-suppression spec fed by ``modelSpec``'s outputs.

    Side effect: the ``multiArrayType.shape`` of the two outputs of
    ``modelSpec`` is overwritten with the sizes of ``model_output[0]`` and
    ``model_output[1]``.

    Args:
        modelSpec: Spec of the detector; must expose exactly two outputs
            (unpacking fails otherwise).
        model_output: Indexable whose first two items provide ``.size()``.

    Returns:
        A new ``Model`` spec configured as a non-maximum-suppression model
        with outputs named ``"confidence"`` and ``"coordinates"``.
    """
    nms_spec = ct.proto.Model_pb2.Model()
    nms_spec.specificationVersion = 6

    confidence_in, coordinates_in = modelSpec.description.output
    confidence_shape, coordinates_shape = (tuple(model_output[k].size()) for k in (0, 1))

    # Pin concrete shapes on the detector outputs before they are copied below.
    confidence_in.type.multiArrayType.shape[:] = confidence_shape
    coordinates_in.type.multiArrayType.shape[:] = coordinates_shape

    # Each detector output becomes both an input and (as a template) an output
    # of the NMS model.
    for detector_output in (confidence_in, coordinates_in):
        serialized = detector_output.SerializeToString()
        nms_spec.description.input.add().ParseFromString(serialized)
        nms_spec.description.output.add().ParseFromString(serialized)

    nms_spec.description.output[0].name = "confidence"
    nms_spec.description.output[1].name = "coordinates"

    # Output shapes: a flexible first dimension (number of boxes) and a fixed
    # second one - class count for "confidence", 4 values for "coordinates".
    for slot, fixed_dim in enumerate((numberOfClassLabels, 4)):
        array_type = nms_spec.description.output[slot].type.multiArrayType
        for dim, (lower, upper) in enumerate(((0, -1), (fixed_dim, fixed_dim))):
            array_type.shapeRange.sizeRanges.add()
            array_type.shapeRange.sizeRanges[dim].lowerBound = lower
            array_type.shapeRange.sizeRanges[dim].upperBound = upper
        del array_type.shape[:]

    # Configure the model as non-maximum suppression.
    nms = nms_spec.nonMaximumSuppression
    nms.confidenceInputFeatureName = confidence_in.name
    nms.coordinatesInputFeatureName = coordinates_in.name
    nms.confidenceOutputFeatureName = "confidence"
    nms.coordinatesOutputFeatureName = "coordinates"
    nms.iouThresholdInputFeatureName = "iouThreshold"
    nms.confidenceThresholdInputFeatureName = "confidenceThreshold"
    # Defaults for the two extra inputs; callers of the model may override them.
    nms.iouThreshold = iouThreshold
    nms.confidenceThreshold = confidenceThreshold
    nms.stringClassLabels.vector.extend(classLabels)

    return nms_spec
|
|
|
def combineModelsAndExport(mlmodel, modelSpec, nmsSpec, fileName, test_size):
    """Chain the detector and the NMS model into one pipeline and save it.

    This is best-effort: any exception raised while building or saving the
    pipeline is caught and reported with ``print``; nothing is re-raised.

    Args:
        mlmodel: Converted Core ML model; only ``weights_dir`` is read.
        modelSpec: Spec of the detector (first pipeline stage).
        nmsSpec: Spec of the NMS model (second pipeline stage).
        fileName: Path the combined model is saved to.
        test_size: Pair used as the last two dimensions of the ``"image"``
            input declared on the pipeline.

    Returns:
        None.
    """
    try:
        print('Combine CoreMl model with nms and export model')

        # Combine both models into a single pipeline.
        datatypes = ct.models.datatypes
        pipeline_inputs = [
            ("image", datatypes.Array(3, test_size[0], test_size[1])),
            ("iouThreshold", datatypes.Double()),
            ("confidenceThreshold", datatypes.Double()),
        ]
        pipeline = ct.models.pipeline.Pipeline(
            input_features=pipeline_inputs,
            output_features=["confidence", "coordinates"],
        )

        # Required version (>= iOS 13) in order for NMS to work.
        pipeline.spec.specificationVersion = 6

        pipeline.add_model(modelSpec)
        pipeline.add_model(nmsSpec)

        description = pipeline.spec.description

        # Replace the placeholder feature descriptions with the real ones:
        # the detector's input and the NMS model's two outputs.
        description.input[0].ParseFromString(modelSpec.description.input[0].SerializeToString())
        for slot in (0, 1):
            description.output[slot].ParseFromString(nmsSpec.description.output[slot].SerializeToString())

        # Metadata for the model.
        description.input[1].shortDescription = f"(optional) IOU Threshold override (Default: {iouThreshold})"
        description.input[2].shortDescription = (
            f"(optional) Confidence Threshold override (Default: {confidenceThreshold})"
        )
        description.output[0].shortDescription = "Boxes \xd7 Class confidence"
        description.output[1].shortDescription = "Boxes \xd7 [x, y, width, height] (relative to image size)"
        description.metadata.versionString = "1.0"
        description.metadata.shortDescription = model_name
        description.metadata.author = ""
        description.metadata.license = ""

        combined = ct.models.MLModel(pipeline.spec, weights_dir=mlmodel.weights_dir)
        combined.save(fileName)

        print(f'CoreML export success, saved as {fileName}')
    except Exception as e:
        print(f'CoreML export failure: {e}')