Source code for matchmaker.prob.hmm

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This module implements Hidden Markov Models for score following
"""

import time
import warnings
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

import numpy as np
import progressbar
import scipy.spatial.distance as sp_dist
from hiddenmarkov import (
    ConstantTransitionModel,
    HiddenMarkovModel,
    ObservationModel,
    TransitionModel,
)
from numpy.typing import NDArray
from scipy.signal import convolve
from scipy.stats import gumbel_l, norm

from matchmaker.base import OnlineAlignment
from matchmaker.io.queue import RECVQueue
from matchmaker.io.stream import STREAM_END
from matchmaker.utils.errors import MatchmakerMissingParameterError
from matchmaker.utils.misc import (
    get_window_indices,
    interleave_with_constant,
    set_latency_stats,
)
from matchmaker.utils.tempo_models import (
    KalmanTempoModel,
    LinearTempoModel,
    MovingAverageTempoModel,
    ReactiveTempoModel,
    TempoModel,
)

# Alias for typing arrays
NDArrayFloat = NDArray[np.float32]
NDArrayInt = NDArray[np.int32]

DEFAULT_GAUSSIAN_AUDIO_PRECISION = 2.0

DEFAULT_GAUSSIAN_AUDIO_IOI_PRECISION = 2.0
DEFAULT_GUMBEL_AUDIO_SCALE = 0.05
QUEUE_TIMEOUT = 10


class BaseHMM(HiddenMarkovModel, OnlineAlignment):
    """
    Base class for Hidden Markov Model alignment methods.

    Parameters
    ----------
    observation_model: ObservationModel
        An observation (data) model for computing the observation probabilities.

    transition_model: TransitionModel
        A transition model for computing the transition probabilities.

    score_positions: np.ndarray
        The hidden states (positions in reference time).

    tempo_model: Optional[TempoModel]
        A tempo model instance

    has_insertions: bool
        A boolean indicating whether the state space consider inserted notes.
    """

    observation_model: ObservationModel
    transition_model: TransitionModel
    score_positions: Union[NDArrayFloat, NDArrayInt]
    tempo_model: Optional[TempoModel]
    has_insertions: bool
    _alignment_path: List[Tuple[int, int]]
    queue: Optional[RECVQueue]

    def __init__(
        self,
        observation_model: ObservationModel,
        transition_model: TransitionModel,
        score_positions: Optional[Union[NDArrayFloat, NDArrayInt]] = None,
        tempo_model: Optional[TempoModel] = None,
        has_insertions: bool = False,
        queue: Optional[RECVQueue] = None,
        patience: int = 10,
        **kwargs,
    ) -> None:
        HiddenMarkovModel.__init__(
            self,
            observation_model=observation_model,
            transition_model=transition_model,
            state_space=score_positions,
        )
        OnlineAlignment.__init__(
            self,
            score_positions=score_positions,
            queue=queue,
        )
        self.tempo_model = tempo_model
        self.has_insertions = has_insertions
        self.patience = patience
        self.input_index = 0

    def step(self, features) -> None:
        current_state = self.forward_algorithm_step(
            observation=features,
            log_probabilities=False,
        )
        self.current_index = current_state
        self.input_index += 1
        self.observation_model.current_state = current_state


class PitchHMM(BaseHMM):
    """
    Implements the behavior of a HiddenMarkovModel, specifically designed for
    the task of score following.

    Parameters
    ----------
    _transition_matrix : numpy.ndarray
        Matrix for computations of state transitions within the HMM.

    _observation_model : ObservationModel
        Object responsible for computing the observation probabilities for each
        state of the HMM.

    initial_distribution : numpy array
        The initial distribution of the model. If not given, it is assumed to
        be uniform.

    forward_variable : numpy array
        The current (latest) value of the forward variable.

    _variation_coeff : float
        The normalized coefficient of variation of the current (latest) forward
        variable. Used to determine the confidence of the prediction of the HMM.

    current_state : int
        The index of the current state of the HMM.
    """

    def __init__(
        self,
        reference_features: np.ndarray,  # snote_array
        queue: Optional[RECVQueue] = None,
        transition_model: Optional[TransitionModel] = None,
        observation_model: Optional[ObservationModel] = None,
        transition_matrix: Optional[NDArrayFloat] = None,
        pitch_obs_prob_func: Optional[Callable[..., NDArrayFloat]] = None,
        pitch_prob_args: Optional[Dict[str, Any]] = None,
        initial_probabilities: Optional[np.ndarray] = None,
        has_insertions: bool = True,
        piano_range: bool = True,
        **kwargs,
    ) -> None:
        """
        Initialize the object.

        Parameters
        ----------
        transition_matrix : numpy array
            The Tranistion probability matrix of HMM.

        pitch_profiles : numpy array
            The pre-computed pitch profiles, for each separate possible pitch
            in the MIDI range. Used in calculating the pitch observation
            probabilities.

        ioi_matrix : numpy array
            The pre-computed score IOI values in beats, from each unique state
            to all other states, stored in a matrix.

        ioi_precision : float
            The precision parameter for computing the IOI observation
            probability.

        score_onsets : numpy array

        initial_distribution : numpy array
            The initial distribution of the model. If not given, it is asumed to
            be uniform.
            Default = None.
        """
        OnlineAlignment.__init__(
            self,
            reference_features=reference_features,
        )
        self.reference_features = reference_features
        self.perf_onset = None
        (
            observation_model,
            transition_matrix,
            initial_probabilities,
            unique_onsets,
        ) = self._build_hmm_modules(
            inserted_states=has_insertions,
            piano_range=piano_range,
        )

        if transition_model is not None and transition_matrix is not None:
            warnings.warn(
                "Both `transition_model` and `transition_matrix` were "
                "provided. Only `transition_model` will be used."
            )
        obs_model_params_given = [
            pitch_obs_prob_func is not None,
            pitch_prob_args is not None,
        ]
        if observation_model is not None and any(obs_model_params_given):
            warnings.warn(
                "`observation_model` and params were provided. "
                "Only `observation_model` will be used."
            )

        if observation_model is None and not all(obs_model_params_given):
            missing_params = [
                pn
                for pn, given in zip(
                    [
                        "pitch_obs_prob_func",
                        "pitch_prob_args",
                    ],
                    obs_model_params_given,
                )
                if not given
            ]
            raise ValueError(missing_params)

        if transition_model is None:
            transition_model = ConstantTransitionModel(
                transition_probabilities=transition_matrix,
                init_probabilities=initial_probabilities,
            )

        BaseHMM.__init__(
            self,
            observation_model=observation_model,
            transition_model=transition_model,
            score_positions=unique_onsets,
            has_insertions=has_insertions,
            queue=queue,
        )

    def _build_hmm_modules(
        self,
        piano_range: bool = True,
        inserted_states: bool = True,
    ):
        snote_array = self.reference_features
        unique_sonsets = np.unique(snote_array["onset_beat"])
        unique_sonset_idxs = [
            np.where(snote_array["onset_beat"] == ui)[0] for ui in unique_sonsets
        ]
        chord_pitches = [snote_array["pitch"][uix] for uix in unique_sonset_idxs]
        pitch_profiles = compute_discrete_pitch_profiles(
            chord_pitches=chord_pitches,
            piano_range=piano_range,
            inserted_states=inserted_states,
        )

        # observation model
        observation_model = BernoulliPitchObservationModel(
            pitch_profiles=pitch_profiles,
        )

        if inserted_states:
            unique_onsets_s = np.insert(
                unique_sonsets,
                np.arange(1, len(unique_sonsets)),
                (unique_sonsets[:-1] + 0.5 * np.diff(unique_sonsets)),
            )
        else:
            unique_onsets_s = unique_sonsets

        transition_matrix = stable_transition_matrix(
            n_states=len(unique_onsets_s),
            dist=gumbel_l,
            scale=1.0,  # 0.5,
            inserted_states=inserted_states,
        )
        initial_probabilities = init_dist(
            n_states=len(unique_onsets_s),
            dist=gumbel_l,
        )

        return (
            observation_model,
            transition_matrix,
            initial_probabilities,
            unique_onsets_s,
        )


def jiang_transition_matrix(
    n_states: int,
    trans_prob: float = 0.8,
    # frame_rate, sigma, transition_variance: float
) -> NDArrayFloat:
    transition_matrix = np.zeros((n_states, n_states))

    for i in range(n_states):
        for j in range(n_states):
            if j <= i:
                transition_matrix[i, j] = 0  # (1 - p1 if we want to go back)
            elif j == i:
                transition_matrix[i, j] = trans_prob
            else:
                transition_matrix[i, j] = (1 - trans_prob) / (n_states - j)

    return transition_matrix


def gaussian_transition_matrix(n_states: int) -> NDArrayFloat:
    # Use broadcasting to compute the Gaussian PDF for each pair of states
    # transition_matrix = norm.pdf(states[:, np.newaxis], loc=states, scale=1)
    transition_matrix = np.eye(n_states)
    transition_matrix = np.roll(transition_matrix, 1, axis=1)

    return transition_matrix


def jiang_transition_matrix_from_sequence(sequence, frame_rate, sigma):
    """Compute the transition matrix from a given sequence for the Jiang HMM model.

    Args:
        sequence (list): List of note indices representing the sequence from score.
            ex. [0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 4, 4, 5, 6, 6, 7, 7, 8, 8, 9, 9, 9]

        frame_rate (float): Frame rate in Hz.
        sigma (float): Standard deviation for note duration.

    Returns:
        np.array: A dense matrix representing the transition probabilities.
    """
    total_frames = len(sequence)
    notes = list(set(sequence))
    n_notes = len(notes)

    # Define state space (note, age)
    score_positions = []
    for note in notes:
        ages = [i for i, x in enumerate(sequence) if x == note]
        score_positions.extend([(note, age) for age in ages])

    n_states = len(score_positions)
    transition_matrix = np.zeros((n_states, n_states))
    delta = 1 / frame_rate

    for i in range(n_states - 1):
        current_note, current_age = score_positions[i]
        next_note, next_age = score_positions[i + 1]

        # Calculate the mean duration based on the note index
        mean_duration = (current_note + 1) * delta
        stddev_duration = sigma

        if stddev_duration == 0:
            stddev_duration = 1e-6  # Prevent division by zero

        # Calculate the Gaussian CDF terms for transition probability
        phi_current = norm.cdf((current_age * delta - mean_duration) / stddev_duration)
        phi_next = norm.cdf(
            ((current_age + 1) * delta - mean_duration) / stddev_duration
        )

        if (1 - phi_current) == 0:
            p1 = 1  # Prevent division by zero
        else:
            p1 = (1 - phi_next) / (1 - phi_current)

        # Ensure p1 is between 0 and 1
        p1 = max(0, min(p1, 1))

        if current_note == next_note:
            # Staying in the same note, increasing the age
            transition_matrix[i, i + 1] = p1
        else:
            # Moving to the next note
            transition_matrix[i, i + 1] = 1 - p1

    # Ensure the last state transitions to itself
    transition_matrix[n_states - 1, n_states - 1] = 1.0

    return transition_matrix, score_positions


def simple_transition_matrix(
    n_states, trans_prob: float = 0.6, stay_prob: float = 0.3
) -> NDArrayFloat:
    if not (0 <= stay_prob <= 1) or not (0 <= trans_prob <= 1):
        raise ValueError("Probabilities must be between 0 and 1.")
    if stay_prob + trans_prob > 1:
        raise ValueError("stay_prob + trans_prob must be <= 1.")

    matrix = np.zeros((n_states, n_states), dtype=np.float32)

    for i in range(n_states):
        if i == n_states - 1:
            matrix[i, i] = 1.0  # Absorbing state
            continue

        matrix[i, i] = stay_prob
        matrix[i, i + 1] = trans_prob

        # Distribute remaining probability over future states beyond i+1
        remaining_prob = 1.0 - stay_prob - trans_prob
        if i + 2 < n_states:
            num_other_states = n_states - (i + 2)
            uniform_prob = (
                remaining_prob / num_other_states if num_other_states > 0 else 0
            )
            matrix[i, i + 2 :] = uniform_prob
        else:
            # If no future states beyond i+1, give all remaining prob to i+1
            matrix[i, i + 1] += remaining_prob

    return matrix


def kalman_transition_matrix(n_states: int, transition_variance: float) -> NDArrayFloat:
    """
    Create a transition matrix based on a Kalman filter model.
    """
    transition_matrix = np.zeros((n_states, n_states))

    for i in range(n_states - 1):
        transition_matrix[i, i] = 1 - transition_variance
        transition_matrix[i, i + 1] = transition_variance

    transition_matrix[-1, -1] = 1  # Last state is an absorbing state

    return transition_matrix


def gumbel_transition_matrix(  # TODO check works for audio (parameter)
    n_states: int,
    mp_trans_state: int = 1,
    scale: float = 0.5,
    inserted_states: bool = False,
) -> NDArrayFloat:
    """
    Compute a transiton matrix, where each row follows a normalized Gumbel
    distribution.

    Parameters
    ----------
    n_states : int
        The number of states in the Hidden Markov Model (HMM), which is required
        for the size of the matrix.

    mp_trans_state : int
        Which state should have the largest probability to be transitioned into
        from the current state the model is in.
        Default = 1, which means that the model would prioritize transitioning
        into the state that is next in line, e.g. from State 3 to State 4.

    scale : float
        The scale parameter of the distribution.
        Default = 0.5

    inserted_states : boolean
        Indicates whether the HMM includes inserted states (intermediary states
        between chords for errors and insertions in the score following).
        Default = True

    Returns
    -------
    transition_matrix : numpy array
        The computed transition matrix for the HMM.
    """
    # Initialize transition matrix:
    transition_matrix = np.zeros((n_states, n_states), dtype="f8")

    # Compute transition matrix:
    for i in range(n_states):
        if inserted_states:
            if np.mod(i, 2) == 0:
                transition_matrix[i] = gumbel_l.pdf(
                    np.arange(n_states), loc=i + mp_trans_state * 2, scale=scale
                )
            else:
                transition_matrix[i] = gumbel_l.pdf(
                    np.arange(n_states), loc=i + mp_trans_state * 2 - 1, scale=scale
                )
        else:
            transition_matrix[i] = gumbel_l.pdf(
                np.arange(n_states), loc=i + mp_trans_state * 2 - 1, scale=scale
            )

    # Normalize transition matrix (so that it is a proper stochastic matrix):
    transition_matrix /= transition_matrix.sum(1, keepdims=True)

    # Return the computed transition matrix:
    return transition_matrix


def stable_transition_matrix(  # TODO check works for audio (parameter)
    n_states: int,
    mp_trans_state: int = 1,
    dist=gumbel_l,
    scale: float = 0.5,
    inserted_states: bool = False,
) -> NDArrayFloat:
    """
    Compute a transiton matrix, where each row follows a normalized Gumbel
    distribution.

    Parameters
    ----------
    n_states : int
        The number of states in the Hidden Markov Model (HMM), which is required
        for the size of the matrix.

    mp_trans_state : int
        Which state should have the largest probability to be transitioned into
        from the current state the model is in.
        Default = 1, which means that the model would prioritize transitioning
        into the state that is next in line, e.g. from State 3 to State 4.

    scale : float
        The scale parameter of the distribution.
        Default = 0.5

    inserted_states : boolean
        Indicates whether the HMM includes inserted states (intermediary states
        between chords for errors and insertions in the score following).
        Default = True

    Returns
    -------
    transition_matrix : numpy array
        The computed transition matrix for the HMM.
    """
    # Initialize transition matrix:
    transition_matrix = np.zeros((n_states, n_states), dtype="f8")
    # Compute transition matrix:
    for i in range(n_states):
        if inserted_states:
            if np.mod(i, 2) == 0:
                transition_matrix[i] = dist.pdf(
                    np.arange(n_states), loc=i + mp_trans_state * 2, scale=scale
                )
            else:
                transition_matrix[i] = dist.pdf(
                    np.arange(n_states), loc=i + mp_trans_state * 2 - 1, scale=scale
                )
        else:
            transition_matrix[i] = dist.pdf(
                np.arange(n_states), loc=i + mp_trans_state * 2 - 1, scale=scale
            )

    # Normalize transition matrix (so that it is a proper stochastic matrix):
    transition_matrix /= transition_matrix.sum(1, keepdims=True)

    # Return the computed transition matrix:
    return transition_matrix


def init_dist(
    n_states: int,
    dist=gumbel_l,
    loc: int = 0,
    scale: float = 10,
) -> NDArrayFloat:
    """
    Compute the initial probabilites for all states in the Hidden Markov Model
    (HMM), which follow a Gumbel distribution.

    Parameters
    ----------
    n_states : int
        The number of states in the Hidden Markov Model (HMM), which is required
        for the size of the initial probabilites vector.

    Returns
    -------
    init_probs : numpy array
        The computed initial probabilities in the form of a vector.
    """

    prob_scale: float = scale if scale < n_states else n_states / 10

    init_probs: np.ndarray = dist.pdf(
        np.arange(n_states),
        loc=loc,
        scale=prob_scale,
    )

    return init_probs


def gumbel_init_dist(
    n_states: int,
    loc: int = 0,
    scale: float = 10,
) -> NDArrayFloat:
    """
    Compute the initial probabilites for all states in the Hidden Markov Model
    (HMM), which follow a Gumbel distribution.

    Parameters
    ----------
    n_states : int
        The number of states in the Hidden Markov Model (HMM), which is required
        for the size of the initial probabilites vector.

    Returns
    -------
    init_probs : numpy array
        The computed initial probabilities in the form of a vector.
    """

    prob_scale: float = scale if scale < n_states else n_states / 10

    init_probs: np.ndarray = gumbel_l.pdf(
        np.arange(n_states),
        loc=loc,
        scale=prob_scale,
    )

    return init_probs


def compute_continous_pitch_profiles(  # TODO check separately
    spectral_features: NDArrayFloat,
    spectral_feature_times: NDArrayFloat,
    onset_times: NDArrayFloat,
    eps: float = 0.01,
    context: int = 3,
    normalize=True,
    inserted_states=True,
) -> NDArrayFloat:
    onset_idxs_in_features = np.searchsorted(
        a=spectral_feature_times,
        v=onset_times,
        side="left",
    )

    window_indices = get_window_indices(
        indices=onset_idxs_in_features,
        context=context,
    )

    mask = (window_indices >= 0)[:, :, np.newaxis]
    _pitch_profiles = (spectral_features[window_indices] * mask).sum(1)

    if inserted_states:
        pitch_profiles = np.ones(2 * len(onset_times)) * eps

        pitch_profiles[np.arange(len(onset_times)) * 2] += _pitch_profiles
    else:
        pitch_profiles = _pitch_profiles

    if normalize:
        pitch_profiles /= pitch_profiles.sum(1, keepdims=True)

    return pitch_profiles


def compute_chord_matrix(chord_pitches: List[NDArrayInt]) -> NDArrayFloat:
    num_rows = len(chord_pitches)
    matrix = np.zeros((num_rows, 128), dtype=float)

    # Flatten the chord pitches and create corresponding row indices
    row_indices = np.repeat(np.arange(num_rows), [len(p) for p in chord_pitches])
    col_indices = np.concatenate(chord_pitches)

    # Set the specified indices to 1
    matrix[row_indices, col_indices] = 1

    return matrix


def compute_discrete_pitch_profiles(
    chord_pitches: NDArrayFloat,
    profile: NDArrayFloat = np.array([0.02, 0.02, 1, 0.02, 0.02]),
    eps: float = 0.01,
    piano_range: bool = False,
    normalize: bool = True,
    inserted_states: bool = True,
) -> NDArrayFloat:
    """
    Pre-compute the pitch profiles used in calculating the pitch
    observation probabilities.

    Parameters
    ----------
    chord_pitches : numpy array
        A 2D array of size (n_chords, 128) with the pitch of each chord.

    profile : numpy array
        The probability "gain" of how probable are the closest pitches to
        the one in question.

    eps : float
        The epsilon value to be added to each pre-computed pitch profile.

    piano_range : boolean
        Indicates whether the possible MIDI pitches are to be restricted
        within the range of a piano.

    normalize : boolean
        Indicates whether the pitch profiles are to be normalized.

    Returns
    -------
    pitch_profiles : numpy array
        The pre-computed pitch profiles.
    """

    chord_matrix = compute_chord_matrix(chord_pitches=chord_pitches)

    pitch_profiles = convolve(chord_matrix, profile[None, :], mode="same")

    if inserted_states:
        pitch_profiles = interleave_with_constant(array=pitch_profiles)
        pitch_profiles = pitch_profiles[:-1]
    # Add extra value
    pitch_profiles += eps

    # Check whether to trim and normalize:
    if piano_range:
        pitch_profiles = pitch_profiles[:, 21:109]
    if normalize:
        pitch_profiles /= pitch_profiles.sum(1, keepdims=True)

    return pitch_profiles


# Old version, to be deprecated.
def compute_discrete_pitch_profiles_old(
    chord_pitches: NDArrayFloat,
    profile: NDArrayFloat = np.array([0.02, 0.02, 1, 0.02, 0.02], dtype=np.float32),
    eps: float = 0.01,
    piano_range: bool = False,
    normalize: bool = True,
    inserted_states: bool = True,
) -> NDArrayFloat:
    """
    Pre-compute the pitch profiles used in calculating the pitch
    observation probabilities.

    Parameters
    ----------
    chord_pitches : array-like
        The pitches of each chord in the piece.

    profile : numpy array
        The probability "gain" of how probable are the closest pitches to
        the one in question.

    eps : float
        The epsilon value to be added to each pre-computed pitch profile.

    piano_range : boolean
        Indicates whether the possible MIDI pitches are to be restricted
        within the range of a piano.

    normalize : boolean
        Indicates whether the pitch profiles are to be normalized.

    inserted_states : boolean
        Indicates whether the HMM uses inserted states between chord states.

    Returns
    -------
    pitch_profiles : numpy array
        The pre-computed pitch profiles.
    """
    # Compute the high and low contexts:
    low_context = profile.argmax()
    high_context = len(profile) - profile.argmax()

    # Get the number of states, based on the presence of inserted states:
    if inserted_states:
        n_states = 2 * len(chord_pitches) - 1
    else:
        n_states = len(chord_pitches)
    # Initialize the numpy array to store the pitch profiles:
    pitch_profiles = np.zeros((n_states, 128))

    # Compute the profiles:
    for i in range(n_states):
        # Work on chord states (even indices), not inserted (odd indices):

        if not inserted_states or (inserted_states and np.mod(i, 2) == 0):
            chord = chord_pitches[i // 2] if inserted_states else chord_pitches[i]

            for pitch in chord:
                lowest_pitch = pitch - low_context
                highest_pitch = pitch + high_context
                # Compute the indices which are to be updated:
                idx = slice(np.maximum(lowest_pitch, 0), np.minimum(highest_pitch, 128))
                # Add the values:
                pitch_profiles[i, idx] += profile

        # Add the extra value:
        pitch_profiles[i] += eps

    # Check whether to trim and normalize:
    if piano_range:
        pitch_profiles = pitch_profiles[:, 21:109]
    if normalize:
        pitch_profiles /= pitch_profiles.sum(1, keepdims=True)

    # Return the profiles:
    return pitch_profiles.astype(np.float32)


def compute_ioi_matrix(unique_onsets, inserted_states=False):
    # Construct unique onsets with skips:
    if inserted_states:
        unique_onsets_s = np.insert(
            unique_onsets,
            np.arange(1, len(unique_onsets)),
            (unique_onsets[:-1] + 0.5 * np.diff(unique_onsets)),
        )
        ioi_matrix = sp_dist.squareform(sp_dist.pdist(unique_onsets_s.reshape(-1, 1)))

    # ... or without skips:
    else:
        unique_onsets_s = unique_onsets
        ioi_matrix = sp_dist.squareform(sp_dist.pdist(unique_onsets.reshape(-1, 1)))

    return ioi_matrix


def compute_bernoulli_pitch_probabilities(
    pitch_obs: NDArrayFloat,
    pitch_profiles: NDArrayFloat,
) -> NDArrayFloat:
    """
    Compute pitch observation probabilities
    """

    # Compute Bernoulli probability:
    pitch_prob = (pitch_profiles**pitch_obs) * ((1 - pitch_profiles) ** (1 - pitch_obs))

    obs_prob = np.prod(pitch_prob, 1)

    return obs_prob


def compute_gaussian_audio_probabilities(
    audio_obs: NDArrayFloat,
    audio_features: NDArrayFloat,
    precision: float,
    norm_term: float,
) -> NDArrayFloat:
    """
    Compute Gaussian observations for audio features.
    """
    diff = audio_features - audio_obs

    exp_arg = -0.5 * precision * np.sum(diff**2, axis=1)

    obs_prob = norm_term * np.exp(exp_arg)

    return obs_prob


def compute_exponential_cosine_audio_probabilities(
    audio_obs: NDArrayFloat,
    audio_features: NDArrayFloat,
    rate: float,
    epsilon: float = 1e-10,
) -> NDArrayFloat:
    # Compute norms with epsilon to avoid division by zero
    obs_norm = np.linalg.norm(audio_obs.flatten()) + epsilon
    features_norm = np.linalg.norm(audio_features, axis=1, keepdims=True) + epsilon

    # Normalize the vectors
    audio_obs_norm = audio_obs.flatten() / obs_norm
    audio_features_norm = audio_features / features_norm

    # Cosine similarity
    cos_sim = np.dot(audio_features_norm, audio_obs_norm)
    # Compute cosine similarity
    cos_sim = np.dot(audio_features_norm, audio_obs_norm)

    # Convert to cosine distance
    cos_dist = 1.0 - cos_sim

    # Apply exponential decay
    obs_prob = rate * np.exp(-rate * cos_dist)

    return obs_prob


def compute_gaussian_ioi_observation_probability(
    ioi_obs: float,
    ioi_score: NDArrayFloat,
    tempo_est: float,
    ioi_precision: float,
    norm_term: float,
) -> NDArrayFloat:
    """
    Compute the IOI observation probability as a zero mean
    Gaussian

    Parameters
    ----------
    ioi_obs : numpy array
        All the observed IOI.

    current_state : int
        The current state of the Score HMM.

    tempo_est : float
        The tempo estimation.

    Returns
    -------
    obs_prob : numpy array
        The computed IOI observation probabilities for each state.
    """
    # Compute the expected argument:
    exp_arg = -0.5 * ((tempo_est * ioi_score - ioi_obs) ** 2) * ioi_precision

    obs_prob = norm_term * np.exp(exp_arg)
    return obs_prob


class BernoulliPitchObservationModel(ObservationModel):
    """
    Computes the probabilities that an observation was emitted, i.e. the
    likelihood of observing performed notes at the current moment/state.

    Parameters
    ----------
    pitch_profiles : NDArrayFloat
        Pre-computed pitch profiles, for each separate possible pitch
        in the MIDI range. Used in calculating the pitch observation
        probabilities.
    """

    def __init__(self, pitch_profiles: NDArrayFloat):
        """
        The initialization method.

        Parameters
        ----------
        pitch_profiles : NDArrayFloat
            he pre-computed pitch profiles, for each separate possible pitch
            in the MIDI range. Used in calculating the pitch observation
            probabilities.
        """
        super().__init__(use_log_probabilities=False)
        # Store the parameters of the object:
        self.pitch_profiles = pitch_profiles

    def __call__(self, observation: NDArrayFloat) -> NDArrayFloat:
        return compute_bernoulli_pitch_probabilities(
            pitch_obs=observation,
            pitch_profiles=self.pitch_profiles,
        )


class GaussianAudioPitchObservationModel(ObservationModel):
    """
    Computes the probabilities that an observation was emitted, i.e. the
    likelihood of observing performed audio frame at the current moment/state.

    Parameters
    ----------
    audio_features : NDArrayFloat
        Audio features from the reference (score). Used in calculating
        the pitch observation probabilities.
    """

    def __init__(
        self,
        audio_features: NDArrayFloat,
        precision: float = 1,
    ):
        """
        The initialization method.

        Parameters
        ----------
        audio_features : NDArrayFloat
            he pre-computed audio features for the reference (e.g., score).
            Used in calculating the pitch observation probabilities.
        """
        super().__init__(use_log_probabilities=False)
        # Store the parameters of the object:
        self.audio_features = audio_features
        self.precision = precision
        self.norm_term = np.sqrt(0.5 * precision / np.pi)

    def __call__(self, observation: NDArrayFloat) -> NDArrayFloat:
        return compute_gaussian_audio_probabilities(
            audio_obs=observation,
            audio_features=self.audio_features,
            precision=self.precision,
            norm_term=self.norm_term,
        )


class CosineExpAudioPitchObservationModel(ObservationModel):
    """
    Computes the probabilities that an observation was emitted, i.e. the
    likelihood of observing performed audio frame at the current moment/state.

    Parameters
    ----------
    audio_features : NDArrayFloat
        Audio features from the reference (score). Used in calculating
        the pitch observation probabilities.
    """

    def __init__(
        self,
        audio_features: NDArrayFloat,
        rate: float = 1,
    ):
        """
        The initialization method.

        Parameters
        ----------
        audio_features : NDArrayFloat
            he pre-computed audio features for the reference (e.g., score).
            Used in calculating the pitch observation probabilities.
        """
        super().__init__(use_log_probabilities=False)
        # Store the parameters of the object:
        self.audio_features = audio_features
        self.rate = rate

    def __call__(self, observation: NDArrayFloat) -> NDArrayFloat:
        return compute_exponential_cosine_audio_probabilities(
            audio_obs=observation,
            audio_features=self.audio_features,
            rate=self.rate,
        )


class GaussianAudioPitchTempoObservationModel(ObservationModel):
    """
    Computes the probabilities that an observation was emitted, i.e. the
    likelihood of observing performed audio frame at the current moment/state.

    Parameters
    ----------
    audio_features : NDArrayFloat
        Audio features from the reference (score). Used in calculating
        the pitch observation probabilities.
    """

    def __init__(
        self,
        audio_features: NDArrayFloat,
        pitch_precision: float = 1,
        ioi_precision: float = 1,
    ):
        """
        The initialization method.

        Parameters
        ----------
        audio_features : NDArrayFloat
            he pre-computed audio features for the reference (e.g., score).
            Used in calculating the pitch observation probabilities.
        """
        super().__init__(use_log_probabilities=False)
        # Store the parameters of the object:
        self.audio_features = audio_features
        self.pitch_precision = pitch_precision
        self.pitch_norm_term = np.sqrt(0.5 * pitch_precision / np.pi)
        self.ioi_norm_term = np.sqrt(0.5 * ioi_precision / np.pi)
        self.ioi_precision = ioi_precision
        self.current_state = None
        self.states = np.arange(len(audio_features))

    def __call__(self, observation: NDArrayFloat) -> NDArrayFloat:
        pitch_obs, tempo_est = observation

        if self.current_state is None:
            estimated_state = 0
        else:
            estimated_state = self.current_state + tempo_est

        pitch_prob = compute_gaussian_audio_probabilities(
            audio_obs=pitch_obs,
            audio_features=self.audio_features,
            precision=self.pitch_precision,
            norm_term=self.pitch_norm_term,
        )

        exp_arg = -0.5 * ((self.states - estimated_state) ** 2) * self.ioi_precision

        tempo_prob = self.ioi_norm_term * np.exp(exp_arg)

        obs_prob = pitch_prob * tempo_prob

        return obs_prob


class CosineExpGaussianAudioPitchTempoObservationModel(ObservationModel):
    """
    Computes the probabilities that an observation was emitted, i.e. the
    likelihood of observing performed audio frame at the current moment/state.

    Parameters
    ----------
    audio_features : NDArrayFloat
        Audio features from the reference (score). Used in calculating
        the pitch observation probabilities.
    """

    def __init__(
        self,
        audio_features: NDArrayFloat,
        # ioi_matrix: Optional[NDArrayFloat] = None,
        pitch_rate: float = 1,
        ioi_precision: float = 1,
    ):
        """
        The initialization method.

        Parameters
        ----------
        audio_features : NDArrayFloat
            he pre-computed audio features for the reference (e.g., score).
            Used in calculating the pitch observation probabilities.
        """
        super().__init__(use_log_probabilities=False)
        # Store the parameters of the object:
        self.audio_features = audio_features
        self.pitch_rate = pitch_rate

        self.ioi_norm_term = np.sqrt(0.5 * ioi_precision / np.pi)
        self.ioi_precision = ioi_precision

        # if ioi_matrix is None:
        #     ioi_matrix = compute_ioi_matrix(
        #         unique_onsets=np.arange(len(audio_features))
        #     )
        # self.ioi_matrix = ioi_matrix
        self.current_state = None
        self.states = np.arange(len(audio_features))

    def __call__(self, observation: NDArrayFloat) -> NDArrayFloat:
        pitch_obs, tempo_est = observation

        # ioi_idx = self.current_state if self.current_state is not None else 0

        if self.current_state is None:
            estimated_state = 0
        else:
            estimated_state = self.current_state + tempo_est

        pitch_prob = compute_exponential_cosine_audio_probabilities(
            audio_obs=pitch_obs,
            audio_features=self.audio_features,
            rate=self.pitch_rate,
        )

        exp_arg = -0.5 * ((self.states - estimated_state) ** 2) * self.ioi_precision

        tempo_prob = self.ioi_norm_term * np.exp(exp_arg)

        obs_prob = pitch_prob * tempo_prob

        return obs_prob


class PitchIOIObservationModel(ObservationModel):
    def __init__(
        self,
        pitch_obs_prob_func: Callable[..., NDArrayFloat],
        ioi_obs_prob_func: Callable[..., NDArrayFloat],
        ioi_matrix: NDArrayFloat,
        pitch_prob_args: Optional[Dict[str, Any]] = None,
        ioi_prob_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(use_log_probabilities=False)

        # TODO: Add log probabilities
        self.pitch_obs_prob_func = pitch_obs_prob_func
        self.ioi_obs_prob_func = ioi_obs_prob_func
        self.pitch_prob_args = pitch_prob_args
        self.ioi_prob_args = ioi_prob_args
        self.ioi_matrix = ioi_matrix

    def __call__(self, observation: Any, *args, **kwargs) -> NDArrayFloat:
        pitch_obs, ioi_obs, tempo_est = observation
        ioi_idx = self.current_state if self.current_state is not None else 0

        ioi_score = self.ioi_matrix[ioi_idx]
        obs_prob = self.pitch_obs_prob_func(
            pitch_obs=pitch_obs,
            **self.pitch_prob_args,
        ) * self.ioi_obs_prob_func(
            ioi_obs=ioi_obs,
            ioi_score=ioi_score,
            tempo_est=tempo_est,
            **self.ioi_prob_args,
        )
        return obs_prob


class BernoulliGaussianPitchIOIObservationModel(PitchIOIObservationModel):
    def __init__(self, pitch_profiles, ioi_matrix, ioi_precision):
        """
        The initialization method.

        Parameters
        ----------
        pitch_profiles : numpy array
            he pre-computed pitch profiles, for each separate possible pitch
            in the MIDI range. Used in calculating the pitch observation
            probabilities.

        ioi_matrix : numpy array
            The pre-computed score IOI values in beats, from each unique state
            to all other states, stored in a matrix.

        ioi_precision : float
            The precision parameter for computing the IOI observation
            probability.
        """

        pitch_prob_args = dict(
            pitch_profiles=pitch_profiles,
        )
        ioi_prob_args = dict(
            ioi_precision=ioi_precision,
            norm_term=np.sqrt(0.5 * ioi_precision / np.pi),
        )
        PitchIOIObservationModel.__init__(
            self,
            pitch_obs_prob_func=compute_bernoulli_pitch_probabilities,
            ioi_obs_prob_func=compute_gaussian_ioi_observation_probability,
            ioi_matrix=ioi_matrix,
            pitch_prob_args=pitch_prob_args,
            ioi_prob_args=ioi_prob_args,
        )


class ACCPitchIOIObservationModel(ObservationModel):
    """
    Computes the probabilities that an observation was emitted, i.e. the
    likelihood of observing performed notes at the current moment/state.

    Parameters
    ----------
    _pitch_profiles : numpy array
        he pre-computed pitch profiles, for each separate possible pitch
        in the MIDI range. Used in calculating the pitch observation
        probabilities.

    _ioi_matrix : numpy array
        The pre-computed score IOI values in beats, from each unique state
        to all other states, stored in a matrix.

    _ioi_precision : float
        The precision parameter for computing the IOI observation probability.

    _ioi_norm_term : float
        The normalization term of the Gaussian distribution used for the
        computation of the IOI probabilities.

    TODO
    ----
    * Implement log probabilities
    """

    def __init__(self, pitch_profiles, ioi_matrix, ioi_precision, piano_range=False):
        """
        The initialization method.

        Parameters
        ----------
        pitch_profiles : numpy array
            he pre-computed pitch profiles, for each separate possible pitch
            in the MIDI range. Used in calculating the pitch observation
            probabilities.

        ioi_matrix : numpy array
            The pre-computed score IOI values in beats, from each unique state
            to all other states, stored in a matrix.

        ioi_precision : float
            The precision parameter for computing the IOI observation
            probability.
        """
        super().__init__(use_log_probabilities=False)
        # Store the parameters of the object:
        self._pitch_profiles = pitch_profiles
        self._ioi_matrix = ioi_matrix
        self._ioi_precision = ioi_precision
        # Compute the IOI normalization term:
        self._ioi_norm_term = np.sqrt(0.5 * self._ioi_precision / np.pi)
        self.current_state = None
        self.piano_range = piano_range

    def compute_pitch_observation_probability(self, pitch_obs):
        """
        Compute the pitch observation probability.

        Parameters
        ----------
        pitch_obs : numpy array
            All the MIDI pitch values in an observation.

        Returns
        -------
        pitch_obs_prob : numpy array
            The computed pitch observation probabilities for all states.
        """
        # Use Bernouli distribution to compute the prob:
        # Binary piano-roll observation:
        pitch_prof_obs = np.zeros((1, 128))
        if self.piano_range:
            pitch_prof_obs = np.zeros((1, 88))
        pitch_prof_obs[0, pitch_obs.astype(int)] = 1

        # Compute Bernoulli probability:
        pitch_prob = (self._pitch_profiles**pitch_prof_obs) * (
            (1 - self._pitch_profiles) ** (1 - pitch_prof_obs)
        )

        # Return the values:
        return np.prod(pitch_prob, 1)

    def compute_ioi_observation_probability(self, ioi_obs, current_state, tempo_est):
        """
        Compute the IOI observation probability.

        Parameters
        ----------
        ioi_obs : numpy array
            All the observed IOI.

        current_state : int
            The current state of the Score HMM.

        tempo_est : float
            The tempo estimation.

        Returns
        -------
        ioi_obs_prob : numpy array
            The computed IOI observation probabilities for each state.
        """
        # Use Gaussian distribution:
        ioi_idx = current_state if current_state is not None else 0
        # Compute the expected argument:
        exp_arg = (
            -0.5
            * ((tempo_est * self._ioi_matrix[ioi_idx] - ioi_obs) ** 2)
            * self._ioi_precision
        )

        # Return the value:
        return self._ioi_norm_term * np.exp(exp_arg)

    def get_score_ioi(self, current_state):
        """
        Get the score inter onset interval (IOI) between the current state and
        the previous state in beats.

        Parameters
        ----------
        current_state : int
            The current state of the Score HMM.

        Returns
        -------
        state_ioi : numpy.int
            The IOI in beats.
        """
        # Return the specific value:
        return self._ioi_matrix[current_state]

    def __call__(self, observation):
        pitch_obs, ioi_obs, tempo_est = observation
        observation_prob = self.compute_pitch_observation_probability(
            pitch_obs
        ) * self.compute_ioi_observation_probability(
            ioi_obs=ioi_obs, current_state=self.current_state, tempo_est=tempo_est
        )
        return observation_prob


[docs] class PitchIOIHMM(BaseHMM): """ Implements the behavior of a HiddenMarkovModel, specifically designed for the task of score following. Parameters ---------- _transition_matrix : numpy.ndarray Matrix for computations of state transitions within the HMM. _observation_model : ObservationModel Object responsible for computing the observation probabilities for each state of the HMM. initial_distribution : numpy array The initial distribution of the model. If not given, it is assumed to be uniform. forward_variable : numpy array The current (latest) value of the forward variable. _variation_coeff : float The normalized coefficient of variation of the current (latest) forward variable. Used to determine the confidence of the prediction of the HMM. current_state : int The index of the current state of the HMM. """
[docs] def __init__( self, reference_features: np.ndarray, # snote_array queue: Optional[RECVQueue] = None, tempo_model: TempoModel = None, transition_model: Optional[TransitionModel] = None, observation_model: Optional[PitchIOIObservationModel] = None, transition_matrix: Optional[NDArrayFloat] = None, pitch_obs_prob_func: Optional[Callable[..., NDArrayFloat]] = None, ioi_obs_prob_func: Optional[Callable[..., NDArrayFloat]] = None, ioi_matrix: Optional[NDArrayFloat] = None, pitch_prob_args: Optional[Dict[str, Any]] = None, ioi_prob_args: Optional[Dict[str, Any]] = None, initial_probabilities: Optional[np.ndarray] = None, has_insertions: bool = False, piano_range: bool = False, **kwargs, ) -> None: """ Initialize the object. Parameters ---------- transition_matrix : numpy array The Tranistion probability matrix of HMM. pitch_profiles : numpy array The pre-computed pitch profiles, for each separate possible pitch in the MIDI range. Used in calculating the pitch observation probabilities. ioi_matrix : numpy array The pre-computed score IOI values in beats, from each unique state to all other states, stored in a matrix. ioi_precision : float The precision parameter for computing the IOI observation probability. score_onsets : numpy array TODO initial_distribution : numpy array The initial distribution of the model. If not given, it is asumed to be uniform. Default = None. """ OnlineAlignment.__init__( self, reference_features=reference_features, ) self.reference_features = reference_features ( observation_model, transition_matrix, initial_probabilities, tempo_model, unique_onsets, ) = self._build_hmm_modules( inserted_states=has_insertions, piano_range=piano_range, tempo_model=tempo_model, ) if transition_model is not None and transition_matrix is not None: warnings.warn( "Both `transition_model` and `transition_matrix` were " "provided. Only `transition_model` will be used." ) obs_model_params_given = [ pitch_obs_prob_func is not None, ioi_obs_prob_func is not None, ioi_matrix is not None, pitch_prob_args is not None, ioi_prob_args is not None, ] if observation_model is not None and any(obs_model_params_given): warnings.warn( "`observation_model` and params were provided. " "Only `observation_model` will be used." ) if observation_model is None and not all(obs_model_params_given): missing_params = [ pn for pn, given in zip( [ "pitch_obs_prob_func", "ioi_obs_prob_func", "ioi_matrix", "pitch_prob_args", "ioi_prob_args", ], obs_model_params_given, ) if not given ] raise MatchmakerMissingParameterError(missing_params) if transition_model is None: transition_model = ConstantTransitionModel( transition_probabilities=transition_matrix, init_probabilities=initial_probabilities, ) if observation_model is None: observation_model = PitchIOIObservationModel( pitch_obs_prob_func=pitch_obs_prob_func, ioi_obs_prob_func=ioi_obs_prob_func, ioi_matrix=ioi_matrix, pitch_prob_args=pitch_prob_args, ioi_prob_args=ioi_prob_args, ) self.perf_onset = None self._prev_perf_time = None BaseHMM.__init__( self, observation_model=observation_model, transition_model=transition_model, score_positions=unique_onsets, tempo_model=tempo_model, has_insertions=has_insertions, queue=queue, **kwargs, )
def step(self, features) -> None: perf_time = self.current_perf_time ioi_obs = ( perf_time - self._prev_perf_time if self._prev_perf_time is not None else 0.0 ) self._prev_perf_time = perf_time current_state = self.forward_algorithm_step( observation=(features, ioi_obs, self.tempo_model.beat_period), log_probabilities=False, ) self.input_index += 1 prev_index = self.current_index if prev_index is None: prev_index = current_state if current_state > prev_index: update_tempo = (self.has_insertions and current_state % 2 == 0) or ( not self.has_insertions ) if update_tempo: self.tempo_model.update_beat_period( performed_onset=perf_time, score_onset=self.score_positions[current_state], ) self.current_index = current_state self.observation_model.current_state = current_state def _build_hmm_modules( self, piano_range: bool = False, inserted_states: bool = True, observation_model=ACCPitchIOIObservationModel, tempo_model=KalmanTempoModel, ): snote_array = self.reference_features unique_sonsets = np.unique(snote_array["onset_beat"]) unique_sonset_idxs = [ np.where(snote_array["onset_beat"] == ui)[0] for ui in unique_sonsets ] chord_pitches = [snote_array["pitch"][uix] for uix in unique_sonset_idxs] pitch_profiles = compute_discrete_pitch_profiles( chord_pitches=chord_pitches, piano_range=piano_range, inserted_states=inserted_states, ) ioi_matrix = compute_ioi_matrix( unique_onsets=unique_sonsets, inserted_states=inserted_states, ) observation_model = observation_model( pitch_profiles=pitch_profiles, ioi_matrix=ioi_matrix, ioi_precision=1, piano_range=piano_range, ) if inserted_states: unique_onsets_s = np.insert( unique_sonsets, np.arange(1, len(unique_sonsets)), (unique_sonsets[:-1] + 0.5 * np.diff(unique_sonsets)), ) else: unique_onsets_s = unique_sonsets if tempo_model == KalmanTempoModel: tempo_model = tempo_model( init_beat_period=60 / 100, init_score_onset=unique_sonsets.min(), ) else: tempo_model = tempo_model( init_score_onset=unique_sonsets.min(), init_beat_period=60 / 100, ) transition_matrix = stable_transition_matrix( n_states=len(ioi_matrix[0]), dist=gumbel_l, scale=1.0, # 0.5, inserted_states=inserted_states, ) initial_probabilities = init_dist( n_states=len(ioi_matrix[0]), dist=gumbel_l, ) return ( observation_model, transition_matrix, initial_probabilities, tempo_model, unique_onsets_s, )
def build_local_transition_matrix(n_states, window): matrix = np.zeros((n_states, n_states)) for i in range(n_states): # Determine window bounds start = max(0, i - window) end = min(n_states, i + window + 1) width = end - start # Uniform probability across window matrix[i, start:end] = 1.0 / width return matrix class GaussianAudioPitchHMM(BaseHMM): """ Audio Gaussian HMM """ def __init__( self, reference_features: np.ndarray, # audio features queue: Optional[RECVQueue] = None, transition_model: Optional[TransitionModel] = None, observation_model: Optional[PitchIOIObservationModel] = None, transition_matrix: Optional[NDArrayFloat] = None, precision: Optional[float] = DEFAULT_GAUSSIAN_AUDIO_PRECISION, initial_probabilities: Optional[np.ndarray] = None, score_positions: Optional[NDArray] = None, patience: int = 50, ) -> None: """ Initialize the object. Parameters ---------- transition_matrix : numpy array The Tranistion probability matrix of HMM. pitch_profiles : numpy array The pre-computed pitch profiles, for each separate possible pitch in the MIDI range. Used in calculating the pitch observation probabilities. ioi_matrix : numpy array The pre-computed score IOI values in beats, from each unique state to all other states, stored in a matrix. ioi_precision : float The precision parameter for computing the IOI observation probability. score_onsets : numpy array TODO initial_distribution : numpy array The initial distribution of the model. If not given, it is asumed to be uniform. Default = None. """ OnlineAlignment.__init__( self, reference_features=reference_features, ) if transition_model is not None and transition_matrix is not None: warnings.warn( "Both `transition_model` and `transition_matrix` were " "provided. Only `transition_model` will be used." ) obs_model_params_given = [ precision is not None, ] if observation_model is not None and any(obs_model_params_given): warnings.warn( "`observation_model` and params were provided. " "Only `observation_model` will be used." ) if observation_model is None and not all(obs_model_params_given): missing_params = [ pn for pn, given in zip( [ "precision", ], obs_model_params_given, ) if not given ] raise MatchmakerMissingParameterError(missing_params) if transition_model is None: if transition_matrix is None: transition_matrix = simple_transition_matrix( n_states=len(reference_features), trans_prob=0.5, stay_prob=0.5, ) if initial_probabilities is None: initial_probabilities = gumbel_init_dist( n_states=len(reference_features) ) transition_model = ConstantTransitionModel( transition_probabilities=transition_matrix, init_probabilities=initial_probabilities, ) if observation_model is None: observation_model = GaussianAudioPitchObservationModel( audio_features=reference_features, precision=( precision if precision is not None else DEFAULT_GAUSSIAN_AUDIO_PRECISION ), ) self.perf_onset = None BaseHMM.__init__( self, observation_model=observation_model, transition_model=transition_model, score_positions=( score_positions if score_positions is not None else np.arange(len(reference_features)) ), tempo_model=None, has_insertions=False, queue=queue, patience=patience, ) self.input_features = None self.distance_func = "Euclidean" def __call__(self, input, *args, **kwargs): frame_index = args[0] if args else None frame, f_time = input current_state = self.forward_algorithm_step( observation=frame, log_probabilities=False, ) self._alignment_path.append((current_state, self.input_index)) self.input_index = self.input_index + 1 if frame_index is None else frame_index self.current_index = current_state self.observation_model.current_state = current_state return self.current_index def run(self, verbose: bool = True): same_state_counter = 0 empty_counter = 0 if verbose: pbar = progressbar.ProgressBar( max_val=self.n_states, # redirect_stdout=True ) pbar.start() while self.is_still_following(): prev_state = self.current_index queue_input = self.queue.get(timeout=QUEUE_TIMEOUT) if queue_input is STREAM_END: break features, f_time = queue_input self.last_queue_update = time.time() self.input_features = ( np.concatenate((self.input_features, features)) if self.input_features is not None else features ) if queue_input is not None: current_state = self(queue_input) empty_counter = 0 if current_state == prev_state: if same_state_counter < self.patience: same_state_counter += 1 else: break else: same_state_counter = 0 latency = time.time() - self.last_queue_update self.latency_stats = set_latency_stats( latency, self.latency_stats, self.input_index ) if verbose: pbar.update(int(current_state)) yield float(self.score_positions[current_state]) if verbose: pbar.finish() return self.alignment_path class GaussianAudioPitchTempoHMM(BaseHMM): """ Audio Gaussian HMM """ def __init__( self, reference_features: np.ndarray, # audio features queue: Optional[RECVQueue] = None, transition_model: Optional[TransitionModel] = None, observation_model: Optional[PitchIOIObservationModel] = None, tempo_model: Optional[TempoModel] = None, transition_matrix: Optional[NDArrayFloat] = None, pitch_precision: Optional[float] = DEFAULT_GAUSSIAN_AUDIO_PRECISION, ioi_precision: Optional[float] = DEFAULT_GAUSSIAN_AUDIO_IOI_PRECISION, transition_scale: Optional[float] = DEFAULT_GUMBEL_AUDIO_SCALE, initial_probabilities: Optional[np.ndarray] = None, score_positions: Optional[NDArray] = None, patience: int = 200, **kwargs, ) -> None: """ Initialize the object. Parameters ---------- transition_matrix : numpy array The Tranistion probability matrix of HMM. pitch_profiles : numpy array The pre-computed pitch profiles, for each separate possible pitch in the MIDI range. Used in calculating the pitch observation probabilities. ioi_matrix : numpy array The pre-computed score IOI values in beats, from each unique state to all other states, stored in a matrix. ioi_precision : float The precision parameter for computing the IOI observation probability. score_onsets : numpy array TODO initial_distribution : numpy array The initial distribution of the model. If not given, it is asumed to be uniform. Default = None. """ OnlineAlignment.__init__( self, reference_features=reference_features, ) if transition_model is not None and transition_matrix is not None: warnings.warn( "Both `transition_model` and `transition_matrix` were " "provided. Only `transition_model` will be used." ) obs_model_params_given = [ pitch_precision is not None, ] if observation_model is not None and any(obs_model_params_given): warnings.warn( "`observation_model` and params were provided. " "Only `observation_model` will be used." ) if observation_model is None and not all(obs_model_params_given): missing_params = [ pn for pn, given in zip( [ "precision", ], obs_model_params_given, ) if not given ] raise MatchmakerMissingParameterError(missing_params) if transition_model is None: if transition_matrix is None: transition_matrix = simple_transition_matrix( n_states=len(reference_features), trans_prob=0.475, stay_prob=0.45, ) if initial_probabilities is None: initial_probabilities = gumbel_init_dist( n_states=len(reference_features) ) transition_model = ConstantTransitionModel( transition_probabilities=transition_matrix, init_probabilities=initial_probabilities, ) if observation_model is None: observation_model = GaussianAudioPitchTempoObservationModel( audio_features=reference_features, pitch_precision=( pitch_precision if pitch_precision is not None else DEFAULT_GAUSSIAN_AUDIO_PRECISION ), ioi_precision=( ioi_precision if ioi_precision is not None else DEFAULT_GAUSSIAN_AUDIO_IOI_PRECISION ), ) if tempo_model is None: tempo_model = ReactiveTempoModel( init_beat_period=1.0, init_score_onset=0, ) self.perf_onset = None self.input_features = None self.distance_func = "Euclidean" BaseHMM.__init__( self, observation_model=observation_model, transition_model=transition_model, score_positions=( score_positions if score_positions is not None else np.arange(len(reference_features)) ), tempo_model=tempo_model, has_insertions=False, queue=queue, patience=patience, ) def __call__(self, input, *args, **kwargs): frame_index = args[0] if args else None frame, f_time = input current_state = self.forward_algorithm_step( observation=( frame, self.tempo_model.beat_period, ), log_probabilities=False, ) self._alignment_path.append((current_state, self.input_index)) self.input_index = self.input_index + 1 if frame_index is None else frame_index if current_state >= self.current_index: # Only update tempo if jump is forward # current_so = self.score_positions[current_state] self.tempo_model.update_beat_period( performed_onset=self.input_index, score_onset=current_state, ) self.perf_onset = f_time self.current_index = current_state self.observation_model.current_state = current_state return self.current_index def run(self, verbose: bool = True): same_state_counter = 0 empty_counter = 0 if verbose: pbar = progressbar.ProgressBar( max_val=self.n_states, # redirect_stdout=True ) pbar.start() while self.is_still_following(): prev_state = self.current_index queue_input = self.queue.get(timeout=QUEUE_TIMEOUT) if queue_input is STREAM_END: break features, f_time = queue_input self.last_queue_update = time.time() self.input_features = ( np.concatenate((self.input_features, features)) if self.input_features is not None else features ) if queue_input is not None: current_state = self(queue_input) empty_counter = 0 if current_state == prev_state: if same_state_counter < self.patience: same_state_counter += 1 else: break else: same_state_counter = 0 latency = time.time() - self.last_queue_update self.latency_stats = set_latency_stats( latency, self.latency_stats, self.input_index ) if verbose: pbar.update(int(current_state)) yield float(self.score_positions[current_state]) if verbose: pbar.finish() return self.alignment_path