Implement TF–IDF from scratch. Given a list of documents (strings), build a memory-efficient tokenizer (lowercase, strip punctuation, optional min_df/max_df filtering), compute term frequencies, smoothed IDF = log((1 + N) / (1 + df)) + 1, and output a CSR sparse matrix (rows=documents, cols=vocabulary ordered lexicographically). Requirements: (1) do not use external NLP libraries; only Python, NumPy, and SciPy are allowed; (2) handle OOV tokens at transform time by ignoring them; (3) support L2 normalization per row; (4) provide an inverse_transform(doc_index) that reconstructs the top-k terms by TF–IDF score; (5) analyze time and space complexity for fit and transform; and (6) write a unit test showing correctness on a 3-document toy corpus with repeated terms.
Quick Answer: This question evaluates implementation-level skills in text feature extraction and numerical computing, covering TF–IDF computation, sparse matrix construction, memory-efficient tokenization, vocabulary filtering, deterministic ordering, and API design for fit/transform/inverse_transform.
Solution
# Approach Overview
We will implement a minimal, memory-conscious TF–IDF vectorizer inspired by common libraries:
- Tokenizer: a streaming regex-based generator that yields lowercase alphanumeric tokens and discards punctuation.
- Fit: one pass over the corpus to compute document frequencies (df), then build a lexicographically sorted vocabulary and compute smoothed IDF.
- Transform: one pass over the corpus to build per-document term counts restricted to the fitted vocabulary, convert to a CSR matrix, apply TF–IDF weighting, and optionally L2-normalize row-wise.
- Inverse transform: retrieve the top-k terms by TF–IDF score from a given row (document) of the CSR matrix.
Key formula:
- idf_j = log((1 + N) / (1 + df_j)) + 1
Where:
- N = number of documents.
- df_j = number of documents containing term j.
We will implement min_df and max_df as either absolute counts (int) or proportions (float in (0, 1]). OOV tokens are ignored during transform.
---
## Reference Implementation (Python + NumPy/SciPy only)
```python
import re
import math
import numpy as np
from scipy.sparse import csr_matrix
class TfidfVectorizerScratch:
    """
    A simple TF–IDF vectorizer from scratch using Python, NumPy, and SciPy.

    Tokens are maximal runs of ``[a-z0-9]`` taken from the lowercased text.
    The weight of term j in a document is ``tf * idf_j`` with
    ``idf_j = log((1 + N) / (1 + df_j)) + 1``; rows are optionally scaled to
    unit L2 norm.

    Parameters
    ----------
    min_df : int or float, default=1
        - If int: keep terms with document frequency >= min_df.
        - If float in (0, 1]: keep terms with document frequency
          >= ceil(min_df * n_docs).
    max_df : int or float, default=1.0
        - If int: keep terms with document frequency <= max_df.
        - If float in (0, 1]: keep terms with document frequency
          <= floor(max_df * n_docs).
    normalize : bool, default=True
        If True, apply L2 normalization to each row of the TF–IDF matrix.

    Attributes
    ----------
    n_docs_ : int or None
        Number of documents seen by the last fit.
    vocabulary_ : dict or None
        Mapping term -> column index.
    terms_ : list or None
        Terms in lexicographic order (column index -> term).
    idf_ : numpy.ndarray or None
        Smoothed IDF values aligned with ``terms_``.
    df_ : dict or None
        Mapping term -> document frequency, for retained terms only.
    """

    _token_re = re.compile(r"[a-z0-9]+")  # lowercase alphanumerics only

    def __init__(self, min_df=1, max_df=1.0, normalize=True):
        self.min_df = min_df
        self.max_df = max_df
        self.normalize = normalize
        # Learned attributes after fit
        self.n_docs_ = None
        self.vocabulary_ = None  # dict term -> col_idx
        self.terms_ = None       # list of terms in lexicographic order
        self.idf_ = None         # np.ndarray of idf aligned with terms_
        self.df_ = None          # dict term -> df
        # Matrix produced by the most recent transform()/fit_transform();
        # lets inverse_transform() be called without passing X explicitly.
        self._last_X = None

    # ------------------ Tokenization ------------------
    @classmethod
    def _iter_tokens(cls, text):
        """Lazily yield the lowercase alphanumeric tokens of ``text``."""
        text = text.lower()
        for m in cls._token_re.finditer(text):
            yield m.group(0)

    @staticmethod
    def _check_corpus(corpus):
        """Reject a bare string/bytes corpus.

        Iterating a ``str`` yields single characters, so every character
        would silently be treated as its own document.

        Raises
        ------
        ValueError
            If ``corpus`` is a ``str`` or ``bytes`` object.
        """
        if isinstance(corpus, (str, bytes)):
            raise ValueError(
                "corpus must be an iterable of document strings, "
                "not a single string."
            )

    # ------------------ DF thresholds ------------------
    @staticmethod
    def _resolve_df_thresholds(min_df, max_df, n_docs):
        """Convert int/float ``min_df``/``max_df`` to absolute integer bounds.

        Parameters
        ----------
        min_df, max_df : int or float
            Floats (including NumPy floating scalars) are proportions in
            (0, 1]; anything else is converted with ``int()`` and used as an
            absolute document count.
        n_docs : int
            Number of documents the proportions refer to.

        Returns
        -------
        (min_df_abs, max_df_abs) : tuple of int
            ``min_df_abs`` is at least 1.

        Raises
        ------
        ValueError
            If a float threshold is outside (0, 1], or if the resolved
            ``max_df`` is smaller than the resolved ``min_df``.
        """
        float_types = (float, np.floating)

        def scaled(fraction):
            # fraction * n_docs, snapped to the nearest integer when the
            # product is off only by binary rounding error. Without this,
            # 0.07 * 100 == 7.000000000000001 would ceil to 8 and
            # 0.57 * 100 == 56.99999999999999 would floor to 56.
            value = float(fraction) * n_docs
            nearest = round(value)
            if math.isclose(value, nearest, rel_tol=1e-12, abs_tol=0.0):
                return float(nearest)
            return value

        if isinstance(min_df, float_types):
            if not (0.0 < min_df <= 1.0):
                raise ValueError("min_df float must be in (0, 1].")
            min_df_abs = int(math.ceil(scaled(min_df)))
        else:
            min_df_abs = int(min_df)

        if isinstance(max_df, float_types):
            if not (0.0 < max_df <= 1.0):
                raise ValueError("max_df float must be in (0, 1].")
            max_df_abs = int(math.floor(scaled(max_df)))
        else:
            max_df_abs = int(max_df)

        if max_df_abs < min_df_abs:
            raise ValueError(f"max_df ({max_df_abs}) < min_df ({min_df_abs}).")
        if min_df_abs < 1:
            min_df_abs = 1
        return min_df_abs, max_df_abs

    # ------------------ Shared fitting / assembly helpers ------------------
    def _set_vocabulary(self, df, n_docs):
        """Store the filtered, sorted vocabulary and its smoothed IDF.

        ``df`` maps every token seen to its document frequency. An empty
        corpus (``n_docs == 0``) yields empty structures without validating
        the thresholds.
        """
        self.n_docs_ = n_docs
        if n_docs == 0:
            terms = []
        else:
            lo, hi = self._resolve_df_thresholds(self.min_df, self.max_df, n_docs)
            terms = sorted(t for t, c in df.items() if lo <= c <= hi)
        self.terms_ = terms
        self.vocabulary_ = {t: i for i, t in enumerate(terms)}
        self.df_ = {t: df[t] for t in terms}
        self.idf_ = np.fromiter(
            (math.log((1 + n_docs) / (1 + df[t])) + 1.0 for t in terms),
            dtype=np.float64,
            count=len(terms),
        )

    def _build_matrix(self, rows):
        """Assemble the TF–IDF CSR matrix from per-document column counts.

        ``rows`` is an iterable with one ``{column index: term count}`` dict
        per document. Column indices are written in ascending order within
        each row, and rows are L2-normalized when ``self.normalize`` is set.
        """
        idf = self.idf_
        data, indices, indptr = [], [], [0]
        for counts in rows:
            for j in sorted(counts):  # ascending columns within the row
                indices.append(j)
                data.append(counts[j] * idf[j])
            indptr.append(len(indices))
        X = csr_matrix(
            (
                np.array(data, dtype=np.float64),
                np.array(indices, dtype=np.int32),
                np.array(indptr, dtype=np.int32),
            ),
            shape=(len(indptr) - 1, len(self.terms_)),
            dtype=np.float64,
        )
        if self.normalize:
            self._l2_normalize_csr_inplace(X)
        return X

    # ------------------ Fitting ------------------
    def fit(self, corpus):
        """
        Compute df for each term, build lexicographic vocabulary, and compute IDF.

        Parameters
        ----------
        corpus : iterable of str
            The documents. Consumed in a single pass.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``corpus`` is a single string, or the df thresholds are invalid.
        """
        self._check_corpus(corpus)
        df = {}
        n_docs = 0
        for doc in corpus:
            n_docs += 1
            # set(): each document contributes at most 1 to a term's df
            for tok in set(self._iter_tokens(doc)):
                df[tok] = df.get(tok, 0) + 1
        self._set_vocabulary(df, n_docs)
        # A matrix cached from an earlier vocabulary no longer lines up with
        # terms_; drop it so inverse_transform() cannot mislabel its columns.
        self._last_X = None
        return self

    # ------------------ Transforming ------------------
    def transform(self, corpus):
        """
        Convert documents to a CSR TF–IDF matrix using the fitted vocabulary and IDF.
        OOV tokens are ignored.

        Parameters
        ----------
        corpus : iterable of str

        Returns
        -------
        scipy.sparse.csr_matrix of shape (n_documents, n_terms), float64.
        The result is also cached for ``inverse_transform``.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        ValueError
            If ``corpus`` is a single string.
        """
        if self.vocabulary_ is None:
            raise RuntimeError("Call fit() before transform().")
        self._check_corpus(corpus)
        vocab = self.vocabulary_

        def column_counts():
            # One {column: count} dict per document, built lazily so only a
            # single document's counts are alive at a time.
            for doc in corpus:
                counts = {}
                for tok in self._iter_tokens(doc):
                    j = vocab.get(tok)
                    if j is not None:  # ignore OOV
                        counts[j] = counts.get(j, 0) + 1
                yield counts

        X = self._build_matrix(column_counts())
        self._last_X = X  # store for inverse_transform convenience
        return X

    def fit_transform(self, corpus):
        """
        Single-call convenience. In memory, this stores per-document counts to avoid
        re-iterating corpus, which may increase peak memory. For very large corpora,
        prefer fit(corpus) then transform(corpus) with a re-iterable corpus.

        Returns
        -------
        scipy.sparse.csr_matrix of shape (n_documents, n_terms); an empty
        corpus yields a (0, 0) matrix.

        Raises
        ------
        ValueError
            If ``corpus`` is a single string, or the df thresholds are invalid.
        """
        self._check_corpus(corpus)
        # Single pass: per-document token counts and document frequencies
        doc_counts = []
        df = {}
        for doc in corpus:
            counts = {}
            for tok in self._iter_tokens(doc):
                counts[tok] = counts.get(tok, 0) + 1
            doc_counts.append(counts)
            for tok in counts:
                df[tok] = df.get(tok, 0) + 1

        n_docs = len(doc_counts)
        self._set_vocabulary(df, n_docs)
        if n_docs == 0:
            X = csr_matrix((0, 0), dtype=np.float64)
        else:
            vocab = self.vocabulary_
            # Re-key the stored counts by column index, dropping tokens that
            # were filtered out of the vocabulary.
            X = self._build_matrix(
                {vocab[tok]: tf for tok, tf in counts.items() if tok in vocab}
                for counts in doc_counts
            )
        self._last_X = X
        return X

    # ------------------ Inverse Transform ------------------
    def inverse_transform(self, doc_index, k=None, X=None, with_scores=True):
        """
        Return the top-k terms (and optionally scores) for the given document index
        in descending TF–IDF order. If k is None, return all stored terms.
        Equal scores are ordered by ascending column index, i.e. lexicographically
        by term, so the result is deterministic.

        Parameters
        ----------
        doc_index : int
        k : int or None
        X : csr_matrix or None (defaults to last transformed matrix)
        with_scores : bool, default=True

        Returns
        -------
        list of terms (or list of (term, score) if with_scores=True)

        Raises
        ------
        RuntimeError
            If no matrix is available, or the vectorizer is not fitted.
        TypeError
            If ``X`` is not a CSR matrix.
        ValueError
            If ``X`` has a different number of columns than the vocabulary,
            or ``k`` is negative.
        IndexError
            If ``doc_index`` is outside ``[0, X.shape[0])``.
        """
        if X is None:
            if self._last_X is None:
                raise RuntimeError("No matrix available. Pass X or call transform() first.")
            X = self._last_X
        if not isinstance(X, csr_matrix):
            raise TypeError("X must be a CSR matrix.")
        terms = self.terms_
        if terms is None:
            raise RuntimeError("Call fit() before inverse_transform().")
        if X.shape[1] != len(terms):
            raise ValueError(
                f"X has {X.shape[1]} columns but the vocabulary has {len(terms)} terms."
            )
        if doc_index < 0 or doc_index >= X.shape[0]:
            raise IndexError("doc_index out of range.")
        if k is not None and k < 0:
            raise ValueError("k must be non-negative or None.")

        start, end = X.indptr[doc_index], X.indptr[doc_index + 1]
        vals = X.data[start:end]
        cols = X.indices[start:end]
        n_stored = len(vals)
        if n_stored == 0 or k == 0:
            return []

        if k is None or k >= n_stored:
            chosen = np.arange(n_stored)
        else:
            # O(n) selection of the k-th largest score; keep every entry that
            # ties with it so the tie-break below decides which ones survive.
            kth = np.argpartition(-vals, k - 1)[k - 1]
            chosen = np.flatnonzero(vals >= vals[kth])
        # lexsort: the last key is primary -> score descending, then column ascending
        order = chosen[np.lexsort((cols[chosen], -vals[chosen]))]
        if k is not None:
            order = order[:k]

        if with_scores:
            return [(terms[cols[i]], float(vals[i])) for i in order]
        return [terms[cols[i]] for i in order]

    # ------------------ Utilities ------------------
    @staticmethod
    def _l2_normalize_csr_inplace(X):
        """Scale every non-zero row of CSR matrix ``X`` to unit L2 norm, in place."""
        if X.data.size == 0:
            return
        # Row id of every stored entry, expanded from the indptr row lengths
        row_ids = np.repeat(np.arange(X.shape[0]), np.diff(X.indptr))
        sq_norms = np.bincount(row_ids, weights=X.data * X.data, minlength=X.shape[0])
        norms = np.sqrt(sq_norms)
        norms[norms == 0.0] = 1.0  # leave all-zero rows untouched
        X.data /= norms[row_ids]

    # Optional helper for inspection
    def get_feature_names(self):
        """Return a copy of the fitted terms in column order ([] if unfitted)."""
        return list(self.terms_) if self.terms_ is not None else []
```
---
## Time and Space Complexity
Let:
- D = number of documents
- T = total number of tokens across all documents during a pass
- V = vocabulary size after filtering
- nnz = total number of non-zero entries in the resulting matrix (sum over documents of distinct in-vocab terms)
- r_i = number of non-zero terms in document i (distinct in-vocab tokens)
Fit
- Time: O(T + V log V)
- O(T) to tokenize and build per-document unique sets and df.
- O(V log V) to sort terms lexicographically.
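- Space: O(U) for the df dictionary, where U ≥ V is the number of distinct tokens seen before min_df/max_df filtering, plus O(V) for the vocabulary, term list, and IDF array.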
Transform
- Time: O(T + sum_i r_i log r_i + nnz)
- O(T) to tokenize and count in-vocab tokens.
- O(sum_i r_i log r_i) to sort per-document column indices (ensures CSR canonical order).
- O(nnz) to compute TF–IDF and assemble CSR arrays; L2 normalization adds O(nnz).
- Space: O(nnz) for the output matrix, plus O(max_i r_i) temporary per-document counts.
Notes
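- fit_transform stores per-document token counts to avoid re-iteration. Because filtering happens only after the pass, counts are kept for every token, increasing peak memory to O(U + sum_i u_i), where U is the number of distinct tokens before filtering and u_i ≥ r_i is the number of distinct tokens in document i.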
- For very large corpora, prefer fit(corpus) then transform(corpus) with a re-iterable corpus to keep memory bounded.
---
## Unit Test (3-document toy corpus)
This test checks: vocabulary order, smoothed IDF values, CSR shape/nnz, OOV handling, L2 normalization, min_df filtering, and inverse_transform ordering.
```python
import unittest
import numpy as np
from scipy.sparse import csr_matrix
class TestTfidfVectorizerScratch(unittest.TestCase):
    """Toy-corpus checks for TfidfVectorizerScratch.

    The three documents contain repeated terms ("the", "cat", "dog", "and")
    so that term frequencies greater than 1 are exercised.
    """

    def setUp(self):
        self.docs = [
            "The cat sat on the mat. The cat!",
            "Dog dog dog; cat sat.",
            "The dog and the cat played, and the dog slept."
        ]

    def test_basic_fit_transform(self):
        vec = TfidfVectorizerScratch(min_df=1, max_df=1.0, normalize=True)
        X = vec.fit_transform(self.docs)
        # Expected vocabulary (lexicographic)
        expected_terms = sorted({
            'the', 'cat', 'sat', 'on', 'mat', 'dog', 'and', 'played', 'slept'
        })
        self.assertEqual(vec.get_feature_names(), expected_terms)
        # Shape: 3 documents x 9 terms
        self.assertEqual(X.shape, (3, len(expected_terms)))
        self.assertIsInstance(X, csr_matrix)
        # Distinct in-vocabulary terms per document: 5 + 3 + 6
        self.assertEqual(X.nnz, 14)
        # Check smoothed IDF values for a few terms
        # N = 3; df(cat)=3 => idf = log(4/4)+1 = 1.0
        # df(the)=2 => idf = log(4/3)+1
        # np.log is used because this module imports numpy but not math.
        idf_terms = dict(zip(vec.get_feature_names(), vec.idf_))
        self.assertAlmostEqual(idf_terms['cat'], 1.0, places=7)
        self.assertAlmostEqual(idf_terms['the'], np.log(4 / 3) + 1.0, places=7)
        # Row-wise L2 normalization: norms should be ~1 for non-empty rows
        for i in range(X.shape[0]):
            row = X.getrow(i)
            if row.nnz > 0:
                norm = np.sqrt((row.data ** 2).sum())
                self.assertAlmostEqual(norm, 1.0, places=7)
        # Inverse transform: top-2 terms for doc 0 should be ['the', 'cat']
        # ('the' has tf=3, 'cat' has tf=2; both outweigh the single-count terms)
        top2 = vec.inverse_transform(0, k=2, X=X, with_scores=False)
        self.assertEqual(top2, ['the', 'cat'])

    def test_oov_ignored(self):
        vec = TfidfVectorizerScratch(normalize=False)
        vec.fit(self.docs)
        X = vec.transform(["unseen tokens only"])
        self.assertEqual(X.shape[0], 1)
        self.assertEqual(X.getrow(0).nnz, 0)  # all OOV => empty row

    def test_min_df_filtering(self):
        # Keep terms that appear in at least 2 docs
        vec = TfidfVectorizerScratch(min_df=2, normalize=False)
        X = vec.fit_transform(self.docs)
        # Terms with df >= 2: {'cat', 'dog', 'sat', 'the'}
        self.assertEqual(vec.get_feature_names(), sorted(['cat', 'dog', 'sat', 'the']))
        # Matrix has 3 rows and 4 columns
        self.assertEqual(X.shape, (3, 4))
        # Repeated term: 'dog' occurs 3 times in doc 1 and has df=2,
        # so its unnormalized weight is 3 * (log(4/3) + 1).
        dog = vec.vocabulary_['dog']
        self.assertAlmostEqual(X[1, dog], 3 * (np.log(4 / 3) + 1.0), places=7)


if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)
```
---
## Notes, Pitfalls, and Extensions
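- Punctuation and token definition: We used [a-z0-9]+. This removes punctuation and keeps digits. It also treats every non-ASCII character as a separator, so accented or non-Latin words are split or dropped (e.g., "café" becomes "caf"). Adjust the regex if you want to exclude numbers or keep Unicode letters.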
- Case-folding: Lowercasing is applied before tokenization; for languages with case-folding complexities, consider unicode normalization.
- Lexicographic order: We sort the filtered vocabulary, and we sort per-document column indices to create a canonical CSR matrix (important for equality tests and deterministic behavior).
- Empty documents: Produce all-zero rows; normalization safely skips zero rows.
- Numeric stability: The 1 + df in the denominator avoids division by zero for a term with df = 0, and the trailing + 1 keeps the weight of a term that appears in every document at 1 instead of 0.
- Performance: For large corpora, prefer fit() then transform() with a re-iterable corpus to avoid storing per-document counts in memory.
- Top-k selection: inverse_transform uses argpartition for efficiency when k is much smaller than the number of non-zero terms in the row.