from fsl.wrappers import fslstats
from matplotlib import cm
from matplotlib.colors import ListedColormap, LinearSegmentedColormap
from nilearn import image, plotting
from sklearn import preprocessing
from scipy import ndimage, stats
from tqdm.notebook import trange, tqdm
import itertools
import os
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import re
import seaborn as sns
matplotlib.rcParams.update({'figure.max_open_warning': 0})
os.environ['FSLDIR'] = '/usr/local/fsl'
%load_ext line_profiler
# Dataset name
dataset_name = '125_MAESTRO_Praxis-Lang_LIs'
# Regex capturing two path segments: project-handedness (e.g. 1-2_RH)
# and subject codename (e.g. /140_CISN/)
subject_conditions_regex='/[0-9][0-9][0-9]_[A-Z][A-Z][A-Z][A-Z]/|[0-9]-[0-9]_[A-Z][A-Z]'
# ROIs folder
roi_dir = 'Horn_MMP_HCP_atlas'
atlas_filename = 'MMP_in_MNI_corr_resampled_2mm_91x109x91.nii.gz'
labels_filename = 'HCPMMP1_on_MNI152_ICBM2009a_nlin_left-right.txt'
n_rois_per_hemisphere = 180
picks = [10, 11, 12, 15, 17, 18, 22, 29, 43, 45, 47, 48, 49, 50, 56,
60, 63, 74, 75, 78, 79, 80, 81, 82, 83, 84, 95, 96, 97, 108,
109, 111, 116, 117, 130, 137, 138, 139, 140, 141, 144, 145,
148, 149, 150, 151, 157, 169, 171, 179]
rois_selection = ['PF', 'PFt', 'PFm', '44', '45', '6r', 'PHA3', 'AIP', 'IPS1', 'IP2', 'LIPv', 'V1']
# To print additional information, set this to `True` (or `1`).
# Note that it may increase the time needed to generate the output.
verbose = False
# Visualize ROIs or not
plots = False
input_dir = os.path.join('input_paths', dataset_name)
base_output_dir = os.path.join('results', dataset_name)
output_dir = os.path.join('results', dataset_name, roi_dir)
# Input files
# There are two *.txt files containing lists of NIfTI files that are inputs for LI analysis.
lists_of_inputs = {'praxis': os.path.join(input_dir, 'praxis-cope_1.txt'),
'language': os.path.join(input_dir, 'language-cope_3.txt')}
atlas_filepath = os.path.join('ROIs', roi_dir, atlas_filename)
labels_filepath = os.path.join('ROIs', roi_dir, labels_filename)
skills = ['praxis', 'language']
measures = ['voxels', 'intensities']
measures_explicit = {'voxels': 'Voxel count', 'intensities': 'Intensities'}
desc_stats = ['mean', 'sd']
measure_and_stats = ['%s_%s' % j for j in list(itertools.product(measures, desc_stats))] + ['missing_data',]
percents = [90, 80, 70, 60, 50, 40]
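For reference, a quick print shows what `measure_and_stats` evaluates to:
print(measure_and_stats)
# -> ['voxels_mean', 'voxels_sd', 'intensities_mean', 'intensities_sd', 'missing_data']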
# Function for creating directories
# `Verbose` is taken from a global variable
def create_dir(directory, verbose=verbose):
try:
os.mkdir(directory)
if verbose:
print('Directory created: %s' % directory)
    except (FileExistsError, FileNotFoundError):
if verbose:
print('Parent directory doesn\'t exist or subdirectory already exists: %s' % directory)
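A shorter standard-library alternative, sketched below (`create_dir_simple` is a hypothetical helper, not used in this notebook); note that `os.makedirs` also creates missing parents, unlike `os.mkdir`:
def create_dir_simple(directory):
    # No-op if the directory already exists; creates missing parents as needed
    os.makedirs(directory, exist_ok=True)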
Automatically generate output directories.
create_dir(base_output_dir)
create_dir(output_dir)
for subdir in ['correlations', 'figures', 'laterality_indices']:
dir_path = os.path.join(output_dir, subdir)
create_dir(dir_path)
for format_dir in ['CSV', 'Excel']:
dir_path = os.path.join(output_dir, 'laterality_indices', format_dir)
create_dir(dir_path)
raw_inputs_dfs = {}
for skill in lists_of_inputs.keys():
if verbose:
print('')
print('--------')
print(skill)
print(lists_of_inputs[skill])
print('---')
raw_inputs_dfs[skill] = pd.read_csv(lists_of_inputs[skill], header=None)
if verbose:
print(raw_inputs_dfs[skill])
print(raw_inputs_dfs[skill][0][0])
A subject codename looks like, e.g., 140_CISN.
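For illustration, this is how the regex splits a hypothetical input path (the path below is made up):
example_path = 'input/1-2_RH/140_CISN/zstat.nii.gz'  # hypothetical path
re.findall(subject_conditions_regex, example_path)
# -> ['1-2_RH', '/140_CISN/']: project-handedness first, then the subject codename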
subject_info = {}
subject_filepaths = {}
for skill in lists_of_inputs.keys():
subject_info[skill] = pd.DataFrame()
handedness_codes = {'RH': 'right-handed',
'LH': 'left-handed',
'BH': 'both-handed'}
for input_file in raw_inputs_dfs[skill][0]:
sub_conditions = re.findall(subject_conditions_regex, input_file)
if verbose:
print('Sub conditions: %s' % sub_conditions)
project, handedness = sub_conditions[0].split('_')
if verbose:
print('Project #: %s; Handedness: %s' % (project, handedness))
        project = project[0]  # keep only the leading project digit (e.g. '1-2' -> '1')
handedness = handedness_codes[handedness]
        # DataFrame.append() was removed in pandas 2.0; build a one-row
        # frame and concatenate instead
        row = pd.DataFrame([{'Subject_codename': re.sub('/', '', sub_conditions[1]),
                             'Project_no': project,
                             'Handedness': handedness,
                             'Input_filepath': input_file}])
        subject_info[skill] = pd.concat([subject_info[skill], row], ignore_index=True)
subject_info[skill] = subject_info[skill].set_index('Subject_codename')
if verbose:
print('')
print('--------')
print('Generic info')
print(skill)
print(subject_info[skill])
# Just paths, and subjects as indices
subject_filepaths[skill] = subject_info[skill]['Input_filepath']
if verbose:
print('')
print('--------')
print('Paths only')
print(skill)
print(subject_filepaths[skill])
lis_output = {}
measures_output = {}
for skill in skills:
lis_output[skill] = {}
measures_output[skill] = {}
for measure_stat in measure_and_stats:
lis_output[skill][measure_stat] = subject_info[skill].copy()
measures_output[skill][measure_stat] = {}
measures_output[skill][measure_stat]['left'] = subject_info[skill].copy()
measures_output[skill][measure_stat]['right'] = subject_info[skill].copy()
Visualize the atlas
plotting.plot_roi(atlas_filepath)
plt.show()
Get information about the shape of the data in the atlas.
atlas_img = image.load_img(atlas_filepath)
atlas_data = atlas_img.get_fdata()
atlas_data.shape
Read labels from a file and preview the list.
labels = pd.read_csv(labels_filepath, index_col=0, sep='\t', header=None)
labels = pd.Series(labels[1], index=labels.index, name='Labels')
# Depending on the input file, there may be indexing, e.g., 1-180 & 201-380.
# Other values are NAs, hence can be removed.
labels = labels[labels != 'na']
labels
Mask loading takes priority over, e.g., skill type (praxis/language): once a mask's data is loaded, all LIs are extracted from it before moving on to the next mask.
Detailed procedure:
0. Define percentage thresholds
1. Load Horn's ROIs
2. For each ROI:
A. Get masks: left hemisphere, right hemisphere, reference mask;
B. For each subject:
a. For each skill (praxis, language):
alpha. Check how many zero-value voxels there are (`missing values`)
beta. Get Z-max from reference mask
gamma. Make sure that z-max is not an outlier
delta. For measures (n_voxels, intensities):
* For each threshold:
- Multiply Z-max by the percentage (get the threshold value)
- For each hemisphere:
+ Count voxels above the threshold
+ Get mean value (intensity) above the threshold
- Calculate LIs: (m_l - m_r) / (m_l + m_r) * 100 (see the toy example after this list)
* Average of voxel-count LIs across thresholds
* SD of voxel-count LIs across thresholds
* Average of voxel-intensity (value) LIs across thresholds
* SD of voxel-intensity (value) LIs across thresholds
3. Save results (For each skill (praxis, language)):
A. For measures (voxels, intensities):
a. For stats (mean, sd):
alpha. Save results
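For instance, with toy counts of m_l = 150 suprathreshold voxels on the left and m_r = 50 on the right (made-up numbers), step delta's formula gives:
m_l, m_r = 150, 50
li = (m_l - m_r) / (m_l + m_r) * 100
print(li)  # 50.0 -> a positive LI means left-lateralized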
The cell below implements this procedure, treating Python as executable pseudocode.
class Mask(dict):
def __init__(self, mask_type, atlas_img, index, roi_name, index_contralateral=None):
super().__init__()
self.mask_type = mask_type
self.roi_name = roi_name
self.atlas_img = atlas_img
self.atlas_data = atlas_img.get_fdata()
self.index = index
self.index_contralateral = index_contralateral
self.mask_data = self._get_mask_data()
self.mask_img = self._get_mask_img()
def count_voxels(self, thr_value):
n_voxels = np.sum(self.zstat_data[self.mask_data]>thr_value)
super().__setitem__('voxels', n_voxels)
def get_percent_zero(self):
# Count voxels with 0.0 value within the mask
n_zero_voxels = np.sum(self.zstat_data[self.mask_data]==0)
# Get ratio of missing values
percent_zero = n_zero_voxels/np.sum(self.mask_data)*100
return percent_zero
def get_value(self, thr_value):
# Intensity (values) inside a mask
intensities_inside_mask = self.zstat_data[self.mask_data]
# Above some threshold
above_thr = intensities_inside_mask[intensities_inside_mask>thr_value]
# Check if there are actually any intensities (values) above the threshold.
# If not, np.mean() would return NaN
if len(above_thr) != 0:
# Average value inside mask, above threshold
intensities = np.mean(above_thr)
else:
intensities = 0
super().__setitem__('intensities', intensities)
    def plot_roi(self):
        # Reference masks carry a contralateral index; plain masks (left xor right) do not
        if self.index_contralateral is not None:
            plotting.plot_roi(self.mask_img, title=('ref_%s' % self.roi_name))
        else:
            plotting.plot_roi(self.mask_img, title=('#%s %s_%s' % (self.index, self.mask_type, self.roi_name)))
def set_zstat_data(self, zstat_data):
self.zstat_data = zstat_data
def get_zmax(self):
return self.zstat_data[self.mask_data].max()
    def get_zmax_outlier_factor(self):
        # Placeholder: Z-max outlier screening (step gamma) is not implemented yet
        return None
    def _get_mask_data(self):
        mask_data = np.zeros(self.atlas_img.shape)
        mask_data[self.atlas_data == self.index] = 1
        # Reference masks additionally include the contralateral ROI
        if self.index_contralateral is not None:
            mask_data[self.atlas_data == self.index_contralateral] = 1
        return mask_data.astype('bool')
def _get_mask_img(self):
return image.new_img_like(self.atlas_img, self.mask_data, self.atlas_img.affine)
class Masks(dict):
def __init__(self, atlas_img, index_left_hemi, index_right_hemi, roi_name):
        '''
        atlas_img provides the reference data shape; it IS NOT
        a `reference` mask in the sense of the two hemisphere
        masks joined together
        '''
super().__init__()
self.m_l = Mask('left', atlas_img, index_left_hemi, roi_name)
self.m_r = Mask('right', atlas_img, index_right_hemi, roi_name)
self.m_ref = Mask('reference', atlas_img, index_left_hemi, roi_name, index_right_hemi)
super().__setitem__('left', self.m_l)
super().__setitem__('right', self.m_r)
super().__setitem__('reference', self.m_ref)
def plot_rois(self):
self.m_l.plot_roi()
self.m_r.plot_roi()
self.m_ref.plot_roi()
plt.show()
def set_zstat_data(self, input_path):
zstat_img = image.load_img(input_path)
zstat_data = zstat_img.get_fdata()
self.m_l.set_zstat_data(zstat_data)
self.m_r.set_zstat_data(zstat_data)
self.m_ref.set_zstat_data(zstat_data)
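As a quick sanity check, the Mask class can be exercised on a tiny made-up volume (all indices and values below are toy assumptions; the atlas image is reused only for its affine):
toy_data = np.array([[[1.0, 0.0], [201.0, 0.0]]])  # voxel [0, 0, 0] belongs to ROI #1
toy_img = image.new_img_like(atlas_img, toy_data)
toy_mask = Mask('left', toy_img, index=1, roi_name='toy')
toy_mask.set_zstat_data(np.array([[[3.2, 0.0], [1.0, 0.0]]]))
toy_mask.count_voxels(thr_value=2.0)
print(toy_mask['voxels'])   # -> 1 voxel above threshold inside the mask
print(toy_mask.get_zmax())  # -> 3.2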
def calculate_laterality_indices():
t = trange(1, len(labels[:n_rois_per_hemisphere])+1, desc='ROI', leave=True)
# 2. For each ROI:
for index_left_hemi in t:
        # 1-180 are left-hemisphere ROIs
        # 201-380 are right-hemisphere ROIs (left index + 200)
roi_name = re.sub('_ROI', '', labels[index_left_hemi])
status = 'ROI #%s/%s' % (index_left_hemi, n_rois_per_hemisphere)
t.set_description(status)
# to show immediately the update
t.refresh()
        # Write progress to a log file (create the logs directory if needed)
        os.makedirs('logs', exist_ok=True)
        with open('logs/LI_progress.txt', 'w') as writer:
            writer.write(status)
index_right_hemi = index_left_hemi + 200
# A. Get masks: left hemisphere, right hemisphere, reference mask;
masks = Masks(atlas_img, index_left_hemi, index_right_hemi, roi_name)
if plots:
masks.plot_rois()
# B. For each subject:
# Get any skill to retrieve subjects
for subject in subject_filepaths[list(subject_filepaths.keys())[0]].index:
if verbose:
print('Subject: %s' % subject)
# a. For each skill (praxis, language):
for skill in lists_of_inputs.keys():
if verbose:
print('Skill: %s' % skill)
# Get input path
input_path = subject_filepaths[skill].loc[subject]
masks.set_zstat_data(input_path)
if verbose:
print('Input path: %s' % input_path)
                # alpha. Check how many zero-value voxels there are (`missing values`)
lis_output[skill]['missing_data'].loc[subject, roi_name] = masks['reference'].get_percent_zero()
# beta. Get Z-max from reference mask
z_max = masks['reference'].get_zmax()
if verbose:
print('Z-max: %s' % z_max)
# gamma. Make sure that z-max is not an outlier
# For that calculate Z-max's SD. Later these SD's will be
# plotted on a graph to detect potential outliers.
z_max_sd = masks['reference'].get_zmax_outlier_factor()
# delta. For measures (voxels, intensities):
# Prepare dictionary for the measures
lis = {}
for measure in measures:
if verbose:
print('Measure: %s' % measure)
# Prepare list to append to
lis[measure] = []
thr_results = {'left': [], 'right': []}
# * For each threshold:
for thr_percent in percents:
if verbose:
print('Thr: %s' % thr_percent)
                        # - Multiply Z-max by the percentage (get the threshold value)
thr_value = z_max * (thr_percent / 100.0)
# - For each hemisphere:
for hemisphere in ('left', 'right'):
# + Count voxels above threshold
masks[hemisphere].count_voxels(thr_value)
# + Get mean value above the threshold
masks[hemisphere].get_value(thr_value)
# - Calculate LIs: (m_l - m_r) / (m_l + m_r) * 100:
# I.e., masks[hemisphere][measure]
m_l = masks['left'][measure]
m_r = masks['right'][measure]
# Save to left-right output
thr_results['left'].append(m_l)
thr_results['right'].append(m_r)
if verbose:
print('m_l: %s' % m_l)
print('m_r: %s' % m_r)
# Prevent division by 0 (that generates NaNs)
if (m_l + m_r) != 0:
li = (m_l - m_r) / (m_l + m_r) * 100
else:
li = 0
lis[measure].append(li)
if verbose:
print('LI: %s' % li)
# Note, has to use specific .loc way to get Frame's view, not copy,
# see: https://stackoverflow.com/a/53954986/8877692
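                    # (Chained indexing, e.g. frame['col'][row] = value, may assign to
                    # a temporary copy; frame.loc[row, 'col'] = value assigns in place.)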
# * Average LI for voxel counts for thresholds
# * Average LI for voxel intensities for thresholds
average = np.mean(lis[measure])
if verbose:
print('Average LI: %s' % average)
lis_output[skill]['%s_mean' % measure].loc[subject, roi_name] = average
# * SD of LIs of voxel counts for thresholds
# * SD of LIs of voxel intensities for thresholds
deviation = np.std(lis[measure])
if verbose:
print('LI deviation: %s' % deviation)
lis_output[skill]['%s_sd' % measure].loc[subject, roi_name] = deviation
                    # `Raw` per-hemisphere measures (mean/SD across thresholds)
                    for hemisphere in ('left', 'right'):
                        measures_output[skill]['%s_mean' % measure][hemisphere].loc[subject, roi_name] = np.mean(thr_results[hemisphere])
                        measures_output[skill]['%s_sd' % measure][hemisphere].loc[subject, roi_name] = np.std(thr_results[hemisphere])
# To time-profile the function use:
# %lprun -f calculate_laterality_indices calculate_laterality_indices()
calculate_laterality_indices()
def save_results():
######################
# Laterality indices
    excel_output_path = os.path.join(output_dir, 'laterality_indices', 'Excel', '%s-%s.xlsx' % (dataset_name, roi_dir))
with pd.ExcelWriter(excel_output_path) as writer:
# 3. Save results as CSV files (For each skill (praxis, language)):
for skill in skills:
# gamma. For measures (voxels, intensities):
for measure in measures:
# For stats (mean, sd):
for stat in desc_stats:
df_name = '%s_%s_%s' % (skill, measure, stat)
                    csv_output_path = os.path.join(output_dir, 'laterality_indices', 'CSV', '%s.csv' % df_name)
df = lis_output[skill]['%s_%s' % (measure, stat)]
                    # As CSV (comma-separated values)
df.to_csv(csv_output_path, float_format='%.3f')
# As Excel spreadsheet
df.to_excel(writer, sheet_name=df_name, float_format='%.3f')
if verbose:
print(df.mean().head())
print('============================')
print('Result saved to: %s' % csv_output_path)
print('')
print('++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
    print('LIs saved to Excel file: %s' % excel_output_path)
#################
# `Raw` measures
    excel_output_path = os.path.join(output_dir, 'laterality_indices', 'Excel', 'raw_%s-%s.xlsx' % (dataset_name, roi_dir))
with pd.ExcelWriter(excel_output_path) as writer:
for skill in skills:
for measure in measures:
for stat in desc_stats:
                    for hemisphere in ('left', 'right'):
                        df_name = '%s_%s_%s_%s' % (skill, measure, stat, hemisphere)
                        df = measures_output[skill]['%s_%s' % (measure, stat)][hemisphere]
# As Excel spreadsheet
df.to_excel(writer, sheet_name=df_name, float_format='%.3f')
if verbose:
print(df.mean().head())
print('')
print('++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
    print('`Raw` measures saved to Excel file: %s' % excel_output_path)
# For code time profiling add:
# %lprun -f save_results save_results()
save_results()
# From each DataFrame drop 'Handedness', 'Input_filepath', 'Project_no' columns.
# That will leave only ROIs, thus numeric values (voxel count or intensities).
def drop_non_roi_columns(df):
return df.drop(['Handedness', 'Input_filepath', 'Project_no'], axis=1)
# Output minus metadata columns
dfs_laterality_indices = {}
for measure in measures:
dfs_laterality_indices[measure] = {}
for skill in skills:
# Disregard standard deviations, focus on means
roi_data = drop_non_roi_columns(lis_output[skill]['%s_mean' % measure])
dfs_laterality_indices[measure][skill] = roi_data
dfs_laterality_indices
There is not much sense in generating a bar plot, as there are too many ROIs. Histograms show the general trend across all ROIs better.
sns.set(color_codes=True)
def plot_histogram(data, skill, color, measure, bins=30):
    # sns.distplot was removed in recent seaborn releases; histplot replaces it
    sns.histplot(data, bins=bins, color=color, label='%s' % skill, binrange=(-100, 100))
plt.axvline(x=data.mean(), color=color, label='mean %s=%.2f' % (skill, data.mean()),
linewidth=3, linestyle=':')
plt.ylabel('Number of ROIs')
plt.title('LIs distribution (%s)\n%s ROIs' % (measures_explicit[measure], len(data)))
plt.legend()
for measure in measures:
for skill, c in zip(skills, ('b', 'r')):
data = dfs_laterality_indices[measure][skill].mean().rename('Laterality index')
plot_histogram(data, skill, c, measure)
# Save figure
output_path = os.path.join(output_dir, 'figures', 'distributions_%s.pdf' % measure)
plt.savefig(output_path)
plt.show()
The same histograms, based only on the ROIs defined in `picks`.
if picks is not None:
for measure in measures:
for skill, c in zip(skills, ('b', 'r')):
            # `picks` holds positional indices, so use .iloc for selection
            data = dfs_laterality_indices[measure][skill].mean().rename('Laterality index').iloc[picks]
plot_histogram(data, skill, c, measure, bins=40)
        # Save figure under a distinct name so the all-ROI figure isn't overwritten
        output_path = os.path.join(output_dir, 'figures', 'distributions_picks_%s.pdf' % measure)
plt.savefig(output_path)
plt.show()
for measure in measures:
corr = dfs_laterality_indices[measure]['language'].corrwith(dfs_laterality_indices[measure]['praxis'])
    # Important: keep track of which variable goes on which axis;
    # it is set later by explicit arguments, e.g., scatter(x=..., y=...)
print('')
print('____________')
print('Correlations')
print(corr.describe())
# Get highest correlation
max_corr_roi = corr.idxmax()
print('---')
print('The highest correlation is in: %s' % max_corr_roi)
max_corr_idx = corr.index.get_loc(max_corr_roi)
    print('Its numeric index is: %s' % max_corr_idx)
colors = ['lightslategray',] * n_rois_per_hemisphere
colors[max_corr_idx] = 'crimson'
fig = go.Figure(data=[go.Bar(x=corr.index,
y=corr.round(3),
marker_color=colors)],)
fig.update_layout(xaxis=dict(title='ROIs', tickmode='linear', tickfont=dict(size=7), constrain="domain"),
yaxis=dict(title='Correlation',
scaleanchor = 'x',
scaleratio = 1),
title=dict(text='Correlation between praxis and language (%s)' % measures_explicit[measure],
x=0.5))
fig.show()
# Save correlation data
output_path = os.path.join(output_dir, 'correlations', 'correlation-%s-praxis_language.csv' % measure)
corr.to_csv(output_path, index_label='ROI', float_format='%.3f')