import json import os import numpy as np from tqdm import tqdm from tensorflow.keras.utils import Sequence from lstm_chem.utils.smiles_tokenizer2 import SmilesTokenizer class DataLoader(Sequence): def __init__(self, config, data_type='train'): self.config = config self.data_type = data_type assert self.data_type in ['train', 'valid', 'finetune'] self.max_len = 0 if self.data_type == 'train': self.smiles = self._load(self.config.data_filename) elif self.data_type == 'finetune': self.smiles = self._load(self.config.finetune_data_filename) else: pass self.st = SmilesTokenizer() self.one_hot_dict = self.st.one_hot_dict self.tokenized_smiles = self._tokenize(self.smiles) if self.data_type in ['train', 'valid']: self.idx = np.arange(len(self.tokenized_smiles)) self.valid_size = int( np.ceil( len(self.tokenized_smiles) * self.config.validation_split)) np.random.seed(self.config.seed) np.random.shuffle(self.idx) def _set_data(self): if self.data_type == 'train': ret = [ self.tokenized_smiles[self.idx[i]] for i in self.idx[self.valid_size:] ] elif self.data_type == 'valid': ret = [ self.tokenized_smiles[self.idx[i]] for i in self.idx[:self.valid_size] ] else: ret = self.tokenized_smiles return ret def _load(self, data_filename): length = self.config.data_length print('loading SMILES...') with open(data_filename) as f: smiles = [s.rstrip() for s in f] if length != 0: smiles = smiles[:length] print('done.') return smiles def _tokenize(self, smiles): assert isinstance(smiles, list) print('tokenizing SMILES...') tokenized_smiles = [self.st.tokenize(smi) for smi in tqdm(smiles)] if self.data_type == 'train': for tokenized_smi in tokenized_smiles: length = len(tokenized_smi) if self.max_len < length: self.max_len = length self.config.train_smi_max_len = self.max_len print('done.') return tokenized_smiles def __len__(self): target_tokenized_smiles = self._set_data() if self.data_type in ['train', 'valid']: ret = int( np.ceil( len(target_tokenized_smiles) / float(self.config.batch_size))) else: ret = int( np.ceil( len(target_tokenized_smiles) / float(self.config.finetune_batch_size))) return ret def __getitem__(self, idx): target_tokenized_smiles = self._set_data() if self.data_type in ['train', 'valid']: data = target_tokenized_smiles[idx * self.config.batch_size:(idx + 1) * self.config.batch_size] else: data = target_tokenized_smiles[idx * self.config.finetune_batch_size: (idx + 1) * self.config.finetune_batch_size] data = self._padding(data) self.X, self.y = [], [] for tp_smi in data: X = [self.one_hot_dict[symbol] for symbol in tp_smi[:-1]] self.X.append(X) y = [self.one_hot_dict[symbol] for symbol in tp_smi[1:]] self.y.append(y) self.X = np.array(self.X, dtype=np.float32) self.y = np.array(self.y, dtype=np.float32) return self.X, self.y def _pad(self, tokenized_smi): return ['G'] + tokenized_smi + ['E'] + [ 'A' for _ in range(self.max_len - len(tokenized_smi)) ] def _padding(self, data): padded_smiles = [self._pad(t_smi) for t_smi in data] return padded_smiles