Implementing Robust PCA in Python
In a previous post, I introduced robust PCA, the math behind it, and an example where I put the model into action. In this post I will share my Python implementation of robust PCA. If you don’t have any background in convex optimization, some of the discussion here might be boring or irrelevant. If you really just need an implementation of robust PCA, skip the background section and you’ll find the code below.
First Some Background
Alternating Direction Method of Multipliers
Say you have a convex optimization problem that looks like this:

$$\min_{x,\,z} \; f(x) + g(z) \quad \text{subject to} \quad Ax + Bz = c$$

ADMM proposes an iterative approach using the augmented Lagrangian

$$\mathcal{L}_\mu(x, z, y) = f(x) + g(z) + y^\top (Ax + Bz - c) + \frac{\mu}{2}\,\|Ax + Bz - c\|_2^2.$$

Say we are at iteration $k$. ADMM updates one block of variables at a time while keeping the others fixed:

$$x^{k+1} = \arg\min_x \; \mathcal{L}_\mu(x, z^k, y^k)$$

$$z^{k+1} = \arg\min_z \; \mathcal{L}_\mu(x^{k+1}, z, y^k)$$

$$y^{k+1} = y^k + \mu\,(Ax^{k+1} + Bz^{k+1} - c)$$

If you have a closed form solution for the updates of $x$ and $z$, each iteration is very cheap.

ADMM can be thought of as a tug-of-war between the optimality gap and the feasibility of the primal problem: the primal residual $r^k = Ax^k + Bz^k - c$ tells you how far you are from feasibility, while the dual residual $h^k = \mu A^\top B\,(z^k - z^{k-1})$ tells you how far you are from optimality. This can be used to set up a systematic way to early stop the algorithm and/or adapt $\mu$ over the iterations.
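For instance, a common residual-balancing heuristic (the same rule I implement later in ._update_mu) grows $\mu$ when the primal residual dominates and shrinks it when the dual residual dominates:

$$\mu^{k+1} = \begin{cases} \rho\,\mu^k & \text{if } \|r^k\| > \tau\,\|h^k\| \\ \mu^k/\rho & \text{if } \|h^k\| > \tau\,\|r^k\| \\ \mu^k & \text{otherwise.} \end{cases}$$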
Robust PCA via ADMM
Let’s refresh our memories. The optimization problem for robust PCA was:

$$\min_{L,\,S} \; \|L\|_* + \lambda\,\|S\|_1 \quad \text{subject to} \quad X = L + S$$

Then we can make the following analogies with the generic ADMM problem above: $f(L) = \|L\|_*$, $g(S) = \lambda\,\|S\|_1$, $A = B = I$ and $c = X$.

What about the minimization steps? I won’t go into the details of their derivation, but I hope to point you in the right direction if you want to do it on your own. The $L$-update has a closed form solution:

$$L^{k+1} = \mathcal{D}_{1/\mu}\!\left(X - S^k + Y^k/\mu\right)$$

where $\mathcal{D}_\tau(\cdot)$ is the singular value thresholding (SVD shrinkage) operator, which soft-thresholds the singular values of its argument by $\tau$. Then the $S$-update also has a closed form solution:

$$S^{k+1} = \mathcal{S}_{\lambda/\mu}\!\left(X - L^{k+1} + Y^k/\mu\right)$$

where $\mathcal{S}_\tau(\cdot)$ is the elementwise soft thresholding operator. Both operators are implemented below.
Implementation in Python with Numpy
Now that we have all the steps ready, we can start implementing. We will only need a Python environment with version 3.6+ (just because I like the static type hinting features) and numpy. You can easily set this up with conda or Pipenv.
import numpy as np
import numpy.linalg as la
Let’s first start by defining the proximal operators:
def soft_thresholding(y: np.ndarray, mu: float) -> np.ndarray:
    """
    Soft thresholding operator as explained in Section 6.5.2 of https://web.stanford.edu/~boyd/papers/pdf/prox_algs.pdf

    Solves the following problem:
        argmin_x (1/2)*||x-y||_F^2 + mu*||x||_1

    Parameters
    ----------
    y : np.ndarray
        Target vector/matrix
    mu : float
        Penalty parameter

    Returns
    -------
    x : np.ndarray
        argmin solution
    """
    return np.sign(y) * np.clip(np.abs(y) - mu, a_min=0, a_max=None)
def svd_shrinkage(y: np.ndarray, tau: float) -> np.ndarray:
    """
    SVD shrinkage operator as explained in Theorem 2.1 of https://statweb.stanford.edu/~candes/papers/SVT.pdf

    Solves the following problem:
        argmin_x (1/2)*||x-y||_F^2 + tau*||x||_*

    Parameters
    ----------
    y : np.ndarray
        Target vector/matrix
    tau : float
        Penalty parameter

    Returns
    -------
    x : np.ndarray
        argmin solution
    """
    U, s, Vh = np.linalg.svd(y, full_matrices=False)
    s_t = soft_thresholding(s, tau)
    return U.dot(np.diag(s_t)).dot(Vh)
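As a quick sanity check, here's how the two operators behave on a tiny matrix (the numbers are arbitrary, just for illustration):

y = np.array([[3.0, -0.5],
              [1.5, 0.2]])

# Entries move towards zero by 1 and anything smaller than 1 in magnitude becomes 0
print(soft_thresholding(y, 1.0))  # -> [[2., -0.], [0.5, 0.]]

# The same shrinkage, but applied to the singular values of y instead of its entries,
# so the result is a lower-rank approximation of y
print(svd_shrinkage(y, 1.0))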
I want my API to follow a scikit-learn-like design, so here’s how it should look in the end:
rpca = RobustPCA(lmb=4e-3, max_iter=100)
L, S = rpca.fit(X)
The only difference is that in scikit-learn the .fit method normally returns the fitted estimator itself rather than the results, but I want it to return the low-rank and sparse components for the sake of simplicity.
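For comparison, this is roughly how the same workflow looks with scikit-learn's own PCA, where the results come from a separate transform call (shown only to illustrate the convention I'm deviating from):

from sklearn.decomposition import PCA

pca = PCA(n_components=5)
pca.fit(X)                   # returns the fitted estimator, not the results
X_proj = pca.transform(X)    # the projected data comes from transform()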
Then I start building the class with my main method .fit. Starting with .fit gives me a good idea about what other parameters I have to initialize and what submethods I’ll end up having to implement:
def fit(self, X: np.ndarray):
    mu = self.mu_0_
    Y = X / self._J(X, mu)
    S = np.zeros_like(X)
    S_last = np.empty_like(S)
    for k in range(self.max_iter_):
        # Solve argmin_L (1/2)*||X - (L + S) + Y/mu||_F^2 + (1/mu)*||L||_*
        L = svd_shrinkage(X - S + Y/mu, 1/mu)
        # Solve argmin_S (1/2)*||X - (L + S) + Y/mu||_F^2 + (lmb/mu)*||S||_1
        S_last = S.copy()
        S = soft_thresholding(X - L + Y/mu, self.lmb_/mu)
        # Update dual variables Y <- Y + mu * (X - S - L)
        Y += mu*(X - S - L)
        r, h = self._get_residuals(X, S, L, S_last, mu)
        # Check stopping criteria
        tol_r, tol_h = self._update_tols(X, L, S, Y)
        if r < tol_r and h < tol_h:
            break
        # Update mu
        mu = self._update_mu(mu, r, h)
    return L, S
The private methods I need, ._J, ._get_residuals, ._update_tols and ._update_mu, are implemented as follows.
@staticmethod
def _get_residuals(X: np.ndarray, S: np.ndarray, L: np.ndarray, S_last: np.ndarray, mu: float):
    primal_residual = la.norm(X - S - L, ord="fro")
    dual_residual = mu*la.norm(S - S_last, ord="fro")
    return primal_residual, dual_residual

def _update_mu(self, mu: float, r: float, h: float):
    if r > self.tau_ * h:
        return mu * self.rho_
    elif h > self.tau_ * r:
        return mu / self.rho_
    else:
        return mu

def _update_tols(self, X, S, L, Y):
    tol_primal = self.tol_rel_ * max(la.norm(X), la.norm(S), la.norm(L))
    tol_dual = self.tol_rel_ * la.norm(Y)
    return tol_primal, tol_dual

def _J(self, X: np.ndarray, lmb: float):
    return max(np.linalg.norm(X), np.max(np.abs(X))/lmb)
We haven’t talked about ._J yet. It’s a dual variable initialization technique discussed in Section 3.1 of Lin et al.
The last part is to write an __init__ function, add some docstrings and wrap everything up in a class.
class RobustPCA:
    """
    Solves robust PCA using Inexact ALM as explained in Algorithm 5 of https://arxiv.org/pdf/1009.5055.pdf

    Parameters
    ----------
    lmb:
        penalty on the sparse errors
    mu_0:
        initial augmented Lagrangian penalty
    rho:
        factor by which mu is increased or decreased at each update
    tau:
        mu update criterion parameter
    max_iter:
        max number of iterations for the algorithm to run
    tol_rel:
        relative tolerance
    """
    def __init__(self, lmb: float, mu_0: float = 1e-5, rho: float = 2, tau: float = 10,
                 max_iter: int = 10, tol_rel: float = 1e-3):
        assert mu_0 > 0
        assert lmb > 0
        assert rho > 1
        assert tau > 1
        assert max_iter > 0
        assert tol_rel > 0
        self.mu_0_ = mu_0
        self.lmb_ = lmb
        self.rho_ = rho
        self.tau_ = tau
        self.max_iter_ = max_iter
        self.tol_rel_ = tol_rel

    def fit(self, X: np.ndarray):
        """
        Fits robust PCA to X and returns the low-rank and sparse components

        Parameters
        ----------
        X:
            Original data matrix

        Returns
        -------
        L:
            Low rank component of X
        S:
            Sparse error component of X
        """
        assert X.ndim == 2
        mu = self.mu_0_
        Y = X / self._J(X, mu)
        S = np.zeros_like(X)
        S_last = np.empty_like(S)
        for k in range(self.max_iter_):
            # Solve argmin_L (1/2)*||X - (L + S) + Y/mu||_F^2 + (1/mu)*||L||_*
            L = svd_shrinkage(X - S + Y/mu, 1/mu)
            # Solve argmin_S (1/2)*||X - (L + S) + Y/mu||_F^2 + (lmb/mu)*||S||_1
            S_last = S.copy()
            S = soft_thresholding(X - L + Y/mu, self.lmb_/mu)
            # Update dual variables Y <- Y + mu * (X - S - L)
            Y += mu*(X - S - L)
            r, h = self._get_residuals(X, S, L, S_last, mu)
            # Check stopping criteria
            tol_r, tol_h = self._update_tols(X, L, S, Y)
            if r < tol_r and h < tol_h:
                break
            # Update mu
            mu = self._update_mu(mu, r, h)
        return L, S

    def _J(self, X: np.ndarray, lmb: float):
        """
        The function J() required for initialization of dual variables as advised in Section 3.1 of
        https://people.eecs.berkeley.edu/~yima/matrix-rank/Files/rpca_algorithms.pdf
        """
        return max(np.linalg.norm(X), np.max(np.abs(X))/lmb)

    @staticmethod
    def _get_residuals(X: np.ndarray, S: np.ndarray, L: np.ndarray, S_last: np.ndarray, mu: float):
        primal_residual = la.norm(X - S - L, ord="fro")
        dual_residual = mu*la.norm(S - S_last, ord="fro")
        return primal_residual, dual_residual

    def _update_mu(self, mu: float, r: float, h: float):
        if r > self.tau_ * h:
            return mu * self.rho_
        elif h > self.tau_ * r:
            return mu / self.rho_
        else:
            return mu

    def _update_tols(self, X, S, L, Y):
        tol_primal = self.tol_rel_ * max(la.norm(X), la.norm(S), la.norm(L))
        tol_dual = self.tol_rel_ * la.norm(Y)
        return tol_primal, tol_dual
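To make sure everything is wired up correctly, here's a minimal smoke test on synthetic data. The matrix sizes, the 5% corruption level and the lmb = 1/sqrt(max(n, m)) choice (a common default in the robust PCA literature) are just illustrative values, not part of the algorithm:

rng = np.random.default_rng(0)

# Ground truth: a low-rank matrix plus sparse, large-magnitude corruptions
n, m, rank = 200, 100, 5
L_true = rng.standard_normal((n, rank)) @ rng.standard_normal((rank, m))
S_true = np.zeros((n, m))
mask = rng.random((n, m)) < 0.05
S_true[mask] = rng.uniform(-10, 10, size=mask.sum())
X = L_true + S_true

rpca = RobustPCA(lmb=1/np.sqrt(max(n, m)), max_iter=100)
L, S = rpca.fit(X)

print("relative error on L:", la.norm(L - L_true) / la.norm(L_true))
print("relative error on S:", la.norm(S - S_true) / la.norm(S_true))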
Complexity Analysis
All the norms, matrix additions/subtractions and scalar multiplications are elementwise operations, so they cost $O(mn)$ for an $m \times n$ data matrix. The per-iteration cost is therefore dominated by the SVD inside the L-update, which is $O(mn\,\min(m, n))$, and that is what determines the overall complexity of the algorithm.
Things to Try
The algorithm is ready to use, but here are a few suggestions I have for you to play around with the code a little and interact with it:
- Plot the interplay of r, h and mu over iterations to see the tug-of-war I mentioned earlier in action.
- Take one frame and record its evolution over the course of the algorithm. Especially observe how the sparse component for that frame, S[frame, :], changes over time.
- You can estimate the rank of L, for example from its singular values (there's a small sketch at the end of this post).
A very accessible dataset is the cropped Yale B dataset, where you have faces of different people taken under various lighting conditions. Just pick one or two people and include all the illumination conditions they have to see if you can extract their clean face in the low-rank component.
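Here's a minimal sketch for the rank estimate and for inspecting one frame's sparse component, assuming matplotlib is available and that X holds your frames stacked as rows (plotting r, h and mu additionally requires adding a few lines to .fit to record them at each iteration):

import matplotlib.pyplot as plt

rpca = RobustPCA(lmb=4e-3, max_iter=100)
L, S = rpca.fit(X)

# Estimate the rank of L by counting singular values above a small relative threshold
s = np.linalg.svd(L, compute_uv=False)
print("estimated rank of L:", int(np.sum(s > 1e-6 * s[0])))

# Inspect the sparse component of a single frame
frame = 0
plt.plot(S[frame, :])
plt.title("Sparse component of frame {}".format(frame))
plt.show()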