Source code for snsynth.pytorch.nn.dpgan
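
"""Differentially private GAN (DPGAN) synthesizer.

The discriminator is trained with DP-SGD via Opacus (per-layer gradient
clipping plus Gaussian noise), and training stops once the requested
epsilon budget is spent; the generator itself never sees real data.
"""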

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import DataLoader, TensorDataset

from opacus import PrivacyEngine

from snsynth.base import Synthesizer

from ._generator import Generator
from ._discriminator import Discriminator


class DPGAN(Synthesizer):
    def __init__(
        self,
        binary=False,
        latent_dim=64,
        batch_size=64,
        epochs=1000,
        delta=None,
        epsilon=1.0,
    ):
        self.binary = binary
        self.latent_dim = latent_dim
        self.batch_size = batch_size
        self.epochs = epochs
        self.delta = delta
        self.epsilon = epsilon

        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        self.pd_cols = None
        self.pd_index = None

    def train(
        self,
        data,
        categorical_columns=None,
        ordinal_columns=None,
        update_epsilon=None,
        transformer=None,
        continuous_columns=None,
        preprocessor_eps=0.0,
        nullable=False,
    ):
        if update_epsilon:
            self.epsilon = update_epsilon

        train_data = self._get_train_data(
            data,
            style='gan',
            transformer=transformer,
            categorical_columns=categorical_columns,
            ordinal_columns=ordinal_columns,
            continuous_columns=continuous_columns,
            nullable=nullable,
            preprocessor_eps=preprocessor_eps
        )

        # Normalize the transformed training data to a numpy array.
        # (Converting with np.array() before these checks would make the
        # DataFrame and list branches unreachable.)
        data = train_data
        if isinstance(data, pd.DataFrame):
            for col in data.columns:
                data[col] = pd.to_numeric(data[col], errors="ignore")
            self.pd_cols = data.columns
            self.pd_index = data.index
            data = data.to_numpy()
        elif isinstance(data, list):
            data = np.array(data)
        elif not isinstance(data, np.ndarray):
            raise ValueError("Data must be a numpy array or pandas dataframe")

        dataset = TensorDataset(
            torch.from_numpy(data.astype("float32")).to(self.device)
        )
        dataloader = DataLoader(
            dataset, batch_size=self.batch_size, shuffle=True, drop_last=True
        )

        self.generator = Generator(
            self.latent_dim, data.shape[1], binary=self.binary
        ).to(self.device)
        discriminator = Discriminator(data.shape[1]).to(self.device)
        optimizer_d = optim.Adam(discriminator.parameters(), lr=4e-4)

        # DP-SGD is applied to the discriminator only; the generator never
        # touches real data, so it needs no privacy noise. This uses the
        # legacy (pre-1.0) Opacus attach() API.
        privacy_engine = PrivacyEngine(
            discriminator,
            batch_size=self.batch_size,
            sample_size=len(data),
            alphas=[1 + x / 10.0 for x in range(1, 100)] + list(range(12, 64)),
            noise_multiplier=3.5,
            max_grad_norm=1.0,
            clip_per_layer=True,
        )
        privacy_engine.attach(optimizer_d)

        optimizer_g = optim.Adam(self.generator.parameters(), lr=1e-4)

        criterion = nn.BCELoss()

        if self.delta is None:
            # Default delta of n ** -1.5, i.e. strictly smaller than 1/n.
            self.delta = 1 / (data.shape[0] * np.sqrt(data.shape[0]))

        for epoch in range(self.epochs):
            # Stop as soon as the accountant reports the budget is spent.
            eps, best_alpha = optimizer_d.privacy_engine.get_privacy_spent(self.delta)
            if self.epsilon < eps:
                if epoch == 0:
                    raise ValueError(
                        "Input epsilon and sigma parameters are too small to"
                        + " create a private dataset. Try increasing either parameter and rerunning."
                    )
                break

            for i, batch in enumerate(dataloader):
                discriminator.zero_grad()

                real_data = batch[0].to(self.device)

                # train discriminator with fake data
                noise = torch.randn(
                    self.batch_size, self.latent_dim, 1, 1, device=self.device
                )
                noise = noise.view(-1, self.latent_dim)
                fake_data = self.generator(noise)
                label_fake = torch.full(
                    (self.batch_size,), 0, dtype=torch.float, device=self.device
                )
                output = discriminator(fake_data.detach())
                loss_d_fake = criterion(output.squeeze(), label_fake)
                loss_d_fake.backward()
                optimizer_d.step()

                # train discriminator with real data
                label_true = torch.full(
                    (self.batch_size,), 1, dtype=torch.float, device=self.device
                )
                output = discriminator(real_data.float())
                loss_d_real = criterion(output.squeeze(), label_true)
                loss_d_real.backward()
                optimizer_d.step()

                # adapt the per-layer clipping bounds to the observed gradient norms
                max_grad_norm = []
                for p in discriminator.parameters():
                    param_norm = p.grad.data.norm(2).item()
                    max_grad_norm.append(param_norm)
                privacy_engine.max_grad_norm = max_grad_norm

                # train generator with flipped labels (no privacy engine attached)
                self.generator.zero_grad()
                label_g = torch.full(
                    (self.batch_size,), 1, dtype=torch.float, device=self.device
                )
                output_g = discriminator(fake_data)
                loss_g = criterion(output_g.squeeze(), label_g)
                loss_g.backward()
                optimizer_g.step()

                # manually clear the per-sample gradients Opacus accumulates
                for p in discriminator.parameters():
                    if hasattr(p, "grad_sample"):
                        del p.grad_sample
                # autograd_grad_sample.clear_backprops(discriminator)

    def generate(self, n):
        steps = n // self.batch_size + 1
        data = []
        for i in range(steps):
            noise = torch.randn(
                self.batch_size, self.latent_dim, 1, 1, device=self.device
            )
            noise = noise.view(-1, self.latent_dim)
            fake_data = self.generator(noise)
            data.append(fake_data.detach().cpu().numpy())
        data = np.concatenate(data, axis=0)
        data = data[:n]
        return self._transformer.inverse_transform(data)

    def fit(
        self,
        data,
        *ignore,
        transformer=None,
        categorical_columns=[],
        ordinal_columns=[],
        continuous_columns=[],
        preprocessor_eps=0.0,
        nullable=False,
    ):
        self.train(
            data,
            transformer=transformer,
            categorical_columns=categorical_columns,
            ordinal_columns=ordinal_columns,
            continuous_columns=continuous_columns,
            preprocessor_eps=preprocessor_eps,
            nullable=nullable,
        )

    def sample(self, n_samples):
        return self.generate(n_samples)
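
A minimal usage sketch (not part of the module source above): it fits the
synthesizer on a small, all-categorical DataFrame, so the preprocessor needs
no separate privacy budget, then samples synthetic rows. The column names,
toy data, and hyperparameters are illustrative, not a recommendation.

import pandas as pd

from snsynth.pytorch.nn.dpgan import DPGAN

# Hypothetical toy data; substitute a real tabular dataset in practice.
df = pd.DataFrame({
    "sex": ["M", "F", "F", "M"] * 50,
    "employed": [0, 1, 1, 0] * 50,
})

synth = DPGAN(epsilon=1.0, batch_size=50, epochs=100)
synth.fit(df, categorical_columns=["sex", "employed"])

synthetic = synth.sample(100)  # rows decoded back to the original schema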