How do I apply changes to np.memmap with multiprocessing?

58 Views Asked by At

The current task at hand that I have requires multiple array manipulations that take longer than what is feasible. I am trying to utilize the multiprocessing package to accelerate the process, but I am currently having a problem with mixing np.memmap with multiprocessing. Specifically, I can't seem to be able to write to the memory map or save it afterward.

The current plan is to create a large numpy memmap on the disk, generate data and copy it onto the memmap one by one, then save it to be used later. The problem seems to be coming from the writing or the saving part, as the data generation part works fine. The below is the sample code.

import os
import multiprocessing

import numpy as np

from datetime import date
from tqdm.auto import tqdm

from Micelle_Simulation import Simulated_Micelle


def target(
    id_: int, 
    q_range: np.ndarray, 
    fp: np.memmap, 
    shape: str, 
    r: float, 
    epsilon: float, 
    N_agg: int, 
    rho_beta: float, 
    b: float, 
    N: int, 
    n: int, 
    f_core: float
) -> None:
        
    micelle = Simulated_Micelle(
        shape=shape, 
        r=r, 
        epsilon=epsilon, 
        N_agg=N_agg, 
        rho_beta=rho_beta, 
        b=b, 
        N=N, 
        n=n, 
        f_core=f_core
    )
    
    n_corona = n*N*N_agg
    n_core = int((f_core/(1 - f_core))*n_corona)
    
    params = np.array((
        r, 
        epsilon, 
        micelle.R_g, 
        N_agg, 
        rho_beta, 
        b, 
        N, 
        n, 
        f_core
    ))
    
    I_q = micelle.Debye_Scattering(
        nodes=micelle.scatterers, 
        core_node_num=n_core, 
        q_range=q_range, 
        rho_beta=rho_beta
    )
    
    params_I_q = np.hstack((params, I_q))
        
    fp[id_, :] = params_I_q
    print(params_I_q)
    

def main(*args, **kwargs) -> int:
    
    remainder = 2
    p_count = os.cpu_count() - remainder
    
    processes = []
    
    for i in range(p_count):
        processes.append(None)
    
    sample_num = 2
    
    shape = 'sphere'
    
    cwd = os.getcwd()

    today = str(date.today()).replace("-", "_")
    username = os.getlogin()
    end = ".npy"

    file_name = f"{today}_{username}"
    attempt = 0
    
    while True:
        
        temp_name = f"{file_name}_{attempt}{end}"
        temp_path = os.path.join(cwd, temp_name)
        
        if not os.path.isfile(temp_path):
            path = temp_path
            
            break
        else:
            attempt += 1
    
    fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
    
    # storage = np.empty((sample_num, 265))
      
    q_log_range = np.arange(-2, 0, np.true_divide(1, 128))
    q_range = np.power(10, q_log_range - 2*np.log10(2))
    
    p_num = 0
    
    print(f'Using {p_count} threads.')
    
    for i in tqdm(range(sample_num)):
        
        r = np.power(2, np.random.uniform(6, 10))
        epsilon = 1
        N_agg = np.random.randint(8, 33)
        rho_beta = np.random.normal(0.1, 0.02)
        b = np.power(2, np.random.uniform(4, 7))
        N = np.random.randint(2, 9)
        n = 6
        f_core = np.random.normal(0.7, 0.05)
        
        args = (i, q_range, fp, shape, r, epsilon, N_agg, rho_beta, b, N, n, f_core)
                        
        while True:
            
            if processes[p_num] is None or not processes[p_num].is_alive():
                
                p = multiprocessing.Process(target=target, args=args)
                p.start()
                processes[p_num] = p
                
                break
            
            else:
                p_num = (p_num + 1)%p_count
    
    print('Loop completed.')
    
    for process in processes:
        
        if process:
            process.join()
        else:
            continue
    
    print("Task completed.")
    
    fp.flush()
    
    print(fp)
                    
    return 0


if __name__ == '__main__':
    main()

Ignore any part of the code that involves Micelle_Simulation. It just creates a (265, ) array.

The issue seems to be coming from the following parts of the code.

    fp[id_, :] = params_I_q
    fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
        while True:
            
            if processes[p_num] is None or not processes[p_num].is_alive():
                
                p = multiprocessing.Process(target=target, args=args)
                p.start()
                processes[p_num] = p
                
                break
            
            else:
                p_num = (p_num + 1)%p_count

My expectation was that the code will create the data one by one, but with multiple processes, copy it onto the disk (on memmap), and save it for later use. So, if I wanted to create 10000 samples, it would create (10000, 265) memmap, generate (265) array data one by one, and copy it to the memmap.

However, what I actually got was an array of zero, as if the memmap hadn't been touched at all.

There were no errors during the test.

P.S. I know that my code for multiprocessing is not optimal. I have not studied CS or SWE, and this was intended as a short script for data generation. Any improvement or advice will be appreciated.

1

There are 1 best solutions below

0
Yun On

Changed the multiprocessing to multithreading. Seems to work fine for my purpose.

import numpy as np


class Simulated_Micelle:
    
    
    def __init__(
        self, 
        shape: str, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        rho_beta: float, 
        b: float, 
        N: int, 
        n: int, 
        f_core: float
    ) -> None:
        
        self.shape = shape
        self.r = r
        self.epsilon = epsilon
        self.N_agg = N_agg
        self.rho_beta = rho_beta
        self.b = b
        self.N = N
        self.n = n
        self.f_core = f_core
        
        self.scatterers = self.Generate_Micelle(
            shape=self.shape, 
            r=self.r, 
            epsilon=self.epsilon, 
            N_agg=self.N_agg, 
            rho_beta=self.rho_beta, 
            b=self.b, 
            N=self.N, 
            n=self.n, 
            f_core=self.f_core
        )


    def Spherical_Core(self, r: float, n: int) -> np.ndarray:
        
        # (n) array
        rho = np.cbrt((r**3)*np.random.rand(n))
        phi = np.pi*np.random.rand(n)
        theta = 2*np.pi*np.random.rand(n)
        
        # (n) array
        X = rho*np.sin(phi)*np.cos(theta)
        Y = rho*np.sin(phi)*np.sin(theta)
        Z = rho*np.cos(phi)
        
        # (n, 3) array
        return np.column_stack((X, Y, Z))
    
    
    def Spheroidal_Core(self, r: float, epsilon: float, n: int) -> np.ndarray:
        
        # (n, 3) array
        R = self.Spherical_Core(r=r, n=n)
        # (1, 3) array
        elongation = np.array((1, 1, epsilon))[np.newaxis, :]
        
        return R*elongation
    
    
    def Cylindrical_Core(self, r: float, epsilon: float, n: int) -> np.ndarray:
        
        # (n) array
        rho = np.sqrt((r**2)*np.random.rand(n))
        theta = 2*np.pi*np.random.rand(n)
        Z = r*epsilon*np.random.rand(n)
        
        # (n) array
        X = rho*np.cos(theta)
        Y = rho*np.sin(theta)
        
        # (n, 3) array
        return np.column_stack((X, Y, Z))
    
    
    def Vesicle_Core(self, r: float, epsilon: float, n: int) -> np.ndarray:
        
        t = r*epsilon
        
        # (n) array
        rho = np.cbrt(r**3 - (r - t)**3)*np.random.rand(n) + (r - t)
        phi = np.pi*np.random.rand(n)
        theta = 2*np.pi*np.random.rand(n)
        
        # (n) array
        X = rho*np.sin(phi)*np.cos(theta)
        Y = rho*np.sin(phi)*np.sin(theta)
        Z = rho*np.cos(phi)
        
        # (n, 3) array
        return np.column_stack((X, Y, Z))
    
    
    def Spherical_Roots(self, r: float, N_agg: int, b: float) -> np.ndarray:
        
        phi = np.pi*(np.sqrt(5.0) - 1.0)
        
        # (N_agg) array
        y = np.linspace(1, -1, N_agg)
        # (N_agg) array
        radius = np.sqrt(1 - np.power(y, 2))
        # (N_agg) array
        theta = phi*np.arange(N_agg)
    
        # (N_agg) array
        x = np.cos(theta)*radius
        z = np.sin(theta)*radius
    
        # (N_agg, 3) array
        R_temp = np.column_stack((x, y, z))
    
        # (3) array
        angles = np.random.rand(3)*(2*np.pi)
        sins = np.sin(angles)
        coss = np.cos(angles)
    
        sin_a, sin_b, sin_c = sins
        cos_a, cos_b, cos_c = coss
    
        # (3, 3) array
        rot_mat = [[cos_a*cos_b, cos_a*sin_b*sin_c - sin_a*cos_c, cos_a*sin_b*cos_c + sin_a*sin_c], 
                   [sin_a*cos_b, sin_a*sin_b*sin_c + cos_a*cos_c, sin_a*sin_b*cos_c - cos_a*sin_c], 
                   [-sin_b, cos_b*sin_c, cos_b*cos_c]]
        rot_mat = np.array(rot_mat)
    
        # (N_agg, 3) array
        node_0 = r*np.dot(R_temp, rot_mat.T)
        node_1 = (r + b)*np.dot(R_temp, rot_mat.T)
        
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Spheroidal_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        # (2, N_agg, 3) array
        roots = self.Spherical_Roots(r=r, N_agg=N_agg, b=b)
        # (1, 1, 3) array
        elongation = np.array((1, 1, epsilon))[np.newaxis, np.newaxis, :]
        
        # (2, N_agg, 3) array
        return roots*elongation
    
    
    def Cylindrical_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        # (N_agg) array
        Z = np.linspace(0, r*epsilon, N_agg)
        theta = 2*np.pi*np.random.rand(N_agg)
        
        # (N_agg) array
        X = np.cos(theta)
        Y = np.sin(theta)
        
        # (N_agg, 3) array
        R = np.column_stack((X, Y, Z))
        
        # (N_agg, 3) array
        node_0 = R*np.array((r, r, 1))[np.newaxis, :]
        node_1 = R*np.array((r + b, r + b, 1))[np.newaxis, :]
        
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Disk_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        # (N_agg) array
        rho = np.sqrt((r**2)*np.random.rand(N_agg))
        theta = 2*np.pi*np.random.rand(N_agg)
        Z = np.random.choice([-1, 1], size=N_agg)
        
        # (N_agg) array
        X = rho*np.cos(theta)
        Y = rho*np.sin(theta)
        Z_0 = Z*r*epsilon/2
        Z_1 = Z*(r*epsilon/2 + b)
        
        # (N_agg, 3) array
        node_0 = np.column_stack((X, Y, Z_0))
        node_1 = np.column_stack((X, Y, Z_1))
        
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Vesicle_Roots(
        self, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        b: float
    ) -> np.ndarray:
        
        t = r*epsilon
        w_in = (r - t)**2
        w_out = r**2
        
        p = np.array((w_in, w_out))/(w_in + w_out)
        
        # (N_agg) array
        R = np.random.choice([-1, 1], size=N_agg, p=p)
        phi = np.pi*np.random.rand(N_agg)
        theta = 2*np.pi*np.random.rand(N_agg)
            
        # (N_agg) array
        R_0 = r - t/2 + t*R/2
        R_1 = r - t/2 + (t/2 + b)*R
        
        # (N_agg) array
        X_0 = R_0*np.sin(phi)*np.cos(theta)
        Y_0 = R_0*np.sin(phi)*np.sin(theta)
        Z_0 = R_0*np.cos(phi)
        
        # (N_agg) array
        X_1 = R_1*np.sin(phi)*np.cos(theta)
        Y_1 = R_1*np.sin(phi)*np.sin(theta)
        Z_1 = R_1*np.cos(phi)
        
        # (N_agg, 3) array
        node_0 = np.column_stack((X_0, Y_0, Z_0))
        node_1 = np.column_stack((X_1, Y_1, Z_1))
    
        # (2, N_agg, 3) array
        return np.vstack((node_0, node_1))
    
    
    def Generate_Nodes(
        self, 
        roots: np.ndarray, 
        core: np.ndarray, 
        r: float, 
        epsilon: float, 
        N_agg: float, 
        b: float, 
        N: int
    ) -> np.ndarray:
        
        core_num = core.shape[0]
        
        sample_num = 16
        
        # (N + 1, N_agg, 3) array
        nodes = np.empty((core_num + (N + 1)*N_agg + 1, 3))
        nodes[:core_num, :] = core
        nodes[core_num:core_num + 2*N_agg, :] = roots
        
        for i in range(1, N):
                    
            # (core_num + (i + 1)*N_agg, 3, 1, 1) array
            previous_nodes = nodes[:core_num + (i + 1)*N_agg, :]
            previous_nodes = previous_nodes[:, :, np.newaxis, np.newaxis]
            
            # (N_agg, 3) array
            start = nodes[core_num + i*N_agg:core_num + (i + 1)*N_agg, :]
                    
            # (N_agg, sample_num) array
            phi = np.pi*np.random.rand(N_agg, sample_num)
            theta = 2*np.pi*np.random.rand(N_agg, sample_num)
            
            # (N_agg, sample_num) array
            Del_Xs = b*np.sin(phi)*np.cos(theta)
            Del_Ys = b*np.sin(phi)*np.sin(theta)
            Del_Zs = b*np.cos(phi)
            
            # (3, N_agg, sample_num) array
            Del_Rs = np.stack((Del_Xs, Del_Ys, Del_Zs), axis=0)
            
            # (3, N_agg, sample_num) array
            temp_locs = np.transpose(start)[:, :, np.newaxis] + Del_Rs
            # (1, 3, N_agg, sample_num) array
            temp_locs = temp_locs[np.newaxis, :]
            
            # (core_num + (i + 1)*N_agg, 3, N_agg, sample_num) array
            Xi_vecs = previous_nodes - temp_locs
            
            # (core_num + (i + 1)*N_agg, N_agg, sample_num) array
            Xi_scas = np.sum(np.power(Xi_vecs, 2), axis=1)
            
            # (N_agg, sample_num) array
            energies = np.sum(1/np.sqrt(Xi_scas), axis=0)
            
            # (1, N_agg, sample_num) array
            weights = np.exp(-energies)[np.newaxis, :]
            
            # (3, N_agg, sample_num) array
            temp_hats = Del_Rs/b
            
            # (3, N_agg) array
            directions = np.sum(temp_hats*weights, axis=2)/np.sum(weights, axis=2)
            
            # (1, N_agg) array
            norms = np.sqrt(np.sum(np.power(directions, 2), axis=0))[np.newaxis, :]
            
            # (N_agg, 3) array
            Deltas = np.transpose(b*directions/norms)
            
            # (N_agg, 3) array
            nodes[core_num + (i + 1)*N_agg:core_num + (i + 2)*N_agg, :] = start + Deltas
        
        # (N + 1, N_agg, 3) array
        return nodes[core_num:-1, :].reshape(N + 1, N_agg, 3)
    
    
    def Generate_Scatterers(self, nodes: np.ndarray, n: int, b: float) -> None:
        
        # nodes: (N + 1, N_agg, 3) array
        node_shape = nodes.shape
        
        # (N, 1, N_agg, 3) array
        start = nodes[:-1, :, :][:, np.newaxis, :]
        
        N = node_shape[0] - 1
        N_agg = node_shape[1]
    
        R_g = np.sqrt(np.power(b, 2)/12)
        
        # (N, N_agg, 3) array
        z_hat = (nodes[1:, :, :] - nodes[:-1, :, :])/b
        # (N, N_agg, 3, 1) array
        z_hat = z_hat[:, :, :, np.newaxis]
        
        z_step = b/n
        
        # (n) array
        z_range = np.arange(n)
        # (N_agg, n) array
        z_range = np.tile(z_range, (N_agg, 1))
        # (N, N_agg, 1, n) array
        z_range = np.tile(z_range, (N, 1, 1))[:, :, np.newaxis, :]
        
        # (N, N_agg, 1, n) array
        zeta = z_step*(1 - np.abs(1 - z_range/(n/2)))
        # (N, N_agg, 1, n) array
        z = (z_range + 1)*z_step + np.random.randn(N, N_agg, 1, n)*zeta/2
        
        # (N, N_agg, 1, n) array
        rho = R_g*(1 - np.abs(1 - z/(b/2)))
        # (N, N_agg, 1, n) array
        r = rho*np.abs(np.random.randn(N, N_agg, 1, n))
    
        # (N, N_agg, 3, n) array
        R = np.random.normal(size=(N, N_agg, 3, n))
        # (N, N_agg, 3, n) array
        R = R - np.sum(R*z_hat, axis=2)[:, :, np.newaxis, :]*z_hat
        # (N, N_agg, 3, n) array
        R_hat = R/np.sqrt(np.sum(np.power(R, 2), axis=2))[:, :, np.newaxis, :]
        
        # (N, N_agg, 3, n) array
        delta = z*z_hat + r*R_hat
        # (N, n, N_agg, 3) array
        delta = delta.transpose(0, 3, 1, 2)
        
        # (N, n, N_agg, 3) array
        scatterers = start + delta
        # (N*n, N_agg, 3) array
        scatterers = scatterers.reshape(N*n, N_agg, 3)
        # (N*n + 1, N_agg, 3) array
        scatterers = np.append(nodes[0, :, :][np.newaxis, :], scatterers, axis=0)
        # ((N*n + 1)*N_agg, 3) array
        scatterers = scatterers.reshape((N*n + 1)*N_agg, 3)
        
        return scatterers
    
    
    def Calculate_R_g(self, nodes: np.ndarray, N: int, N_agg: int) -> float:
        
        # nodes: (N + 1, N_agg, 3) array

        # (N_agg, 3) array
        R_cm_i = np.sum(nodes, axis=0)/(N + 1)
        # (1, N_agg, 3) array
        R_cm_i = R_cm_i[np.newaxis, :]
        
        R_g_sq = np.sum(np.power(R_cm_i - nodes, 2))/((N + 1)*N_agg)
        
        return np.sqrt(R_g_sq)
        
    
    def Debye_Scattering(
        self, 
        nodes: np.ndarray, 
        core_node_num: int, 
        q_range: np.ndarray, 
        rho_beta: float
    ) -> np.ndarray:
        
        # nodes: (scatterer_num, 3) array
        scatterer_num = nodes.shape[0]
        temp_diag = np.diag(np.ones(scatterer_num))
        
        # (scatterer_num) array
        rhos = np.append(np.ones(core_node_num), rho_beta*np.ones(scatterer_num - core_node_num))
        # (scatterer_num, scatterer_num) array
        rhos = rhos[:, np.newaxis]*rhos[np.newaxis, :]
        
        # (scatterer_num, scatterer_num, 3) array
        r_ij = nodes[:, np.newaxis, :] - nodes[np.newaxis, :]
        # (scatterer_num, scatterer_num) array
        r_ij = np.sqrt(np.sum(np.power(r_ij, 2), axis=2)) + temp_diag
        
        # (scatterer_num, scatterer_num) array
        non_kro = 1 - temp_diag
        
        f_q = np.empty(q_range.shape)
        
        for i, q in enumerate(q_range):
            
            qr = q*r_ij
            
            f = non_kro*rhos*np.sin(qr)/qr
            f_q[i] = np.sum(f)
        
        f_q /= np.max(f_q)
        f_q[f_q <= 0] = np.min(np.abs(f_q))
        
        return f_q
    
    
    def Generate_Micelle(
        self, 
        shape: str, 
        r: float, 
        epsilon: float, 
        N_agg: int, 
        rho_beta: float, 
        b: float, 
        N: int, 
        n: int, 
        f_core: float
    ) -> np.ndarray:
        
        n_corona = n*N*N_agg
        n_core = int((f_core/(1 - f_core))*n_corona)
        
        match shape:
            
            case 'sphere':
                
                core_scatterers = self.Spherical_Core(r=r, n=n_core)
                corona_roots = self.Spherical_Roots(r=r, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
                            
            case 'spheroid':
                
                core_scatterers = self.Spheroidal_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Spheroidal_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
                            
            case 'cylinder':
                
                core_scatterers = self.Cylindrical_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Cylindrical_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
            
            case 'disk':
                
                core_scatterers = self.Cylindrical_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Disk_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
                
            case 'vesicle':
                
                core_scatterers = self.Vesicle_Core(r=r, epsilon=epsilon, n=n_core)
                corona_roots = self.Vesicle_Roots(r=r, epsilon=epsilon, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
        
            case _:
                core_scatterers = self.Spherical_Core(r=r, n=n_core)
                corona_roots = self.Spherical_Roots(r=r, N_agg=N_agg, b=b)
                corona_nodes = self.Generate_Nodes(
                    roots=corona_roots, 
                    core=core_scatterers, 
                    r=r, 
                    epsilon=epsilon, 
                    N_agg=N_agg, 
                    b=b, 
                    N=N
                )
                corona_scatterers = self.Generate_Scatterers(nodes=corona_nodes, n=n, b=b)
        
        self.R_g = self.Calculate_R_g(nodes=corona_nodes, N=N, N_agg=N_agg)
        
        # (n_core + n_corona + n*N, 3) array
        scatterers = np.vstack((core_scatterers, corona_scatterers))
    
        return scatterers


def main(*args, **kwargs) -> int:
    
    return 0


if __name__ == '__main__':
    main()