Source code for snsynth.mst.mst

import numpy as np
import pandas as pd

try:
    from mbi import FactoredInference, Dataset, Domain
except ImportError:
    print("Please install mbi with:\n   pip install git+https://github.com/ryan112358/private-pgm.git")

from scipy import sparse
from disjoint_set import DisjointSet
import networkx as nx
import itertools
from scipy.special import logsumexp
from snsynth.base import Synthesizer
from snsynth.utils import cdp_rho, gaussian_noise

"""
Wrapper for MST synthesizer from Private PGM:
https://github.com/ryan112358/private-pgm/tree/e9ea5fcac62e2c5b92ae97f7afe2648c04432564

This is a generalization of the winning mechanism from the
2018 NIST Differential Privacy Synthetic Data Competition.

Unlike the original implementation, this one can work for any discrete dataset,
and does not rely on public provisional data for measurement selection.
"""


class MSTSynthesizer(Synthesizer):
    """Maximum Spanning Tree synthesizer.  Uses Private PGM.

    :param epsilon: privacy budget for the synthesizer
    :type epsilon: float
    :param delta: privacy parameter.  Should be small, in the range of 1/(n * sqrt(n))
    :type delta: float
    :param verbose: print diagnostic information during processing
    :type verbose: bool

    Reuses code, lightly modified, from
    https://github.com/ryan112358/private-pgm/blob/master/mechanisms/mst.py.
    Awesome work, McKenna et al.!
    """
    def __init__(
        self,
        epsilon=0.1,
        delta=1e-9,
        *ignore,
        verbose=False
    ):
        if isinstance(epsilon, int):
            epsilon = float(epsilon)
        self.epsilon = epsilon
        self.delta = delta
        self.verbose = verbose
        self.synthesizer = None
        self.num_rows = None

    def fit(
        self,
        data,
        *ignore,
        transformer=None,
        categorical_columns=[],
        ordinal_columns=[],
        continuous_columns=[],
        preprocessor_eps=0.0,
        nullable=False,
    ):
        train_data = self._get_train_data(
            data,
            style='cube',
            transformer=transformer,
            categorical_columns=categorical_columns,
            ordinal_columns=ordinal_columns,
            continuous_columns=continuous_columns,
            nullable=nullable,
            preprocessor_eps=preprocessor_eps
        )

        if self._transformer is None:
            raise ValueError("We weren't able to fit a transformer to the data. Please check your data and try again.")

        cards = self._transformer.cardinality
        if any(c is None for c in cards):
            raise ValueError("The transformer appears to have some continuous columns. Please provide only categorical or ordinal.")

        dimensionality = np.prod(cards)
        if self.verbose:
            print(f"Fitting with {dimensionality} dimensions")

        colnames = ["col" + str(i) for i in range(self._transformer.output_width)]
        if len(cards) != len(colnames):
            raise ValueError("Cardinality and column names must be the same length.")

        domain = Domain(colnames, cards)
        self.num_rows = len(data)

        data = pd.DataFrame(train_data, columns=colnames)
        data = Dataset(df=data, domain=domain)
        self.MST(data, self.epsilon, self.delta)

    def sample(self, samples=None):
        if samples is None:
            samples = self.num_rows
        data = self.synthesizer.synthetic_data(rows=samples)
        decompressed = self.undo_compress_fn(data)
        data_iter = [tuple([c for c in t[1:]]) for t in decompressed.df.itertuples()]
        return self._transformer.inverse_transform(data_iter)

    def MST(self, data, epsilon, delta):
        rho = cdp_rho(epsilon, delta)
        sigma = np.sqrt(3 / (2 * rho))
        # measure all one-way marginals, then drop unsupported values from the domain
        cliques = [(col,) for col in data.domain]
        if self.verbose:
            print("Getting cliques")
        log1 = self.measure(data, cliques, sigma)
        data, log1, undo_compress_fn = self.compress_domain(data, log1)

        # Here's the decompress function
        self.undo_compress_fn = undo_compress_fn

        cliques = self.select(data, rho / 3.0, log1)
        log2 = self.measure(data, cliques, sigma)
        engine = FactoredInference(data.domain, iters=5000)
        if self.verbose:
            print("Estimating marginals")
        est = engine.estimate(log1 + log2)

        # Here's the synthesizer
        self.synthesizer = est

    def measure(self, data, cliques, sigma, weights=None):
        if weights is None:
            weights = np.ones(len(cliques))
        weights = np.array(weights) / np.linalg.norm(weights)
        measurements = []
        for proj, wgt in zip(cliques, weights):
            x = data.project(proj).datavector()
            y = x + gaussian_noise(sigma / wgt, x.size)
            Q = sparse.eye(x.size)
            measurements.append((Q, y, sigma / wgt, proj))
        return measurements

    def compress_domain(self, data, measurements):
        supports = {}
        new_measurements = []
        for Q, y, sigma, proj in measurements:
            col = proj[0]
            # keep only values whose noisy count clears a 3-sigma threshold
            sup = y >= 3 * sigma
            supports[col] = sup
            if supports[col].sum() == y.size:
                new_measurements.append((Q, y, sigma, proj))
            else:
                # need to re-express measurement over the new domain
                y2 = np.append(y[sup], y[~sup].sum())
                I2 = np.ones(y2.size)
                I2[-1] = 1.0 / np.sqrt(y.size - y2.size + 1.0)
                y2[-1] /= np.sqrt(y.size - y2.size + 1.0)
                I2 = sparse.diags(I2)
                new_measurements.append((I2, y2, sigma, proj))
        undo_compress_fn = lambda data: self.reverse_data(data, supports)  # noqa: E731
        return self.transform_data(data, supports), new_measurements, undo_compress_fn

    def exponential_mechanism(self, q, eps, sensitivity, prng=np.random, monotonic=False):
        coef = 1.0 if monotonic else 0.5
        scores = coef * eps / sensitivity * q
        probas = np.exp(scores - logsumexp(scores))
        return prng.choice(q.size, p=probas)

    def select(self, data, rho, measurement_log, cliques=[]):
        engine = FactoredInference(data.domain, iters=1000)
        est = engine.estimate(measurement_log)

        # score each candidate pair by how poorly the current model fits it
        weights = {}
        candidates = list(itertools.combinations(data.domain.attrs, 2))
        for a, b in candidates:
            xhat = est.project([a, b]).datavector()
            x = data.project([a, b]).datavector()
            weights[a, b] = np.linalg.norm(x - xhat, 1)

        # build a maximum spanning tree, choosing each edge with the exponential mechanism
        T = nx.Graph()
        T.add_nodes_from(data.domain.attrs)
        ds = DisjointSet()

        for e in cliques:
            T.add_edge(*e)
            ds.union(*e)

        r = len(list(nx.connected_components(T)))
        epsilon = np.sqrt(8 * rho / (r - 1))
        for i in range(r - 1):
            candidates = [e for e in candidates if not ds.connected(*e)]
            wgts = np.array([weights[e] for e in candidates])
            idx = self.exponential_mechanism(wgts, epsilon, sensitivity=1.0)
            e = candidates[idx]
            T.add_edge(*e)
            ds.union(*e)

        return list(T.edges)

    def transform_data(self, data, supports):
        df = data.df.copy()
        newdom = {}
        for col in data.domain:
            support = supports[col]
            size = support.sum()
            newdom[col] = int(size)
            if size < support.size:
                newdom[col] += 1
            mapping = {}
            idx = 0
            for i in range(support.size):
                mapping[i] = size
                if support[i]:
                    mapping[i] = idx
                    idx += 1
            assert idx == size
            df[col] = df[col].map(mapping)
        newdom = Domain.fromdict(newdom)
        return Dataset(df, newdom)

    def reverse_data(self, data, supports):
        df = data.df.copy()
        newdom = {}
        for col in data.domain:
            support = supports[col]
            mx = support.sum()
            newdom[col] = int(support.size)
            idx, extra = np.where(support)[0], np.where(~support)[0]
            # scatter rows that landed in the folded "rare" bucket uniformly over the dropped values
            mask = df[col] == mx
            if extra.size == 0:
                pass
            else:
                df.loc[mask, col] = np.random.choice(extra, mask.sum())
            df.loc[~mask, col] = idx[df.loc[~mask, col]]
        newdom = Domain.fromdict(newdom)
        return Dataset(df, newdom)
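# Privacy accounting sketch (added for exposition, with illustrative numbers):
# MST() converts the (epsilon, delta) budget to a zCDP parameter rho via
# cdp_rho, then spends it in three equal shares -- first-round one-way
# marginals, clique selection, and second-round pairwise marginals.  With
# sigma = sqrt(3 / (2 * rho)), each Gaussian measurement round costs
# 1 / (2 * sigma**2) = rho / 3, and select() is handed the remaining rho / 3:
#
#   import numpy as np
#   from snsynth.utils import cdp_rho
#
#   rho = cdp_rho(1.0, 1e-9)          # e.g. epsilon=1.0, delta=1e-9
#   sigma = np.sqrt(3 / (2 * rho))    # noise scale used by measure()
#   assert np.isclose(1 / (2 * sigma ** 2), rho / 3)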