Skip to content

API Reference: Training

Standard Trainer

auditml.training.trainer.Trainer

Standard (non-private) training loop.

Parameters:

Name Type Description Default
model Module

The network to train.

required
train_loader DataLoader

DataLoader for training data.

required
val_loader DataLoader

DataLoader for validation/test data.

required
optimizer Optimizer

PyTorch optimizer instance.

required
criterion Module | None

Loss function (default CrossEntropyLoss).

None
device device | str

Device to train on.

'cpu'
max_grad_norm float | None

Optional gradient clipping max-norm. None disables clipping.

None
Source code in src/auditml/training/trainer.py
class Trainer:
    """Standard (non-private) training loop.

    Parameters
    ----------
    model:
        The network to train.
    train_loader:
        DataLoader for training data.
    val_loader:
        DataLoader for validation/test data.
    optimizer:
        PyTorch optimizer instance.
    criterion:
        Loss function (default ``CrossEntropyLoss``).
    device:
        Device to train on.
    max_grad_norm:
        Optional gradient clipping max-norm. ``None`` disables clipping.
    """

    def __init__(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        optimizer: torch.optim.Optimizer,
        criterion: nn.Module | None = None,
        device: torch.device | str = "cpu",
        max_grad_norm: float | None = None,
    ) -> None:
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.criterion = criterion or nn.CrossEntropyLoss()
        self.device = torch.device(device)
        self.max_grad_norm = max_grad_norm

        self.history: dict[str, list[float]] = {
            "train_loss": [],
            "train_acc": [],
            "val_loss": [],
            "val_acc": [],
        }

    # ── public API ───────────────────────────────────────────────────────

    def train(
        self,
        epochs: int = 20,
        patience: int = 10,
        checkpoint_dir: str | Path | None = None,
    ) -> dict[str, list[float]]:
        """Run the full training loop.

        Parameters
        ----------
        epochs:
            Maximum number of training epochs.
        patience:
            Stop early if validation loss has not improved for this many
            consecutive epochs. Set to ``0`` to disable.
        checkpoint_dir:
            If provided, save the best model checkpoint here.

        Returns
        -------
        dict
            The training ``history`` dict.
        """
        scheduler = lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode="min", factor=0.5, patience=max(1, patience // 3),
        )

        best_val_loss = float("inf")
        best_state: dict[str, Any] | None = None
        no_improve = 0

        for epoch in range(1, epochs + 1):
            train_loss, train_acc = self._train_epoch(epoch, epochs)
            val_metrics = self.evaluate(self.val_loader)
            val_loss = val_metrics["loss"]
            val_acc = val_metrics["accuracy"]

            self.history["train_loss"].append(train_loss)
            self.history["train_acc"].append(train_acc)
            self.history["val_loss"].append(val_loss)
            self.history["val_acc"].append(val_acc)

            scheduler.step(val_loss)

            tqdm.write(
                f"Epoch {epoch}/{epochs} — "
                f"train_loss={train_loss:.4f}  train_acc={train_acc:.2%}  "
                f"val_loss={val_loss:.4f}  val_acc={val_acc:.2%}"
            )

            # Early stopping / checkpointing
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_state = copy.deepcopy(self.model.state_dict())
                no_improve = 0
            else:
                no_improve += 1

            if patience > 0 and no_improve >= patience:
                tqdm.write(f"Early stopping at epoch {epoch} (patience={patience}).")
                break

        # Restore best weights
        if best_state is not None:
            self.model.load_state_dict(best_state)

        if checkpoint_dir is not None:
            self.save_checkpoint(
                Path(checkpoint_dir),
                epoch=epoch,
                metrics={"val_loss": best_val_loss, "val_acc": val_acc},
            )

        return self.history

    def evaluate(self, loader: DataLoader) -> dict[str, float]:
        """Evaluate model on *loader*.

        Returns
        -------
        dict
            ``{"loss": float, "accuracy": float}``
        """
        self.model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, targets in loader:
                inputs = inputs.to(self.device)
                targets = targets.to(self.device)
                outputs = self.model(inputs)
                total_loss += self.criterion(outputs, targets).item() * inputs.size(0)
                correct += (outputs.argmax(1) == targets).sum().item()
                total += inputs.size(0)

        return {
            "loss": total_loss / max(total, 1),
            "accuracy": correct / max(total, 1),
        }

    # ── checkpoint helpers ───────────────────────────────────────────────

    def save_checkpoint(
        self,
        directory: Path,
        epoch: int,
        metrics: dict[str, float] | None = None,
    ) -> Path:
        """Save model weights, optimizer state, and metadata.

        Returns the path to the saved ``.pt`` file.
        """
        directory = Path(directory)
        directory.mkdir(parents=True, exist_ok=True)

        ckpt_path = directory / "model.pt"
        torch.save(
            {
                "epoch": epoch,
                "model_state_dict": self.model.state_dict(),
                "optimizer_state_dict": self.optimizer.state_dict(),
                "metrics": metrics or {},
                "history": self.history,
            },
            ckpt_path,
        )

        # Also save a human-readable metrics file
        if metrics:
            (directory / "metrics.json").write_text(
                json.dumps(metrics, indent=2),
            )

        return ckpt_path

    def load_checkpoint(self, path: str | Path) -> dict[str, Any]:
        """Load a checkpoint and restore model/optimizer state.

        Returns the checkpoint dict (contains ``epoch``, ``metrics``, etc.).
        """
        path = Path(path)
        ckpt = torch.load(path, map_location=self.device, weights_only=False)
        self.model.load_state_dict(ckpt["model_state_dict"])
        self.optimizer.load_state_dict(ckpt["optimizer_state_dict"])
        self.history = ckpt.get("history", self.history)
        return ckpt

    # ── internals ────────────────────────────────────────────────────────

    def _train_epoch(self, epoch: int, total_epochs: int) -> tuple[float, float]:
        """Train for one epoch. Returns (avg_loss, accuracy)."""
        self.model.train()
        total_loss = 0.0
        correct = 0
        total = 0

        pbar = tqdm(
            self.train_loader,
            desc=f"Epoch {epoch}/{total_epochs}",
            leave=False,
        )
        for inputs, targets in pbar:
            inputs = inputs.to(self.device)
            targets = targets.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)
            loss.backward()

            if self.max_grad_norm is not None:
                nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)

            self.optimizer.step()

            batch_size = inputs.size(0)
            total_loss += loss.item() * batch_size
            correct += (outputs.argmax(1) == targets).sum().item()
            total += batch_size

            pbar.set_postfix(loss=f"{loss.item():.4f}")

        return total_loss / max(total, 1), correct / max(total, 1)

train(epochs: int = 20, patience: int = 10, checkpoint_dir: str | Path | None = None) -> dict[str, list[float]]

Run the full training loop.

Parameters:

Name Type Description Default
epochs int

Maximum number of training epochs.

20
patience int

Stop early if validation loss has not improved for this many consecutive epochs. Set to 0 to disable.

10
checkpoint_dir str | Path | None

If provided, save the best model checkpoint here.

None

Returns:

Type Description
dict

The training history dict.

Source code in src/auditml/training/trainer.py
def train(
    self,
    epochs: int = 20,
    patience: int = 10,
    checkpoint_dir: str | Path | None = None,
) -> dict[str, list[float]]:
    """Run the full training loop.

    Parameters
    ----------
    epochs:
        Maximum number of training epochs.
    patience:
        Stop early if validation loss has not improved for this many
        consecutive epochs. Set to ``0`` to disable.
    checkpoint_dir:
        If provided, save the best model checkpoint here.

    Returns
    -------
    dict
        The training ``history`` dict.
    """
    scheduler = lr_scheduler.ReduceLROnPlateau(
        self.optimizer, mode="min", factor=0.5, patience=max(1, patience // 3),
    )

    best_val_loss = float("inf")
    best_state: dict[str, Any] | None = None
    no_improve = 0

    for epoch in range(1, epochs + 1):
        train_loss, train_acc = self._train_epoch(epoch, epochs)
        val_metrics = self.evaluate(self.val_loader)
        val_loss = val_metrics["loss"]
        val_acc = val_metrics["accuracy"]

        self.history["train_loss"].append(train_loss)
        self.history["train_acc"].append(train_acc)
        self.history["val_loss"].append(val_loss)
        self.history["val_acc"].append(val_acc)

        scheduler.step(val_loss)

        tqdm.write(
            f"Epoch {epoch}/{epochs} — "
            f"train_loss={train_loss:.4f}  train_acc={train_acc:.2%}  "
            f"val_loss={val_loss:.4f}  val_acc={val_acc:.2%}"
        )

        # Early stopping / checkpointing
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_state = copy.deepcopy(self.model.state_dict())
            no_improve = 0
        else:
            no_improve += 1

        if patience > 0 and no_improve >= patience:
            tqdm.write(f"Early stopping at epoch {epoch} (patience={patience}).")
            break

    # Restore best weights
    if best_state is not None:
        self.model.load_state_dict(best_state)

    if checkpoint_dir is not None:
        self.save_checkpoint(
            Path(checkpoint_dir),
            epoch=epoch,
            metrics={"val_loss": best_val_loss, "val_acc": val_acc},
        )

    return self.history

evaluate(loader: DataLoader) -> dict[str, float]

Evaluate model on loader.

Returns:

Type Description
dict

{"loss": float, "accuracy": float}

Source code in src/auditml/training/trainer.py
def evaluate(self, loader: DataLoader) -> dict[str, float]:
    """Compute the mean loss and accuracy of the model over *loader*.

    The model is switched to eval mode and gradients are disabled for the
    whole pass. Both values are per-sample averages; an empty loader
    yields ``0.0`` for each.

    Returns
    -------
    dict
        ``{"loss": float, "accuracy": float}``
    """
    self.model.eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(self.device)
            batch_y = batch_y.to(self.device)
            logits = self.model(batch_x)

            n_batch = batch_x.size(0)
            # Scale by batch size so the result is a per-sample average.
            loss_sum += self.criterion(logits, batch_y).item() * n_batch
            n_correct += (logits.argmax(1) == batch_y).sum().item()
            n_seen += n_batch

    # Guard the denominator against an empty loader.
    denom = max(n_seen, 1)
    return {"loss": loss_sum / denom, "accuracy": n_correct / denom}

save_checkpoint(directory: Path, epoch: int, metrics: dict[str, float] | None = None) -> Path

Save model weights, optimizer state, and metadata.

Returns the path to the saved .pt file.

Source code in src/auditml/training/trainer.py
def save_checkpoint(
    self,
    directory: Path,
    epoch: int,
    metrics: dict[str, float] | None = None,
) -> Path:
    """Save model weights, optimizer state, and metadata.

    Returns the path to the saved ``.pt`` file.
    """
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)

    ckpt_path = directory / "model.pt"
    torch.save(
        {
            "epoch": epoch,
            "model_state_dict": self.model.state_dict(),
            "optimizer_state_dict": self.optimizer.state_dict(),
            "metrics": metrics or {},
            "history": self.history,
        },
        ckpt_path,
    )

    # Also save a human-readable metrics file
    if metrics:
        (directory / "metrics.json").write_text(
            json.dumps(metrics, indent=2),
        )

    return ckpt_path

load_checkpoint(path: str | Path) -> dict[str, Any]

Load a checkpoint and restore model/optimizer state.

Returns the checkpoint dict (contains epoch, metrics, etc.).

Source code in src/auditml/training/trainer.py
def load_checkpoint(self, path: str | Path) -> dict[str, Any]:
    """Load a checkpoint and restore model/optimizer state.

    Returns the checkpoint dict (contains ``epoch``, ``metrics``, etc.).
    """
    path = Path(path)
    ckpt = torch.load(path, map_location=self.device, weights_only=False)
    self.model.load_state_dict(ckpt["model_state_dict"])
    self.optimizer.load_state_dict(ckpt["optimizer_state_dict"])
    self.history = ckpt.get("history", self.history)
    return ckpt

DP Trainer

auditml.training.dp_trainer.DPTrainer

Bases: Trainer

Differentially-private training loop powered by Opacus.

Wraps the standard Trainer with Opacus's PrivacyEngine. After make_private() is called the training loop automatically clips per-sample gradients and injects calibrated noise.

Parameters:

Name Type Description Default
model Module

The model to train (must be Opacus-compatible — call validate_and_fix_model first if needed).

required
train_loader DataLoader

Training data loader.

required
val_loader DataLoader

Validation data loader.

required
optimizer Optimizer

PyTorch optimizer (will be wrapped by Opacus).

required
dp_config DPConfig

Differential privacy parameters.

required
criterion Module | None

Loss function.

None
device device | str

Torch device.

'cpu'
Source code in src/auditml/training/dp_trainer.py
class DPTrainer(Trainer):
    """Differentially-private training loop powered by Opacus.

    Wraps the standard ``Trainer`` with Opacus's ``PrivacyEngine``.
    After ``make_private()`` is called the training loop automatically
    clips per-sample gradients and injects calibrated noise.

    Parameters
    ----------
    model:
        The model to train (must be Opacus-compatible — call
        ``validate_and_fix_model`` first if needed).
    train_loader:
        Training data loader.
    val_loader:
        Validation data loader.
    optimizer:
        PyTorch optimizer (will be wrapped by Opacus).
    dp_config:
        Differential privacy parameters.
    criterion:
        Loss function.
    device:
        Torch device.
    """

    def __init__(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader,
        optimizer: torch.optim.Optimizer,
        dp_config: DPConfig,
        criterion: nn.Module | None = None,
        device: torch.device | str = "cpu",
    ) -> None:
        super().__init__(
            model=model,
            train_loader=train_loader,
            val_loader=val_loader,
            optimizer=optimizer,
            criterion=criterion,
            device=device,
            max_grad_norm=None,  # Opacus handles clipping
        )
        self.dp_config = dp_config
        self.privacy_engine = None
        # Noise multiplier actually handed to Opacus (configured or
        # calibrated); stays None until make_private() has run.
        self.noise_multiplier: float | None = None
        # Cumulative epsilon recorded after each trained epoch.
        self.epsilon_history: list[float] = []
        self._is_private = False

    def make_private(self, epochs: int = 20) -> None:
        """Attach the Opacus ``PrivacyEngine`` to model/optimizer/loader.

        After this call the training loop will enforce DP guarantees.
        ``train()`` calls this method automatically if it has not been
        called yet; when calling it explicitly, do so **before**
        ``train()``.

        The method uses either ``noise_multiplier`` (if set in config)
        or calibrates noise from ``(epsilon, delta, epochs)``.

        Parameters
        ----------
        epochs:
            Planned number of training epochs, used only when the noise
            multiplier has to be calibrated from the target epsilon.
        """
        from opacus import PrivacyEngine

        self.privacy_engine = PrivacyEngine()

        kwargs: dict[str, Any] = {
            "max_grad_norm": self.dp_config.max_grad_norm,
        }

        if self.dp_config.noise_multiplier is not None:
            kwargs["noise_multiplier"] = self.dp_config.noise_multiplier
        else:
            # Calibrate noise from target epsilon/delta over the planned run
            kwargs["noise_multiplier"] = self._calibrate_noise(epochs)

        (
            self.model,
            self.optimizer,
            self.train_loader,
        ) = self.privacy_engine.make_private(
            module=self.model,
            optimizer=self.optimizer,
            data_loader=self.train_loader,
            **kwargs,
        )

        # Remember the effective value so checkpoints can report it even
        # when it was calibrated rather than configured.
        self.noise_multiplier = kwargs["noise_multiplier"]
        self._is_private = True
        logger.info(
            "DP training enabled — max_grad_norm=%.2f, noise_multiplier=%.4f",
            self.dp_config.max_grad_norm,
            kwargs["noise_multiplier"],
        )

    def _calibrate_noise(self, epochs: int = 20) -> float:
        """Compute a noise multiplier for the configured target epsilon.

        Delegates to Opacus's ``get_noise_multiplier`` using the config's
        ``epsilon``/``delta``, a sample rate of one batch per epoch
        (``1 / len(train_loader)``), and *epochs* as the planning horizon.

        Parameters
        ----------
        epochs:
            Number of epochs the privacy budget should last for. Values
            below ``1`` are treated as ``1``.
        """
        from opacus.accountants.utils import get_noise_multiplier

        steps_per_epoch = len(self.train_loader)
        # Fall back to a nominal rate if the loader reports no batches.
        sample_rate = 1.0 / steps_per_epoch if steps_per_epoch > 0 else 0.01

        noise = get_noise_multiplier(
            target_epsilon=self.dp_config.epsilon,
            target_delta=self.dp_config.delta,
            sample_rate=sample_rate,
            epochs=max(1, epochs),
        )
        logger.info(
            "Calibrated noise_multiplier=%.4f for epsilon=%.2f, delta=%.1e",
            noise, self.dp_config.epsilon, self.dp_config.delta,
        )
        return noise

    def train(
        self,
        epochs: int = 20,
        patience: int = 10,
        checkpoint_dir: str | Path | None = None,
    ) -> dict[str, list[float]]:
        """Run the DP training loop.

        If ``make_private()`` has not been called yet, it is called
        automatically before training begins, calibrating noise (when
        needed) for the requested number of *epochs*.

        Returns the training history dict (same as ``Trainer.train()``)
        with an additional ``"epsilon"`` key.
        """
        if not self._is_private:
            self.make_private(epochs=epochs)

        # Add epsilon tracking to history. Existing entries are kept so the
        # list stays aligned with the other per-epoch lists, which also
        # accumulate across repeated train() calls.
        self.history.setdefault("epsilon", [])

        return super().train(
            epochs=epochs,
            patience=patience,
            checkpoint_dir=checkpoint_dir,
        )

    def _train_epoch(self, epoch: int, total_epochs: int) -> tuple[float, float]:
        """Train one epoch with DP noise.

        Same loop as the parent (without manual gradient clipping), plus
        recording epsilon after the epoch. Opacus handles the gradient
        clipping and noise internally via the wrapped optimizer.
        """
        self.model.train()
        total_loss = 0.0
        correct = 0
        total = 0

        pbar = tqdm(
            self.train_loader,
            desc=f"Epoch {epoch}/{total_epochs} (DP)",
            leave=False,
        )
        for inputs, targets in pbar:
            inputs = inputs.to(self.device)
            targets = targets.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()

            batch_size = inputs.size(0)
            total_loss += loss.item() * batch_size
            correct += (outputs.argmax(1) == targets).sum().item()
            total += batch_size

            pbar.set_postfix(loss=f"{loss.item():.4f}")

        # Record privacy budget after this epoch
        if self.privacy_engine is not None:
            eps = self.privacy_engine.get_epsilon(self.dp_config.delta)
            self.epsilon_history.append(eps)
            # setdefault: the key may be absent if history was replaced
            # (e.g. by load_checkpoint) or this is called outside train().
            self.history.setdefault("epsilon", []).append(eps)
            tqdm.write(f"  DP epsilon after epoch {epoch}: {eps:.2f}")

        return total_loss / max(total, 1), correct / max(total, 1)

    def get_epsilon(self) -> float:
        """Return the current cumulative privacy budget (epsilon).

        Returns
        -------
        float
            The epsilon spent so far. Returns 0.0 if training
            has not started or no steps have been taken.
        """
        if self.epsilon_history:
            return self.epsilon_history[-1]
        if self.privacy_engine is not None and self._is_private:
            try:
                return self.privacy_engine.get_epsilon(self.dp_config.delta)
            except (ValueError, RuntimeError):
                # No steps taken yet — accountant can't compute epsilon
                return 0.0
        return 0.0

    def save_checkpoint(
        self,
        directory: Path,
        epoch: int,
        metrics: dict[str, float] | None = None,
    ) -> Path:
        """Save checkpoint with DP-specific metadata.

        Extends the parent to include ``epsilon`` and ``noise_multiplier``
        in the saved metrics. The caller's *metrics* dict is not modified.
        ``noise_multiplier`` is the value passed to Opacus if
        ``make_private()`` has run, otherwise the configured value, or
        ``0.0`` if neither is available.
        """
        # Copy so the DP keys do not leak into the caller's dict.
        dp_metrics: dict[str, float] = dict(metrics) if metrics is not None else {}
        dp_metrics["epsilon"] = self.get_epsilon()

        noise = self.noise_multiplier
        if noise is None:
            noise = self.dp_config.noise_multiplier
        dp_metrics["noise_multiplier"] = noise if noise is not None else 0.0

        return super().save_checkpoint(directory, epoch, dp_metrics)

make_private() -> None

Attach the Opacus PrivacyEngine to model/optimizer/loader.

After this call the training loop will enforce DP guarantees. train() calls this method automatically if it has not been called yet; when calling it explicitly, do so before train().

The method uses either noise_multiplier (if set in config) or calibrates noise from (epsilon, delta, epochs).

Source code in src/auditml/training/dp_trainer.py
def make_private(self) -> None:
    """Wrap model, optimizer and train loader with Opacus's ``PrivacyEngine``.

    A fresh ``PrivacyEngine`` is created and stored on the trainer. The
    noise multiplier comes from the config when it is set there; otherwise
    it is obtained from ``_calibrate_noise()``. The wrapped objects
    returned by the engine replace ``model``, ``optimizer`` and
    ``train_loader``, and the trainer is flagged as private.
    """
    from opacus import PrivacyEngine

    engine = PrivacyEngine()
    self.privacy_engine = engine

    cfg = self.dp_config
    clip_norm = cfg.max_grad_norm

    # Prefer an explicitly configured multiplier; calibrate only as fallback.
    sigma = cfg.noise_multiplier
    if sigma is None:
        sigma = self._calibrate_noise()

    wrapped = engine.make_private(
        module=self.model,
        optimizer=self.optimizer,
        data_loader=self.train_loader,
        max_grad_norm=clip_norm,
        noise_multiplier=sigma,
    )
    self.model, self.optimizer, self.train_loader = wrapped

    self._is_private = True
    logger.info(
        "DP training enabled — max_grad_norm=%.2f, noise_multiplier=%.4f",
        cfg.max_grad_norm,
        sigma,
    )

train(epochs: int = 20, patience: int = 10, checkpoint_dir: str | Path | None = None) -> dict[str, list[float]]

Run the DP training loop.

If make_private() has not been called yet, it is called automatically before training begins.

Returns the training history dict (same as Trainer.train()) with an additional "epsilon" key.

Source code in src/auditml/training/dp_trainer.py
def train(
    self,
    epochs: int = 20,
    patience: int = 10,
    checkpoint_dir: str | Path | None = None,
) -> dict[str, list[float]]:
    """Run the DP training loop.

    If ``make_private()`` has not been called yet, it is called
    automatically before training begins.

    Returns the training history dict (same as ``Trainer.train()``)
    with an additional ``"epsilon"`` key.
    """
    if not self._is_private:
        self.make_private()

    # Add epsilon tracking to history. Existing entries are kept rather
    # than reset, so repeated train() calls do not drop earlier values
    # while the other history lists keep growing.
    self.history.setdefault("epsilon", [])

    return super().train(
        epochs=epochs,
        patience=patience,
        checkpoint_dir=checkpoint_dir,
    )

get_epsilon() -> float

Return the current cumulative privacy budget (epsilon).

Returns:

Type Description
float

The epsilon spent so far. Returns 0.0 if training has not started or no steps have been taken.

Source code in src/auditml/training/dp_trainer.py
def get_epsilon(self) -> float:
    """Report the cumulative privacy budget (epsilon) spent so far.

    The most recent recorded per-epoch value wins. Without any recorded
    value the privacy engine is queried directly, provided it exists and
    the trainer has been made private.

    Returns
    -------
    float
        The epsilon spent so far, or ``0.0`` when nothing has been
        recorded and the engine is missing, inactive, or unable to
        compute a value.
    """
    recorded = self.epsilon_history
    if recorded:
        return recorded[-1]

    engine = self.privacy_engine
    if engine is None or not self._is_private:
        return 0.0

    try:
        return engine.get_epsilon(self.dp_config.delta)
    except (ValueError, RuntimeError):
        # The accountant cannot compute epsilon before any step is taken.
        return 0.0

save_checkpoint(directory: Path, epoch: int, metrics: dict[str, float] | None = None) -> Path

Save checkpoint with DP-specific metadata.

Extends the parent to include epsilon in the saved metrics.

Source code in src/auditml/training/dp_trainer.py
def save_checkpoint(
    self,
    directory: Path,
    epoch: int,
    metrics: dict[str, float] | None = None,
) -> Path:
    """Save checkpoint with DP-specific metadata.

    Extends the parent to include ``epsilon`` and ``noise_multiplier`` in
    the saved metrics. The caller's *metrics* dict is not modified.
    ``noise_multiplier`` is the configured value, or ``0.0`` when the
    config does not set one.
    """
    # Copy so the DP keys do not leak into the caller's dict.
    dp_metrics: dict[str, float] = dict(metrics) if metrics is not None else {}
    dp_metrics["epsilon"] = self.get_epsilon()

    configured = self.dp_config.noise_multiplier
    dp_metrics["noise_multiplier"] = configured if configured is not None else 0.0

    return super().save_checkpoint(directory, epoch, dp_metrics)