TUT: How to Read cosmo-field data, then calculate the Positivization?¶

Tian-Cheng Luan
2025.06.09


Read the temperature from binary file¶

In [ ]:
Ng      = 512    # Field res
boxsize = 1000.  # in unit Mpc/h
nu_axis = 0      # Here, we treat the 0th axis as the line-of-sight direction.
In [ ]:
import os
import numpy as np, gc

_root_dir = "/Raid6/2/luantch/nbody/512/par_plk2018/bao/snap_outputs/dm2HI_new/21cm_output"

# Here we use z=0.989(no rsd) as an example
redshift = 0.989
_file = os.path.join(_root_dir, f"Total_BAO_Z{redshift:.4f}.d21") 
# Read the cube
cube_orig = np.fromfile(_file, dtype=np.float32).reshape(Ng, Ng, Ng) 

# Check the cube
print(f"Field shape : {cube_orig.shape}")
print(f"Field Mean  : {cube_orig.mean():.4f} [mK]")
print(f"Field Max   : {cube_orig.max():.4f} [mK]")
print(f"Field Min   : {cube_orig.min():.4f} [mK]")
Field shape : (512, 512, 512)
Field Mean  : 0.1921 [mK]
Field Max   : 174.5629 [mK]
Field Min   : 0.0000 [mK]

Visualization configuration space for one slice¶

In [ ]:
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm

plt.figure(figsize=(5,5))
plt.imshow(cube_orig[256], norm=LogNorm(), cmap='Greys')
plt.xticks([])  
plt.yticks([])
Out[ ]:
([], [])
No description has been provided for this image

Calculate the 2d-cylinder power spectrum¶

Include the Fourier utils funcs:¶

In [ ]:
from scipy import stats

def _get_dims(box_dims, ashape):
    '''
    If box dims is a scalar, assume that dimensions are cubic and make a list
    If it's not given, assume it's the default value of the boxsize
    Otherwise, return as it is
    '''
    if not hasattr(box_dims, '__iter__'):
            return [box_dims]*len(ashape)
    return box_dims

def _power_spectrum_nd_r(input_cube, box_dims=None, verbose=False):
    box_dims = _get_dims(box_dims, input_cube.shape)
    if(verbose): print( 'Calculating power spectrum...')
    ft = np.fft.rfftn(input_cube) # here we dont take shift for rfft
    power_spectrum = np.abs(ft)**2
    # scale
    boxvol = np.product(box_dims)
    pixelsize = boxvol/(np.product(input_cube.shape))
    power_spectrum *= pixelsize**2/boxvol
    return ft,power_spectrum

def _get_k_r_3d(input_cube, box_dims):
    '''
    Get the k values for input array(only 3d) with given dimensions using rfft.
    Return k components and magnitudes.
    For internal use.
    '''
    dim = len(input_cube.shape)
    if dim == 3:
        nx, ny, nz = input_cube.shape
        x = np.fft.fftfreq(nx, d=box_dims[0] / nx)
        y = np.fft.fftfreq(ny, d=box_dims[1] / ny)
        z = np.fft.rfftfreq(nz, d=box_dims[2] / nz)  
        kx, ky, kz = np.meshgrid(x, y, z, indexing='ij')
        kx = 2.0 * np.pi * kx 
        ky = 2.0 * np.pi * ky  
        kz = 2.0 * np.pi * kz  
        k = np.sqrt(kx**2 + ky**2 + kz**2)
        return [kx, ky, kz], k
    else:
        print('ERROR: Only support 3d input field')
        exit(1)

def _power_spectrum_2d_r(input_cube, kbins=10, binning='log', box_dims=1000., nu_axis=0, info=False):
    """
    Compute and bin the 2D power spectrum (k⊥ vs. k∥) of a 3D data cube.
    This function takes a 3D array, computes its Fourier transform and power spectrum, 
    then bins the power into perpendicular (k⊥) and parallel (k∥) wavenumber bins.
    """
    if type(kbins) == list:
        binning = None
    elif np.array(kbins).size==1: 
        kbins = [kbins, kbins]
    elif not isinstance(kbins[0], int): 
        binning = None
    box_dims  = _get_dims(box_dims, input_cube.shape)
    ft, power = _power_spectrum_nd_r(input_cube, box_dims)
    k_xyz, k = _get_k_r_3d(input_cube, box_dims)
    xy_axis  = [0, 1, 2]
    xy_axis.remove(nu_axis)
    kz = np.abs(k_xyz[nu_axis])   # in 3d
    kp = np.sqrt(k_xyz[xy_axis[0]]**2 + k_xyz[xy_axis[1]]**2)   # in 3d
    del k_xyz, k, xy_axis
    gc.collect()
    if binning is None:
        kper = np.array(kbins[0])
        kpar = np.array(kbins[1])
    else:
        if binning=='log':
            kper = np.linspace(np.log10(kp[kp!=0].min()), np.log10(kp.max()), kbins[0]+1)
            kpar = np.linspace(np.log10(kz[kz!=0].min()), np.log10(kz.max()), kbins[1]+1)
            kp, kz  = np.log10(kp), np.log10(kz)
        elif binning=='linear':
            kper = np.linspace(kp[kp!=0].min(), kp.max(), kbins[0]+1)
            kpar = np.linspace(kz[kz!=0].min(), kz.max(), kbins[1]+1)
    if info:
        print(f'INFO <pk_func> | kpar max : {kpar.max()}')
        print(f'INFO <pk_func> | kpar min : {kpar.min()}')
        print(f'INFO <pk_func> | kper max : {kper.max()}')
        print(f'INFO <pk_func> | kper min : {kper.min()}')
    ps = stats.binned_statistic_2d(x=kp.flatten(), y= kz.flatten(), values= power.flatten(), statistic='mean', bins=[kper, kpar])
    if binning=='log':
        kper_mid = np.power(10, 0.5*(kper[:-1]+kper[1:]))
        kpar_mid = np.power(10, 0.5*(kpar[:-1]+kpar[1:]))
    else:
        kper_mid = (kper[:-1]+kper[1:])/2.
        kpar_mid = (kpar[:-1]+kpar[1:])/2.
    return kp, kz, ft, power, ps.statistic, kper_mid, kpar_mid 

def wrap_2dfft(cube, boxsize, kbins=50, nu_axis=2, info=False):
    kper_, kpar_, ft_, power_, ps_, kper_bins_, kpar_bins_ = _power_spectrum_2d_r(
        input_cube = cube,
        kbins      = kbins,
        binning    = 'linear',
        box_dims   = boxsize,
        nu_axis    = nu_axis,
        info       = info
        )
    return kper_, kpar_, ft_, power_, ps_, kper_bins_, kpar_bins_
In [ ]:
%%time

kper_orig, kpar_orig, ft_orig, ps_orig, ps_bin_orig, kper_bin_orig, kpar_bin_orig = wrap_2dfft(cube_orig, boxsize, nu_axis=nu_axis)
CPU times: user 8.59 s, sys: 5.5 s, total: 14.1 s
Wall time: 14.1 s

Visualization cylinder power spectrum¶

In [ ]:
plt.figure(figsize=(6,5))

vmin, vmax = 1e0, 1e2

# For some extreme small value, set to white point diectly
ps_mask = ps_bin_orig.T.copy()
_mask = ps_mask <10**(-10)
ps_mask[_mask] = np.nan
plt.pcolormesh(kper_bin_orig, kpar_bin_orig, ps_bin_orig.T, cmap='plasma', norm=LogNorm(vmin=vmin,vmax=vmax), rasterized=True)
cbar  = plt.colorbar(norm=LogNorm())
label = cbar.ax.yaxis.label
label.set_size(22)
cbar.ax.tick_params(labelsize=16)  
plt.xlabel(r'$k_{\perp}$', fontsize=22)
plt.ylabel(r'$k_{\parallel}$', fontsize=22)
plt.title("Orig", fontsize=22)
Out[ ]:
Text(0.5, 1.0, 'Orig')
No description has been provided for this image

Remove Wedge then do Positivization¶

Include Wedge&Posi utils funcs:¶

In [ ]:
from scipy import integrate

def _k_Wedge(z, kp, theta_FOV=2.):
    # Use Planck 2018 for an example
    omega_m   = 0.3158
    omega_l   = 0.6842
    E_z       = np.sqrt(omega_m * (1+z)**3 + omega_l)
    inq       = integrate.quad(lambda x:1/np.sqrt(omega_m * (1+x)**3 + omega_l),0,z) 
    return kp * E_z / (1. + z) * inq[0] * np.sin(theta_FOV) # Eq 166, Liu & Richard (2020)

def gen_mask_cube_r(ft, kper, kpar, kmm, redshift, mask_value=0., do_wedge=True, theta_FOV=2.):
    # build masks
    if do_wedge:
        wedge_lim = _k_Wedge(z=redshift, kp=kper, theta_FOV=theta_FOV)
        cond_L = (kpar <= wedge_lim) | (kpar <= kmm)
    else:
        cond_L = (kpar <= kmm)
    cond_S = ~cond_L
    # apply masks directly (unshifted)
    mask_ft_L = ft.copy()
    mask_ft_S = ft.copy()
    mask_ft_L[cond_S] = mask_value
    mask_ft_S[cond_L] = mask_value
    # inverse rFFT
    mask_cube_L = np.fft.irfftn(mask_ft_L)
    mask_cube_S = np.fft.irfftn(mask_ft_S)
    return mask_cube_L, mask_ft_L, mask_cube_S, mask_ft_S

Calculate the wedge-removed cube¶

In [ ]:
%%time

# We use kmm=0.5 h/Mpc for an example
kmm = 0.5
_, _, cube_kcut, _ = gen_mask_cube_r(ft_orig, kper_orig, kpar_orig, kmm, redshift)
CPU times: user 9.49 s, sys: 5.69 s, total: 15.2 s
Wall time: 15.2 s

Visualization in 2d Fourier¶

In [ ]:
%%time

kper_kcut, kpar_kcut, ft_kcut, ps_kcut, ps_bin_kcut, kper_bin_kcut, kpar_bin_kcut = wrap_2dfft(cube_kcut, boxsize, nu_axis=nu_axis)
CPU times: user 12.3 s, sys: 29.2 s, total: 41.5 s
Wall time: 42.1 s
In [ ]:
plt.figure(figsize=(6,5))

vmin, vmax = 1e0, 1e2

# For some extreme small value, set to white point diectly
ps_mask = ps_bin_kcut.T.copy()
_mask = ps_mask <10**(-10)
ps_mask[_mask] = np.nan
plt.pcolormesh(kper_bin_kcut, kpar_bin_kcut, ps_mask, cmap='plasma', norm=LogNorm(vmin=vmin,vmax=vmax), rasterized=True)
cbar  = plt.colorbar(norm=LogNorm())
label = cbar.ax.yaxis.label
label.set_size(22)
cbar.ax.tick_params(labelsize=16)  
plt.xlabel(r'$k_{\perp}$', fontsize=22)
plt.ylabel(r'$k_{\parallel}$', fontsize=22)
plt.title("kcut", fontsize=22)
Out[ ]:
Text(0.5, 1.0, 'kcut')
No description has been provided for this image

Operate Positivization for cube_kcut¶

In [ ]:
cube_posi = cube_kcut.copy()
cube_posi[cube_posi <= 0.] = 0.
In [ ]:
%%time

kper_posi, kpar_posi, ft_posi, ps_posi, ps_bin_posi, kper_bin_posi, kpar_bin_posi = wrap_2dfft(cube_posi, boxsize, nu_axis=nu_axis)
CPU times: user 9.89 s, sys: 19.2 s, total: 29.1 s
Wall time: 29.4 s

Visualization in 2d fourier¶

In [ ]:
plt.figure(figsize=(6,5))

vmin, vmax = 1e0, 1e2

# For some extreme small value, set to white point diectly
ps_mask = ps_bin_posi.T.copy()
_mask = ps_mask <10**(-10)
ps_mask[_mask] = np.nan
plt.pcolormesh(kper_bin_posi, kpar_bin_posi, ps_mask, cmap='plasma', norm=LogNorm(vmin=vmin,vmax=vmax), rasterized=True)
cbar  = plt.colorbar(norm=LogNorm())
label = cbar.ax.yaxis.label
label.set_size(22)
cbar.ax.tick_params(labelsize=16)  
plt.xlabel(r'$k_{\perp}$', fontsize=22)
plt.ylabel(r'$k_{\parallel}$', fontsize=22)
plt.title("posi", fontsize=22)
Out[ ]:
Text(0.5, 1.0, 'posi')
No description has been provided for this image

Combined visualization¶

In [ ]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.colors import LogNorm

vmin, vmax = 1e0, 1e2
norm = LogNorm(vmin=vmin, vmax=vmax)

fig = plt.figure(figsize=(15.5, 5))

gs  = gridspec.GridSpec(1, 4, width_ratios=[1,1,1, 0.05], wspace=0.1)
axes = [fig.add_subplot(gs[0, i]) for i in range(3)]
cax   = fig.add_subplot(gs[0, 3])  # the narrow colorbar axis
titles = ['Orig', 'Kcut', 'Posi']
data   = [ps_bin_orig,  ps_bin_kcut,  ps_bin_posi]
for i, (ax, ps, title) in enumerate(zip(axes, data, titles)):
    # mask extremely small values
    ps_plot = ps.T.copy()
    ps_plot[ps_plot < 1e-10] = np.nan

    mesh = ax.pcolormesh(
        kper_bin_orig, kpar_bin_orig, ps_plot,
        norm=norm, cmap='plasma', rasterized=True
    )
    ax.set_title(title, fontsize=22)
    ax.set_xlabel(r'$k_\perp$', fontsize=22)
    ax.tick_params(labelsize=16)
    if i == 0:
        ax.set_ylabel(r'$k_\parallel$', fontsize=22)
    else:
        ax.set_yticks([])      # remove tick marks
        ax.set_ylabel('')      # remove axis label
cb = fig.colorbar(mesh, cax=cax)
cb.ax.tick_params(labelsize=22)
plt.tight_layout()
plt.show()
No description has been provided for this image
In [ ]:
plt.figure(figsize=(5,5))
plt.imshow(cube_posi[256], norm=LogNorm(), cmap='Greys')
plt.xticks([])  
plt.yticks([])
Out[ ]:
([], [])
No description has been provided for this image
In [ ]:
print(f"Field shape : {cube_posi.shape}")
print(f"Field Mean  : {cube_posi.mean():.4f} [mK]")
print(f"Field Max   : {cube_posi.max():.4f} [mK]")
print(f"Field Min   : {cube_posi.min():.4f} [mK]")
Field shape : (512, 512, 512)
Field Mean  : 0.1130 [mK]
Field Max   : 79.8768 [mK]
Field Min   : 0.0000 [mK]

👆 Simply substitute cube_posi for the output of your subtractwedge function, and pass cube_posi into analyticalMFs (or calculateMFs).¶