123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- import numpy as np
- import time
- from numpy import float64, sum, nonzero, newaxis, finfo
- nu = newaxis
- def emwfnnls(data, G, F, r, Tmax):
- tol = 0
- delta_measure = 1
- em_iter_max = round(Tmax / delta_measure) + 1 #
- T = np.empty(shape=(em_iter_max + 1))
- T.fill(np.nan)
- RMSE = np.empty(shape=(2, em_iter_max + 1))
- RMSE.fill(np.nan)
- # RRE = np.empty(shape=(em_iter_max + 1))
- # RRE.fill(np.nan)
- np.put(F, data.idxOF, data.sparsePhi_F)
- np.put(G, data.idxOG, data.sparsePhi_G)
- X = data.X + np.multiply(data.nW, np.dot(G, F))
- GtX = np.dot(G.T, X)
- GtG = np.dot(G.T, G)
- # Iterative updating
- G = G.T
- k = 0
- RMSE[:, k] = np.linalg.norm(F[:, 0:-1] - data.F[:, 0:-1], 2, axis=1) / np.sqrt(F.shape[1] - 1)
- T[k] = 0
- t = time.time()
- # Main loop
- while time.time() - t <= Tmax + delta_measure:
- # Estimation step
- X = data.X + np.multiply(data.nW, np.dot(G.T, F))
- # Maximisation step
- # Optimize F with fixed G
- np.put(F, data.idxOF, 0)
- F, _ = fnnls(GtG, GtX - GtG.dot(data.Phi_F))
- np.put(F, data.idxOF, data.sparsePhi_F)
- FFt = np.dot(F, F.T)
- FXt = np.dot(F, X.T)
- # Optimize G with fixed F
- np.put(G.T, data.idxOG, 0)
- G, _ = fnnls(FFt, FXt - FFt.dot(data.Phi_G.T))
- np.put(G.T, data.idxOG, data.sparsePhi_G)
- GtG = np.dot(G, G.T)
- GtX = np.dot(G, X)
- if time.time() - t - k * delta_measure >= delta_measure:
- k = k + 1
- if k >= em_iter_max + 1:
- break
- RMSE[:, k] = np.linalg.norm(F[:, 0:-1] - data.F[:, 0:-1], 2, axis=1) / np.sqrt(F.shape[1] - 1)
- T[k] = time.time() - t
- # RRE[k] = nmf_norm_fro(data.Xtheo, G.T, F, data.W)
- # if k%100==0:
- # print(str(k)+' '+str(RMSE[0,k])+' '+str(RMSE[1,k]))
- # plt.semilogy(RRE)
- # plt.show()
- return {'RMSE': RMSE, 'T': T}
- def stop_rule(X, GradX):
- # Stopping Criterions
- pGrad = GradX[np.any(np.dstack((X > 0, GradX < 0)), 2)]
- return np.linalg.norm(pGrad, 2)
- # machine epsilon
- eps = finfo(float64).eps
- def any(a):
- # assuming a vector, a
- larger_than_zero = sum(a > 0)
- if larger_than_zero:
- return True
- else:
- return False
- def find_nonzero(a):
- # returns indices of nonzero elements in a
- return nonzero(a)[0]
- def fnnls(AtA, Aty, epsilon=None, iter_max=None):
- """
- Given a matrix A and vector y, find x which minimizes the objective function
- f(x) = ||Ax - y||^2.
- This algorithm is similar to the widespread Lawson-Hanson method, but
- implements the optimizations described in the paper
- "A Fast Non-Negativity-Constrained Least Squares Algorithm" by
- Rasmus Bro and Sumen De Jong.
- Note that the inputs are not A and y, but are
- A^T * A and A^T * y
- This is to avoid incurring the overhead of computing these products
- many times in cases where we need to call this routine many times.
- :param AtA: A^T * A. See above for definitions. If A is an (m x n)
- matrix, this should be an (n x n) matrix.
- :type AtA: numpy.ndarray
- :param Aty: A^T * y. See above for definitions. If A is an (m x n)
- matrix and y is an m dimensional vector, this should be an
- n dimensional vector.
- :type Aty: numpy.ndarray
- :param epsilon: Anything less than this value is consider 0 in the code.
- Use this to prevent issues with floating point precision.
- Defaults to the machine precision for doubles.
- :type epsilon: float
- :param iter_max: Maximum number of inner loop iterations. Defaults to
- 30 * [number of cols in A] (the same value that is used
- in the publication this algorithm comes from).
- :type iter_max: int, optional
- """
- if epsilon is None:
- epsilon = np.finfo(np.float64).eps
- n = AtA.shape[0]
- if iter_max is None:
- iter_max = 30 * n
- if Aty.ndim != 1 or Aty.shape[0] != n:
- raise ValueError('Invalid dimension; got Aty vector of size {}, ' \
- 'expected {}'.format(Aty.shape, n))
- # Represents passive and active sets.
- # If sets[j] is 0, then index j is in the active set (R in literature).
- # Else, it is in the passive set (P).
- sets = np.zeros(n, dtype=np.bool)
- # The set of all possible indices. Construct P, R by using `sets` as a mask
- ind = np.arange(n, dtype=int)
- P = ind[sets]
- R = ind[~sets]
- x = np.zeros(n, dtype=np.float64)
- w = Aty
- s = np.zeros(n, dtype=np.float64)
- i = 0
- # While R not empty and max_(n \in R) w_n > epsilon
- while not np.all(sets) and np.max(w[R]) > epsilon and i < iter_max:
- # Find index of maximum element of w which is in active set.
- j = np.argmax(w[R])
- # We have the index in MASKED w.
- # The real index is stored in the j-th position of R.
- m = R[j]
- # Move index from active set to passive set.
- sets[m] = True
- P = ind[sets]
- R = ind[~sets]
- # Get the rows, cols in AtA corresponding to P
- AtA_in_p = AtA[P][:, P]
- # Do the same for Aty
- Aty_in_p = Aty[P]
- # Update s. Solve (AtA)^p * s^p = (Aty)^p
- s[P] = np.linalg.lstsq(AtA_in_p, Aty_in_p, rcond=None)[0]
- s[R] = 0.
- while np.any(s[P] <= epsilon):
- i += 1
- mask = (s[P] <= epsilon)
- alpha = np.min(x[P][mask] / (x[P][mask] - s[P][mask]))
- x += alpha * (s - x)
- # Move all indices j in P such that x[j] = 0 to R
- # First get all indices where x == 0 in the MASKED x
- zero_mask = (x[P] < epsilon)
- # These correspond to indices in P
- zeros = P[zero_mask]
- # Finally, update the passive/active sets.
- sets[zeros] = False
- P = ind[sets]
- R = ind[~sets]
- # Get the rows, cols in AtA corresponding to P
- AtA_in_p = AtA[P][:, P]
- # Do the same for Aty
- Aty_in_p = Aty[P]
- # Update s. Solve (AtA)^p * s^p = (Aty)^p
- s[P] = np.linalg.lstsq(AtA_in_p, Aty_in_p, rcond=None)[0]
- s[R] = 0.
- x = s.copy()
- w = Aty - AtA.dot(x)
- return x
|