From a1930802376d0e81de3773d880b43d0fc9b0fe99 Mon Sep 17 00:00:00 2001 From: jw2249a Date: Mon, 6 May 2024 14:00:53 +0000 Subject: [PATCH] full json and nested parameters update --- Project.toml | 3 +- README.md | 7 +- scratch.jl | 42 ++- src/FastLink.jl | 42 ++- src/emlink.jl | 232 ++++++++++++++--- src/fastlink/fastlink.jl | 435 +++++++------------------------ src/gammas/Gammas.jl | 6 +- src/gammas/gammaCKfuzzy.jl | 35 ++- src/gammas/gammaCKpar.jl | 122 +++------ src/gammas/gammaKpar.jl | 85 +++++- src/gammas/gammaNUMCKpar.jl | 2 +- src/getMatches.jl | 39 --- src/matchPatterns.jl | 96 ++----- src/patterns.jl | 262 +++++++++++++++++++ src/settings/settings.jl | 104 ++++++++ src/term_frequency_adjustment.jl | 176 +++++++++++++ test/runtests.jl | 73 ++++-- test_parameters.json | 26 ++ 18 files changed, 1136 insertions(+), 651 deletions(-) delete mode 100755 src/getMatches.jl create mode 100644 src/patterns.jl create mode 100644 src/settings/settings.jl create mode 100644 src/term_frequency_adjustment.jl create mode 100644 test_parameters.json diff --git a/Project.toml b/Project.toml index dfd8b06..0d844ac 100644 --- a/Project.toml +++ b/Project.toml @@ -1,12 +1,13 @@ name = "FastLink" uuid = "11f39cfd-5548-489f-be9a-f4ad0ff6eadc" authors = ["Jack R. Williams "] -version = "0.0.2" +version = "0.0.5" [deps] DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" +JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720" StaticStrings = "4db0a0c5-418a-4e1d-8806-cb305fe13294" StringDistances = "88034a9c-02f8-509d-84a9-84ec65e18404" diff --git a/README.md b/README.md index 1bf8c4e..aca16ac 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,6 @@ The purpose of FastLink.jl is to bring a fast record linkage package to the juli ___________________________ ### Using the fastLink function - The basic arguments for the `fastLink` function to run are - `dfA`: A `DataFrame` table of records to be matched. @@ -44,7 +43,7 @@ The optional parameters are - `tol_em`: Convergence tolerance for the EM Algorithm. (default `1e-05`) -- `threshold_match`: Lower bound for the posterior probability that will act as a cutoff for matches. +- `threshold_match`: Lower bound for the posterior probability that will act as a cutoff for matches. Default `[0.85]`. - `prior_lambda::Float64`: Default 0.0. @@ -54,7 +53,7 @@ The optional parameters are - `w_pi::Float64`: Default 0.0. -- `dedupe_matches`: Whether to dedupe the matches within the dataset. Default `[0.85]`. +- `dedupe_matches`: Whether to dedupe the matches within the dataset. __________________ @@ -93,7 +92,7 @@ A `NamedTuple` with these vars: - `tf_adj_table` - A `Vector{DataFrame}` that has a DataFrame for each match pattern and a row in each DataFrame for each comparison appended with the letter of their corresponding dataset. - `varnames` - A `Vector{String}` of the input variable names - + - `zeta_j` - A `Vector{Float64}` with the posterior match probabilities for each agreement pattern. # Examples diff --git a/scratch.jl b/scratch.jl index ee80e9f..fe8b999 100755 --- a/scratch.jl +++ b/scratch.jl @@ -1,23 +1,15 @@ -using Pkg -Pkg.develop(path=".") -Pkg.precompile() +using FastLink using DataFrames -using BenchmarkTools using CSV -using FastLink using PooledArrays import Pkg.Artifacts: @artifact_str - +#Pkg.add(url="https://github.com/jw2249a/FastLink.jl") +using StatsBase +using JSON a_fil = @artifact_str "dfA" b_fil = @artifact_str "dfB" -varnames=["firstname","middlename", "lastname","housenum"] -match_method=["string", "string","string", "float"] -cut_a=[0.92,0.92,0.92,1] -cut_p=[0.88,0.88,0.88,2] - - dfA=CSV.read("$(a_fil)/dfA.csv", DataFrame, ntasks=1, @@ -27,21 +19,25 @@ dfB=CSV.read("$(b_fil)/dfB.csv", DataFrame, ntasks=1, pool=true, missingstring=["", "NA"]) -dfA.id = hash.(eachrow(dfA)) -dfB.id2 = hash.(eachrow(dfB)) -for var in varnames[1:3] - dfA[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfA[:,var])) - dfB[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfB[:,var])) -end +config = JSON.parsefile("test_parameters.json") -results=fastLink(dfA,dfB,varnames,("id","id2"), - match_method=match_method, - term_freq_adjustment=[true], - cut_a=cut_a,cut_p=cut_p, - threshold_match = 0.85) +dfA.id = hash.(eachrow(dfA)) +dfB.id2 = hash.(eachrow(dfB)) +varnames=["firstname","middlename", "lastname","housenum"] + +for var in varnames + if eltype(dfA[:,var]) <: AbstractString + dfA[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfA[:,var])) + dfB[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfB[:,var])) + else + dfA[!,var] = Vector(dfA[!,var]) + dfB[!,var] = Vector(dfB[!,var]) + end +end +result=fastLink(dfA, dfB, config) diff --git a/src/FastLink.jl b/src/FastLink.jl index dafdded..b66af91 100644 --- a/src/FastLink.jl +++ b/src/FastLink.jl @@ -1,7 +1,6 @@ module FastLink using DataFrames import PooledArrays: PooledVector -import Distributions: Dirichlet,rand # match constants const nonmatch::UInt8 = UInt8(0) @@ -9,17 +8,48 @@ const match1::UInt8 = UInt8(1) const match2::UInt8 = UInt8(2) const missingval::UInt8 = UInt8(3) +const STRING_DISTANCE_METHODS = Dict("jw" => "jw", + "jarowinkler" => "jw", + "jaro winkler" => "jw", + "jaro-winkler" => "jw", + "jaro" => "jaro", + "dl" => "dl", + "dameraulevenshtein" => "dl", + "damerau levenshtein" => "dl", + "damerau-levenshtein" => "dl", + "lv" => "lv", + "levenshtein" => "lv", + "hamming" => "hamming", + "ro" => "ro", + "ratcliffobershelp" => "ro", + "ratcliff obershelp" => "ro", + "ratcliff-obershelp" => "ro", + "osa" => "osa", + "optimal string alignment" => "osa", + "optimalstringalignment" => "osa" + ) + +include("settings/settings.jl") include("DiBitMatrix.jl") -using .DiBitMat +include("matchPatterns.jl") include("gammas/Gammas.jl") +include("term_frequency_adjustment.jl") +include("emlink.jl") +include("patterns.jl") + +using .settings +using .DiBitMat +using .matchpatterns using .Gammas +using .emlink +using .tf +using .patterns -include("matchPatterns.jl") -include("emlink.jl") -include("getMatches.jl") include("fastlink/fastlink.jl") -export(fastLink) +export gammaCKpar!, gammaKpar!, gammaCKfuzzy!, gammaNUMCKpar!, DiBitMatrix, namedtuple, fetch_parameters, retrieve, parse_configuration, remove_keys, emlinkMARmov, STRING_DISTANCE_METHODS, match1, match2, missingval, nonmatch, indices_to_uids, process_comparisons, fastLink + +#export(fastLink) end # module FastLink diff --git a/src/emlink.jl b/src/emlink.jl index fd2b055..70539c1 100644 --- a/src/emlink.jl +++ b/src/emlink.jl @@ -1,3 +1,10 @@ +module emlink +using DataFrames +import Distributions: Dirichlet,rand +using ..matchpatterns +import ..nonmatch, ..match1, ..match2, ..missingval +export emlinkMARmov + function recursive_flatten(x) result = [] for item in x @@ -9,11 +16,12 @@ function recursive_flatten(x) end return result end + function logxpy(lx,ly) return maximum.(eachrow([lx ly])) .+ log1p.(exp.(-abs.(lx .- ly))) end -function probability_vector(x::Vector{Float64}) +function probability_vector(x::Vector{BigFloat}) return x./sum(x) end @@ -21,23 +29,35 @@ function skipmissing_equality(x, y) return replace(x .== y, missing=>false) end + + """ Expectation maximization function. """ -function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames::Vector{String}; +function emlinkMARmov(patterns::MatchPatterns, dims::Tuple{Int,Int},varnames::Vector{String}; p_m=0.1,iter_max=5000,tol=1e-05, prior_lambda=0.0, w_lambda=0.0, - prior_pi=0.0,w_pi=0.0, address_field=Vector{Bool}()) + prior_pi=0.0, w_pi=0.0, address_field=Vector{Bool}(), threshold_match=0.85, u_b=1e10, + digit_precision=16) + obs_a, obs_b = dims + + # precision for comparisons + old=precision(BigFloat) + setprecision(BigFloat,digit_precision; base=10) + atexit(x->setprecision(BigFloat,old)) # Initialize count and delta for while loop and break point - delta = Float64(1) + delta = BigFloat(1) count = 1 - + p_m=BigFloat(p_m) # Info for EM algorithm p_u = 1 - p_m nfeatures=length(varnames) gamma_jk=patterns.patterns n_j = length.(patterns.indices) N = length(n_j) - + psi = 0 + mu = 0 + alpha0_address = 1.0 + alpha1_address = 1.0 # if λ priors are declared if prior_lambda == 0 psi = 1 @@ -45,7 +65,7 @@ function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames:: else if w_lambda == 0 @error "If declaring a lambda prior, you need to declare weights via w_lambda." - elseif w_lambda > 0 | w_lambda < 0 + elseif w_lambda > 1 || w_lambda < 0 @error "w_lambda must be between 0 and 1." elseif w_lambda == 1 w_lambda = 1 - 1e-05 @@ -55,7 +75,154 @@ function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames:: mu = prior_lambda * c_lambda * obs_a * obs_b + 1 psi = (1 - prior_lambda) * mu / prior_lambda end + + # if pi prior is declared + if prior_pi == 0 + alpha0_address = 1 + alpha1_address = 1 + address_field = falses(nfeatures) + else + if prior_lambda == 0 + @error "If declaring a prior on pi, you need to declare lambda prior." + elseif w_pi == 0 + @error "If providing a prior for pi, please specify the weight using w_pi" + elseif w_pi < 0 | w_pi > 1 + @error "w_pi must be between 0 and 1." + elseif w_pi == 1 + w_pi = 1 - 1e-05 + end + + c_pi = w_pi / (1 - w_pi) + exp_match = prior_lambda * obs_a * obs_b + + # Optimal hyperparameters for pi + alpha0_address = c_pi * prior_pi * exp_match + 1 + alpha1_address = alpha0_address * (1 - prior_pi) / prior_pi + end + # initialize variables that need value to be returned + zeta_j=BigFloat(0) + num_prod = zeros(BigFloat,0) + p_old=zeros(BigFloat, 0) + p_gamma_jm = zeros(Union{Missing,BigFloat},0) + p_gamma_ju = zeros(Union{Missing,BigFloat},0) + # Initializing matrices for conditional probabilities + p_gamma_km = fill(BigFloat[], nfeatures) + p_gamma_ku = fill(BigFloat[], nfeatures) + vals_gamma_jk = fill(Vector{Union{Missing,Int}}(),nfeatures) + uvals_gamma_jk = fill(Int[],nfeatures) + p_gamma_kjm = missings(Union{Missing,BigFloat}, (nfeatures,N)) + p_gamma_kju = missings(Union{Missing,BigFloat}, (nfeatures,N)) + for c in 1:nfeatures + vals_gamma_jk[c] = [i[c] == missingval ? missing : sum(i[c]) for i in gamma_jk] + uvals_gamma_jk[c] = sort(unique([i for i in vals_gamma_jk[c] if !ismissing(i)])) + c_m = BigFloat.(collect(1:50:(length(uvals_gamma_jk[c])*50))) + p_gamma_km[c] = sort(rand(Dirichlet(c_m),1)[:],rev=false) + p_gamma_ku[c] = sort(rand(Dirichlet(c_m),1)[:],rev=true) + end + + while abs(delta) >= tol + p_old=recursive_flatten([p_m,p_u,p_gamma_km,p_gamma_ku]) + for i in 1:nfeatures + p_gamma_kjm[i,:] = [ismissing(j) ? missing : p_gamma_km[i][findfirst(uvals_gamma_jk[i] .== j)] for j in vals_gamma_jk[i]] + p_gamma_kju[i,:] = [ismissing(j) ? missing : p_gamma_ku[i][findfirst(uvals_gamma_jk[i] .== j)] for j in vals_gamma_jk[i]] + end + p_gamma_jm = sum.(skipmissing.(eachcol(log.(p_gamma_kjm)))) + p_gamma_ju = sum.(skipmissing.(eachcol(log.(p_gamma_kju)))) + log_prod_gamma_jm = p_gamma_jm .+ log(p_m) + log_prod_gamma_ju = p_gamma_ju .+ log(p_u) + zeta_j = exp.(log_prod_gamma_jm - logxpy(log_prod_gamma_jm,log_prod_gamma_ju)) + num_prod = exp.(log.(n_j) + log.(zeta_j)) + p_m = exp(log(sum(num_prod) + mu - 1) - log(psi - mu + sum(n_j))) + p_u = 1-p_m + + for i in 1:nfeatures + km_prob=sort([sum(num_prod[findall(skipmissing_equality(vals_gamma_jk[i], uvals_gamma_jk[i][j]))]) + for j in 1:length(uvals_gamma_jk[i])],rev=false) + if address_field[i] + km_prob += append!([alpha0_address], [alpha1_address for i in 1:(length(uvals_gamma_jk[i])-1)]) + end + p_gamma_km[i] = + probability_vector(km_prob) + p_gamma_ku[i] = + sort(probability_vector([let sub1 = sub=findall(skipmissing_equality(vals_gamma_jk[i], uvals_gamma_jk[i][j])); + sum(n_j[sub] - num_prod[sub]) + end + for j in 1:length(uvals_gamma_jk[i])]),rev=true) + end + delta = maximum(BigFloat.(abs.(recursive_flatten([p_m,p_u,p_gamma_km,p_gamma_ku]) - p_old))) + + count += 1 + if count > iter_max + @warn "The EM algorithm has run for the specified number of iterations but has not converged yet." + break + end + end + weights = p_gamma_jm - p_gamma_ju + p_gamma_jm=probability_vector(exp.(p_gamma_jm)) + p_gamma_ju=probability_vector(exp.(p_gamma_ju)) + data_gamma_ = DataFrame(vals_gamma_jk, :auto) + rename!(data_gamma_,["gamma_$i" for i in 1:ncol(data_gamma_)]) + data_w=hcat(data_gamma_,DataFrame((counts=n_j,weights,p_gamma_jm,p_gamma_ju))) + + ismatch = zeta_j .>= threshold_match .&& data_w.weights .<= u_b + + data_w.zeta_j = zeta_j + + + return Dict("p_m" => p_m, "p_u" => p_u, + "number_of_comparisons" => *(dims...), + "number_of_unique_patterns" => nrow(data_w), + "iter_converge" => count, + "threshold_match" => threshold_match, + "varnames" => varnames, + "patterns_w" => data_w, + "pgamma_km" => p_gamma_km, "pgamma_ku" => p_gamma_ku, + "pgamma_jm" => p_gamma_jm, "pgamma_ju" => p_gamma_ju) +end + + + +function emlinkMARmov(gamma_jk::Vector{Vector{UInt8}},n_j::Vector{Int64}, dims::Tuple{Int,Int},varnames::Vector{String}; + p_m=0.1,iter_max=5000,tol=1e-05, prior_lambda=0.0, w_lambda=0.0, + prior_pi=0.0,w_pi=0.0, address_field=Vector{Bool}(), threshold_match=0.85, u_b=1e10, + digit_precision=16) + obs_a, obs_b = dims + + # precision for comparisons + old=precision(BigFloat) + setprecision(BigFloat,digit_precision; base=10) + atexit(x->setprecision(BigFloat,old)) + # Initialize count and delta for while loop and break point + delta = BigFloat(1) + count = 1 + p_m=BigFloat(p_m) + # Info for EM algorithm + p_u = 1 - p_m + nfeatures=length(varnames) + N = length(n_j) + psi = 0 + mu = 0 + alpha0_address = 1.0 + alpha1_address = 1.0 + # if λ priors are declared + if prior_lambda == 0 + psi = 1 + mu = 1 + else + if w_lambda == 0 + @error "If declaring a lambda prior, you need to declare weights via w_lambda." + elseif w_lambda > 1 || w_lambda < 0 + @error "w_lambda must be between 0 and 1." + elseif w_lambda == 1 + w_lambda = 1 - 1e-05 + end + c_lambda = w_lambda/(1-w_lambda) + # hyperparameters for lambda + mu = prior_lambda * c_lambda * obs_a * obs_b + 1 + psi = (1 - prior_lambda) * mu / prior_lambda + end + # if pi prior is declared if prior_pi == 0 alpha0_address = 1 @@ -81,22 +248,22 @@ function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames:: end # initialize variables that need value to be returned - zeta_j=0.0 - num_prod = zeros(Float64,0) - p_old=zeros(Float64, 0) - p_gamma_jm = zeros(Union{Missing,Float64},0) - p_gamma_ju = zeros(Union{Missing,Float64},0) + zeta_j=BigFloat(0) + num_prod = zeros(BigFloat,0) + p_old=zeros(BigFloat, 0) + p_gamma_jm = zeros(Union{Missing,BigFloat},0) + p_gamma_ju = zeros(Union{Missing,BigFloat},0) # Initializing matrices for conditional probabilities - p_gamma_km = fill(Float64[], nfeatures) - p_gamma_ku = fill(Float64[], nfeatures) + p_gamma_km = fill(BigFloat[], nfeatures) + p_gamma_ku = fill(BigFloat[], nfeatures) vals_gamma_jk = fill(Vector{Union{Missing,Int}}(),nfeatures) uvals_gamma_jk = fill(Int[],nfeatures) - p_gamma_kjm = missings(Union{Missing,Float64}, (nfeatures,N)) - p_gamma_kju = missings(Union{Missing,Float64}, (nfeatures,N)) + p_gamma_kjm = missings(Union{Missing,BigFloat}, (nfeatures,N)) + p_gamma_kju = missings(Union{Missing,BigFloat}, (nfeatures,N)) for c in 1:nfeatures vals_gamma_jk[c] = [i[c] == missingval ? missing : sum(i[c]) for i in gamma_jk] uvals_gamma_jk[c] = sort(unique([i for i in vals_gamma_jk[c] if !ismissing(i)])) - c_m = collect(1:50:(length(uvals_gamma_jk[c])*50)) + c_m = BigFloat.(collect(1:50:(length(uvals_gamma_jk[c])*50))) p_gamma_km[c] = sort(rand(Dirichlet(c_m),1)[:],rev=false) p_gamma_ku[c] = sort(rand(Dirichlet(c_m),1)[:],rev=true) end @@ -118,7 +285,7 @@ function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames:: for i in 1:nfeatures km_prob=sort([sum(num_prod[findall(skipmissing_equality(vals_gamma_jk[i], uvals_gamma_jk[i][j]))]) - for j in 1:length(uvals_gamma_jk[i])],rev=false) + for j in 1:length(uvals_gamma_jk[i])],rev=false) if address_field[i] km_prob += append!([alpha0_address], [alpha1_address for i in 1:(length(uvals_gamma_jk[i])-1)]) end @@ -130,28 +297,35 @@ function emlinkMARmov(patterns::MatchPatterns, obs_a::Int, obs_b::Int,varnames:: end for j in 1:length(uvals_gamma_jk[i])]),rev=true) end - delta = maximum(abs.(recursive_flatten([p_m,p_u,p_gamma_km,p_gamma_ku]) - p_old)) + delta = maximum(BigFloat.(abs.(recursive_flatten([p_m,p_u,p_gamma_km,p_gamma_ku]) - p_old))) - count +=1 + count += 1 if count > iter_max @warn "The EM algorithm has run for the specified number of iterations but has not converged yet." break end end weights = p_gamma_jm - p_gamma_ju + p_gamma_jm=probability_vector(exp.(p_gamma_jm)) p_gamma_ju=probability_vector(exp.(p_gamma_ju)) data_gamma_ = DataFrame(vals_gamma_jk, :auto) rename!(data_gamma_,["gamma_$i" for i in 1:ncol(data_gamma_)]) data_w=hcat(data_gamma_,DataFrame((counts=n_j,weights,p_gamma_jm,p_gamma_ju))) - return (zeta_j = zeta_j, p_m = p_m, p_u = p_u, - pgamma_km = p_gamma_km, pgamma_ku = p_gamma_ku, - pgamma_jm = p_gamma_jm, pgamma_ju = p_gamma_ju, - patterns_w = data_w, - patterns_b = gamma_jk, - indices = patterns.indices, - iter_converge = count, - obs_a = obs_a, obs_b = obs_b, - varnames = varnames) + ismatch = zeta_j .>= threshold_match .&& data_w.weights .<= u_b + + data_w.zeta_j = zeta_j + + return Dict("p_m" => p_m, "p_u" => p_u, + "number_of_comparisons" => *(dims...), + "number_of_unique_patterns" => nrow(data_w), + "iter_converge" => count, + "threshold_match" => threshold_match, + "varnames" => varnames, + "patterns_w" => data_w, + "pgamma_km" => p_gamma_km, "pgamma_ku" => p_gamma_ku, + "pgamma_jm" => p_gamma_jm, "pgamma_ju" => p_gamma_ju) +end + end diff --git a/src/fastlink/fastlink.jl b/src/fastlink/fastlink.jl index eeefcc3..fbd8a8b 100755 --- a/src/fastlink/fastlink.jl +++ b/src/fastlink/fastlink.jl @@ -1,62 +1,4 @@ -function check_input_lengths(x, var_length::Int, varname::String) - if typeof(x) == "string" - x=[x] - end - if length(x) == var_length - return x - elseif length(x) == 1 - return [x[1] for i in 1:var_length] - else - @error "Number of inputs for $varname is > 1 and < $var_length (the number of vars declared)." - end -end -# Ensure that vartypes match each other. if string then both must be strings -# TODO: implement tighter parameter checks and coercion where needed. -function check_var_types(x::DataFrame, y::DataFrame, varnames::Vector{String},match_method::Vector{String}, partials::Vector{Bool}) - xtypes=eltype.(eachcol(select(x, varnames))) - ytypes=eltype.(eachcol(select(y, varnames))) - # if partials is empty then all partial - if partials == [] - partials = [true for i in xtypes] - end - comparison_levels = Int[] - # defaults if no match methods declared - if match_method == [] - for (ix,iy,iv,partial) in zip(xtypes,ytypes, varnames, partials) - comparison_level = typeof(ix) <: Union || typeof(iy) <: Union || partial ? 2 : 1 - if (ix <: Union{Missing,AbstractString}) && (iy <: Union{Missing,AbstractString}) - match_type="string" - elseif (ix <: Union{Missing,Number}) && (iy <: Union{Missing,Number}) - if ix <: Union{Missing, Float64} || iy <: Union{Missing,Float64} - match_type="float" - else - match_type="numeric" - end - elseif (ix <: Union{Missing,Bool}) && (iy <: Union{Missing,Bool}) - match_type="bool" - else - @error "*(VAR $iv)*: dfA type $ix does not match dfB type $ix or type not known for matching" - end - push!(match_method, match_type) - push!(comparison_levels, comparison_level) - end - else - for (ix,iy,iv,im,partial) in zip(xtypes,ytypes, varnames,match_method, partials) - comparison_level = typeof(ix) <: Union || typeof(iy) <: Union || partial ? 2 : 1 - push!(comparison_levels, comparison_level) - end - end - return match_method,comparison_levels -end - -function remove_no_matched_var_indices(resultsEM) - for i in eachindex(resultsEM.patterns_b) - if (match1 in resultsEM.patterns_b[i]) ⊽ (match2 in resultsEM.patterns_b[i]) - resultsEM.indices[i] = Vector{ComparisonIndex}() - end - end -end """ @@ -91,309 +33,114 @@ pgamma_ku tf_adj_table varnames zeta_j ```julia matched_data = fastLink(dfA, dfB, ["firstname", "lastname", "city"]) """ -function fastLink(dfA::DataFrame, dfB::DataFrame, - varnames::Vector{String}, - idvar::Tuple{String,String}; - term_freq_adjustment=[false], - match_method=String[], - partials=[true], - upper_case=[true], - stringdist_method = ["jw"], - cut_a = [0.92], cut_p = [0.88], - jw_weight = [0.1], - address_field = [false], - tol_em = 1e-05, - prior_lambda = 0.0, - w_lambda = 0.0, - prior_pi = 0.0, - w_pi = 0.0, - threshold_match = 0.85, - dedupe_matches = true) +function fastLink(dfA::DataFrame, dfB::DataFrame, config::Dict{String,Any}) + # idvar to Tuple + idvar=Tuple(config["idvar"]) # dims + varnames = retrieve(config,"varname") numvars=length(varnames) - obs_a=nrow(dfA) - obs_b=nrow(dfB) - dims = (obs_a,obs_b) - - @info "Checking settings for $numvars declared variables." - partials = check_input_lengths(partials, numvars, "partials") - upper_case = check_input_lengths(upper_case, numvars, "upper_case") - jw_weight = check_input_lengths(jw_weight, numvars, "jw_weight") - cut_a = check_input_lengths(cut_a, numvars, "cut_a") - cut_p = check_input_lengths(cut_p, numvars, "cut_p") - address_field = check_input_lengths(address_field, numvars, "address_field") - term_freq_adjustment = check_input_lengths(term_freq_adjustment, numvars, "term_freq_adjustment") - stringdist_method = check_input_lengths(stringdist_method, numvars, "stringdist_method") - - vartypes, comparison_levels = check_var_types(dfA,dfB,varnames,match_method,partials) + _dims = (nrow(dfA),nrow(dfB)) # results table - res = [DiBitMatrix(obs_a,obs_b) for _ in varnames] - - # term frequency tables - tf_table_x = [ones(Float16,dims[1]) for _ in varnames] - tf_table_y = [ones(Float16,dims[2]) for _ in varnames] - - # allow missing for comparisons - allowmissing!(dfA) - allowmissing!(dfB) - - - # iterate through variables and execute function over them - for i in eachindex(varnames) - @info "Now matching var $(varnames[i]) using $(match_method[i])" - if match_method[i] == "fuzzy" - if term_freq_adjustment[i] - gammaCKfuzzy!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - view(tf_table_x[i],:), - view(tf_table_y[i],:), - cut_a=cut_a[i], - cut_b=cut_p[i], - upper=upper_case[i], - w=jw_weight[i], - partial=partials[i]) - else - gammaCKfuzzy!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - cut_a=cut_a[i], - cut_b=cut_p[i], - upper=upper_case[i], - w=jw_weight[i], - partial=partials[i]) - end - elseif match_method[i] == "string" - if term_freq_adjustment[i] - gammaCKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - view(tf_table_x[i],:), - view(tf_table_y[i],:), - distmethod=stringdist_method[i], - cut_a=cut_a[i], - cut_b=cut_p[i], - w=jw_weight[i], - partial=partials[i]) - else - gammaCKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - distmethod=stringdist_method[i], - cut_a=cut_a[i], - cut_b=cut_p[i], - w=jw_weight[i], - partial=partials[i]) - end - elseif match_method[i] == "exact" || match_method[i] == "bool" - if term_freq_adjustment[i] - gammaKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - view(tf_table_x[i],:), - view(tf_table_y[i],:)) - else - gammaKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims) - end - elseif match_method == "numeric" || match_method=="float" || match_method == "int" - gammaNUMCKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - cut_a=cut_a[i], - cut_b=cut_p[i], - partial=partials[i]) - end - end - @info "Getting table counts" - counts = get_match_patterns(res) - - @info "Running expectation maximization function" - resultsEM = emlinkMARmov(counts, obs_a,obs_b, - varnames,tol=tol_em, - prior_lambda=prior_lambda, w_lambda=w_lambda, - prior_pi=prior_pi, w_pi=w_pi, - address_field=address_field) - # testing removing uncessessary indices (where no obs exist) - #remove_no_matched_var_indices(resultsEM) - # adding uids - - if any(term_freq_adjustment) - resultsEM = merge(resultsEM, (matched_ids = indices_to_uids(dfA[!, idvar[1]],dfB[!, idvar[2]],resultsEM.indices), - tf_adj_table = tf_adj_table(resultsEM,varnames,tf_table_x,tf_table_y))) - - else - resultsEM = merge(resultsEM, (matched_ids = indices_to_uids(dfA[!, idvar[1]],dfB[!, idvar[2]],resultsEM.indices),)) - end + res = Dict(v=>DiBitMatrix(_dims...) for v in varnames) - + # fetch parameters for varnames + parameters=Dict(v=>fetch_parameters(config,v) for v in varnames) - @info "Retrieving matches" - getMatches!(resultsEM,threshold_match=threshold_match) - return (resultsEM) -end - -# version of fast link that i can pass to via named tuples -function fastLink(dfA::DataFrame, dfB::DataFrame; - varnames=String[], - match_method=String[], - idvar=String[], - partials=[true], - term_freq_adjustment=[false], - upper_case=[true], - stringdist_method = ["jw"], - cut_a = [0.92], cut_p = [0.88], - jw_weight = [0.1], - address_field = [false], - tol_em = 1e-05, - prior_lambda = 0.0, - w_lambda = 0.0, - prior_pi = 0.0, - w_pi = 0.0, - threshold_match = 0.85, - dedupe_matches = true) - - # idvar to Tuple - idvar = (idvar[1],idvar[2]) - # dims - numvars=length(varnames) - obs_a=nrow(dfA) - obs_b=nrow(dfB) - dims = (obs_a,obs_b) + tf_tables = Dict{String, Vector{Vector{Float16}}}(v=>[ones(Float16,_dims[1]),ones(Float16,_dims[2])] for v in varnames if haskey(parameters[v], "tf_adjust") && parameters[v]["tf_adjust"]) - @info "Checking settings for $numvars declared variables." - partials = check_input_lengths(partials, numvars, "partials") - upper_case = check_input_lengths(upper_case, numvars, "upper_case") - jw_weight = check_input_lengths(jw_weight, numvars, "jw_weight") - cut_a = check_input_lengths(cut_a, numvars, "cut_a") - cut_p = check_input_lengths(cut_p, numvars, "cut_p") - stringdist_method = check_input_lengths(stringdist_method, numvars, "stringdist_method") - term_freq_adjustment = check_input_lengths(term_freq_adjustment, numvars, "term_freq_adjustment") - stringdist_method = check_input_lengths(stringdist_method, numvars, "stringdist_method") - - vartypes, comparison_levels = check_var_types(dfA,dfB,varnames,match_method,partials) - - # results table - res = [DiBitMatrix(obs_a,obs_b) for _ in varnames] - - # term frequency tables - tf_table_x = [ones(Float16,dims[1]) for _ in varnames] - tf_table_y = [ones(Float16,dims[2]) for _ in varnames] + # structure of the expectation maximization function in order of the ability to be executed + emlink_configuration = parse_configuration(config) # allow missing for comparisons allowmissing!(dfA) allowmissing!(dfB) - - for i in eachindex(varnames) - @info "Now matching var $(varnames[i]) using $(match_method[i])" - if match_method[i] == "fuzzy" - if term_freq_adjustment[i] - gammaCKfuzzy!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - view(tf_table_x[i],:), - view(tf_table_y[i],:), - cut_a=cut_a[i], - cut_b=cut_p[i], - upper=upper_case[i], - w=jw_weight[i], - partial=partials[i]) - else - gammaCKfuzzy!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - cut_a=cut_a[i], - cut_b=cut_p[i], - upper=upper_case[i], - w=jw_weight[i], - partial=partials[i]) - end - elseif match_method[i] == "string" - if term_freq_adjustment[i] - gammaCKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - view(tf_table_x[i],:), - view(tf_table_y[i],:), - distmethod=stringdist_method[i], - cut_a=cut_a[i], - cut_b=cut_p[i], - w=jw_weight[i], - partial=partials[i]) - else - gammaCKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - distmethod=stringdist_method[i], - cut_a=cut_a[i], - cut_b=cut_p[i], - w=jw_weight[i], - partial=partials[i]) - end - elseif match_method[i] == "exact" || match_method[i] == "bool" - if term_freq_adjustment[i] - gammaKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims, - view(tf_table_x[i],:), - view(tf_table_y[i],:)) - else - gammaKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - dims) + for v in varnames + match_method = lowercase(parameters[v]["method"]) + term_freq_adjustment = retrieve(parameters[v],"tf_adjust") |> x -> !isempty(x) && x |> first + + @info "Now matching var $(v) using $(match_method) with tf_adjust: $term_freq_adjustment" + if term_freq_adjustment + comparisons_args=namedtuple(remove_keys(parameters[v], ["method", "varname", "tf_adjust", "tf_adjustment_weight"])) + + if match_method == "fuzzy" + gammaCKfuzzy!(dfA[!,v], + dfB[!,v], + res[v], + view(tf_tables[v][1],:), + view(tf_tables[v][2],:); + comparisons_args...) + elseif match_method == "string" + gammaCKpar!(dfA[!,v], + dfB[!,v], + res[v], + view(tf_tables[v][1],:), + view(tf_tables[v][2],:); + comparisons_args...) + elseif match_method ∈ keys(STRING_DISTANCE_METHODS) + gammaCKpar!(dfA[!,v], + dfB[!,v], + res[v], + view(tf_tables[v][1],:), + view(tf_tables[v][2],:); + distmethod=STRING_DISTANCE_METHODS[match_method], + comparisons_args...) + elseif match_method == "exact" || match_method == "bool" + gammaKpar!(dfA[!,v], + dfB[!,v], + res[v], + view(tf_tables[v][1],:), + view(tf_tables[v][2],:); + comparisons_args...) + elseif match_method == "numeric" || match_method=="float" || match_method == "int" + gammaNUMCKpar!(dfA[!,v], + dfB[!,v], + res[v]; + comparisons_args...) end - elseif match_method == "numeric" || match_method=="float" || match_method == "int" - gammaNUMCKpar!(dfA[!,varnames[i]], - dfB[!,varnames[i]], - res[i], - cut_a=cut_a[i], - cut_b=cut_p[i], - partial=partials[i]) + else + comparisons_args=namedtuple(remove_keys(parameters[v], ["method", "varname"])) + if match_method == "fuzzy" + gammaCKfuzzy!(dfA[!,v], + dfB[!,v], + res[v]; + comparisons_args...) + elseif match_method == "string" + gammaCKpar!(dfA[!,v], + dfB[!,v], + res[v]; + comparisons_args...) + elseif match_method ∈ keys(STRING_DISTANCE_METHODS) + gammaCKpar!(dfA[!,v], + dfB[!,v], + res[v]; + distmethod=STRING_DISTANCE_METHODS[match_method], + comparisons_args...) + elseif match_method == "exact" || match_method == "bool" + gammaKpar!(dfA[!,v], + dfB[!,v], + res[v]; + comparisons_args...) + elseif match_method == "numeric" || match_method=="float" || match_method == "int" + gammaNUMCKpar!(dfA[!,v], + dfB[!,v], + res[v]; + comparisons_args...) + end end - end - @info "Getting table counts" - counts = get_match_patterns(res) + end - @info "Running expectation maximization function" - resultsEM = emlinkMARmov(counts, obs_a,obs_b, - varnames,tol=tol_em, - prior_lambda=prior_lambda, w_lambda=w_lambda, - prior_pi=prior_pi, w_pi=w_pi, - address_field=address_field) - # testing removing uncessessary indices (where no obs exist) - #remove_no_matched_var_indices(resultsEM) - # adding uids + results = process_comparisons(res, emlink_configuration, _dims, parameters, tf_tables) - if any(term_freq_adjustment) - resultsEM = merge(resultsEM, (matched_ids = indices_to_uids(dfA[!, idvar[1]],dfB[!, idvar[2]],resultsEM.indices), - tf_adj_table = tf_adj_table(resultsEM,varnames,tf_table_x,tf_table_y))) - + + if length(results) == 3 + return Dict("ids" => indices_to_uids(dfA[!, config["idvar"][1]],dfB[!, config["idvar"][2]],results[1].indices), + "resultsEM" => results[2], + "resultsTF" => results[3]) else - resultsEM = merge(resultsEM, (matched_ids = indices_to_uids(dfA[!, idvar[1]],dfB[!, idvar[2]],resultsEM.indices),)) + return Dict("ids" => indices_to_uids(dfA[!, config["idvar"][1]],dfB[!, config["idvar"][2]],results[1].indices), + "resultsEM" => results[2]) end - - - - @info "Retrieving matches" - getMatches!(resultsEM,threshold_match=threshold_match) - return (resultsEM) end - - - diff --git a/src/gammas/Gammas.jl b/src/gammas/Gammas.jl index a27bd96..47fd02f 100755 --- a/src/gammas/Gammas.jl +++ b/src/gammas/Gammas.jl @@ -1,8 +1,10 @@ module Gammas import PooledArrays: PooledVector import StringDistances: Jaro, JaroWinkler, Levenshtein, DamerauLevenshtein, compare -using FastLink.DiBitMat -import FastLink: match1, match2, missingval, nonmatch + +using ..DiBitMat +import ..nonmatch, ..match1, ..match2, ..missingval + include("gammaKpar.jl") include("gammaNUMCKpar.jl") diff --git a/src/gammas/gammaCKfuzzy.jl b/src/gammas/gammaCKfuzzy.jl index 82616c0..54357d8 100644 --- a/src/gammas/gammaCKfuzzy.jl +++ b/src/gammas/gammaCKfuzzy.jl @@ -166,17 +166,20 @@ https://tech.popdata.org/speeding-up-Jaro-Winkler-with-rust-and-bitwise-operatio - `vecB::PooledVector`: Target column of dfB for string comparison. - `results::SubArray`: ResultMatrix object's result_matrix. - `array_2Dindex::Function`: ResultMatrix object's array_2Dindex function -- `dims::Tuple`: ResultMatrix object's dims. +- `_dims::Tuple`: ResultMatrix object's _dims. - `cut_a::Float=0.92`: Lower bound for close string distances. - `cut_b::Float=0.88`: Lower bound for partial string distance. - `upper::Bool=true`: Whether input string is uppercase. - `w`: Winkler weight for jw string distance. """ -function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix,dims::Tuple{Int,Int}; +function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix; cut_a::Float64=0.92,cut_b::Float64=0.88,upper::Bool=true, w::Float64=0.1,partial::Bool=true) - + if @isdefined(_dims) == false + _dims = (length(vecA), length(vecB)) + end + # functions that update the results view if partial score_value! = score_value2 @@ -206,7 +209,7 @@ function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatri Threads.@threads for (query_name,new_a_id) in collect(vecA.invpool) # pass if query is missing val if new_a_id === missingindexA - update_results!(results, lookup_a_by_id[new_a_id],UInt32(1):UInt32(dims[2]),missingval) + update_results!(results, lookup_a_by_id[new_a_id],UInt32(1):UInt32(_dims[2]),missingval) continue end @@ -237,13 +240,17 @@ function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatri end -function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix,dims::Tuple{Int,Int}, +function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, tf_table_x::SubArray{Float16}, tf_table_y::SubArray{Float16}; - cut_a::Float64=0.92,cut_b::Float64=0.88,upper::Bool=true, - w::Float64=0.1,partial::Bool=true) + cut_a::Float64=0.92, cut_b::Float64=0.88, upper::Bool=true, + w::Float64=0.1,partial::Bool=true, + tf_minimum_u_value=0.001) + + if @isdefined(_dims) == false + _dims = (length(vecA), length(vecB)) + end - # functions that update the results view if partial score_value! = score_value2 @@ -265,21 +272,21 @@ function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatri lookup_a_by_id=pool_lookup_table(vecA.refs, lenA) lookup_b_by_id=pool_lookup_table(vecB.refs, lenB) - dims=(length(vecA),length(vecB)) + _dims=(length(vecA),length(vecB)) # term frequency for x Threads.@threads for i in lookup_a_by_id - tf_val=length(i)/dims[1] + tf_val=length(i)/_dims[1] for ii in i - tf_table_x[ii] = tf_val + tf_table_x[ii] = max(tf_val, tf_minimum_u_value) end end # term frequency for y Threads.@threads for i in lookup_b_by_id - tf_val=length(i)/dims[2] + tf_val=length(i)/_dims[2] for ii in i - tf_table_y[ii] = tf_val + tf_table_y[ii] = max(tf_val, tf_minimum_u_value) end end @@ -291,7 +298,7 @@ function gammaCKfuzzy!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatri Threads.@threads for (query_name,new_a_id) in collect(vecA.invpool) # pass if query is missing val if new_a_id === missingindexA - update_results!(results, lookup_a_by_id[new_a_id],UInt32(1):UInt32(dims[2]),missingval) + update_results!(results, lookup_a_by_id[new_a_id],UInt32(1):UInt32(_dims[2]),missingval) continue end diff --git a/src/gammas/gammaCKpar.jl b/src/gammas/gammaCKpar.jl index ee90643..3214cc8 100644 --- a/src/gammas/gammaCKpar.jl +++ b/src/gammas/gammaCKpar.jl @@ -30,16 +30,20 @@ String comparison of two columns with partial match. - `vecA::PooledVector`: Target column of dfB for string comparison. - `vecB::PooledVector`: Target column of dfB for string comparison. - `results::DiBitMatrix`: DiBitMatrix object's result_matrix. -- `dims::Tuple`: DiBitMatrix object's dims. +- `_dims::Tuple`: DiBitMatrix object's _dims. - `cut_a::Float=0.92`: Lower bound for close string distances. - `cut_b::Float=0.88`: Lower bound for partial string distances. - `distmethod::String`: String distance method ("jw" Jaro-Winkler (Default), "dl" Damerau-Levenshtein, "jaro" Jaro, "lv" Levenshtein, and "ham" Hamming). - `w`: Winkler weight for jw string distance. """ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, - results::DiBitMatrix,dims::Tuple{Int,Int}; + results::DiBitMatrix; distmethod="jw",cut_a=0.92,cut_b=0.88,partial=true,w=0.1) + if @isdefined(_dims) == false + _dims = (length(vecA), length(vecB)) + end + # assign distance function if distmethod=="jw" distance = JaroWinkler(p=w) @@ -49,8 +53,15 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, distance = Jaro(p=w) elseif distmethod=="lv" distance = Levenshtein() + elseif distmethod=="ro" + distance = RatcliffObershelp() + elseif distmethod=="osa" + distance = OptimalStringAlignment() + elseif distmethod=="hamming" + distance = Hamming() end + if partial score_value! = score_value2 else @@ -77,7 +88,7 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, # set all to missing where x is missing if !isnothing(missingvals_x) missingindices = findall(vecA.refs .== missingvals_x) - Threads.@threads for iy in 1:dims[2] + Threads.@threads for iy in 1:_dims[2] for ix in missingindices results[ix,iy] = missingval end @@ -87,7 +98,7 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, # set all to missing where y is missing if !isnothing(missingvals_y) missingindices = findall(vecB.refs .== missingvals_y) - Threads.@threads for ix in 1:dims[1] + Threads.@threads for ix in 1:_dims[1] for iy in missingindices results[ix,iy] = missingval end @@ -99,11 +110,17 @@ end # term frequency adjusted version function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, - results::DiBitMatrix,dims::Tuple{Int,Int}, + results::DiBitMatrix, tf_table_x::SubArray{Float16}, tf_table_y::SubArray{Float16}; - distmethod="jw",cut_a=0.92,cut_b=0.88,partial=true,w=0.1) + distmethod="jw",cut_a=0.92,cut_b=0.88,partial=true,w=0.1, + tf_minimum_u_value=0.001) + if @isdefined(_dims) == false + _dims = (length(vecA), length(vecB)) + end + + # assign distance function if distmethod=="jw" distance = JaroWinkler(p=w) @@ -113,6 +130,12 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, distance = Jaro(p=w) elseif distmethod=="lv" distance = Levenshtein() + elseif distmethod=="ro" + distance = RatcliffObershelp() + elseif distmethod=="osa" + distance = OptimalStringAlignment() + elseif distmethod=="hamming" + distance = Hamming() end if partial @@ -134,9 +157,9 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, indices_x = findall(vecA.refs .=== x) # term frequency adjustment for x - tf_val_x = length(indices_x)/dims[1] + tf_val_x = length(indices_x)/_dims[1] for tf_i in indices_x - tf_table_x[tf_i] =tf_val_x + tf_table_x[tf_i] = max(tf_val_x, tf_minimum_u_value) end for y in iter_y @@ -144,9 +167,9 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, indices_y = findall(vecB.refs .=== y) # term frequency adjustment for y - tf_val_y = length(indices_y)/dims[2] + tf_val_y = length(indices_y)/_dims[2] for tf_i in indices_y - tf_table_y[tf_i] = tf_val_y + tf_table_y[tf_i] = max(tf_val_y, tf_minimum_u_value) end # string comparison @@ -160,12 +183,12 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, missingindices = findall(vecA.refs .== missingvals_x) # term frequency adjustment for x - tf_val_x = length(missingindices)/dims[1] + tf_val_x = length(missingindices)/_dims[1] for tf_i in missingindices - tf_table_x[tf_i] =tf_val_x + tf_table_x[tf_i] = max(tf_val_y, tf_minimum_u_value) end - Threads.@threads for iy in 1:dims[2] + Threads.@threads for iy in 1:_dims[2] for ix in missingindices results[ix,iy] = missingval end @@ -176,12 +199,12 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, if !isnothing(missingvals_y) missingindices = findall(vecB.refs .== missingvals_y) # term frequency adjustment for y - tf_val_y = length(missingindices)/dims[2] + tf_val_y = length(missingindices)/_dims[2] for tf_i in missingindices - tf_table_y[tf_i] =tf_val_y + tf_table_y[tf_i] = max(tf_val_y, tf_minimum_u_value) end - Threads.@threads for ix in 1:dims[1] + Threads.@threads for ix in 1:_dims[1] for iy in missingindices results[ix,iy] = missingval end @@ -191,70 +214,3 @@ function gammaCKpar!(vecA::PooledVector,vecB::PooledVector, return nothing end -# term frequency adjusted version -function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, dims::Tuple, - tf_table_x::SubArray{Float16}, - tf_table_y::SubArray{Float16}) - # Segment unique keys from missing key - missingvals_x = findfirst(ismissing.(vecA.pool)) - iter_x=filter(x -> x != missingvals_x, 0x00000001:UInt32(length(vecA.pool))) - - missingvals_y = findfirst(ismissing.(vecB.pool)) - iter_y=filter(x -> x != missingvals_y, 0x00000001:UInt32(length(vecB.pool))) - - # Form match matrices based on differing levels of matches - Threads.@threads for x in iter_x - indices_x = findall(vecA.refs .=== x) - # term frequency adjustment for x - tf_val_x = length(indices_x)/dims[1] - for tf_i in indices_x - tf_table_x[tf_i] =tf_val_x - end - for y in iter_y - indices_y = findall(vecB.refs .=== y) - # term frequency adjustment for y - tf_val_y = length(indices_y)/dims[2] - for tf_i in indices_y - tf_table_y[tf_i] = tf_val_y - end - # if matches at a threshold, go through result vector and assign new value - if vecA.pool[x] == vecB.pool[y] - for ix in indices_x,iy in indices_y - results[ix,iy] = match2 - end - end - end - end - - # set all to missing where x is missing - if !isnothing(missingvals_x) - missingindices = findall(vecA.refs .== missingvals_x) - # term frequency adjustment for x - tf_val_x = length(missingindices)/dims[1] - for tf_i in missingindices - tf_table_x[tf_i] =tf_val_x - end - Threads.@threads for iy in 1:dims[2] - for ix in missingindices - results[ix,iy] = missingval - end - end - end - # set all to missing where y is missing - if !isnothing(missingvals_y) - missingindices = findall(vecB.refs .== missingvals_y) - # term frequency adjustment for y - tf_val_y = length(missingindices)/dims[2] - for tf_i in missingindices - tf_table_y[tf_i] =tf_val_y - end - - Threads.@threads for ix in 1:dims[1] - for iy in missingindices - results[ix,iy] = missingval - end - end - end - # Return nothing - return nothing -end diff --git a/src/gammas/gammaKpar.jl b/src/gammas/gammaKpar.jl index 7533033..b296107 100755 --- a/src/gammas/gammaKpar.jl +++ b/src/gammas/gammaKpar.jl @@ -6,9 +6,12 @@ Exact comparison of variables. - `vecB::PooledVector`: Target column of dfB for string comparison. - `results::SubArray`: ResultMatrix object's result_matrix. - `array_2Dindex::Function`: ResultMatrix object's array_2Dindex function -- `dims::Tuple`: ResultMatrix object's dims. +- `_dims::Tuple`: ResultMatrix object's _dims. """ -function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, dims::Tuple) +function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix) + if @isdefined(_dims) == false + _dims = (length(vecA), length(vecB)) + end # Segment unique keys from missing key missingvals_x = findfirst(ismissing.(vecA.pool)) iter_x=filter(x -> x != missingvals_x, 0x00000001:UInt32(length(vecA.pool))) @@ -33,7 +36,7 @@ function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, # set all to missing where x is missing if !isnothing(missingvals_x) missingindices = findall(vecA.refs .== missingvals_x) - Threads.@threads for iy in 1:dims[2] + Threads.@threads for iy in 1:_dims[2] for ix in missingindices results[ix,iy] = missingval end @@ -42,7 +45,81 @@ function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, # set all to missing where y is missing if !isnothing(missingvals_y) missingindices = findall(vecB.refs .== missingvals_y) - Threads.@threads for ix in 1:dims[1] + Threads.@threads for ix in 1:_dims[1] + for iy in missingindices + results[ix,iy] = missingval + end + end + end + # Return nothing + return nothing +end + + +# term frequency adjusted version +function gammaKpar!(vecA::PooledVector,vecB::PooledVector,results::DiBitMatrix, + tf_table_x::SubArray{Float16}, + tf_table_y::SubArray{Float16}; + tf_minimum_u_value=0.001) + + if @isdefined(_dims) == false + _dims = (length(vecA), length(vecB)) + end + # Segment unique keys from missing key + missingvals_x = findfirst(ismissing.(vecA.pool)) + iter_x=filter(x -> x != missingvals_x, 0x00000001:UInt32(length(vecA.pool))) + + missingvals_y = findfirst(ismissing.(vecB.pool)) + iter_y=filter(x -> x != missingvals_y, 0x00000001:UInt32(length(vecB.pool))) + + # Form match matrices based on differing levels of matches + Threads.@threads for x in iter_x + indices_x = findall(vecA.refs .=== x) + # term frequency adjustment for x + tf_val_x = length(indices_x)/_dims[1] + for tf_i in indices_x + tf_table_x[tf_i] = max(tf_val_x, tf_minimum_u_value) + end + for y in iter_y + indices_y = findall(vecB.refs .=== y) + # term frequency adjustment for y + tf_val_y = length(indices_y)/_dims[2] + for tf_i in indices_y + tf_table_y[tf_i] = max(tf_val_y, tf_minimum_u_value) + end + # if matches at a threshold, go through result vector and assign new value + if vecA.pool[x] == vecB.pool[y] + for ix in indices_x,iy in indices_y + results[ix,iy] = match2 + end + end + end + end + + # set all to missing where x is missing + if !isnothing(missingvals_x) + missingindices = findall(vecA.refs .== missingvals_x) + # term frequency adjustment for x + tf_val_x = length(missingindices)/_dims[1] + for tf_i in missingindices + tf_table_x[tf_i] = max(tf_val_x, tf_minimum_u_value) + end + Threads.@threads for iy in 1:_dims[2] + for ix in missingindices + results[ix,iy] = missingval + end + end + end + # set all to missing where y is missing + if !isnothing(missingvals_y) + missingindices = findall(vecB.refs .== missingvals_y) + # term frequency adjustment for y + tf_val_y = length(missingindices)/_dims[2] + for tf_i in missingindices + tf_table_y[tf_i] = max(tf_val_y, tf_minimum_u_value) + end + + Threads.@threads for ix in 1:_dims[1] for iy in missingindices results[ix,iy] = missingval end diff --git a/src/gammas/gammaNUMCKpar.jl b/src/gammas/gammaNUMCKpar.jl index 1dc4162..72d9956 100755 --- a/src/gammas/gammaNUMCKpar.jl +++ b/src/gammas/gammaNUMCKpar.jl @@ -37,7 +37,7 @@ Numeric comparison of two columns - `cut_a::Number=1`: Lower bound for close string distances. - `cut_b::Number=2`: Lower bound for partial string distances. """ -function gammaNUMCKpar!(vecA::Vector,vecB::Vector, +function gammaNUMCKpar!(vecA,vecB, results::DiBitMatrix; cut_a=1,cut_b=2, partial::Bool=true) diff --git a/src/getMatches.jl b/src/getMatches.jl deleted file mode 100755 index 13c6914..0000000 --- a/src/getMatches.jl +++ /dev/null @@ -1,39 +0,0 @@ -# Decomposes match vector index back into original indices from initial inputs. -function get_2Dindex(index, nrows) - zero_based_index = index - 1 - row = Int(mod(zero_based_index, nrows)) + 1 - col = Int(div(zero_based_index, nrows)) + 1 - return (row, col) -end - -""" -Converts the matches from the tableCounts function based on the predefined threshold from the fastLink call into original indices from the input datasets. - -# Arguments -- `resultsEM::NamedTuple`: Output of the expectation maximization fuction (eg emlinkMARmov()) -- `threshold_match`: Lower bound for the posterior probability that will act as a cutoff for matches. -""" -function getMatches!(resultsEM::NamedTuple; - threshold_match=0.85,u_b=1e10) - resultsEM.patterns_w.ismatch = resultsEM.zeta_j .>= threshold_match .&& resultsEM.patterns_w.weights .<= u_b - return nothing -end - -# applies term frequency adjustments to table -function tf_adj_table(resultsEM::NamedTuple,varnames::Vector{String},tf_table_x::Vector{Vector{Float16}},tf_table_y::Vector{Vector{Float16}}) - tf_vec = [DataFrame() for _ in eachindex(resultsEM.indices)] - new_names=vcat("tf_" .* varnames .* "_a", "tf_" .* varnames .* "_b") - for i in eachindex(resultsEM.indices) - result_len=length(resultsEM.indices[i]) - tf_results=DataFrame(ones(Float16,(result_len, 2*length(varnames))),new_names) - Threads.@threads for ii in 1:result_len - val=resultsEM.indices[i][ii] - rowval=vcat([tf_table_x[varid][val.row] for varid in eachindex(varnames)],[tf_table_y[varid][val.col] for varid in eachindex(varnames)]) - tf_results[ii,:] = rowval - end - - tf_vec[i] = tf_results - end - - return tf_vec -end diff --git a/src/matchPatterns.jl b/src/matchPatterns.jl index 6467b18..8391477 100644 --- a/src/matchPatterns.jl +++ b/src/matchPatterns.jl @@ -1,3 +1,11 @@ +module matchpatterns +import Base: getindex, setindex! +import DataFrames: DataFrame + +using ..DiBitMat + +export ComparisonIndex, LocalPatterns, MatchPatterns + struct ComparisonIndex row::UInt32 col::UInt32 @@ -22,84 +30,20 @@ struct MatchPatterns end end -function indices_to_uids(vecA, vecB, - indices::Vector{Vector{ComparisonIndex}} - ) - batch_size=500 - inds=eachindex(indices) - paired_ids = [Vector{Tuple}() for _ in inds] - Threads.@threads for i in inds - len=length(indices[i]) - lk = ReentrantLock() - Threads.@threads for first_val in 1:batch_size:len - local_paired_ids=Vector{Tuple}() - last_val = min(first_val + batch_size - 1, len) - for ii in first_val:last_val - push!(local_paired_ids,(vecA[indices[i][ii].row],vecB[indices[i][ii].col])) - end - lock(lk) do - append!(paired_ids[i],local_paired_ids) - end - end - end - return paired_ids -end +# extending base with get and set based on comparison index +getindex(df::DataFrame, x::ComparisonIndex) = df[x.row,x.col] +setindex!(df::DataFrame, value, x::ComparisonIndex) = df[x.row,x.col] = value -function get_2Dindex(index::T, nrows::Int) where T <: Integer - zero_based_index = index - 1 - row = Int(mod(zero_based_index, nrows)) + 1 - col = Int(div(zero_based_index, nrows)) + 1 - return ComparisonIndex(row, col) +function getindex(vm::DiBitMatrix, x::ComparisonIndex) + linear_index = (x.col - 1) * vm.nrows + x.row + return vm.data[linear_index] +end +function setindex!(vm::DiBitMatrix, value::UInt8, x::ComparisonIndex) + linear_index = (x.col - 1) * vm.nrows + x.row + vm.data[linear_index] = value end -function get_local_patterns(x::Vector{Vector{UInt8}}, N::Int, S::Int) - patterns=Vector{Vector{UInt8}}() - hashes=Vector{UInt64}() - indices=Vector{Vector{UInt16}}() - for i in 1:S - pattern=zeros(UInt8,N) - for n in 1:N - pattern[n]=x[n][i] - end - pattern_hash=hash(pattern) - id = findfirst(pattern_hash .=== hashes) - if isnothing(id) - push!(patterns,pattern) - push!(hashes,pattern_hash) - push!(indices,[i]) - else - push!(indices[id],i) - end - end - return LocalPatterns(patterns,indices,hashes) -end +end # module MatchPatterns + -function get_match_patterns(res::Vector{DiBitMatrix}) - matches=MatchPatterns() - N = length(res) - dimy=res[1].nrows - len=Int(res[1].data.len) - lk = ReentrantLock() - Threads.@threads for first_loc in 0:1024:len - last_loc = first_loc + 1024 - if last_loc > len - last_loc=len - end - x=[res[n].data[(first_loc+1):last_loc] for n in 1:N] - patterns=get_local_patterns(x,N,last_loc-first_loc) - for i in eachindex(patterns.hashes) - lock(lk) do - id = findfirst(patterns.hashes[i] .=== matches.hashes) - if isnothing(id) - push!(matches.patterns,patterns.patterns[i]) - push!(matches.hashes,patterns.hashes[i]) - push!(matches.indices,get_2Dindex.(first_loc .+ patterns.indices[i],dimy)) - else - append!(matches.indices[id],get_2Dindex.(first_loc .+ patterns.indices[i],dimy)) - end - end - end - end - return matches -end diff --git a/src/patterns.jl b/src/patterns.jl new file mode 100644 index 0000000..b7c7313 --- /dev/null +++ b/src/patterns.jl @@ -0,0 +1,262 @@ +module patterns + +using ..matchpatterns, ..emlink, ..tf, ..DiBitMat +import ..match2, ..nonmatch + +export indices_to_uids, process_comparisons +function indices_to_uids(vecA, vecB, + indices::Vector{Vector{ComparisonIndex}} + ) + batch_size=500 + inds=eachindex(indices) + paired_ids = [Vector{Tuple}() for _ in inds] + Threads.@threads for i in inds + len=length(indices[i]) + lk = ReentrantLock() + Threads.@threads for first_val in 1:batch_size:len + local_paired_ids=Vector{Tuple}() + last_val = min(first_val + batch_size - 1, len) + for ii in first_val:last_val + push!(local_paired_ids,(vecA[indices[i][ii].row],vecB[indices[i][ii].col])) + end + lock(lk) do + append!(paired_ids[i],local_paired_ids) + end + end + end + return paired_ids +end + +# Decomposes match vector index back into original indices from initial inputs. +function get_2Dindex(index::T, nrows::Int) where T <: Integer + zero_based_index = index - 1 + row = Int(mod(zero_based_index, nrows)) + 1 + col = Int(div(zero_based_index, nrows)) + 1 + return ComparisonIndex(row, col) +end + +function get_local_patterns(x::Vector{Vector{UInt8}}, N::Int, S::Int) + patterns=Vector{Vector{UInt8}}() + hashes=Vector{UInt64}() + indices=Vector{Vector{UInt16}}() + + for i in 1:S + pattern=zeros(UInt8,N) + for n in 1:N + pattern[n]=x[n][i] + end + pattern_hash=hash(pattern) + id = findfirst(pattern_hash .=== hashes) + if isnothing(id) + push!(patterns,pattern) + push!(hashes,pattern_hash) + push!(indices,[i]) + else + push!(indices[id],i) + end + end + return LocalPatterns(patterns,indices,hashes) +end + + +function get_match_patterns(res::Vector{DiBitMatrix}) + matches=MatchPatterns() + N = length(res) + dimy=res[1].nrows + len=Int(res[1].data.len) + lk = ReentrantLock() + Threads.@threads for first_loc in 0:1024:len + last_loc = first_loc + 1024 + if last_loc > len + last_loc=len + end + x=[res[n].data[(first_loc+1):last_loc] for n in 1:N] + patterns=get_local_patterns(x,N,last_loc-first_loc) + for i in eachindex(patterns.hashes) + lock(lk) do + id = findfirst(patterns.hashes[i] .=== matches.hashes) + if isnothing(id) + push!(matches.patterns,patterns.patterns[i]) + push!(matches.hashes,patterns.hashes[i]) + push!(matches.indices,get_2Dindex.(first_loc .+ patterns.indices[i],dimy)) + else + append!(matches.indices[id],get_2Dindex.(first_loc .+ patterns.indices[i],dimy)) + end + end + end + end + return matches +end + +function get_match_patterns(res::Vector{DiBitMatrix}, tf_tables::Dict{String, Vector{Vector{Float16}}}, + tf_vars::Vector{String}, tf_indices::Vector{Int64}, isexact=Bool[]) + + tf_patterns = Dict("relevant_tf_indices"=>Vector{Int64}[], + "tf_denom_vals"=>Vector{Vector{Float16}}[]) + + matches=MatchPatterns() + N = length(res) + dimy=res[1].nrows + len=Int(res[1].data.len) + lk = ReentrantLock() + Threads.@threads for first_loc in 0:1024:len + last_loc = first_loc + 1024 + if last_loc > len + last_loc=len + end + x=[res[n].data[(first_loc+1):last_loc] for n in 1:N] + patterns=get_local_patterns(x,N,last_loc-first_loc) + for i in eachindex(patterns.hashes) + lock(lk) do + + id = findfirst(patterns.hashes[i] .=== matches.hashes) + pattern_indices = get_2Dindex.(first_loc .+ patterns.indices[i],dimy) + if isnothing(id) + push!(matches.patterns,patterns.patterns[i]) + push!(matches.hashes,patterns.hashes[i]) + push!(matches.indices, pattern_indices) + + relevant_tf_indices = find_tf_pattern_vars(patterns.patterns[i], tf_indices) + push!(tf_patterns["relevant_tf_indices"], relevant_tf_indices) + push!(tf_patterns["tf_denom_vals"], [match_level_tf_lookup(tf_tables[tf_vars[tfi]], pattern_indices, isexact[tfi]) + for tfi in relevant_tf_indices]) + else + append!(matches.indices[id], pattern_indices) + + for (tfi_loc, tfi) in enumerate(tf_patterns["relevant_tf_indices"][id]) + append!(tf_patterns["tf_denom_vals"][id][tfi_loc], match_level_tf_lookup(tf_tables[tf_vars[tfi]], pattern_indices, isexact[tfi])) + end + end + end + end + end + return (tf_patterns, matches) +end + + + +function match_level_tf_lookup(tf_table::Vector{Vector{Float16}}, pattern_indices::Vector{ComparisonIndex}, isexact::Bool) + if isexact + calculate_tf_denom = calculate_tf_denom_exact + else + calculate_tf_denom = calculate_tf_denom_fuzzy + end + return [calculate_tf_denom(tf_table[1][pi.row],tf_table[2][pi.col]) for pi in pattern_indices] +end + +function get_match_patternids(resultsEM; base="log2") + if base == "log2" + bf_to_prob = bf2_to_probability + else + bf_to_prob = bf_to_probability + end + + resultsEM["patterns_w"].weights .|> Float64 .|> + bf_to_prob .|> (x -> x >= resultsEM["threshold_match"]) |> + findall +end + +function update_pattern_level_DiBit!(pattern::DiBitMatrix, + match_vector::BitVector, + indices::Vector{ComparisonIndex}) + count = length(match_vector) + Threads.@threads for i in collect(1:count) + pattern[indices[i]] = match_vector[i] ? match2 : nonmatch + end + return nothing +end + +function patterns_to_DiBit(resultsTF::Vector{Dict{String, Any}}, + comparisons::Vector{Vector{ComparisonIndex}}, + _dims::Tuple{Int64,Int64}) + patterns = DiBitMatrix(_dims...) + + # find non adjusted matches + pattern_ids = findall([any(p["ismatch"]) && p["tf_adjusted"] == false for p in resultsTF]) + Threads.@threads for pattern_id in pattern_ids + Threads.@threads for obs in comparisons[pattern_id] + patterns[obs] = match2 + end + end + + # find tf_adjusted matches + pattern_ids = findall([any(p["ismatch"]) && p["tf_adjusted"] for p in resultsTF]) + Threads.@threads for pattern_id in pattern_ids + update_pattern_level_DiBit!(patterns, + resultsTF[pattern_id]["ismatch"], + comparisons[pattern_id]) + end + + return patterns +end + +function match_and_link(patterns::Vector{DiBitMatrix}, e::Dict{String, Any}, _dims::Tuple{Int64,Int64}, + final_name::String) + counts=get_match_patterns(patterns) + resultsEM=emlinkMARmov(counts.patterns, + length.(counts.indices), + _dims, + e["variables"]; + e["parameters"]...) + if e["name"] != final_name + pattern_ids = get_match_patternids(resultsEM; base="log") + resultsTF = [Dict{String,Any}("ismatch" => pi ∈ pattern_ids,"tf_adjusted" => false) + for pi in 1:resultsEM["number_of_unique_patterns"]] + return patterns_to_DiBit(resultsTF, counts.indices, _dims) + else + return (counts, resultsEM) + end +end + + +function match_and_link(patterns::Vector{DiBitMatrix}, e::Dict{String, Any}, _dims::Tuple{Int64,Int64}, + final_name::String, tf_vars::Vector{String}, parameters::Dict{String, Dict{String, Any}}, + tf_tables::Dict{String, Vector{Vector{Float16}}}) + tf_indices=[findfirst(i .== e["variables"]) for i in tf_vars] + isexact = [parameters[varname]["method"] == "exact" for varname in tf_vars] + tf_patterns, counts = get_match_patterns(patterns,tf_tables,tf_vars, tf_indices, isexact) + resultsEM = emlinkMARmov(counts.patterns, + length.(counts.indices), + _dims, + e["variables"]; + e["parameters"]...) + + tf_prior_weights = get_tf_adjustment_prior_weights(parameters, tf_vars) + resultsTF = generate_tf_adjustment_dict(resultsEM, tf_patterns, tf_prior_weights; base="log") + + if e["name"] != final_name + return patterns_to_DiBit(resultsTF, counts.indices, _dims) + else + return (counts, resultsEM, resultsTF) + end +end + +function process_comparisons(res::Dict{String, DiBitMatrix}, + emlink_configuration::Vector{Vector{Dict{String, Any}}}, + _dims::Tuple{Int64,Int64}, + parameters::Dict{String, Dict{String, Any}}, + tf_tables::Dict{String, Vector{Vector{Float16}}}) + final_name = last(emlink_configuration)[1]["name"] + for emconfig in emlink_configuration + for e in emconfig + tf_vars=intersect(e["variables"],keys(tf_tables)) + patterns=[pop!(res, v) for v in e["variables"]] + if isempty(tf_vars) + if e["name"] != final_name + res[e["name"]] = match_and_link(patterns, e, _dims, final_name) + else + return match_and_link(patterns, e, _dims, final_name) + end + + else + if e["name"] != final_name + res[e["name"]] = match_and_link(patterns, e, _dims, final_name, tf_vars, parameters, tf_tables) + else + return match_and_link(patterns, e, _dims, final_name, tf_vars, parameters, tf_tables) + end + + end + end + end +end +end #module patterns diff --git a/src/settings/settings.jl b/src/settings/settings.jl new file mode 100644 index 0000000..6de9e7d --- /dev/null +++ b/src/settings/settings.jl @@ -0,0 +1,104 @@ +module settings + +export namedtuple, fetch_parameters, retrieve, parse_configuration, remove_keys + +dictkeys(d::Dict) = (collect(Symbol.(keys(d)))...,) +dictvalues(d::Dict) = (collect(values(d))...,) + +namedtuple(d::Dict{String,T}) where {T} = + NamedTuple{dictkeys(d)}(dictvalues(d)) + +function fetch_parameters(vec::Vector, parameter::String) + for value in vec + retvalue = fetch_parameters(value, parameter) + if !isnothing(retvalue) + return retvalue + end + end +end + +function fetch_parameters(dict::Dict, parameter::String) + for (key, value) in dict + if key == "varname" + if value == parameter + return deepcopy(dict) + end + elseif key == "variables" || key == "comparisons" + return fetch_parameters(value, parameter) + end + end +end + + +# function to retrieve nested keys +function retrieve(vec::Vector, key_of_interest::String) + returnval = [] + for value in vec + if value isa Dict + append!(returnval,retrieve(value, key_of_interest)) + end + end + return returnval +end + +function retrieve(dict::Dict, key_of_interest::String) + returnval = [] + for (key, value) in dict + if key == key_of_interest + append!(returnval,[value]) + end + if value isa Dict || value isa Vector + append!(returnval,retrieve(value, key_of_interest)) + end + end + return returnval +end + +remove_keys(x::Dict,y) = filter(((c,v),) -> c ∈ collect(setdiff(keys(x), y)), x) + +get_config_varname(x::Dict) = haskey(x,"comparisons") ? x["comparisons"]["name"] : x["varname"] + +get_calculated_vars(x::Vector) = [i["comparisons"]["name"] for i in x if haskey(i, "comparisons")] + +get_config_variables(x::Vector) = [get_config_varname(i) for i in x] + +function extract_comparison_info(x::Dict, parent::Union{Nothing,String}) + return Dict("parent"=>parent, "name"=>x["name"], + "variables" => get_config_variables(x["variables"]), + "calculated_vars" => get_calculated_vars(x["variables"]), + "parameters" => namedtuple(remove_keys(x, ["name","variables"]))) +end + + + +function parse_configuration(config::Vector, results::Vector{Vector{Dict{String,Any}}}, parent::String, depth::Int) + append!(results[depth], extract_comparison_info.(config, parent)) + add_child=true + for i in config + comparisons=map(x->x["comparisons"], filter(c -> haskey(c, "comparisons"), i["variables"])) + if length(comparisons) > 0 + if add_child + push!(results,[]) + add_child=false + end + results = parse_configuration(comparisons, results, i["name"],depth+1) + end + end + + return results +end + +function parse_configuration(config::Dict) + results = [[extract_comparison_info(config["comparisons"], nothing)]] + push!(results,[]) + comparisons=map(x->x["comparisons"], filter(c -> haskey(c, "comparisons"), config["comparisons"]["variables"])) + if length(comparisons) > 0 + return [i for i in reverse(parse_configuration(comparisons, results, config["comparisons"]["name"], 2)) if i != []] + else + return [i for i in results if i != []] + end +end + +end + + diff --git a/src/term_frequency_adjustment.jl b/src/term_frequency_adjustment.jl new file mode 100644 index 0000000..43248c1 --- /dev/null +++ b/src/term_frequency_adjustment.jl @@ -0,0 +1,176 @@ +module tf +using DataFrames +import ..settings: retrieve +import ..match1, ..match2 + +export bf2_to_probability, bf_to_probability, find_tf_pattern_vars, generate_tf_adjustment_dict, get_tf_adjustment_prior_weights, calculate_tf_denom_fuzzy, calculate_tf_denom_exact + +# function get_kval(x, i) +# x_l = length(x) +# if i == 0 +# i = 1 +# elseif i == 2 +# i = x_l +# elseif i == 1 && x_l == 3 +# i = length(x) - 1 +# end +# return x[i] +# end + + +function generate_pattern_level_structure(N::Int64, varnames::Vector{String}, prior::Float64) + return Dict((v => zeros(Float64, N) + for v in varnames)..., + "final_weight" => fill(prior, N), + "ismatch" => falses(N), "final_zetaj" => zeros(Float64, N), "tf_adjusted"=> true) +end + +function generate_tf_skeleton(EMOutput::Dict{String,Any}, tf_indices::Vector{Vector{Int64}}) + collect(1:EMOutput["number_of_unique_patterns"]) .|> + x -> let colindices = tf_indices[x]; + patterns_w = EMOutput["patterns_w"][x,:]; + count = patterns_w.counts; + varnames = EMOutput["varnames"] + if isempty(colindices) + Dict("tf_adjustment_weight" => [0.0], + "final_weight" => [Float64(patterns_w.weights)], + "final_zetaj" => [bf_to_probability(Float64(patterns_w.weights))], + "ismatch" => bf_to_probability(Float64(patterns_w.weights)) >= EMOutput["threshold_match"], + "tf_adjusted" => false) + else + generate_pattern_level_structure(count, varnames[colindices], Float64(patterns_w.weights)) + end + end +end + +function get_tf_adjustment_prior_weights(parameters::Dict{String,Dict{String,Any}}, tf_vars::Vector{String}) + weights=[retrieve(parameters[v], "tf_adjustment_weight") for v in tf_vars] + [length(w) == 0 ? 1.0 : w[1] for w in weights] +end + + +function pattern_tf_adjustment!(tf_result::Dict{String, Any}, colnames::Vector{String}, + tfvals::Vector{Vector{Float16}}, uvals::Vector{Float64}, + threshold_match::Float64, count::Int64, prior_weight::Vector{Float64}; base="log2") + if base == "log2" + compute_tf_weight = tf_to_bf2 + bf_to_prob = bf2_to_probability + else + compute_tf_weight = tf_to_bf + bf_to_prob = bf_to_probability + end + Threads.@threads for ri in collect(1:count) + for (ci, cn) in enumerate(colnames) + adjustment = compute_tf_weight(tfvals[ci][ri], uvals[ci], prior_weight[ci]) + tf_result[cn][ri] = adjustment + tf_result["final_weight"][ri] += adjustment + end + zetaj = bf_to_prob(tf_result["final_weight"][ri]) + tf_result["final_zetaj"][ri] = zetaj + tf_result["ismatch"][ri] = zetaj >= threshold_match + end + return nothing +end + +function generate_tf_adjustment_dict(EMOutput::Dict{String,Any}, tfPatterns::Dict{String,Vector}, tf_prior_weights::Vector{Float64}; base="log2") + tfResults = generate_tf_skeleton(EMOutput, tfPatterns["relevant_tf_indices"]) + threshold_match = EMOutput["threshold_match"] + for pattern_id in collect(1:EMOutput["number_of_unique_patterns"]) + colindices = tfPatterns["relevant_tf_indices"][pattern_id] + count = EMOutput["patterns_w"].counts[pattern_id] + tf_uvals = get_tf_u_values(EMOutput["patterns_w"], colindices, pattern_id) + tf_pw = tf_prior_weights[colindices] + + if tfResults[pattern_id]["tf_adjusted"] + + pattern_tf_adjustment!(tfResults[pattern_id], + EMOutput["varnames"][colindices], + tfPatterns["tf_denom_vals"][pattern_id], + tf_uvals, + threshold_match, + count, + tf_pw; + base) + + + end + end + return tfResults +end + + + + + +function find_tf_pattern_vars(unique_pattern::Vector{UInt8}, tf_indices::Vector{Int64})::Vector{Int64} + return intersect(tf_indices, findall(unique_pattern .== match1 .|| unique_pattern .== match2)) +end + +function calculate_tf_denom_fuzzy(x, y) + return max(x,y) +end + +function calculate_tf_denom_exact(x, y) + return (x + y)/2 +end + +function tf_to_bf(denom, uval, prior_weight) + return log(uval/denom) * prior_weight +end + +function tf_to_bf2(denom, uval, prior_weight) + return log2(uval/denom) * prior_weight +end + +function bf_to_probability(w::Float64)::Float64 + return exp(w)/(1+exp(w)) +end +function bf2_to_probability(w::Float64)::Float64 + return exp2(w)/(1+exp2(w)) +end + +function get_tf_u_values(patterns_w::DataFrame, tf_indices::Vector{Int64}, row::Int64) + N_comparisons = sum(patterns_w[:,"counts"]) + tf_uvals=[[sum(patterns_w[ismissing.(patterns_w[:,tfi]) .== false .&& patterns_w[:,tfi] .== value,"counts"])/N_comparisons for value in 1:2] for tfi in tf_indices] + return [tf_uvals[i][v] for (i,v) in enumerate(patterns_w[row,tf_indices])] +end + +# applies term frequency adjustments to table +function tf_adj_table(resultsEM::NamedTuple,varnames::Vector{String},tf_table_x::Vector{Vector{Float16}},tf_table_y::Vector{Vector{Float16}}) + tf_vec = [DataFrame() for _ in eachindex(resultsEM.indices)] + new_names=vcat("tf_" .* varnames .* "_a", "tf_" .* varnames .* "_b") + for i in eachindex(resultsEM.indices) + result_len=length(resultsEM.indices[i]) + tf_results=DataFrame(ones(Float16,(result_len, 2*length(varnames))),new_names) + Threads.@threads for ii in 1:result_len + val=resultsEM.indices[i][ii] + rowval=vcat([tf_table_x[varid][val.row] for varid in eachindex(varnames)],[tf_table_y[varid][val.col] for varid in eachindex(varnames)]) + tf_results[ii,:] = rowval + end + + tf_vec[i] = tf_results + end + + return tf_vec +end + +function get_tf_weight(parameters::Dict, varname::String)::Float64 + return in("tf_adjustment_weight", keys(parameters[varname])) ? parameters[varname]["tf_adjustment_weight"] : 1.0 +end + + + + +function update_tf_zetas_ismatch(final_bf::Vector{Float64}, + threshold_match::Float64) + bf_len = length(final_bf) + final_zj = zeros(Float64, bf_len) + ismatch = falses(bf_len) + Threads.@threads for (i, v) in collect(zip(1:bf_len,final_bf)) + final_zj[i] = bf_to_probability(v) + ismatch[i] = final_zj[i] > threshold_match + end + return (final_zj, ismatch) +end + +end diff --git a/test/runtests.jl b/test/runtests.jl index 344083b..55ee065 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,6 @@ using FastLink using Test -import DataFrames: DataFrame, nrow +import DataFrames: DataFrame, nrow, passmissing, PooledArray import CSV import Pkg.Artifacts: @artifact_str @@ -20,38 +20,61 @@ dfB=CSV.read("$(b_fil)/dfB.csv", DataFrame, dfA.ida = hash.(eachrow(dfA)) dfB.idb = hash.(eachrow(dfB)) +varnames=["firstname","middlename", "lastname","housenum"] + +for var in varnames[1:3] + dfA[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfA[:,var])) + dfB[!,var] = PooledArray(passmissing(x->uppercase(x)).(dfB[:,var])) +end -varnames = ["firstname","middlename", "lastname","housenum"] -cut_a = [0.92,0.92,0.92,1] -cut_p = [0.88,0.88,0.88,2] -match_method = ["string","string","string","float"] -partials = [true,true,true,true] -stringdist_method = ["jw","jw","jw","jw"] -upper_case = [false,false,false,false] -jw_weight = [0.1,0.1,0.1,0.0] + +dfA.housenum = Vector(dfA.housenum) +dfB.housenum = Vector(dfB.housenum) +config = Dict("idvar" => ["ida", "idb"], + "link_type" => "link_only", + "comparisons" => Dict("name" => "total", + "variables" => [ + Dict("varname" => "firstname", + "partial" => true, + "method" => "jarowinkler", + "cut_a" => 0.92, + "cut_b" => 0.88, + "w" => 0.1), + Dict("varname" => "middlename", + "partial" => true, + "method" => "jarowinkler", + "cut_a" => 0.92, + "cut_b" => 0.88, + "w" => 0.1), + Dict("varname" => "lastname", + "partial" => true, + "method" => "jarowinkler", + "cut_a" => 0.92, + "cut_b" => 0.88, + "w" => 0.1), + Dict("varname" => "housenum", + "partial" => true, + "method" => "numeric", + "cut_a" => 1, + "cut_b" => 2) + ])) @testset "Testing FastLink Basic Run" begin @info "Executing fastLink()" - results=fastLink(dfA,dfB,varnames, - ("ida", "idb"), - match_method=match_method, - partials=partials, - upper_case=upper_case, - stringdist_method=stringdist_method, - cut_a=cut_a, - cut_p=cut_p, - jw_weight=jw_weight) - @test true + results=fastLink(dfA,dfB,config) + @info "Correct # of Matches" - @test sum(results.patterns_w.counts[results.patterns_w.ismatch]) == 50 - @info "Number of patterns == 18" - @test length(results.patterns_b) == 18 + p_w = results["resultsEM"]["patterns_w"] + inds = p_w.zeta_j .>= 0.85 + @test sum(p_w.counts[inds]) == 50 + @info "Number of patterns == 26" + @test results["resultsEM"]["number_of_unique_patterns"] == 26 @info "Number of counts == (N₁×N₂) " - @test sum(results.patterns_w.counts) == nrow(dfA) * nrow(dfB) + @test sum(p_w.counts) == nrow(dfA) * nrow(dfB) @info "Ρ(𝑢) >=.999" - @test results.p_u >= .999 + @test results["resultsEM"]["p_u"] >= .999 @info "Ρ(𝑚) <= .0005" - @test results.p_m <= .0005 + @test results["resultsEM"]["p_m"] <= .0005 true end diff --git a/test_parameters.json b/test_parameters.json new file mode 100644 index 0000000..1a52fc8 --- /dev/null +++ b/test_parameters.json @@ -0,0 +1,26 @@ +{ + "link_type": "link_only", + "idvar": ["id", "id2"], + "comparisons": { + "name": "total", + "prior_lambda": 0.000001, + "w_lambda": 0.5, + "threshold_match": 0.88, + "variables": [ + {"varname": "firstname", "method": "fuzzy", "partial": true, "cut_a": 0.92, "cut_b": 0.88, "upper": true, "tf_adjust": true, "w": 0.1}, + {"varname": "middlename", "method": "exact"}, + {"varname": "lastname", "method": "jarowinkler"}, + { + "comparisons": { + "name": "address", + "threshold_match": 0.92, + "variables": [ + {"varname": "housenum", "method": "exact", "tf_adjust": true, "tf_adjustment_weight":0.5, "tf_minimum_u_value": 0.001}, + {"varname": "streetname", "method": "jarowinkler", "w": 0.1, "tf_adjust": true, "tf_adjustment_weight":0.25, "tf_minimum_u_value": 0.001}, + {"varname": "city", "method": "jarowinkler", "tf_adjustment_weight":0.15, "tf_adjust": true} + ] + } + } + ] + } +}