TUT: How to Read cosmo-field data, then calculate the Positivization?¶
Tian-Cheng Luan
2025.06.09
Read the temperature from binary file¶
In [ ]:
Ng = 512 # Field res
boxsize = 1000. # in unit Mpc/h
nu_axis = 0 # Here, we treat the 0th axis as the line-of-sight direction.
In [ ]:
import os
import numpy as np, gc
_root_dir = "/Raid6/2/luantch/nbody/512/par_plk2018/bao/snap_outputs/dm2HI_new/21cm_output"
# Here we use z=0.989(no rsd) as an example
redshift = 0.989
_file = os.path.join(_root_dir, f"Total_BAO_Z{redshift:.4f}.d21")
# Read the cube
cube_orig = np.fromfile(_file, dtype=np.float32).reshape(Ng, Ng, Ng)
# Check the cube
print(f"Field shape : {cube_orig.shape}")
print(f"Field Mean : {cube_orig.mean():.4f} [mK]")
print(f"Field Max : {cube_orig.max():.4f} [mK]")
print(f"Field Min : {cube_orig.min():.4f} [mK]")
Field shape : (512, 512, 512) Field Mean : 0.1921 [mK] Field Max : 174.5629 [mK] Field Min : 0.0000 [mK]
Visualization configuration space for one slice¶
In [ ]:
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
plt.figure(figsize=(5,5))
plt.imshow(cube_orig[256], norm=LogNorm(), cmap='Greys')
plt.xticks([])
plt.yticks([])
Out[ ]:
([], [])
Calculate the 2d-cylinder power spectrum¶
Include the Fourier utils funcs:¶
In [ ]:
from scipy import stats
def _get_dims(box_dims, ashape):
'''
If box dims is a scalar, assume that dimensions are cubic and make a list
If it's not given, assume it's the default value of the boxsize
Otherwise, return as it is
'''
if not hasattr(box_dims, '__iter__'):
return [box_dims]*len(ashape)
return box_dims
def _power_spectrum_nd_r(input_cube, box_dims=None, verbose=False):
box_dims = _get_dims(box_dims, input_cube.shape)
if(verbose): print( 'Calculating power spectrum...')
ft = np.fft.rfftn(input_cube) # here we dont take shift for rfft
power_spectrum = np.abs(ft)**2
# scale
boxvol = np.product(box_dims)
pixelsize = boxvol/(np.product(input_cube.shape))
power_spectrum *= pixelsize**2/boxvol
return ft,power_spectrum
def _get_k_r_3d(input_cube, box_dims):
'''
Get the k values for input array(only 3d) with given dimensions using rfft.
Return k components and magnitudes.
For internal use.
'''
dim = len(input_cube.shape)
if dim == 3:
nx, ny, nz = input_cube.shape
x = np.fft.fftfreq(nx, d=box_dims[0] / nx)
y = np.fft.fftfreq(ny, d=box_dims[1] / ny)
z = np.fft.rfftfreq(nz, d=box_dims[2] / nz)
kx, ky, kz = np.meshgrid(x, y, z, indexing='ij')
kx = 2.0 * np.pi * kx
ky = 2.0 * np.pi * ky
kz = 2.0 * np.pi * kz
k = np.sqrt(kx**2 + ky**2 + kz**2)
return [kx, ky, kz], k
else:
print('ERROR: Only support 3d input field')
exit(1)
def _power_spectrum_2d_r(input_cube, kbins=10, binning='log', box_dims=1000., nu_axis=0, info=False):
"""
Compute and bin the 2D power spectrum (k⊥ vs. k∥) of a 3D data cube.
This function takes a 3D array, computes its Fourier transform and power spectrum,
then bins the power into perpendicular (k⊥) and parallel (k∥) wavenumber bins.
"""
if type(kbins) == list:
binning = None
elif np.array(kbins).size==1:
kbins = [kbins, kbins]
elif not isinstance(kbins[0], int):
binning = None
box_dims = _get_dims(box_dims, input_cube.shape)
ft, power = _power_spectrum_nd_r(input_cube, box_dims)
k_xyz, k = _get_k_r_3d(input_cube, box_dims)
xy_axis = [0, 1, 2]
xy_axis.remove(nu_axis)
kz = np.abs(k_xyz[nu_axis]) # in 3d
kp = np.sqrt(k_xyz[xy_axis[0]]**2 + k_xyz[xy_axis[1]]**2) # in 3d
del k_xyz, k, xy_axis
gc.collect()
if binning is None:
kper = np.array(kbins[0])
kpar = np.array(kbins[1])
else:
if binning=='log':
kper = np.linspace(np.log10(kp[kp!=0].min()), np.log10(kp.max()), kbins[0]+1)
kpar = np.linspace(np.log10(kz[kz!=0].min()), np.log10(kz.max()), kbins[1]+1)
kp, kz = np.log10(kp), np.log10(kz)
elif binning=='linear':
kper = np.linspace(kp[kp!=0].min(), kp.max(), kbins[0]+1)
kpar = np.linspace(kz[kz!=0].min(), kz.max(), kbins[1]+1)
if info:
print(f'INFO <pk_func> | kpar max : {kpar.max()}')
print(f'INFO <pk_func> | kpar min : {kpar.min()}')
print(f'INFO <pk_func> | kper max : {kper.max()}')
print(f'INFO <pk_func> | kper min : {kper.min()}')
ps = stats.binned_statistic_2d(x=kp.flatten(), y= kz.flatten(), values= power.flatten(), statistic='mean', bins=[kper, kpar])
if binning=='log':
kper_mid = np.power(10, 0.5*(kper[:-1]+kper[1:]))
kpar_mid = np.power(10, 0.5*(kpar[:-1]+kpar[1:]))
else:
kper_mid = (kper[:-1]+kper[1:])/2.
kpar_mid = (kpar[:-1]+kpar[1:])/2.
return kp, kz, ft, power, ps.statistic, kper_mid, kpar_mid
def wrap_2dfft(cube, boxsize, kbins=50, nu_axis=2, info=False):
kper_, kpar_, ft_, power_, ps_, kper_bins_, kpar_bins_ = _power_spectrum_2d_r(
input_cube = cube,
kbins = kbins,
binning = 'linear',
box_dims = boxsize,
nu_axis = nu_axis,
info = info
)
return kper_, kpar_, ft_, power_, ps_, kper_bins_, kpar_bins_
In [ ]:
%%time
kper_orig, kpar_orig, ft_orig, ps_orig, ps_bin_orig, kper_bin_orig, kpar_bin_orig = wrap_2dfft(cube_orig, boxsize, nu_axis=nu_axis)
CPU times: user 8.59 s, sys: 5.5 s, total: 14.1 s Wall time: 14.1 s
Visualization cylinder power spectrum¶
In [ ]:
plt.figure(figsize=(6,5))
vmin, vmax = 1e0, 1e2
# For some extreme small value, set to white point diectly
ps_mask = ps_bin_orig.T.copy()
_mask = ps_mask <10**(-10)
ps_mask[_mask] = np.nan
plt.pcolormesh(kper_bin_orig, kpar_bin_orig, ps_bin_orig.T, cmap='plasma', norm=LogNorm(vmin=vmin,vmax=vmax), rasterized=True)
cbar = plt.colorbar(norm=LogNorm())
label = cbar.ax.yaxis.label
label.set_size(22)
cbar.ax.tick_params(labelsize=16)
plt.xlabel(r'$k_{\perp}$', fontsize=22)
plt.ylabel(r'$k_{\parallel}$', fontsize=22)
plt.title("Orig", fontsize=22)
Out[ ]:
Text(0.5, 1.0, 'Orig')
Remove Wedge then do Positivization¶
Include Wedge&Posi utils funcs:¶
In [ ]:
from scipy import integrate
def _k_Wedge(z, kp, theta_FOV=2.):
# Use Planck 2018 for an example
omega_m = 0.3158
omega_l = 0.6842
E_z = np.sqrt(omega_m * (1+z)**3 + omega_l)
inq = integrate.quad(lambda x:1/np.sqrt(omega_m * (1+x)**3 + omega_l),0,z)
return kp * E_z / (1. + z) * inq[0] * np.sin(theta_FOV) # Eq 166, Liu & Richard (2020)
def gen_mask_cube_r(ft, kper, kpar, kmm, redshift, mask_value=0., do_wedge=True, theta_FOV=2.):
# build masks
if do_wedge:
wedge_lim = _k_Wedge(z=redshift, kp=kper, theta_FOV=theta_FOV)
cond_L = (kpar <= wedge_lim) | (kpar <= kmm)
else:
cond_L = (kpar <= kmm)
cond_S = ~cond_L
# apply masks directly (unshifted)
mask_ft_L = ft.copy()
mask_ft_S = ft.copy()
mask_ft_L[cond_S] = mask_value
mask_ft_S[cond_L] = mask_value
# inverse rFFT
mask_cube_L = np.fft.irfftn(mask_ft_L)
mask_cube_S = np.fft.irfftn(mask_ft_S)
return mask_cube_L, mask_ft_L, mask_cube_S, mask_ft_S
Calculate the wedge-removed cube¶
In [ ]:
%%time
# We use kmm=0.5 h/Mpc for an example
kmm = 0.5
_, _, cube_kcut, _ = gen_mask_cube_r(ft_orig, kper_orig, kpar_orig, kmm, redshift)
CPU times: user 9.49 s, sys: 5.69 s, total: 15.2 s Wall time: 15.2 s
Visualization in 2d Fourier¶
In [ ]:
%%time
kper_kcut, kpar_kcut, ft_kcut, ps_kcut, ps_bin_kcut, kper_bin_kcut, kpar_bin_kcut = wrap_2dfft(cube_kcut, boxsize, nu_axis=nu_axis)
CPU times: user 12.3 s, sys: 29.2 s, total: 41.5 s Wall time: 42.1 s
In [ ]:
plt.figure(figsize=(6,5))
vmin, vmax = 1e0, 1e2
# For some extreme small value, set to white point diectly
ps_mask = ps_bin_kcut.T.copy()
_mask = ps_mask <10**(-10)
ps_mask[_mask] = np.nan
plt.pcolormesh(kper_bin_kcut, kpar_bin_kcut, ps_mask, cmap='plasma', norm=LogNorm(vmin=vmin,vmax=vmax), rasterized=True)
cbar = plt.colorbar(norm=LogNorm())
label = cbar.ax.yaxis.label
label.set_size(22)
cbar.ax.tick_params(labelsize=16)
plt.xlabel(r'$k_{\perp}$', fontsize=22)
plt.ylabel(r'$k_{\parallel}$', fontsize=22)
plt.title("kcut", fontsize=22)
Out[ ]:
Text(0.5, 1.0, 'kcut')
Operate Positivization for cube_kcut¶
In [ ]:
cube_posi = cube_kcut.copy()
cube_posi[cube_posi <= 0.] = 0.
In [ ]:
%%time
kper_posi, kpar_posi, ft_posi, ps_posi, ps_bin_posi, kper_bin_posi, kpar_bin_posi = wrap_2dfft(cube_posi, boxsize, nu_axis=nu_axis)
CPU times: user 9.89 s, sys: 19.2 s, total: 29.1 s Wall time: 29.4 s
Visualization in 2d fourier¶
In [ ]:
plt.figure(figsize=(6,5))
vmin, vmax = 1e0, 1e2
# For some extreme small value, set to white point diectly
ps_mask = ps_bin_posi.T.copy()
_mask = ps_mask <10**(-10)
ps_mask[_mask] = np.nan
plt.pcolormesh(kper_bin_posi, kpar_bin_posi, ps_mask, cmap='plasma', norm=LogNorm(vmin=vmin,vmax=vmax), rasterized=True)
cbar = plt.colorbar(norm=LogNorm())
label = cbar.ax.yaxis.label
label.set_size(22)
cbar.ax.tick_params(labelsize=16)
plt.xlabel(r'$k_{\perp}$', fontsize=22)
plt.ylabel(r'$k_{\parallel}$', fontsize=22)
plt.title("posi", fontsize=22)
Out[ ]:
Text(0.5, 1.0, 'posi')
Combined visualization¶
In [ ]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.colors import LogNorm
vmin, vmax = 1e0, 1e2
norm = LogNorm(vmin=vmin, vmax=vmax)
fig = plt.figure(figsize=(15.5, 5))
gs = gridspec.GridSpec(1, 4, width_ratios=[1,1,1, 0.05], wspace=0.1)
axes = [fig.add_subplot(gs[0, i]) for i in range(3)]
cax = fig.add_subplot(gs[0, 3]) # the narrow colorbar axis
titles = ['Orig', 'Kcut', 'Posi']
data = [ps_bin_orig, ps_bin_kcut, ps_bin_posi]
for i, (ax, ps, title) in enumerate(zip(axes, data, titles)):
# mask extremely small values
ps_plot = ps.T.copy()
ps_plot[ps_plot < 1e-10] = np.nan
mesh = ax.pcolormesh(
kper_bin_orig, kpar_bin_orig, ps_plot,
norm=norm, cmap='plasma', rasterized=True
)
ax.set_title(title, fontsize=22)
ax.set_xlabel(r'$k_\perp$', fontsize=22)
ax.tick_params(labelsize=16)
if i == 0:
ax.set_ylabel(r'$k_\parallel$', fontsize=22)
else:
ax.set_yticks([]) # remove tick marks
ax.set_ylabel('') # remove axis label
cb = fig.colorbar(mesh, cax=cax)
cb.ax.tick_params(labelsize=22)
plt.tight_layout()
plt.show()
In [ ]:
plt.figure(figsize=(5,5))
plt.imshow(cube_posi[256], norm=LogNorm(), cmap='Greys')
plt.xticks([])
plt.yticks([])
Out[ ]:
([], [])
In [ ]:
print(f"Field shape : {cube_posi.shape}")
print(f"Field Mean : {cube_posi.mean():.4f} [mK]")
print(f"Field Max : {cube_posi.max():.4f} [mK]")
print(f"Field Min : {cube_posi.min():.4f} [mK]")
Field shape : (512, 512, 512) Field Mean : 0.1130 [mK] Field Max : 79.8768 [mK] Field Min : 0.0000 [mK]