From 548238f0dbce2a2cb9725df98b0aab8562928677 Mon Sep 17 00:00:00 2001 From: "J. R. Williams" Date: Mon, 4 Mar 2024 11:50:44 -0500 Subject: [PATCH] Emlink refactor (#2) * major update. numeric completely working, priors added for expectation maximization * removing temp files * addendum. removing unnecessary temp file from emacs... again --- .gitignore | 4 +- Project.toml | 6 +- README.md | 3 + scratch.jl | 102 ++------- src/DiBitMatrix.jl | 75 +++++++ src/FastLink.jl | 11 +- src/emlink.jl | 77 +++++-- src/fastlink/fastlink.jl | 271 ++++++++++------------- src/gammas/Gammas.jl | 6 +- src/gammas/gammaCKfuzzy.jl | 181 +++++++-------- src/gammas/gammaCKpar.jl | 153 ++++++------- src/gammas/gammaKpar.jl | 68 +++--- src/gammas/gammaNUMCKpar.jl | 427 +++++++++++++++--------------------- src/getMatches.jl | 16 +- src/matchPatterns.jl | 107 +++++++++ src/matchesLink.jl | 0 src/resultMatrix.jl | 31 --- src/tableCounts.jl | 110 ---------- test/runtests.jl | 27 +-- 19 files changed, 772 insertions(+), 903 deletions(-) create mode 100644 src/DiBitMatrix.jl create mode 100644 src/matchPatterns.jl delete mode 100755 src/matchesLink.jl delete mode 100755 src/resultMatrix.jl delete mode 100755 src/tableCounts.jl diff --git a/.gitignore b/.gitignore index 200a369..7fa7bf7 100644 --- a/.gitignore +++ b/.gitignore @@ -23,5 +23,7 @@ docs/site/ # environment. Manifest.toml - +actual_scratch.jl +scratch.jl +.#* \#* diff --git a/Project.toml b/Project.toml index 95754ee..79244d1 100644 --- a/Project.toml +++ b/Project.toml @@ -1,10 +1,11 @@ name = "FastLink" uuid = "11f39cfd-5548-489f-be9a-f4ad0ff6eadc" authors = ["Jack R. Williams "] -version = "0.1.1" +version = "0.0.2" [deps] DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" +DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720" StringDistances = "88034a9c-02f8-509d-84a9-84ec65e18404" @@ -12,8 +13,9 @@ StringDistances = "88034a9c-02f8-509d-84a9-84ec65e18404" [extras] BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" -Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + [targets] test = ["Test", "CSV", "Pkg"] diff --git a/README.md b/README.md index 20b7326..8324da0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,7 @@ # FastLink.jl Fast Probabilistic Record Linkage for the Julia Language +## What is FastLink.jl + +The purpose of FastLink.jl is to bring a fast record linkage package to the julia language. When attempting to match large datasets using existing libraries in R and Python, I found they can be very slow and succumb to issues with memory pressure. This implementation of the fastlink algorithm is intended to scale effeciently in parallel and be able to easily handle matches between tabular data that span millions of rows. [![Run tests](https://github.com/jw2249a/FastLink.jl/actions/workflows/test.yml/badge.svg)](https://github.com/jw2249a/FastLink.jl/actions/workflows/test.yml) diff --git a/scratch.jl b/scratch.jl index 89716e4..44a0b76 100755 --- a/scratch.jl +++ b/scratch.jl @@ -1,108 +1,48 @@ using Pkg -Pkg.develop(path=".") +#Pkg.develop(path=".") using DataFrames using BenchmarkTools using CSV using FastLink using PooledArrays +import Pkg.Artifacts: @artifact_str -numeric=false -# files for performance -test=true -if test - a_fil="../dfA.csv" - b_fil="../dfB.csv" - if numeric - varnames=["housenum"] - match_method=["float"] - cut_a=[1] - cut_p=[2] - else - varnames=["firstname","middlename", "lastname","housenum"] - match_method=["string", "string","string", "float"] - cut_a=[0.92,0.92,0.92,1] - cut_p=[0.88,0.88,0.88,2] - end -else - a_fil="../../rstudio/test_merge/data/test_a.csv" - b_fil="../../rstudio/test_merge/data/test_b.csv" - if numeric - varnames=["ZIP", "DOB_YEAR", "ZIP4"] - match_method=["float", "float", "float"] - cut_a=[1,1,1] - cut_p=[2,2,2] - else - varnames=["FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "STREET_NAME"] - cut_a=[0.92,0.92,0.92,0.92] - cut_p=[0.88,0.88,0.88,0.88] - #varnames=["FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "STREET_NAME", "STATE"] - end -end +a_fil = @artifact_str "dfA" +b_fil = @artifact_str "dfB" +varnames=["firstname","middlename", "lastname","housenum"] +match_method=["string", "string","string", "float"] +cut_a=[0.92,0.92,0.92,1] +cut_p=[0.88,0.88,0.88,2] -#[100,200,500,1_000,2_000,4_000, 5_000, 10_000,20_000, 40_000, 50_000,100_000,1_000_000] -N1=10_000 -N2=500_000 -if test - dfA=CSV.read(a_fil, DataFrame, - ntasks=1, - pool=true, - missingstring=["", "NA"]) - dfB=CSV.read(b_fil, DataFrame, - ntasks=1, - pool=true, - missingstring=["", "NA"]) -else - dfA=CSV.read(a_fil, DataFrame, - limit=N1, - ignoreemptyrows=true, - ntasks=1, - pool=true, - missingstring=["", "NA", "NaN", "NULL", "Null"]) - dfB=CSV.read(b_fil, DataFrame, - limit=N2, - ignoreemptyrows=true, - ntasks=1, - pool=true, - missingstring=["", "NA", "NaN", "NULL", "Null"]) -end +dfA=CSV.read("$(a_fil)/dfA.csv", DataFrame, + ntasks=1, + pool=true, + missingstring=["", "NA"]) +dfB=CSV.read("$(b_fil)/dfB.csv", DataFrame, + ntasks=1, + pool=true, + missingstring=["", "NA"]) -if !test && numeric - for var in varnames - dfA[!,var]=passmissing(x-> try return parse(Float64,x) catch e return 0.0 end).(dfA[:,var]) - dfB[!,var]=passmissing(x-> try return parse(Float64,x) catch e return 0.0 end).(dfB[:,var]) - end +for var in varnames[1:3] + dfA[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfA[:,var])) + dfB[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfB[:,var])) end -# if test && !numeric -# for var in varnames -# dfA[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfA[:,var])) -# dfB[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfB[:,var])) -# end -# end - - config = fastLink(dfA,dfB,varnames,match_method=match_method,cut_a=cut_a,cut_p=cut_p, threshold_match = 0.85) - - dump(config.fastlink_settings.comparison_funs[4]) results=fastLink(dfA,dfB,varnames,match_method=match_method,cut_a=cut_a,cut_p=cut_p, - - - threshold_match = 0.85)() - - - x=results[1].patterns_w x[findall(ismissing.(x.gamma_4) .== false .&& x.gamma_4 .== 1),:] x[findall(ismissing.(x.gamma_4)),:] -44+7+1+43+79+1 + + diff --git a/src/DiBitMatrix.jl b/src/DiBitMatrix.jl new file mode 100644 index 0000000..e38d5f5 --- /dev/null +++ b/src/DiBitMatrix.jl @@ -0,0 +1,75 @@ +module DiBitMat +import Base: getindex, setindex!, view +import DataStructures: DiBitVector +export DiBitMatrix + +""" +Extending DiBitVectors from DataStructures.jl to include matrices. +""" +struct DiBitMatrix + data::DiBitVector + nrows::Integer + ncols::Integer +end + +# base definition of the DiBitMatrix +function DiBitMatrix(nrows::Integer, ncols::Integer) + data = DiBitVector(nrows * ncols, 0) # Or choose an appropriate type + return DiBitMatrix(data, nrows, ncols) +end + +# getting items by index +function getindex(vm::DiBitMatrix, i::Int, j::Int) + linear_index = (j - 1) * vm.nrows + i + return vm.data[linear_index] +end + +function getindex(vm::DiBitMatrix, ::Colon, j::Int) + column = zeros(UInt8, vm.nrows) + for i in 1:vm.nrows + linear_index = (j - 1) * vm.nrows + i + column[i] = vm.data[linear_index] + end + return column +end + +function getindex(vm::DiBitMatrix, i::Int, ::Colon) + row = zeros(UInt8, vm.ncols) + for j in 1:vm.ncols + linear_index = (j - 1) * vm.nrows + i + row[j] = vm.data[linear_index] + end + return row +end + +# setting items by index +function setindex!(vm::DiBitMatrix, value::UInt8, i::T, j::T) where {T<:Integer} + linear_index = (j - 1) * vm.nrows + i + vm.data[linear_index] = value +end + +# extending view to handle DiBitMatrix columns +function getIndices(vm::DiBitMatrix,::Colon,j::Int) + return (j - 1) * vm.nrows + 1, (j - 1) * vm.nrows + vm.nrows +end + +function getIndices(vm::DiBitMatrix, i::Int, ::Colon) + row = zeros(Integer, vm.ncols) + for j in 1:vm.ncols + row[j] = (j - 1) * vm.nrows + i + end + return row +end + +function view(vm::DiBitMatrix,::Colon, j::Int) + start,finish=getIndices(vm,:,j) + return view(vm.data, start:finish) +end + +function view(vm::DiBitMatrix, i::Int,::Colon) + vals=getIndices(vm, i,:) + return view(vm.data, vals) +end + + +end diff --git a/src/FastLink.jl b/src/FastLink.jl index 0659676..dafdded 100644 --- a/src/FastLink.jl +++ b/src/FastLink.jl @@ -3,17 +3,22 @@ using DataFrames import PooledArrays: PooledVector import Distributions: Dirichlet,rand +# match constants +const nonmatch::UInt8 = UInt8(0) +const match1::UInt8 = UInt8(1) +const match2::UInt8 = UInt8(2) +const missingval::UInt8 = UInt8(3) -include("resultMatrix.jl") +include("DiBitMatrix.jl") +using .DiBitMat include("gammas/Gammas.jl") using .Gammas -include("tableCounts.jl") +include("matchPatterns.jl") include("emlink.jl") include("getMatches.jl") include("fastlink/fastlink.jl") -export(tableCounts) export(fastLink) diff --git a/src/emlink.jl b/src/emlink.jl index bcf5ffe..fd2b055 100644 --- a/src/emlink.jl +++ b/src/emlink.jl @@ -24,7 +24,9 @@ end """ Expectation maximization function. """ -function emlinkMARmov(patterns::Dict, obs_a::Int,obs_b::Int,varnames::Vector{String}, ranges::Vector{UnitRange{Int64}}; p_m=0.1,iter_max=5000,tol=Float64(1e-05),missingval = [false,true]) +function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames::Vector{String}; + p_m=0.1,iter_max=5000,tol=1e-05, prior_lambda=0.0, w_lambda=0.0, + prior_pi=0.0,w_pi=0.0, address_field=Vector{Bool}()) # Initialize count and delta for while loop and break point delta = Float64(1) count = 1 @@ -32,25 +34,52 @@ function emlinkMARmov(patterns::Dict, obs_a::Int,obs_b::Int,varnames::Vector{Str # Info for EM algorithm p_u = 1 - p_m nfeatures=length(varnames) - gamma_jk=collect(keys(patterns)) - n_j = collect(values(patterns)) + gamma_jk=patterns.patterns + n_j = length.(patterns.indices) N = length(n_j) - # TODO: add "if statement" λ priors are declared - psi = 1 - mu = 1 - - ########################################### - # # TODO: add "if statement" for π priors # - # ## for address # - # ⍺₀_address = 1 # - # ⍺₁_address = 1 # - # address_field = falses(nfeatures) # - # ## for lambda # - # ⍺₀_gender = 1 # - # ⍺₁_gender = 1 # - # genderaddress_field = falses(nfeatures) # - ########################################### + # if λ priors are declared + if prior_lambda == 0 + psi = 1 + mu = 1 + else + if w_lambda == 0 + @error "If declaring a lambda prior, you need to declare weights via w_lambda." + elseif w_lambda > 0 | w_lambda < 0 + @error "w_lambda must be between 0 and 1." + elseif w_lambda == 1 + w_lambda = 1 - 1e-05 + end + c_lambda = w_lambda/(1-w_lambda) + # hyperparameters for lambda + mu = prior_lambda * c_lambda * obs_a * obs_b + 1 + psi = (1 - prior_lambda) * mu / prior_lambda + end + + # if pi prior is declared + if prior_pi == 0 + alpha0_address = 1 + alpha1_address = 1 + address_field = falses(nfeatures) + else + if prior_lambda == 0 + @error "If declaring a prior on pi, you need to declare lambda prior." + elseif w_pi == 0 + @error "If providing a prior for pi, please specify the weight using w_pi" + elseif w_pi < 0 | w_pi > 1 + @error "w_pi must be between 0 and 1." + elseif w_pi == 1 + w_pi = 1 - 1e-05 + end + + c_pi = w_pi / (1 - w_pi) + exp_match = prior_lambda * obs_a * obs_b + + # Optimal hyperparameters for pi + alpha0_address = c_pi * prior_pi * exp_match + 1 + alpha1_address = alpha0_address * (1 - prior_pi) / prior_pi + end + # initialize variables that need value to be returned zeta_j=0.0 num_prod = zeros(Float64,0) @@ -65,8 +94,7 @@ function emlinkMARmov(patterns::Dict, obs_a::Int,obs_b::Int,varnames::Vector{Str p_gamma_kjm = missings(Union{Missing,Float64}, (nfeatures,N)) p_gamma_kju = missings(Union{Missing,Float64}, (nfeatures,N)) for c in 1:nfeatures - col=ranges[c] - vals_gamma_jk[c] = [i[col] == missingval ? missing : sum(i[col]) for i in gamma_jk] + vals_gamma_jk[c] = [i[c] == missingval ? missing : sum(i[c]) for i in gamma_jk] uvals_gamma_jk[c] = sort(unique([i for i in vals_gamma_jk[c] if !ismissing(i)])) c_m = collect(1:50:(length(uvals_gamma_jk[c])*50)) p_gamma_km[c] = sort(rand(Dirichlet(c_m),1)[:],rev=false) @@ -89,9 +117,13 @@ function emlinkMARmov(patterns::Dict, obs_a::Int,obs_b::Int,varnames::Vector{Str p_u = 1-p_m for i in 1:nfeatures + km_prob=sort([sum(num_prod[findall(skipmissing_equality(vals_gamma_jk[i], uvals_gamma_jk[i][j]))]) + for j in 1:length(uvals_gamma_jk[i])],rev=false) + if address_field[i] + km_prob += append!([alpha0_address], [alpha1_address for i in 1:(length(uvals_gamma_jk[i])-1)]) + end p_gamma_km[i] = - sort(probability_vector([sum(num_prod[findall(skipmissing_equality(vals_gamma_jk[i], uvals_gamma_jk[i][j]))]) - for j in 1:length(uvals_gamma_jk[i])]),rev=false) + probability_vector(km_prob) p_gamma_ku[i] = sort(probability_vector([let sub1 = sub=findall(skipmissing_equality(vals_gamma_jk[i], uvals_gamma_jk[i][j])); sum(n_j[sub] - num_prod[sub]) @@ -118,6 +150,7 @@ function emlinkMARmov(patterns::Dict, obs_a::Int,obs_b::Int,varnames::Vector{Str pgamma_jm = p_gamma_jm, pgamma_ju = p_gamma_ju, patterns_w = data_w, patterns_b = gamma_jk, + indices = patterns.indices, iter_converge = count, obs_a = obs_a, obs_b = obs_b, varnames = varnames) diff --git a/src/fastlink/fastlink.jl b/src/fastlink/fastlink.jl index 723d140..a5eada9 100755 --- a/src/fastlink/fastlink.jl +++ b/src/fastlink/fastlink.jl @@ -1,4 +1,3 @@ - function check_input_lengths(x, var_length::Int, varname::String) if typeof(x) == "string" x=[x] @@ -51,101 +50,11 @@ function check_var_types(x::DataFrame, y::DataFrame, varnames::Vector{String},ma return match_method,comparison_levels end - - -function create_comparison_function(res, - col::Int, - match_method::String, - stringdist::String, - jw_weight::Float64, - cut_a::T, - cut_p::T, - upper::Bool, - partial::Bool, - fuzzy::Bool, - comparison_level::Int) where T <: Number - match2 = [true,true] - match1 = [true,false] - missingval = [false,true] - difference_function = - - if match_method == "string" - if fuzzy - match_fun = gammaCKfuzzy!(view(res.result_matrix,:,res.ranges[col]), - res.array_2Dindex, - res.dims, - cut_a=cut_a, - cut_b=cut_p, - upper=upper, - w=jw_weight, - partial=partial, - match2=reshape(match2,(1,2)), - match1=reshape(match1,(1,2)), - missingval=reshape(missingval,(1,2))) - else - match_fun = gammaCKpar!(view(res.result_matrix,:,res.ranges[col]), - res.array_2Dindex, - res.dims, - distmethod=stringdist, - cut_a=cut_a, - cut_b=cut_p, - partial=partial, - w=jw_weight, - match2=match2, - match1=match1, - missingval=missingval) - end - elseif match_method == "exact" || match_method == "bool" - if comparison_level == 1 - match2 = true - end - match_fun = gammaKpar!(view(res.result_matrix,:,res.ranges[col]), - res.array_2Dindex, - res.dims, - match2,missingval) - elseif match_method == "numeric" || match_method=="float" || match_method == "int" - match_fun = gammaNUMCKpar!( - view(res.result_matrix,:,res.ranges[col]), - res.array_2Dindex, - res.dims, - cut_a=cut_a, - cut_b=cut_p, - partial=partial, - match2=match2, - match1=match1, - missingval=missingval, - comparison_function=difference_function, - match_method=match_method) - end - - return match_fun -end - - -# Constructor to check types and variable presence in varnames parameter -struct FastLinkVars - varnames::Vector{String} - types::Vector{String} - comparison_funs::Vector{Function} - function FastLinkVars(varnames::Vector{String}, - res::ResultMatrix, - vartypes::Vector{String}, - stringdist::Vector{String}, - jw_weight::Vector{Float64}, - cut_a::Vector{T}, - cut_p::Vector{T}, - upper::Vector{Bool}, - partial::Vector{Bool}, - fuzzy::Vector{Bool}, - comparison_levels::Vector{Int} - ) where T <: Number - comparison_funs=Function[] - for i in 1:length(comparison_levels) - push!(comparison_funs, - create_comparison_function(res,i,vartypes[i],stringdist[i],jw_weight[i], - cut_a[i],cut_p[i],upper[i],partial[i],fuzzy[i],comparison_levels[i] - )) +function remove_no_matched_var_indices(resultsEM) + for i in eachindex(resultsEM.patterns_b) + if (match1 in resultsEM.patterns_b[i]) ⊽ (match2 in resultsEM.patterns_b[i]) + resultsEM.indices[i] = Vector{ComparisonIndex}() end - new(varnames, vartypes, comparison_funs) end end @@ -168,6 +77,7 @@ Algorithm taken from: - `tol_em`: Convergence tolerance for the EM Algorithm. (default 1e-04) - `threshold_match`: Lower bound for the posterior probability that will act as a cutoff for matches. - `dedupe_matches`: Whether to dedupe the matches within the dataset. + # Returns - `MatchedData::DataFrame`: The resulting DataFrame after matching. @@ -176,7 +86,8 @@ Algorithm taken from: matched_data = fastLink(dfA, dfB, ["firstname", "lastname", "city"]) """ function fastLink(dfA::DataFrame, dfB::DataFrame, - varnames::Vector{String}; + varnames::Vector{String}, + idvar::Tuple{String,String}; match_method=String[], partials=[true], fuzzy=[false], @@ -193,7 +104,8 @@ function fastLink(dfA::DataFrame, dfB::DataFrame, numvars=length(varnames) obs_a=nrow(dfA) obs_b=nrow(dfB) - + dims = (obs_a,obs_b) + @info "Checking settings for $numvars declared variables." partials = check_input_lengths(partials, numvars, "partials") upper_case = check_input_lengths(upper_case, numvars, "upper_case") @@ -202,41 +114,72 @@ function fastLink(dfA::DataFrame, dfB::DataFrame, cut_a = check_input_lengths(cut_a, numvars, "cut_a") cut_p = check_input_lengths(cut_p, numvars, "cut_p") stringdist_method = check_input_lengths(stringdist_method, numvars, "stringdist_method") - vartypes, comparison_levels = check_var_types(dfA,dfB,varnames,match_method,partials) - res=ResultMatrix(comparison_levels, (obs_a,obs_b)) - - fastlink_settings=FastLinkVars(varnames,res,vartypes,stringdist_method,jw_weight,cut_a,cut_p,upper_case,partials,fuzzy,comparison_levels) - + res = [DiBitMatrix(obs_a,obs_b) for _ in varnames] + # allow missing for comparisons + allowmissing!(dfA) + allowmissing!(dfB) - return () -> begin - - # allow missing for comparisons - allowmissing!(dfA) - allowmissing!(dfB) - - - # iterate through variables and execute function over them - for i in eachindex(varnames) - @info "Now matching var $(varnames[i]) using $(match_method[i])" - fastlink_settings.comparison_funs[i](dfA[!,varnames[i]],dfB[!,varnames[i]]) + + # iterate through variables and execute function over them + for i in eachindex(varnames) + @info "Now matching var $(varnames[i]) using $(match_method[i])" + if match_method[i] == "string" + if fuzzy[i] + gammaCKfuzzy!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + dims, + cut_a=cut_a[i], + cut_b=cut_p[i], + upper=upper[i], + w=jw_weight[i], + partial=partials[i]) + else + gammaCKpar!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + dims, + distmethod=stringdist_method[i], + cut_a=cut_a[i], + cut_b=cut_p[i], + w=jw_weight[i], + partial=partials[i]) + end + elseif match_method[i] == "exact" || match_method[i] == "bool" + gammaKpar!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + dims) + elseif match_method == "numeric" || match_method=="float" || match_method == "int" + gammaNUMCKpar!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + cut_a=cut_a[i], + cut_b=cut_p[i], + partial=partials[i]) end + end + @info "Getting table counts" + counts = get_match_patterns(res) - @info "Getting table counts" - counts = tableCounts(view(res.result_matrix,:,:), varnames) - - @info "Running expectation maximization function" - resultsEM = emlinkMARmov(counts[2], obs_a,obs_b, - varnames,res.ranges,tol=tol_em) + @info "Running expectation maximization function" + resultsEM = emlinkMARmov(counts, obs_a,obs_b, + varnames,tol=tol_em) - @info "Retrieving matches" - matches = getMatches(resultsEM, counts[1], obs_a,threshold_match=threshold_match) - - return (resultsEM, matches) - end + # testing removing uncessessary indices (where no obs exist) + #remove_no_matched_var_indices(resultsEM) + # adding uids + resultsEM = merge(resultsEM, (matched_ids = indices_to_uids(dfA[!, idvar[1]],dfB[!, idvar[2]],resultsEM.indices),)) + + @info "Retrieving matches" + getMatches(resultsEM,threshold_match=threshold_match) + + return (resultsEM) + end # version of fast link that i can pass to via named tuples @@ -258,7 +201,8 @@ function fastLink(dfA::DataFrame, dfB::DataFrame; numvars=length(varnames) obs_a=nrow(dfA) obs_b=nrow(dfB) - + dims = (obs_a,obs_b) + @info "Checking settings for $numvars declared variables." partials = check_input_lengths(partials, numvars, "partials") upper_case = check_input_lengths(upper_case, numvars, "upper_case") @@ -267,41 +211,64 @@ function fastLink(dfA::DataFrame, dfB::DataFrame; cut_a = check_input_lengths(cut_a, numvars, "cut_a") cut_p = check_input_lengths(cut_p, numvars, "cut_p") stringdist_method = check_input_lengths(stringdist_method, numvars, "stringdist_method") - vartypes, comparison_levels = check_var_types(dfA,dfB,varnames,match_method,partials) - res=ResultMatrix(comparison_levels, (obs_a,obs_b)) - - fastlink_settings=FastLinkVars(varnames,res,vartypes,stringdist_method,jw_weight,cut_a,cut_p,upper_case,partials,fuzzy,comparison_levels) - + res = [DiBitMatrix(obs_a,obs_b) for _ in varnames] + # allow missing for comparisons + allowmissing!(dfA) + allowmissing!(dfB) - return () -> begin - - # allow missing for comparisons - allowmissing!(dfA) - allowmissing!(dfB) - - - # iterate through variables and execute function over them - for i in eachindex(varnames) - @info "Now matching var $(varnames[i]) using $(match_method[i])" - fastlink_settings.comparison_funs[i](dfA[!,varnames[i]],dfB[!,varnames[i]]) + + # iterate through variables and execute function over them + for i in eachindex(varnames) + @info "Now matching var $(varnames[i]) using $(match_method[i])" + if match_method[i] == "string" + if fuzzy[i] + gammaCKfuzzy!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + dims, + cut_a=cut_a[i], + cut_b=cut_p[i], + upper=upper[i], + w=jw_weight[i], + partial=partials[i]) + else + gammaCKpar!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + dims, + distmethod=stringdist_method[i], + cut_a=cut_a[i], + cut_b=cut_p[i], + w=jw_weight[i], + partial=partials[i]) + end + elseif match_method[i] == "exact" || match_method[i] == "bool" + gammaKpar!(dfA[!,varnames[i]], + dfB[!,varnames[i]], + res[i], + dims) end + end + @info "Getting table counts" + counts = get_match_patterns(res) - @info "Getting table counts" - counts = tableCounts(view(res.result_matrix,:,:), varnames) - - @info "Running expectation maximization function" - resultsEM = emlinkMARmov(counts[2], obs_a,obs_b, - varnames,res.ranges,tol=tol_em) + @info "Running expectation maximization function" + resultsEM = emlinkMARmov(counts, obs_a,obs_b, + varnames,tol=tol_em) - @info "Retrieving matches" - matches = getMatches(resultsEM, counts[1], obs_a,threshold_match=threshold_match) - - return (resultsEM, matches) - end + # testing removing uncessessary indices (where no obs exist) + #remove_no_matched_var_indices(resultsEM) + # adding uids + resultsEM = merge(resultsEM, (matched_ids = indices_to_uids(vecA,vecB,resultsEM.indices),)) + + @info "Retrieving matches" + getMatches(resultsEM,threshold_match=threshold_match) + + return (resultsEM) end diff --git a/src/gammas/Gammas.jl b/src/gammas/Gammas.jl index 7af4ae8..a27bd96 100755 --- a/src/gammas/Gammas.jl +++ b/src/gammas/Gammas.jl @@ -1,11 +1,13 @@ module Gammas import PooledArrays: PooledVector -import StringDistances: Jaro, JaroWinkler, Levenshtein, DamerauLevenshtein +import StringDistances: Jaro, JaroWinkler, Levenshtein, DamerauLevenshtein, compare +using FastLink.DiBitMat +import FastLink: match1, match2, missingval, nonmatch include("gammaKpar.jl") +include("gammaNUMCKpar.jl") include("gammaCKpar.jl") include("gammaCKfuzzy.jl") -include("gammaNUMCKpar.jl") export gammaCKpar!, gammaKpar!, gammaCKfuzzy!, gammaNUMCKpar! end # Gammas diff --git a/src/gammas/gammaCKfuzzy.jl b/src/gammas/gammaCKfuzzy.jl index 8b886bb..2084cdf 100644 --- a/src/gammas/gammaCKfuzzy.jl +++ b/src/gammas/gammaCKfuzzy.jl @@ -1,4 +1,4 @@ -function pool_lookup_table(vec::Vector{UInt32},len::UInt32) +function pool_lookup_table(vec::Vector{UInt32},len::UInt32)::Vector{Vector{UInt32}} lookup_by_id = fill(Vector{UInt32}(), len) Threads.@threads for pool_index in UInt32(1):len lookup_by_id[pool_index] = (1:length(vec))[vec .=== pool_index] @@ -107,63 +107,51 @@ function score_letter!(candidate_score::CandidateScore, query_mask::UInt16, cand candidate_score.last_match_letter_index |= mask_result end -function calculate_jaro_winkler(p::Float64) - return (score::CandidateScore, query_partial::UInt16) -> begin - transpositions = score.transposition_count > score.matches / 2 ? score.transposition_count - 1 : score.transposition_count - partial = ((score.matches * score.len_partial) + (score.matches * query_partial)) / 1024.0 - jaro = (partial + 1.0 - (transpositions / score.matches)) / 3.0 - l = trailing_ones(UInt16(score.used_exact & 0x000f)) - return jaro + p * l * (1.0 - jaro) - end +function calculate_jaro_winkler(score::CandidateScore, query_partial::UInt16, p::Float64) + transpositions = score.transposition_count > score.matches / 2 ? score.transposition_count - 1 : score.transposition_count + partial = ((score.matches * score.len_partial) + (score.matches * query_partial)) / 1024.0 + jaro = (partial + 1.0 - (transpositions / score.matches)) / 3.0 + l = trailing_ones(UInt16(score.used_exact & 0x000f)) + return jaro + p * l * (1.0 - jaro) end function find_missing_index(d::Dict)::UInt32 return let x = [i for i in values(filter(x->ismissing(x[1]), d))]; length(x) > 0 ? x[1] : UInt32(0) ; end end - -function update_results(results::SubArray, - array_2Dindex::Function, - vartype::DataType) - if vartype == Vector{UInt32} - return (a_ids::Vector{UInt32}, - b_ids::Vector{UInt32}, - val::Matrix{Bool}) -> begin - results[[array_2Dindex(UInt32(ia),UInt32(ib)) for ia in a_ids for ib in b_ids],:] .= val - return nothing - end - else - return (a_ids::Vector{UInt32}, - b_ids::UnitRange{UInt32}, - val::Matrix{Bool}) -> begin - results[[array_2Dindex(UInt32(ia),UInt32(ib)) for ia in a_ids for ib in b_ids],:] .= val - return nothing - end +function update_results!(results::DiBitMatrix, a_ids::Vector{UInt32}, b_ids::Vector{UInt32}, val::UInt8) + for ia in a_ids, ib in b_ids + results[ia,ib] = val end + return nothing +end +function update_results!(results::DiBitMatrix, a_ids::Vector{UInt32}, b_ids::UnitRange{UInt32}, val::UInt8) + for ia in a_ids, ib in b_ids + results[ia,ib] = val + end + return nothing end +function score_value(results::DiBitMatrix,score::CandidateScore,query_partial::UInt16, + a_ids::Vector{UInt32},lookup_b_by_id::Vector{Vector{UInt32}}, + score_i::Int, p::Float64,cut_a::Float64,cut_b::Float64) + jw = calculate_jaro_winkler(score, query_partial, p) + if jw >= cut_a + update_results!(results,a_ids,lookup_b_by_id[score_i],match2) + end + return nothing +end -function score_value(partial::Bool,p::Float64,match1::Matrix{Bool},match2::Matrix{Bool}, - update_results!::Function,cut_a::Float64,cut_b::Float64; - calculate_jaro_winkler=calculate_jaro_winkler) - calculate_jaro_winkler = calculate_jaro_winkler(p) - if partial - return (score,query_partial, a_ids,lookup_b_by_id,score_i) -> begin - jw = calculate_jaro_winkler(score, query_partial) - if jw >= cut_a - update_results!(a_ids,lookup_b_by_id[score_i],match2) - elseif jw >= cut_b - update_results!(a_ids,lookup_b_by_id[score_i],match1) - end - end - else - return (score,query_partial, a_ids,lookup_b_by_id,score_i) -> begin - jw = calculate_jaro_winkler(score, query_partial) - if jw >= cut_a - update_results!(a_ids,lookup_b_by_id[score_i],match2) - end - end +function score_value2(results::DiBitMatrix,score::CandidateScore, query_partial::UInt16, + a_ids::Vector{UInt32},lookup_b_by_id::Vector{Vector{UInt32}}, + score_i::Int, p::Float64,cut_a::Float64,cut_b::Float64) + jw = calculate_jaro_winkler(score, query_partial, p) + if jw >= cut_a + update_results!(results,a_ids,lookup_b_by_id[score_i],match2) + elseif jw >= cut_b + update_results!(results,a_ids,lookup_b_by_id[score_i],match1) end + return nothing end @@ -184,69 +172,66 @@ https://tech.popdata.org/speeding-up-Jaro-Winkler-with-rust-and-bitwise-operatio - `upper::Bool=true`: Whether input string is uppercase. - `w`: Winkler weight for jw string distance. """ -function gammaCKfuzzy!(results::SubArray,array_2Dindex::Function, - dims::Tuple; - cut_a::Float64=0.92,cut_b::Float64=0.88, - upper::Bool=true,w::Float64="jw",partial::Bool=true, - match2::Matrix{Bool}=[true true],match1::Matrix{Bool}=[true false], - missingval::Matrix{Bool}=[false true]) +function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix,dims::Tuple{Int,Int}; + cut_a::Float64=0.92,cut_b::Float64=0.88,upper::Bool=true, + w::Float64=0.1,partial::Bool=true) + + # functions that update the results view - score_value! = score_value(partial,w,match1,match2, - update_results(results,array_2Dindex,Vector{UInt32}), - cut_a,cut_b) - - update_results! = update_results(results,array_2Dindex,UnitRange{UInt32}) + if partial + score_value! = score_value2 + else + score_value! = score_value + end - if upper + # change the range of ascii characters dependent on case + if upper space_char,max_char = 0x40,0x5a else space_char,max_char = 0x60,0x7a end - - (vecA::PooledVector,vecB::PooledVector) -> begin - - lenA = UInt32(length(vecA.pool)) - lenB = UInt32(length(vecB.pool)) + # length of unique values + lenA = UInt32(length(vecA.pool)) + lenB = UInt32(length(vecB.pool)) - lookup_a_by_id=pool_lookup_table(vecA.refs, lenA) - lookup_b_by_id=pool_lookup_table(vecB.refs, lenB) + # vector of pool value indices + lookup_a_by_id=pool_lookup_table(vecA.refs, lenA) + lookup_b_by_id=pool_lookup_table(vecB.refs, lenB) + + missingindexA = find_missing_index(vecA.invpool) + + base_candidate_lookup = build_candidate_lookup(vecB.pool,spaceletter=space_char,lastletter=max_char) + base_candidate_scores = build_candidate_scores(vecB.pool) + + Threads.@threads for (query_name,new_a_id) in collect(vecA.invpool) + # pass if query is missing val + if new_a_id === missingindexA + update_results!(results, lookup_a_by_id[new_a_id],UInt32(1):UInt32(dims[2]),missingval) + continue + end - missingindexA = find_missing_index(vecA.invpool) + query_len = UInt8(min(ncodeunits(query_name),16)) + query_masks_lookup = maskify(query_name,query_len,space_char=space_char,max_char=max_char) + query_partial = UInt16(1024 ÷ query_len) + candidate_scores = deepcopy(base_candidate_scores) - base_candidate_lookup = build_candidate_lookup(vecB.pool,spaceletter=space_char,lastletter=max_char) - base_candidate_scores = build_candidate_scores(vecB.pool) + for (query_index, (letter_index, query_mask_by_candidate_len)) in enumerate(query_masks_lookup) + for c_info in base_candidate_lookup[letter_index] + candidate_score = candidate_scores[c_info.name_index] + query_mask = query_mask_by_candidate_len[c_info.len] + score_letter!(candidate_score, query_mask, c_info.mask, query_index) + end + end - Threads.@threads for (query_name,new_a_id) in collect(vecA.invpool) - # pass if query is missing val - if new_a_id === missingindexA - update_results!(lookup_a_by_id[new_a_id],UInt32(1):UInt32(dims[2]),missingval) + a_ids = lookup_a_by_id[new_a_id] + for (score_i, score) in enumerate(candidate_scores) + if score.len_partial === UInt16(1) + update_results!(results, a_ids, lookup_b_by_id[score_i], missingval) continue end - - query_len = UInt8(min(ncodeunits(query_name),16)) - query_masks_lookup = maskify(query_name,query_len,space_char=space_char,max_char=max_char) - query_partial = UInt16(1024 ÷ query_len) - candidate_scores = deepcopy(base_candidate_scores) - - for (query_index, (letter_index, query_mask_by_candidate_len)) in enumerate(query_masks_lookup) - for c_info in base_candidate_lookup[letter_index] - candidate_score = candidate_scores[c_info.name_index] - query_mask = query_mask_by_candidate_len[c_info.len] - score_letter!(candidate_score, query_mask, c_info.mask, query_index) - end - end - - a_ids = lookup_a_by_id[new_a_id] - for (score_i, score) in enumerate(candidate_scores) - if score.len_partial === UInt16(1) - update_results!(a_ids,lookup_b_by_id[score_i],missingval) - continue - end - # if present calculate scores - score_value!(score,query_partial, a_ids,lookup_b_by_id,score_i) - end + # if present calculate scores + score_value!(results, score,query_partial, a_ids,lookup_b_by_id,score_i, w, cut_a, cut_b) end - - return nothing end + return nothing end diff --git a/src/gammas/gammaCKpar.jl b/src/gammas/gammaCKpar.jl index 7a2b982..4165cbc 100644 --- a/src/gammas/gammaCKpar.jl +++ b/src/gammas/gammaCKpar.jl @@ -1,47 +1,25 @@ -function score_strings(results::SubArray,array_2Dindex::Function, - distmethod::String, partial::Bool, cut_a::Float64,cut_b::Float64, w::Float64, - match2::Vector{Bool}, match1::Vector{Bool}) - - thresh_a, thresh_b = 1 - cut_a, 1 - cut_b - - # assign distance function - if distmethod=="jw" - distance = JaroWinkler(p=w) - elseif distmethod=="dl" - distance = DamerauLevenshtein() - elseif distmethod=="jaro" - distance = Jaro(p=w) - elseif distmethod=="lv" - distance = Levenshtein() - end - - if partial - return (string1,string2,indices_x,indices_y) -> begin - dist=round(distance(string1,string2),digits=4) - # if matches at a threshold, go through result vector and assign new value - if dist <= thresh_a - for ix in indices_x,iy in indices_y - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = match2 - end - elseif dist <= thresh_b - for ix in indices_x,iy in indices_y - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = match1 - end - end - return nothing +function score_value2(dist::Float64,indices_x::Vector{<:Integer},indices_y::Vector{<:Integer}, cut_a::Float64, cut_b::Float64, results::DiBitMatrix) + # if matches at a threshold, go through result vector and assign new value + if dist >= cut_a + for ix in indices_x, iy in indices_y + results[ix,iy] = match2 end - else - return (string1,string2,indices_x,indices_y) -> begin - dist=round(distance(string1,string2),digits=4) - # if matches at a threshold, go through result vector and assign new value - if dist <= thresh_a - for ix in indices_x,iy in indices_y - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = match2 - end - end - return nothing + elseif dist >= cut_b + for ix in indices_x,iy in indices_y + results[ix,iy] = match1 + end + end + return nothing +end + +function score_value(dist::Float64,indices_x::Vector{<:Integer},indices_y::Vector{<:Integer}, cut_a::Float64, cut_b::Float64, results::DiBitMatrix) + # if matches at a threshold, go through result vector and assign new value + if dist >= cut_a + for ix in indices_x,iy in indices_y + results[ix,iy] = match2 end end + return nothing end @@ -51,59 +29,70 @@ String comparison of two columns with partial match. # Arguments - `vecA::PooledVector`: Target column of dfB for string comparison. - `vecB::PooledVector`: Target column of dfB for string comparison. -- `results::SubArray`: ResultMatrix object's result_matrix. -- `array_2Dindex::Function`: ResultMatrix object's array_2Dindex function -- `dims::Tuple`: ResultMatrix object's dims. +- `results::DiBitMatrix`: DiBitMatrix object's result_matrix. +- `dims::Tuple`: DiBitMatrix object's dims. - `cut_a::Float=0.92`: Lower bound for close string distances. - `cut_b::Float=0.88`: Lower bound for partial string distances. - `distmethod::String`: String distance method ("jw" Jaro-Winkler (Default), "dl" Damerau-Levenshtein, "jaro" Jaro, "lv" Levenshtein, and "ham" Hamming). - `w`: Winkler weight for jw string distance. """ -function gammaCKpar!(results::SubArray,array_2Dindex::Function,dims::Tuple; - distmethod="jw",cut_a=0.92,cut_b=0.88,partial=true,w=0.1, - match2 = [true, true], match1 = [true, false], - missingval = [false, true]) +function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, + results::DiBitMatrix,dims::Tuple{Int,Int}; + distmethod="jw",cut_a=0.92,cut_b=0.88,partial=true,w=0.1) - score_strings! = score_strings(results,array_2Dindex,distmethod, - partial,cut_a,cut_b,w,match2,match1) + # assign distance function + if distmethod=="jw" + distance = JaroWinkler(p=w) + elseif distmethod=="dl" + distance = DamerauLevenshtein() + elseif distmethod=="jaro" + distance = Jaro(p=w) + elseif distmethod=="lv" + distance = Levenshtein() + end - return (vecA::PooledVector,vecB::PooledVector) -> begin - # Segment unique keys from missing key - missingvals_x = findfirst(ismissing.(vecA.pool)) - iter_x=filter(x -> x != missingvals_x, 0x00000001:UInt32(length(vecA.pool))) - - missingvals_y = findfirst(ismissing.(vecB.pool)) - iter_y=filter(x -> x != missingvals_y, 0x00000001:UInt32(length(vecB.pool))) - - # Form match matrices based on differing levels of matches - Threads.@threads for x in iter_x - indices_x = findall(vecA.refs .=== x) - for y in iter_y - indices_y = findall(vecB.refs .=== y) - score_strings!(vecA.pool[x],vecB.pool[y], indices_x,indices_y) - end + if partial + score_value! = score_value2 + else + score_value! = score_value + end + + # Segment unique keys from missing key + missingvals_x = findfirst(ismissing.(vecA.pool)) + iter_x=filter(x -> x != missingvals_x, UInt32(1):UInt32(length(vecA.pool))) + + missingvals_y = findfirst(ismissing.(vecB.pool)) + iter_y=filter(x -> x != missingvals_y, UInt32(1):UInt32(length(vecB.pool))) + + # Form match matrices based on differing levels of matches + Threads.@threads for x in iter_x + indices_x = findall(vecA.refs .=== x) + for y in iter_y + indices_y = findall(vecB.refs .=== y) + dist=round(compare(vecA.pool[x],vecB.pool[y], distance),digits=4) #this always normalizes dist 0 to 1 + score_value!(dist, indices_x,indices_y, cut_a,cut_b, results) end + end - # set all to missing where x is missing - if !isnothing(missingvals_x) - missingindices = findall(vecA.refs .== missingvals_x) - Threads.@threads for iy in 1:dims[2] - for ix in missingindices - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = missingval - end + # set all to missing where x is missing + if !isnothing(missingvals_x) + missingindices = findall(vecA.refs .== missingvals_x) + Threads.@threads for iy in 1:dims[2] + for ix in missingindices + results[ix,iy] = missingval end end - # set all to missing where y is missing - if !isnothing(missingvals_y) - missingindices = findall(vecB.refs .== missingvals_y) - Threads.@threads for ix in 1:dims[1] - for iy in missingindices - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = missingval - end + end + + # set all to missing where y is missing + if !isnothing(missingvals_y) + missingindices = findall(vecB.refs .== missingvals_y) + Threads.@threads for ix in 1:dims[1] + for iy in missingindices + results[ix,iy] = missingval end end - # Return nothing - return nothing - end + # Return nothing + return nothing end diff --git a/src/gammas/gammaKpar.jl b/src/gammas/gammaKpar.jl index efce518..7533033 100755 --- a/src/gammas/gammaKpar.jl +++ b/src/gammas/gammaKpar.jl @@ -8,49 +8,47 @@ Exact comparison of variables. - `array_2Dindex::Function`: ResultMatrix object's array_2Dindex function - `dims::Tuple`: ResultMatrix object's dims. """ -function gammaKpar!(results::SubArray, array_2Dindex::Function, dims::Tuple, match2::Vector{Bool}, missingval::Vector{Bool}) - (vecA::PooledVector,vecB::PooledVector) -> begin - # Segment unique keys from missing key - missingvals_x = findfirst(ismissing.(vecA.pool)) - iter_x=filter(x -> x != missingvals_x, 0x00000001:UInt32(length(vecA.pool))) - - missingvals_y = findfirst(ismissing.(vecB.pool)) - iter_y=filter(x -> x != missingvals_y, 0x00000001:UInt32(length(vecB.pool))) - - # Form match matrices based on differing levels of matches - Threads.@threads for x in iter_x - indices_x = findall(vecA.refs .=== x) - for y in iter_y - indices_y = findall(vecB.refs .=== y) - # if matches at a threshold, go through result vector and assign new value - if vecA.pool[x] == vecB.pool[y] - for ix in indices_x,iy in indices_y - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = match2 - end +function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, dims::Tuple) + # Segment unique keys from missing key + missingvals_x = findfirst(ismissing.(vecA.pool)) + iter_x=filter(x -> x != missingvals_x, 0x00000001:UInt32(length(vecA.pool))) + + missingvals_y = findfirst(ismissing.(vecB.pool)) + iter_y=filter(x -> x != missingvals_y, 0x00000001:UInt32(length(vecB.pool))) + + # Form match matrices based on differing levels of matches + Threads.@threads for x in iter_x + indices_x = findall(vecA.refs .=== x) + for y in iter_y + indices_y = findall(vecB.refs .=== y) + # if matches at a threshold, go through result vector and assign new value + if vecA.pool[x] == vecB.pool[y] + for ix in indices_x,iy in indices_y + results[ix,iy] = match2 end end end + end - # set all to missing where x is missing - if !isnothing(missingvals_x) - missingindices = findall(vecA.refs .== missingvals_x) - Threads.@threads for iy in 1:dims[2] - for ix in missingindices - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = missingval - end + # set all to missing where x is missing + if !isnothing(missingvals_x) + missingindices = findall(vecA.refs .== missingvals_x) + Threads.@threads for iy in 1:dims[2] + for ix in missingindices + results[ix,iy] = missingval end end - # set all to missing where y is missing - if !isnothing(missingvals_y) - missingindices = findall(vecB.refs .== missingvals_y) - Threads.@threads for ix in 1:dims[1] - for iy in missingindices - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = missingval - end + end + # set all to missing where y is missing + if !isnothing(missingvals_y) + missingindices = findall(vecB.refs .== missingvals_y) + Threads.@threads for ix in 1:dims[1] + for iy in missingindices + results[ix,iy] = missingval end end - # Return nothing - return nothing end + # Return nothing + return nothing end diff --git a/src/gammas/gammaNUMCKpar.jl b/src/gammas/gammaNUMCKpar.jl index 77f09e2..1dc4162 100755 --- a/src/gammas/gammaNUMCKpar.jl +++ b/src/gammas/gammaNUMCKpar.jl @@ -1,180 +1,31 @@ -# B-tree for difference buckets -struct IndexValue{T} - indx::UInt32 - value::T +function allow_missing(x::Vector) + return copy!(Vector{Union{eltype(x), Missing}}(),x) end -struct ValueRange{T} - left::T - right::T - function ValueRange{T}(value, offset; compare! = -) where T - new{T}(compare!(value,offset),compare!(value,-offset)) - end +# difference categorization functions functions +function get_diff1(x::T,y::T,cut_a::T)::UInt8 where T <: Number + y - x < cut_a ? match2 : nonmatch end - -mutable struct Bucket{T} - range::ValueRange{T} - obs::Vector{IndexValue{T}} - left::Bucket{T} - right::Bucket{T} - lock::ReentrantLock - function Bucket{T}(index, value, offset) where T - n = new{T}() - n.range = ValueRange{T}(value, offset) - n.obs=IndexValue[IndexValue(UInt32(index), value)] - n.lock = ReentrantLock() - return n - end -end - -# allows for redefinition of comparison function based on need -function compare_values(ValueType::DataType,fn::Function) - if ValueType <: Float64 - return (a::Float64,b::Float64) -> fn(a,b) +function get_diff2(x::T,y::T,cut_a::T, cut_b::T)::UInt8 where T <: Number + d = y - x + if d <= cut_a + return match2 else - return (a::Int64,b::Int64) -> fn(a,b) - end -end - -function update_tree(cut_b,BucketImpl) - return function update_tree!(node::Bucket,index::UInt32, value) - if value < node.range.left - lock(node.lock) - if isdefined(node,:left) - unlock(node.lock) - update_tree!(node.left,index, value) - else - node.left = BucketImpl(index, value, cut_b) - unlock(node.lock) - end - elseif value > node.range.right - lock(node.lock) - if isdefined(node,:right) - unlock(node.lock) - update_tree!(node.right,index, value) - else - node.right = BucketImpl(index, value,cut_b) - unlock(node.lock) - end + if d <= cut_b + return match1 else - lock(node.lock) - push!(node.obs, IndexValue(UInt32(index), value)) - unlock(node.lock) + return nonmatch end - - return node end end -function score_diff(results::SubArray,array_2Dindex::Function, - partial::Bool, cut_a::Float64,cut_b::Float64, - match2::Vector{Bool}, match1::Vector{Bool}, - compare!::Function) - if partial - (node::Bucket, value::Float64,index::UInt32) -> begin - for i in node.obs - if compare!(value, i.value) <= cut_a - results[array_2Dindex(i.indx,index),:] = match2 - elseif compare!(value, i.value) <= cut_b - results[array_2Dindex(i.indx,index),:] = match1 - end - end - return nothing - end - else - (node::Bucket, value::Float64,index::UInt32) -> begin - for i in node.obs - if compare!(value, i.value) <= cut_a - results[array_2Dindex(i.indx,index),:] = match2 - end - end - return nothing - end - end +function get_diff2(x::T, y::Missing,cut_a::T, cut_b::T)::UInt8 where T <: Number + return missingval end -function score_diff(results::SubArray,array_2Dindex::Function, - partial::Bool, cut_a::Int,cut_b::Int, - match2::Vector{Bool}, match1::Vector{Bool}, - compare!::Function) - if partial - (node::Bucket, value::Int,index::UInt32) -> begin - for i in node.obs - if abs(compare!(value, i.value)) <= cut_a - results[array_2Dindex(i.indx,index),:] = match2 - elseif abs(compare!(value, i.value)) <= cut_b - results[array_2Dindex(i.indx,index),:] = match1 - end - end - return nothing - end - else - (node::Bucket, value::Int,index::UInt32) -> begin - for i in node.obs - if abs(compare!(value, i.value)) <= cut_a - results[array_2Dindex(i.indx,index),:] = match2 - end - end - return nothing - end - end -end -function search_upper(compare!::Function, score_diff!::Function) - return function search_upper!(node::Bucket,index::UInt32, value, cut_b) - diff = compare!(value, node.range.right) - if diff > cut_b - if isdefined(node,:right) - search_upper!(node.right,index,value,cut_b) - end - # if exactly center upper will return it - elseif diff <= -cut_b - if isdefined(node,:left) - search_upper!(node.left,index, value,cut_b) - end - else - score_diff!(node,value,index) - if diff > 0 - if isdefined(node,:right) - search_upper!(node.right,index,value,cut_b) - end - elseif diff < 0 - if isdefined(node,:left) - search_upper!(node.left,index, value,cut_b) - end - end - end - return nothing - end -end +fix_id(id::Int, N_a::Int) = id <= N_a ? (id, true) : (id - N_a, false) -function search_lower(compare!::Function,score_diff!::Function) - return function search_lower!(node::Bucket,index::UInt32, value, cut_b) - diff = compare!(value, node.range.left) - if diff > cut_b - if isdefined(node,:right) - search_lower!(node.right,index,value,cut_b) - end - # does not handle exactly center - elseif diff < -cut_b - if isdefined(node,:left) - search_lower!(node.left,index, value,cut_b) - end - else - score_diff!(node,value,index) - if diff > 0 - if isdefined(node,:right) - search_lower!(node.right,index,value,cut_b) - end - elseif diff < 0 - if isdefined(node,:left) - search_lower!(node.left,index, value,cut_b) - end - end - end - return nothing - end -end """ Numeric comparison of two columns @@ -182,108 +33,172 @@ Numeric comparison of two columns # Arguments - `vecA::PooledVector`: Target column of dfB for string comparison. - `vecB::PooledVector`: Target column of dfB for string comparison. -- `results::SubArray`: ResultMatrix object's result_matrix. -- `array_2Dindex::Function`: ResultMatrix object's array_2Dindex function -- `dims::Tuple`: ResultMatrix object's dims. -- `cut_a::Number=1.0`: Lower bound for close string distances. -- `cut_b::Number=2.0`: Lower bound for partial string distances. +- `res::DiBitMatrix`: DiBitMatrix. +- `cut_a::Number=1`: Lower bound for close string distances. +- `cut_b::Number=2`: Lower bound for partial string distances. """ -function gammaNUMCKpar!(results::SubArray,array_2Dindex::Function, - dims::Tuple; cut_a=1,cut_b=2, - partial::Bool=true,match2=[true,true], - match1=[true,false],missingval=[false,true], - comparison_function=-, match_method="int") +function gammaNUMCKpar!(vecA::Vector,vecB::Vector, + results::DiBitMatrix; + cut_a=1,cut_b=2, + partial::Bool=true) + + N_a = length(vecA) + N_b = length(vecB) + # whether it is two levels or 1 + if partial + get_diff = get_diff2 + else + get_diff = get_diff1 + end - - ValueType=match_method=="int" ? Int64 : Float64 - compare! = compare_values(ValueType, comparison_function) - # define scoring function - score_diff! = score_diff(results, - array_2Dindex, - partial, - cut_a, - cut_b, - match2, - match1, - compare!) + vecA=allow_missing(vecA) + vecB=allow_missing(vecB) - search_upper! = search_upper(compare!,score_diff!) - search_lower! = search_lower(compare!,score_diff!) - - # inner fun - (vecA,vecB::Vector) -> begin - # coerce to float if needed - if typeof(vecA) <: Vector{Union{Missing, Float64}} || - typeof(vecB) <: Vector{Union{Missing, Float64}} || - match_method == "float" - vecA=convert(Vector{Union{Missing, Float64}},vecA) - vecB=convert(Vector{Union{Missing, Float64}},vecB) - cut_a=convert(Float64,cut_a) - cut_b=convert(Float64,cut_b) - ValueType=Float64 - score_diff! = score_diff(results, - array_2Dindex, - partial, - cut_a, - cut_b, - match2, - match1, - compare!) + # coercing cuts in case they are wrong type + if (eltype(vecA) <: Union{Integer,Missing}) & (eltype(vecB) <: Union{Integer,Missing}) + cut_a = Int(cut_a) + cut_b = Int(cut_b) + else + cut_a = Float64(cut_a) + cut_b = Float64(cut_b) + end - search_upper! = search_upper(compare!,score_diff!) - search_lower! = search_lower(compare!,score_diff!) - - end - - + # get the sorted indices of a large copied array + append!(vecA,vecB) + vecC=sortperm(vecA) + copy!(vecA,vecA[vecC]) + len=length(vecA) + + # preallocation of ranges for the threads + tids = Threads.nthreads() + breaksize= len ÷ (tids-1) + starts = (collect(0:(tids-1))) .* breaksize .+ 1 + ends = starts[:] + if last(starts) == len + pop!(starts) + popfirst!(ends) + else + ends = append!(starts[:], [len]) + popfirst!(ends) + end + tids=length(starts) + + Threads.@threads for tid in 1:tids + start=starts[tid] + s_end=ends[tid] + + under_consideration_value = [vecA[start]] + current_id, tableA_bool = fix_id(vecC[start], N_a) + under_consideration_id = [current_id] + tableA_bools = [tableA_bool] + match_values = Vector{UInt8}() - missings_x=UInt32[] - missings_y=UInt32[] - missing_lock = ReentrantLock() - # root of B-tree - BucketImpl=Bucket{ValueType} - update_tree! = update_tree(cut_b, BucketImpl) - root = BucketImpl(UInt32(1), vecA[1],cut_b) - - Threads.@threads for i in UInt32(2):UInt32(length(vecA)) - value=vecA[i] - if ismissing(value) - lock(missing_lock) - push!(missings_x,UInt32(i)) - unlock(missing_lock) - else - update_tree!(root,i, value) - end - end - Threads.@threads for i in UInt32(1):UInt32(length(vecB)) - value=vecB[i] - if ismissing(value) - lock(missing_lock) - push!(missings_y,i) - unlock(missing_lock) - else - search_upper!(root,i, value,cut_b) - search_lower!(root,i, value,cut_b) - end - end - # set all to missing where x is missing - if missings_x != UInt32[] - Threads.@threads for iy in 1:dims[2] - for ix in missings_x - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = missingval + for i in 1:(s_end-start) + itarg=start+i + obs=vecA[itarg] + current_id, tableA_bool = fix_id(vecC[itarg], N_a) + match_values=get_diff.(under_consideration_value, obs, cut_a, cut_b) + if last(match_values) === missingval + for ii in itarg:s_end + current_id, tableA_bool = fix_id(vecC[ii], N_a) + if tableA_bool + for icol in 1:N_b + results[current_id, icol] = missingval + end + else + for irow in 1:N_a + results[irow, current_id] = missingval + end + end + end + under_consideration_id=Int64[] + under_consideration_value=Int64[] + tableA_bools = Bool[] + match_values = Vector{UInt8}() + break # break if we run into missing values + end + removed_values=0 + for m in eachindex(match_values) + if match_values[m] === nonmatch + popfirst!(under_consideration_id) + popfirst!(under_consideration_value) + popfirst!(tableA_bools) + removed_values += 1 + elseif match_values[m] === match2 + # iterate through and break because they all match + for idc in (m-removed_values):(length(match_values)-removed_values) + if tableA_bool ⊻ tableA_bools[idc] + if tableA_bool + results[current_id,under_consideration_id[idc]] = match2 + else + results[under_consideration_id[idc], current_id] = match2 + end + end + end + break + else + if tableA_bool ⊻ tableA_bools[m-removed_values] + if tableA_bool + results[current_id,under_consideration_id[m-removed_values]] = match1 + else + results[under_consideration_id[m-removed_values], current_id] = match1 + end + end end end + push!(under_consideration_value, obs) + push!(under_consideration_id, current_id) + push!(tableA_bools,tableA_bool) end - # set all to missing where y is missing - if missings_y != UInt32[] - Threads.@threads for ix in 1:dims[1] - for iy in missings_y - results[array_2Dindex(UInt32(ix),UInt32(iy)),:] = missingval + + i = (s_end-start) + + # clear out the overlap + while length(under_consideration_id) > 0 + i += 1 + itarg=start+i + if itarg === len + break + end + obs=vecA[itarg] + current_id, tableA_bool = fix_id(vecC[itarg], N_a) + match_values=get_diff.(under_consideration_value, obs, cut_a, cut_b) + + removed_values=0 + for m in eachindex(match_values) + if match_values[m] === nonmatch + popfirst!(under_consideration_id) + popfirst!(under_consideration_value) + popfirst!(tableA_bools) + removed_values += 1 + elseif match_values[m] === match2 + for idc in (m-removed_values):(length(match_values)-removed_values) # iterate through and break because they all match + if tableA_bool ⊻ tableA_bools[idc] + if tableA_bool + results[current_id,under_consideration_id[idc]] = match2 + else + results[under_consideration_id[idc], current_id] = match2 + end + end + end + break + else + if tableA_bool ⊻ tableA_bools[m-removed_values] + if tableA_bool + results[current_id,under_consideration_id[m-removed_values]] = match1 + else + results[under_consideration_id[m-removed_values], current_id] = match1 + end + end end end - end - - return nothing + end + end + + return nothing end + + + diff --git a/src/getMatches.jl b/src/getMatches.jl index 7e4c304..252a3d7 100755 --- a/src/getMatches.jl +++ b/src/getMatches.jl @@ -11,22 +11,12 @@ Converts the matches from the tableCounts function based on the predefined thres # Arguments - `resultsEM::NamedTuple`: Output of the expectation maximization fuction (eg emlinkMARmov()) -- `patterns::Dict`: First indexed item of the tableCounts function (patterns extracted with transformed row names. -- `obs_a::Int`: Observations of the first dataframe input to main fastlink function. - `threshold_match`: Lower bound for the posterior probability that will act as a cutoff for matches. """ -function getMatches(resultsEM::NamedTuple, patterns::Dict, obs_a::Int; +function getMatches(resultsEM::NamedTuple; threshold_match=0.85,u_b=1e10) - match_vector = ( (resultsEM.zeta_j .>= threshold_match .&& resultsEM.patterns_w.weights .<= u_b) |> - x -> resultsEM.patterns_b[findall(x)] .|> - k -> patterns[k] ) |> - recursive_flatten - result_vector = Vector{Tuple{Int,Int}}(undef, length(match_vector)) - Threads.@threads for i in eachindex(match_vector) - result_vector[i] = get_2Dindex(match_vector[i], obs_a) - end - - return result_vector + resultsEM.patterns_w.ismatch = resultsEM.zeta_j .>= threshold_match .&& resultsEM.patterns_w.weights .<= u_b + return nothing end diff --git a/src/matchPatterns.jl b/src/matchPatterns.jl new file mode 100644 index 0000000..e2110ad --- /dev/null +++ b/src/matchPatterns.jl @@ -0,0 +1,107 @@ +struct ComparisonIndex + row::UInt32 + col::UInt32 + function ComparisonIndex(row::Int,col::Int) + new(UInt32(row),UInt32(col)) + end +end + +struct LocalPatterns + patterns::Vector{Vector{UInt8}} + indices::Vector{Vector{Int}} + hashes::Vector{UInt64} +end + +struct MatchPatterns + patterns::Vector{Vector{UInt8}} + indices::Vector{Vector{ComparisonIndex}} + hashes::Vector{UInt64} + + function MatchPatterns() + new(Vector{Vector{UInt8}}(), Vector{ComparisonIndex}(),Vector{UInt64}()) + end +end + +function indices_to_uids(vecA::T, vecB::T, + indices::Vector{Vector{ComparisonIndex}} + ) where T <: Vector + inds=eachindex(indices) + paired_ids = [Vector{Tuple}() for _ in inds] + Threads.@threads for i in inds + len=length(indices[i]) + lk = ReentrantLock() + Threads.@threads for first_val in 1:500:len + local_paired_ids=Vector{Tuple}() + last_val = first_val + min(len-first_val,500-first_val) + for ii in first_val:last_val + push!(local_paired_ids,(vecA[indices[i][ii].row],vecB[indices[i][ii].col])) + end + + lock(lk) do + append!(paired_ids[i],local_paired_ids) + end + + end + end + return paired_ids +end + +function get_2Dindex(index::T, nrows::Int) where T <: Integer + zero_based_index = index - 1 + row = Int(mod(zero_based_index, nrows)) + 1 + col = Int(div(zero_based_index, nrows)) + 1 + return ComparisonIndex(row, col) +end + +function get_local_patterns(x::Vector{Vector{UInt8}}, N::Int, S::Int) + patterns=Vector{Vector{UInt8}}() + hashes=Vector{UInt64}() + indices=Vector{Vector{UInt16}}() + + + for i in 1:S + pattern=zeros(UInt8,4) + for n in 1:N + pattern[n]=x[n][i] + end + pattern_hash=hash(pattern) + id = findfirst(pattern_hash .=== hashes) + if isnothing(id) + push!(patterns,pattern) + push!(hashes,pattern_hash) + push!(indices,[i]) + else + push!(indices[id],i) + end + end + return LocalPatterns(patterns,indices,hashes) +end + +function get_match_patterns(res::Vector{DiBitMatrix}) + matches=MatchPatterns() + N = length(res) + dimy=res[1].nrows + len=Int(res[1].data.len) + lk = ReentrantLock() + Threads.@threads for first_loc in 0:1024:len + last_loc = first_loc + 1024 + if last_loc > len + last_loc=len + end + x=[res[n].data[(first_loc+1):last_loc] for n in 1:N] + patterns=get_local_patterns(x,N,last_loc-first_loc) + for i in eachindex(patterns.hashes) + lock(lk) do + id = findfirst(patterns.hashes[i] .=== matches.hashes) + if isnothing(id) + push!(matches.patterns,patterns.patterns[i]) + push!(matches.hashes,patterns.hashes[i]) + push!(matches.indices,get_2Dindex.(patterns.indices[i],dimy)) + else + append!(matches.indices[id],get_2Dindex.(first_loc .+ patterns.indices[i],dimy)) + end + end + end + end + return matches +end diff --git a/src/matchesLink.jl b/src/matchesLink.jl deleted file mode 100755 index e69de29..0000000 diff --git a/src/resultMatrix.jl b/src/resultMatrix.jl deleted file mode 100755 index 2c54767..0000000 --- a/src/resultMatrix.jl +++ /dev/null @@ -1,31 +0,0 @@ -# needed to create ranges for the result matrix -function create_ranges(vec) - counter=1 - weight=0 - ranges = [] - for i in vec - push!(ranges, counter:(i+weight)) - counter += i - weight += i - end - return ranges -end - -# prespecify matrix that is able to hold results of comparisons -struct ResultMatrix - comparison_bits::Vector{Int} - dims::Tuple{Int, Int} - ranges::Vector{UnitRange{Int}} - result_matrix::Matrix{Bool} - array_2Dindex::Function - - function ResultMatrix(comparison_bits::Vector{Int}, dims::Tuple{Int, Int}) - comparison_bits=Int.(ceil.(log2.(comparison_bits.+2))) - new(comparison_bits, dims, - create_ranges(comparison_bits), - falses((prod(dims),sum(comparison_bits))), - function array_2Dindex(row::UInt32, col::UInt32; nrows = dims[1])::UInt64 - return UInt64(row) + UInt64(col - 1) * UInt64(nrows) - end) - end -end diff --git a/src/tableCounts.jl b/src/tableCounts.jl deleted file mode 100755 index cef4809..0000000 --- a/src/tableCounts.jl +++ /dev/null @@ -1,110 +0,0 @@ -function generate_bitvector(tid::Int, vlen::Int)::Vector{Bool} - return [(((tid-1) >> j) & 1) == 1 for j in 0:(vlen - 1)] -end - -function find_matches(res_matrix::SubArray,id::Int,vlen::Int) - targ_vec=generate_bitvector(id, vlen) - match_count::UInt64=0 - row_count::UInt64=0 - matches::Vector{UInt64}=[] - for i in eachrow(res_matrix) - row_count += 1 - if i == targ_vec - match_count += 1 - append!(matches, row_count) - end - end - return (targ_vec, match_count, matches) -end - -function find_matches(res_matrix::Matrix,id::Int,vlen::Int) - targ_vec=generate_bitvector(id, vlen) - match_count::UInt64=0 - row_count::UInt64=0 - matches::Vector{UInt64}=[] - for i in eachrow(res_matrix) - row_count += 1 - if i == targ_vec - match_count += 1 - append!(matches, row_count) - end - end - return (targ_vec, match_count, matches) -end - -function tableCounts_bf(res_matrix::SubArray) - vlen=size(res_matrix)[2] - nlevels=2^vlen - out=fill((falses(0),UInt64(0),zeros(UInt64,0)),nlevels) - Threads.@threads for i in 1:nlevels - out[i]=find_matches(res_matrix,i, vlen) - end - return out -end - -function tableCounts_bf(res_matrix::Matrix) - vlen=size(res_matrix)[2] - nlevels=2^vlen - out=fill((falses(0),UInt64(0),zeros(UInt64,0)),nlevels) - Threads.@threads for i in 1:nlevels - out[i]=find_matches(res_matrix,i, vlen) - end - return out -end - -function tableCounts_dict(res_matrix::SubArray) - tids=1:Threads.nthreads() - resultvec=[Dict() for i in tids] - countvec=[Dict() for i in tids] - Threads.@threads for i in eachrow(res_matrix) - resdict=resultvec[Threads.threadid()] - cntdict=countvec[Threads.threadid()] - if haskey(resdict, i) - push!(resdict[i], i.indices[1]) - cntdict[i] += 1 - else - resdict[i] = [i.indices[1]] - cntdict[i] = 1 - end - end - finalcounts=Dict() - finalindices=Dict() - for i in tids - mergewith!(append!,finalindices, resultvec[i]) - mergewith!(+, finalcounts,countvec[i]) - end - return (finalindices, finalcounts) -end - -function tableCounts_dict(res_matrix::Matrix) - tids=1:Threads.nthreads() - resultvec=[Dict() for i in tids] - countvec=[Dict() for i in tids] - Threads.@threads for i in eachrow(res_matrix) - resdict=resultvec[Threads.threadid()] - cntdict=countvec[Threads.threadid()] - if haskey(resdict, i) - push!(resdict[i], i.indices[1]) - cntdict[i] += 1 - else - resdict[i] = [i.indices[1]] - cntdict[i] = 1 - end - end - finalcounts=Dict() - finalindices=Dict() - for i in tids - mergewith!(append!,finalindices, resultvec[i]) - mergewith!(+, finalcounts,countvec[i]) - end - return (finalindices, finalcounts) -end - -# TODO implement the varnames > 3 version but dict version will do for now. -function tableCounts(res_matrix,varnames) - # if length(varnames) > 3 - return tableCounts_dict(res_matrix) - # else - #return tableCounts_bf(res_matrix) - #end -end diff --git a/test/runtests.jl b/test/runtests.jl index 6c7cc1c..704cdd7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,3 @@ - using FastLink using Test import DataFrames: DataFrame, nrow @@ -18,6 +17,10 @@ dfB=CSV.read("$(b_fil)/dfB.csv", DataFrame, pool=true, missingstring=["", "NA"]) +dfA.ida = hash.(eachrow(dfA)) +dfB.idb = hash.(eachrow(dfB)) + + varnames = ["firstname","middlename", "lastname","housenum"] cut_a = [0.92,0.92,0.92,1] cut_p = [0.88,0.88,0.88,2] @@ -28,12 +31,10 @@ stringdist_method = ["jw","jw","jw","jw"] upper_case = [false,false,false,false] jw_weight = [0.1,0.1,0.1,0.0] - - - @testset "Testing FastLink Basic Run" begin @info "Executing fastLink()" results=fastLink(dfA,dfB,varnames, + ("ida", "idb"), match_method=match_method, partials=partials, fuzzy=fuzzy, @@ -41,22 +42,18 @@ jw_weight = [0.1,0.1,0.1,0.0] stringdist_method=stringdist_method, cut_a=cut_a, cut_p=cut_p, - jw_weight=jw_weight)() + jw_weight=jw_weight) @test true @info "Correct # of Matches" - @test length(results[2]) == 50 - @info "Number of patterns == 26" - @test length(results[1].patterns_b) == 26 + @test sum(results.patterns_w.counts[results.patterns_w.ismatch]) == 50 + @info "Number of patterns == 18" + @test length(results.patterns_b) == 18 @info "Number of counts == (N₁×N₂) " - @test sum(results[1].patterns_w.counts) == nrow(dfA) * nrow(dfB) + @test sum(results.patterns_w.counts) == nrow(dfA) * nrow(dfB) @info "Ρ(𝑢) >=.999" - @test results[1].p_u >= .999 + @test results.p_u >= .999 @info "Ρ(𝑚) <= .0005" - @test results[1].p_m <= .0005 - - - - + @test results.p_m <= .0005 true end