#!/usr/bin/env python """ Created on Thu Jul 16 15:08:27 2020 @author: Aloma Blanch """ import matplotlib import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.transforms as mtransforms from math import floor from statistics import mean from scipy.signal import find_peaks def cycle(folder,dt,save_path): pressure = np.loadtxt(folder + '/PHistRCR.dat',skiprows=2) N_ts = pressure.shape[0] T = round(dt*N_ts,3) time = np.linspace(0,T,N_ts) # Find peaks, keep only the maximums peaks, _ = find_peaks(pressure[:,-1]) idx = np.where(pressure[peaks,-1] >= np.mean(pressure)) peaks_loc = peaks[idx] P_peaks = pressure[peaks,-1][pressure[peaks,-1] >= np.mean(pressure)] # Rmove the multiple picks found in the same location idx_del = [] t_pk_loc = time[peaks_loc].tolist() peak_tdiff = [t_pk_loc[n]-t_pk_loc[n-1] for n in range(1,len(t_pk_loc))] for i in range(0,len(peak_tdiff)): if peak_tdiff[i]<= dt*10: idx_del.append(i) t_pk_loc[i] = [] peak_tdiff[i] = [] t_pk_loc[:] = [x for x in t_pk_loc if x] peak_tdiff[:] = [x for x in peak_tdiff if x] P_peaks = np.delete(P_peaks,idx_del) fig, ax = plt.subplots() ax.plot(time,pressure[:,-1]/1333.22,'b') ax.plot(t_pk_loc, P_peaks/1333.22, "or",label='peak location') ax.set(xlabel='time [s]', ylabel='Pressure [mmHg]', title='Pressure at last outlet') ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) ax.legend(loc=0) # plt.show() plt.savefig(save_path + '/Pressure_max_n_cycles.pdf') return (floor(mean(peak_tdiff[1:])*1000)/1000,len(t_pk_loc)) def error_plot(folder,t_step,r_criteria,save_path): # Load iterations and residual error histor = folder + '/histor.dat' input = open(histor, 'r') output = open(folder + "/histor_better.dat", 'w') output.writelines(line.strip() +'\n' for line in input) input.close() output.close() error_info = pd.read_csv(folder + "/histor_better.dat", sep=' ', header=None, usecols=(0,1,2)) # Select only last iteration of residual error error=[] for n in range(1,error_info.shape[0]): if (error_info[0][n]>error_info[0][n-1]): error.append(error_info[2][n-1]) time = np.linspace(start = t_step, stop = len(error)*t_step, num = len(error)) # Liniar Scale fig, ax = plt.subplots() ax.plot(time,error) ax.plot(time,r_criteria*np.ones(len(error)),'r') ax.set(xlabel='Time steps', ylabel='Residual error', title='Last nonlinear residual error for each time step') ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) # plt.show() plt.savefig(save_path + '/Last_nonlin_res_error.pdf') # Semilog scale fig, ax = plt.subplots() ax.semilogy(time,error) ax.semilogy(time,r_criteria*np.ones(len(error)),'r') ax.set(xlabel='Time steps', ylabel='Residual error', title='Log - Last nonlinear residual error for each time step') ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) # plt.show() plt.savefig(save_path + '/Log_Last_nonlin_res_error.pdf') fig.savefig(save_path + '/Log_Last_nonlin_res_error.jpg',dpi=150) def periodicity(project,folder,dt,T_cyc,n_cyc,save_path): pressure = np.loadtxt(folder+'/PHistRCR.dat',skiprows=2,) time = np.linspace(0,T_cyc*n_cyc,round(T_cyc/dt*n_cyc)) peak_P = [] peak_P_pos = [] Nc = round(T_cyc/dt) for i in range(0,n_cyc): peak_P.append(np.amax(pressure[i*Nc:Nc*(i+1),-1])/1333.22) peak_P_pos.append(np.where(pressure[i*Nc:Nc*(i+1),-1] == np.amax(pressure[i*Nc:Nc*(i+1),-1]))[0][0]+Nc*i) peak_Pdiff = [peak_P[n]-peak_P[n-1] for n in range(1,len(peak_P))] peak_Pdiff = list(map(abs, peak_Pdiff)) fig, ax = plt.subplots() ax.plot(time,pressure[:,-1]/1333.22,'b') ax.plot(time[peak_P_pos], peak_P,'ro',label='Cycle pike') ax.set(xlabel='time [s]', ylabel='Pressure [mmHg]', title='Pressure at last outlet') ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) ax.legend(loc=0) # plt.show() plt.savefig(save_path + '/periodicity.pdf') fig.savefig(save_path + '/periodicity.jpg',dpi=150) if (peak_Pdiff[-1]<=1): print('The numerical simulation \'{0}\' has achieve periodicity!\nSystolic Blood Pressure (SBP):\nsecond-last cycle = {1:.2f} mmHg,\nlast cycle = {2:.2f} mmHg,\n\u0394mmHg = {3:.2f} mmHg'.format(project,peak_P[-2],peak_P[-1],peak_Pdiff[-1])) txt = ['The numerical simulation \'{0}\' has achieved periodicity.'.format(project), 'Systolic Blood Pressure (SBP):', 'Second-last cycle = {0:.2f} mmHg'.format(peak_P[-2]), 'Last cycle = {0:.2f} mmHg'.format(peak_P[-1]), 'Delta_mmHg = {0:.2f} mmHg'.format(peak_Pdiff[-1])] return txt def pressure(folder,N_ts,T_cyc,dt,n_cyc,save_path): pressure = np.loadtxt(folder+'/PHistRCR.dat',skiprows=2,) Nc = round(T_cyc/dt) time = np.linspace(0,T_cyc,Nc) fig, ax = plt.subplots() SBP = np.empty(pressure.shape[1]) DBP = np.empty(pressure.shape[1]) MBP = np.empty(pressure.shape[1]) for i in range(0,pressure.shape[1]): ax.plot(time,pressure[N_ts-Nc:N_ts,i]/1333.22,label='ROI-'+str(i+2)) SBP[i] = (np.amax(pressure[N_ts-Nc:N_ts,i]/1333.22)) DBP[i] = (np.amin(pressure[N_ts-Nc:N_ts,i]/1333.22)) MBP[i] = (mean(pressure[N_ts-Nc:N_ts,i]/1333.22)) PP = SBP-DBP ax.set(xlabel='time [s]', ylabel='Pressure [mmHg]', title='Pressure at each outlet') ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) ax.legend(loc=0) # plt.show() plt.savefig(save_path + '/pressure.pdf') fig.savefig(save_path + '/pressure.jpg',dpi=150) return (DBP,MBP,SBP,PP) def flow(folder,N_ts,T_cyc,dt,n_cyc,save_path): flow = np.loadtxt(folder+'/QHistRCR.dat',skiprows=2,) Nc = round(T_cyc/dt) time = np.linspace(0,T_cyc,Nc) fig, ax = plt.subplots() Q = np.empty(flow.shape[1]) for i in range(0,flow.shape[1]): ax.plot(time,flow[N_ts-Nc:N_ts,i],label='ROI-'+str(i+2)) Q[i] = (mean(flow[N_ts-Nc:N_ts,i])) ax.set(xlabel='time [s]', ylabel='Flow [mL/s]', title='Flow at each outlet') ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) ax.legend(loc=0) # plt.show() plt.savefig(save_path + '/flow.pdf') fig.savefig(save_path + '/flow.jpg',dpi=150) return Q def inlet_flow_waveform(project_folder,t_btw_rst,N_ts,dt,T_cyc,n_cyc,save_path): x = np.loadtxt(project_folder+'/ROI-1.flow') t = x[:,0] Q = -x[:,1] Nt_pts = np.linspace(t_btw_rst,N_ts,int(N_ts/t_btw_rst)) t_pts = Nt_pts*dt # Put all the time values on a single cardiac cylce for n in range(len(t_pts)): tmp=divmod(t_pts[n],T_cyc) t_pts[n]=tmp[1] if round(tmp[1],3) == 0: t_pts[n]=T_cyc # Interpolate the flow rate to obtain the location of the point Q_pts = np.interp(t_pts, t, Q) fig, ax = plt.subplots() ax.plot(t, Q, 'r') ax.plot(t_pts, Q_pts, 'ob',label='Time steps saved') trans_offset = mtransforms.offset_copy(ax.transData, fig=fig, x=-0, y=0.15, units='inches') ax.set(xlabel='Time [s]', ylabel='Flow Rate - Q [mL/s]', title='Inlet Flow Rate Waveform') ax.set_ylim([-10, 90]) ax.spines['right'].set_visible(False) ax.spines['top'].set_visible(False) # Adding label to the points time = [] for i in range(0,np.unique(np.round(t_pts,3)).shape[0]): time.append('$t_'+str(i+1)+'$') for x, y, t in zip(t_pts[(-n_cyc-1):], Q_pts[(-n_cyc-1):], time): plt.text(x, y, t, transform=trans_offset, fontsize=12) ax.legend(loc=0) # plt.show() plt.savefig(save_path + '/inlet_waveform.pdf') fig.savefig(save_path + '/inlet_waveform.jpg',dpi=150) print('{0} time steps saved, available to visualize in ParaView.'.format(np.unique(np.round(t_pts,3)).shape[0])) txt = '{0} time steps saved, available to visualize in ParaView.'.format(np.unique(np.round(t_pts,3)).shape[0]) return txt def rcr(project_folder): rcr_lines = [] with open (project_folder + '/rcrt.dat', "rt") as myfile: for myline in myfile: rcr_lines.append(myline) n_out = int((len(rcr_lines)-1)/6) Rc_C_Rd = np.empty([n_out,3]) for i in range(0,n_out): Rc_C_Rd[i][0] = float(rcr_lines[2+i*6][:-1]) Rc_C_Rd[i][1] = float(rcr_lines[3+i*6][:-1]) Rc_C_Rd[i][2] = float(rcr_lines[4+i*6][:-1]) return Rc_C_Rd def barPlot(project_folder,DBP,MBP,SBP,PP,Q_avg,save_path): import csv with open(project_folder+'/aimed.csv', 'r') as f: aimed_all = list(csv.reader(f, delimiter=';')) def plot_bar(CFD,aimed,name,save_path,unit): labels = [] for i in range(0,len(CFD)): labels.append('ROI-'+str(i+2)) x = np.arange(len(labels)) # the label locations width = 0.38 # the width of the bars fig, ax = plt.subplots() rects1 = ax.bar(x - width/2, CFD, width, label='CFD',color='k') rects2 = ax.bar(x + width/2, aimed, width, label='Aimed',color='white', edgecolor='black', hatch='/') ax.set_ylabel(name + unit) ax.set_title(name) ax.set_xticks(x) ax.set_xticklabels(labels) ax.legend() high = max(max(aimed,CFD)) ax.set_ylim(top = 1.7*high) def autolabel(rects): """Attach a text label above each bar in *rects*, displaying its height.""" for rect in rects: height = rect.get_height() ax.annotate('{}'.format(height), xy=(rect.get_x() + rect.get_width() / 2, height), xytext=(0,3), # 3 points vertical offset textcoords="offset points", ha='center', va='bottom', fontsize = 7.5) autolabel(rects1) autolabel(rects2) # Create labels err = [] zip_object = zip(CFD, aimed) for x1_i, x2_i in zip_object: err.append(round((x1_i-x2_i)/x2_i*100,2)) # Text on the top of each barplot for i in range(len(x)): plt.text(x = x[i]- width/2, y = max(CFD[i],aimed[i])+0.15*max(max(CFD,aimed)), s = str(err[i])+'%', fontsize = 8, color='r') fig.tight_layout() # plt.show() plt.savefig(save_path + '/'+name+'_bar.pdf') fig.savefig(save_path + '/'+name+'_bar.jpg',dpi=150) # DBP CFD = [round(float(i),1) for i in DBP] aimed = [round(float(i),1) for i in aimed_all[0]] plot_bar(CFD,aimed,'DBP',save_path,' [mmHg]') # MBP CFD = [round(float(i),1) for i in MBP] aimed = [round(float(i),1) for i in aimed_all[1]] plot_bar(CFD,aimed,'MBP',save_path,' [mmHg]') # SBP CFD = [round(float(i),1) for i in SBP] aimed = [round(float(i),1) for i in aimed_all[2]] plot_bar(CFD,aimed,'SBP',save_path,' [mmHg]') # PP CFD = [round(float(i),1) for i in PP] aimed = [round(float(i),1) for i in aimed_all[3]] plot_bar(CFD,aimed,'PP',save_path,' [mmHg]') # DBP CFD = [round(float(i),1) for i in Q_avg] aimed = [round(float(i),1) for i in aimed_all[4]] plot_bar(CFD,aimed,'Q_avg',save_path,' [mL/s]')