Musical Interrogation III - LSTM
This article is Part 3 in a 3-Part Series.
- Part 1 - Musical Interrogation I - Intro
- Part 2 - Musical Interrogation II - FFN
- Part 3 - Musical Interrogation III - LSTM
This article is the continuation of my series Musical Interrogation. It is recommended that you read these articles first. This time we use a recurrent neural network, more precisely an LSTM, which I explained briefly in the Introduction. An LSTM is an RNN that counteracts the problem of exploding and vanishing gradients.

The article gives some explanation to the code in the following notebook which can be executed on Google Colab.
Because our model is now able to learn long-term relations, we can use the GridEncoder, i.e., a piano roll data representation, which is exactly what we do.
Data Preparation
Again we use the data from EsAC.
The specific dataset I use consists of folk songs from Europe; for the purpose of this work, I exclusively use the 1700 pieces found in the ./deutschl/erk directory.
We assume that 1/16 is the shortest note in our dataset.
The GridEncoder
automatically filters out pieces that do not fulfill this condition.
time_step = 1/16
encoder = GridEncoder(time_step)
enc_songs, invalid_song_indices = encoder.encode_songs(scores)
print(f'there are {len(enc_songs)} valid songs and {len(invalid_song_indices)} invalid songs')
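Intuitively, a piece can only be encoded on the grid if every note and rest duration is a multiple of the chosen time step. The following is a minimal sketch of such a check; the function fits_grid is only an illustration, and the actual GridEncoder may implement the filter differently.
def fits_grid(durations, time_step=1/16, tol=1e-9):
    # a piece fits the grid if every duration is an integer multiple of time_step
    return all(abs(d / time_step - round(d / time_step)) < tol for d in durations)

print(fits_grid([1/4, 1/8, 1/16]))  # True  -> encodable on the 1/16 grid
print(fits_grid([1/4, 1/12]))       # False -> e.g. a triplet piece would be filtered out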
Let us look at an example of an encoded piece:
55 _ _ _ 60 _ _ _ 60 _ _ _ 60 _ _ _ 60 _ _ _ 64 _ _ _ 64 _ _ _ r _ _ _ 62 _ 64 _ 65 ...
As we discussed in the last article, 55 _ _ _ stands for MIDI note 55 played for 4 time steps, where one time step corresponds to a 1/16 note.
Therefore, this is a 1/4 note.
Likewise, r _ _ _ is a 1/4 rest.
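To make the representation concrete, here is a small sketch that collapses such a token sequence back into (symbol, length) pairs; the helper tokens_to_events is purely illustrative and not part of the notebook.
def tokens_to_events(tokens):
    # collapse grid tokens into (symbol, length in 1/16 steps) pairs
    events = []
    for tok in tokens:
        if tok == '_' and events:
            symbol, length = events[-1]
            events[-1] = (symbol, length + 1)  # '_' extends the previous note or rest
        else:
            events.append((tok, 1))
    return events

print(tokens_to_events('55 _ _ _ r _ _ _ 62 _'.split()))
# [('55', 4), ('r', 4), ('62', 2)]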
Next, the StringToIntEncoder
converts our alphabet of tokens into positive integers.
string_to_int = StringToIntEncoder(enc_songs)
Next, we use ScoreDataset
to arrange our training data.
It requires our encoded songs, the instance of StringToIntEncoder
and a hyperparameter sequence_len
that configures the length of token sequences our model will be trained on.
The longer the sequence, the longer training takes, because the recurrent neural network has to be unrolled deeper.
sequence_len = 64 # this is a hyperparameter!
dataset = ScoreDataset(
    enc_songs=enc_songs,
    stoi_encoder=string_to_int,
    sequence_len=sequence_len)
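To make the role of sequence_len concrete, here is a sketch of how such a dataset typically slices one encoded song into (input, target) pairs; this only illustrates the idea and is not the actual ScoreDataset implementation.
def make_pairs(encoded_song, sequence_len):
    # every window of sequence_len tokens is an input,
    # and the token right after the window is the prediction target
    pairs = []
    for i in range(len(encoded_song) - sequence_len):
        pairs.append((encoded_song[i:i + sequence_len], encoded_song[i + sequence_len]))
    return pairs

toy = ['55', '_', '_', '_', '60', '_', '64', '_']
for x, y in make_pairs(toy, sequence_len=4):
    print(x, '->', y)
# ['55', '_', '_', '_'] -> 60
# ['_', '_', '_', '60'] -> _
# ...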
It is now possible to split our data into training, validation and test set.
train_set, val_set, test_set = torch.utils.data.random_split(dataset, [0.8, 0.1, 0.1])
Model Definition
First we define the rest of our hyperparameters:
vocab_size = len(string_to_int) # size of our alphabet
input_dim = vocab_size # can be different
hidden_dim = 128 # can be different
layer_dim = 1 # can be different
output_dim = vocab_size # should not be different
dropout = 0.2 # can be different
criterion = torch.nn.CrossEntropyLoss()
learning_rate = 0.001 # can be different
batch_size = 64 # can be different
n_epochs = 10 # can be different
eval_interval = 100 # can be different
Before explaining every detail, let us look at the model definition first. The following is the model description of our RNN/LSTM. To understand what’s going on, look at the forward method. This sends our data through the network.

The first two lines create the short-term $\mathbf{h}_0$ and long-term memory $\mathbf{c}_0$ and fill them with zeros.
Then an embedding takes place: x = self.embedding(x)
.
This is nothing more than what we did with our simple feedforward net in Part II - FFN: Each element of the input x
is first one-hot encoded and then multiplied by a matrix.
The result: Each event is represented by the row of a matrix (with learnable parameters).
The matrix has vocab_size
rows and input_dim
columns.
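As a quick aside, the equivalence between the embedding lookup and the one-hot multiplication can be checked directly with a small self-contained example:
import torch
import torch.nn.functional as F

vocab, dim = 5, 3
emb = torch.nn.Embedding(vocab, dim)
idx = torch.tensor([2, 4, 2])

one_hot = F.one_hot(idx, num_classes=vocab).float()    # shape (3, vocab)
print(torch.allclose(emb(idx), one_hot @ emb.weight))  # True: lookup == one-hot times matrix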
Next, we send our transformed input through our LSTM out, (ht, ct) = self.lstm(x, (h0, c0))
.
This basically computes $\mathbf{h}_t, \mathbf{c}_t$ based on $\mathbf{h}_{t-1}, \mathbf{c}_{t-1}$ (and the current input), as indicated in Fig. 2.
We get as many outputs as our sequence is long, i.e., sequence_len
many.
But we are only interested in the last output, which we get by out[:, -1, :]
.
This is a vector with hidden_dim elements
.
We don’t need ht
and ct
.
Then we send the last output through a dropout layer to counteract overfitting.
In the last step, we transform the hidden_dim
-dimensional vector into an output_dim
-dimensional vector, which is equal to vocab_size
.
This vector is interpreted as a probability distribution.
class LSTMModel(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.embedding = torch.nn.Embedding(vocab_size, input_dim)
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.dropout = torch.nn.Dropout(dropout)
        self.fc = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=device)
        # x = B, T, C
        x = self.embedding(x)
        out, (ht, ct) = self.lstm(x, (h0, c0))
        out = self.dropout(out[:, -1, :])
        out = self.fc(out)
        return out  # B, C
Next, we initialize the model:
model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim, dropout)
model.to(device) # use gpu if possible
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
for p in model.parameters():
    print(p.shape)
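As a quick sanity check, we can send a dummy batch of random token indices through the model and confirm the shapes described above (the random indices carry no musical meaning):
# dummy batch of 8 random token sequences, just to check the input/output shapes
dummy = torch.randint(0, vocab_size, (8, sequence_len), device=device)
with torch.no_grad():
    logits = model(dummy)
print(logits.shape)  # torch.Size([8, vocab_size]) -> one vector of logits per sequence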
We could play with different hyperparameters.
Increasing hidden_dim
basically increases the complexity of the “memory” of the LSTM.
We surely want to increase n_epochs to increase the number of times the LSTM “sees” all training data.
We can visualize the LSTM by utilizing the draw_graph
function from the torchview
package.
# (batch_size, sequence_len)
X_vis, y_vis = train_set[0:batch_size]
print(f'shape of X_vis: {X_vis.shape}')
print(f'shape of y_vis: {y_vis.shape}')
print(f'number of different symbols {vocab_size}')
X_vis, y_vis = X_vis.to(device), y_vis.to(device)
model_vis = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim, dropout)
model_graph = draw_graph(model_vis, input_data=X_vis, device=device)
model_graph.visual_graph

Note that the softmax is not part of the model but of our loss criterion, i.e., the cross-entropy loss torch.nn.CrossEntropyLoss(), and is therefore applied only as part of the loss computation during training.
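This is also why the model returns raw logits: torch.nn.CrossEntropyLoss applies the (log-)softmax internally, which the following small check illustrates:
logits = torch.tensor([[2.0, 0.5, -1.0]])  # raw model outputs for one example
target = torch.tensor([0])                 # index of the correct class

ce = torch.nn.CrossEntropyLoss()(logits, target)
nll = torch.nn.NLLLoss()(torch.log_softmax(logits, dim=1), target)
print(torch.allclose(ce, nll))  # True: cross entropy = log-softmax followed by NLL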
Melody Generation (Before Training)
Given a sequence of arbitrary length, the generate
function is used to generate a new piece of music.
temperature
determines how much the probability distribution learned by the model is considered.
- temperature equal to 1.0 means that sampling is done directly from the learned probability distribution.
- temperature approaching infinity means that sampling is done (almost) uniformly (more variation).
- temperature approaching 0 means that higher probabilities are emphasized (less variation).
We can set a maximum length for the piece and also provide the beginning of a piece.
def next_event_number(idx, temperature: float):
    with torch.no_grad():
        logits = model(idx)
        probs = F.softmax(logits / temperature, dim=1)  # B, C
        idx_next = torch.multinomial(probs, num_samples=1)
        return idx_next

def generate(seq: list[str] = None, max_len: int = None, temperature: float = 1.0):
    with torch.no_grad():
        generated_encoded_song = []
        if seq is not None:
            idx = torch.tensor(
                [[string_to_int.encode(char) for char in seq]],
                device=device
            )
            generated_encoded_song = seq.copy()
        else:
            idx = torch.tensor([[string_to_int.encode(TERM_SYMBOL)]], device=device)
        while max_len is None or max_len > len(generated_encoded_song):
            idx_next = next_event_number(idx, temperature)
            char = string_to_int.decode(idx_next.item())
            if idx_next == string_to_int.encode(TERM_SYMBOL):
                break
            idx = torch.cat((idx, idx_next), dim=1)  # B, T+1
            generated_encoded_song.append(char)
        return generated_encoded_song
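To build some intuition for the temperature parameter described above, here is a small numeric sketch that shows how dividing made-up logits by different temperatures reshapes the resulting distribution:
logits = torch.tensor([2.0, 1.0, 0.1])  # made-up logits for three tokens
for t in (0.2, 1.0, 5.0):
    probs = F.softmax(logits / t, dim=0)
    print(t, probs)
# t=0.2 -> sharply peaked on the most likely token (less variation)
# t=1.0 -> the distribution the model actually learned
# t=5.0 -> close to uniform (more variation)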
Of course, the results are almost random because the parameters of our model are initialized randomly and we have not trained it yet. The following code snippet generates 5 scores.
# number of songs we want to generate
n_scores = 5
temperature = 0.6
before_new_songs = []
for _ in range(n_scores):
    encoded_song = generate(max_len=13, temperature=temperature)
    print(f'generated {" ".join(encoded_song)} consisting of {len(encoded_song)} notes')
    before_new_songs.append(encoded_song)
Let’s listen to the first one:
Training
For training, we use something called a DataLoader
.
This helps us to access our data more easily.
For example, we shuffle our data before training.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)
The code for training looks a bit complicated because we use batches.
Since we are dealing with a large amount of data, we do not send all of it through the network at once (per training step), but only a part of it, namely batch_size many examples.
An epoch
is defined by the fact that all training data have been sent through the network once.
In essence, nothing else happens but:
- Send Batch through the network (Forward pass)
- Calculate error/cost
- Propagate gradients of the cost function with respect to the model parameters backwards through the network (Backward pass)
- Update model parameters
def train_one_epoch(epoch_index, tb_writer, n_epochs):
    running_loss = 0.0
    last_loss = 0.0
    all_steps = n_epochs * len(train_loader)
    for i, data in enumerate(train_loader):
        local_X, local_y = data
        local_X, local_y = local_X.to(device), local_y.to(device)
        optimizer.zero_grad()
        outputs = model(local_X)
        loss = criterion(outputs, local_y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if i % eval_interval == eval_interval - 1:
            last_loss = running_loss / eval_interval
            steps = epoch_index * len(train_loader) + (i + 1)
            print(
                f'Epoch [{epoch_index+1}/{n_epochs}], Step [{steps}/{all_steps}], Loss: {last_loss:.4f}')
            tb_x = epoch_index * len(train_loader) + i + 1
            tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            running_loss = 0.0
    return last_loss
# Initializing in a separate cell so we can easily add more epochs to the same run
def train(n_epochs, respect_val=False):
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
    best_vloss = 1_000_000
    for epoch in range(n_epochs):
        model.train(True)
        avg_loss = train_one_epoch(epoch, writer, n_epochs)
        model.train(False)
        running_vloss = 0.0
        for i, vdata in enumerate(val_loader):
            local_X, local_y = vdata
            local_X, local_y = local_X.to(device), local_y.to(device)
            voutputs = model(local_X)
            vloss = criterion(voutputs, local_y)
            running_vloss += vloss
        avg_vloss = running_vloss / (i + 1)
        print(f'Epoch [{epoch+1}/{n_epochs}], Train-Loss: {avg_loss:.4f}, Val-Loss: {avg_vloss:.4f}')
        writer.add_scalars('Training vs. Validation Loss', {'Training': avg_loss, 'Validation': avg_vloss}, epoch)
        writer.flush()
        if not respect_val or (respect_val and avg_vloss < best_vloss):
            best_vloss = avg_vloss
            model_path = './models/_model_{}_{}'.format(timestamp, epoch)
            torch.save(model.state_dict(), model_path)
Calling train
starts the training.
train(n_epochs)
The best model from the training can be found in the folder ./models
and can be loaded as follows
model_path = './models/pretrained_1_128_best_val'
if device.type == 'cpu':
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
elif torch.backends.mps.is_available():
    model.load_state_dict(torch.load(model_path, map_location=torch.device('mps')))
else:
    model.load_state_dict(torch.load(model_path))
model.eval()
Melody Generation (After Training)
After training or after we load our pretrained model, we generate new pieces:
n_scores = 5
temperature = 0.6
after_new_songs = []
for _ in range(n_scores):
    encoded_song = generate(max_len=120, temperature=temperature)
    print(f'generated {" ".join(encoded_song)} consisting of {len(encoded_song)} notes')
    after_new_songs.append(encoded_song)
after_generated_scores = encoder.decode_songs(after_new_songs)
Audio(score_to_wav(after_generated_scores[0], 'a_g_song.wav'))
We start to hear repetition and some structure within the piece: