import numpy as np
import pandas as pd
try:
    from mbi import Dataset, Domain, FactoredInference, GraphicalModel
except ImportError:
    print("Please install mbi with:\n   pip install git+https://github.com/ryan112358/private-pgm.git")

import itertools

from scipy import sparse

from snsynth.base import Synthesizer
from snsynth.utils import cdp_rho, exponential_mechanism, gaussian_noise, powerset
prng = np.random
class Identity(sparse.linalg.LinearOperator):
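    """Identity LinearOperator: AIM measures marginals directly, so every
    measurement matrix is the identity on the marginal's data vector."""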
def __init__(self, n):
self.shape = (n,n)
self.dtype = np.float64
def _matmat(self, X):
return X
def __matmul__(self, X):
return X
def _transpose(self):
return self
def _adjoint(self):
return self
def downward_closure(Ws):
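    """Return every non-empty sub-clique of every workload clique, sorted by
    length.  Illustrative example (assumes snsynth.utils.powerset excludes the
    empty tuple; order within a length class is unspecified because the
    intermediate container is a set):

        downward_closure([("A", "B"), ("B", "C")])
        -> [("A",), ("B",), ("C",), ("A", "B"), ("B", "C")]
    """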
ans = set()
for proj in Ws:
ans.update(powerset(proj))
return list(sorted(ans, key=len))
def hypothetical_model_size(domain, cliques):
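    """Size, in megabytes, of a graphical model over the given cliques
    (model.size counts float64 parameters, 8 bytes each)."""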
model = GraphicalModel(domain, cliques)
return model.size * 8 / 2 ** 20
def compile_workload(workload):
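    """Candidate cliques are the downward closure of the workload; each is
    scored by how many attributes it shares with each workload clique.  The
    score is used as the candidate's weight during selection, so
    higher-scoring cliques are preferred."""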
def score(cl):
return sum(len(set(cl) & set(ax)) for ax in workload)
return {cl: score(cl) for cl in downward_closure(workload)}
def filter_candidates(candidates, model, size_limit):
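    """Keep candidates that would keep the model under the size limit if
    added, plus 'free' cliques already implied by the model's structure."""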
ans = {}
free_cliques = downward_closure(model.cliques)
for cl in candidates:
cond1 = hypothetical_model_size(model.domain, model.cliques + [cl]) <= size_limit
cond2 = cl in free_cliques
if cond1 or cond2:
ans[cl] = candidates[cl]
return ans
class AIMSynthesizer(Synthesizer):
"""AIM: An Adaptive and Iterative Mechanism
:param epsilon: privacy budget for the synthesizer
:type epsilon: float
:param delta: privacy parameter. Should be small, in the range of 1/(n * sqrt(n))
:type delta: float
:param verbose: print diagnostic information during processing
:type verbose: bool
Based on the code available in:
https://github.com/ryan112358/private-pgm/blob/master/mechanisms/aim.py
"""
def __init__(self, epsilon=1., delta=1e-9, max_model_size=80, degree=2, num_marginals=None, max_cells: int = 10000,
rounds=None, verbose=False):
if isinstance(epsilon, int):
epsilon = float(epsilon)
self.rounds = rounds
self.max_model_size = max_model_size
self.max_cells = max_cells
self.degree = degree
self.num_marginals = num_marginals
self.verbose = verbose
self.epsilon = epsilon
self.delta = delta
self.synthesizer = None
self.num_rows = None
self.original_column_names = None
def fit(
self,
data,
*ignore,
transformer=None,
categorical_columns=[],
ordinal_columns=[],
continuous_columns=[],
preprocessor_eps=0.0,
nullable=False,
):
        if isinstance(data, pd.DataFrame):
self.original_column_names = data.columns
train_data = self._get_train_data(
data,
style='cube',
transformer=transformer,
categorical_columns=categorical_columns,
ordinal_columns=ordinal_columns,
continuous_columns=continuous_columns,
nullable=nullable,
preprocessor_eps=preprocessor_eps
)
if self._transformer is None:
raise ValueError("We weren't able to fit a transformer to the data. Please check your data and try again.")
cards = self._transformer.cardinality
if any(c is None for c in cards):
raise ValueError("The transformer appears to have some continuous columns. Please provide only categorical or ordinal.")
dimensionality = np.prod(cards)
        if self.verbose:
            print(f"Fitting with {dimensionality} dimensions")
            print(f"Transformer output width: {self._transformer.output_width}")
colnames = ["col" + str(i) for i in range(self._transformer.output_width)]
if len(cards) != len(colnames):
raise ValueError("Cardinality and column names must be the same length.")
domain = Domain(colnames, cards)
self.num_rows = len(data)
self.rho = 0 if self.delta == 0 else cdp_rho(self.epsilon, self.delta)
data = pd.DataFrame(train_data, columns=colnames)
data = Dataset(df=data, domain=domain)
workload = self.get_workload(
data, degree=self.degree, max_cells=self.max_cells, num_marginals=self.num_marginals
)
self.AIM(data, workload)
def sample(self, samples=None):
if samples is None:
samples = self.num_rows
data = self.synthesizer.synthetic_data(rows=samples)
        data_iter = [tuple(t[1:]) for t in data.df.itertuples()]
return self._transformer.inverse_transform(data_iter)
@staticmethod
def get_workload(data: Dataset, degree: int, max_cells: int, num_marginals: int = None):
workload = list(itertools.combinations(data.domain, degree))
workload = [cl for cl in workload if data.domain.size(cl) <= max_cells]
if num_marginals is not None:
workload = [workload[i] for i in prng.choice(len(workload), num_marginals, replace=False)]
return workload
def _worst_approximated(self, candidates, answers, model, eps, sigma):
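        """Use the exponential mechanism to privately select the candidate
        clique whose model estimate is furthest (in bias-corrected L1 error)
        from its true answer."""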
errors = {}
sensitivity = {}
for cl in candidates:
wgt = candidates[cl]
x = answers[cl]
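            # Expected L1 norm of the Gaussian noise alone: each of the
            # domain.size(cl) cells contributes sigma * sqrt(2/pi) in expectation.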
bias = np.sqrt(2 / np.pi) * sigma * model.domain.size(cl)
xest = model.project(cl).datavector()
errors[cl] = wgt * (np.linalg.norm(x - xest, 1) - bias)
sensitivity[cl] = abs(wgt)
        max_sensitivity = max(sensitivity.values())  # if all weights were zero, sensitivity would be 0 and the exponential mechanism would divide by zero
return exponential_mechanism(errors, eps, max_sensitivity)
def AIM(self, data, workload):
rounds = self.rounds or 16 * len(data.domain)
candidates = compile_workload(workload)
answers = {cl: data.project(cl).datavector() for cl in candidates}
oneway = [cl for cl in candidates if len(cl) == 1]
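        # Budget split, as in the reference AIM code: 90% of rho goes to the
        # Gaussian measurements and 10% to the exponential-mechanism
        # selections, amortized across the planned number of rounds.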
sigma = np.sqrt(rounds / (2 * 0.9 * self.rho))
epsilon = np.sqrt(8 * 0.1 * self.rho / rounds)
measurements = []
        if self.verbose:
            print('Initial Sigma', sigma)
rho_used = len(oneway) * 0.5 / sigma ** 2
for cl in oneway:
x = data.project(cl).datavector()
y = x + gaussian_noise(sigma, x.size)
I = Identity(y.size)
measurements.append((I, y, sigma, cl))
engine = FactoredInference(data.domain, iters=1000, warm_start=True)
model = engine.estimate(measurements)
t = 0
terminate = False
while not terminate:
t += 1
if self.rho - rho_used < 2 * (0.5 / sigma ** 2 + 1.0 / 8 * epsilon ** 2):
# Just use up whatever remaining budget there is for one last round
remaining = self.rho - rho_used
sigma = np.sqrt(1 / (2 * 0.9 * remaining))
epsilon = np.sqrt(8 * 0.1 * remaining)
terminate = True
rho_used += 1.0 / 8 * epsilon ** 2 + 0.5 / sigma ** 2
size_limit = self.max_model_size * rho_used / self.rho
small_candidates = filter_candidates(candidates, model, size_limit)
cl = self._worst_approximated(small_candidates, answers, model, epsilon, sigma)
n = data.domain.size(cl)
Q = Identity(n)
x = data.project(cl).datavector()
y = x + gaussian_noise(sigma, n)
measurements.append((Q, y, sigma, cl))
z = model.project(cl).datavector()
model = engine.estimate(measurements)
w = model.project(cl).datavector()
if self.verbose:
print('Selected', cl, 'Size', n, 'Budget Used', rho_used / self.rho)
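            # Anneal: if the new measurement moved the estimate by less than
            # the expected noise magnitude, the per-round budget is too small
            # to learn anything new, so halve sigma (and double epsilon) to
            # spend the remaining budget faster.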
if np.linalg.norm(w - z, 1) <= sigma * np.sqrt(2 / np.pi) * n:
if self.verbose:
                    print('Reducing sigma to', sigma / 2)
sigma /= 2
epsilon *= 2
        if self.verbose:
            print("Estimating marginals")
        engine.iters = 2500  # more optimization iterations for the final released model
        model = engine.estimate(measurements)
        self.synthesizer = model
def get_errors(self, data: Dataset, workload):
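        """Average total-variation error of the fitted model over a weighted
        workload of (projection, weight) pairs.  Note that, unlike the output
        of get_workload, entries here carry explicit weights."""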
errors = []
for proj, wgt in workload:
X = data.project(proj).datavector()
Y = self.synthesizer.project(proj).datavector()
e = 0.5 * wgt * np.linalg.norm(X / X.sum() - Y / Y.sum(), 1)
errors.append(e)
print('Average Error: ', np.mean(errors))
return errors
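
if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative only: the column names, sizes,
    # and privacy parameters below are arbitrary choices, not recommendations).
    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        "a": rng.integers(0, 3, size=1000),
        "b": rng.integers(0, 4, size=1000),
        "c": rng.integers(0, 2, size=1000),
    })
    synth = AIMSynthesizer(epsilon=1.0, delta=1e-9, verbose=True)
    synth.fit(df, categorical_columns=["a", "b", "c"])
    print(synth.sample(10))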