diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index bf20525..d8563a6 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.20' + go-version: '1.22' - name: Test and Coverage run: go test -v -race -cover ./... diff --git a/TestInputs/DistanceMatrix/Random_2_input.txt b/TestInputs/DistanceMatrix/Random_2_input.txt new file mode 100644 index 0000000..7ea96c5 --- /dev/null +++ b/TestInputs/DistanceMatrix/Random_2_input.txt @@ -0,0 +1,2 @@ +0 399 294 169 166 763 263 522 179 205 234 404 484 855 948 754 117 562 88 48 264 149 316 118 217 399 606 522 498 187 554 5 571 433 182 563 360 443 581 379 209 229 445 848 173 531 352 284 910 60 742 116 841 946 987 663 991 788 211 923 133 27 24 643 991 690 975 730 608 912 883 286 84 475 758 522 665 887 772 54 227 994 260 417 511 444 578 975 527 650 809 967 287 985 562 457 734 425 964 768 877 932 311 749 355 363 869 408 875 638 520 281 876 251 733 751 432 553 880 2 557 178 583 398 422 261 307 819 831 532 386 714 783 587 620 576 509 863 885 784 796 670 240 599 827 87 68 904 883 243 729 458 322 6 371 398 924 778 196 845 307 288 186 671 877 555 992 899 532 98 399 355 979 499 584 791 597 181 851 648 686 671 281 467 767 361 909 805 416 949 203 618 200 105 406 854 304 274 629 530 476 192 898 315 799 282 589 68 334 909 988 611 719 947 596 800 91 785 491 634 397 185 376 792 813 338 36 915 264 378 451 182 760 675 700 989 582 680 337 652 550 961 90 925 476 701 811 906 178 983 167 450 734 65 381 733 987 777 486 712 279 41 912 489 819 494 433 190 25 27 848 240 638 993 278 930 205 564 741 160 231 862 626 751 115 469 788 735 622 687 463 24 434 148 840 562 511 176 273 321 347 925 796 623 610 428 235 263 592 554 497 238 502 272 87 876 258 742 302 773 858 886 219 342 744 87 946 66 698 988 106 806 258 910 471 907 827 363 979 731 462 892 360 635 152 41 815 693 720 970 903 993 237 289 618 883 173 394 706 418 705 153 917 821 355 141 890 51 357 95 711 929 169 533 518 600 208 45 486 964 463 805 96 511 474 181 426 263 463 24 622 103 687 19 604 818 0 122 117 64 84 996 148 368 896 253 540 442 904 792 277 242 325 115 279 750 247 127 973 722 756 457 868 900 842 76 134 491 975 285 148 455 65 989 5 343 146 112 984 821 46 481 810 227 885 334 300 682 141 7 377 174 644 185 905 903 156 326 136 520 553 591 642 746 872 214 927 824 394 938 137 363 678 759 330 413 974 388 379 425 766 382 311 416 73 603 591 987 599 119 999 14 549 597 886 824 322 269 982 985 222 575 264 728 827 931 344 914 494 809 299 53 640 441 336 595 578 731 202 676 355 475 388 268 54 584 674 445 944 823 636 495 522 925 405 370 304 616 699 298 298 686 959 554 456 713 975 948 924 885 348 599 633 821 413 619 288 544 356 340 543 361 159 913 330 828 110 565 700 967 689 531 878 266 942 820 918 883 527 918 927 411 200 35 388 288 761 510 97 884 484 789 391 85 16 448 318 826 88 834 391 484 833 995 740 503 783 150 932 938 735 15 647 902 685 152 604 145 808 529 101 766 449 113 750 600 770 746 559 477 91 774 919 523 316 231 583 714 957 875 662 636 593 412 463 185 444 281 127 575 294 773 723 159 797 722 47 394 566 70 735 502 836 138 210 750 913 29 18 28 743 273 656 126 288 370 487 811 137 261 82 270 154 394 607 426 751 605 431 825 585 402 435 318 851 704 708 520 967 979 309 414 284 392 272 242 128 562 12 469 398 652 132 457 689 922 417 259 271 119 560 453 14 869 942 845 181 646 199 496 990 418 600 24 652 621 889 627 321 201 242 314 23 496 9 885 486 587 542 28 724 927 159 1 181 53 977 926 805 209 453 373 746 75 279 813 
391 679 445 12 310 728 96 577 414 729 591 914 630 845 668 424 919 352 52 356 108 933 6 95 540 549 501 716 420 773 292 984 550 947 493 308 759 143 871 785 139 676 533 917 413 789 364 485 558 228 309 285 483 104 411 294 155 331 184 730 473 337 476 304 58 633 783 220 190 206 848 796 165 575 996 539 863 626 596 354 322 444 614 978 300 846 642 444 407 892 970 686 926 88 305 201 82 39 282 907 687 15 605 47 616 979 143 55 117 532 907 252 210 715 539 89 624 223 757 314 291 346 734 639 984 705 909 130 235 658 596 3 385 54 325 898 641 571 5 767 289 842 964 500 705 634 606 952 932 376 622 370 526 476 700 419 9 342 73 20 835 266 965 339 420 653 448 427 979 999 496 24 445 922 902 660 491 4 365 5 537 163 567 976 2 850 175 686 853 566 211 127 20 529 401 753 700 483 470 713 346 428 91 2 223 224 613 388 246 60 304 641 278 372 631 750 114 172 54 223 789 674 612 506 180 795 735 929 971 681 20 717 451 389 763 +1 409 645 664 609 522 814 849 716 290 465 520 856 328 560 340 604 438 611 953 731 957 856 620 708 410 703 108 247 936 712 779 272 26 710 738 135 390 221 377 653 701 963 739 323 666 309 662 723 833 999 759 366 41 522 989 633 302 199 524 727 836 119 184 567 389 475 233 866 720 839 674 336 902 684 590 647 990 487 675 725 942 464 490 318 741 968 709 940 83 698 728 820 611 828 684 540 127 125 237 181 521 898 259 908 577 874 197 858 947 858 917 549 221 566 111 689 301 816 763 950 298 283 58 178 255 425 802 562 919 921 62 683 24 910 552 879 568 852 917 918 259 296 947 775 384 73 930 583 488 583 358 34 481 418 879 987 674 359 496 83 101 471 905 285 146 832 803 643 732 571 280 861 191 478 432 824 317 27 269 8 225 611 402 965 632 529 65 120 146 261 60 31 137 611 365 227 511 117 726 484 14 212 490 644 986 627 717 969 146 325 712 643 943 321 952 416 590 59 376 940 310 268 394 929 535 955 125 621 187 108 567 910 120 807 49 781 176 989 318 213 425 130 996 86 690 594 615 623 473 511 610 314 217 841 375 409 635 449 100 838 131 264 230 241 617 794 928 465 866 547 410 712 850 169 358 358 888 396 677 975 61 210 758 63 397 541 513 338 439 945 67 415 437 253 446 303 838 583 996 92 391 72 269 525 376 934 43 681 57 195 760 964 105 234 63 622 207 488 396 870 977 826 102 758 567 310 680 273 327 231 121 528 434 703 636 952 643 972 80 962 124 303 686 690 565 105 916 869 640 562 627 707 422 42 881 736 746 337 663 278 772 528 518 792 236 376 558 845 304 106 863 382 149 12 646 849 728 511 104 213 437 425 832 194 550 468 107 763 369 403 659 379 884 181 856 671 62 849 623 68 509 151 724 982 805 560 721 956 280 978 163 438 162 22 576 404 905 459 189 917 160 559 25 848 432 466 869 474 596 137 41 429 721 564 274 443 974 314 485 272 273 487 963 883 864 228 743 512 75 150 476 720 126 930 122 293 523 187 289 339 12 594 948 346 788 723 881 711 532 475 715 760 882 270 755 861 597 924 869 46 138 839 634 600 592 478 117 859 387 396 829 137 343 862 594 382 74 306 84 286 562 176 193 688 497 715 353 309 644 770 329 285 204 777 121 762 393 983 853 264 5 668 434 515 459 938 223 705 741 151 669 269 136 961 132 697 4 500 463 800 705 997 507 936 334 465 501 523 871 749 515 203 19 888 487 357 598 533 336 640 969 737 535 430 401 821 864 290 98 643 849 987 983 986 53 111 951 917 642 599 230 715 910 599 988 470 263 461 869 271 603 604 332 104 193 283 658 918 687 101 540 745 830 520 831 603 805 201 746 659 287 413 76 914 12 221 199 677 144 271 836 494 566 921 530 301 872 363 87 882 545 663 18 725 940 578 253 77 534 805 864 939 510 697 525 917 911 852 931 26 357 246 347 590 664 544 440 224 687 638 889 707 612 28 663 39 491 527 815 708 598 124 991 119 203 550 
307 893 919 66 820 877 423 745 440 336 368 823 146 127 18 289 780 656 144 748 81 14 167 319 359 829 382 233 166 383 742 194 821 852 478 854 579 475 756 497 246 51 45 170 682 458 139 624 150 621 783 196 54 280 556 376 438 76 968 803 724 766 185 742 134 401 401 836 255 80 7 96 506 751 417 312 251 53 724 670 0 875 398 657 578 827 739 997 927 748 629 398 669 157 317 846 577 799 697 858 970 927 536 319 233 966 336 621 847 693 466 847 4 441 794 645 617 249 276 257 876 701 191 735 769 100 177 698 590 372 485 322 258 317 898 339 542 295 44 636 962 506 589 182 622 762 127 686 982 708 281 846 983 950 696 656 643 630 456 890 353 146 762 773 306 965 196 248 890 572 872 623 528 785 66 806 965 764 504 667 475 95 685 291 305 530 209 106 798 509 681 150 379 901 666 211 794 965 286 920 656 364 903 934 844 26 694 891 703 390 2 385 312 366 61 616 965 853 393 980 316 881 156 142 639 976 399 8 793 424 10 293 139 814 827 260 485 943 686 452 27 556 445 5 831 636 392 901 959 679 275 120 381 864 130 3 830 413 919 291 319 873 275 185 130 244 927 65 272 161 228 637 224 753 24 746 190 516 935 998 502 852 106 52 985 660 109 370 184 394 378 668 540 230 315 950 774 736 927 283 429 184 290 649 316 136 987 662 918 171 57 810 639 161 243 588 61 522 396 60 127 568 748 244 diff --git a/cluster_test.go b/cluster_test.go index a1cef33..6b05d4a 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -2,7 +2,7 @@ package main import ( "bytes" - "io/ioutil" + "os" "path" "testing" ) @@ -24,8 +24,8 @@ func TestCluster(t *testing.T) { output_buffer.Flush() out_file.Close() - f1, _ := ioutil.ReadFile(output_tree) - f2, _ := ioutil.ReadFile(expected_output_tree) + f1, _ := os.ReadFile(output_tree) + f2, _ := os.ReadFile(expected_output_tree) if !bytes.Equal(f1, f2) { t.Fatal("Input and output files to not match.") } @@ -37,13 +37,13 @@ type LinkageMethodTest struct { } var LinkageMethodTests = []LinkageMethodTest{ - LinkageMethodTest{0, "average"}, - LinkageMethodTest{1, "centroid"}, - LinkageMethodTest{2, "complete"}, - LinkageMethodTest{3, "mcquitty"}, - LinkageMethodTest{4, "median"}, - LinkageMethodTest{5, "single"}, - LinkageMethodTest{6, "ward"}, + {0, "average"}, + {1, "centroid"}, + {2, "complete"}, + {3, "mcquitty"}, + {4, "median"}, + {5, "single"}, + {6, "ward"}, } func TestGetLinkageMethod(t *testing.T) { diff --git a/data_scheduler.go b/data_scheduler.go index 1c4cdab..eb63230 100644 --- a/data_scheduler.go +++ b/data_scheduler.go @@ -22,12 +22,38 @@ Determine how many bins of the input dataset should be processed when running th The bucket size means the x Profiles will be processed by a thread, which will directly relate to how many go routines are run at a time. */ -func CalculateBucketSize(data_length int, runtime_cpus int, cpu_modifier int) int { - if cpu_modifier <= 0 { - log.Fatal("CPU modifier must be greater than 0") +func CalculateBucketSize(data_length int, minimum_bins int, bucket_increase int) (int, int) { + + if minimum_bins == 0 { + log.Fatal("You must have a CPU modifier value greater than 0") + } + + bucket_size := (data_length / minimum_bins) + if bucket_size == 0 { + bucket_size++ + } + + remainder := data_length % minimum_bins + + if remainder == 0 { + /* + when the remainder is 0 there is now spill over resulting in a + temporary reduction in the number of "bins used". As there is no + spill over bin. 
+    */
+        bucket_size--
+    }
-    bucket_size := data_length / (runtime_cpus * cpu_modifier)
-    return bucket_size
+
+    if data_length < bucket_size {
+        return data_length, 1
+    }
+
+    if bucket_size < minimum_bins {
+        bucket_size *= bucket_increase
+        minimum_bins = data_length / bucket_size
+    }
+
+    return bucket_size, minimum_bins
 }
 
 // A pair containing the start and end values for a given range of data to be processed.
@@ -35,48 +61,42 @@ type Bucket struct {
     start, end int
 }
 
+// Get the difference in indices between the two bucket fields
+func (t *Bucket) Diff() int {
+    return t.end - t.start
+}
+
 // The distance metric for a given comparison
 type ComparedProfile struct {
     compared, reference *string
     distance            float64
 }
 
-/*
-For a given data set determine the the start and end range of each of the bins to be used.
-e.g. if a dataset has 1000 profiles, and our bucket size is 500 we will create bins with
-an of [0, 500], [500, 1000]
-*/
-func BucketsIndices(data_length int, bucket_size int) []Bucket {
-    var bucks []Bucket
-    cpu_load_factor := CPU_LOAD_FACTOR // Need to add description to global options
-    window := bucket_size
-
-    cpu_load_string := fmt.Sprintf("CPU load factor x%d", cpu_load_factor)
-    log.Println(cpu_load_string)
-
-    if window > data_length {
-        bucks = append(bucks, Bucket{0, data_length})
-        log.Println("Running single threaded as there are too few entries to justify multithreading.")
-        return bucks
-    }
+// Calculate the initial bin sizes to use for running profiles in parallel
+func CreateBucketIndices(data_length int, bucket_size int, modifier int) []Bucket {
+    var buckets []Bucket
 
-    if data_length < (runtime.NumCPU() * cpu_load_factor) {
-        bucks = append(bucks, Bucket{0, data_length})
-        log.Println("Running single threaded as there are too few entries to justify multithreading.")
-        return bucks
+    if (data_length-modifier) < bucket_size || bucket_size > data_length {
+        // Just return a single set of indices, as the values are small enough
+        buckets = append(buckets, Bucket{modifier, data_length})
+        return buckets
     }
 
-    for i := window; i < data_length; i = i + window {
-        bucks = append(bucks, Bucket{i - window, i})
+    for i := (bucket_size + modifier); i < data_length; i = i + bucket_size {
+        new_bucket := Bucket{i - bucket_size, i}
+        buckets = append(buckets, new_bucket)
    }
-    bucks = append(bucks, Bucket{bucks[len(bucks)-1].end, data_length})
+    final_start := buckets[len(buckets)-1].end
+    final_end := data_length
 
-    threads_running := fmt.Sprintf("Using %d threads for running.", len(bucks)-1)
-    log.Println(threads_running)
-    profiles_to_thread := fmt.Sprintf("Allocating ~%d profiles per a thread.", window)
-    log.Println(profiles_to_thread)
-    return bucks
+    if final_end-final_start < bucket_size {
+        // Extend the last bucket if the remaining range would be very small
+        buckets[len(buckets)-1].end = data_length
+    } else {
+        buckets = append(buckets, Bucket{final_start, final_end})
+    }
+    return buckets
 }
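[editor's note: not part of the patch. To make the new scheduling arithmetic concrete, here is a minimal sketch of how CalculateBucketSize and CreateBucketIndices compose, using the signatures added above; the concrete sizes are illustrative only and assume the sketch sits in the same package as data_scheduler.go.

package main

import "fmt"

func main() {
    data_length := 100
    minimum_bins := 12 // e.g. runtime.NumCPU() * cpu_modifier
    // 100/12 = 8, which is below the minimum bin count, so the size is
    // scaled by the bucket_increase of 3, giving (24, 100/24) = (24, 4)
    bucket_size, bins := CalculateBucketSize(data_length, minimum_bins, 3)
    fmt.Println(bucket_size, bins) // 24 4

    // Partitions [0, 100) into [0, 24) [24, 48) [48, 72) [72, 100);
    // the trailing remainder of 4 is folded into the final bucket.
    for _, b := range CreateBucketIndices(data_length, bucket_size, 0) {
        fmt.Printf("[%d, %d) ", b.start, b.end)
    }
}

end of note]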
 
 // Compute profile differences in a given go routine.
@@ -103,7 +123,8 @@ them directly to the passed in bufio.Writer.
 func RunData(profile_data *[]*Profile, f *bufio.Writer) {
     /* Schedule and arrange the calculation of the data in parallel
     This function is quite large and likely has room for optimization.
-    TODO redistribute data across threads at run time
+
+    One day an incredible optimization here would be to go lockless, or re-use threads
     */
 
     start := time.Now()
@@ -113,29 +134,30 @@ func RunData(profile_data *[]*Profile, f *bufio.Writer) {
     bucket_index := 0
     empty_name := ""
-    const cpu_modifier = 2
-    bucket_size := CalculateBucketSize(len(data), runtime.NumCPU(), cpu_modifier)
-    buckets := BucketsIndices(len(data), bucket_size)
-    arr_pos := 1
+    const cpu_modifier = 3
+    data_size := len(data)
+    minimum_buckets := runtime.NumCPU() * cpu_modifier
+    bucket_size, _ := CalculateBucketSize(data_size, minimum_buckets, cpu_modifier)
+    buckets := CreateBucketIndices(data_size, bucket_size, 0)
     format_expression := GetFormatString()
-
-    // TODO can create a pool of go routines and pass the profile to compare to each channel
+    initial_bucket_location := buckets[0].start
     var wg sync.WaitGroup
-    for g := range data[0:] {
-        profile_comp := data[g] // copy struct for each thread
+
+    for idx := range data {
+        profile_comp := data[idx] // copy struct for each thread
         values_write := make([]*[]*ComparedProfile, len(buckets)-bucket_index)
-        // TODO an incredible optimization here would be to go lockless, or re-use threads
-        for i := bucket_index; i < len(buckets); i++ {
-            array_writes := make([]*ComparedProfile, buckets[i].end-buckets[i].start)
-            values_write[i-bucket_index] = &array_writes
+        for b_idx, b := range buckets {
+            array_writes := make([]*ComparedProfile, b.Diff())
+            values_write[b_idx] = &array_writes
             wg.Add(1)
             go func(output_array *[]*ComparedProfile, bucket_compute Bucket, profile_compare *Profile) {
                 ThreadExecution(&data, profile_compare, bucket_compute, dist, output_array)
                 wg.Done()
-            }(&array_writes, buckets[i], profile_comp)
+            }(&array_writes, b, profile_comp)
         }
-        wg.Wait()                     // Wait for everyone to catch up
-        buckets[bucket_index].start++ // update the current buckets tracker
+
+        wg.Wait()          // Wait for everyone to catch up
+        buckets[0].start++ // update the current buckets tracker
 
         for _, i := range values_write {
             for _, value := range *i {
             }
         }
 
@@ -143,19 +165,21 @@
-        if len(buckets) > 1 && arr_pos%bucket_size == 0 {
-            for f := buckets[bucket_index].end - bucket_size; f < buckets[bucket_index].end; f++ {
-                data[f].profile = nil
-                data[f].name = empty_name
-
+        if len(buckets) != 1 && buckets[0].Diff() < minimum_buckets {
+            bucket_size, minimum_buckets = CalculateBucketSize(data_size-idx, minimum_buckets, cpu_modifier)
+            buckets = CreateBucketIndices(data_size, bucket_size, idx)
+            for index := initial_bucket_location; index < buckets[0].start; index++ {
+                data[index].profile = nil
+                data[index].name = empty_name
             }
-            bucket_index++
-            end := time.Now().Sub(start)
-            thread_depletion_time := fmt.Sprintf("One thread depleted in: %fs", end.Seconds())
+            initial_bucket_location = buckets[0].start
+            buckets[0].start++ // start index is reserved so needs to be incremented
+            end := time.Since(start)
+            thread_depletion_time := fmt.Sprintf("Redistributing data across threads. %fs", end.Seconds())
             log.Println(thread_depletion_time)
             start = time.Now()
         }
-        arr_pos++
+
     }
     wg.Wait()
     f.Flush()
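[editor's note: not part of the patch. The scheduling loop above is a plain fan-out/fan-in: every bucket gets its own goroutine and its own output slot, and a sync.WaitGroup joins them before the next profile is scheduled, so no locking is needed. A minimal, self-contained sketch of the same pattern, with all names illustrative:

package main

import (
    "fmt"
    "sync"
)

// A stand-in for the Bucket pairs used by the scheduler.
type span struct{ start, end int }

func main() {
    buckets := []span{{0, 4}, {4, 8}, {8, 10}}
    results := make([][]int, len(buckets))

    var wg sync.WaitGroup
    for b_idx, b := range buckets {
        wg.Add(1)
        // Each goroutine owns exactly one result slot, mirroring how
        // RunData hands ThreadExecution a Bucket and an output array.
        go func(slot int, b span) {
            defer wg.Done()
            for i := b.start; i < b.end; i++ {
                results[slot] = append(results[slot], i*i) // stand-in for a distance calculation
            }
        }(b_idx, b)
    }
    wg.Wait() // fan-in: all buckets are finished before the results are read
    fmt.Println(results)
}

end of note]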
%fs", end.Seconds()) log.Println(thread_depletion_time) start = time.Now() } - arr_pos++ + } wg.Wait() f.Flush() diff --git a/data_scheduler_test.go b/data_scheduler_test.go index b72b3cb..d06221f 100644 --- a/data_scheduler_test.go +++ b/data_scheduler_test.go @@ -2,7 +2,7 @@ package main import ( "bytes" - "io/ioutil" + "os" "path" "testing" ) @@ -12,13 +12,13 @@ type bucket_tests struct { } var bucket_size_tests = []bucket_tests{ - bucket_tests{10, 1, 1, 10}, - bucket_tests{10, 2, 1, 5}, + {10, 1, 1, 9}, + {10, 2, 1, 4}, } func TestCalculateBucketSize(t *testing.T) { for _, test := range bucket_size_tests { - if output := CalculateBucketSize(test.data_length, test.bucket_size, test.cpu_modifier); output != test.expected { + if output, _ := CalculateBucketSize(test.data_length, test.bucket_size, test.cpu_modifier); output != test.expected { t.Errorf("Output %d not equal to expected %d", output, test.expected) t.Errorf("Output %+v", output) } @@ -47,10 +47,86 @@ func TestRunData(t *testing.T) { out_file.Close() // Compare outputs line by line - f1, _ := ioutil.ReadFile(test_expected_output) - f2, _ := ioutil.ReadFile(test_output_file) - + f1, _ := os.ReadFile(test_expected_output) + f2, _ := os.ReadFile(test_output_file) if !bytes.Equal(f1, f2) { t.Fatal("Input and output files to not match.") } } + +func TestRunDataSmall(t *testing.T) { + tempdir := t.TempDir() + + t.Log("Starting end to end test for distance calculations.") + test_input := "TestInputs/DistanceMatrix/Random_2_input.txt" + test_output_file := path.Join(tempdir, "output.txt") + + t.Logf("Test Input: %s", test_input) + t.Logf("Test Output Temp File: %s", test_output_file) + t.Log("Creating output buffer.") + out_buffer, out_file := CreateOutputBuffer(test_output_file) + + t.Log("Loading test allele profiles.") + test_data := LoadProfile(test_input) + RunData(test_data, out_buffer) + out_buffer.Flush() + out_file.Close() + + // Compare outputs line by line + f2, _ := os.ReadFile(test_output_file) + output := string(f2) + if output != "1 1 0\n" { + t.Fatal("Input does not equal output.") + } +} + +// Testing the redistribution of bucket indices at runtime +func TestRedistributeBuckets(t *testing.T) { + var profile_size int = 100 + var cpus int = 6 + BUCKET_SCALE = 2 + minimum_bucket_size := cpus * BUCKET_SCALE + var buckets int + buckets, minimum_bucket_size = CalculateBucketSize(profile_size, minimum_bucket_size, BUCKET_SCALE) + bucket_indices := CreateBucketIndices(profile_size, buckets, 0) + + comparisons := make([][]int, profile_size) + for idx := range comparisons { + comparisons[idx] = make([]int, 0) + } + + corrected_profile_size := profile_size + for val := range profile_size { + for _, b := range bucket_indices { + for i := b.start; i < b.end; i++ { + comparisons[val] = append(comparisons[val], i) + } + } + + if len(bucket_indices) != 1 && bucket_indices[0].Diff() < minimum_bucket_size { + buckets, minimum_bucket_size = CalculateBucketSize(profile_size-val, minimum_bucket_size, BUCKET_SCALE) + bucket_indices = CreateBucketIndices(profile_size, buckets, val) + } + bucket_indices[0].start++ + } + + profile_sizes := 0 + // Check correct number of values computed + for idx := profile_sizes; idx != 0; idx-- { + if len(comparisons[idx]) != corrected_profile_size { + t.Fatalf("Mismatched number of outputs for entry for index %d: %d != %d", idx, profile_sizes, len(comparisons[idx])) + } + profile_sizes++ + } + + // Check content + for val := range corrected_profile_size { + idx := val + for _, i := range comparisons[val] { + 
+            if i != idx {
+                t.Fatalf("Index: %d, Incorrect outputs: %d != %d", val, i, idx)
+            }
+            idx++
+        }
+    }
+}
diff --git a/dists_test.go b/dists_test.go
index a4549ff..f1d14cc 100644
--- a/dists_test.go
+++ b/dists_test.go
@@ -15,20 +15,20 @@
 }
 
 var hamming_dist_tests = []addTest{
-    addTest{[]int{1, 2, 3, 4}, []int{1, 2, 3, 4}, float64(0), float64(0)},
-    addTest{[]int{2, 1, 1, 2}, []int{2, 2, 2, 2}, float64(2), float64(2)},
-    addTest{[]int{2, 0, 0, 2}, []int{2, 0, 0, 2}, float64(0), float64(0)},
-    addTest{[]int{2, 0, 0, 2}, []int{2, 0, 1, 2}, float64(0), float64(1)},
-    addTest{[]int{1, 1, 1}, []int{2, 2, 2}, float64(3), float64(3)},
-    addTest{[]int{1}, []int{2}, float64(1), float64(1)},
+    {[]int{1, 2, 3, 4}, []int{1, 2, 3, 4}, float64(0), float64(0)},
+    {[]int{2, 1, 1, 2}, []int{2, 2, 2, 2}, float64(2), float64(2)},
+    {[]int{2, 0, 0, 2}, []int{2, 0, 0, 2}, float64(0), float64(0)},
+    {[]int{2, 0, 0, 2}, []int{2, 0, 1, 2}, float64(0), float64(1)},
+    {[]int{1, 1, 1}, []int{2, 2, 2}, float64(3), float64(3)},
+    {[]int{1}, []int{2}, float64(1), float64(1)},
 }
 
 var scaled_dist_tests = []addTest{
-    addTest{[]int{1, 2, 3, 4}, []int{1, 2, 3, 4}, float64(100.0 * ((4.0 - 4.0) / 4.0)), float64(100.0 * ((4.0 - 4.0) / 4.0))},
-    addTest{[]int{2, 1, 1, 2}, []int{2, 2, 2, 2}, float64(100.0 * ((4.0 - 2.0) / 4.0)), float64(100.0 * ((4.0 - 2.0) / 4.0))},
-    addTest{[]int{2, 0, 0, 2}, []int{2, 0, 0, 2}, float64(100.0 * ((2.0 - 2.0) / 2.0)), float64(100.0 * ((4.0 - 4.0) / 4.0))},
-    addTest{[]int{1, 1, 1}, []int{2, 2, 2}, float64(100.0 * ((3.0 - 0.0) / 3.0)), float64(100.0 * ((3.0 - 0.0) / 3.0))},
-    addTest{[]int{1}, []int{2}, float64(100.0), float64(100.0)},
+    {[]int{1, 2, 3, 4}, []int{1, 2, 3, 4}, float64(100.0 * ((0.0) / 4.0)), float64(100.0 * ((0.0) / 4.0))},
+    {[]int{2, 1, 1, 2}, []int{2, 2, 2, 2}, float64(100.0 * ((4.0 - 2.0) / 4.0)), float64(100.0 * ((4.0 - 2.0) / 4.0))},
+    {[]int{2, 0, 0, 2}, []int{2, 0, 0, 2}, float64(100.0 * ((0.0) / 2.0)), float64(100.0 * ((0.0) / 4.0))},
+    {[]int{1, 1, 1}, []int{2, 2, 2}, float64(100.0 * ((3.0 - 0.0) / 3.0)), float64(100.0 * ((3.0 - 0.0) / 3.0))},
+    {[]int{1}, []int{2}, float64(100.0), float64(100.0)},
 }
 
 func TestHammingDistance(t *testing.T) {
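[editor's note: not part of the patch. The rewritten scaled_dist_tests expectations all reduce to 100 * (comparable loci - matching loci) / comparable loci. A quick check of the second row, ignoring the missing-value (0) handling that the third row exercises:

package main

import "fmt"

func main() {
    a := []int{2, 1, 1, 2}
    b := []int{2, 2, 2, 2}
    matches := 0
    for i := range a {
        if a[i] == b[i] {
            matches++ // positions 0 and 3 agree
        }
    }
    shared := len(a)
    fmt.Println(100.0 * float64(shared-matches) / float64(shared)) // 50, i.e. 100.0*((4.0-2.0)/4.0)
}

end of note]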
diff --git a/fast_match_test.go b/fast_match_test.go
index 4fc4229..0a1f56f 100644
--- a/fast_match_test.go
+++ b/fast_match_test.go
@@ -2,7 +2,7 @@ package main
 
 import (
     "bytes"
-    "io/ioutil"
+    "os"
     "path"
     "testing"
 )
@@ -23,8 +23,8 @@ func TestIdentifyMatches(t *testing.T) {
     output_buffer.Flush()
     out_file.Close()
 
-    f1, _ := ioutil.ReadFile(output_fm)
-    f2, _ := ioutil.ReadFile(expected_output)
+    f1, _ := os.ReadFile(output_fm)
+    f2, _ := os.ReadFile(expected_output)
     if !bytes.Equal(f1, f2) {
         t.Fatal("Input and output files to not match.")
     }
diff --git a/go.mod b/go.mod
index 8fd15f5..b5f9942 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module go-cluster
 
-go 1.18
+go 1.22
 
 require (
     github.com/integrii/flaggy v1.5.2 // indirect
diff --git a/ingest_profile.go b/ingest_profile.go
index db73e39..ecac032 100644
--- a/ingest_profile.go
+++ b/ingest_profile.go
@@ -65,7 +65,7 @@ func InitializeLookup(scanner *bufio.Scanner, new_line_char string, line_delimit
     separated_line := SplitLine(scanner.Text(), new_line_char, line_delimiter)
 
     new_array := make([]*ProfileLookup, len(*separated_line))
-    for idx, _ := range new_array {
+    for idx := range new_array {
         new_array[idx] = NewProfileLookup()
     }
     return &new_array, separated_line
@@ -133,9 +133,7 @@ func LoadProfiles(reference_profiles string, query_profiles string) (*[]*Profile
     query_data := CreateProfiles(query_scanner, normalization_lookup, new_line_char, line_delimiter, missing_value)
 
     // Append query profiles to reference profiles
-    for _, query_profile := range *query_data {
-        *ref_data = append(*ref_data, query_profile) // Copying to reference should work here as we only ever read and do not write
-    }
+    *ref_data = append(*ref_data, *query_data...)
 
     normalization_lookup = nil
     log.Println("Finished ingesting and normalizing profiles.")
diff --git a/mconversion.go b/mconversion.go
index 85ba88e..c583e06 100644
--- a/mconversion.go
+++ b/mconversion.go
@@ -23,7 +23,7 @@ For creating file in memory:
 
     TODO Create buffer for to contain sorted writes
     TODO write to array in parallel, then sort to create sequential writes to a file
-    - This is currently no in parallel, but a buffer can be written to sorting writes output writes
+    - This is currently not in parallel, but a buffer can be written to in order to sort the output writes
 
     apparently file systems to not like writing to files in parallel
@@ -47,12 +47,11 @@ package main
 
 import (
     "bufio"
-    "fmt"
+    "container/heap"
     "io"
     "log"
     "os"
     "sort"
-    "strconv"
     "strings"
 )
@@ -63,6 +62,14 @@ const (
     separator = '\t'
 )
 
+// This value is set up so that values can be stored before writing out to disk.
+// The index field is used exclusively by the min-heap structure, as it is needed in some of its operations.
+type WriteValue struct {
+    key   int64
+    value []byte
+    index int // needed to update the heap interface
+}
+
 func open_file(file_path string, open_type int) *os.File {
     file, err := os.OpenFile(file_path, int(open_type), 0o666)
     if err != nil {
@@ -80,19 +87,11 @@ func open_file(file_path string, open_type int) *os.File {
     return file
 }
 
-func parse_int(value string) int {
-    val, err := strconv.ParseInt(value, 10, 64)
-    if err != nil {
-        log.Fatal(err)
-    }
-    return int(val)
-}
-
 func get_keys(value *map[string]bool) (*[]string, int) {
     map_vals := make([]string, len(*value))
     vals := 0
     longest_key := 0
-    for k, _ := range *value {
+    for k := range *value {
         if len(k) > longest_key {
             longest_key = len(k)
         }
@@ -161,10 +160,22 @@ func make_mask(modulus int) []byte {
     return mask
 }
 
+func WriteQueueToFile(queue *WriteQueue, output_file *os.File) {
+    output_file.Seek(0, io.SeekStart)
+    for queue.Len() > 0 {
+        output_value := heap.Pop(queue).(*WriteValue)
+        name_out, err := output_file.WriteAt(output_value.value, output_value.key)
+        _ = name_out
+        if err != nil {
+            log.Fatal(err)
+        }
+    }
+}
+
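[editor's note: not part of the patch. The idea behind WriteQueueToFile: writes accumulate in the WriteQueue min-heap keyed by byte offset, and the drain pops them in ascending key order so the file sees ordered WriteAt calls rather than scattered seeks. A compressed sketch of that flow, assuming it is compiled alongside queue.go and mconversion.go:

package main

import (
    "container/heap"
    "fmt"
    "os"
)

func main() {
    tmp, _ := os.CreateTemp("", "heap_writes_*.txt")
    defer os.Remove(tmp.Name())

    queue := make(WriteQueue, 0, 4)
    heap.Init(&queue)

    // Pushed out of order; keys are byte offsets into the output file.
    heap.Push(&queue, &WriteValue{key: 6, value: []byte("ghi")})
    heap.Push(&queue, &WriteValue{key: 0, value: []byte("abc")})
    heap.Push(&queue, &WriteValue{key: 3, value: []byte("def")})

    WriteQueueToFile(&queue, tmp) // drains in offset order: 0, 3, 6

    contents, _ := os.ReadFile(tmp.Name())
    fmt.Println(string(contents)) // abcdefghi
}

end of note]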
 func write_matrix(input_path string, output_path string, positions *map[string]int, longest_val int) {
     /*
-        TODO optimize for sequential writes
+        TODO optimize for sequential writes, a priority queue is now implemented to finish off the implementation
         1. Fill array containing data pairs of output position, and text out
         2. Sort array on position out
         3. Subtract difference in location from each sequential write.
@@ -189,8 +200,20 @@ func write_matrix(input_path string, output_path string, positions *map[string]i
     mask := make_mask(longest_val)
     pad_len := int64(len(mask))
 
+    var buffered_writes int = 1000
+    write_heap := make(WriteQueue, 0, buffered_writes) // Set capacity to write buffer size
+    heap.Init(&write_heap)
+
+    /*
+        For optimizing the outputs, an AVL tree can be used to balance them as the
+        positions used are calculated. Then the buffer can be purged afterwards.
+    */
     rows := modulus_64
     for {
+        if write_heap.Len() == buffered_writes {
+            WriteQueueToFile(&write_heap, output)
+        }
+
         rl, err := reader.ReadString('\n')
         if err != nil {
             break
         }
@@ -209,60 +232,23 @@ func write_matrix(input_path string, output_path string, positions *map[string]i
         sp1 := calculate_buffer_position(p1, p2, modulus)
         sp2 := calculate_buffer_position(p2, p1, modulus)
 
-        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-        // Trying to output the profile name in the right spot
-        // Seems to be putting data in the correct spots for each row
-        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-        output.Seek(0, io.SeekStart)
+        // TODO making more writes than necessary
         profile_1_name := pad_value(data[profile_1_pos], mask)
-        name_out, err := output.WriteAt(profile_1_name, int64(p1)*modulus_64*pad_len)
-        _ = name_out
-        if err != nil {
-            log.Fatal(err)
-        }
+        heap.Push(&write_heap, &WriteValue{key: int64(p1) * modulus_64 * pad_len, value: profile_1_name, index: 0})
+        heap.Push(&write_heap, &WriteValue{key: int64(p1) * pad_len, value: profile_1_name, index: 0}) // Write the columns position
 
-        // === column 1 position
-        output.Seek(0, io.SeekStart)
-        name_out, err = output.WriteAt(profile_1_name, int64(p1)*pad_len) // column position
-        _ = name_out
-        if err != nil {
-            log.Fatal(err)
-        }
-
-        output.Seek(0, io.SeekStart)
         profile_2_name := pad_value(data[profile_2_pos], mask)
-        name_out, err = output.WriteAt(profile_2_name, int64(p2)*modulus_64*pad_len)
-        _ = name_out
-        if err != nil {
-            log.Fatal(err)
-        }
+        heap.Push(&write_heap, &WriteValue{key: int64(p2) * modulus_64 * pad_len, value: profile_2_name, index: 0})
+        heap.Push(&write_heap, &WriteValue{key: int64(p2) * pad_len, value: profile_2_name, index: 0}) // Column Position to write
 
-        // column 2 position
-        output.Seek(0, io.SeekStart)
-        name_out, err = output.WriteAt(profile_2_name, int64(p2)*pad_len)
-        _ = name_out
-        if err != nil {
-            log.Fatal(err)
-        }
-
-        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-        // Write at offsets
-        output.Seek(0, io.SeekStart) // * name pad_len should only be applied to one value, this will differ for the top row
-        b1, err := output.WriteAt(string_val, (sp1 * pad_len)) // increasing by one pad for label name
-        _ = b1
-        if err != nil {
-            log.Fatal(err)
-        }
+        heap.Push(&write_heap, &WriteValue{key: sp1 * pad_len, value: string_val, index: 0})
+        heap.Push(&write_heap, &WriteValue{key: sp2 * pad_len, value: string_val, index: 0})
 
-        output.Seek(0, io.SeekStart)
-        b2, err := output.WriteAt(string_val, (sp2 * pad_len))
-        _ = b2
+    }
 
-        if err != nil {
-            log.Fatal(err)
-        }
+    if write_heap.Len() > 0 {
+        WriteQueueToFile(&write_heap, output)
     }
 
     // Add byte mask to start of file, to prevent binary inclusion
@@ -294,12 +280,6 @@ func write_matrix(input_path string, output_path string, positions *map[string]i
     defer output.Close()
 }
 
-func calculate_buffer_size(key_len int) int {
-    size := key_len * key_len
-    // TODO need to incorporate the profile name in this output
-    return size
-}
-
 func calculate_buffer_position(p1 int, p2 int, modulus int) int64 {
     /*
         rows and columns provided here, the value can go to two positions,
         e.g. to get rows (p1) * modulus + p2 (columns) and flip the location for the other value
     */
-    //fmt.Fprintf(os.Stdout, "%d %d\n", p1, p2)
-    // TODO need to incorporate the profile name in this output
     return int64((p1 * modulus) + p2)
 }
 
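[editor's note: not part of the patch. A worked instance of the index math in calculate_buffer_position: with modulus = 4 the output is a 4x4 grid of fixed-width cells, and a distance between profiles p1 = 2 and p2 = 1 lands in the two mirrored cells 2*4+1 = 9 and 1*4+2 = 6, which become byte offsets once multiplied by pad_len:

package main

import "fmt"

// Illustrative only: mirrors the formula in calculate_buffer_position.
func cell(p1, p2, modulus int) int64 { return int64(p1*modulus + p2) }

func main() {
    fmt.Println(cell(2, 1, 4), cell(1, 2, 4)) // 9 6, the mirrored cells of a 4x4 grid
}

end of note]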
-func print_buffer(buffer *[]int, modulus int, buff_size int) {
-    // ! This will go once memory mapping is implemented
-    fmt.Fprintf(os.Stdout, "\n")
-    for i := 1; i < buff_size; i++ {
-        fmt.Fprintf(os.Stdout, "%d\t", (*buffer)[i-1])
-        if i%modulus == 0 {
-            fmt.Fprintf(os.Stdout, "\n")
-        }
-    }
-    fmt.Fprintf(os.Stdout, "\n")
-}
-
 /*
 Function used to create a pairwise distance matrix from a previously generated molten output.
 */
diff --git a/mconversion_test.go b/mconversion_test.go
index 3b2babc..4935b64 100644
--- a/mconversion_test.go
+++ b/mconversion_test.go
@@ -3,7 +3,7 @@ package main
 
 import (
     "bytes"
-    "io/ioutil"
+    "os"
     "path"
     "testing"
 )
@@ -21,8 +21,8 @@ func TestPairwiseToMatrix(t *testing.T) {
 
     PairwiseToMatrix(test_input_file, test_output_file)
 
-    f1, _ := ioutil.ReadFile(test_expected_file)
-    f2, _ := ioutil.ReadFile(test_output_file)
+    f1, _ := os.ReadFile(test_expected_file)
+    f2, _ := os.ReadFile(test_output_file)
     if !bytes.Equal(f1, f2) {
         t.Fatal("Input and expected distance matrix files do not match.")
     }
diff --git a/normalize_profiles_test.go b/normalize_profiles_test.go
index 3fee47d..1e2accb 100644
--- a/normalize_profiles_test.go
+++ b/normalize_profiles_test.go
@@ -12,9 +12,9 @@ type value_insertion struct {
 }
 
 var value_insertion_tests = []value_insertion{
-    value_insertion{"test", 1},
-    value_insertion{"test", 1},
-    value_insertion{"test2", 2},
+    {"test", 1},
+    {"test", 1},
+    {"test2", 2},
 }
 
 func TestInsertValue_NewProfile(t *testing.T) {
diff --git a/pdists.go b/pdists.go
index 0ae4d84..5aa916e 100644
--- a/pdists.go
+++ b/pdists.go
@@ -8,12 +8,13 @@ package main
 
 import (
     "fmt"
-    "github.com/integrii/flaggy"
     "log"
     "os"
+
+    "github.com/integrii/flaggy"
 )
 
-var CPU_LOAD_FACTOR int = 100
+var BUCKET_SCALE int = 3
 var FM_THREAD_LIMIT int64 = 100
 var COLUMN_DELIMITER = "\t"
 var NEWLINE_CHARACTER = "\n"
@@ -29,7 +30,7 @@ var convert_matrix *flaggy.Subcommand
 var fast_match *flaggy.Subcommand
 var tree *flaggy.Subcommand
 
-const version string = "0.0.1"
+const version string = "0.0.2"
 const integer_required_distance_functions_threshold = 2
 
 func cli() {
@@ -52,10 +53,9 @@ func cli() {
         scaled_missing.help, scaled_missing.assignment)
 
     buffer_help := fmt.Sprintf("The default buffer size is: %d. Larger buffers may increase performance.", BUFFER_SIZE)
+    load_factor_help := fmt.Sprintf("This value is used to compute how many profile calculations are assigned to a thread; a larger value will result in fewer threads being used. Default: %d", BUCKET_SCALE)
     distance_matrix.String(&INPUT_PROFILE, "i", "input", "File path to your alleles profiles.")
-    distance_matrix.Int(&CPU_LOAD_FACTOR, "l", "load-factor",
-        `Used to set the minimum number of values needed to use
-multi-threading. e.g. if (number of cpus * load factor) > number of table rows. Only a single thread will be used. `)
+    distance_matrix.Int(&BUCKET_SCALE, "l", "load-factor", load_factor_help)
     distance_matrix.Int(&DIST_FUNC, "d", "distance", distance_func_help)
     distance_matrix.String(&OUTPUT_FILE, "o", "output", "Name of output file. If nothing is specified results will be sent to stdout.")
     distance_matrix.Int(&BUFFER_SIZE, "b", "buffer-size", buffer_help)
@@ -83,7 +83,7 @@ multi-threading. e.g. if (number of cpus * load factor) > number of table rows.
     tree = flaggy.NewSubcommand("tree")
     tree.Description = "Create a dendrogram from a supplied distance matrix."
     tree.String(&INPUT_PROFILE, "i", "input", "File path to previously generate distance matrix.")
-    tree.String(&OUTPUT_FILE, "o", "output", "Name of output file. If nothing is specified results will be sent to stdout.")
+    tree.String(&OUTPUT_FILE, "o", "output", "Name of output file.")
     tree.Int(&LINKAGE_METHOD, "l", "linkage-method", linkage_methods_help)
 
     flaggy.AttachSubcommand(distance_matrix, 1)
diff --git a/queue.go b/queue.go
new file mode 100644
index 0000000..c5c1f7e
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,34 @@
+/*
+    Store outputs to a heap so that the min element can be popped from the heap each time
+    to order output writes.
+*/
+
+package main
+
+// Creating a queue using a min-heap in order to make writes out to the file system sequential, instead of relying on random access
+type WriteQueue []*WriteValue
+
+func (h WriteQueue) Len() int           { return len(h) }
+func (h WriteQueue) Less(i, j int) bool { return h[i].key < h[j].key }
+func (h WriteQueue) Swap(i, j int) {
+    h[i], h[j] = h[j], h[i]
+    h[i].index = i
+    h[j].index = j
+}
+
+func (h *WriteQueue) Push(x any) {
+    n := len(*h)
+    item := x.(*WriteValue)
+    item.index = n
+    *h = append(*h, item)
+}
+
+func (h *WriteQueue) Pop() any {
+    old := *h
+    n := len(old)
+    item := old[n-1]
+    old[n-1] = nil // Prevents a memory leak
+    item.index = -1
+    *h = old[0 : n-1]
+    return item
+}
diff --git a/queue_test.go b/queue_test.go
new file mode 100644
index 0000000..fc8960a
--- /dev/null
+++ b/queue_test.go
@@ -0,0 +1,62 @@
+// Test for heap writes
+
+package main
+
+import (
+    "container/heap"
+    "reflect"
+    "testing"
+)
+
+type WriteValueT struct {
+    value    []WriteValue
+    expected []int64
+}
+
+var s func(s string) []byte = func(s string) []byte { return []byte(s) } // Helper to make a []byte from a string
+
+var QueueWriteTests = []WriteValueT{
+    {
+        []WriteValue{
+            {0, s("1"), 1},
+            {2, s("2"), 2},
+            {3, s("3"), 3},
+            {4, s("4"), 4},
+        },
+        []int64{0, 2, 3, 4},
+    },
+    {
+        []WriteValue{
+            {0, s("1"), 1},
+            {2, s("2"), 2},
+            {2, s("3"), 2},
+            {3, s("3"), 3},
+            {4, s("4"), 4},
+        },
+        []int64{0, 2, 2, 3, 4}, // Need to add behaviour for the offset in the writes
+    },
+}
+
+func TestWriteQueue(t *testing.T) {
+
+    for _, test := range QueueWriteTests {
+
+        wheap := make(WriteQueue, len(test.value))
+        for idx := range test.value {
+            wheap[idx] = &test.value[idx]
+        }
+
+        heap.Init(&wheap)
+        outputs := make([]int64, 0, len(test.value))
+        for wheap.Len() > 0 {
+            item := heap.Pop(&wheap).(*WriteValue)
+            outputs = append(outputs, item.key)
+        }
+
+        if !reflect.DeepEqual(outputs, test.expected) {
+            t.Errorf("Output: does not match expected %v %v", outputs, test.expected)
+        }
+    }
+}
+
+// TODO need to benchmark heap vs append to array and sort
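[editor's note: not part of the patch. On the closing TODO, a starting point for that comparison could be a pair of benchmarks like the sketch below, placed in a file such as queue_test.go; the sizes, key distribution, and benchmark names are all illustrative:

package main

import (
    "container/heap"
    "math/rand"
    "sort"
    "testing"
)

// Push through the WriteQueue min-heap, then drain it in key order.
func BenchmarkWriteQueueHeap(b *testing.B) {
    for n := 0; n < b.N; n++ {
        q := make(WriteQueue, 0, 1000)
        heap.Init(&q)
        for i := 0; i < 1000; i++ {
            heap.Push(&q, &WriteValue{key: rand.Int63(), value: []byte("x")})
        }
        for q.Len() > 0 {
            heap.Pop(&q)
        }
    }
}

// Append to a plain slice and sort once before the flush.
func BenchmarkAppendAndSort(b *testing.B) {
    for n := 0; n < b.N; n++ {
        vals := make([]*WriteValue, 0, 1000)
        for i := 0; i < 1000; i++ {
            vals = append(vals, &WriteValue{key: rand.Int63(), value: []byte("x")})
        }
        sort.Slice(vals, func(i, j int) bool { return vals[i].key < vals[j].key })
    }
}

end of note]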