test1
#!/usr/bin/env python | |
# coding: utf-8 | |
import os | |
os.environ["PATH"] += os.pathsep + '/usr/local/cuda-12.2/bin' | |
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' #supposed to suppress messages, but doesn't work | |
#import os | |
# use xla-lite (doesn't work for centroid for now) | |
#os.environ['TF_XLA_FLAGS']='--tf_xla_auto_jit=fusible' | |
def parse_args(args): | |
import argparse | |
from argparse_bool import ConfigureAction | |
parser = argparse.ArgumentParser() | |
parser.add_argument ('--ibodB', type=float) | |
parser.add_argument ('--obodB', type=float, default=-1) | |
parser.add_argument ('--alpha', type=float, default=0.1) | |
parser.add_argument ('--sps', type=int, default=4) | |
parser.add_argument ('--si', type=int, default=32) | |
parser.add_argument ('--rate', type=float, default=0.66666) | |
parser.add_argument ('--init-const-rate', default='2/3') | |
parser.add_argument ('--limit', choices=['linear', 'sspa'], default='sspa') | |
parser.add_argument ('--train-esno-min', type=float) | |
parser.add_argument ('--train-esno-max', type=float) | |
parser.add_argument ('--esno-min', type=float) | |
parser.add_argument ('--esno-max', type=float) | |
parser.add_argument ('--esno-step', type=float, default=0.125) | |
parser.add_argument ('--width', type=int, default=3) | |
parser.add_argument ('--n-layers', type=int, default=4) | |
parser.add_argument ('--N', type=int, default=256) | |
parser.add_argument ('--kind', choices=['cnn', 'dense'], default='cnn') | |
parser.add_argument ('--load') | |
parser.add_argument ('--min-bler', type=float) | |
parser.add_argument ('--action', choices=['run-baseline', 'run-centroids', 'train-ml', 'run-ml', 'run-opt-centroids', 'dump-const', 'plot-const', 'dump-orig-const', 'run-pred', 'run-opt-pred', 'run-cancel', 'run-pred-centroids', 'run-opt-pred-centroids', 'plot-spec', 'plot-spec-pred', 'train-ml-pred', 'train-centroids', 'run-centroid-opt-centroids', 'run-opt-baseline', 'run-ml-pred', 'train-and-run-ml']) | |
parser.add_argument ('--mod', choices=['32apsk', '64apsk'], default='32apsk') | |
parser.add_argument ('--pdf', action=ConfigureAction) | |
parser.add_argument ('--normalize', action=ConfigureAction, default=True) | |
parser.add_argument ('--initial-beta', type=float, default=0.2) | |
parser.add_argument ('--final-beta', type=float, default=0.05) | |
parser.add_argument ('--pred-iter', type=int, default=20) | |
parser.add_argument ('--rcv-iter', type=int, default=4) | |
parser.add_argument ('--max-iter', type=int, default=10000) | |
parser.add_argument ('--max-block-errors', type=int, default=1000) | |
parser.add_argument ('--tensorboard', action=ConfigureAction) | |
opt = parser.parse_args(args[1:]) | |
if opt.esno_min is None: | |
opt.esno_min = {'32apsk' : 13.0, '64apsk' : 18.0}[opt.mod] | |
if opt.esno_max is None: | |
opt.esno_max = {'32apsk' : 13.5, '64apsk' : 18.5}[opt.mod] | |
if opt.train_esno_min is None: | |
opt.train_esno_min = {'32apsk' : 13.0, '64apsk' : 18.0}[opt.mod] | |
if opt.train_esno_max is None: | |
opt.train_esno_max = {'32apsk' : 13.5, '64apsk' : 18.5}[opt.mod] | |
import hashlib | |
m = hashlib.md5() | |
for c in args: | |
m.update (c.encode (sys.getdefaultencoding())) | |
logname = args[0] + '.' + m.hexdigest()+'.log' | |
print ('logname:', logname) | |
return opt, logname | |
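# Hedged usage sketch (the script name and flags are illustrative): the log file name is
# derived from an MD5 hash of the full command line, so each distinct invocation gets its
# own log, e.g.
#   opt, logname = parse_args(['test1.py', '--obodB', '-1', '--action', 'run-baseline'])
#   # prints: logname: test1.py.<md5-hexdigest>.log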
from sionna.utils import QAMSource | |
from sionna.signal import Upsampling, Downsampling, RootRaisedCosineFilter, empirical_psd, empirical_aclr | |
# Configure the notebook to use only a single GPU and allocate only as much memory as needed | |
# For more details, see https://www.tensorflow.org/guide/gpu | |
import tensorflow as tf | |
import tensorflow_probability as tfp | |
tf.get_logger().setLevel('ERROR') | |
gpus = tf.config.list_physical_devices('GPU') | |
print('Number of GPUs available :', len(gpus)) | |
if gpus: | |
gpu_num = 0 # Index of the GPU to use | |
try: | |
tf.config.set_visible_devices(gpus[gpu_num], 'GPU') | |
print('Only GPU number', gpu_num, 'used.') | |
tf.config.experimental.set_memory_growth(gpus[gpu_num], True) | |
except RuntimeError as e: | |
print(e) | |
# In[2]: | |
#get_ipython().run_line_magic('matplotlib', 'inline') | |
import numpy as np | |
import pickle | |
from tensorflow.keras import Model | |
from tensorflow.keras.layers import Layer, Dense, Conv1D | |
import sys | |
import sionna | |
import traceback | |
import contextlib | |
# Some helper code to demonstrate the kinds of errors you might encounter. | |
@contextlib.contextmanager | |
def assert_raises(error_class): | |
try: | |
yield | |
except error_class as e: | |
print('Caught expected exception \n {}:'.format(error_class)) | |
traceback.print_exc(limit=2) | |
except Exception as e: | |
raise e | |
else: | |
raise Exception('Expected {} to be raised but no error was raised!'.format( | |
error_class)) | |
from sionna.channel import AWGN | |
from sionna.utils import BinarySource, ebnodb2no, log10, expand_to_rank, insert_dims | |
from sionna.fec.ldpc.encoding import LDPC5GEncoder | |
from sionna.fec.ldpc.decoding import LDPC5GDecoder | |
from sionna.mapping import Mapper, Demapper, Constellation | |
from sionna.utils import sim_ber | |
def power (u): | |
return tf.math.real (tf.norm (u)**2 / tf.cast (tf.size (u), u.dtype)) | |
def normalize (u, start=0, end=-1): | |
return u / tf.cast (tf.sqrt (power (u[start:end])), u.dtype) | |
def average_mutual_information (xbits, llr): | |
return 1.0 - tf.reduce_mean (tf.experimental.numpy.log2 (1 + tf.exp ((1.0-2.0*tf.cast (xbits, tf.float32)) * llr))) | |
# ## Simulation Parameters <a class="anchor" id="Simulation-Parameters"></a> | |
# In[3]: | |
############################################### | |
# SNR range for evaluation and training [dB] | |
############################################### | |
# if opt.limit == 'sspa': | |
# esno_db_min = 12.0 # obo=-0.5 | |
# esno_db_max = 13.0 | |
# else: | |
# esno_db_min = 15.5 | |
# esno_db_max = 16.0 #9.5 if we use lower esno will we converge to 8+8 instead of qam? | |
# ## Neural Demapper <a class="anchor" id="Neural-Demapper"></a>
# The neural network-based demapper is built from a configurable stack of hidden layers with ReLU activation, either dense layers or 1D convolutions depending on the `kind` argument, followed by a linear output layer.
#
# The input of the demapper consists of a received sample $y \in \mathbb{C}$ and the noise power spectral density $N_0$ in log-10 scale to handle different orders of magnitude for the SNR.
#
# As the neural network can only process real-valued inputs, these values are fed as a 3-dimensional vector
#
# $$\left[ \mathcal{R}(y), \mathcal{I}(y), \log_{10}(N_0) \right]$$
#
# where $\mathcal{R}(y)$ and $\mathcal{I}(y)$ denote the real and imaginary parts of $y$, respectively.
#
# The output of the neural network-based demapper consists of LLRs on the `num_bits_per_symbol` bits mapped to a constellation point. Therefore, the last layer consists of ``num_bits_per_symbol`` units.
#
# **Note**: The neural network-based demapper processes each received sample $y$ of a block individually. The [neural receiver notebook](https://nvlabs.github.io/sionna/examples/Neural_Receiver.html) provides an example of a more advanced neural network-based receiver that jointly processes a resource grid of received symbols.
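# Illustrative sketch (helper defined for clarity only, not used elsewhere) of the input
# construction described above, assuming y has shape [batch, num_symbols] and no has
# shape [batch, 1]:
def _sketch_demapper_input(y, no):
    no_db = log10(no)                              # noise power in log-10 scale
    no_db = tf.tile(no_db, [1, tf.shape(y)[1]])    # broadcast to [batch, num_symbols]
    # Stack [Re(y), Im(y), log10(N0)] -> [batch, num_symbols, 3]
    return tf.stack([tf.math.real(y), tf.math.imag(y), no_db], axis=2)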
# In[4]: | |
class NeuralDemapper(Layer): | |
def __init__(self, N=128, width=3, n_layers=3, kind='cnn'): | |
super().__init__() | |
if kind == 'dense': | |
self.layers = [Dense(N, 'relu') for _ in range (n_layers-1)] | |
elif kind == 'cnn': | |
self.layers = [Conv1D(N, width, activation='relu', padding='same') for _ in range (n_layers-1)] | |
self._dense_3 = Dense(num_bits_per_symbol, None) # The features correspond to the LLRs for every bit carried by a symbol
def call(self, inputs): | |
y,no = inputs | |
# Using log10 scale helps with the performance | |
no_db = log10(no) | |
# Stacking the real and imaginary components of the complex received samples | |
# and the noise variance | |
no_db = tf.tile(no_db, [1, num_symbols_per_codeword]) # [batch size, num_symbols_per_codeword] | |
z = tf.stack([tf.math.real(y), | |
tf.math.imag(y), | |
no_db], axis=2) # [batch size, num_symbols_per_codeword, 3] | |
for layer in self.layers: | |
z = layer (z) | |
llr = self._dense_3(z) # [batch size, num_symbols_per_codeword, num_bits_per_symbol] | |
return llr | |
# ## Trainable End-to-end System: Conventional Training <a class="anchor" id="Trainable-End-to-end-System:-Conventional-Training"></a>
# The following cell defines an end-to-end communication system that transmits bits modulated using a trainable constellation over an AWGN channel.
#
# The receiver uses the previously defined neural network-based demapper to compute LLRs on the transmitted (coded) bits.
#
# As in [1], the constellation and neural network-based demapper are jointly trained through SGD and backpropagation using the binary cross entropy (BCE) as loss function.
#
# Training on the BCE is known to be equivalent to maximizing an achievable information rate [2].
#
# The following model can be instantiated either for training (`training = True`) or evaluation (`training = False`).
#
# In the former case, the BCE is returned and no outer code is used; this reduces computational complexity and does not impact the training of the constellation or demapper.
#
# When setting `training` to `False`, an LDPC outer code from 5G NR is applied.
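# A hedged sketch of the BCE-to-rate relation from [2]: with the Keras BCE evaluated in
# nats over all coded bits (from logits, i.e. LLRs), an estimate of the achievable
# bit-metric decoding rate in bits per channel use is (helper for illustration only)
def _sketch_bmd_rate(bce_nats, bits_per_symbol):
    # per-bit mutual-information estimate is 1 - BCE/ln(2); scale by the bits per symbol
    return bits_per_symbol * (1.0 - bce_nats / tf.math.log(2.0))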
# In[5]: | |
from read_sspa2 import read_sspa, interp | |
class sspa: | |
def __init__ (self): | |
sspa, inp, obo_spline, ibo_am, pin_ref, obo, pout_ref, ibo_phase, phase, phase_ref, ibo_phase2, phase2, phase_spline = read_sspa (return_extra=True) | |
self.I = interp (obo_spline, phase_spline) | |
def __call__ (self, x): | |
return self.I (x) | |
def setup(opt, logname): | |
############################################### | |
# Modulation and coding configuration | |
############################################### | |
beta = opt.alpha | |
span_in_symbols = opt.si # Filter span in symbols
samples_per_symbol = opt.sps # Number of samples per symbol, i.e., the oversampling factor | |
rrcf = RootRaisedCosineFilter(span_in_symbols, samples_per_symbol, beta, normalize=False) | |
mf = RootRaisedCosineFilter(span_in_symbols, samples_per_symbol, beta, normalize=False) | |
mf._coefficients = mf._coefficients / samples_per_symbol | |
if opt.mod == '32apsk': | |
from const32apsk import gen_constellation_32apsk | |
const = (gen_constellation_32apsk (opt.init_const_rate)) | |
bits_per_sym = num_bits_per_symbol = 5 | |
elif opt.mod == '64apsk': | |
from const64apsk import gen_constellation_64apsk | |
const = gen_constellation_64apsk (opt.init_const_rate) | |
bits_per_sym = num_bits_per_symbol = 6 | |
modulation_order = 2**num_bits_per_symbol | |
coderate = opt.rate | |
n = N = 8400 # Codeword length [bit]. Must be a multiple of num_bits_per_symbol | |
num_symbols_per_codeword = n//num_bits_per_symbol # Number of modulated baseband symbols per codeword | |
k = K = int(n*coderate) # Number of information bits per codeword | |
############################################### | |
# Training configuration | |
############################################### | |
num_training_iterations_conventional = 5000 # Number of training iterations for conventional training | |
# Number of training iterations with RL-based training for the alternating training phase and fine-tuning of the receiver phase | |
num_training_iterations_rl_alt = 7000 | |
num_training_iterations_rl_finetuning = 3000 | |
training_batch_size = tf.constant(128, tf.int32) # Training batch size | |
rl_perturbation_var = 0.01 # Variance of the perturbation used for RL-based training of the transmitter | |
model_weights_path_conventional_training = logname + ',conventional.weights'#"awgn_autoencoder_weights_conventional_training" # Filename to save the autoencoder weights once conventional training is done | |
model_weights_path_rl_training = logname + ',rl.weights'#"awgn_autoencoder_weights_rl_training" # Filename to save the autoencoder weights once RL-based training is done | |
############################################### | |
# Evaluation configuration | |
############################################### | |
results_filename = logname.replace('.log', '.results') | |
if opt.limit == 'linear': | |
def limiter (x): | |
return x | |
limiter.name = 'linear' | |
elif opt.limit == 'sspa': | |
limiter = sspa () | |
limiter.name = 'sspa(j3)' | |
ind1, ind0 = gen_ind (bits_per_sym) | |
llr_mask = gen_llr_mask (bits_per_sym) | |
M = 1<<bits_per_sym | |
globals().update(locals()) # gross hack | |
#tf.config.run_functions_eagerly(True) | |
def gen_ind (bits): | |
size = 1<<bits | |
all_syms = np.arange (size) | |
all_bits = np.arange (bits) | |
bit_mask = (all_syms[:,np.newaxis] >> all_bits) & 1 | |
ind1 = np.array ([np.nonzero (bit_mask[...,i])[0] for i in range (bits)]) | |
ind0 = np.array ([np.nonzero (np.logical_not (bit_mask[...,i]))[0] for i in range (bits)]) | |
return ind1, ind0 | |
def gen_llr_mask (bits): | |
size = 1<<bits | |
all_syms = np.arange (size) | |
all_bits = np.arange (bits) | |
bit_mask = (all_syms[:,np.newaxis] >> all_bits) & 1 | |
return bit_mask | |
#from xtensor_math import mag_sqr | |
def mag_sqr (u): | |
return np.real(u)**2 + np.imag(u)**2 | |
import traceback | |
import warnings | |
import sys | |
def warn_with_traceback(message, category, filename, lineno, file=None, line=None): | |
log = file if hasattr(file,'write') else sys.stderr | |
traceback.print_stack(file=log) | |
log.write(warnings.formatwarning(message, category, filename, lineno, line)) | |
warnings.showwarning = warn_with_traceback | |
# from tensorflow.python.ops.numpy_ops import np_config | |
# np_config.enable_numpy_behavior() | |
class demapper (): | |
def __init__ (self, M, n_symbols): | |
all_indices = np.array([[[[b,i,j] for j in range (M)] for i in range (n_symbols)] for b in range (training_batch_size.numpy())] ) | |
self.indices_0 = all_indices[:,:,ind0] | |
self.indices_1 = all_indices[:,:,ind1] | |
#@tf.function(jit_compile=True) | |
def __call__ (self, ll): | |
#inputs[batches, N] | |
# Unfortunately this direct addressing doesn't work in graph mode
# ld0 = ll[:,:,ind0] | |
# ld1 = ll[:,:,ind1] | |
# Instead we have to enumerate all the indices and use gather_nd | |
ld0 = tf.gather_nd (ll, self.indices_0) | |
ld1 = tf.gather_nd (ll, self.indices_1) | |
llr = tf.math.reduce_logsumexp (ld0, axis=-1) - tf.math.reduce_logsumexp (ld1, axis=-1) | |
# llr has shape [batch, n_symbols, bits_per_sym] | |
llr_flat = tf.reshape (llr[:,:,::-1], (llr.shape[0],N)) | |
return -llr_flat # downstream code expects LLRs with the opposite sign convention, hence the negation
#dem = demapper (M, num_symbols_per_codeword) | |
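# Hedged illustration of the gather_nd workaround above (shapes are illustrative):
# for ll of shape [batch, n_symbols, M], indices_0 and indices_1 have shape
# [batch, n_symbols, bits_per_sym, M/2, 3], each innermost triple being a full
# [batch, symbol, constellation-point] index, so e.g.
#   ld0 = tf.gather_nd(ll, self.indices_0)   # -> [batch, n_symbols, bits_per_sym, M/2]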
def measure_obo (constellation, ibodB, xconst=None): | |
'given ibodB (negative, in dB), return the resulting output power in dB (i.e. the output back-off)'
ibo = 10 ** (0.05 * ibodB) | |
batch_size = 128 | |
if xconst is None: | |
binary_source = BinarySource() | |
mapper = Mapper(constellation=constellation) | |
c = binary_source([batch_size, n]) | |
x = mapper(c) # x [batch size, num_symbols_per_codeword] | |
else: | |
x = xconst | |
us = Upsampling(samples_per_symbol) | |
x_us = us(x) | |
xfilt_out = rrcf(x_us) | |
limit_out = limiter (xfilt_out * tf.cast (ibo, xfilt_out.dtype)) | |
# mf_out = mf (limit_out) | |
# ds = Downsampling(samples_per_symbol, rrcf.length-1) | |
# x_hat = ds(mf_out)[:,:x.shape[1]] | |
p = power (limit_out[:,opt.sps*opt.si//2:opt.sps*opt.si//2+num_symbols_per_codeword*opt.sps]) | |
return 10*log10(p) | |
def find_ibodB (constellation, obodB, xconst=None): # obodB specified as negative amount | |
#from scipy.optimize import root_scalar | |
#sol = root_scalar (lambda ibodB: measure_obo (constellation, ibodB, xconst) - obodB, method='brentq', bracket=[-20, +6]) | |
#return sol.root | |
res = tfp.math.find_root_chandrupatla( | |
lambda ibodB: measure_obo (constellation, ibodB, xconst) - obodB, | |
low=-20, | |
high=+6,) | |
return res.estimated_root | |
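# Hedged usage sketch (the constellation construction here is illustrative): solve for
# the input back-off (dB) that produces the requested output back-off through the
# limiter, then convert it to a linear scale factor as done elsewhere in this script:
#   ibodB = find_ibodB(Constellation('custom', num_bits_per_symbol, initial_value=const), opt.obodB)
#   limiter.ibo = tf.cast(10 ** (0.05 * ibodB), tf.complex64)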
# def set_ibo (constellation, verbose=True): | |
# 'find ibo(dB) for opt.obodB' | |
# if opt.obodB is not None: | |
# ibo = find_ibo (constellation, opt.obodB) | |
# if verbose: | |
# print ('ibo(dB):', ibo) | |
# return ibo | |
class E2ESystemConventionalTraining(Model): | |
def __init__(self, training, constellation=None, demapper_type='neural', N=128, n_layers=3, train_mapper=True, demapper=None, z=None, width=3, kind='cnn', pred=False): | |
super().__init__() | |
self._training = training | |
self.pred = pred | |
################ | |
## Transmitter | |
################ | |
self._binary_source = BinarySource() | |
# To reduce the computational complexity of training, the outer code is not used when training, | |
# as it is not required | |
if not self._training: | |
self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol) | |
if z is not None: | |
symlength = n // num_bits_per_symbol | |
encout = np.arange (n) | |
in_bits = np.empty (n, dtype=np.int64) | |
for i in range (num_bits_per_symbol): | |
in_bits[i::num_bits_per_symbol] = encout[z[i]*symlength:(z[i]+1)*symlength] | |
self._encoder._out_int = in_bits | |
self._encoder._out_int_inv = np.argsort (in_bits) | |
# Trainable constellation | |
if constellation is None: | |
constellation = Constellation("qam", num_bits_per_symbol, trainable=True) | |
else: | |
constellation = Constellation("custom", num_bits_per_symbol, initial_value=constellation, trainable=train_mapper) | |
self.constellation = constellation | |
self._mapper = Mapper(constellation=constellation) | |
################ | |
## Channel | |
################ | |
self._channel = AWGN() | |
################ | |
## Receiver | |
################ | |
# We use the previously defined neural network for demapping | |
if demapper is not None: | |
self._demapper = demapper | |
elif demapper_type == 'neural': | |
self._demapper = NeuralDemapper(N=N, n_layers=n_layers, width=width, kind=kind) | |
elif demapper_type == 'app': | |
self._demapper = Demapper (demapping_method='app', num_bits_per_symbol=num_bits_per_symbol, constellation=constellation) | |
# To reduce the computational complexity of training, the outer code is not used when training, | |
# as it is not required | |
if not self._training: | |
self._decoder = LDPC5GDecoder(self._encoder, hard_out=True) | |
################# | |
# Loss function | |
################# | |
if self._training: | |
self._bce = tf.keras.losses.BinaryCrossentropy(from_logits=True) | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (self._mapper.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
@tf.function()#jit_compile=True) | |
def call(self, batch_size, esno_db=None, ebno_db=None, return_limit_out=False): | |
# kludge: sim_ber will call with ebno_db, but it's really esno we're using | |
if esno_db is None:
esno_db = ebno_db | |
# If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers | |
if len(esno_db.shape) == 0: | |
esno_db = tf.fill([batch_size], esno_db) | |
no = 10**(-0.1 * esno_db) | |
no = expand_to_rank(no, 2) | |
no = tf.cast (no, tf.float32) | |
################ | |
## Transmitter | |
################ | |
# Outer coding is only performed if not training | |
if self._training: | |
c = self._binary_source([batch_size, n]) | |
else: | |
b = self._binary_source([batch_size, k]) | |
c = self._encoder(b) | |
# Modulation | |
x = self._mapper(c) # x [batch size, num_symbols_per_codeword] | |
if self.pred: | |
x_pred, mse = pred (x, 1.0, betas, 20*log10(limiter.ibo), limiter, samples_per_symbol, span_in_symbols, rrcf, mf, norm=opt.normalize) | |
else: | |
x_pred = x | |
us = Upsampling(samples_per_symbol) | |
# Upsample the QAM symbol sequence | |
x_us = us(x_pred) | |
xfilt_out = rrcf(x_us) | |
################ | |
## Channel | |
################ | |
if self.pred: | |
# Find a new ibo with the predistorted signal to give the proper obo | |
ibo = tf.cast (10**(0.05 * find_ibodB (None, opt.obodB, x_pred)), tf.complex64) | |
else: | |
ibo = limiter.ibo | |
limit_out = limiter (xfilt_out * ibo) | |
if return_limit_out: | |
return limit_out | |
y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword] | |
mf_out = mf (y) | |
# Instantiate a downsampling layer | |
num_symbols = x.shape[1] | |
#ds = Downsampling(samples_per_symbol, span_in_symbols-1) | |
ds = Downsampling(samples_per_symbol, rrcf.length-1, num_symbols) | |
# Recover the transmitted symbol sequence | |
x_hat = ds(mf_out)[:,:x.shape[1]] | |
self.x_hat = x_hat | |
#print ('mses:', np.array([tf.math.real(power (a - b)).numpy() for a,b in zip (x, x_hat)])) | |
xconst_sqnorm = power (x) | |
signal = tf.reduce_sum (tf.multiply (x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype)/tf.cast(xconst_sqnorm, tf.complex64) | |
y = x_hat / signal | |
################ | |
## Receiver | |
################ | |
llr = self._demapper([y, no]) | |
llr = tf.reshape(llr, [batch_size, n]) | |
# If training, outer decoding is not performed and the BCE is returned | |
if self._training: | |
loss = self._bce(c, llr) | |
return loss | |
else: | |
# Outer decoding | |
b_hat = self._decoder(llr) | |
#print ('bce:', tf.keras.losses.BinaryCrossentropy(from_logits=True)(c, llr), 'ami:', average_mutual_information (c, llr)) | |
return b,b_hat # Ground truth and reconstructed information bits returned for BER/BLER computation | |
# A simple training loop is defined in the next cell, which performs `num_training_iterations_conventional` training iterations of SGD. Training is done over a range of SNR by randomly sampling a batch of SNR values at each iteration.
# **Note:** For an introduction to the implementation of differentiable communication systems and their optimization through SGD and backpropagation with Sionna, please refer to [Part 2 of the Sionna tutorial for Beginners](https://nvlabs.github.io/sionna/examples/Sionna_tutorial_part2.html).
# In[6]: | |
from datetime import datetime | |
logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S") | |
file_writer = tf.summary.create_file_writer(logdir + "/metrics") | |
file_writer.set_as_default() | |
def conventional_training(model): | |
# Optimizer used to apply gradients | |
optimizer = tf.keras.optimizers.Adam() | |
for i in range(num_training_iterations_conventional): | |
# Sampling a batch of SNRs | |
esno_db = tf.random.uniform(shape=[training_batch_size], minval=opt.train_esno_min, maxval=opt.train_esno_max) | |
# Forward pass | |
with tf.GradientTape() as tape: | |
loss = model(training_batch_size, esno_db) # The model is assumed to return the BMD rate | |
# Computing and applying gradients | |
weights = model.trainable_weights | |
grads = tape.gradient(loss, weights) | |
nonfinite_grads = [not tf.math.reduce_all (tf.math.is_finite (grad)) for grad in grads] | |
zero_grads = [tf.zeros_like (grad) for grad in grads] | |
grads = [zero_grad if nonfinite else grad for nonfinite,zero_grad,grad in zip (nonfinite_grads, zero_grads, grads)] | |
optimizer.apply_gradients(zip(grads, weights)) | |
tf.summary.scalar('BCE', data=loss.numpy(), step=i) | |
# Printing periodically the progress | |
if i % 100 == 0: | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model._mapper.constellation, opt.obodB)), tf.complex64) | |
print('Iteration {}/{} BCE: {:.4f}'.format(i, num_training_iterations_conventional, loss.numpy()), 'ibo(dB):', 20*log10(limiter.ibo).numpy(), 'obo:', measure_obo (model._mapper.constellation, 20*np.log10(limiter.ibo.numpy())).numpy(), 'pwr:', power (model.constellation.points.numpy()).numpy(), end='\r') | |
# The next cell defines a utility function for saving the weights using [pickle](https://docs.python.org/3/library/pickle.html). | |
# In[7]: | |
def save_weights(model, model_weights_path): | |
weights = model.get_weights() | |
with open(model_weights_path, 'wb') as f: | |
pickle.dump(weights, f, -1) | |
# In the next cell, an instance of the model defined previously is instantiated and trained. | |
# In[8]: | |
# Fix the seed for reproducible trainings | |
tf.random.set_seed(1) | |
# Instantiate and train the end-to-end system | |
#model_32apsk_notrain = E2ESystemConventionalTraining(training=True, constellation=const, N=128, n_layers=3, train_mapper=False) | |
def run_ml_training(model): | |
conventional_training(model) | |
# Save weights | |
save_weights(model, model_weights_path_conventional_training) | |
return model_weights_path_conventional_training | |
def train_2_times(): | |
conventional_training (model_32apsk_notrain) | |
model_32apsk_train = E2ESystemConventionalTraining(training=True, constellation=const, N=128, n_layers=3, train_mapper=True, demapper=model_32apsk_notrain._demapper) | |
conventional_training (model_32apsk_train) | |
return model_32apsk_train | |
# ## Trainable End-to-end System: RL-based Training <a class="anchor" id="Trainable-End-to-end-System:-RL-based-Training"></a>
# The following cell defines the same end-to-end system as before, but stops the gradients after the channel to simulate a non-differentiable channel.
#
# To jointly train the transmitter and receiver over a non-differentiable channel, we follow [3], whose key idea is to alternate between:
#
# - Training of the receiver on the BCE using conventional backpropagation and SGD.
# - Training of the transmitter by applying (known) perturbations to the transmitter output to enable estimation of the gradient of (an approximation of) the loss function with respect to the transmitter weights.
#
# When `training` is set to `True`, both losses for training the receiver and the transmitter are returned.
# In[9]: | |
# class E2ESystemRLTraining(Model): | |
# def __init__(self, training): | |
# super().__init__() | |
# self._training = training | |
# ################ | |
# ## Transmitter | |
# ################ | |
# self._binary_source = BinarySource() | |
# # To reduce the computational complexity of training, the outer code is not used when training, | |
# # as it is not required | |
# if not self._training: | |
# self._encoder = LDPC5GEncoder(k, n) | |
# # Trainable constellation | |
# constellation = Constellation("qam", num_bits_per_symbol, trainable=True) | |
# self.constellation = constellation | |
# self._mapper = Mapper(constellation=constellation) | |
# ################ | |
# ## Channel | |
# ################ | |
# self._channel = AWGN() | |
# ################ | |
# ## Receiver | |
# ################ | |
# # We use the previously defined neural network for demapping | |
# self._demapper = NeuralDemapper() | |
# # To reduce the computational complexity of training, the outer code is not used when training, | |
# # as it is not required | |
# if not self._training: | |
# self._decoder = LDPC5GDecoder(self._encoder, hard_out=True) | |
# @tf.function(jit_compile=True) | |
# def call(self, batch_size, ebno_db, perturbation_variance=tf.constant(0.0, tf.float32)): | |
# # If `ebno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers | |
# if len(ebno_db.shape) == 0: | |
# ebno_db = tf.fill([batch_size], ebno_db) | |
# no = ebnodb2no(ebno_db, num_bits_per_symbol, coderate) | |
# no = expand_to_rank(no, 2) | |
# ################ | |
# ## Transmitter | |
# ################ | |
# # Outer coding is only performed if not training | |
# if self._training: | |
# c = self._binary_source([batch_size, n]) | |
# else: | |
# b = self._binary_source([batch_size, k]) | |
# c = self._encoder(b) | |
# # Modulation | |
# x = self._mapper(c) # x [batch size, num_symbols_per_codeword] | |
# # Adding perturbation | |
# # If ``perturbation_variance`` is 0, then the added perturbation is null | |
# epsilon_r = tf.random.normal(tf.shape(x))*tf.sqrt(0.5*perturbation_variance) | |
# epsilon_i = tf.random.normal(tf.shape(x))*tf.sqrt(0.5*perturbation_variance) | |
# epsilon = tf.complex(epsilon_r, epsilon_i) # [batch size, num_symbols_per_codeword] | |
# x_p = x + epsilon # [batch size, num_symbols_per_codeword] | |
# ################ | |
# ## Channel | |
# ################ | |
# y = self._channel([x_p, no]) # [batch size, num_symbols_per_codeword] | |
# y = tf.stop_gradient(y) # Stop gradient here | |
# ################ | |
# ## Receiver | |
# ################ | |
# llr = self._demapper([y, no]) | |
# # If training, outer decoding is not performed | |
# if self._training: | |
# # Average BCE for each baseband symbol and each batch example | |
# c = tf.reshape(c, [-1, num_symbols_per_codeword, num_bits_per_symbol]) | |
# bce = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(c, llr), axis=2) # Average over the bits mapped to the same baseband symbol
# # The RX loss is the usual average BCE | |
# rx_loss = tf.reduce_mean(bce) | |
# # From the TX side, the BCE is seen as a feedback from the RX through which backpropagation is not possible | |
# bce = tf.stop_gradient(bce) # [batch size, num_symbols_per_codeword] | |
# x_p = tf.stop_gradient(x_p) | |
# p = x_p-x # [batch size, num_symbols_per_codeword] Gradient is backpropagated through `x` | |
# tx_loss = tf.square(tf.math.real(p)) + tf.square(tf.math.imag(p)) # [batch size, num_symbols_per_codeword] | |
# tx_loss = -bce*tx_loss/rl_perturbation_var # [batch size, num_symbols_per_codeword] | |
# tx_loss = tf.reduce_mean(tx_loss) | |
# return tx_loss, rx_loss | |
# else: | |
# llr = tf.reshape(llr, [-1, n]) # Reshape as expected by the outer decoder | |
# b_hat = self._decoder(llr) | |
# return b,b_hat | |
# The next cell implements the training algorithm from [3], which alternates between conventional training of the neural network-based receiver, and RL-based training of the transmitter. | |
# In[19]: | |
# def rl_based_training(model): | |
# # Optimizers used to apply gradients | |
# optimizer_tx = tf.keras.optimizers.Adam() # For training the transmitter | |
# optimizer_rx = tf.keras.optimizers.Adam() # For training the receiver | |
# # Function that implements one transmitter training iteration using RL. | |
# def train_tx(): | |
# # Sampling a batch of SNRs | |
# ebno_db = tf.random.uniform(shape=[training_batch_size], minval=ebno_db_min, maxval=ebno_db_max) | |
# # Forward pass | |
# with tf.GradientTape() as tape: | |
# # Keep only the TX loss | |
# tx_loss, _ = model(training_batch_size, ebno_db, | |
# tf.constant(rl_perturbation_var, tf.float32)) # Perturbations are added to enable RL exploration
# ## Computing and applying gradients | |
# weights = model.trainable_weights | |
# grads = tape.gradient(tx_loss, weights) | |
# optimizer_tx.apply_gradients(zip(grads, weights)) | |
# # Function that implements one receiver training iteration | |
# def train_rx(): | |
# # Sampling a batch of SNRs | |
# ebno_db = tf.random.uniform(shape=[training_batch_size], minval=ebno_db_min, maxval=ebno_db_max) | |
# # Forward pass | |
# with tf.GradientTape() as tape: | |
# # Keep only the RX loss | |
# _, rx_loss = model(training_batch_size, ebno_db) # No perturbation is added | |
# ## Computing and applying gradients | |
# weights = model.trainable_weights | |
# grads = tape.gradient(rx_loss, weights) | |
# optimizer_rx.apply_gradients(zip(grads, weights)) | |
# # The RX loss is returned to print the progress | |
# return rx_loss | |
# # Training loop. | |
# for i in range(num_training_iterations_rl_alt): | |
# # 10 steps of receiver training are performed to keep it ahead of the transmitter | |
# # as it is used for computing the losses when training the transmitter | |
# for _ in range(10): | |
# rx_loss = train_rx() | |
# # One step of transmitter training | |
# train_tx() | |
# # Printing periodically the progress | |
# if i % 100 == 0: | |
# print('Iteration {}/{} BCE {:.4f}'.format(i, num_training_iterations_rl_alt, rx_loss.numpy()), end='\r') | |
# print() # Line break | |
# # Once alternating training is done, the receiver is fine-tuned. | |
# print('Receiver fine-tuning... ') | |
# for i in range(num_training_iterations_rl_finetuning): | |
# rx_loss = train_rx() | |
# if i % 100 == 0: | |
# print('Iteration {}/{} BCE {:.4f}'.format(i, num_training_iterations_rl_finetuning, rx_loss.numpy()), end='\r') | |
# In the next cell, an instance of the model defined previously is instantiated and trained. | |
# In[20]: | |
# # Fix the seed for reproducible trainings | |
# tf.random.set_seed(1) | |
# # Instantiate and train the end-to-end system | |
# model = E2ESystemRLTraining(training=True) | |
# rl_based_training(model) | |
# # Save weights | |
# save_weights(model, model_weights_path_rl_training) | |
# ## Evaluation <a class="anchor" id="Evaluation"></a>
# The following cell implements a baseline which uses QAM with Gray labeling and conventional demapping for the AWGN channel.
# In[21]: | |
class Baseline(Model): | |
def __init__(self, constellation=None): | |
super().__init__() | |
################ | |
## Transmitter | |
################ | |
self._binary_source = BinarySource() | |
self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol) | |
if constellation is None: | |
constellation = Constellation("qam", num_bits_per_symbol, trainable=False) | |
self.constellation = constellation | |
else: | |
self.constellation = constellation | |
self._mapper = Mapper(constellation=constellation) | |
################ | |
## Channel | |
################ | |
self._channel = AWGN() | |
self.us = Upsampling(samples_per_symbol) | |
self.ds = Downsampling(samples_per_symbol, rrcf.length-1) | |
################ | |
## Receiver | |
################ | |
self._demapper = Demapper("app", constellation=constellation)#demapper (constellation.points, M, num_symbols_per_codeword) | |
self._decoder = LDPC5GDecoder(self._encoder, hard_out=True) | |
@tf.function()#jit_compile=True) | |
def call(self, batch_size, esno_db):#, perturbation_variance=tf.constant(0.0, tf.float32)): | |
#tf.print ('batch:', batch_size, output_stream=sys.stdout) | |
# If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers | |
if len(esno_db.shape) == 0: | |
esno_db = tf.fill([batch_size], esno_db) | |
no = 10**(-0.1 * esno_db) | |
no = expand_to_rank(no, 2) | |
no = tf.cast (no, tf.float32) | |
#tf.print ('no:', no) | |
################ | |
## Transmitter | |
################ | |
b = self._binary_source([batch_size, k]) | |
c = self._encoder(b) | |
# Modulation | |
x = self._mapper(c) # x [batch size, num_symbols_per_codeword] | |
# Upsample the QAM symbol sequence | |
x_us = self.us(x) | |
xfilt_out = rrcf(x_us) | |
# ################ | |
# ## Channel | |
# ################ | |
limit_out = limiter (xfilt_out * limiter.ibo) | |
y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword] | |
mf_out = mf (y) | |
# # Instantiate a downsampling layer | |
# num_symbols = x.shape[1] | |
# # Recover the transmitted symbol sequence | |
x_hat = self.ds(mf_out)[:,:x.shape[1]] | |
#xconst_sqnorm = power (x) should be 1 anyway | |
signal = tf.reduce_sum (tf.multiply (x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype) | |
#print ('signal:', signal) | |
#print ('y:', y) | |
y = x_hat / signal | |
#tf.print ('y:', y) | |
################ | |
## Receiver | |
################ | |
#tf.print (y.shape, no.shape) | |
#print ('trace demapper') | |
llr = self._demapper(y, no) | |
# Outer decoding | |
b_hat = self._decoder(llr) | |
#tf.print (b[0,:10], b_hat[0,:10]) | |
#tf.print ('exec') | |
return b,b_hat # Ground truth and reconstructed information bits returned for BER/BLER computation | |
class Pred(Model): | |
def __init__(self, constellation=None): | |
super().__init__() | |
################ | |
## Transmitter | |
################ | |
self._binary_source = BinarySource() | |
self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol) | |
if constellation is None: | |
constellation = Constellation("qam", num_bits_per_symbol, trainable=False) | |
self.constellation = constellation | |
else: | |
self.constellation = constellation | |
self._mapper = Mapper(constellation=constellation) | |
################ | |
## Channel | |
################ | |
self._channel = AWGN() | |
self.us = Upsampling(samples_per_symbol) | |
self.ds = Downsampling(samples_per_symbol, rrcf.length-1) | |
################ | |
## Receiver | |
################ | |
self._demapper = Demapper("app", constellation=constellation)#demapper (constellation.points, M, num_symbols_per_codeword) | |
self._decoder = LDPC5GDecoder(self._encoder, hard_out=True) | |
@tf.function()#jit_compile=True) | |
def call(self, batch_size, esno_db=None, ebno_db=None, return_limit_out=False): | |
# kludge: sim_ber will call with ebno_db, but it's really esno we're using | |
if esno_db is None:
esno_db = ebno_db | |
# If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers | |
if len(esno_db.shape) == 0: | |
esno_db = tf.fill([batch_size], esno_db) | |
no = 10**(-0.1 * esno_db) | |
no = expand_to_rank(no, 2) | |
no = tf.cast (no, tf.float32) | |
#tf.print ('no:', no) | |
################ | |
## Transmitter | |
################ | |
b = self._binary_source([batch_size, k]) | |
c = self._encoder(b) | |
# Modulation | |
x = self._mapper(c) # x [batch size, num_symbols_per_codeword] | |
# Note: ibo is not updated across iterations, as that would be impractical
x_pred, mse = pred (x, 1.0, betas, 20*log10(limiter.ibo), limiter, samples_per_symbol, span_in_symbols, rrcf, mf, norm=opt.normalize) | |
# Upsample the QAM symbol sequence | |
x_us = self.us(x_pred) | |
xfilt_out = rrcf(x_us) | |
# ################ | |
# ## Channel | |
# ################ | |
# Find a new ibo with the predistorted signal to give the proper obo | |
ibo = tf.cast (10**(0.05 * find_ibodB (None, opt.obodB, x_pred)), tf.complex64) | |
print ('ibo set to:', 10*log10(ibo)) | |
limit_out = limiter (xfilt_out * ibo) | |
if return_limit_out: | |
return limit_out | |
y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword] | |
mf_out = mf (y) | |
# # Instantiate a downsampling layer | |
# num_symbols = x.shape[1] | |
# # Recover the transmitted symbol sequence | |
x_hat = self.ds(mf_out)[:,:x.shape[1]] | |
#xconst_sqnorm = power (x) should be 1 anyway | |
signal = tf.reduce_sum (tf.multiply (x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype) | |
#print ('signal:', signal) | |
#print ('y:', y) | |
y = x_hat / signal | |
#tf.print ('y:', y) | |
################ | |
## Receiver | |
################ | |
#tf.print (y.shape, no.shape) | |
#print ('trace demapper') | |
llr = self._demapper([y, no]) | |
# Outer decoding | |
b_hat = self._decoder(llr) | |
#tf.print (b[0,:10], b_hat[0,:10]) | |
#tf.print ('exec') | |
return b,b_hat # Ground truth and reconstructed information bits returned for BER/BLER computation | |
from llr import bit_llr_to_sym_ll3 | |
def prob (x): | |
y = -x | |
one = tf.constant (1., dtype=tf.float64) | |
zero = tf.constant (0., dtype=tf.float64) | |
return tf.where (y >= zero, one/(one + tf.exp (-y)), one/(one + tf.exp (y))) | |
class Cancel(Model): | |
def __init__(self, constellation=None): | |
super().__init__() | |
################ | |
## Transmitter | |
################ | |
self._binary_source = BinarySource() | |
self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol) | |
if constellation is None: | |
constellation = Constellation("qam", num_bits_per_symbol, trainable=False) | |
self.constellation = constellation | |
else: | |
self.constellation = constellation | |
self._mapper = Mapper(constellation=constellation) | |
################ | |
## Channel | |
################ | |
self._channel = AWGN() | |
self.us = Upsampling(samples_per_symbol) | |
self.ds = Downsampling(samples_per_symbol, rrcf.length-1) | |
################ | |
## Receiver | |
################ | |
self._demapper = Demapper("app", constellation=constellation)#demapper (constellation.points, M, num_symbols_per_codeword) | |
self._decoder = LDPC5GDecoder(self._encoder, hard_out=False, stateful=True, num_iter=20, return_infobits=False) | |
#@tf.function()#jit_compile=True) | |
def call(self, batch_size, esno_db=None, ebno_db=None): | |
# kludge: sim_ber will call with ebno_db, but it's really esno we're using | |
if esno_db is None:
esno_db = ebno_db | |
# If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers | |
if len(esno_db.shape) == 0: | |
esno_db = tf.fill([batch_size], esno_db) | |
no = 10**(-0.1 * esno_db) | |
no = expand_to_rank(no, 2) | |
no = tf.cast (no, tf.float32) | |
#tf.print ('no:', no) | |
################ | |
## Transmitter | |
################ | |
b = self._binary_source([batch_size, k]) | |
c = self._encoder(b) | |
# Modulation | |
x = self._mapper(c) # x [batch size, num_symbols_per_codeword] | |
# Note: ibo is not updated across iterations, as that would be impractical
# Upsample the QAM symbol sequence | |
x_us = self.us(x) | |
xfilt_out = rrcf(x_us) | |
# ################ | |
# ## Channel | |
# ################ | |
# Find a new ibo with the predistorted signal to give the proper obo | |
ibo = tf.cast (10**(0.05 * find_ibodB (None, opt.obodB, x)), tf.complex64) | |
print ('ibo set to:', 10*log10(ibo)) | |
limit_out = limiter (xfilt_out * ibo) | |
y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword] | |
mf_out = mf (y) | |
# # Instantiate a downsampling layer | |
# num_symbols = x.shape[1] | |
# # Recover the transmitted symbol sequence | |
x_hat = self.ds(mf_out)[:,:x.shape[1]] | |
#xconst_sqnorm = power (x) should be 1 anyway | |
msg_vn = None | |
dist = 0 | |
for iter in range (opt.rcv_iter): | |
signal = tf.reduce_sum (tf.multiply (x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype) | |
#print ('signal:', signal) | |
#print ('y:', y) | |
y = x_hat / signal | |
y -= dist | |
mse = power (y - x) | |
print ('iter:', iter, 'mse:', 10*log10(mse)) | |
#tf.print ('y:', y) | |
################ | |
## Receiver | |
################ | |
#tf.print (y.shape, no.shape) | |
#print ('trace demapper') | |
llr = self._demapper([y, no]) | |
# Outer decoding | |
b_hat, msg_vn = self._decoder([llr, msg_vn]) | |
b_hd = tf.cast(tf.less(0.0, b_hat), b.dtype) | |
bler = tf.reduce_mean (tf.cast (tf.not_equal(c, b_hd), tf.float32)) | |
# bler = tf.reduce_any(tf.not_equal(c, b_hd), axis=-1) | |
# bler = tf.cast(bler, tf.float64) # tf.float64 to support large batch-sizes
# bler = tf.reduce_mean(bler) | |
# tf.reduce_mean (tf.cast (tf.not_equal(c, b_hd), tf.float32), axis=-1) | |
print ('bler:', bler) | |
sym_ll = tf.convert_to_tensor ([bit_llr_to_sym_ll3 (-b_hat[row].numpy().astype(float), llr_mask.astype(np.int32)) for row in range (b_hat.shape[0])]) | |
sym_prob = tf.exp (sym_ll) | |
tfconst = self.constellation.points | |
exp_syms = tf.tensordot (tf.cast(sym_prob, tf.complex64), tfconst, 1) | |
remod_us = self.us(exp_syms) | |
remod_out = rrcf(remod_us) | |
limit_out = limiter (remod_out * limiter.ibo) | |
remod_mf_out = mf (limit_out) | |
remod_x_hat = self.ds(remod_mf_out)[:,:x.shape[1]] | |
signal2 = tf.reduce_sum (tf.multiply (remod_x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype) | |
#print ('signal:', signal) | |
#print ('y:', y) | |
y2 = remod_x_hat / signal2 | |
dist = y2 - exp_syms # diff between what went into mod and came out of demod | |
breakpoint() | |
#tf.print (b[0,:10], b_hat[0,:10]) | |
#tf.print ('exec') | |
return b, b_hd # Ground truth and reconstructed information bits returned for BER/BLER computation | |
class Centroids(Model): | |
# I think that, to train, both the mapper and the constellation need to be trainable
def __init__(self, constellation=None, training=False, pred=False): | |
super().__init__() | |
self._training = training | |
self.pred = pred | |
################ | |
## Transmitter | |
################ | |
self._binary_source = BinarySource() | |
if not training: | |
self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol) | |
if constellation is None: | |
constellation = Constellation("qam", num_bits_per_symbol, trainable=False) | |
self.constellation = constellation | |
else: | |
self.constellation = constellation | |
self._mapper = Mapper(constellation=constellation, return_indices=True, trainable=training) | |
################ | |
## Channel | |
################ | |
self._channel = AWGN() | |
################ | |
## Receiver | |
################ | |
#self._demapper = Demapper("app", constellation=self.constellation2) | |
self._demapper = demapper(M, num_symbols_per_codeword) | |
if not training: | |
self._decoder = LDPC5GDecoder(self._encoder, hard_out=True) | |
if self._training: | |
self._bce = tf.keras.losses.BinaryCrossentropy(from_logits=True) | |
self.us = Upsampling(samples_per_symbol) | |
self.ds = Downsampling(samples_per_symbol, rrcf.length-1) | |
self.sum_is = tf.Variable (tf.zeros (M)) | |
self.sum_i2s = tf.Variable (tf.zeros(M)) | |
self.sum_qs = tf.Variable (tf.zeros(M)) | |
self.sum_q2s = tf.Variable (tf.zeros(M)) | |
self.sum_iqs = tf.Variable (tf.zeros(M)) | |
self.counts = tf.Variable (tf.zeros(M)) | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (self._mapper.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
@tf.function()#jit_compile=True) | |
def call(self, batch_size, esno_db=None, ebno_db=None): | |
# kludge: sim_ber will call with ebno_db, but it's really esno we're using | |
if esno_db is None:
esno_db = ebno_db | |
# If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers | |
if len(esno_db.shape) == 0: | |
esno_db = tf.fill([batch_size], esno_db) | |
no = 10**(-0.1 * esno_db) | |
no = expand_to_rank(no, 2) | |
no = tf.cast (no, tf.float32) | |
################ | |
## Transmitter | |
################ | |
if self._training: | |
c = self._binary_source([batch_size, n]) | |
else: | |
b = self._binary_source([batch_size, k]) | |
c = self._encoder(b) | |
# Modulation | |
x, xconst = self._mapper(c) # x [batch size, num_symbols_per_codeword] | |
if self.pred: | |
x, mse = pred (x, 1.0, betas, 20*log10(limiter.ibo), limiter, samples_per_symbol, span_in_symbols, rrcf, mf, norm=opt.normalize) | |
# Upsample the QAM symbol sequence | |
x_us = self.us(x) | |
xfilt_out = rrcf(x_us) | |
################ | |
## Channel | |
################ | |
if self.pred: | |
# Find a new ibo with the predistorted signal to give the proper obo | |
ibo = tf.cast (10**(0.05 * find_ibodB (None, opt.obodB, x)), tf.complex64) | |
else: | |
ibo = limiter.ibo | |
limit_out = limiter (xfilt_out * ibo) | |
y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword] | |
mf_out = mf (y) | |
x_hat = self.ds(mf_out)[:,:x.shape[1]] | |
rsyms = [x_hat[xconst==i] for i in range (M)] | |
if self._training: | |
self.sum_is.assign ([tf.reduce_sum (tf.math.real (rsyms[i])) for i in range (M)]) | |
self.sum_i2s.assign ([tf.reduce_sum (tf.math.real (rsyms[i])**2) for i in range (M)]) | |
self.sum_qs.assign ([tf.reduce_sum (tf.math.imag (rsyms[i])) for i in range (M)]) | |
self.sum_q2s.assign ([tf.reduce_sum (tf.math.imag (rsyms[i])**2) for i in range (M)]) | |
self.sum_iqs.assign ([tf.reduce_sum (tf.math.real (rsyms[i]) * tf.math.imag (rsyms[i])) for i in range (M)]) | |
self.counts.assign (tf.convert_to_tensor ([len (rsyms[i]) for i in range (M)], dtype=tf.float32)) | |
else: | |
self.sum_is.assign_add ([tf.reduce_sum (tf.math.real (rsyms[i])) for i in range (M)]) | |
self.sum_i2s.assign_add ([tf.reduce_sum (tf.math.real (rsyms[i])**2) for i in range (M)]) | |
self.sum_qs.assign_add ([tf.reduce_sum (tf.math.imag (rsyms[i])) for i in range (M)]) | |
self.sum_q2s.assign_add ([tf.reduce_sum (tf.math.imag (rsyms[i])**2) for i in range (M)]) | |
self.sum_iqs.assign_add ([tf.reduce_sum (tf.math.real (rsyms[i]) * tf.math.imag (rsyms[i])) for i in range (M)]) | |
self.counts.assign_add (tf.convert_to_tensor ([len (rsyms[i]) for i in range (M)], dtype=tf.float32)) | |
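# The statistics below form, for each constellation point i, a correlated bivariate
# Gaussian model of its received cloud: per-point means, variances and the I/Q
# correlation are computed from the accumulated sums, and `lls` is the per-sample
# log-likelihood under every point's model, transposed to [batch, n_symbols, M] for the
# centroid demapper.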
mean_is = [self.sum_is[i]/self.counts[i] for i in range (M)] | |
mean_qs = [self.sum_qs[i]/self.counts[i] for i in range (M)] | |
var_is = [self.sum_i2s[i]/self.counts[i] - (mean_is[i]**2) for i in range (M)] | |
var_qs = [self.sum_q2s[i]/self.counts[i] - (mean_qs[i]**2) for i in range (M)] | |
corrs = [(self.sum_iqs[i]/self.counts[i] - (mean_is[i] * mean_qs[i])) / (tf.sqrt(var_is[i]) * tf.sqrt(var_qs[i])) for i in range (M)] | |
stuff = [(tf.math.real (x_hat) - mean_is[i])**2 / var_is[i] + (tf.math.imag (x_hat) - mean_qs[i])**2 / var_qs[i] - 2 * corrs[i] * (tf.math.real (x_hat) - mean_is[i]) / tf.sqrt (var_is[i]) * (tf.math.imag (x_hat) - mean_qs[i]) / tf.sqrt (var_qs[i]) for i in range (M)] | |
junk = [stuff[i] - tf.math.log (2. * tf.constant (np.pi) * tf.sqrt ((1 - corrs[i])**2 * var_is[i] * var_qs[i])) for i in range (M)] | |
lls = tf.convert_to_tensor([-1 / (2 * (1 - corrs[i]**2)) * junk[i] for i in range (M)]) | |
lls = tf.transpose (lls, perm=[1,2,0]) | |
################ | |
## Receiver | |
################ | |
#llr = self._demapper([y, no]) | |
llr = self._demapper(lls) | |
# Outer decoding | |
if self._training: | |
return self._bce(c, llr) | |
else: | |
b_hat = self._decoder(llr) | |
return b,b_hat # Ground truth and reconstructed information bits returned for BER/BLER computation | |
# To train mapper for centroid demapper | |
# model_centroids_trainable = Centroids (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym, trainable=True), training=True) | |
# to train: | |
# run_ml_training (model_centroids_trainable, suffix='centroids-train') | |
# Then to test | |
# model_centroids3 = Centroids (Constellation (constellation_type='custom', initial_value=model_centroids_trainable.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym)) | |
# run_centroids (model_centroids3, label='mapper=centroidopt+demapper=centroid') | |
# | |
# for opt mapper + centroid demapper: | |
# you might need load_weights first if you haven't just trained model_64apsk | |
#model_centroids2 = Centroids (Constellation (constellation_type='custom', initial_value=model_64apsk.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym)) | |
# In[22]: | |
# Range of SNRs over which the systems are evaluated | |
# In[23]: | |
# Utility function to load and set weights of a model | |
def load_weights(model, model_weights_path): | |
model(1, tf.constant(10.0, tf.float32)) | |
with open(model_weights_path, 'rb') as f: | |
weights = pickle.load(f) | |
model.set_weights(weights) | |
# The next cell evaluates the baseline and the two autoencoder-based communication systems, trained with different methods.
# The results are stored in the dictionary ``BLER``.
# In[24]: | |
def run_qam64(): | |
print ('Qam64') | |
model_qam64 = Baseline() | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model_qam64.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
_,bler = sim_ber(model_qam64, esno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=opt.max_iter) | |
BLER['64qam'] = bler.numpy() | |
def run_apsk64(): | |
from const64apsk import gen_constellation_64apsk | |
c = gen_constellation_64apsk ('3/4') | |
model_apsk = Baseline (Constellation (constellation_type='custom', initial_value=c, num_bits_per_symbol=6)) | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model_apsk.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
_,bler = sim_ber(model_apsk, esno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=opt.max_iter) | |
BLER['64apsk'] = bler.numpy() | |
def plot_centroids(): | |
mc (batch_size=128, esno_db=tf.cast (200, tf.float32)) | |
centroids = tf.cast (mc.centroids, dtype=tf.complex64).numpy() | |
fig,ax = plt.subplots() | |
x_hat = tf.cast (mc.x_hat, dtype=tf.complex64).numpy().flatten() | |
ax.scatter (x_hat.real, x_hat.imag) | |
ax.scatter (centroids.real, centroids.imag, marker='x') | |
plt.show() | |
if __name__ == '__main__': | |
import sys | |
args = sys.argv | |
opt,logname = parse_args (args) | |
setup(opt, logname) | |
esno_dbs = np.arange(opt.esno_min, # Min SNR for evaluation | |
opt.esno_max, # Max SNR for evaluation | |
opt.esno_step) # Step | |
def setup_logger(): | |
import logger | |
import sys | |
sys.stderr = logger.logger (logname) | |
sys.results_file = logger.logger (logname.replace ('.log', '.results'), mode='wb') | |
def log_bler(esno_dbs, bit_errors, block_errors, nb_bits, nb_blocks): | |
from contextlib import redirect_stdout | |
if hasattr (sys.stderr, 'reopen'): | |
sys.stderr.reopen() | |
with redirect_stdout(sys.stderr): | |
print (args) | |
print (opt) | |
print ('esno:', esno_dbs.numpy()) | |
print ('block_errors:', block_errors.numpy()) | |
print ('blocks:', nb_blocks.numpy()) | |
bler = tf.cast(block_errors, tf.float64) / tf.cast(nb_blocks, tf.float64) | |
print ('bler:', bler.numpy()) | |
sys.stderr._flush() | |
if hasattr (sys, 'results_file'): | |
from pickle import dumps | |
results = { | |
'args' : args, | |
'opt' : opt, | |
'esno_db' : esno_dbs.numpy(), | |
'block_errors' : block_errors.numpy(), | |
'blocks' : nb_blocks.numpy(), | |
'bler' : bler.numpy(), | |
} | |
sys.results_file.reopen() | |
sys.results_file.write (dumps (results, -1)) | |
sys.results_file._flush() | |
if opt.ibodB is not None: | |
opt.obodB = measure_obo (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym), opt.ibodB) | |
print (f'ibo was set to {opt.ibodB}, obo is set to {opt.obodB}') | |
def run_centroids(model_centroids, esno_dbs): | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model_centroids.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
_,bler = sim_ber(model_centroids, esno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=opt.max_iter, callback=log_bler, min_bler=opt.min_bler) | |
BLER = (esno_dbs, bler.numpy()) | |
return BLER | |
def run_ml(N=opt.N, n_layers=opt.n_layers, demapper=None, label='mapper=opt+demapper=NN', suffix=None, esno_dbs=esno_dbs, width=opt.width, kind=opt.kind, pred=False, load=opt.load): | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=N, n_layers=n_layers, width=width, kind=kind, pred=pred) | |
load_weights(model_conventional, load) | |
if demapper == 'app': | |
model_conventional._demapper = Demapper (demapping_method='app', num_bits_per_symbol=num_bits_per_symbol, constellation=model_conventional.constellation) | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model_conventional.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
_,bler = sim_ber(model_conventional, esno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=opt.max_iter, callback=log_bler, min_bler=opt.min_bler) | |
if label is None: | |
if demapper == 'app': | |
label = 'autoencoder-app'+suffix | |
else: | |
label = 'autoencoder-conv'+suffix | |
BLER = (esno_dbs, bler.numpy()) | |
return BLER | |
def run_baseline(model, esno_dbs=esno_dbs): | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
_,bler = sim_ber(model, esno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=opt.max_iter, callback=log_bler, min_bler=opt.min_bler) | |
BLER = (esno_dbs, bler.numpy()) | |
return BLER | |
def run_pred(model, esno_dbs=esno_dbs): | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
_,bler = sim_ber(model, esno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=opt.max_iter) | |
BLER = (esno_dbs, bler.numpy()) | |
return BLER | |
# model_rl = E2ESystemRLTraining(training=False) | |
# load_weights(model_rl, model_weights_path_rl_training) | |
# _,bler = sim_ber(model_rl, ebno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=1000) | |
# BLER['autoencoder-rl'] = bler.numpy() | |
# NOTE: this tuple-based save_results is shadowed by the DataFrame-based save_results(df) defined further below.
def save_results():
with open(results_filename, 'wb') as f: | |
pickle.dump((esno_dbs, BLER), f) | |
def run_all():
    # NOTE: appears stale - run_centroids() and run_ml_training() now expect a model argument, which this helper does not supply.
print ('run training') | |
run_ml_training() | |
print ('run ml') | |
run_ml() | |
print ('run qam64') | |
run_qam64() | |
print ('run centroids(64apsk)') | |
run_centroids ('64apsk') | |
print ('run centroids(64qam)') | |
run_centroids ('64qam') | |
print ('run 64apsk') | |
run_apsk64() | |
def ebno_to_esno (ebnodB, R, bits_per_sym): | |
return ebnodB + 10*np.log10(R * bits_per_sym) | |
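# Worked example (illustrative): at code rate R = 2/3 with 5 bits/symbol (32APSK),
# ebno_to_esno(10.0, 2/3, 5) = 10 + 10*log10(10/3) ≈ 15.2 dB.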
def plot_results(): | |
import matplotlib.pyplot as plt | |
from matplotlib.lines import Line2D | |
from itertools import cycle | |
m_cycle = cycle(Line2D.filled_markers) | |
fig,ax = plt.subplots() | |
# Baseline - Perfect CSI | |
#esno_dbs = ebno_dbs + 10*np.log10 (coderate * bits_per_sym) | |
for key, val in BLER.items(): | |
ax.semilogy(val[0], val[1], label=key, marker=next(m_cycle)) | |
ax.set_xlabel(r"$E_s/N_0$ (dB)") | |
ax.set_ylabel("BLER") | |
ax.grid(which="both") | |
ax.set_ylim((1e-5, 1.0)) | |
ax.legend() | |
fig.suptitle (f'limit={limiter.name},obo(dB)={opt.obodB}, R={coderate}, N={N}') | |
plt.tight_layout() | |
plt.savefig (logname + '.pdf') | |
plt.show() | |
# model_conventional = E2ESystemConventionalTraining(training=True) | |
# load_weights(model_conventional, model_weights_path_conventional_training) | |
# fig = model_conventional.constellation.show() | |
# fig.suptitle('Conventional training'); | |
def create_df(BLER, label): | |
import pandas as pd | |
df = pd.DataFrame ({'esno' : BLER[0], 'bler' : BLER[1]}) | |
#df.set_index ('esno', inplace=True) | |
df.replace (0.0, np.nan, inplace=True) | |
return df | |
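# Usage sketch (illustrative): the callers below pass a descriptive label, but
# create_df keeps the column name 'bler'; to compare runs on one plot, the
# pickled frames can be merged on Es/N0, e.g.
#   merged = df_baseline.merge(df_ml, on='esno', suffixes=('_baseline', '_ml'))
#   plot_df(merged.set_index('esno'))
# where df_baseline/df_ml are hypothetical frames loaded from two results files.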
def plot_df(df): | |
import matplotlib.pyplot as plt | |
from matplotlib.lines import Line2D | |
from itertools import cycle | |
m_cycle = cycle(Line2D.filled_markers) | |
fig,ax = plt.subplots() | |
lines = df.plot (logy=True, ax=ax) | |
for line in lines.get_lines(): | |
line.set_marker (next (m_cycle)) | |
ax.set_xlabel(r"$E_s/N_0$ (dB)") | |
ax.set_ylabel("BLER") | |
ax.grid(which="both") | |
ax.set_ylim((1e-5, 1.0)) | |
ax.legend() | |
fig.suptitle (f'limit={limiter.name},obo(dB)={opt.obodB}, R={coderate}, N={N}') | |
plt.tight_layout() | |
plt.savefig (logname + '.pdf') | |
plt.show() | |
def save_results(df): | |
from pickle import dump | |
dump (df, open (logname + '.results.pickle', 'wb'), -1) | |
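# Reload sketch (illustrative): the pickled frame can be read back for offline
# plotting, assuming the same logname:
#   from pickle import load
#   df = load(open(logname + '.results.pickle', 'rb'))
#   plot_df(df)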
# model_rl = E2ESystemRLTraining(training=False) | |
# load_weights(model_rl, model_weights_path_rl_training) | |
# fig = model_rl.constellation.show() | |
# fig.suptitle('RL-based training'); | |
def plot_const(PDF, model, logname): | |
if PDF: | |
from mplcairo.multipage import MultiPage | |
cm = MultiPage(logname) | |
else: | |
from contextlib import suppress | |
cm = suppress() | |
c = const | |
c2 = model.constellation.points.numpy() | |
import matplotlib.pyplot as plt | |
with cm as pdf: | |
fig, ax = plt.subplots() | |
ax.set_aspect('equal') | |
ax.scatter (c.real, c.imag, marker='o', c='blue') | |
ax.set_xlim (-1.5, 1.5) | |
ax.set_ylim (-1.5, 1.5) | |
for j, p in enumerate (c): | |
ax.annotate (np.binary_repr (j, bits_per_sym), (p.real, p.imag)) | |
if PDF: | |
pdf.savefig(fig) | |
plt.close() | |
else: | |
plt.show() | |
fig, ax = plt.subplots() | |
ax.set_aspect('equal') | |
ax.scatter (c2.real, c2.imag, marker='o', c='red') | |
ax.set_xlim (-1.5, 1.5) | |
ax.set_ylim (-1.5, 1.5) | |
for j, p in enumerate (c2): | |
ax.annotate (np.binary_repr (j, bits_per_sym), (p.real, p.imag)) | |
if PDF: | |
pdf.savefig(fig) | |
plt.close() | |
else: | |
plt.show() | |
fig, ax = plt.subplots() | |
ax.set_aspect('equal') | |
ax.scatter (c.real, c.imag, marker='x', c='blue') | |
ax.scatter (c2.real, c2.imag, marker='o', c='red') | |
for a,b in zip (c, c2): | |
ax.arrow (b.real, b.imag, (a-b).real, (a-b).imag, length_includes_head=True) | |
if PDF: | |
pdf.savefig(fig) | |
plt.close() | |
else: | |
plt.show() | |
def plot_bler (BLER): | |
import pandas as pd | |
df = pd.DataFrame(BLER) | |
df['esno'] = esno_dbs | |
df.set_index('esno', inplace=True) | |
df.replace(0, np.nan, inplace=True) | |
import matplotlib.pyplot as plt | |
fig,ax = plt.subplots() | |
df.plot (logy=True, ax=ax) | |
ax.grid() | |
    fig.suptitle (f'{opt.mod} N={N} R={coderate} {opt.limit}')
plt.show() | |
def try_perm (const, esno_dbs): | |
    model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=N, n_layers=3)  # NOTE: unused; overwritten inside the loop below
z = np.arange (bits_per_sym) | |
blers = [] | |
ps = [] | |
from itertools import permutations | |
for i,p in enumerate (permutations (z)): | |
print (p) | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=N, n_layers=3, z=p) | |
_,bler = sim_ber(model_conventional, esno_dbs, batch_size=128, num_target_block_errors=5, max_mc_iter=100) | |
ps.append (p) | |
blers.append (bler) | |
return {'bler' : blers, 'z' : ps} | |
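# Selection sketch (illustrative): try_perm returns the BLER obtained for every
# bit-plane permutation; one plausible way to pick a labeling is the permutation
# with the lowest mean BLER over the tested Es/N0 points, e.g.
#   res = try_perm(const, esno_dbs)
#   best = int(np.argmin([np.mean(np.asarray(b)) for b in res['bler']]))
#   print('best bit permutation:', res['z'][best])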
def bitwise_mutual_info (bce): | |
return 1. - bce/tf.math.log(2.0) | |
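# Note (sketch): bce is assumed to be the mean binary cross-entropy in nats, so
# dividing by ln(2) converts it to bits and 1 - bce/ln(2) approximates the
# bitwise mutual information per coded bit. For example, bce = 0.1 nats gives
# roughly 1 - 0.144 ≈ 0.86 bits.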
# def capacity (model, esno_range, bits_per_sym): | |
# c = [bits_per_sym * bitwise_mutual_info (model (batch_size=256, esno_db=tf.constant(esno))).numpy() for esno in esno_range] | |
# return c | |
def capacity (model, esno_range, bits_per_sym): | |
c = [bits_per_sym * (model (batch_size=256, esno_db=tf.constant(esno), return_ami=True)).numpy() for esno in esno_range] | |
return c | |
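# Usage sketch (illustrative): assuming a constructed model whose call accepts
# return_ami=True (as the E2E models above appear to), the AMI curve can be
# swept and plotted, e.g.
#   esno_range = np.arange(opt.esno_min, opt.esno_max + opt.esno_step, opt.esno_step)
#   ami = capacity(model_conventional, esno_range, bits_per_sym)
#   plt.plot(esno_range, ami)  # bits/symbol vs. Es/N0 (dB)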
#args = ['32apsk_cnn.py'] | |
if opt.action == 'run-baseline': | |
setup_logger() | |
model_baseline = E2ESystemConventionalTraining(training=False, constellation=const, demapper_type='app') | |
BLER = run_baseline (model_baseline) | |
if opt.action == 'run-opt-baseline': | |
BLER = run_ml (demapper='app') | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=opt,demapper=NN,w={opt.width},d={opt.n_layers},N={opt.N}'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-pred': | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas) | |
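# Note (sketch): betas is a geometrically spaced damping schedule for the
# predistortion iterations (initial_beta down to final_beta over pred_iter
# steps); it is not passed explicitly below, so it appears to be consumed as a
# global by the Pred/pred machinery imported from iter_pred.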
from iter_pred import pred | |
model_pred = Pred (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym)) | |
BLER = run_pred (model_pred) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=pred+orig,demapper=app'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-cancel': | |
model_pred = Cancel (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym)) | |
BLER = run_pred (model_pred) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=orig,demapper=cancel+app'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-opt-pred': | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas) | |
from iter_pred import pred | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width) | |
load_weights(model_conventional, opt.load) | |
model_pred = Pred (Constellation (constellation_type='custom', initial_value=model_conventional.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym)) | |
BLER = run_pred (model_pred) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=pred+opt,demapper=app'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-centroids': | |
setup_logger() | |
model_centroids = Centroids (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym)) | |
BLER = run_centroids (model_centroids, esno_dbs=esno_dbs) | |
# with open (logname, 'w') as out: | |
# print (args, file=out) | |
# print (opt, file=out) | |
# from pickle import dump | |
# results = { | |
# 'opt' : opt, | |
# 'args' : args, | |
# 'df' : create_df (BLER, label=f'mapper=opt,demapper=NN,w={opt.width},d={opt.n_layers},N={opt.N}'), | |
# } | |
# dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-pred-centroids': | |
model_centroids = Centroids (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym), pred=True) | |
from iter_pred import pred | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas) | |
BLER = run_centroids (model_centroids, esno_dbs=esno_dbs) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=orig,demapper=centroids'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
def do_train_ml(): | |
model_32apsk = E2ESystemConventionalTraining(training=True, constellation=const, N=opt.N, n_layers=opt.n_layers, train_mapper=True, width=opt.width, kind=opt.kind) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
weights_path = run_ml_training (model_32apsk) | |
return weights_path | |
if opt.action == 'train-ml': | |
do_train_ml() | |
if opt.action == 'train-and-run-ml': | |
print ('training...') | |
weights_path = do_train_ml() | |
print ('running...') | |
run_ml (load=weights_path) | |
if opt.action == 'train-centroids': | |
model_32apsk = model_centroids_trainable = Centroids (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym, trainable=True), training=True) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
run_ml_training (model_32apsk) | |
if opt.action == 'train-ml-pred': | |
from iter_pred import pred | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas, 'sum:', sum(betas)) | |
model_32apsk = E2ESystemConventionalTraining(training=True, constellation=const, N=opt.N, n_layers=opt.n_layers, train_mapper=True, width=opt.width, kind=opt.kind, pred=True) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
run_ml_training (model_32apsk) | |
def do_run_ml(): | |
setup_logger() | |
BLER = run_ml () | |
return BLER | |
if opt.action == 'run-ml': | |
BLER = do_run_ml() | |
if opt.action == 'run-ml-pred': | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas) | |
from iter_pred import pred | |
BLER = run_ml (pred=True) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=opt,demapper=NN,w={opt.width},d={opt.n_layers},N={opt.N},kind={opt.kind}'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-opt-centroids': | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width) | |
load_weights(model_conventional, opt.load) | |
model_centroids2 = Centroids (Constellation (constellation_type='custom', initial_value=model_conventional.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym)) | |
BLER = run_centroids (model_centroids2, esno_dbs=esno_dbs) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=opt,demapper=centroid') | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-centroid-opt-centroids': | |
model_centroids = Centroids (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym, trainable=True)) | |
# I think we need trainable=True or we can't load_weights in next step | |
load_weights(model_centroids, opt.load) | |
BLER = run_centroids (model_centroids, esno_dbs=esno_dbs) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=centroid-opt,demapper=centroid') | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'run-opt-pred-centroids': | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width) | |
load_weights(model_conventional, opt.load) | |
from iter_pred import pred | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas) | |
model_centroids2 = Centroids (Constellation (constellation_type='custom', initial_value=model_conventional.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym), pred=True) | |
BLER = run_centroids (model_centroids2, esno_dbs=esno_dbs) | |
with open (logname, 'w') as out: | |
print (args, file=out) | |
print (opt, file=out) | |
from pickle import dump | |
results = { | |
'opt' : opt, | |
'args' : args, | |
'df' : create_df (BLER, label=f'mapper=pred+opt,demapper=centroids'), | |
} | |
dump (results, open (results_filename, 'wb'), -1) | |
if opt.action == 'dump-const': | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width, kind=opt.kind) | |
load_weights(model_conventional, opt.load) | |
c = model_conventional.constellation.points.numpy() | |
const_filename = opt.load.replace ('.log,conventional.weights', '.const') | |
const_txt_filename = opt.load.replace ('.log,conventional.weights', '.const.txt') | |
from pickle import dump | |
dump (c, open (const_filename, 'wb'), -1) | |
print (c, file=open (const_txt_filename, 'w')) | |
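# Reload sketch (illustrative): the dumped points can later seed a custom
# constellation, e.g.
#   from pickle import load
#   c = load(open(const_filename, 'rb'))
#   Constellation(constellation_type='custom', initial_value=c, num_bits_per_symbol=bits_per_sym)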
if opt.action == 'dump-orig-const': | |
const_filename = opt.load.replace ('.log,conventional.weights', '.orig.const') | |
from pickle import dump | |
dump (const, open (const_filename, 'wb'), -1) | |
if opt.action == 'plot-const': | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width, kind=opt.kind) | |
load_weights(model_conventional, opt.load) | |
const_filename = opt.load.replace ('.log,conventional.weights', '.const.pdf') | |
plot_const (PDF=opt.pdf, model=model_conventional, logname=const_filename) | |
if opt.action == 'plot-spec': | |
from spectral_est import spectral_est | |
S = spectral_est (4096) | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width, kind=opt.kind) | |
if opt.load is not None: | |
load_weights(model_conventional, opt.load) | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model_conventional.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
limit_out = model_conventional (batch_size=256, esno_db=tf.constant(200.), return_limit_out=True) | |
for row in range (limit_out.shape[0]): | |
S += limit_out[row] | |
S.flush() | |
import matplotlib.pyplot as plt | |
from numpy.fft import fftshift | |
    plt.plot (fftshift (10*np.log10 (S.get())))
plt.grid() | |
plt.show() | |
if opt.action == 'plot-spec-pred': | |
from spectral_est import spectral_est | |
S = spectral_est (4096) | |
betas = np.logspace (np.log10(opt.initial_beta), np.log10 (opt.final_beta), opt.pred_iter) | |
print ('betas:', betas) | |
from iter_pred import pred | |
if opt.load is not None: | |
model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width) | |
load_weights(model_conventional, opt.load) | |
model_pred = Pred (Constellation (constellation_type='custom', initial_value=model_conventional.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym)) | |
else: | |
model_pred = Pred (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym)) | |
if opt.limit == 'sspa': | |
limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model_pred.constellation, opt.obodB)), tf.complex64) | |
elif opt.limit == 'linear': | |
limiter.ibo = tf.constant (1.0, tf.complex64) | |
print ('ibo(dB) set to:', 20*np.log10 (limiter.ibo.numpy())) | |
limit_out = model_pred (batch_size=256, esno_db=tf.constant(200.), return_limit_out=True) | |
for row in range (limit_out.shape[0]): | |
S += limit_out[row] | |
S.flush() | |
import matplotlib.pyplot as plt | |
from numpy.fft import fftshift | |
    plt.plot (fftshift (10*np.log10 (S.get())))
plt.grid() | |
plt.show() | |