Fast and Scalable Collaborative Filtering with Polars
Collaborative filtering is a powerful technique commonly used in recommendation systems to predict user preferences for items such as books, movies, or music based on the preferences of other users. In this post, we will
- implement collaborative filtering using Polars, a data manipulation library for Python, and
- compare its performance, in terms of efficiency and execution speed, with the same algorithm implemented in Pandas.
The code to reproduce the results can be found here.
What is Polars?
Polars is a lightning-fast DataFrame library for Python written in Rust, designed to handle large datasets with ease. Unlike traditional data manipulation libraries, Polars leverages the power of Rust to provide high-performance data processing. In addition, it supports lazy execution, which reduces memory bottlenecks and further speeds up computations.
Polars’ key features include:
- Performance: Written in Rust, Polars takes full advantage of Rust’s speed and safety features.
- Lazy Evaluation: Polars supports lazy evaluation, which optimizes query execution by deferring computation until necessary.
- Memory Efficiency: Polars is designed to minimize memory usage, making it ideal for handling large datasets.
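To get a feel for the lazy API, here is a minimal sketch (using a hypothetical example.csv, not the book data below): no data is read until collect() is called, and Polars can push the filter and the column selection down into the CSV scan.
import polars as pl

# Build a lazy query: nothing is read from disk yet, only a query plan is constructed
lazy_query = (
    pl.scan_csv("example.csv")      # hypothetical file
    .filter(pl.col("value") > 0)    # predicate pushdown into the scan
    .select("id", "value")          # projection pushdown: only these columns are read
)
# Optimization of the plan and execution happen only here
df = lazy_query.collect()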
Setting Up the Data
To implement our collaborative filtering system, let us start by setting up the data. We will use a public book-ratings dataset consisting of two CSV files: one containing book information and another containing user ratings of these books. The books.csv
file includes details such as the book ID, title, and authors, while the ratings.csv
file contains user ratings, mapping user IDs to book IDs along with the rating they gave.
First, we import the necessary library, Polars, and then load the datasets using pl.scan_csv()
, which allows us to read large CSV files efficiently and lazily into a LazyFrame object. We then select the relevant columns from each dataset:
import polars as pl
books = pl.scan_csv("books.csv").select("book_id", "title", "authors")
ratings = pl.scan_csv("ratings.csv").select("user_id", "book_id", "rating")
We can print the first rows of the books data with
books.limit().collect()
and likewise preview some examples of the ratings with
ratings.limit().collect()
Finally, we add our own ratings, either by writing them to my_ratings.csv
or by defining them directly in the code as shown below:
# Create dataframe with my ratings
import os

if os.path.exists("my_ratings.csv"):
    my_ratings = pl.scan_csv("my_ratings.csv")
else:
    my_ratings = [
        ["The Hitchhiker's Guide to the Galaxy (Hitchhiker's Guide to the Galaxy, #1)", 5],
        ["The Martian", 5],
        ["Surely You're Joking, Mr. Feynman!: Adventures of a Curious Character", 5],
        ["Going Solo", 5],
        ["Flatland: A Romance of Many Dimensions", 5],
        # Add more books here
    ]
    my_ratings = pl.LazyFrame(my_ratings, schema=[('title', pl.String), ('my_rating', pl.Int64)])

# Join the book ids on my ratings
my_ratings = my_ratings.join(books.select("book_id", "title"), on='title', how='left')
For further processing, let us join our ratings with the existing ratings dataset.
ratings = ratings.join(my_ratings.select("book_id", "my_rating"), how="left", on="book_id")
Collaborative Filtering with Polars
To predict ratings, we perform the following user-user collaborative filtering steps:
Calculate Common Ratings: We group the ratings by user ID to determine how many books each user has rated in common with us.
articles_rated_in_common = ratings.group_by("user_id").agg((pl.col("my_rating").is_not_null()).sum().alias("articles_rated_in_common"))
Filter by Minimum Common Ratings: We filter users who have rated at least a specified number of books in common with us.
minimum_number_of_books_rated_in_common = 10
ratings = ratings.join(articles_rated_in_common, how="left", on="user_id").filter(
    pl.col("articles_rated_in_common") >= minimum_number_of_books_rated_in_common)
Calculate Similarity Scores: We compute the Pearson correlation between our ratings and other users’ ratings to determine similarity.
similarities = ratings.group_by("user_id", maintain_order=True).agg(pl.corr("rating", "my_rating").alias("corr"))
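As a small illustration of what this expression computes (toy numbers, not part of the actual pipeline), pl.corr returns the Pearson correlation of the two rating columns within each group:
# Toy example: user 1's ratings track ours closely, user 2's do not
toy = pl.DataFrame({
    "user_id":   [1, 1, 1, 2, 2, 2],
    "rating":    [5, 3, 1, 4, 4, 2],
    "my_rating": [5, 4, 2, 1, 3, 5],
})
print(toy.group_by("user_id", maintain_order=True).agg(pl.corr("rating", "my_rating").alias("corr")))
# user 1 gets a correlation close to 1, user 2 a negative one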
Filter by Minimum Similarity: We keep only those users whose similarity scores exceed a certain threshold and are not NaN.
minimal_similarity = 0.7
ratings = ratings.join(similarities, on="user_id", how="left")
ratings = ratings.filter(pl.col("corr") > minimal_similarity)
ratings = ratings.filter(pl.col("corr").is_not_nan())
Filter by Minimum Ratings per Book: We ensure each book has received a minimum number of ratings to be considered for recommendation.
minimal_number_of_ratings = 6
ratings_per_article = ratings.group_by("book_id").agg((pl.col("rating").is_not_null()).sum().alias("ratings_per_article"))
ratings = ratings.join(ratings_per_article, on="book_id", how="left")
ratings = ratings.filter(pl.col("ratings_per_article") >= minimal_number_of_ratings)
Predict Ratings: We predict the ratings for each book by calculating the weighted average of ratings from similar users.
def predict_func():
    return ((pl.col("rating") * pl.col("corr")).sum()) / (pl.col("corr").sum())
predictions = ratings.group_by("book_id", maintain_order=True).agg(predict_func().alias("prediction")).select("book_id", "prediction")
predictions.limit(20).collect()
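As a quick sanity check of this weighted average (again with made-up numbers): if two users with correlations 0.9 and 0.8 rated the same book 5 and 3, the prediction is pulled towards the rating of the more similar user.
# Toy check: (5*0.9 + 3*0.8) / (0.9 + 0.8) ≈ 4.06
toy = pl.DataFrame({"book_id": [1, 1], "rating": [5, 3], "corr": [0.9, 0.8]})
print(toy.group_by("book_id").agg(predict_func().alias("prediction")))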
Finally, we join the predicted ratings with the book information and sort them to get the top recommendations:
predictions = predictions.join(books, how="left", on="book_id")
predictions = predictions.collect().sort(by="prediction", descending=True).limit(20)
predictions
This gives us the following top-20 recommendations with the corresponding predicted rating:
Performance: Polars vs. Pandas
Let us now compare the runtime of this collaborative filtering algorithm implemented in Polars against the same algorithm implemented in Pandas. To do so, we first wrap the Polars implementation in a function:
def polars_collaborative_filtering(ratings, my_ratings, num_recs=20, minimal_similarity=0.7, minimal_number_of_ratings=6, minimum_number_of_books_rated_in_common=10):
    """
    Performs collaborative filtering on a dataset of book ratings.

    Args:
        ratings (DataFrame): The dataset of book ratings.
        my_ratings (DataFrame): The user's own ratings.
        num_recs (int, optional): The number of recommendations to return. Defaults to 20.
        minimal_similarity (float, optional): The minimum similarity threshold between users. Defaults to 0.7.
        minimal_number_of_ratings (int, optional): The minimum number of ratings required for a book to be considered. Defaults to 6.
        minimum_number_of_books_rated_in_common (int, optional): The minimum number of books rated in common between users. Defaults to 10.

    Returns:
        DataFrame: The predicted ratings for books.
    """
    # Join the ratings dataset with the user's own ratings
    ratings = ratings.join(my_ratings.select("book_id", "my_rating"), how="left", on="book_id")

    # Calculate the number of articles rated in common for each user
    articles_rated_in_common = ratings.group_by("user_id").agg((pl.col("my_rating").is_not_null()).sum().alias("articles_rated_in_common"))

    # Filter out users who have rated less than the minimum number of books in common
    ratings = ratings.join(articles_rated_in_common, how="left", on="user_id").filter(
        pl.col("articles_rated_in_common") >= minimum_number_of_books_rated_in_common)

    # Calculate the similarity between users based on their ratings
    similarities = ratings.group_by("user_id", maintain_order=True).agg(pl.corr("rating", "my_rating").alias("corr"))

    # Filter out users whose similarity is below the minimum similarity threshold
    ratings = ratings.join(similarities, on="user_id", how="left")
    ratings = ratings.filter(pl.col("corr") > minimal_similarity)
    ratings = ratings.filter(pl.col("corr").is_not_nan())

    # Calculate the number of ratings per article
    ratings_per_article = ratings.group_by("book_id").agg((pl.col("rating").is_not_null()).sum().alias("ratings_per_article"))

    # Filter out articles that have less than the minimum number of ratings
    ratings = ratings.join(ratings_per_article, on="book_id", how="left")
    ratings = ratings.filter(pl.col("ratings_per_article") >= minimal_number_of_ratings)

    # Define the prediction function (weighted average of ratings from similar users)
    def predict_func():
        return ((pl.col("rating") * pl.col("corr")).sum()) / (pl.col("corr").sum())

    # Calculate the predicted ratings for each book
    predictions = ratings.group_by("book_id", maintain_order=True).agg(predict_func().alias("prediction")).select("book_id", "prediction")

    # Return the top recommendations
    predictions = predictions.sort(by="prediction", descending=True).limit(num_recs)
    return predictions
For comparison, we implement the same collaborative filtering logic using Pandas:
import pandas as pd

def pandas_collaborative_filtering(ratings, my_ratings, num_recs=20, minimal_similarity=0.7, minimal_number_of_ratings=6, minimum_number_of_books_rated_in_common=10):
    """
    Performs collaborative filtering on a dataset of book ratings.

    Args:
        ratings (DataFrame): The dataset of book ratings.
        my_ratings (DataFrame): The user's own ratings.
        num_recs (int, optional): The number of recommendations to return. Defaults to 20.
        minimal_similarity (float, optional): The minimum similarity threshold between users. Defaults to 0.7.
        minimal_number_of_ratings (int, optional): The minimum number of ratings required for a book to be considered. Defaults to 6.
        minimum_number_of_books_rated_in_common (int, optional): The minimum number of books rated in common between users. Defaults to 10.

    Returns:
        DataFrame: The predicted ratings for books.
    """
    # Join the ratings dataset with the user's own ratings
    ratings = pd.merge(ratings, my_ratings[["book_id", "my_rating"]], how="left", on="book_id")

    # Calculate the number of articles rated in common for each user
    articles_rated_in_common = ratings.groupby("user_id").agg({'my_rating': lambda x: x.notnull().sum()}).rename(columns={"my_rating": "articles_rated_in_common"})

    # Filter out users who have rated less than the minimum number of books in common
    ratings = pd.merge(ratings, articles_rated_in_common, how="left", on="user_id")
    ratings = ratings[ratings["articles_rated_in_common"] >= minimum_number_of_books_rated_in_common]

    # Calculate the similarity between users based on their ratings
    similarities = ratings.groupby("user_id")[["rating", "my_rating"]].corr().unstack().iloc[:, 1].rename("corr")

    # Filter out users whose similarity is below the minimum similarity threshold
    ratings = ratings.join(similarities, on="user_id", how="left")
    ratings = ratings[ratings["corr"] > minimal_similarity]
    ratings = ratings[ratings["corr"].notna()]

    # Calculate the number of ratings per article
    ratings_per_article = ratings.groupby("book_id").agg({'rating': lambda x: x.notnull().sum()}).rename(columns={"rating": "ratings_per_article"})

    # Filter out articles that have less than the minimum number of ratings
    ratings = pd.merge(ratings, ratings_per_article, on="book_id", how="left")
    ratings = ratings[ratings["ratings_per_article"] >= minimal_number_of_ratings]

    # Define the prediction function (weighted average of ratings from similar users)
    def predict_func(x):
        return ((x["rating"] * x["corr"]).sum()) / (x["corr"].sum())

    # Calculate the predicted ratings for each book
    predictions = ratings.groupby("book_id").apply(predict_func).to_frame("prediction")

    # Get the top recommendations
    predictions = predictions.sort_values(by="prediction", ascending=False).head(num_recs)
    return predictions
Finally, we can compare the execution time of the collaborative filtering process using Polars (both lazy and eager execution) and Pandas:
from timeit import timeit
n = 20
# Time the Polars algorithm in lazy mode
polars_time_lazy = timeit(lambda: polars_collaborative_filtering(ratings, my_ratings).collect(), number=n)
# Time the Polars algorithm in eager mode
ratings_df, my_ratings_df = pl.collect_all([ratings, my_ratings])
polars_time_eager = timeit(lambda: polars_collaborative_filtering(ratings_df, my_ratings_df), number=n)
# Time the Pandas algorithm
ratings_pd, my_ratings_pd = ratings.collect().to_pandas(), my_ratings.collect().to_pandas()
pandas_time = timeit(lambda: pandas_collaborative_filtering(ratings_pd, my_ratings_pd), number=n)
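Note that timeit returns the total wall time for all number=n runs, so the per-iteration numbers reported below are obtained by dividing by n, for example:
# Report the mean time per iteration
print(f"Polars (lazy):  {polars_time_lazy / n:.2f} seconds")
print(f"Polars (eager): {polars_time_eager / n:.2f} seconds")
print(f"Pandas:         {pandas_time / n:.2f} seconds")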
Running this on two different machines yields the following average time per iteration:
# Hardware 1
Polars (lazy): 1.36 seconds
Polars (eager): 0.30 seconds
Pandas: 13.31 seconds
# Hardware 2
Polars (lazy): 0.54 seconds
Polars (eager): 0.13 seconds
Pandas: 3.03 seconds
Interestingly, Polars with lazy execution is 6–10 times faster than Pandas, and in eager execution mode it is even up to 40 times faster.
Conclusion
The performance comparison shows significant speed advantages of using Polars over Pandas, particularly in the context of data-intensive tasks like collaborative filtering.
However, note that
- this is just one implementation of (user-user) collaborative filtering; many other approaches exist that yield better recommendations on average (see for example this post).
- this is most probably not the most efficient possible implementation of collaborative filtering.
- Pandas offers features (such as the min_periods argument of corr()) that one might exploit for further speedups.
Additionally, it is interesting to observe that Polars' lazy execution is slower than eager execution here. This is likely because each lazy run re-reads the CSV files (the LazyFrames were built with pl.scan_csv) and optimizes the compute graph before executing, whereas the eager run starts from data that is already in memory. However, lazy execution allows processing much larger datasets by loading only the required data into memory, enabling collaborative filtering with Polars to scale to much larger datasets and recommendation problems!
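As a rough sketch of what this looks like in practice (reusing ratings.csv from above), the optimized plan of a lazy query can be inspected with explain(), which shows that only the required columns and rows are read from disk; depending on your Polars version, such queries can also be collected in streaming mode to handle data that does not fit in memory.
# Inspect the optimized plan: projection and predicate pushdown limit what is read
lazy_query = (
    pl.scan_csv("ratings.csv")
    .filter(pl.col("rating") >= 4)
    .select("user_id", "book_id")
)
print(lazy_query.explain())
# Depending on the Polars version, collect(streaming=True) (or the newer streaming
# engine) can execute such a plan on data larger than memory
result = lazy_query.collect()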