Source code for snsynth.transform.standard

from snsynth.transform.definitions import ColumnType
from .base import CachingColumnTransformer
from opendp.mod import enable_features, binary_search_param
from opendp.transformations import make_sized_bounded_mean, make_sized_bounded_variance, make_clamp, make_resize
from opendp.domains import atom_domain
from opendp.measurements import make_base_laplace
from snsql.sql._mechanisms.approx_bounds import approx_bounds
from snsql.sql.privacy import Privacy
import numpy as np

class StandardScaler(CachingColumnTransformer):
    """Transforms a column of values to scale with mean centered on 0 and unit variance.

    Some privacy budget is always used to estimate the mean and variance.
    If upper and lower are not supplied, the budget will also be used to
    estimate the bounds of the column: half of epsilon goes to the bounds
    estimate, and the remainder is split 80% / 20% between the variance and
    the mean.

    :param lower: Lower bound of the column. Values are clamped to this bound
        before the mean and variance are estimated, and inverse-transformed
        values are clipped to it. Estimated privately when omitted.
    :param upper: Upper bound of the column. Values are clamped to this bound
        before the mean and variance are estimated, and inverse-transformed
        values are clipped to it. Estimated privately when omitted.
    :param epsilon: The privacy budget to use.
    :param nullable: Whether the column can contain null values. If True, the
        output will be a tuple of (value, null_flag).
    :param odometer: The optional privacy odometer to use to track privacy
        budget spent.
    """

    def __init__(self, *, lower=None, upper=None, epsilon=0.0, nullable=False, odometer=None):
        self.lower = lower
        self.upper = upper
        self.epsilon = epsilon
        # One entry per private release performed while fitting.
        self.budget_spent = []
        self.nullable = nullable
        self.odometer = odometer
        self.scaler = None
        self.mean = None
        self.var = None
        super().__init__()

    @property
    def output_type(self):
        """The transformed column is always continuous."""
        return ColumnType.CONTINUOUS

    @property
    def needs_epsilon(self):
        """Fitting always consumes privacy budget (mean and variance)."""
        return True

    @property
    def cardinality(self):
        """Per-output cardinality: None for the scaled value, 2 for the null flag."""
        if self.nullable:
            return [None, 2]
        return [None]

    def allocate_privacy_budget(self, epsilon, odometer):
        """Set the budget and odometer that the next fit will use.

        :param epsilon: The privacy budget to use when fitting.
        :param odometer: The odometer that tracks budget spent (may be None).
        """
        self.epsilon = epsilon
        self.odometer = odometer

    def _fit_finish(self):
        """Estimate bounds (if needed), mean and variance from the cached values.

        :raises ValueError: If no epsilon was provided, if there are no
            non-null values to fit on, or if the bounds could not be estimated.
        """
        if self.scaler is None:
            if self.epsilon is None or self.epsilon == 0.0:
                raise ValueError("StandardScaler requires epsilon to estimate mean and variance.")

            # Nulls and NaNs carry no information about location or scale.
            self._fit_vals = [
                float(v)
                for v in self._fit_vals
                if v is not None and not (isinstance(v, float) and np.isnan(v))
            ]
            if not self._fit_vals:
                # Fail before spending any budget; a zero-sized dataset cannot
                # be used to estimate a mean or a variance.
                raise ValueError("StandardScaler requires at least one non-null value to fit.")

            # Track the unspent budget locally so the configured ``epsilon``
            # still reflects what was allocated and a re-fit does not shrink it.
            remaining_eps = self.epsilon

            # set bounds
            if self.upper is None or self.lower is None:
                bounds_eps = remaining_eps / 2
                remaining_eps -= bounds_eps
                self.fit_lower, self.fit_upper = approx_bounds(self._fit_vals, bounds_eps)
                # The budget is consumed even when no bounds come back.
                if self.odometer is not None:
                    self.odometer.spend(Privacy(epsilon=bounds_eps, delta=0.0))
                self.budget_spent.append(bounds_eps)
                if self.fit_lower is None or self.fit_upper is None:
                    raise ValueError("StandardScaler could not find upper and lower bounds.")
            else:
                self.fit_lower = self.lower
                self.fit_upper = self.upper

            # fit scaler
            bounds = (float(self.fit_lower), float(self.fit_upper))
            n = len(self._fit_vals)
            enable_features("floating-point", "contrib")

            # Clamp to the bounds and fix the dataset size before aggregating.
            var_pre = (
                make_clamp(bounds)
                >> make_resize(n, atom_domain(bounds), float(self.fit_lower))
                >> make_sized_bounded_variance(size=n, bounds=bounds)
            )
            mean_pre = (
                make_clamp(bounds)
                >> make_resize(n, atom_domain(bounds), float(self.fit_lower))
                >> make_sized_bounded_mean(size=n, bounds=bounds)
            )

            # 80% of the remaining budget goes to the variance, the rest to the mean.
            v_e = remaining_eps * 0.8
            m_e = remaining_eps - v_e

            # Find the Laplace scales that make each release cost exactly its share.
            v_s = binary_search_param(lambda s: var_pre >> make_base_laplace(s), d_in=1, d_out=v_e)
            m_s = binary_search_param(lambda s: mean_pre >> make_base_laplace(s), d_in=1, d_out=m_e)
            dpvar = var_pre >> make_base_laplace(v_s)
            dpmean = mean_pre >> make_base_laplace(m_s)

            data = np.array(self._fit_vals)
            # Noise can push the estimates out of range. Keep the variance
            # strictly positive and no larger than the maximum possible for
            # values confined to the bounds, and keep the mean within bounds.
            self.var = dpvar(data)
            self.var = np.clip(self.var, 0.001, (self.fit_upper - self.fit_lower) ** 2 / 4)
            self.mean = dpmean(data)
            self.mean = np.clip(self.mean, self.fit_lower, self.fit_upper)

            if self.odometer is not None:
                self.odometer.spend(Privacy(epsilon=remaining_eps, delta=0.0))
            self.budget_spent.append(remaining_eps)

        self._fit_complete = True
        self.output_width = 2 if self.nullable else 1

    def _clear_fit(self):
        """Discard the cached values and the fitted bounds."""
        self._reset_fit()
        self.fit_lower = None
        self.fit_upper = None

    def _transform(self, val):
        """Scale a single value using the fitted mean and variance.

        :param val: The value to scale.
        :return: The scaled value, or a (value, null_flag) tuple when the
            transformer is nullable. Nulls map to (0.0, 1).
        :raises ValueError: If the transformer has not been fit.
        """
        if not self.fit_complete:
            raise ValueError("StandardScaler has not been fit yet.")
        is_null = val is None or (isinstance(val, float) and np.isnan(val))
        if self.nullable and is_null:
            return (0.0, 1)
        scaled = (val - self.mean) / np.sqrt(self.var)
        if self.nullable:
            return (scaled, 0)
        return scaled

    def _inverse_transform(self, val):
        """Map a scaled value back to the original range.

        :param val: The scaled value, or a (value, null_flag) tuple when the
            transformer is nullable.
        :return: The restored value clipped to the fitted bounds, or None when
            the null flag equals 1.
        :raises ValueError: If the transformer has not been fit.
        """
        if not self.fit_complete:
            raise ValueError("StandardScaler has not been fit yet.")
        if self.nullable:
            val, null_flag = val
            if null_flag == 1:
                return None
        restored = val * np.sqrt(self.var) + self.mean
        return np.clip(restored, self.fit_lower, self.fit_upper)