diff --git a/exec.py b/exec.py index e3feb99..49e6d4f 100644 --- a/exec.py +++ b/exec.py @@ -1,240 +1,240 @@ #!/usr/bin/env python # Copyright 2018 Division of Medical Image Computing, German Cancer Research Center (DKFZ). # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """execution script.""" import argparse import os import time import torch import utils.exp_utils as utils from evaluator import Evaluator from predictor import Predictor from plotting import plot_batch_prediction def train(logger): """ perform the training routine for a given fold. saves plots and selected parameters to the experiment dir specified in the configs. """ logger.info('performing training in {}D over fold {} on experiment {} with model {}'.format( cf.dim, cf.fold, cf.exp_dir, cf.model)) net = model.net(cf, logger).cuda() optimizer = torch.optim.Adam(net.parameters(), lr=cf.learning_rate[0], weight_decay=cf.weight_decay) model_selector = utils.ModelSelector(cf, logger) train_evaluator = Evaluator(cf, logger, mode='train') val_evaluator = Evaluator(cf, logger, mode=cf.val_mode) starting_epoch = 1 # prepare monitoring monitor_metrics, TrainingPlot = utils.prepare_monitoring(cf) if cf.resume_to_checkpoint: starting_epoch, monitor_metrics = utils.load_checkpoint(cf.resume_to_checkpoint, net, optimizer) logger.info('resumed to checkpoint {} at epoch {}'.format(cf.resume_to_checkpoint, starting_epoch)) logger.info('loading dataset and initializing batch generators...') batch_gen = data_loader.get_train_generators(cf, logger) for epoch in range(starting_epoch, cf.num_epochs + 1): logger.info('starting training epoch {}'.format(epoch)) for param_group in optimizer.param_groups: param_group['lr'] = cf.learning_rate[epoch - 1] start_time = time.time() net.train() train_results_list = [] for bix in range(cf.num_train_batches): batch = next(batch_gen['train']) tic_fw = time.time() results_dict = net.train_forward(batch) tic_bw = time.time() optimizer.zero_grad() results_dict['torch_loss'].backward() optimizer.step() logger.info('tr. batch {0}/{1} (ep. {2}) fw {3:.3f}s / bw {4:.3f}s / total {5:.3f}s || ' .format(bix + 1, cf.num_train_batches, epoch, tic_bw - tic_fw, time.time() - tic_bw, time.time() - tic_fw) + results_dict['logger_string']) train_results_list.append([results_dict['boxes'], batch['pid']]) monitor_metrics['train']['monitor_values'][epoch].append(results_dict['monitor_values']) _, monitor_metrics['train'] = train_evaluator.evaluate_predictions(train_results_list, monitor_metrics['train']) train_time = time.time() - start_time logger.info('starting validation in mode {}.'.format(cf.val_mode)) with torch.no_grad(): net.eval() if cf.do_validation: val_results_list = [] val_predictor = Predictor(cf, net, logger, mode='val') for _ in range(batch_gen['n_val']): batch = next(batch_gen[cf.val_mode]) if cf.val_mode == 'val_patient': results_dict = val_predictor.predict_patient(batch) elif cf.val_mode == 'val_sampling': results_dict = net.train_forward(batch, is_validation=True) val_results_list.append([results_dict['boxes'], batch['pid']]) monitor_metrics['val']['monitor_values'][epoch].append(results_dict['monitor_values']) _, monitor_metrics['val'] = val_evaluator.evaluate_predictions(val_results_list, monitor_metrics['val']) model_selector.run_model_selection(net, optimizer, monitor_metrics, epoch) # update monitoring and prediction plots TrainingPlot.update_and_save(monitor_metrics, epoch) epoch_time = time.time() - start_time logger.info('trained epoch {}: took {} sec. ({} train / {} val)'.format( epoch, epoch_time, train_time, epoch_time-train_time)) batch = next(batch_gen['val_sampling']) results_dict = net.train_forward(batch, is_validation=True) logger.info('plotting predictions from validation sampling.') plot_batch_prediction(batch, results_dict, cf) def test(logger): """ perform testing for a given fold (or hold out set). save stats in evaluator. """ logger.info('starting testing model of fold {} in exp {}'.format(cf.fold, cf.exp_dir)) net = model.net(cf, logger).cuda() test_predictor = Predictor(cf, net, logger, mode='test') test_evaluator = Evaluator(cf, logger, mode='test') batch_gen = data_loader.get_test_generator(cf, logger) test_results_list = test_predictor.predict_test_set(batch_gen, return_results=True) test_evaluator.evaluate_predictions(test_results_list) test_evaluator.score_test_df() if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('-m', '--mode', type=str, default='train_test', help='one out of: train / test / train_test / analysis / create_exp') parser.add_argument('-f','--folds', nargs='+', type=int, default=None, help='None runs over all folds in CV. otherwise specify list of folds.') parser.add_argument('--exp_dir', type=str, default='/path/to/experiment/directory', help='path to experiment dir. will be created if non existent.') parser.add_argument('--server_env', default=False, action='store_true', help='change IO settings to deploy models on a cluster.') - parser.add_argument('--slurm_job_id', type=str, default=None, help='job scheduler info') + parser.add_argument('--data_dest', type=str, default=None, help="path to final data folder if different from config.") parser.add_argument('--use_stored_settings', default=False, action='store_true', help='load configs from existing exp_dir instead of source dir. always done for testing, ' 'but can be set to true to do the same for training. useful in job scheduler environment, ' 'where source code might change before the job actually runs.') parser.add_argument('--resume_to_checkpoint', type=str, default=None, help='if resuming to checkpoint, the desired fold still needs to be parsed via --folds.') parser.add_argument('--exp_source', type=str, default='experiments/toy_exp', help='specifies, from which source experiment to load configs and data_loader.') parser.add_argument('-d', '--dev', default=False, action='store_true', help="development mode: shorten everything") args = parser.parse_args() folds = args.folds resume_to_checkpoint = args.resume_to_checkpoint if args.mode == 'train' or args.mode == 'train_test': cf = utils.prep_exp(args.exp_source, args.exp_dir, args.server_env, args.use_stored_settings) if args.dev: folds = [0,1] cf.batch_size, cf.num_epochs, cf.min_save_thresh, cf.save_n_models = 3 if cf.dim==2 else 1, 1, 0, 1 cf.num_train_batches, cf.num_val_batches, cf.max_val_patients = 5, 1, 1 cf.test_n_epochs = cf.save_n_models cf.max_test_patients = 1 - cf.slurm_job_id = args.slurm_job_id + cf.data_dest = args.data_dest model = utils.import_module('model', cf.model_path) data_loader = utils.import_module('dl', os.path.join(args.exp_source, 'data_loader.py')) if folds is None: folds = range(cf.n_cv_splits) for fold in folds: cf.fold_dir = os.path.join(cf.exp_dir, 'fold_{}'.format(fold)) cf.fold = fold cf.resume_to_checkpoint = resume_to_checkpoint if not os.path.exists(cf.fold_dir): os.mkdir(cf.fold_dir) logger = utils.get_logger(cf.fold_dir) train(logger) cf.resume_to_checkpoint = None if args.mode == 'train_test': test(logger) for hdlr in logger.handlers: hdlr.close() logger.handlers = [] elif args.mode == 'test': cf = utils.prep_exp(args.exp_source, args.exp_dir, args.server_env, is_training=False, use_stored_settings=True) if args.dev: folds = [0,1] cf.test_n_epochs = 1; cf.max_test_patients = 1 - cf.slurm_job_id = args.slurm_job_id + cf.data_dest = args.data_dest model = utils.import_module('model', cf.model_path) data_loader = utils.import_module('dl', os.path.join(args.exp_source, 'data_loader.py')) if folds is None: folds = range(cf.n_cv_splits) for fold in folds: cf.fold_dir = os.path.join(cf.exp_dir, 'fold_{}'.format(fold)) logger = utils.get_logger(cf.fold_dir) cf.fold = fold test(logger) for hdlr in logger.handlers: hdlr.close() logger.handlers = [] # load raw predictions saved by predictor during testing, run aggregation algorithms and evaluation. elif args.mode == 'analysis': cf = utils.prep_exp(args.exp_source, args.exp_dir, args.server_env, is_training=False, use_stored_settings=True) logger = utils.get_logger(cf.exp_dir) if cf.hold_out_test_set: cf.folds = args.folds predictor = Predictor(cf, net=None, logger=logger, mode='analysis') results_list = predictor.load_saved_predictions(apply_wbc=True) utils.create_csv_output(results_list, cf, logger) else: if folds is None: folds = range(cf.n_cv_splits) for fold in folds: cf.fold_dir = os.path.join(cf.exp_dir, 'fold_{}'.format(fold)) cf.fold = fold predictor = Predictor(cf, net=None, logger=logger, mode='analysis') results_list = predictor.load_saved_predictions(apply_wbc=True) logger.info('starting evaluation...') evaluator = Evaluator(cf, logger, mode='test') evaluator.evaluate_predictions(results_list) evaluator.score_test_df() # create experiment folder and copy scripts without starting job. # useful for cloud deployment where configs might change before job actually runs. elif args.mode == 'create_exp': cf = utils.prep_exp(args.exp_source, args.exp_dir, args.server_env, use_stored_settings=True) logger = utils.get_logger(cf.exp_dir) logger.info('created experiment directory at {}'.format(args.exp_dir)) else: raise RuntimeError('mode specified in args is not implemented...') diff --git a/experiments/lidc_exp/configs.py b/experiments/lidc_exp/configs.py index cef1403..f233453 100644 --- a/experiments/lidc_exp/configs.py +++ b/experiments/lidc_exp/configs.py @@ -1,334 +1,334 @@ #!/usr/bin/env python # Copyright 2018 Division of Medical Image Computing, German Cancer Research Center (DKFZ). # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== import sys import os sys.path.append(os.path.dirname(os.path.realpath(__file__))) import numpy as np from default_configs import DefaultConfigs class configs(DefaultConfigs): def __init__(self, server_env=None): ######################### # Preprocessing # ######################### self.root_dir = '/path/to/raw/data' self.raw_data_dir = '{}/data_nrrd'.format(self.root_dir) self.pp_dir = '{}/pp_norm'.format(self.root_dir) self.target_spacing = (0.7, 0.7, 1.25) ######################### # I/O # ######################### # one out of [2, 3]. dimension the model operates in. self.dim = 2 # one out of ['mrcnn', 'retina_net', 'retina_unet', 'detection_unet', 'ufrcnn', 'detection_unet']. self.model = 'retina_unet' DefaultConfigs.__init__(self, self.model, server_env, self.dim) # int [0 < dataset_size]. select n patients from dataset for prototyping. If None, all data is used. self.select_prototype_subset = None # path to preprocessed data. - self.pp_name = 'lidc_preprocessed_for_G2' + self.pp_name = 'lidc_mdt' self.input_df_name = 'info_df.pickle' - self.pp_data_path = '/mnt/HDD2TB/Documents/data/lidc/{}'.format(self.pp_name) + self.pp_data_path = '/media/gregor/HDD2TB/data/lidc/{}'.format(self.pp_name) self.pp_test_data_path = self.pp_data_path #change if test_data in separate folder. # settings for deployment in cloud. if server_env: # path to preprocessed data. - self.pp_name = 'pp_fg_slices' + self.pp_name = 'lidc_mdt_npz' self.crop_name = 'pp_fg_slices_packed' - self.pp_data_path = '/path/to/preprocessed/data/{}/{}'.format(self.pp_name, self.crop_name) + self.pp_data_path = '/datasets/datasets_ramien/lidc_exp/data/{}'.format(self.pp_name) self.pp_test_data_path = self.pp_data_path self.select_prototype_subset = None ######################### # Data Loader # ######################### # select modalities from preprocessed data self.channels = [0] self.n_channels = len(self.channels) # patch_size to be used for training. pre_crop_size is the patch_size before data augmentation. self.pre_crop_size_2D = [300, 300] self.patch_size_2D = [288, 288] self.pre_crop_size_3D = [156, 156, 96] self.patch_size_3D = [128, 128, 64] self.patch_size = self.patch_size_2D if self.dim == 2 else self.patch_size_3D self.pre_crop_size = self.pre_crop_size_2D if self.dim == 2 else self.pre_crop_size_3D # ratio of free sampled batch elements before class balancing is triggered # (>0 to include "empty"/background patches.) self.batch_sample_slack = 0.2 # set 2D network to operate in 3D images. self.merge_2D_to_3D_preds = True # feed +/- n neighbouring slices into channel dimension. set to None for no context. self.n_3D_context = None if self.n_3D_context is not None and self.dim == 2: self.n_channels *= (self.n_3D_context * 2 + 1) ######################### # Architecture # ######################### self.start_filts = 48 if self.dim == 2 else 18 self.end_filts = self.start_filts * 4 if self.dim == 2 else self.start_filts * 2 self.res_architecture = 'resnet50' # 'resnet101' , 'resnet50' self.norm = None # one of None, 'instance_norm', 'batch_norm' self.weight_decay = 0 # one of 'xavier_uniform', 'xavier_normal', or 'kaiming_normal', None (=default = 'kaiming_uniform') self.weight_init = None ######################### # Schedule / Selection # ######################### self.num_epochs = 100 self.num_train_batches = 200 if self.dim == 2 else 200 self.batch_size = 20 if self.dim == 2 else 8 self.do_validation = True # decide whether to validate on entire patient volumes (like testing) or sampled patches (like training) # the former is morge accurate, while the latter is faster (depending on volume size) self.val_mode = 'val_sampling' # one of 'val_sampling' , 'val_patient' if self.val_mode == 'val_patient': self.max_val_patients = 50 # if 'None' iterates over entire val_set once. if self.val_mode == 'val_sampling': self.num_val_batches = 50 ######################### # Testing / Plotting # ######################### # set the top-n-epochs to be saved for temporal averaging in testing. self.save_n_models = 5 self.test_n_epochs = 5 # set a minimum epoch number for saving in case of instabilities in the first phase of training. self.min_save_thresh = 0 if self.dim == 2 else 0 self.report_score_level = ['patient', 'rois'] # choose list from 'patient', 'rois' self.class_dict = {1: 'benign', 2: 'malignant'} # 0 is background. self.patient_class_of_interest = 2 # patient metrics are only plotted for one class. self.ap_match_ious = [0.1] # list of ious to be evaluated for ap-scoring. self.model_selection_criteria = ['malignant_ap', 'benign_ap'] # criteria to average over for saving epochs. self.min_det_thresh = 0.1 # minimum confidence value to select predictions for evaluation. # threshold for clustering predictions together (wcs = weighted cluster scoring). # needs to be >= the expected overlap of predictions coming from one model (typically NMS threshold). # if too high, preds of the same object are separate clusters. self.wcs_iou = 1e-5 self.plot_prediction_histograms = True self.plot_stat_curves = False ######################### # Data Augmentation # ######################### self.da_kwargs={ 'do_elastic_deform': True, 'alpha':(0., 1500.), 'sigma':(30., 50.), 'do_rotation':True, 'angle_x': (0., 2 * np.pi), 'angle_y': (0., 0), 'angle_z': (0., 0), 'do_scale': True, 'scale':(0.8, 1.1), 'random_crop':False, 'rand_crop_dist': (self.patch_size[0] / 2. - 3, self.patch_size[1] / 2. - 3), 'border_mode_data': 'constant', 'border_cval_data': 0, 'order_data': 1 } if self.dim == 3: self.da_kwargs['do_elastic_deform'] = False self.da_kwargs['angle_x'] = (0, 0.0) self.da_kwargs['angle_y'] = (0, 0.0) #must be 0!! self.da_kwargs['angle_z'] = (0., 2 * np.pi) ######################### # Add model specifics # ######################### {'detection_unet': self.add_det_unet_configs, 'mrcnn': self.add_mrcnn_configs, 'ufrcnn': self.add_mrcnn_configs, 'retina_net': self.add_mrcnn_configs, 'retina_unet': self.add_mrcnn_configs, }[self.model]() def add_det_unet_configs(self): self.learning_rate = [1e-4] * self.num_epochs # aggregation from pixel perdiction to object scores (connected component). One of ['max', 'median'] self.aggregation_operation = 'max' # max number of roi candidates to identify per batch element and class. self.n_roi_candidates = 10 if self.dim == 2 else 30 # loss mode: either weighted cross entropy ('wce'), batch-wise dice loss ('dice), or the sum of both ('dice_wce') self.seg_loss_mode = 'dice_wce' # if <1, false positive predictions in foreground are penalized less. self.fp_dice_weight = 1 if self.dim == 2 else 1 self.wce_weights = [1, 1, 1] self.detection_min_confidence = self.min_det_thresh # if 'True', loss distinguishes all classes, else only foreground vs. background (class agnostic). self.class_specific_seg_flag = True self.num_seg_classes = 3 if self.class_specific_seg_flag else 2 self.head_classes = self.num_seg_classes def add_mrcnn_configs(self): # learning rate is a list with one entry per epoch. self.learning_rate = [1e-4] * self.num_epochs # disable the re-sampling of mask proposals to original size for speed-up. # since evaluation is detection-driven (box-matching) and not instance segmentation-driven (iou-matching), # mask-outputs are optional. self.return_masks_in_val = True self.return_masks_in_test = False # set number of proposal boxes to plot after each epoch. self.n_plot_rpn_props = 5 if self.dim == 2 else 30 # number of classes for head networks: n_foreground_classes + 1 (background) self.head_classes = 3 # seg_classes hier refers to the first stage classifier (RPN) self.num_seg_classes = 2 # foreground vs. background # feature map strides per pyramid level are inferred from architecture. self.backbone_strides = {'xy': [4, 8, 16, 32], 'z': [1, 2, 4, 8]} # anchor scales are chosen according to expected object sizes in data set. Default uses only one anchor scale # per pyramid level. (outer list are pyramid levels (corresponding to BACKBONE_STRIDES), inner list are scales per level.) self.rpn_anchor_scales = {'xy': [[8], [16], [32], [64]], 'z': [[2], [4], [8], [16]]} # choose which pyramid levels to extract features from: P2: 0, P3: 1, P4: 2, P5: 3. self.pyramid_levels = [0, 1, 2, 3] # number of feature maps in rpn. typically lowered in 3D to save gpu-memory. self.n_rpn_features = 512 if self.dim == 2 else 128 # anchor ratios and strides per position in feature maps. self.rpn_anchor_ratios = [0.5, 1, 2] self.rpn_anchor_stride = 1 # Threshold for first stage (RPN) non-maximum suppression (NMS): LOWER == HARDER SELECTION self.rpn_nms_threshold = 0.7 if self.dim == 2 else 0.7 # loss sampling settings. self.rpn_train_anchors_per_image = 6 #per batch element self.train_rois_per_image = 6 #per batch element self.roi_positive_ratio = 0.5 self.anchor_matching_iou = 0.7 # factor of top-k candidates to draw from per negative sample (stochastic-hard-example-mining). # poolsize to draw top-k candidates from will be shem_poolsize * n_negative_samples. self.shem_poolsize = 10 self.pool_size = (7, 7) if self.dim == 2 else (7, 7, 3) self.mask_pool_size = (14, 14) if self.dim == 2 else (14, 14, 5) self.mask_shape = (28, 28) if self.dim == 2 else (28, 28, 10) self.rpn_bbox_std_dev = np.array([0.1, 0.1, 0.1, 0.2, 0.2, 0.2]) self.bbox_std_dev = np.array([0.1, 0.1, 0.1, 0.2, 0.2, 0.2]) self.window = np.array([0, 0, self.patch_size[0], self.patch_size[1], 0, self.patch_size_3D[2]]) self.scale = np.array([self.patch_size[0], self.patch_size[1], self.patch_size[0], self.patch_size[1], self.patch_size_3D[2], self.patch_size_3D[2]]) if self.dim == 2: self.rpn_bbox_std_dev = self.rpn_bbox_std_dev[:4] self.bbox_std_dev = self.bbox_std_dev[:4] self.window = self.window[:4] self.scale = self.scale[:4] # pre-selection in proposal-layer (stage 1) for NMS-speedup. applied per batch element. self.pre_nms_limit = 3000 if self.dim == 2 else 6000 # n_proposals to be selected after NMS per batch element. too high numbers blow up memory if "detect_while_training" is True, # since proposals of the entire batch are forwarded through second stage in as one "batch". self.roi_chunk_size = 2500 if self.dim == 2 else 600 self.post_nms_rois_training = 500 if self.dim == 2 else 75 self.post_nms_rois_inference = 500 # Final selection of detections (refine_detections) self.model_max_instances_per_batch_element = 10 if self.dim == 2 else 30 # per batch element and class. self.detection_nms_threshold = 1e-5 # needs to be > 0, otherwise all predictions are one cluster. self.model_min_confidence = 0.1 if self.dim == 2: self.backbone_shapes = np.array( [[int(np.ceil(self.patch_size[0] / stride)), int(np.ceil(self.patch_size[1] / stride))] for stride in self.backbone_strides['xy']]) else: self.backbone_shapes = np.array( [[int(np.ceil(self.patch_size[0] / stride)), int(np.ceil(self.patch_size[1] / stride)), int(np.ceil(self.patch_size[2] / stride_z))] for stride, stride_z in zip(self.backbone_strides['xy'], self.backbone_strides['z'] )]) if self.model == 'ufrcnn': self.operate_stride1 = True self.class_specific_seg_flag = True self.num_seg_classes = 3 if self.class_specific_seg_flag else 2 self.frcnn_mode = True if self.model == 'retina_net' or self.model == 'retina_unet' or self.model == 'prob_detector': # implement extra anchor-scales according to retina-net publication. self.rpn_anchor_scales['xy'] = [[ii[0], ii[0] * (2 ** (1 / 3)), ii[0] * (2 ** (2 / 3))] for ii in self.rpn_anchor_scales['xy']] self.rpn_anchor_scales['z'] = [[ii[0], ii[0] * (2 ** (1 / 3)), ii[0] * (2 ** (2 / 3))] for ii in self.rpn_anchor_scales['z']] self.n_anchors_per_pos = len(self.rpn_anchor_ratios) * 3 self.n_rpn_features = 256 if self.dim == 2 else 64 # pre-selection of detections for NMS-speedup. per entire batch. self.pre_nms_limit = 10000 if self.dim == 2 else 50000 # anchor matching iou is lower than in Mask R-CNN according to https://arxiv.org/abs/1708.02002 self.anchor_matching_iou = 0.5 # if 'True', seg loss distinguishes all classes, else only foreground vs. background (class agnostic). self.num_seg_classes = 3 if self.class_specific_seg_flag else 2 if self.model == 'retina_unet': self.operate_stride1 = True \ No newline at end of file diff --git a/experiments/lidc_exp/data_loader.py b/experiments/lidc_exp/data_loader.py index 2e64f34..9edb526 100644 --- a/experiments/lidc_exp/data_loader.py +++ b/experiments/lidc_exp/data_loader.py @@ -1,461 +1,461 @@ #!/usr/bin/env python # Copyright 2018 Division of Medical Image Computing, German Cancer Research Center (DKFZ). # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== ''' Example Data Loader for the LIDC data set. This dataloader expects preprocessed data in .npy or .npz files per patient and a pandas dataframe in the same directory containing the meta-info e.g. file paths, labels, foregound slice-ids. ''' import numpy as np import os from collections import OrderedDict import pandas as pd import pickle import time import subprocess import utils.dataloader_utils as dutils # batch generator tools from https://github.com/MIC-DKFZ/batchgenerators from batchgenerators.dataloading.data_loader import SlimDataLoaderBase from batchgenerators.transforms.spatial_transforms import MirrorTransform as Mirror from batchgenerators.transforms.abstract_transforms import Compose from batchgenerators.dataloading.multi_threaded_augmenter import MultiThreadedAugmenter from batchgenerators.dataloading import SingleThreadedAugmenter from batchgenerators.transforms.spatial_transforms import SpatialTransform from batchgenerators.transforms.crop_and_pad_transforms import CenterCropTransform from batchgenerators.transforms.utility_transforms import ConvertSegToBoundingBoxCoordinates def get_train_generators(cf, logger): """ wrapper function for creating the training batch generator pipeline. returns the train/val generators. selects patients according to cv folds (generated by first run/fold of experiment): splits the data into n-folds, where 1 split is used for val, 1 split for testing and the rest for training. (inner loop test set) If cf.hold_out_test_set is True, adds the test split to the training data. """ all_data = load_dataset(cf, logger) all_pids_list = np.unique([v['pid'] for (k, v) in all_data.items()]) if not cf.created_fold_id_pickle: fg = dutils.fold_generator(seed=cf.seed, n_splits=cf.n_cv_splits, len_data=len(all_pids_list)).get_fold_names() with open(os.path.join(cf.exp_dir, 'fold_ids.pickle'), 'wb') as handle: pickle.dump(fg, handle) cf.created_fold_id_pickle = True else: with open(os.path.join(cf.exp_dir, 'fold_ids.pickle'), 'rb') as handle: fg = pickle.load(handle) train_ix, val_ix, test_ix, _ = fg[cf.fold] train_pids = [all_pids_list[ix] for ix in train_ix] val_pids = [all_pids_list[ix] for ix in val_ix] if cf.hold_out_test_set: train_pids += [all_pids_list[ix] for ix in test_ix] train_data = {k: v for (k, v) in all_data.items() if any(p == v['pid'] for p in train_pids)} val_data = {k: v for (k, v) in all_data.items() if any(p == v['pid'] for p in val_pids)} logger.info("data set loaded with: {} train / {} val / {} test patients".format(len(train_ix), len(val_ix), len(test_ix))) batch_gen = {} batch_gen['train'] = create_data_gen_pipeline(train_data, cf=cf, is_training=True) batch_gen['val_sampling'] = create_data_gen_pipeline(val_data, cf=cf, is_training=False) if cf.val_mode == 'val_patient': batch_gen['val_patient'] = PatientBatchIterator(val_data, cf=cf) batch_gen['n_val'] = len(val_ix) if cf.max_val_patients is None else min(len(val_ix), cf.max_val_patients) else: batch_gen['n_val'] = cf.num_val_batches return batch_gen def get_test_generator(cf, logger): """ wrapper function for creating the test batch generator pipeline. selects patients according to cv folds (generated by first run/fold of experiment) If cf.hold_out_test_set is True, gets the data from an external folder instead. """ if cf.hold_out_test_set: pp_name = cf.pp_test_name test_ix = None else: pp_name = None with open(os.path.join(cf.exp_dir, 'fold_ids.pickle'), 'rb') as handle: fold_list = pickle.load(handle) _, _, test_ix, _ = fold_list[cf.fold] # warnings.warn('WARNING: using validation set for testing!!!') test_data = load_dataset(cf, logger, test_ix, pp_data_path=cf.pp_test_data_path, pp_name=pp_name) logger.info("data set loaded with: {} test patients".format(len(test_ix))) batch_gen = {} batch_gen['test'] = PatientBatchIterator(test_data, cf=cf) batch_gen['n_test'] = len(test_ix) if cf.max_test_patients=="all" else \ min(cf.max_test_patients, len(test_ix)) return batch_gen def load_dataset(cf, logger, subset_ixs=None, pp_data_path=None, pp_name=None): """ loads the dataset. if deployed in cloud also copies and unpacks the data to the working directory. :param subset_ixs: subset indices to be loaded from the dataset. used e.g. for testing to only load the test folds. :return: data: dictionary with one entry per patient (in this case per patient-breast, since they are treated as individual images for training) each entry is a dictionary containing respective meta-info as well as paths to the preprocessed numpy arrays to be loaded during batch-generation """ if pp_data_path is None: pp_data_path = cf.pp_data_path if pp_name is None: pp_name = cf.pp_name if cf.server_env: copy_data = True - target_dir = os.path.join('/ssd', cf.slurm_job_id, pp_name, cf.crop_name) + target_dir = os.path.join(cf.data_dest, pp_name) if not os.path.exists(target_dir): cf.data_source_dir = pp_data_path os.makedirs(target_dir) subprocess.call('rsync -av {} {}'.format( os.path.join(cf.data_source_dir, cf.input_df_name), os.path.join(target_dir, cf.input_df_name)), shell=True) logger.info('created target dir and info df at {}'.format(os.path.join(target_dir, cf.input_df_name))) elif subset_ixs is None: copy_data = False pp_data_path = target_dir p_df = pd.read_pickle(os.path.join(pp_data_path, cf.input_df_name)) if cf.select_prototype_subset is not None: prototype_pids = p_df.pid.tolist()[:cf.select_prototype_subset] p_df = p_df[p_df.pid.isin(prototype_pids)] logger.warning('WARNING: using prototyping data subset!!!') if subset_ixs is not None: subset_pids = [np.unique(p_df.pid.tolist())[ix] for ix in subset_ixs] p_df = p_df[p_df.pid.isin(subset_pids)] logger.info('subset: selected {} instances from df'.format(len(p_df))) if cf.server_env: if copy_data: copy_and_unpack_data(logger, p_df.pid.tolist(), cf.fold_dir, cf.data_source_dir, target_dir) class_targets = p_df['class_target'].tolist() pids = p_df.pid.tolist() imgs = [os.path.join(pp_data_path, '{}_img.npy'.format(pid)) for pid in pids] segs = [os.path.join(pp_data_path,'{}_rois.npy'.format(pid)) for pid in pids] data = OrderedDict() for ix, pid in enumerate(pids): # for the experiment conducted here, malignancy scores are binarized: (benign: 1-2, malignant: 3-5) targets = [1 if ii >= 3 else 0 for ii in class_targets[ix]] data[pid] = {'data': imgs[ix], 'seg': segs[ix], 'pid': pid, 'class_target': targets} data[pid]['fg_slices'] = p_df['fg_slices'].tolist()[ix] return data def create_data_gen_pipeline(patient_data, cf, is_training=True): """ create mutli-threaded train/val/test batch generation and augmentation pipeline. :param patient_data: dictionary containing one dictionary per patient in the train/test subset. :param is_training: (optional) whether to perform data augmentation (training) or not (validation/testing) :return: multithreaded_generator """ # create instance of batch generator as first element in pipeline. data_gen = BatchGenerator(patient_data, batch_size=cf.batch_size, cf=cf) # add transformations to pipeline. my_transforms = [] if is_training: mirror_transform = Mirror(axes=np.arange(cf.dim)) my_transforms.append(mirror_transform) spatial_transform = SpatialTransform(patch_size=cf.patch_size[:cf.dim], patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'], do_elastic_deform=cf.da_kwargs['do_elastic_deform'], alpha=cf.da_kwargs['alpha'], sigma=cf.da_kwargs['sigma'], do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'], angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'], do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'], random_crop=cf.da_kwargs['random_crop']) my_transforms.append(spatial_transform) else: my_transforms.append(CenterCropTransform(crop_size=cf.patch_size[:cf.dim])) my_transforms.append(ConvertSegToBoundingBoxCoordinates(cf.dim, get_rois_from_seg_flag=False, class_specific_seg_flag=cf.class_specific_seg_flag)) all_transforms = Compose(my_transforms) # multithreaded_generator = SingleThreadedAugmenter(data_gen, all_transforms) multithreaded_generator = MultiThreadedAugmenter(data_gen, all_transforms, num_processes=cf.n_workers, seeds=range(cf.n_workers)) return multithreaded_generator class BatchGenerator(SlimDataLoaderBase): """ creates the training/validation batch generator. Samples n_batch_size patients (draws a slice from each patient if 2D) from the data set while maintaining foreground-class balance. Returned patches are cropped/padded to pre_crop_size. Actual patch_size is obtained after data augmentation. :param data: data dictionary as provided by 'load_dataset'. :param batch_size: number of patients to sample for the batch :return dictionary containing the batch data (b, c, x, y, (z)) / seg (b, 1, x, y, (z)) / pids / class_target """ def __init__(self, data, batch_size, cf): super(BatchGenerator, self).__init__(data, batch_size) self.cf = cf self.crop_margin = np.array(self.cf.patch_size)/8. #min distance of ROI center to edge of cropped_patch. self.p_fg = 0.5 def generate_train_batch(self): batch_data, batch_segs, batch_pids, batch_targets, batch_patient_labels = [], [], [], [], [] class_targets_list = [v['class_target'] for (k, v) in self._data.items()] if self.cf.head_classes > 2: # samples patients towards equilibrium of foreground classes on a roi-level (after randomly sampling the ratio "batch_sample_slack). batch_ixs = dutils.get_class_balanced_patients( class_targets_list, self.batch_size, self.cf.head_classes - 1, slack_factor=self.cf.batch_sample_slack) else: batch_ixs = np.random.choice(len(class_targets_list), self.batch_size) patients = list(self._data.items()) for b in batch_ixs: patient = patients[b][1] data = np.transpose(np.load(patient['data'], mmap_mode='r'), axes=(1, 2, 0))[np.newaxis] # (c, y, x, z) seg = np.transpose(np.load(patient['seg'], mmap_mode='r'), axes=(1, 2, 0)) batch_pids.append(patient['pid']) batch_targets.append(patient['class_target']) if self.cf.dim == 2: # draw random slice from patient while oversampling slices containing foreground objects with p_fg. if len(patient['fg_slices']) > 0: fg_prob = self.p_fg / len(patient['fg_slices']) bg_prob = (1 - self.p_fg) / (data.shape[3] - len(patient['fg_slices'])) slices_prob = [fg_prob if ix in patient['fg_slices'] else bg_prob for ix in range(data.shape[3])] slice_id = np.random.choice(data.shape[3], p=slices_prob) else: slice_id = np.random.choice(data.shape[3]) # if set to not None, add neighbouring slices to each selected slice in channel dimension. if self.cf.n_3D_context is not None: padded_data = dutils.pad_nd_image(data[0], [(data.shape[-1] + (self.cf.n_3D_context*2))], mode='constant') padded_slice_id = slice_id + self.cf.n_3D_context data = (np.concatenate([padded_data[..., ii][np.newaxis] for ii in range( padded_slice_id - self.cf.n_3D_context, padded_slice_id + self.cf.n_3D_context + 1)], axis=0)) else: data = data[..., slice_id] seg = seg[..., slice_id] # pad data if smaller than pre_crop_size. if np.any([data.shape[dim + 1] < ps for dim, ps in enumerate(self.cf.pre_crop_size)]): new_shape = [np.max([data.shape[dim + 1], ps]) for dim, ps in enumerate(self.cf.pre_crop_size)] data = dutils.pad_nd_image(data, new_shape, mode='constant') seg = dutils.pad_nd_image(seg, new_shape, mode='constant') # crop patches of size pre_crop_size, while sampling patches containing foreground with p_fg. crop_dims = [dim for dim, ps in enumerate(self.cf.pre_crop_size) if data.shape[dim + 1] > ps] if len(crop_dims) > 0: fg_prob_sample = np.random.rand(1) # with p_fg: sample random pixel from random ROI and shift center by random value. if fg_prob_sample < self.p_fg and np.sum(seg) > 0: seg_ixs = np.argwhere(seg == np.random.choice(np.unique(seg)[1:], 1)) roi_anchor_pixel = seg_ixs[np.random.choice(seg_ixs.shape[0], 1)][0] assert seg[tuple(roi_anchor_pixel)] > 0 # sample the patch center coords. constrained by edges of images - pre_crop_size /2. And by # distance to the desired ROI < patch_size /2. # (here final patch size to account for center_crop after data augmentation). sample_seg_center = {} for ii in crop_dims: low = np.max((self.cf.pre_crop_size[ii]//2, roi_anchor_pixel[ii] - (self.cf.patch_size[ii]//2 - self.crop_margin[ii]))) high = np.min((data.shape[ii + 1] - self.cf.pre_crop_size[ii]//2, roi_anchor_pixel[ii] + (self.cf.patch_size[ii]//2 - self.crop_margin[ii]))) # happens if lesion on the edge of the image. dont care about roi anymore, # just make sure pre-crop is inside image. if low >= high: low = data.shape[ii + 1] // 2 - (data.shape[ii + 1] // 2 - self.cf.pre_crop_size[ii] // 2) high = data.shape[ii + 1] // 2 + (data.shape[ii + 1] // 2 - self.cf.pre_crop_size[ii] // 2) sample_seg_center[ii] = np.random.randint(low=low, high=high) else: # not guaranteed to be empty. probability of emptiness depends on the data. sample_seg_center = {ii: np.random.randint(low=self.cf.pre_crop_size[ii]//2, high=data.shape[ii + 1] - self.cf.pre_crop_size[ii]//2) for ii in crop_dims} for ii in crop_dims: min_crop = int(sample_seg_center[ii] - self.cf.pre_crop_size[ii] // 2) max_crop = int(sample_seg_center[ii] + self.cf.pre_crop_size[ii] // 2) data = np.take(data, indices=range(min_crop, max_crop), axis=ii + 1) seg = np.take(seg, indices=range(min_crop, max_crop), axis=ii) batch_data.append(data) batch_segs.append(seg[np.newaxis]) data = np.array(batch_data) seg = np.array(batch_segs).astype(np.uint8) class_target = np.array(batch_targets) return {'data': data, 'seg': seg, 'pid': batch_pids, 'class_target': class_target} class PatientBatchIterator(SlimDataLoaderBase): """ creates a test generator that iterates over entire given dataset returning 1 patient per batch. Can be used for monitoring if cf.val_mode = 'patient_val' for a monitoring closer to actualy evaluation (done in 3D), if willing to accept speed-loss during training. :return: out_batch: dictionary containing one patient with batch_size = n_3D_patches in 3D or batch_size = n_2D_patches in 2D . """ def __init__(self, data, cf): #threads in augmenter super(PatientBatchIterator, self).__init__(data, 0) self.cf = cf self.patient_ix = 0 self.dataset_pids = [v['pid'] for (k, v) in data.items()] self.patch_size = cf.patch_size if len(self.patch_size) == 2: self.patch_size = self.patch_size + [1] def generate_train_batch(self): pid = self.dataset_pids[self.patient_ix] patient = self._data[pid] data = np.transpose(np.load(patient['data'], mmap_mode='r'), axes=(1, 2, 0))[np.newaxis] # (c, y, x, z) seg = np.transpose(np.load(patient['seg'], mmap_mode='r'), axes=(1, 2, 0)) batch_class_targets = np.array([patient['class_target']]) # pad data if smaller than patch_size seen during training. if np.any([data.shape[dim + 1] < ps for dim, ps in enumerate(self.patch_size)]): new_shape = [data.shape[0]] + [np.max([data.shape[dim + 1], self.patch_size[dim]]) for dim, ps in enumerate(self.patch_size)] data = dutils.pad_nd_image(data, new_shape) # use 'return_slicer' to crop image back to original shape. seg = dutils.pad_nd_image(seg, new_shape) # get 3D targets for evaluation, even if network operates in 2D. 2D predictions will be merged to 3D in predictor. if self.cf.dim == 3 or self.cf.merge_2D_to_3D_preds: out_data = data[np.newaxis] out_seg = seg[np.newaxis, np.newaxis] out_targets = batch_class_targets batch_3D = {'data': out_data, 'seg': out_seg, 'class_target': out_targets, 'pid': pid} converter = ConvertSegToBoundingBoxCoordinates(dim=3, get_rois_from_seg_flag=False, class_specific_seg_flag=self.cf.class_specific_seg_flag) batch_3D = converter(**batch_3D) batch_3D.update({'patient_bb_target': batch_3D['bb_target'], 'patient_roi_labels': batch_3D['roi_labels'], 'original_img_shape': out_data.shape}) if self.cf.dim == 2: out_data = np.transpose(data, axes=(3, 0, 1, 2)) # (z, c, x, y ) out_seg = np.transpose(seg, axes=(2, 0, 1))[:, np.newaxis] out_targets = np.array(np.repeat(batch_class_targets, out_data.shape[0], axis=0)) # if set to not None, add neighbouring slices to each selected slice in channel dimension. if self.cf.n_3D_context is not None: slice_range = range(self.cf.n_3D_context, out_data.shape[0] + self.cf.n_3D_context) out_data = np.pad(out_data, ((self.cf.n_3D_context, self.cf.n_3D_context), (0, 0), (0, 0), (0, 0)), 'constant', constant_values=0) out_data = np.array( [np.concatenate([out_data[ii] for ii in range( slice_id - self.cf.n_3D_context, slice_id + self.cf.n_3D_context + 1)], axis=0) for slice_id in slice_range]) batch_2D = {'data': out_data, 'seg': out_seg, 'class_target': out_targets, 'pid': pid} converter = ConvertSegToBoundingBoxCoordinates(dim=2, get_rois_from_seg_flag=False, class_specific_seg_flag=self.cf.class_specific_seg_flag) batch_2D = converter(**batch_2D) if self.cf.merge_2D_to_3D_preds: batch_2D.update({'patient_bb_target': batch_3D['patient_bb_target'], 'patient_roi_labels': batch_3D['patient_roi_labels'], 'original_img_shape': out_data.shape}) else: batch_2D.update({'patient_bb_target': batch_2D['bb_target'], 'patient_roi_labels': batch_2D['roi_labels'], 'original_img_shape': out_data.shape}) out_batch = batch_3D if self.cf.dim == 3 else batch_2D patient_batch = out_batch # crop patient-volume to patches of patch_size used during training. stack patches up in batch dimension. # in this case, 2D is treated as a special case of 3D with patch_size[z] = 1. if np.any([data.shape[dim + 1] > self.patch_size[dim] for dim in range(3)]): patch_crop_coords_list = dutils.get_patch_crop_coords(data[0], self.patch_size) new_img_batch, new_seg_batch, new_class_targets_batch = [], [], [] for cix, c in enumerate(patch_crop_coords_list): seg_patch = seg[c[0]:c[1], c[2]: c[3], c[4]:c[5]] new_seg_batch.append(seg_patch) # if set to not None, add neighbouring slices to each selected slice in channel dimension. # correct patch_crop coordinates by added slices of 3D context. if self.cf.dim == 2 and self.cf.n_3D_context is not None: tmp_c_5 = c[5] + (self.cf.n_3D_context * 2) if cix == 0: data = np.pad(data, ((0, 0), (0, 0), (0, 0), (self.cf.n_3D_context, self.cf.n_3D_context)), 'constant', constant_values=0) else: tmp_c_5 = c[5] new_img_batch.append(data[:, c[0]:c[1], c[2]:c[3], c[4]:tmp_c_5]) data = np.array(new_img_batch) # (n_patches, c, x, y, z) seg = np.array(new_seg_batch)[:, np.newaxis] # (n_patches, 1, x, y, z) batch_class_targets = np.repeat(batch_class_targets, len(patch_crop_coords_list), axis=0) if self.cf.dim == 2: if self.cf.n_3D_context is not None: data = np.transpose(data[:, 0], axes=(0, 3, 1, 2)) else: # all patches have z dimension 1 (slices). discard dimension data = data[..., 0] seg = seg[..., 0] patch_batch = {'data': data, 'seg': seg, 'class_target': batch_class_targets, 'pid': pid} patch_batch['patch_crop_coords'] = np.array(patch_crop_coords_list) patch_batch['patient_bb_target'] = patient_batch['patient_bb_target'] patch_batch['patient_roi_labels'] = patient_batch['patient_roi_labels'] patch_batch['original_img_shape'] = patient_batch['original_img_shape'] converter = ConvertSegToBoundingBoxCoordinates(self.cf.dim, get_rois_from_seg_flag=False, class_specific_seg_flag=self.cf.class_specific_seg_flag) patch_batch = converter(**patch_batch) out_batch = patch_batch self.patient_ix += 1 if self.patient_ix == len(self.dataset_pids): self.patient_ix = 0 return out_batch def copy_and_unpack_data(logger, pids, fold_dir, source_dir, target_dir): start_time = time.time() with open(os.path.join(fold_dir, 'file_list.txt'), 'w') as handle: for pid in pids: handle.write('{}_img.npz\n'.format(pid)) handle.write('{}_rois.npz\n'.format(pid)) subprocess.call('rsync -av --files-from {} {} {}'.format(os.path.join(fold_dir, 'file_list.txt'), source_dir, target_dir), shell=True) dutils.unpack_dataset(target_dir) copied_files = os.listdir(target_dir) logger.info("copying and unpacking data set finsihed : {} files in target dir: {}. took {} sec".format( len(copied_files), target_dir, np.round(time.time() - start_time, 0))) diff --git a/experiments/toy_exp/data_loader.py b/experiments/toy_exp/data_loader.py index 3a7062c..7ceec4b 100644 --- a/experiments/toy_exp/data_loader.py +++ b/experiments/toy_exp/data_loader.py @@ -1,305 +1,305 @@ #!/usr/bin/env python # Copyright 2018 Division of Medical Image Computing, German Cancer Research Center (DKFZ). # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== import numpy as np import os from collections import OrderedDict import pandas as pd import pickle import time import subprocess import utils.dataloader_utils as dutils # batch generator tools from https://github.com/MIC-DKFZ/batchgenerators from batchgenerators.dataloading.data_loader import SlimDataLoaderBase from batchgenerators.transforms.spatial_transforms import MirrorTransform as Mirror from batchgenerators.transforms.abstract_transforms import Compose from batchgenerators.dataloading.multi_threaded_augmenter import MultiThreadedAugmenter from batchgenerators.dataloading import SingleThreadedAugmenter from batchgenerators.transforms.spatial_transforms import SpatialTransform from batchgenerators.transforms.crop_and_pad_transforms import CenterCropTransform from batchgenerators.transforms.utility_transforms import ConvertSegToBoundingBoxCoordinates def get_train_generators(cf, logger): """ wrapper function for creating the training batch generator pipeline. returns the train/val generators. selects patients according to cv folds (generated by first run/fold of experiment): splits the data into n-folds, where 1 split is used for val, 1 split for testing and the rest for training. (inner loop test set) If cf.hold_out_test_set is True, adds the test split to the training data. """ all_data = load_dataset(cf, logger) all_pids_list = np.unique([v['pid'] for (k, v) in all_data.items()]) train_pids = all_pids_list[:cf.n_train_data] val_pids = all_pids_list[1000:1500] train_data = {k: v for (k, v) in all_data.items() if any(p == v['pid'] for p in train_pids)} val_data = {k: v for (k, v) in all_data.items() if any(p == v['pid'] for p in val_pids)} logger.info("data set loaded with: {} train / {} val patients".format(len(train_pids), len(val_pids))) batch_gen = {} batch_gen['train'] = create_data_gen_pipeline(train_data, cf=cf, do_aug=False) batch_gen['val_sampling'] = create_data_gen_pipeline(val_data, cf=cf, do_aug=False) if cf.val_mode == 'val_patient': batch_gen['val_patient'] = PatientBatchIterator(val_data, cf=cf) batch_gen['n_val'] = len(val_pids) if cf.max_val_patients is None else min(len(val_pids), cf.max_val_patients) else: batch_gen['n_val'] = cf.num_val_batches return batch_gen def get_test_generator(cf, logger): """ wrapper function for creating the test batch generator pipeline. selects patients according to cv folds (generated by first run/fold of experiment) If cf.hold_out_test_set is True, gets the data from an external folder instead. """ if cf.hold_out_test_set: pp_name = cf.pp_test_name test_ix = None else: pp_name = None with open(os.path.join(cf.exp_dir, 'fold_ids.pickle'), 'rb') as handle: fold_list = pickle.load(handle) _, _, test_ix, _ = fold_list[cf.fold] # warnings.warn('WARNING: using validation set for testing!!!') test_data = load_dataset(cf, logger, test_ix, pp_data_path=cf.pp_test_data_path, pp_name=pp_name) logger.info("data set loaded with: {} test patients from {}".format(len(test_data.keys()), cf.pp_test_data_path)) batch_gen = {} batch_gen['test'] = PatientBatchIterator(test_data, cf=cf) batch_gen['n_test'] = len(test_data.keys()) if cf.max_test_patients=="all" else \ min(cf.max_test_patients, len(test_data.keys())) return batch_gen def load_dataset(cf, logger, subset_ixs=None, pp_data_path=None, pp_name=None): """ loads the dataset. if deployed in cloud also copies and unpacks the data to the working directory. :param subset_ixs: subset indices to be loaded from the dataset. used e.g. for testing to only load the test folds. :return: data: dictionary with one entry per patient (in this case per patient-breast, since they are treated as individual images for training) each entry is a dictionary containing respective meta-info as well as paths to the preprocessed numpy arrays to be loaded during batch-generation """ if pp_data_path is None: pp_data_path = cf.pp_data_path if pp_name is None: pp_name = cf.pp_name if cf.server_env: copy_data = True - target_dir = os.path.join('/ssd', cf.slurm_job_id, pp_name) + target_dir = os.path.join(cf.data_dest, pp_name) if not os.path.exists(target_dir): cf.data_source_dir = pp_data_path os.makedirs(target_dir) subprocess.call('rsync -av {} {}'.format( os.path.join(cf.data_source_dir, cf.input_df_name), os.path.join(target_dir, cf.input_df_name)), shell=True) logger.info('created target dir and info df at {}'.format(os.path.join(target_dir, cf.input_df_name))) elif subset_ixs is None: copy_data = False pp_data_path = target_dir p_df = pd.read_pickle(os.path.join(pp_data_path, cf.input_df_name)) if subset_ixs is not None: subset_pids = [np.unique(p_df.pid.tolist())[ix] for ix in subset_ixs] p_df = p_df[p_df.pid.isin(subset_pids)] logger.info('subset: selected {} instances from df'.format(len(p_df))) if cf.server_env: if copy_data: copy_and_unpack_data(logger, p_df.pid.tolist(), cf.fold_dir, cf.data_source_dir, target_dir) class_targets = p_df['class_id'].tolist() pids = p_df.pid.tolist() imgs = [os.path.join(pp_data_path, '{}.npy'.format(pid)) for pid in pids] segs = [os.path.join(pp_data_path,'{}.npy'.format(pid)) for pid in pids] data = OrderedDict() for ix, pid in enumerate(pids): data[pid] = {'data': imgs[ix], 'seg': segs[ix], 'pid': pid, 'class_target': [class_targets[ix]]} return data def create_data_gen_pipeline(patient_data, cf, do_aug=True): """ create mutli-threaded train/val/test batch generation and augmentation pipeline. :param patient_data: dictionary containing one dictionary per patient in the train/test subset. :param is_training: (optional) whether to perform data augmentation (training) or not (validation/testing) :return: multithreaded_generator """ # create instance of batch generator as first element in pipeline. data_gen = BatchGenerator(patient_data, batch_size=cf.batch_size, cf=cf) # add transformations to pipeline. my_transforms = [] if do_aug: mirror_transform = Mirror(axes=np.arange(2, cf.dim+2, 1)) my_transforms.append(mirror_transform) spatial_transform = SpatialTransform(patch_size=cf.patch_size[:cf.dim], patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'], do_elastic_deform=cf.da_kwargs['do_elastic_deform'], alpha=cf.da_kwargs['alpha'], sigma=cf.da_kwargs['sigma'], do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'], angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'], do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'], random_crop=cf.da_kwargs['random_crop']) my_transforms.append(spatial_transform) else: my_transforms.append(CenterCropTransform(crop_size=cf.patch_size[:cf.dim])) my_transforms.append(ConvertSegToBoundingBoxCoordinates(cf.dim, get_rois_from_seg_flag=False, class_specific_seg_flag=cf.class_specific_seg_flag)) all_transforms = Compose(my_transforms) # multithreaded_generator = SingleThreadedAugmenter(data_gen, all_transforms) multithreaded_generator = MultiThreadedAugmenter(data_gen, all_transforms, num_processes=cf.n_workers, seeds=range(cf.n_workers)) return multithreaded_generator class BatchGenerator(SlimDataLoaderBase): """ creates the training/validation batch generator. Samples n_batch_size patients (draws a slice from each patient if 2D) from the data set while maintaining foreground-class balance. Returned patches are cropped/padded to pre_crop_size. Actual patch_size is obtained after data augmentation. :param data: data dictionary as provided by 'load_dataset'. :param batch_size: number of patients to sample for the batch :return dictionary containing the batch data (b, c, x, y, (z)) / seg (b, 1, x, y, (z)) / pids / class_target """ def __init__(self, data, batch_size, cf): super(BatchGenerator, self).__init__(data, batch_size) self.cf = cf def generate_train_batch(self): batch_data, batch_segs, batch_pids, batch_targets = [], [], [], [] class_targets_list = [v['class_target'] for (k, v) in self._data.items()] #samples patients towards equilibrium of foreground classes on a roi-level (after randomly sampling the ratio "batch_sample_slack). batch_ixs = dutils.get_class_balanced_patients( class_targets_list, self.batch_size, self.cf.head_classes - 1, slack_factor=self.cf.batch_sample_slack) patients = list(self._data.items()) for b in batch_ixs: patient = patients[b][1] all_data = np.load(patient['data'], mmap_mode='r') data = all_data[0] seg = all_data[1].astype('uint8') batch_pids.append(patient['pid']) batch_targets.append(patient['class_target']) batch_data.append(data[np.newaxis]) batch_segs.append(seg[np.newaxis]) data = np.array(batch_data) seg = np.array(batch_segs).astype(np.uint8) class_target = np.array(batch_targets) return {'data': data, 'seg': seg, 'pid': batch_pids, 'class_target': class_target} class PatientBatchIterator(SlimDataLoaderBase): """ creates a test generator that iterates over entire given dataset returning 1 patient per batch. Can be used for monitoring if cf.val_mode = 'patient_val' for a monitoring closer to actualy evaluation (done in 3D), if willing to accept speed-loss during training. :return: out_batch: dictionary containing one patient with batch_size = n_3D_patches in 3D or batch_size = n_2D_patches in 2D . """ def __init__(self, data, cf): #threads in augmenter super(PatientBatchIterator, self).__init__(data, 0) self.cf = cf self.patient_ix = 0 self.dataset_pids = [v['pid'] for (k, v) in data.items()] self.patch_size = cf.patch_size if len(self.patch_size) == 2: self.patch_size = self.patch_size + [1] def generate_train_batch(self): pid = self.dataset_pids[self.patient_ix] patient = self._data[pid] all_data = np.load(patient['data'], mmap_mode='r') data = all_data[0] seg = all_data[1].astype('uint8') batch_class_targets = np.array([patient['class_target']]) out_data = data[None, None] out_seg = seg[None, None] print('check patient data loader', out_data.shape, out_seg.shape) batch_2D = {'data': out_data, 'seg': out_seg, 'class_target': batch_class_targets, 'pid': pid} converter = ConvertSegToBoundingBoxCoordinates(dim=2, get_rois_from_seg_flag=False, class_specific_seg_flag=self.cf.class_specific_seg_flag) batch_2D = converter(**batch_2D) batch_2D.update({'patient_bb_target': batch_2D['bb_target'], 'patient_roi_labels': batch_2D['roi_labels'], 'original_img_shape': out_data.shape}) self.patient_ix += 1 if self.patient_ix == len(self.dataset_pids): self.patient_ix = 0 return batch_2D def copy_and_unpack_data(logger, pids, fold_dir, source_dir, target_dir): start_time = time.time() with open(os.path.join(fold_dir, 'file_list.txt'), 'w') as handle: for pid in pids: handle.write('{}.npy\n'.format(pid)) subprocess.call('rsync -av --files-from {} {} {}'.format(os.path.join(fold_dir, 'file_list.txt'), source_dir, target_dir), shell=True) # dutils.unpack_dataset(target_dir) copied_files = os.listdir(target_dir) logger.info("copying and unpacking data set finsihed : {} files in target dir: {}. took {} sec".format( len(copied_files), target_dir, np.round(time.time() - start_time, 0))) if __name__=="__main__": import utils.exp_utils as utils from .configs import Configs total_stime = time.time() cf = Configs() logger = utils.get_logger(0) batch_gen = get_train_generators(cf, logger) train_batch = next(batch_gen["train"]) mins, secs = divmod((time.time() - total_stime), 60) h, mins = divmod(mins, 60) t = "{:d}h:{:02d}m:{:02d}s".format(int(h), int(mins), int(secs)) print("{} total runtime: {}".format(os.path.split(__file__)[1], t)) \ No newline at end of file