Source code for snsynth.transform.bin

from .base import CachingColumnTransformer
from snsql.sql._mechanisms.approx_bounds import approx_bounds
from snsql.sql.privacy import Privacy
from snsynth.transform.definitions import ColumnType
import numpy as np

class BinTransformer(CachingColumnTransformer):
    """Transforms continuous values into a discrete set of bins.

    Values are mapped to equal-width bins spanning ``[fit_lower, fit_upper]``.
    The bounds are either supplied by the caller (``lower``/``upper``) or
    inferred from the fitted values with ``approx_bounds`` using ``epsilon``.

    :param bins: The number of bins to create.
    :param lower: The minimum value to scale to.
    :param upper: The maximum value to scale to.
    :param epsilon: The privacy budget to use to infer bounds, if none provided.
    :param nullable: If null values are expected, a second output will be
        generated indicating null.
    :param odometer: The optional odometer to use to track privacy budget.
    """

    def __init__(self, *, bins=10, lower=None, upper=None, epsilon=0.0, nullable=False, odometer=None):
        # Attributes are assigned before super().__init__() on purpose.
        # NOTE(review): presumably the base initializer calls _clear_fit(),
        # which reads lower/upper/nullable -- confirm against the base class.
        self.lower = lower
        self.upper = upper
        self.epsilon = epsilon
        self.bins = bins
        self.budget_spent = []
        self.nullable = nullable
        self.odometer = odometer
        super().__init__()

    @staticmethod
    def _is_null_value(val):
        """Return True when ``val`` is ``None`` or a float NaN."""
        return val is None or (isinstance(val, float) and np.isnan(val))

    @property
    def output_type(self):
        """Column type produced by this transformer (always categorical)."""
        return ColumnType.CATEGORICAL

    @property
    def needs_epsilon(self):
        """True when at least one bound is missing and must be inferred."""
        return self.upper is None or self.lower is None

    @property
    def cardinality(self):
        """Category counts per output: ``[bins]`` or ``[bins, 2]`` if nullable."""
        if self.nullable:
            return [self.bins, 2]
        return [self.bins]

    def allocate_privacy_budget(self, epsilon, odometer):
        """Store the privacy budget and odometer to use when inferring bounds."""
        self.epsilon = epsilon
        self.odometer = odometer

    def _fit_finish(self):
        """Resolve ``fit_lower``/``fit_upper`` and mark the transformer as fit.

        :raises ValueError: if bounds cannot be inferred, or if bounds are
            missing and no positive epsilon is available to infer them.
        """
        bounds_missing = self.lower is None or self.upper is None
        if self.epsilon is not None and self.epsilon > 0.0 and bounds_missing:
            # Nulls and NaNs carry no range information; drop them first.
            self._fit_vals = [v for v in self._fit_vals if not self._is_null_value(v)]
            # Both bounds are taken from approx_bounds here, even when the
            # caller supplied exactly one of lower/upper.
            self.fit_lower, self.fit_upper = approx_bounds(self._fit_vals, self.epsilon)
            if self.odometer is not None:
                self.odometer.spend(Privacy(epsilon=self.epsilon, delta=0.0))
            self.budget_spent.append(self.epsilon)
            if self.fit_lower is None or self.fit_upper is None:
                raise ValueError("BinTransformer could not find bounds.")
        elif bounds_missing:
            raise ValueError("BinTransformer requires either epsilon or min and max.")
        else:
            self.fit_lower = self.lower
            self.fit_upper = self.upper
        self._fit_complete = True
        self.output_width = 2 if self.nullable else 1

    def _clear_fit(self):
        """Reset fit state; become ready immediately if both bounds are given."""
        self._reset_fit()
        self.fit_lower = None
        self.fit_upper = None
        # If bounds are provided, we can use them immediately without fitting.
        # Compare against None explicitly: a bound of 0 is a valid value and
        # must not be mistaken for "not provided".
        if self.lower is not None and self.upper is not None:
            self._fit_complete = True
            self.output_width = 2 if self.nullable else 1
            self.fit_lower = self.lower
            self.fit_upper = self.upper

    def _bin_edges(self, bin):
        """Return the ``(lower_edge, upper_edge)`` values of bin index ``bin``."""
        span = self.fit_upper - self.fit_lower
        return (
            self.fit_lower + (bin / self.bins) * span,
            self.fit_lower + ((bin + 1) / self.bins) * span,
        )

    def _bin(self, val):
        """Map ``val`` to a bin index in ``[0, bins - 1]``.

        :raises ValueError: if the transformer has not been fit.
        """
        if not self.fit_complete:
            raise ValueError("BinTransformer has not been fit yet.")
        if self.nullable and self._is_null_value(val):
            return 1
        span = self.fit_upper - self.fit_lower
        if span == 0:
            # Zero-width range: every value belongs to the single first bin
            # (avoids a division by zero).
            return 0
        index = int(self.bins * (val - self.fit_lower) / span)
        # val == fit_upper would otherwise yield index == bins, one past the
        # last category declared by `cardinality`; keep the index in range.
        return min(max(index, 0), self.bins - 1)

    def _transform(self, val):
        """Clamp ``val`` to the fitted bounds and return its bin index.

        When ``nullable`` is set the result is a ``(bin, null_flag)`` tuple,
        with ``(0, 1)`` standing for a null input.

        :raises ValueError: if the transformer has not been fit, or if ``val``
            is None/NaN and ``nullable`` is False.
        """
        if not self.fit_complete:
            raise ValueError("BinTransformer has not been fit yet.")
        if self._is_null_value(val):
            if self.nullable:
                return (0, 1)
            raise ValueError("Cannot transform None or NaN. Consider setting nullable=True.")
        # Out-of-range values are clamped to the nearest bound.
        val = self.fit_lower if val < self.fit_lower else val
        val = self.fit_upper if val > self.fit_upper else val
        if self.nullable:
            return (self._bin(val), 0)
        return self._bin(val)

    def _inverse_transform(self, val):
        """Return the midpoint of the bin encoded by ``val``.

        When ``nullable`` is set, ``val`` is a ``(bin, null_flag)`` pair and
        ``None`` is returned for a flagged or missing bin.

        :raises ValueError: if the transformer has not been fit.
        """
        if not self.fit_complete:
            raise ValueError("BinTransformer has not been fit yet.")
        if self.nullable:
            v, n = val
            if n == 1 or self._is_null_value(v):
                return None
            val = v
        lower, upper = self._bin_edges(val)
        return (lower + upper) / 2