I'm trying to create a graph with edges only between nodes (record indexes in a dataframe) that have the same values in any 2 or more columns.
What I'm doing: I build a list of all possible pairs of column names and go through them searching for duplicates; for each group of duplicated rows I extract the indexes and create edges between them.
The problem is that for huge datasets (millions of records) this solution is too slow and requires too much memory.

What I do:

df = pd.DataFrame({
    'A': [1, 2, 3, 4, 5],
    'B': [1, 1, 1, 1, 2],
    'C': [1, 1, 2, 3, 3],
    'D': [2, 7, 9, 8, 4]})  
   A  B  C  D
0  1  1  1  2
1  2  1  1  7
2  3  1  2  9
3  4  1  3  8
4  5  2  3  4

Here, rows 0 and 1 have the same values in two columns, B and C.
So, for nodes 0, 1, 2, 3, 4 I need to create the single edge 0-1. Every other pair of records matches in at most 1 field.

    import networkit as nk
    import numpy as np
    import pandas as pd

    num_nodes = len(df)
    column_names = df.columns.to_numpy()

    graph = nk.Graph(num_nodes, directed=False, weighted=False)

    # Get the indices of all unique pairs of columns
    indices = np.triu_indices(len(column_names), k=1)
    # Get the unique pairs of column names
    unique_pairs = np.column_stack((column_names[indices[0]], column_names[indices[1]]))

    for col1, col2 in unique_pairs:
        # Keep only the rows whose (col1, col2) value pair occurs more than once
        duplicated_rows = df[[col1, col2]].dropna()
        duplicated_rows = duplicated_rows[duplicated_rows.duplicated(subset=[col1, col2], keep=False)]

        # Connect every pair of row indexes within each duplicate group
        for _, group in duplicated_rows.groupby([col1, col2]):
            tb_ids = group.index.tolist()
            for i in range(len(tb_ids)):
                for j in range(i + 1, len(tb_ids)):
                    graph.addEdge(tb_ids[i], tb_ids[j])

Main question: how can I speed up / improve this solution? I was thinking about parallelizing over column combinations, but then I can't figure out how to create the edges in the graph properly.
I'd appreciate any help.


There are 2 answers below.

ravenspoint (BEST ANSWER)

Memory Problem

Your multi-million-record input generates so many pairs that they cannot all be kept in memory.

You will have to give up storing everything in memory and keep the data in a highly optimized database instead; I suggest SQLite. Bring input data into memory as required and store the pairs to the database as they are found. If you properly optimize your use of SQLite, the performance hit will be minimal and you will not run out of memory.

Performance Problem

Storing pairs to a database will slow the performance slightly.

You will need to optimize how you use the database. The two most important optimizations are:

  • Transaction Grouping. Initially, keep the pairs as they are found in memory. When the pair count reaches a specified number, write them all to the database in one transaction (both ideas are sketched after this list).

  • Asynchronous Write. Once you have handed off the writes to the db engine, do not wait for confirmation that the write succeeded - just blaze ahead with the pair search.
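
For readers who want to stay in Python, here is a minimal sketch of both optimizations using the built-in sqlite3 module; the table name, batch size, and helper names are illustrative assumptions, not part of the C++ program below:

    import sqlite3

    BATCH = 10_000  # pairs to buffer in memory before writing one transaction

    con = sqlite3.connect("pair.db")
    con.execute("PRAGMA synchronous = OFF")  # don't wait for the OS to confirm each write
    con.execute("CREATE TABLE IF NOT EXISTS pair (r1 INTEGER, r2 INTEGER)")

    buffer = []

    def store_pair(r1, r2):
        # keep pairs in memory until a full batch has accumulated
        buffer.append((r1, r2))
        if len(buffer) >= BATCH:
            flush()

    def flush():
        # one transaction per batch instead of one per pair
        with con:  # BEGIN ... COMMIT
            con.executemany("INSERT INTO pair VALUES (?, ?)", buffer)
        buffer.clear()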

You forgot to state your performance requirement! However, whatever your requirement might be, I will assume that you will need to squeeze out a significant improvement.

I see that you are using Python. This is an interpreted language, so performance will be sluggish. Switching to a compiled language will give you a significant performance boost; for example, well-coded C++ can give an improvement of up to 50 times.

Algorithm

SET T number of pairs to write in one DB transaction
LOOP N over all records
   IF N has 2 or more identical values
      LOOP M over records N+1 to last
          LOOP C over columns
              LOOP D over cols C+1 to last
                 IF N[C] == N[D] == M[C] == M[D]
                     SAVE M,N to memory pair store
                     IF memory pair store size >= T
                         WRITE memory pair store to DB
                         CLEAR memory pair store
WRITE memory pair store to DB
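
A rough Python rendering of this pseudocode, reusing the hypothetical store_pair()/flush() helpers from the sketch above (rows is assumed to be a list of equal-length record lists):

    def is_pair(a, b):
        # two records match if some column pair (c, d) holds
        # one identical value in all four cells
        n = len(a)
        return any(a[c] == a[d] == b[c] == b[d]
                   for c in range(n) for d in range(c + 1, n))

    def find_pairs(rows):
        for i, row in enumerate(rows):
            # prefilter: a row can only match if it has two equal columns itself
            if not is_pair(row, row):
                continue
            for j in range(i + 1, len(rows)):
                if is_pair(row, rows[j]):
                    store_pair(i, j)
        flush()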

Example:

Here is an implementation of these ideas in C++ that finds ~6,000,000 pairs in 100,000 records in 40 seconds on a modest laptop.

#include <string>
#include <fstream>
#include <sstream>
#include <iostream>
#include <vector>
#include <algorithm>
#include <stdexcept> // std::runtime_error
#include <cstdlib>   // rand, srand, atoi
#include <time.h>
#include "sqlite3.h"
#include "cRunWatch.h" // https://ravenspoint.wordpress.com/2010/06/16/timing/

std::vector<std::vector<int>> vdata;

class cPairStorage
{
    std::vector<std::pair<int, int>> vPair;
    sqlite3 *db;
    char *dbErrMsg;
    int transactionCount;

public:
    cPairStorage();

    void add(int r1, int r2)
    {
        vPair.push_back(std::make_pair(r1, r2));
        if (vPair.size() >= (size_t)transactionCount)
            writeDB();
    }

    void writeDB();

    int count();

    std::pair<int, int> get(int index);
};

cPairStorage pairStore;

cPairStorage::cPairStorage()
: transactionCount(500)
{
    int ret = sqlite3_open("pair.db", &db);
    if (ret)
        throw std::runtime_error("failed to open db");
    ret = sqlite3_exec(db,
                       "CREATE TABLE IF NOT EXISTS pair (r1, r2);",
                       0, 0, &dbErrMsg);
    ret = sqlite3_exec(db,
                       "DELETE FROM pair;",
                       0, 0, &dbErrMsg);
    ret = sqlite3_exec(db,
                       "PRAGMA synchronous = OFF;",
                       0, 0, &dbErrMsg);
}

void cPairStorage::writeDB()
{
    //raven::set::cRunWatch aWatcher("writeDB");

    sqlite3_stmt *stmt;
    int ret = sqlite3_prepare_v2(
        db,
        "INSERT INTO pair VALUES ( ?1, ?2 );",
        -1, &stmt, 0);

    ret = sqlite3_exec(
        db,
        "BEGIN TRANSACTION;",
        0, 0, &dbErrMsg);

    for (auto &p : vPair)
    {
        ret = sqlite3_bind_int(stmt, 1, p.first);
        ret = sqlite3_bind_int(stmt, 2, p.second);
        ret = sqlite3_step(stmt);
        ret = sqlite3_reset(stmt);
    }

    ret = sqlite3_exec(
        db,
        "END TRANSACTION;",
        0, 0, &dbErrMsg);

    //std::cout << "stored " << vPair.size() << "\n";

    sqlite3_finalize(stmt);
    vPair.clear();
}

int cPairStorage::count()
{
    int ret;

    sqlite3_stmt *stmt;
    ret = sqlite3_prepare_v2(
        db,
        "SELECT count(*) FROM pair;",
        -1, &stmt, 0);
    ret = sqlite3_step(stmt);

    int count = sqlite3_column_int(stmt, 0);
    ret = sqlite3_finalize(stmt);
    return count;
}

std::pair<int, int> cPairStorage::get(int index)
{
    if (0 > index || index >= count())
        throw std::runtime_error("bad pair index");

    std::pair<int, int> pair;
    int ret;
    sqlite3_stmt *stmt;
    ret = sqlite3_prepare_v2(
        db,
        "SELECT * FROM pair WHERE rowid = ?1;",
        -1, &stmt, 0);
    ret = sqlite3_bind_int(stmt, 1, index);
    ret = sqlite3_step(stmt);
    pair.first = sqlite3_column_int(stmt, 0);
    pair.second = sqlite3_column_int(stmt, 1);
    ret = sqlite3_finalize(stmt);
    return pair;
}

void generateRandom(
    int colCount,
    int rowCount,
    int maxValue)
{
    srand(time(NULL));
    for (int krow = 0; krow < rowCount; krow++)
    {
        std::vector<int> vrow;
        for (int kcol = 0; kcol < colCount; kcol++)
            vrow.push_back(rand() % maxValue + 1);
        vdata.push_back(vrow);
    }
}

bool isPair(int r1, int r2)
{
    auto &v1 = vdata[r1];
    auto &v2 = vdata[r2];
    for (int kc1 = 0; kc1 < v1.size(); kc1++)
    {
        for (int kc2 = kc1 + 1; kc2 < v1.size(); kc2++)
        {
            int tv = v1[kc1];
            if (tv != v1[kc2])
                continue;
            if (tv != v2[kc1])
                continue;
            if (tv != v2[kc2])
                continue;
            return true;
        }
    }
    return false;
}
void findPairs()
{
    raven::set::cRunWatch aWatcher("findPairs");

    int colCount = vdata[0].size();

    for (int kr1 = 0; kr1 < (int)vdata.size(); kr1++)
    {
        bool pairPossible = false;
        for (int kc1 = 0; kc1 < colCount; kc1++) {
            for (int kc2 = kc1 + 1; kc2 < colCount; kc2++) {
                if (vdata[kr1][kc1] == vdata[kr1][kc2])
                {
                    // row has two cols with equal values
                    // so it can be part of a row pair
                    pairPossible = true;
                    break;
                }
            }
            if (pairPossible)
                break;
        }
        if (!pairPossible)
            continue;
        for (int kr2 = kr1 + 1; kr2 < (int)vdata.size(); kr2++)
            if (isPair(kr1, kr2))
                pairStore.add(kr1, kr2);
    }

    pairStore.writeDB();
}

void display()
{
    std::cout << "\nFound " << pairStore.count() << " pairs in " << vdata.size() << " records\n\n";
    std::cout << "First 2 pairs found:\n\n";
    for (int kp = 0; kp < 2; kp++)
    {
        auto p = pairStore.get(kp+1);
        for (int v : vdata[p.first])
            std::cout << v << " ";
        std::cout << "\n";
        for (int v : vdata[p.second])
            std::cout << v << " ";
        std::cout << "\n\n";
    }

    raven::set::cRunWatch::Report();
}

int main(int argc, char *argv[])
{
    int rowCount = 10;
    if (argc == 2)
        rowCount = atoi(argv[1]);

    raven::set::cRunWatch::Start();

    generateRandom(
        5,        // columns
        rowCount, // rows
        20);      // max value

    findPairs();

    display();

    return 0;
}

Output from a test run

>matcher --rows 100000 --trans 10000 --seed 571

unit tests passed

Found 6238872 pairs in 100000 records

First 2 pairs found:

4 4 13 18 18
4 4 1 10 7

4 4 13 18 18
4 4 11 3 1

raven::set::cRunWatch code timing profile
Calls           Mean (secs)     Total           Scope
1               40.3924         40.3924         findPairs

Complete application with documentation in github repo https://github.com/JamesBremner/RecordMatcher

Multithreading

It is straightforward to split the data to be searched into two parts and search each part in its own thread. As often happens with multithreaded applications, the performance results at first are disappointing. However, by tuning the configuration parameters, I have achieved what seems like a worthwhile improvement.

Finds ~6,000,000 pairs in 100,000 records in 30 seconds on a modest laptop.
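
The same outer-loop split can be sketched in Python with multiprocessing (threads would not help here because of the GIL); search_chunk, the chunking scheme, and the reuse of is_pair from the earlier sketch are assumptions, not the repo's code:

    from concurrent.futures import ProcessPoolExecutor

    def search_chunk(args):
        # scan outer rows [start, stop) against all later rows
        start, stop, rows = args
        found = []
        for i in range(start, stop):
            if not is_pair(rows[i], rows[i]):  # same prefilter as before
                continue
            for j in range(i + 1, len(rows)):
                if is_pair(rows[i], rows[j]):
                    found.append((i, j))
        return found

    def find_pairs_parallel(rows, workers=2):
        # earlier chunks do more inner-loop work, so a naive even split is
        # unbalanced; that is the kind of tuning mentioned above
        # (guard the call with `if __name__ == "__main__":` on Windows/macOS)
        step = len(rows) // workers + 1
        chunks = [(s, min(s + step, len(rows)), rows)
                  for s in range(0, len(rows), step)]
        with ProcessPoolExecutor(max_workers=workers) as pool:
            return [p for part in pool.map(search_chunk, chunks) for p in part]

For comparison, output from the multithreaded C++ test run: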

>matcher --rows 100000 --trans 10000 --seed 571 --multi

unit tests passed

Found 6238872 pairs in 100000 records

First 2 pairs found:

4 4 13 18 18
4 4 1 10 7

4 4 13 18 18
4 4 11 3 1

raven::set::cRunWatch code timing profile
Calls           Mean (secs)     Total           Scope
1               29.6909         29.6909         findPairs
Alex_Y

I improved my solution a little by using joblib parallelization with its new feature, return_as="generator":

    from itertools import combinations
    from typing import List

    import networkit as nk
    import numpy as np
    import pandas as pd
    from joblib import Parallel, delayed


    def get_matching_pairs(df_grouped: pd.DataFrame) -> List:
        # all index pairs within one group of duplicated rows
        thub_ids = df_grouped.index.values
        return list(combinations(thub_ids, 2))


    # df, num_nodes and column_names as in the question
    graph = nk.Graph(num_nodes, directed=False, weighted=False)

    indices = np.triu_indices(len(column_names), k=1)
    unique_pairs = np.column_stack((column_names[indices[0]], column_names[indices[1]]))

    for col1, col2 in unique_pairs:
        duplicated_rows = df[[col1, col2, 'th_tr_id']].dropna().set_index('th_tr_id')
        duplicated_rows = duplicated_rows[duplicated_rows.duplicated(subset=[col1, col2], keep=False)]

        # process the smallest groups first so the largest ones don't stall the workers
        duplicated_groups = sorted(duplicated_rows.groupby([col1, col2]), key=lambda x: len(x[1]))
        for matching_pairs_list in Parallel(n_jobs=-2, verbose=1, return_as="generator")(
                delayed(get_matching_pairs)(group) for name, group in duplicated_groups):
            for u, v in matching_pairs_list:
                graph.addEdge(u, v)

Works fine for large datasets; the output graph contains >7 million nodes and about 10 billion edges, and processing time is moderate.

But anyway, @ravenspoint's solution seems to be the best, although a little hardcore for me (it's implemented in C++).