VITERBI ALGORITHM FOR PART OF SPEECH TAGGIN
INTRODUCTION
As we might be aware by now the Viterbi algorithm is one of the most powerful of the machine learning algorithms present in Natural Language Processing, I'm going to demonstrate how it can be used in NLP to Identify part of speech of words which is crucial for machines to understand context and the underlying meaning of a sentence.
Before we proceed, We must first Identify what the Viterbi algorithm is and what it does
THE VETERBI ALGORITHM
The Viterbi algorithm is a natural language processing algorithm invented by Andrew Viterbi in 1966, It uses the Markov property(assumption that Every state in the sequence is only dependent on the state preceding it), for example given a sentence "I WANT TO EAT", it assumes that the state of "EAT" only depends on the word before it "TO" which is not always True(just like Nave Bayes), But works incredibly well.
Each Part of Speech is stored in what is known as a transition matrix, where each cell, tells you the probability of transitioning from one part of speech to another, i.e each part of speech is arranged along the rows, and repeated along the column, where the part of speech "NOUN" intersects with "ADVERB" gives you the probability at any given time of transitioning from a noun to an adverb.
But the Viterbi algorithm then takes it further, it doesn't just predict the probability of going from one state to another, because each state is actually hidden, so we cannot just predict the probability of transitioning from one state to the next, instead we also predict the probability of emitting a particular part of speech(state) given a word in a sentence.
Now for example, given the sentence "I want to eat", to predict the part of speech of eat, we need to know the part of speech of "to", now that we know the part of speech of "to" which we lets say have used our algorithm to predict, we need to know the probability of transitioning from the part of speech of "to" to the part of speech of "eat"(which we don't know yet) so for each part of speech we assume for "to" we multiply that transitioning probability of "to" -->(NN, Adverb, etc) one by one for each we assume for 'eat' by the probability of that part of speech we've assumed for "eat" actually emitting 'eat', we then proceed to pick the highest value and store that value in a new matrix we use to hold the optimal probabilities and another matrix D used to hold the best index that get's us there
Now that we have a brief overview let's dive in to truly grasp how this graph algorithm works
VETERBI EXPLANATION
Now we can say we have a Hidden Markov Model, when we have visible observations that evolved from a sequence of hidden states and the observations are the result of those hidden states. Now given that we have a sentence our goal is to predict or tag the words in that sentence with the correct part of speech, so we can say that the part of speech tags of our words are the hidden states and the words themselves are the observations or result of those hidden states. And we already said that our algorithm exhibits the Markov property, i.e in the sequence of states each state at any point in the sequence depends only on the preceding state.
Transition Probabilities
Transition probabilities tell you the chances of going from one state to the other, the Markov property helps keep the model simple. We use a table to store the transition probabilities. We can call this table A, this is an example of what the table would look like
A Transition Probability Matrix (subset)
|A |...| RBS | RP | SYM | TO | UH|... | --- ||---:-------------| ------------ | ------------ | -------- | ---------- |---- |RBS |...|2.217069e-06 |2.217069e-06 |2.217069e-06 |0.008870 |2.217069e-06|... |RP |...|3.756509e-07 |7.516775e-04 |3.756509e-07 |0.051089 |3.756509e-07|... |SYM |...|1.722772e-05 |1.722772e-05 |1.722772e-05 |0.000017 |1.722772e-05|... |TO |...|4.477336e-05 |4.472863e-08 |4.472863e-08 |0.000090 |4.477336e-05|... |UH |...|1.030439e-05 |1.030439e-05 |1.030439e-05 |0.061837 |3.092348e-02|... | ... |...| ... | ... | ... | ... | ... | ...
As you can see we have each part of speech tag and the probability of transitioning from any given part of speech to another, this would be useful as we would see how to implement it
Emission probabilities
Now given the words in our sentence, the emission probability is the chance of any given part of speech emitting any given word. It is a matrix that contains all the part of speeches in the rows and the words of the given sentence in the columns
| B | ... | 725 | adroitly | engineers | promoted | synergy | ... |
|---|---|---|---|---|---|---|---|
| CD | ... | 8.201296e-05 | 2.732854e-08 | 2.732854e-08 | 2.732854e-08 | 2.732854e-08 | ... |
| NN | ... | 7.521128e-09 | 7.521128e-09 | 7.521128e-09 | 7.521128e-09 | 2.257091e-05 | ... |
| NNS | ... | 1.670013e-08 | 1.670013e-08 | 4.676203e-04 | 1.670013e-08 | 1.670013e-08 | ... |
| VB | ... | 3.779036e-08 | 3.779036e-08 | 3.779036e-08 | 3.779036e-08 | 3.779036e-08 | ... |
| RB | ... | 3.226454e-08 | 6.456135e-05 | 3.226454e-08 | 3.226454e-08 | 3.226454e-08 | ... |
| RP | ... | 3.723317e-07 | 3.723317e-07 | 3.723317e-07 | 3.723317e-07 | 3.723317e-07 | ... |
| ... | ... | ... | ... | ... | ... | ... | ... |
Viterbi or Trellis Matrix
After we have gotten our transition and emission matrices, we can now get to the crux of the Viterbi algorithm, the Viterbi matrix, this is where we store the best probability of each path, to get to a given word in the sequence, now let's say we have the tag NN on the row and the word go as the second word on the column, we compute the best probability of transition from each tag to "NN" multiplied by the probability of the tag "NN" emitting the word "go", we then fil in that cell with the best probability.
Backtracking Matrix
As we populate our Viterbi matrix we also need to store the index, In this matrix we store the index that represent the different states tag your traversing to find the most likely POS tags for the given sequence of words W1−Wk Let's Begin We need our start state for initialization because we would compute $$ \pi * b_{s,t} $$ if t=0 for initialization of the Viterbi matrix so we also need a start state tag
from utils_pos import get_word_tag, preprocess
import pandas as pd
from collections import defaultdict
import math
import numpy as np
Here is an example 'tag-set' or Part of Speech designation describing the two or three letter tag and their meaning.
One data set (WSJ-2_21.pos) will be used for training.
The other (WSJ-24.pos) for testing.
The tagged training data has been preprocessed to form a vocabulary (hmm_vocab.txt).
The words in the vocabulary are words from the training set that were used two or more times.
The vocabulary is augmented with a set of 'unknown word tokens', described below.
The training set will be used to create the emission, transition and tag counts.
The test set (WSJ-24.pos) is read in to create y.
This contains both the test text and the true tag.
The test set has also been preprocessed to remove the tags to form test_words.txt.
This is read in and further processed to identify the end of sentences and handle words not in the vocabulary using functions provided in utils_pos.py.
This forms the list
prep, the preprocessed text used to test our POS taggers.
A POS tagger will necessarily encounter words that are not in its datasets.
To improve accuracy, these words are further analyzed during preprocessing to extract available hints as to their appropriate tag.
For example, the suffix 'ize' is a hint that the word is a verb, as in 'final-ize' or 'character-ize'.
A set of unknown-tokens, such as '--unk-verb--' or '--unk-noun--' will replace the unknown words in both the training and test corpus and will appear in the emission, transition and tag data structures.
# load in the training corpus
with open("WSJ_02-21.pos", 'r') as f:
training_corpus = f.readlines()
print(f"A few items of the training corpus list")
print(training_corpus[0:5])
A few items of the training corpus list
['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']
A few items of the vocabulary list
['!', '#', '$', '%', '&', "'", "''", "'40s", "'60s", "'70s", "'80s", "'86", "'90s", "'N", "'S", "'d", "'em", "'ll", "'m", "'n'", "'re", "'s", "'til", "'ve", '(', ')', ',', '-', '--', '--n--', '--unk--', '--unk_adj--', '--unk_adv--', '--unk_digit--', '--unk_noun--', '--unk_punct--', '--unk_upper--', '--unk_verb--', '.', '...', '0.01', '0.0108', '0.02', '0.03', '0.05', '0.1', '0.10', '0.12', '0.13', '0.15']
A few items at the end of the vocabulary list
['yards', 'yardstick', 'year', 'year-ago', 'year-before', 'year-earlier', 'year-end', 'year-on-year', 'year-round', 'year-to-date', 'year-to-year', 'yearlong', 'yearly', 'years', 'yeast', 'yelled', 'yelling', 'yellow', 'yen', 'yes', 'yesterday', 'yet', 'yield', 'yielded', 'yielding', 'yields', 'you', 'young', 'younger', 'youngest', 'youngsters', 'your', 'yourself', 'youth', 'youthful', 'yuppie', 'yuppies', 'zero', 'zero-coupon', 'zeroing', 'zeros', 'zinc', 'zip', 'zombie', 'zone', 'zones', 'zoning', '{', '}', '']
We are using our vocabulary to compare words in the training corpus to the words in the vocabulary list. If a word in the training corpus is not in the vocabulary list, we will replace it with a special token, , which stands for "unknown". This is a common technique in natural language processing to handle out-of-vocabulary words. It ensures that our model can still process sentences that contain words it has not seen during training, by treating them as a single unknown token.
PART OF SPEECH TAGGING
Not all words can only belong to a single part of speech, some words can belong to multiple parts of speech depending on the context in which it is used
For example, the word
isis a verb and it is not ambiguous.In the
WSJcorpus, $86$% of the token are unambiguous (meaning they have only one tag)About $14%$ are ambiguous (meaning that they have more than one tag)
Transition counts
- The first dictionary is the
transition_countsdictionary which computes the number of times each tag happened next to another tag.
This dictionary will be used to compute: (1)P(ti|ti−1)
This is the probability of a tag at position $i$ given the tag at position i−1.
The keys are
(prev_tag, tag)The values are the number of times those two tags appeared in that order.
Emission counts
The second dictionary you will compute is the emission_counts dictionary. This dictionary will be used to compute:
In other words, you will use it to compute the probability of a word given its tag.
In order for you to compute equation 2, you will create an emission_counts dictionary where
The keys are
(tag, word)The values are the number of times that pair showed up in your training set.
Tag counts
The last dictionary you will compute is the tag_counts dictionary.
The key is the tag
The value is the number of times each tag appeared.
Handling the initial words in a sentence with a start token
Now let's say we have a sentence "I want to eat", when creating a transition dictionary what is the transition probability of going to the word "I" from nothing, that's why it's modelled "--s-- I want to eat --s--", so we use the start token to transition from the start of the sentence to the start token, we are basically asking the question, what is the probability of I being the first word(part of speech tag in this case). for more details start and end sentence tokens in lanuage modelling
def create_dictionaries(training_corpus, vocab, verbose=True): """ Create the transition and emission dictionaries for the HMM model.
Args:
training_corpus: A list of sentences, where each sentence is a string of word/tag pairs.
vocab: A dictionary mapping words to their corresponding indices.
verbose: If True, print the number of unique tags and words in the training corpus."""
# Initialize the transition and emission dictionaries
transition_dict = defaultdict(int)
emission_dict = defaultdict(int)
tag_counts = defaultdict(int)
prev_tag = '--s--'
i = 0
# Initialize "prev_tag" (previous tag) with the start state, denoted by '--s--'
# Each item in the training corpus contains a word and its POS tag
# Go through each word and its tag in the training corpus
for word_tag in training_corpus:
i += 1
word, tag = get_word_tag(word_tag, vocab) # Get the word and its corresponding tag using the get_word_tag function
transition_dict[(prev_tag, tag)] += 1
emission_dict[(word, tag)] += 1
if i%50000 == 0 and verbose:
print(f"tag counts {tag}, word counts {word}, transition counts {transition_dict[(prev_tag, tag)]}, emission counts {emission_dict[(word, tag)]}")
prev_tag = tag
tag_counts[tag] += 1
return emission_dict, transition_dict, tag_counts
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)
print(transition_counts[('--s--', 'NN')]) ---> 1598
print(tag_counts['NN']) ----> 132935
IMPLEMENTING HIDDEN MARKOV MODELS FOR POS WITH VETERBI DECODER
The HMM is one of the most common algorithms in NLP, and is a foundation to many deep learning techniques, let's see how it works.
A markov model utilized the transition matrix A and emission matrix B which describes the probability of a visible observation when we are in a particular state.
Emissions are words, Hidden states are POS of that word.
Transition Matrix
This is a matrix built from our transition_counts dictionary, It contains POS tags on rows and their transition tags on the columns, whereby each intersection of (previous_tag, tag) is on the cells and what is stored is not the count but the probability of transitioning from one part of speech tag to another
Mathematical formulation
Transition Matrix: $$ P(t_i |t_i-1) = \frac{C(t_{i-1}, t_i) + \alpha}{C(t_{i-1}) +N*\alpha} $$
def Create_transition_matrix(transition_counts, tag_counts, vocab, alpha=0.001): """ Create the transition matrix for the HMM model.
Args:
transition_counts: A dictionary containing the counts of transitions between tags.
tag_counts: A dictionary containing the counts of each tag in the training corpus.
vocab: A dictionary mapping words to their corresponding indices.
alpha: The smoothing parameter for Laplace smoothing. """
states = list(tag_counts)
states.sort()
num_tags = len(tag_counts) # total number of tags we have present
transition_matrix = np.zeros((num_tags, num_tags))
# initialize transition matric
trans_keys = set(transition_counts.keys())
for i in range(num_tags):
for j in range(num_tags):
# Initialize the count of the (prev POS, current POS) to zero
count = 0
# Define the tuple (prev POS, current POS)
# Get the tag at position i and tag at position j (from the all_tags list)
key = (states[i], states[j]) # tuple of form (tag,tag)
# Check if the (prev POS, current POS) tuple
# exists in the transition counts dictionary
if key in trans_keys: # Replace None in this line with the proper condition.
# Get count from the transition_counts dictionary
# for the (prev POS, current POS) tuple
count = transition_counts[key]
transition_matrix[i, j] = (count + alpha) / (tag_counts[states[i]] + (alpha * num_tags))
# weare looping over each tag(row) as it transitions to every other tag(column) in the document and getting that transitin probability out
return transition_matrix, states
| | **RBS** | RP | SYM | TO | UH | | --- | --- | --- | --- | --- | --- | | RBS | 2.217069e-06 | 2.217069e-06 | 2.217069e-06 | 0.008870 | 2.217069e-06 | | RP | 3.756509e-07 | 7.516775e-04 | 3.756509e-07 | 0.051089 | 3.756509e-07 |
Create the 'B' emission probabilities matrix
Now we will create the B emission matrix which computes the emission probability.
You will use smoothing as defined below:
\(C(t_i, word_i)\) is the number of times wordi was associated with \(tag_i\) in the training data (stored in
emission_countsdictionary).\(C(t_i)\) is the number of times tagi was in the training data (stored in
tag_countsdictionary).$N$ is the number of words in the vocabulary
\(\alpha\) is a smoothing parameter.
The matrix emission_matrix is of dimension (num_tags, N), where num_tags is the number of possible parts-of-speech tags.
Here is an example of the matrix, only a subset of tags and words are shown:
Emissions Probability Matrix (subset)
| B | ... | 725 | adroitly | engineers | promoted | synergy | ... |
|---|---|---|---|---|---|---|---|
| CD | ... | 8.201296e-05 | 2.732854e-08 | 2.732854e-08 | 2.732854e-08 | 2.732854e-08 | ... |
| NN | ... | 7.521128e-09 | 7.521128e-09 | 7.521128e-09 | 7.521128e-09 | 2.257091e-05 | ... |
def create_emission_matrix(tag_counts, emission_counts, vocab, alpha=0.001):
''' Input: alpha: tuning parameter used in smoothing tag_counts: a dictionary mapping each tag to its respective count emission_counts: a dictionary where the keys are (tag, word) and the values are the counts vocab: a dictionary where keys are words in vocabulary and value is an index. within the function it'll be treated as a list Output: B: a matrix of dimension (num_tags, len(vocab)) '''
# get the number of POS tag
num_tags = len(tag_counts)
# Get a list of all POS tags
states = list(tag_counts)
states.sort()
# Get the total number of unique words in the vocabulary
num_words = len(vocab)
vocab = list(vocab)
# Initialize the emission matrix B with places for
# tags in the rows and words in the columns
emission_matrix = np.zeros((num_tags, num_words))
# Get a set of all (POS, word) tuples
# from the keys of the emission_counts dictionary
emis_keys = set(list(emission_counts.keys()))
### START CODE HERE (Replace instances of 'None' with your code) ###
# Go through each row (POS tags)
for i in range(num_tags): # Replace None in this line with the proper range.
# Go through each column (words)
for j in range(num_words): # Replace None in this line with the proper range.
# Initialize the emission count for the (POS tag, word) to zero
count = 0
# Define the (POS tag, word) tuple for this row and column
key = (states[i], vocab[j]) # tuple of form (tag,word)
# check if the (POS tag, word) tuple exists as a key in emission counts
if key in emis_keys: # Replace None in this line with the proper condition.
# Get the count of (POS tag, word) from the emission_counts d
count = emission_counts[key]
# Get the count of the POS tag
count_tag = tag_counts[states[i]]
# Apply smoothing and store the smoothed value
# into the emission matrix B for this row and column
emission_matrix[i,j] = (count + alpha) / (count_tag + (alpha*num_words))
### END CODE HERE ###
return emission_matrix
VITERBI ALGORITHM AND DYNAMIC PROGRAMMING
Viterbi algorithm is a dynamic programming algorithm that finds the most likely sequence of hidden events that would explain a sequence of observed outcomes. The result is called the Viterbi path.
Remember there are three stages to implement the Viterbi of which the last is to store the index of the part of speech of the matrix we stored the best probability so we can back-trace and tag each word.
Initialization
Our viterbi matrix consists of all the words in the given sentence we are to tag along the columns arranged from W1−Wk and all part of speech or states along the rows where they meet is the probability of that part of speech producing a word at the position of the word in the sequence. given a sentence of words, To initialize our viterbi matrix and backtracking matrix, we look at the sequence and for each possible tag, we compute the probability of the "--s--" token producing word W1 in the sentence.
$$ C_{i,1} = \pi_i * b_{i,cindex(w_i)} = a_{1,i} * b_{i, cindex(w_1)} $$
That is we initialize the probability of \(W_1\) being the first in the sequence, from there we can fill our Viterbi matrix
n order to avoid multiplying and storing small values on the computer, we'll take the log of the product, which becomes the sum of two logs:
$$best\_probs[i,0] = log(A[s_{idx}, i]) + log(B[i, vocab[corpus[0]]$$
def initialize(states, tag_counts, A, B, corpus, vocab):
''' Input: states: a list of all possible parts-of-speech tag_counts: a dictionary mapping each tag to its respective count A: Transition Matrix of dimension (num_tags, num_tags) B: Emission Matrix of dimension (num_tags, len(vocab)) corpus: a sequence of words whose POS is to be identified in a list vocab: a dictionary where keys are words in vocabulary and value is an index Output: best_probs: matrix of dimension (num_tags, len(corpus)) of floats(viterbi matrix) best_paths: matrix of dimension (num_tags, len(corpus)) of integers '''
# Get the total number of unique POS tags
num_tags = len(tag_counts) num_words = len(corpus)
best_probs = np.zeros((num_tags, num_words))
best_paths = np.zeros((num_tags, num_words), dtype=int)
s_idx = states.index("--s--")
for i in range(num_tags):
best_probs[i, 0] = math.log(A[s_idx, i]) + math.log(B[(i, corpus[0])])
best_paths[i,0] = 0 # we aren't coming from any best index so we set it to zero
return best_probs, best_paths
VETERBI FORWARD PASS
Now we want to implement the forward pass for the viterbi matrix and store the index of the best path taken to get there.
Now that we've initialized the matrix and filled the first column, we need to recursively check the transition probability of progressing in the sequence for each part of speech, and the probability of each part of speech in the row emmitting a word and pick the best one and store repeatedly in our matrix
Our logic would look something like this: For Each word in the sequence --> for each part of speech that each word may be --> what is the probability of transitioning from each part of speech that the previous word would be to this part of speech? if its the highest so far pls store it
we use math.log to compute the natural logarithm.
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab, verbose=True):
''' Input: A, B: The transition and emission matrices respectively test_corpus: a list containing a preprocessed corpus best_probs: an initilized matrix of dimension (num_tags, len(corpus)) best_paths: an initilized matrix of dimension (num_tags, len(corpus)) vocab: a dictionary where keys are words in vocabulary and value is an index Output: best_probs: a completed matrix of dimension (num_tags, len(corpus)) best_paths: a completed matrix of dimension (num_tags, len(corpus)) '''
# Get the number of unique POS tags (which is the num of rows in best_probs)
num_tags = best_probs.shape[0]
# Go through every word in the corpus starting from word 1
# Recall that word 0 was initialized in `initialize()`
for i in range(1, len(test_corpus)):
# Print number of words processed, every 5000 words
if i % 5000 == 0 and verbose:
print("Words processed: {:>8}".format(i))
# For each unique POS tag that the current word can be
for j in range(num_tags): # Replace None in this line with the proper range. # for every pos tag
# Initialize best_prob for word i to negative infinity
best_prob_i = float("-inf")
# Initialize best_path for current word i to None
best_path_i = None # Do not replace this None # @KEEPTHIS
# For each POS tag that the previous word can be:
for k in range(num_tags): # Replace None in this line with the proper range.
# Calculate the probability = None
# best probs of POS tag k, previous word i-1 +
# log(prob of transition from POS k to POS j) +
# log(prob that emission of POS j is word i)
prob = best_probs[k, i-1] + np.log(A[k,j]) + np.log(B[j, vocab[test_corpus[i]]])
# check if this path's probability is greater than
# the best probability up to and before this point
if prob > best_prob_i: # Replace None in this line with the proper condition.
# Keep track of the best probability
best_prob_i = prob
# keep track of the POS tag of the previous word
# that is part of the best path.
# Save the index (integer) associated with
# that previous word's POS tag
best_path_i = k
# Save the best probability for the
# given current word's POS tag
# and the position of the current word inside the corpus
best_probs[j,i] = best_prob_i
# Save the unique integer ID of the previous POS tag
# into best_paths matrix, for the POS tag of the current word
# and the position of the current word inside the corpus.
best_paths[j,i] = best_path_i
return best_probs, best_paths
VITERBI BACKWARD PASS, MOVE THROUGH THE BEST INDEX MATRIX TO GET THE TAGS SEQUENCE THAT PRODUCE THE HIGHEST PROBABILITY
We have now implemented our Viterbi matrix, we would want to backtrack starting from the value in the last column with the highest probability, we take its index then go to that position in our best path matrix and we would best construct arrive at the index of the part of speech in the previous column that got us there as we do this we keep storing the indexes, for example, the highest probability row in the last column is the index of our last tag, then index there is that of the previous tag, the index in that previous is also that of the tag before it all the way to the 2nd column where we get the index of the first word.
def backward_pass(best_probs, best_paths, states):
"""This function returns the best path"""
# Get the number of words in the corpus
# which is also the number of columns in best_probs, best_paths
m = best_paths.shape[1]
# Initialize array z, same length as the corpus
z = [None] * m # DO NOT replace the "None"
# Get the number of unique POS tags
num_tags = best_probs.shape[0]
# Initialize the best probability for the last word
best_prob_for_last_word = float('-inf')
# Initialize pred array, same length as corpus
pred = [None] * m # DO NOT replace the "None"
# Get the index of the highest probability in the last column
z[m - 1] = np.argmax(best_probs[:, m - 1])
# Optionally, if you still need the actual probability value:
best_prob_for_last_word = best_probs[z[m - 1], m - 1]
# Convert the last word's predicted POS tag
# from its unique integer ID into the string representation
# using the 'states' list
# store this in the 'pred' array for the last word
pred[m - 1] = states[z[m-1]]
# Find the best POS tags by walking backward through the best_paths
# From the last word in the corpus to the 0th word in the corpus
for i in range(m-1, 0, -1): # Replace None in this line with the proper range.
# Retrieve the unique integer ID of
# the POS tag for the word at position 'i' in the corpus
# In best_paths, go to the row representing the POS tag of word i
# and the column representing the word's position in the corpus
# to retrieve the predicted POS for the word at position i-1 in the corpus
z[i - 1] = best_paths[z[i], i]
# Get the previous word's POS tag in string form
# Use the 'states' list,
# where the key is the unique integer ID of the POS tag,
# and the value is the string representation of that POS tag
pred[i - 1] = states[z[i-1]]
return pred