Source code for GOOD.utils.fast_pytorch_kmeans.multi_kmeans

import math
import torch
from time import time
import numpy as np
from .init_methods import init_methods


class MultiKMeans:
    """
    Batched k-means: solves several independent k-means problems at once
    with PyTorch.

    The input is a 3d tensor of shape [n_kmeans, n_samples, n_features];
    every slice along the first axis is clustered independently.

    Parameters:
      n_clusters: int,
        Number of clusters (per k-means problem)

      n_kmeans: int,
        Number of independent k-means problems. Stored for reference only;
        it is overwritten with X.shape[0] on every call to fit_predict().

      max_iter: int, default: 100
        Maximum number of iterations

      tol: float, default: 0.0001
        Tolerance. Iteration stops once the summed squared centroid shift
        is <= tol * n_kmeans.

      verbose: int, default: 0
        Verbosity (>= 1 prints a summary, >= 2 prints every iteration)

      mode: {'euclidean', 'cosine'}, default: 'euclidean'
        Type of distance measure

      init_method: str, default: 'kmeans++'
        Key looked up in the imported ``init_methods`` table to build the
        initial centroids when none are supplied.

      minibatch: {None, int}, default: None
        Batch size of MinibatchKmeans algorithm
        if None perform full KMeans algorithm

    Attributes:
      centroids: torch.Tensor, shape: [n_kmeans, n_clusters, n_features]
        cluster centroids (None until fitted)
    """

    def __init__(self, n_clusters, n_kmeans, max_iter=100, tol=0.0001, verbose=0,
                 mode="euclidean", init_method='kmeans++', minibatch=None):
        self.n_clusters = n_clusters
        # Previously accepted but silently dropped; keep it so the attribute
        # exists before the first fit.
        self.n_kmeans = n_kmeans
        self.max_iter = max_iter
        self.tol = tol
        self.verbose = verbose
        self.mode = mode
        self.init_method = init_method
        self.minibatch = minibatch
        self._loop = False
        self._show = False

        # Optional dependency probe. The module is named `pynvml`; the
        # original upper-case spelling could never be imported.
        try:
            import pynvml  # noqa: F401
            self._pynvml_exist = True
        except ImportError:
            self._pynvml_exist = False

        self.centroids = None

    @staticmethod
    def _check_input(X):
        """
        Validate that X is a floating point 3d tensor.

        Raises AssertionError (same type and messages as before) otherwise.
        """
        assert isinstance(X, torch.Tensor), "input must be torch.Tensor"
        assert X.dtype in [torch.half, torch.float, torch.double], "input must be floating point"
        assert X.ndim == 3, "input must be a 3d tensor with shape: [n_kmeans, n_samples, n_features]"

    @staticmethod
    def cos_sim(a, b):
        """
        Compute cosine similarity of 2 sets of vectors

        Parameters:
          a: torch.Tensor, shape: [..., m, n_features]

          b: torch.Tensor, shape: [..., n, n_features]

        Return:
          torch.Tensor, shape: [..., m, n]
        """
        a_norm = a.norm(dim=-1, keepdim=True)
        b_norm = b.norm(dim=-1, keepdim=True)
        # The epsilon keeps all-zero vectors from dividing by zero.
        a = a / (a_norm + 1e-8)
        b = b / (b_norm + 1e-8)
        return a @ b.transpose(-2, -1)

    @staticmethod
    def euc_sim(a, b):
        """
        Compute euclidean similarity of 2 sets of vectors, i.e. the negated
        squared euclidean distance: 2*a.b - |a|^2 - |b|^2

        Parameters:
          a: torch.Tensor, shape: [..., m, n_features]

          b: torch.Tensor, shape: [..., n, n_features]

        Return:
          torch.Tensor, shape: [..., m, n]
        """
        return (2 * a @ b.transpose(-2, -1)
                - (a ** 2).sum(dim=-1)[..., :, None]
                - (b ** 2).sum(dim=-1)[..., None, :])

    def remaining_memory(self):
        """
        Get remaining memory in gpu (bytes). Requires CUDA.

        With pynvml installed the driver-reported free memory of GPU 0 is
        returned. Otherwise it is approximated as total device memory minus
        the memory allocated by this process on the current device.
        """
        torch.cuda.synchronize()
        torch.cuda.empty_cache()
        if self._pynvml_exist:
            # Imported here: the import in __init__ is local to __init__ and
            # does not make the name visible in this method.
            import pynvml
            pynvml.nvmlInit()
            try:
                gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(0)
                info = pynvml.nvmlDeviceGetMemoryInfo(gpu_handle)
                remaining = info.free
            finally:
                pynvml.nvmlShutdown()
        else:
            # memory_allocated() alone is the *used* amount, not what is left.
            device_index = torch.cuda.current_device()
            total = torch.cuda.get_device_properties(device_index).total_memory
            remaining = total - torch.cuda.memory_allocated(device_index)
        return remaining

    def max_sim(self, a, b):
        """
        Compute maximum similarity (or minimum distance) of each vector
        in a with all of the vectors in b

        Parameters:
          a: torch.Tensor, shape: [..., m, n_features]

          b: torch.Tensor, shape: [..., n, n_features]

        Return:
          (values, indices): both of shape [..., m]

        Raises:
          ValueError: if self.mode is neither 'cosine' nor 'euclidean'
        """
        if self.mode == 'cosine':
            sim_func = self.cos_sim
        elif self.mode == 'euclidean':
            sim_func = self.euc_sim
        else:
            raise ValueError(
                f"unknown mode {self.mode!r}; expected 'euclidean' or 'cosine'")
        sim = sim_func(a, b)
        max_sim_v, max_sim_i = sim.max(dim=-1)
        return max_sim_v, max_sim_i

    def fit_predict(self, X, centroids=None):
        """
        Combination of fit() and predict() methods.
        This is faster than calling fit() and predict() seperately.

        Parameters:
          X: torch.Tensor, shape: [n_kmeans, n_samples, n_features]

          centroids: {torch.Tensor, None}, default: None
            shape: [n_kmeans, n_clusters, n_features]
            if given, centroids will be initialized with given tensor
            if None, centroids from a previous fit are reused when present,
            otherwise they are built with the configured init_method

        Return:
          labels: torch.Tensor, shape: [n_kmeans, n_samples]
            NOTE: in minibatch mode these are the labels of the last sampled
            minibatch (shape [n_kmeans, minibatch]); call predict(X) to label
            every sample.
        """
        self._check_input(X)
        n_kmeans, n_samples, _ = X.shape
        self.n_kmeans = n_kmeans
        # Keep the full device (including index) so helper tensors are
        # created on the same GPU as X, not merely on the default one.
        device = X.device
        start_time = time()

        # Explicit centroids win; only run the initialiser when needed.
        if centroids is not None:
            self.centroids = centroids
        elif self.centroids is None:
            self.centroids = torch.stack(
                [init_methods[self.init_method](X[n], self.n_clusters, self.minibatch)
                 for n in range(n_kmeans)],
                dim=0)

        num_points_in_clusters = torch.ones(n_kmeans, self.n_clusters, device=device, dtype=X.dtype)
        cluster_ids = torch.arange(self.n_clusters, device=device)[None, :, None]
        closest = None
        n_iter = 0
        for i in range(self.max_iter):
            n_iter = i + 1
            iter_time = time()
            if self.minibatch is not None:
                x = X[:, np.random.choice(n_samples, size=[self.minibatch], replace=False)]
            else:
                x = X

            closest = self.max_sim(a=x, b=self.centroids)[1]

            # One-hot membership: mask[k, c, s] == 1 iff sample s of problem k
            # is assigned to cluster c.
            mask = (closest[:, None, :] == cluster_ids).to(X.dtype)
            counts = mask.sum(-1, keepdim=True)
            c_grad = mask @ x / counts
            # Empty clusters give 0/0; they are reset to the origin.
            c_grad[torch.isnan(c_grad)] = 0

            error = (c_grad - self.centroids).pow(2).sum()
            if self.minibatch is not None:
                lr = 1 / num_points_in_clusters[:, :, None] * 0.9 + 0.1
            else:
                lr = 1
            # Same counts the per-problem unique() loop used to accumulate.
            num_points_in_clusters += counts.squeeze(-1)
            self.centroids = self.centroids * (1 - lr) + c_grad * lr
            if self.verbose >= 2:
                print('iter:', i, 'error:', error.item(), 'time spent:', round(time() - iter_time, 4))
            if error <= self.tol * self.n_kmeans:
                break

        if closest is None:
            # max_iter <= 0: no update step ran, still return valid labels.
            closest = self.max_sim(a=X, b=self.centroids)[1]

        if self.verbose >= 1:
            print(
                f'used {n_iter} iterations ({round(time() - start_time, 4)}s) to cluster {self.n_kmeans}x{n_samples} items into {self.n_clusters} clusters')
        return closest

    def predict(self, X):
        """
        Predict the closest cluster each sample in X belongs to

        Parameters:
          X: torch.Tensor, shape: [n_kmeans, n_samples, n_features]

        Return:
          labels: torch.Tensor, shape: [n_kmeans, n_samples]

        Raises:
          RuntimeError: if called before any centroids are available
        """
        self._check_input(X)
        if self.centroids is None:
            raise RuntimeError("predict() called before fit(): no centroids available")
        return self.max_sim(a=X, b=self.centroids)[1]

    def fit(self, X, centroids=None):
        """
        Perform kmeans clustering

        Parameters:
          X: torch.Tensor, shape: [n_kmeans, n_samples, n_features]

          centroids: {torch.Tensor, None}, default: None
            optional initial centroids, see fit_predict()
        """
        # fit_predict() validates X itself.
        self.fit_predict(X, centroids)