# Source code for pyttb.gcp_opt

"""Generalized CP Decomposition."""

# Copyright 2024 National Technology & Engineering Solutions of Sandia,
# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the
# U.S. Government retains certain rights in this software.

from __future__ import annotations

import logging
import time
from math import prod
from typing import Dict, Literal, Optional, Sequence, Tuple, Union

import numpy as np

import pyttb as ttb
from pyttb.gcp.fg_setup import function_type, setup
from pyttb.gcp.handles import Objectives
from pyttb.gcp.optimizers import LBFGSB, StochasticSolver
from pyttb.gcp.samplers import GCPSampler


def gcp_opt(  # noqa: PLR0912,PLR0913
    data: Union[ttb.tensor, ttb.sptensor],
    rank: int,
    objective: Union[Objectives, Tuple[function_type, function_type, float]],
    optimizer: Union[StochasticSolver, LBFGSB],
    init: Union[Literal["random"], ttb.ktensor, Sequence[np.ndarray]] = "random",
    mask: Optional[Union[ttb.tensor, np.ndarray]] = None,
    sampler: Optional[GCPSampler] = None,
    printitn: int = 1,
) -> Tuple[ttb.ktensor, ttb.ktensor, Dict]:
    """Fits Generalized CP decomposition with user-specified function.

    Parameters
    ----------
    data:
        Tensor to decompose.
    rank:
        Rank of desired CP decomposition.
    objective:
        Objective function to minimize for the CP decomposition. Either a
        pre-defined objective or a tuple of function_handle, gradient_handle,
        and lower_bound.
    optimizer:
        Optimizer class for solving the decomposition problem defined.
    init:
        Initial solution to the problem.
    mask:
        A binary mask to note missing rather than sparse data.
        (Only valid for dense, LBFGSB solves)
    sampler:
        Class that defined sampling strategy for stochastic solves.
    printitn:
        Controls verbosity of printing throughout the solve.

    Returns
    -------
    Solution, Initial Guess, Dictionary of meta data

    Raises
    ------
    ValueError
        If the objective tuple is malformed, the data is not a pyttb tensor,
        a mask is combined with a sparse tensor or stochastic solver, or the
        optimizer/data combination is unsupported.
    """
    if not isinstance(objective, Objectives):
        # TODO probably do some runtime type validation here to make
        # sure tuple is correct
        if len(objective) != 3:
            raise ValueError(
                "Objective must either be an Objectives enum or a tuple containing a "
                "function handle, gradient_handle and lower bound."
            )
    if isinstance(objective, Objectives):
        # TODO not clear how to pass in other params to setup for ex huber
        function_handle, gradient_handle, lower_bound = setup(objective, data)
    else:
        function_handle, gradient_handle, lower_bound = objective

    if not isinstance(data, (ttb.tensor, ttb.sptensor)):
        raise ValueError("Input data must be tensor or sptensor.")

    tensor_size = prod(data.shape)

    if isinstance(data, ttb.tensor) and isinstance(mask, ttb.tensor):
        # NOTE(review): this appears to modify the caller's tensor in place
        # via __imul__ — confirm that is intended before sharing `data`.
        data *= mask
        nmissing = tensor_size - mask.nnz
    elif isinstance(data, ttb.sptensor) and mask is not None:
        raise ValueError("Cannot specify missing entries for sparse tensors")
    else:
        nmissing = 0

    # Create initial guess
    M0 = _get_initial_guess(data, rank, init)

    if not isinstance(optimizer, (StochasticSolver, LBFGSB)):
        raise ValueError("Must select a supported optimizer.")
    if isinstance(data, ttb.sptensor) and isinstance(optimizer, LBFGSB):
        raise ValueError("For sparse tensor must use: ADAM, SGD, or ADAGRAD.")
    if isinstance(optimizer, StochasticSolver) and mask is not None:
        raise ValueError("Mask isn't supported for stochastic solves")

    # Welcome Message
    if printitn > 0:
        # TODO capture full verbosity from MATLAB
        optimizer_name = type(optimizer).__name__
        objective_name = "user-provided"
        if isinstance(objective, Objectives):
            objective_name = objective.name
        welcome_msg = (
            f"\nGCP-OPT-{optimizer_name} (Generalized CP Tensor Decomposition)\n"
            f"\nTensor shape: {data.shape} ({tensor_size} total entries)\n"
            f"GCP rank: {rank}\nGeneralized function type: {objective_name}"
        )
        if nmissing > 0:
            # FIX: leading "\n" so the missing-entries report gets its own
            # line instead of fusing onto the function-type line above.
            welcome_msg += (
                f"\nMissing entries: {nmissing} ({100*nmissing/tensor_size:.2g}%)"
            )
        logging.info(welcome_msg)

    main_start = time.perf_counter()
    if isinstance(optimizer, StochasticSolver):
        result, info = optimizer.solve(
            M0, data, function_handle, gradient_handle, lower_bound, sampler
        )
    else:
        if isinstance(mask, ttb.tensor):
            # LBFGSB consumes the raw numpy array underlying the mask tensor.
            mask = mask.data
        assert isinstance(data, ttb.tensor)
        result, info = optimizer.solve(
            M0, data, function_handle, gradient_handle, lower_bound, mask
        )
    info["main_time"] = time.perf_counter() - main_start

    return result, M0, info
def _get_initial_guess(
    data: Union[ttb.tensor, ttb.sptensor],
    rank: int,
    init: Union[Literal["random"], ttb.ktensor, Sequence[np.ndarray]],
) -> ttb.ktensor:
    """Build the starting point for gcp_opt from the user's `init` choice.

    Returns
    -------
    Normalized ktensor.
    """
    # TODO might be nice to merge with ALS/other CP methods
    # A non-string sequence is treated as a list of factor matrices.
    # (The str check matters: strings are Sequences too.)
    if isinstance(init, Sequence) and not isinstance(init, str):
        return ttb.ktensor(init).normalize("all")
    if isinstance(init, ttb.ktensor):
        # Normalizes the caller's ktensor in place and hands it back.
        init.normalize("all")
        return init
    if init == "random":
        factors = [
            np.random.uniform(0, 1, (data.shape[mode], rank))
            for mode in range(data.ndims)
        ]
        guess = ttb.ktensor(factors)
        # Rescale the random guess to match the magnitude of the data.
        guess *= data.norm() / guess.norm()
        guess.normalize("all")
        return guess
    raise ValueError(f"Unexpected input for init received: {init}")