[FEAT] One-to-one clustering #2562

Open
aymonwuolanne opened this issue Dec 16, 2024 · 11 comments
Labels
enhancement New feature or request

Comments

@aymonwuolanne
Contributor

Is there any interest in a one-to-one clustering implementation to be included as an alternative to connected components clustering?

When you have two datasets, you can do something like this:

linker.misc.query_sql(
    f"""
    with ranked as (
        select
            unique_id_l,
            unique_id_r,
            source_dataset_l,
            source_dataset_r,
            match_probability,
            rank() over (partition by unique_id_l order by match_probability desc) as rank
        from {df_predict.physical_name}
    )
    select *
    from ranked
    where rank = 1
    """
)

but it can get much more complicated when you start thinking about it, for example:

  • we might want the rank condition to be symmetric (where you don't allow duplicates to form on either dataset); see the sketch after this list
  • there can be ties in match probability
  • there might be more than 2 datasets
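
For the symmetric, two-dataset case, a rough sketch of what that could look like (mutual best matches only; ties and more than two datasets still aren't handled here):

linker.misc.query_sql(
    f"""
    with ranked as (
        select
            unique_id_l,
            unique_id_r,
            source_dataset_l,
            source_dataset_r,
            match_probability,
            rank() over (partition by unique_id_l order by match_probability desc) as rank_l,
            rank() over (partition by unique_id_r order by match_probability desc) as rank_r
        from {df_predict.physical_name}
    )
    select *
    from ranked
    where rank_l = 1 and rank_r = 1
    """
)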

A proposed method

I have a method in mind for doing this, it's not pull request ready but I have some code snippets I could share too. The basic idea is that you run through some iterations where you guarantee no duplicates can form at each iteration.

The representatives table at iteration i must track several things (a sketch of the initial table follows this list):

  • node_id: the id corresponding (uniquely, across all source datasets) to that record.
  • source_dataset: the source dataset corresponding to the node_id. Unlike connected components, this can't be dropped since it's needed to determine whether links should be accepted at each iteration.
  • representative: a representative id for the cluster this record belongs to.
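
As a sketch, the initial representatives table just maps every record to itself ('nodes' below is a placeholder for the concatenated input records across all datasets):

initial_representatives_sql = """
select
    unique_id as node_id,
    source_dataset,
    unique_id as representative
from nodes
"""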

Within an iteration we calculate some extra columns for the current representatives table:

  • contains_{sd}: a boolean value for each source_dataset name sd, indicating whether the cluster contains a record from sd. For example, with datasets A, B, and C, we'd have columns contains_A, contains_B, contains_C.

Then we determine what the neighbours at this iteration will be. For each node_id in the neighbours table, we rank all of the incoming and outgoing edges by their match probability (where the edges aren't between nodes in the same cluster). We take the edges with rank_l and rank_r equal to 1, and then we filter out any edges that would introduce a duplicate, by checking the contains_{sd} flags on each side of the edge.
Ties could be dealt with at this step, using something arbitrary like row_number() instead of rank() or by detecting then dropping tied links.
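
A rough SQL sketch of this step, with illustrative table names (neighbours holding node_id, neighbour and match_probability, and reps_with_flags holding node_id, representative and the contains_{sd} flags for datasets A, B and C):

ranked_sql = """
select
    n.node_id,
    n.neighbour,
    (
        (l.contains_A and r.contains_A)
        or (l.contains_B and r.contains_B)
        or (l.contains_C and r.contains_C)
    ) as would_create_duplicate,
    -- row_number() instead of rank() would break ties arbitrarily
    rank() over (partition by n.node_id order by n.match_probability desc) as rank_l,
    rank() over (partition by n.neighbour order by n.match_probability desc) as rank_r
from neighbours as n
inner join reps_with_flags as l on n.node_id = l.node_id
inner join reps_with_flags as r on n.neighbour = r.node_id
where l.representative <> r.representative
"""

accepted_sql = """
select node_id, neighbour
from ranked
where rank_l = 1 and rank_r = 1 and not would_create_duplicate
"""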

Finally, the representatives table is updated in the same way as it is in the connected components iterations (joining the previous representatives table with the accepted neighbours).

Iterations finish when the clusters stabilise, i.e. when the count of nodes where rep_match = true reaches 0. I think the way this is set up, it should converge in at most k-1 iterations, where k is the number of datasets.

@aymonwuolanne aymonwuolanne added the enhancement New feature or request label Dec 16, 2024
@RobinL
Member

RobinL commented Dec 17, 2024

See also #251

Yes - definitely interested in adding support for this.

Another aspect is to do with the 'venn diagram' of the two datasets. Are they known to contain exactly the same records, is one a subset of the other, or can either side contain records not in the other? Perhaps the latter is the most generally useful, because it can be used in all the other scenarios, just with some possible loss of accuracy.

If you've already got some snippets, it'd be great to see a proposed/draft solution just as a script so we can get our heads around how it works.

@aymonwuolanne
Contributor Author

This is actually getting closer to a full implementation now, hopefully it makes sense. I read this review of bipartite graph matching algorithms and what I've got here is (I think) equivalent to the Exact Clustering method they describe (extended to multiple datasets).

One variation on this implementation would be to filter out the edges that would introduce a duplicate, and THEN rank them and select the mutual best links. This would accept more links, but I like the simplicity of only accepting the best links and not moving on to the next best ones if the best option is already "taken". I think this corresponds to the Unique Mapping Clustering in the paper I mentioned.
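
Concretely, the only difference is whether the duplicate check sits before or after the ranking. A sketch of the Unique-Mapping-style variant, with the same illustrative table names as in the proposal above:

unique_mapping_sql = """
select node_id, neighbour
from (
    select
        n.node_id,
        n.neighbour,
        rank() over (partition by n.node_id order by n.match_probability desc) as rank_l,
        rank() over (partition by n.neighbour order by n.match_probability desc) as rank_r
    from neighbours as n
    inner join reps_with_flags as l on n.node_id = l.node_id
    inner join reps_with_flags as r on n.neighbour = r.node_id
    where l.representative <> r.representative
        -- filtering duplicate-introducing edges *before* ranking means the
        -- next-best edge can still win in the same iteration
        and not (
            (l.contains_A and r.contains_A)
            or (l.contains_B and r.contains_B)
            or (l.contains_C and r.contains_C)
        )
) as ranked
where rank_l = 1 and rank_r = 1
"""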

The comments from @zmbc about unique and exhaustive make a lot of sense. I think the 'unique' part of this could be captured well by an implementation like this one, by setting the list of source_datasets to the ones which should contain at most one record. I don't know how one would achieve the 'exhaustive' part, especially if there's a threshold set on the pairwise match probabilities, and there's even the chance that some records don't have any matching pairs that satisfy the blocking rules.

Aside: one technical point I was stuck on was quoting the literal value of the source dataset name to be used in a query. I used single quotes below, but I think that might only work with DuckDB! See this line:

contains_expr = ", ".join([f"max(source_dataset == '{sd}') as contains_{sd}" for sd in source_datasets])
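
A slightly more backend-agnostic way to write it might be along these lines (just a sketch; the case expression avoids aggregating a raw boolean, and = is the standard equality operator):

contains_expr = ", ".join(
    f"max(case when source_dataset = '{sd}' then 1 else 0 end) as contains_{sd}"
    for sd in source_datasets
)
# the duplicate check would then compare against 1, e.g.
# (l.contains_A = 1 and r.contains_A = 1)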

from __future__ import annotations

import logging
import time
from typing import Optional

from splink.internals.database_api import DatabaseAPISubClass
from splink.internals.pipeline import CTEPipeline
from splink.internals.splink_dataframe import SplinkDataFrame

logger = logging.getLogger(__name__)

def one_to_one_clustering(
    nodes_table: SplinkDataFrame,
    edges_table: SplinkDataFrame,
    node_id_column_name: str,
    source_dataset_column_name: str,
    edge_id_column_name_left: str,
    edge_id_column_name_right: str,
    source_datasets: list[str],
    db_api: DatabaseAPISubClass,
    threshold_match_probability: Optional[float],
) -> SplinkDataFrame:
    """One to one clustering algorithm.

    This function clusters together records so that at most one record from each dataset is in each cluster.

    Args:
        
    Returns:
        SplinkDataFrame: A dataframe containing the connected components list
        for your link or dedupe job.

    """

    pipeline = CTEPipeline([edges_table])

    match_prob_expr = f"where match_probability >= {threshold_match_probability}"
    if threshold_match_probability is None:
        match_prob_expr = ""

    # Add 'reverse-edges' so that the algorithm can rank all incoming and outgoing edges
    sql = f"""
    select
        {edge_id_column_name_left} as node_id,
        {edge_id_column_name_right} as neighbour,
        match_probability
    from {edges_table.templated_name}
    {match_prob_expr}

    UNION ALL

    select
        {edge_id_column_name_right} as node_id,
        {edge_id_column_name_left} as neighbour,
        match_probability
    from {edges_table.templated_name}
    {match_prob_expr}
    """
    pipeline.enqueue_sql(sql, "__splink__df_neighbours")

    neighbours = db_api.sql_pipeline_to_splink_dataframe(pipeline)

    pipeline = CTEPipeline([nodes_table])

    sql = f"""
    select
        {node_id_column_name} as node_id,
        {node_id_column_name} as representative,
        {source_dataset_column_name} as source_dataset
    from {nodes_table.templated_name}
    """

    pipeline.enqueue_sql(sql, "__splink__df_representatives")

    prev_representatives = db_api.sql_pipeline_to_splink_dataframe(pipeline)

    iteration, needs_updating_count = 0, 1
    while needs_updating_count > 0:
        start_time = time.time()
        iteration += 1

        pipeline = CTEPipeline([neighbours, prev_representatives])

        # might need to quote the value here? 
        contains_expr = ", ".join([f"max(source_dataset == '{sd}') as contains_{sd}" for sd in source_datasets])
        
        sql = f"""
        select
            representative,
            {contains_expr}
        from {prev_representatives.physical_name}
        group by representative
        """

        pipeline.enqueue_sql(sql, f"__splink__representative_contains_flags_{iteration}")

        sql = f"""
        select
            r.node_id,
            r.source_dataset,
            cf.*
        from {prev_representatives.physical_name} as r 
        inner join __splink__representative_contains_flags_{iteration} as cf
        on r.representative = cf.representative
        """

        pipeline.enqueue_sql(sql, f"__splink__df_representatives_with_flags_{iteration}")

        duplicate_criteria = " or ".join([f"(l.contains_{sd} and r.contains_{sd})" for sd in source_datasets])
    
        # must be calculated every iteration since the where condition changes as the clustering progresses
        sql = f"""
        select 
            neighbours.node_id,
            neighbours.neighbour,
            {duplicate_criteria} as duplicate_criteria,
            row_number() over (partition by l.representative order by match_probability desc) as rank_l,
            row_number() over (partition by r.representative order by match_probability desc) as rank_r
        from {neighbours.physical_name} as neighbours
        inner join __splink__df_representatives_with_flags_{iteration} as l
        on neighbours.node_id = l.node_id
        inner join __splink__df_representatives_with_flags_{iteration} as r 
        on neighbours.neighbour = r.node_id
        where l.representative <> r.representative
        """
        
        # note for the future: a strategy to handle ties would go right here. 
        
        pipeline.enqueue_sql(sql, f"__splink__df_ranked_{iteration}")
    
        sql = f"""
        select
            node_id,
            neighbour
        from __splink__df_ranked_{iteration} 
        where rank_l = 1 and rank_r = 1 and not duplicate_criteria
        """

        pipeline.enqueue_sql(sql, f"__splink__df_neighbours_{iteration}")
    
        sql = f"""
        select
            source.node_id,
            min(source.representative) as representative
        from
        (
            select
                neighbours.node_id,
                repr_neighbour.representative as representative
            from __splink__df_neighbours_{iteration} as neighbours
            left join {prev_representatives.physical_name} as repr_neighbour
            on neighbours.neighbour = repr_neighbour.node_id
            
            union all
            
            select
                node_id,
                representative
            from {prev_representatives.physical_name}
        ) AS source
        group by source.node_id
        """

        pipeline.enqueue_sql(sql, "r")
    
        sql = f"""
        select
            r.node_id,
            r.representative,
            repr.source_dataset,
            r.representative <> repr.representative as needs_updating
        from r
        inner join {prev_representatives.physical_name} as repr
        on r.node_id = repr.node_id
        """

        pipeline.enqueue_sql(sql, f"__splink__df_representatives_{iteration}")

        representatives = db_api.sql_pipeline_to_splink_dataframe(pipeline)

        prev_representatives.drop_table_from_database_and_remove_from_cache()
        prev_representatives = representatives

        pipeline = CTEPipeline()

        # assess if the exit condition has been met
        sql = f"""
        select 
            count(*) as count_of_nodes_needing_updating
        from {representatives.physical_name}
        where needs_updating
        """

        pipeline.enqueue_sql(sql, "__splink__df_root_rows")

        root_rows_df = db_api.sql_pipeline_to_splink_dataframe(
            pipeline, use_cache=False
        )

        root_rows = root_rows_df.as_record_dict()
        root_rows_df.drop_table_from_database_and_remove_from_cache()
        needs_updating_count = root_rows[0]["count_of_nodes_needing_updating"]
        logger.info(
            f"Completed iteration {iteration}, "
            f"num representatives needing updating: {needs_updating_count}"
        )
        end_time = time.time()
        logger.log(15, f"    Iteration time: {end_time - start_time} seconds")

    return representatives


# now test out with an example

import duckdb
import pandas as pd

from splink import DuckDBAPI, Linker

logger.setLevel(10)

nodes = pd.DataFrame({
    "unique_id":      [  0,  1,   2,   3,   4,   5,   6,   7,   8],
    "source_dataset": ['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'],
})

edges = pd.DataFrame({
    "unique_id_l":       [  0,   1,   3,   4,   6,   6],
    "unique_id_r":       [  1,   2,   5,   5,   5,   7],
    "match_probability": [.90, .70, .85, .90, .80, .70],
})

db_api = DuckDBAPI()
df_nodes = db_api.register_table(nodes, "nodes")
df_edges = db_api.register_table(edges, "edges")

df_clusters = one_to_one_clustering(
    nodes_table=df_nodes,
    edges_table=df_edges,
    node_id_column_name="unique_id",
    source_dataset_column_name="source_dataset",
    edge_id_column_name_left="unique_id_l",
    edge_id_column_name_right="unique_id_r",
    source_datasets=['a', 'b', 'c'],
    db_api=db_api,
    threshold_match_probability=None,
)

df_clusters.as_pandas_dataframe()

@zmbc
Contributor

zmbc commented Dec 18, 2024

@aymonwuolanne I agree that this is "exact clustering" extended to multiple datasets, and I think this is worth including! Out of the methods described in that paper, this one seems like the most amenable to efficient implementation in SQL.

@zmbc
Contributor

zmbc commented Dec 18, 2024

Actually, now that I look at the code a bit closer, I'm a bit confused about rank_l and rank_r -- those are ranks regardless of source dataset, right? IIRC there are some assumptions you can make in Splink about which source dataset will be on the left and right, but if you had datasets A, B, and C, wouldn't this prevent A1 <--> B1 and A1 <--> C1 from both linking? Might not matter if B1 <--> C1 links, but it might not (maybe there is more missingness in B1 and C1).

@aymonwuolanne
Contributor Author

@zmbc it does rank them regardless of source dataset. If you have A1 <--> B1 with a higher weight than A1 <--> C1, then after the first iteration A1 and B1 will have the same representative, and the ranking is over links where the two records have different representatives, so A1 <--> B1 is excluded from the ranking in the second iteration, leaving A1 <--> C1 free to link.

If you partition by source dataset too, you can get situations like A1 <--> B1, B1 <--> C1, and A2 <--> C1 all being accepted, creating an implicit duplicate between A1 and A2.

@zmbc
Contributor

zmbc commented Dec 18, 2024

Ah! This is subtle and tricky to think through. Certainly if the datasets were totally scrambled between left and right, you could get A1 <--> B1, B1 <--> A2 (where the left and right there reflect the left and right tables). I think the key assumption here is that there is a valid ordering of the datasets D1, D2, D3, etc such that if DN is compared with DM, DN is on the left if and only if N < M.

A record in DN could only be linked to another record in DN if there were a loop of links back to DN. If our record of interest is linked to a record in DM, where M > N, that record in DM can only link again in the same iteration if it is on the left, which means it must link with a dataset DX where X > M. Iterating this can never lead back to DN.

@aymonwuolanne
Contributor Author

It is tricky! I had to draw quite a few diagrams to think through different examples.

I can see the logic of what you're saying, but even with an ordering of the datasets it doesn't work to partition by representative and source_dataset. Examples like the one I gave before (with the ordering A < B < C) involve a kind of "leapfrogging", i.e. the links A1 -> B1, B1 -> C1, and A2 -> C1 all respect the ordering you described, but you'd end up with all four records in the same cluster eventually.

The approach I used in the code was to ensure that there are no chains of links at all within the one iteration. For two records A1 and B1 that link, you can be sure that A1 links to nothing else and B1 links to nothing else. Then it's just a matter of checking that merging the cluster that A1 belongs to and the cluster that B1 belongs to doesn't implicitly form a duplicate in one of the source datasets.

@zmbc
Contributor

zmbc commented Dec 19, 2024

Sorry, let me clarify. I'm saying that it is necessary both to partition by representative only (not dataset) and to have an ordering that determines left and right records. If you don't have that second assumption, your rank_r and rank_l can't catch everything (e.g. A1 <--> B1, B1 <--> A2 as I mentioned above). I just wanted to state that assumption explicitly (and why it is sufficient). Do you know if the ordering is guaranteed by Splink, and is it documented?

@zmbc
Contributor

zmbc commented Dec 19, 2024 via email

@aymonwuolanne
Contributor Author

Oh I see, sorry! The code I've got adds in the reverse edges too, so in the case of A1 <--> B1 and B1 <--> A2 the edges B1 <--> A1 and A2 <--> B1 are also included when the ranking happens, so you only get one of those links accepted. In the example of A1 -> B1 and B1 -> C1 you'd also only be able to accept one of the two, since one of them will have rank 2 with respect to B1.

Just to lay it out very explicitly, this is what the ranked table might look like:

id_l  id_r  probability  rank_l  rank_r
A1    B1    0.90         1       1
B1    A1    0.90         1       1
B1    C1    0.80         2       1
C1    B1    0.80         1       2

I think it's better to not rely on the ordering of the datasets, though I believe it is guaranteed with the way the blocking code works.

@zmbc
Contributor

zmbc commented Dec 19, 2024 via email
