Source code for snsynth.mwem

import math
import random
from typing import List
import warnings
from itertools import combinations, product

import numpy as np
import pandas as pd

from snsynth.base import Synthesizer
from snsynth.utils import laplace_noise

class Query:
    def __init__(self, query):
        self.query = query

    def evaluate(self, hist):
        e = hist.T[tuple(self.query)]
        if isinstance(e, np.ndarray):
            return np.sum(e)
        else:
            return e

    def error(self, hist, synth_hist):
        return np.abs(self.evaluate(hist) - self.evaluate(synth_hist))

    def mask(self, hist):
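        # Indicator array with 1.0 in every cell covered by this query; used by
        # _multiplicative_weights to apply the measured error to those cells.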
        view = np.zeros_like(hist)
        view.T[tuple(self.query)] = 1.0
        return view

    @property
    def queries(self):
        return [self]

    @classmethod
    def make_arbitrary(cls, dims):
        inds = []
        for _, s in np.ndenumerate(dims):
            # Random linear sample, within dimensions
            a = np.random.randint(s)
            b = np.random.randint(s)
            l_b = min(a, b)
            u_b = max(a, b) + 1
            inds.append([l_b, u_b])
        # Compose slice
        sl = []
        for ind in inds:
            sl.append(np.s_[ind[0]: ind[1]])
        return Query(sl)
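
    # For example, with dims = (4, 5), make_arbitrary might return a query
    # equivalent to Query([np.s_[1:3], np.s_[0:4]]): a random contiguous
    # hyper-rectangle (range query) over the histogram.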

    @classmethod
    def make_marginals(cls, dims_mask):
        # Makes all marginal slices matching the dimensions
        # Some should be set to None to marginalize.  For example,
        # if dims = (2,None,7), then the middle dimension is marginalized,
        # and slices for all of 2x7 are made.
        dims_mask = list(reversed(dims_mask))
        mask = [1 if v is not None else 0 for v in dims_mask]
        mask = np.cumsum(mask) - 1
        mask = [x if y is not None else None for x, y in zip(mask, dims_mask)]

        dims = [d for d in dims_mask if d is not None]
        ranges = [range(d) for d in dims]
        prod = product(*ranges)  # cartesian product

        return [
            Query([
                np.s_[:] if v is None else np.s_[p[v]] for v in mask
            ]) for p in prod
        ]
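
    # For example, with hist = np.arange(6).reshape(2, 3),
    # Query.make_marginals((2, None))[0] marginalizes the second column and
    # its evaluate(hist) sums hist[0, :], giving 0 + 1 + 2 == 3.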


class Cuboid:
    # A cuboid is a collection of marginal queries that are mutually disjoint
    def __init__(self, queries, dims_mask):
        self.queries = queries
        self.dims_mask = dims_mask

    @property
    def n_cells(self):
        return len(self.queries)

    @property
    def n_cols(self):
        return np.sum([1 if c is not None else 0 for c in self.dims_mask])

    def error(self, hist, synth_hist):
        err = np.sum([q.error(hist, synth_hist) for q in self.queries])
        err = err / self.n_cells
        return err if err > 0.0 else 0.0

    @classmethod
    def make_cuboid(cls, dimensions, columns):
        assert max(columns) < len(dimensions)
        dims_mask = [d if c in columns else None for c, d in enumerate(dimensions)]
        queries = Query.make_marginals(dims_mask)
        return Cuboid(queries, dims_mask)

    @classmethod
    def make_n_way(cls, dimensions, n):
        n_dims = len(dimensions)
        if n_dims < n:
            return []
        indices = list(np.arange(n_dims))
        combos = list(combinations(indices, n))
        return [cls.make_cuboid(dimensions, c) for c in combos]
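
    # For example, make_n_way(dims, 2) returns one Cuboid per pair of columns
    # (n_dims choose 2 of them), each holding all 2-way marginal queries for
    # that pair; MWEM's fit() uses these via Histogram.add_cuboid_queries.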


class Histogram:
    def __init__(self, data, dimensions, bins, split):
        self.data = data
        self.dimensions = dimensions
        self.bins = bins
        self.split = split
        self.queries = []
        assert (len(self.dimensions) == len(self.bins))
        assert (len(self.dimensions) == len(self.split))
        assert (all([a == b for a, b in zip(self.data.shape, self.dimensions)]))

    @property
    def dimensionality(self):
        return np.prod(self.dimensions)

    @property
    def n_cols(self):
        return len(self.dimensions)

    @property
    def n_cuboids(self):
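        # One cuboid per non-empty subset of the columns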
        return 2 ** self.n_cols - 1

    @property
    def n_queries(self):
        return len(self.queries)

    @property
    def n_slices(self):
        return np.sum([len(q.queries) for q in self.queries])

    def add_arbitrary_queries(self, n_queries):
        for _ in range(n_queries):
            self.queries.append(Query.make_arbitrary(self.dimensions))
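        # These are the random range queries used when add_ranges=True in MWEMSynthesizer.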

    def add_marginal_queries(self, max_cols=2):
        dims = self.dimensions
        for n in range(1, max_cols + 1):
            cuboids = Cuboid.make_n_way(dims, n)
            for c in cuboids:
                self.queries.extend(c.queries)

    def add_cuboid_queries(self, max_cols):
        dims = self.dimensions
        max_cols = max_cols if max_cols <= len(dims) else len(dims)
        for n in range(1, max_cols + 1):
            cuboids = Cuboid.make_n_way(dims, n)
            self.queries.extend(cuboids)
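        # Note: unlike add_marginal_queries, this stores whole Cuboid objects,
        # so a single selection in MWEM measures every cell of that marginal.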

    @classmethod
    def histogramdd_indexes(cls, x: np.ndarray, category_lengths: List[int]) -> np.ndarray:
        # https://github.com/opendp/prelim/blob/main/python/stat_histogram.py#L9-L31
        """Compute counts of each combination of categories in d dimensions.
        Discrete version of np.histogramdd.
        :param x: data of shape [n, len(`category_lengths`)] of non-negative category indexes
        :param category_lengths: the number of unique categories per column
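
        Illustrative example (two binary columns)::

            x = np.array([[0, 1], [1, 1], [0, 0]])
            Histogram.histogramdd_indexes(x, [2, 2])
            # array([[1, 1],
            #        [0, 1]])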
        """

        assert x.shape[1] == len(category_lengths)
        assert x.ndim == 2
        if not len(category_lengths):
            return np.array(x.shape[0])

        # consider each row as a multidimensional index into an ndarray
        # determine what those indexes would be should the ndarray be flattened
        # the flat indices uniquely identify each cell
        flat_indices = np.ravel_multi_index(x.T, category_lengths)

        # count the number of instances of each index
        hist = np.bincount(flat_indices, minlength=np.prod(category_lengths))

        # map counts back to d-dimensional output
        return hist.reshape(category_lengths)


class MWEMSynthesizer(Synthesizer):
    """
    N-Dimensional numpy implementation of MWEM.
    (http://users.cms.caltech.edu/~katrina/papers/mwem-nips.pdf)

    From the paper: "[MWEM is] a broadly applicable, simple, and easy-to-implement
    algorithm, capable of substantially improving the performance of linear queries
    on many realistic datasets... (circa 2012)...MWEM matches the best known and nearly
    optimal theoretical accuracy guarantees for differentially private data analysis
    with linear queries."

    Linear queries used for sampling in this implementation are random contiguous
    slices of the n-dimensional numpy array.

    :param epsilon: Privacy budget.
    :type epsilon: float, optional
    :param q_count: Number of random queries in the pool to generate.
        Must be more than the number of iterations.
    :type q_count: int, optional
    :param iterations: Number of iterations of MWEM to run.  MWEM will guess a
        reasonable number of iterations if this is not specified.
    :type iterations: int, optional
    :param splits: Allows you to specify feature dependence when creating internal
        histograms.  Columns that are known to be dependent can be kept together.
        Example: splits=[[0,1],[2,3]] where columns 0 and 1 are dependent, columns 2
        and 3 are dependent, and between groupings there is independence, defaults to []
    :type splits: list, optional
    :param split_factor: If splits is not specified, can instead subdivide pseudo-randomly.
        For example, split_factor=3 will make groupings of features of size 3 for the
        histograms.  Note: this will likely make synthetic data worse.  Defaults to None.
    :type split_factor: int, optional
    :param marginal_width: MWEM by default will create cuboids to measure marginals, and
        will use a heuristic to determine the maximum width of the marginals.  This
        parameter allows you to specify that MWEM should always measure marginals up to
        width marginal_width.
    :type marginal_width: int, optional
    :param add_ranges: In addition to measuring cuboids, MWEM can measure randomly
        generated range queries.  Range queries work well on columns that are binned
        from continuous values.
    :type add_ranges: bool, optional
    :param measure_only: MWEM operates by spending some privacy budget to select the
        best query.  This parameter allows you to specify that MWEM should uniformly
        measure all queries, and not spend any privacy budget on the query selection.
        This is useful in limited cases.  Allowing MWEM to select will usually work best.
    :type measure_only: bool, optional
    :param max_retries_exp_mechanism: In each iteration, MWEM tries to select a
        poorly-performing query that hasn't yet been measured.  If it fails, it will
        select one of the remaining queries uniformly at random.
    :type max_retries_exp_mechanism: int, optional
    :param mult_weights_iterations: Number of iterations of multiplicative weights,
        per iteration of MWEM, defaults to 20
    :type mult_weights_iterations: int, optional
    :param verbose: Set to True to print debug information.
    :type verbose: bool, optional
    """
    def __init__(
        self,
        epsilon=3.0,
        *ignore,
        q_count=None,
        iterations=None,
        splits=[],
        split_factor=None,
        marginal_width=None,
        add_ranges=False,
        measure_only=False,
        max_retries_exp_mechanism=10,
        mult_weights_iterations=20,
        verbose=False
    ):
        if isinstance(epsilon, int):
            epsilon = float(epsilon)
        self.epsilon = epsilon
        self.q_count = q_count
        self.iterations = iterations
        self.mult_weights_iterations = mult_weights_iterations
        self.add_ranges = add_ranges
        self.measure_only = measure_only
        self.synthetic_data = None
        self.data_bins = None
        self.real_data = None
        self.splits = splits
        self.split_factor = split_factor
        self.mins_maxes = {}
        self.scale = {}
        self.marginal_width = marginal_width
        self.debug = verbose

        # Pandas check
        self.pandas = False
        self.pd_cols = None
        self.pd_index = None

        # Query trackers
        self.q_values = None
        self.max_retries_exp_mechanism = max_retries_exp_mechanism

        self.accountant = []

    @property
    def spent(self):
        return sum([a for a in self.accountant])

    def fit(
            self, data, *ignore,
            transformer=None,
            categorical_columns=None,
            ordinal_columns=None,
            continuous_columns=None,
            preprocessor_eps=0.0,
            nullable=False):
        """
        Fit the synthesizer to the data.  Follows the sdgym schema to be
        compatible with their benchmark system.

        :param data: Dataset to use as the basis for synthetic data
        :type data: np.ndarray, pd.DataFrame, or list of tuples
        """
        train_data = self._get_train_data(
            data,
            style='cube',
            transformer=transformer,
            categorical_columns=categorical_columns,
            ordinal_columns=ordinal_columns,
            continuous_columns=continuous_columns,
            nullable=nullable,
            preprocessor_eps=preprocessor_eps
        )

        data = train_data

        if isinstance(data, np.ndarray):
            self.data = data.copy()
        elif isinstance(data, pd.DataFrame):
            self.pandas = True
            for col in data.columns:
                data[col] = pd.to_numeric(data[col], errors="ignore")
            self.data = data.to_numpy().copy()
            self.pd_cols = data.columns
            self.pd_index = data.index
        elif isinstance(data, list):
            self.data = np.array(data)
        else:
            raise ValueError("Data must be a list of tuples, a numpy array, or a pandas dataframe.")

        if self.split_factor is not None and self.splits == []:
            self.splits = self._generate_splits(self.data.T.shape[0], self.split_factor)
        elif self.split_factor is None and self.splits == []:
            # Set split factor to default to shape[1]
            self.split_factor = self.data.shape[1]
            self.splits = self._generate_splits(self.data.T.shape[0], self.split_factor)

        if len(self.splits) == 0:
            self.histograms = self._histogram_from_data_attributes(
                self.data,
                [np.arange(self.data.shape[1])]
            )
        else:
            self.histograms = self._histogram_from_data_attributes(self.data, self.splits)

        if self.debug:
            print(f"Processing {len(self.histograms)} histograms")
            print()

        # load queries into each split
        n_histograms = len(self.histograms)
        for idx, h in enumerate(self.histograms):
            marginal_width = self.marginal_width
            scale = np.sum(h.data)
            iterations = self.iterations
            if iterations is None:
                iterations = int(np.ceil(np.log10(scale))) * 10
            if marginal_width is None:
                if scale > 10_000:
                    marginal_width = 3
                else:
                    marginal_width = 2
            h.add_cuboid_queries(marginal_width)
            q_count = self.q_count if self.q_count is not None else 2 * iterations
            while h.n_queries < q_count:
                if self.add_ranges:
                    h.add_arbitrary_queries(q_count - h.n_queries)
                else:
                    h.add_cuboid_queries(marginal_width)
            if h.n_queries > q_count:
                h.queries = np.random.choice(h.queries, q_count, replace=False)
            meas_eps = self.epsilon / 2.0 / n_histograms / iterations
            meas_bounds = -np.log(2 * (0.05)) * (1 / meas_eps)  # 95% confidence
            h.iterations = iterations
            h.meas_eps = meas_eps
            h.meas_bound = meas_bounds
            if self.debug:
                print(f"Histogram #{idx} split: {h.split}")
                print(f"Columns: {h.n_cols}")
                print(f"Dimensionality: {h.dimensionality:,}")
                print(f"Cuboids possible: {h.n_cuboids}")
                if hasattr(math, "comb"):
                    print(f"1-2-way cuboids possible: {math.comb(h.n_cols, 2) + h.n_cols}")
                elif hasattr(math, "factorial"):
                    print(f"1-2-way cuboids possible: {(math.factorial(h.n_cols) // math.factorial(2) // math.factorial(h.n_cols - 2)) + h.n_cols}")
                print(f"Fitting for {h.iterations} iterations")
                print(f"Number of queries: {h.n_queries}")
                print(f"Number of slices in queries: {h.n_slices}")
                print(f"Per-Measure Epsilon: {h.meas_eps:.3f}")
                print(f"Measurement Error: {h.meas_bound:.2f}")
                print()

        self.max_iterations = max([h.iterations for h in self.histograms])

        # Run the algorithm
        self.synthetic_histograms = self.mwem()

    def sample(self, samples):
        """
        Creates samples from the histogram data.  Follows the sdgym schema to be
        compatible with their benchmark system.

        NOTE: We are sampling from each split dimensional group as though they are
        *independent* from one another.  We have essentially created len(splits)
        DP histograms as if they are separate databases, and combine the results
        into a single sample.

        :param samples: Number of samples to generate
        :type samples: int
        :return: N samples
        :rtype: list(np.ndarray)
        """
        synthesized_columns = ()
        first = True
        for fake, _, split in self.synthetic_histograms:
            s = []
            fake_indices = np.arange(len(np.ravel(fake)))
            fake_distribution = np.ravel(fake)
            norm = np.sum(fake)
            for _ in range(samples):
                s.append(np.random.choice(fake_indices, p=(fake_distribution / norm)))
            s_unraveled = []
            for ind in s:
                s_unraveled.append(np.unravel_index(ind, fake.shape))
            # Here we make scale adjustments to match the original
            # data
            np_unraveled = np.array(s_unraveled)
            for i in range(np_unraveled.shape[-1]):
                min_c, max_c = self.mins_maxes[str(split[i])]
                # TODO: Deal with the 0 edge case when scaling
                # i.e. scale factor * 0th bin is 0,
                # but should still scale appropriately
                np_unraveled[:, i] = np_unraveled[:, i] * self.scale[str(split[i])]
                np_unraveled[:, i] = np_unraveled[:, i] + min_c
            if first:
                synthesized_columns = np_unraveled
                first = False
            else:
                synthesized_columns = np.hstack((synthesized_columns, np_unraveled))
        # Recombine the independent distributions into a single dataset
        combined = synthesized_columns
        # Reorder the columns to mirror their original order
        r = self._reorder(self.splits)
        return self._transformer.inverse_transform(combined[:, r])

    def mwem(self):
        """
        Runner for the mwem algorithm.  Initializes the synthetic histogram, and
        updates it for up to self.max_iterations using the exponential mechanism
        and multiplicative weights.  Draws from the initialized query store for
        measurements.

        :return: list of (synth_hist, hist, split) tuples, where synth_hist is the
            synthetic histogram and hist is the original histogram for that split
        :rtype: list
        """
        a_values = []
        for idx, h in enumerate(self.histograms):
            hist = h.data
            dimensions = h.dimensions
            split = h.split
            queries = h.queries
            synth_hist = self._initialize_a(hist, dimensions)
            measurements = {}
            # NOTE: Here we perform a privacy check,
            # because if the histogram dimensions are
            # greater than the iterations, this can be
            # a big privacy risk (the sample queries will
            # otherwise be able to match the actual
            # distribution)
            # This usually occurs with a split factor of 1,
            # so that each attribute is independent of the other
            flat_dim = h.dimensionality
            iterations = h.iterations
            if 2 * flat_dim <= iterations:
                warnings.warn(
                    "Flattened dimensionality of synthetic histogram is less than"
                    + " the number of iterations. This is a privacy risk."
                    + " Consider increasing your split_factor (especially if it is 1), "
                    + "or decreasing the number of iterations. "
                    + "Dim: " + str(flat_dim) + " Split: " + str(split),
                    Warning,
                )
            eps = h.meas_eps if not self.measure_only else 2 * h.meas_eps
            for i in range(iterations):
                qi = self._exponential_mechanism(
                    hist, synth_hist, queries, eps, measurements, i
                )
                for query in queries[qi].queries:
                    assert (isinstance(query, Query))
                    actual = query.evaluate(hist)
                    lap = self._laplace(1.0 / eps)
                    if qi in measurements:
                        measurements[qi].append(actual + lap)
                    else:
                        measurements[qi] = [actual + lap]
                self.accountant.append(eps)
                synth_hist = self._multiplicative_weights(
                    synth_hist, queries, measurements, hist, self.mult_weights_iterations
                )
            a_values.append((synth_hist, hist, split))
        return a_values

    def _initialize_a(self, histogram, dimensions):
        """
        Initializes a uniform distribution histogram from the given
        histogram with dimensions

        :param histogram: Reference histogram
        :type histogram: np.ndarray
        :param dimensions: Reference dimensions
        :type dimensions: np.ndarray
        :return: New histogram, uniformly distributed according to reference histogram
        :rtype: np.ndarray
        """
        n = np.sum(histogram)
        value = n / np.prod(dimensions)
        synth_hist = np.zeros_like(histogram)
        synth_hist += value
        return synth_hist

    def _histogram_from_data_attributes(self, data, splits=[]):
        """
        Create a histogram for each split of the given data.

        :param data: Data matrix (rows are records, columns are attributes)
        :type data: np.ndarray
        :return: List of Histogram objects, one per split
        :rtype: list
        """
        histograms = []
        for split in splits:
            split_data = data[:, split]
            mins_data = []
            maxs_data = []
            dims_sizes = []
            # Transpose for column wise iteration
            for i, column in enumerate(split_data.T):
                min_c = min(column)
                max_c = max(column)
                # TODO: just use integer bins and assume 0 base
                mins_data.append(min_c)
                maxs_data.append(max_c)
                # Dimension size (number of bins)
                bin_count = int(max_c - min_c + 1)
                # Here we track the min and max for the column,
                # for sampling
                self.mins_maxes[str(split[i])] = (min_c, max_c)
                self.scale[str(split[i])] = 1
                dims_sizes.append(bin_count)
            # Produce an N,D dimensional histogram, where
            # we pre-specify the bin sizes to correspond with
            # our ranges above
            if any([a > 0 for a in mins_data]):
                warnings.warn("Data should be preprocessed to have 0 based indices.")
            dimensionality = np.prod(dims_sizes)
            if dimensionality > 1e8:
                warnings.warn(f"Dimensionality of histogram is {dimensionality:,}, consider using splits.")
            histogram, bins = np.histogramdd(split_data, bins=dims_sizes)
            # Return histogram, dimensions
            h = Histogram(histogram, dims_sizes, bins, split)
            histograms.append(h)
        return histograms

    def _exponential_mechanism(self, hist, synth_hist, queries, eps, measurements, iteration):
        """
        Refer to the paper for an in depth description of the Exponential
        Mechanism.  Parametrized with epsilon value epsilon/(2 * iterations)

        :param hist: Basis histogram
        :type hist: np.ndarray
        :param synth_hist: Synthetic histogram
        :type synth_hist: np.ndarray
        :param queries: Queries to draw from
        :type queries: list
        :param eps: Budget
        :type eps: float
        :return: Index of the selected query
        :rtype: int
        """
        errors = [
            queries[i].error(hist, synth_hist) * (eps / 2.0)
            for i in range(len(queries))
        ]
        maxi = max(errors)
        mean_err = np.mean(errors)
        exp_errors = [math.exp(errors[i] - maxi) for i in range(len(errors))]
        count_retries = 0
        qi = None
        while qi is None or qi in measurements:
            if self.measure_only or count_retries > self.max_retries_exp_mechanism:
                # grab one uniformly random, wastes epsilon, but is safe
                options = [i for i in range(len(queries))]
                options = [i for i in options if i not in measurements]
                qi = np.random.choice(options)
            else:
                r = random.random()
                e_s = sum(exp_errors)
                c = 0
                qi = len(exp_errors) - 1
                for i in range(len(exp_errors)):
                    c += exp_errors[i]
                    if c > r * e_s:
                        qi = i
                        break
                count_retries += 1
        if self.debug:
            log = int(np.floor(np.log10(self.max_iterations)))
            skip = 1 if log < 2 else 10 ** (log - 1)
            if iteration % skip == 0:
                print(f"[{iteration}] - Average error: {mean_err:.3f}. Selected {len(queries[qi].queries)} slices")
        if not self.measure_only:
            self.accountant.append(eps)
        return qi

    def _multiplicative_weights(self, synth_hist, queries, m, hist, iterate):
        """
        Multiplicative weights update algorithm, used to boost the synthetic
        data accuracy given measurements m.  Run for iterate times

        :param synth_hist: Synthetic histogram
        :type synth_hist: np.ndarray
        :param queries: Queries to draw from
        :type queries: list
        :param m: Measurements taken from real data for each qi query
        :type m: dict
        :param hist: Basis histogram
        :type hist: np.ndarray
        :param iterate: Number of iterations to run mult weights
        :type iterate: int
        :return: synth_hist
        :rtype: np.ndarray
        """
        sum_a = np.sum(synth_hist)
        for _ in range(iterate):
            for qi in m:
                measurements = m[qi]
                queries_list = queries[qi].queries
                assert (len(measurements) == len(queries_list))
                for measurement, query in zip(measurements, queries_list):
                    error = measurement - query.evaluate(synth_hist)
                    query_update = query.mask(synth_hist)
                    # Apply the update
                    a_multiplier = np.exp(query_update * error / (2.0 * sum_a))
                    a_multiplier[a_multiplier == 0.0] = 1.0
                    synth_hist = synth_hist * a_multiplier
                    # Normalize again
                    count_a = np.sum(synth_hist)
                    synth_hist = synth_hist * (sum_a / count_a)
        return synth_hist

    def _reorder(self, splits):
        """
        Given an array of dimensionality splits (column indices) returns the
        corresponding reorder array (indices to return columns to their
        original order)

        Example:
            original = [[1, 2, 3, 4, 5, 6],
                        [6, 7, 8, 9, 10, 11]]

            splits = [[1,3,4],[0,2,5]]

            mod_data = [[2 4 5 1 3 6]
                        [7 9 10 6 8 11]]

            reorder = [3 0 4 1 2 5]

        :param splits: 2d list with splits (column indices)
        :type splits: array of arrays
        :return: Reorder array (column indices that restore the original order)
        :rtype: np.ndarray
        """
        flat = [i for l in splits for i in l]
        reordered = np.zeros(len(flat))
        for i, ind in enumerate(flat):
            reordered[ind] = i
        return reordered.astype(int)

    def _generate_splits(self, n_dim, factor):
        """
        If the user specifies, do the work and figure out how to divide the
        dimensions into even splits to speed up MWEM.  The last split will
        contain the leftovers <= sizeof(factor)

        :param n_dim: Total # of dimensions
        :type n_dim: int
        :param factor: Desired size of the splits
        :type factor: int
        :return: Splits
        :rtype: np.array(np.array(),...)
        """
        # Column indices
        indices = np.arange(n_dim)
        # Split intelligently
        fits = int((np.floor(len(indices) / factor)) * factor)
        even_inds = indices[:fits].copy().reshape((int(len(indices) / factor), factor))
        s1 = even_inds.tolist()
        if indices[fits:].size != 0:
            s1.append(indices[fits:])
        s2 = [np.array(l_val) for l_val in s1]
        return s2

    def _laplace(self, sigma):
        """
        Laplace mechanism

        :param sigma: Laplace scale parameter sigma
        :type sigma: float
        :return: Random draw from a Laplace distribution with scale sigma
        :rtype: float
        """
        return laplace_noise(sigma)
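
# Illustrative usage sketch; the file name, column roles, and parameter values
# below are placeholders, not part of the library API:
#
#   import pandas as pd
#   from snsynth.mwem import MWEMSynthesizer
#
#   df = pd.read_csv("data.csv")          # categorical / pre-binned columns
#   synth = MWEMSynthesizer(epsilon=3.0, split_factor=3, verbose=True)
#   synth.fit(df, categorical_columns=list(df.columns), preprocessor_eps=0.5)
#   samples = synth.sample(1000)          # draw 1,000 synthetic rows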