Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added runtime thread re-distribution #4

Merged
merged 11 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.22'

- name: Test and Coverage
run: go test -v -race -cover ./...
Expand Down
2 changes: 2 additions & 0 deletions TestInputs/DistanceMatrix/Random_2_input.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
0 399 294 169 166 763 263 522 179 205 234 404 484 855 948 754 117 562 88 48 264 149 316 118 217 399 606 522 498 187 554 5 571 433 182 563 360 443 581 379 209 229 445 848 173 531 352 284 910 60 742 116 841 946 987 663 991 788 211 923 133 27 24 643 991 690 975 730 608 912 883 286 84 475 758 522 665 887 772 54 227 994 260 417 511 444 578 975 527 650 809 967 287 985 562 457 734 425 964 768 877 932 311 749 355 363 869 408 875 638 520 281 876 251 733 751 432 553 880 2 557 178 583 398 422 261 307 819 831 532 386 714 783 587 620 576 509 863 885 784 796 670 240 599 827 87 68 904 883 243 729 458 322 6 371 398 924 778 196 845 307 288 186 671 877 555 992 899 532 98 399 355 979 499 584 791 597 181 851 648 686 671 281 467 767 361 909 805 416 949 203 618 200 105 406 854 304 274 629 530 476 192 898 315 799 282 589 68 334 909 988 611 719 947 596 800 91 785 491 634 397 185 376 792 813 338 36 915 264 378 451 182 760 675 700 989 582 680 337 652 550 961 90 925 476 701 811 906 178 983 167 450 734 65 381 733 987 777 486 712 279 41 912 489 819 494 433 190 25 27 848 240 638 993 278 930 205 564 741 160 231 862 626 751 115 469 788 735 622 687 463 24 434 148 840 562 511 176 273 321 347 925 796 623 610 428 235 263 592 554 497 238 502 272 87 876 258 742 302 773 858 886 219 342 744 87 946 66 698 988 106 806 258 910 471 907 827 363 979 731 462 892 360 635 152 41 815 693 720 970 903 993 237 289 618 883 173 394 706 418 705 153 917 821 355 141 890 51 357 95 711 929 169 533 518 600 208 45 486 964 463 805 96 511 474 181 426 263 463 24 622 103 687 19 604 818 0 122 117 64 84 996 148 368 896 253 540 442 904 792 277 242 325 115 279 750 247 127 973 722 756 457 868 900 842 76 134 491 975 285 148 455 65 989 5 343 146 112 984 821 46 481 810 227 885 334 300 682 141 7 377 174 644 185 905 903 156 326 136 520 553 591 642 746 872 214 927 824 394 938 137 363 678 759 330 413 974 388 379 425 766 382 311 416 73 603 591 987 599 119 999 14 549 597 886 824 322 269 982 985 222 575 264 728 827 931 344 914 494 809 299 53 640 441 336 595 578 731 202 676 355 475 388 268 54 584 674 445 944 823 636 495 522 925 405 370 304 616 699 298 298 686 959 554 456 713 975 948 924 885 348 599 633 821 413 619 288 544 356 340 543 361 159 913 330 828 110 565 700 967 689 531 878 266 942 820 918 883 527 918 927 411 200 35 388 288 761 510 97 884 484 789 391 85 16 448 318 826 88 834 391 484 833 995 740 503 783 150 932 938 735 15 647 902 685 152 604 145 808 529 101 766 449 113 750 600 770 746 559 477 91 774 919 523 316 231 583 714 957 875 662 636 593 412 463 185 444 281 127 575 294 773 723 159 797 722 47 394 566 70 735 502 836 138 210 750 913 29 18 28 743 273 656 126 288 370 487 811 137 261 82 270 154 394 607 426 751 605 431 825 585 402 435 318 851 704 708 520 967 979 309 414 284 392 272 242 128 562 12 469 398 652 132 457 689 922 417 259 271 119 560 453 14 869 942 845 181 646 199 496 990 418 600 24 652 621 889 627 321 201 242 314 23 496 9 885 486 587 542 28 724 927 159 1 181 53 977 926 805 209 453 373 746 75 279 813 391 679 445 12 310 728 96 577 414 729 591 914 630 845 668 424 919 352 52 356 108 933 6 95 540 549 501 716 420 773 292 984 550 947 493 308 759 143 871 785 139 676 533 917 413 789 364 485 558 228 309 285 483 104 411 294 155 331 184 730 473 337 476 304 58 633 783 220 190 206 848 796 165 575 996 539 863 626 596 354 322 444 614 978 300 846 642 444 407 892 970 686 926 88 305 201 82 39 282 907 687 15 605 47 616 979 143 55 117 532 907 252 210 715 539 89 624 223 757 314 291 346 734 639 984 705 909 130 235 658 596 3 385 54 325 898 641 571 5 767 289 842 964 500 705 634 606 952 932 376 622 370 526 476 700 419 9 342 73 20 835 266 965 339 420 653 448 427 979 999 496 24 445 922 902 660 491 4 365 5 537 163 567 976 2 850 175 686 853 566 211 127 20 529 401 753 700 483 470 713 346 428 91 2 223 224 613 388 246 60 304 641 278 372 631 750 114 172 54 223 789 674 612 506 180 795 735 929 971 681 20 717 451 389 763
1 409 645 664 609 522 814 849 716 290 465 520 856 328 560 340 604 438 611 953 731 957 856 620 708 410 703 108 247 936 712 779 272 26 710 738 135 390 221 377 653 701 963 739 323 666 309 662 723 833 999 759 366 41 522 989 633 302 199 524 727 836 119 184 567 389 475 233 866 720 839 674 336 902 684 590 647 990 487 675 725 942 464 490 318 741 968 709 940 83 698 728 820 611 828 684 540 127 125 237 181 521 898 259 908 577 874 197 858 947 858 917 549 221 566 111 689 301 816 763 950 298 283 58 178 255 425 802 562 919 921 62 683 24 910 552 879 568 852 917 918 259 296 947 775 384 73 930 583 488 583 358 34 481 418 879 987 674 359 496 83 101 471 905 285 146 832 803 643 732 571 280 861 191 478 432 824 317 27 269 8 225 611 402 965 632 529 65 120 146 261 60 31 137 611 365 227 511 117 726 484 14 212 490 644 986 627 717 969 146 325 712 643 943 321 952 416 590 59 376 940 310 268 394 929 535 955 125 621 187 108 567 910 120 807 49 781 176 989 318 213 425 130 996 86 690 594 615 623 473 511 610 314 217 841 375 409 635 449 100 838 131 264 230 241 617 794 928 465 866 547 410 712 850 169 358 358 888 396 677 975 61 210 758 63 397 541 513 338 439 945 67 415 437 253 446 303 838 583 996 92 391 72 269 525 376 934 43 681 57 195 760 964 105 234 63 622 207 488 396 870 977 826 102 758 567 310 680 273 327 231 121 528 434 703 636 952 643 972 80 962 124 303 686 690 565 105 916 869 640 562 627 707 422 42 881 736 746 337 663 278 772 528 518 792 236 376 558 845 304 106 863 382 149 12 646 849 728 511 104 213 437 425 832 194 550 468 107 763 369 403 659 379 884 181 856 671 62 849 623 68 509 151 724 982 805 560 721 956 280 978 163 438 162 22 576 404 905 459 189 917 160 559 25 848 432 466 869 474 596 137 41 429 721 564 274 443 974 314 485 272 273 487 963 883 864 228 743 512 75 150 476 720 126 930 122 293 523 187 289 339 12 594 948 346 788 723 881 711 532 475 715 760 882 270 755 861 597 924 869 46 138 839 634 600 592 478 117 859 387 396 829 137 343 862 594 382 74 306 84 286 562 176 193 688 497 715 353 309 644 770 329 285 204 777 121 762 393 983 853 264 5 668 434 515 459 938 223 705 741 151 669 269 136 961 132 697 4 500 463 800 705 997 507 936 334 465 501 523 871 749 515 203 19 888 487 357 598 533 336 640 969 737 535 430 401 821 864 290 98 643 849 987 983 986 53 111 951 917 642 599 230 715 910 599 988 470 263 461 869 271 603 604 332 104 193 283 658 918 687 101 540 745 830 520 831 603 805 201 746 659 287 413 76 914 12 221 199 677 144 271 836 494 566 921 530 301 872 363 87 882 545 663 18 725 940 578 253 77 534 805 864 939 510 697 525 917 911 852 931 26 357 246 347 590 664 544 440 224 687 638 889 707 612 28 663 39 491 527 815 708 598 124 991 119 203 550 307 893 919 66 820 877 423 745 440 336 368 823 146 127 18 289 780 656 144 748 81 14 167 319 359 829 382 233 166 383 742 194 821 852 478 854 579 475 756 497 246 51 45 170 682 458 139 624 150 621 783 196 54 280 556 376 438 76 968 803 724 766 185 742 134 401 401 836 255 80 7 96 506 751 417 312 251 53 724 670 0 875 398 657 578 827 739 997 927 748 629 398 669 157 317 846 577 799 697 858 970 927 536 319 233 966 336 621 847 693 466 847 4 441 794 645 617 249 276 257 876 701 191 735 769 100 177 698 590 372 485 322 258 317 898 339 542 295 44 636 962 506 589 182 622 762 127 686 982 708 281 846 983 950 696 656 643 630 456 890 353 146 762 773 306 965 196 248 890 572 872 623 528 785 66 806 965 764 504 667 475 95 685 291 305 530 209 106 798 509 681 150 379 901 666 211 794 965 286 920 656 364 903 934 844 26 694 891 703 390 2 385 312 366 61 616 965 853 393 980 316 881 156 142 639 976 399 8 793 424 10 293 139 814 827 260 485 943 686 452 27 556 445 5 831 636 392 901 959 679 275 120 381 864 130 3 830 413 919 291 319 873 275 185 130 244 927 65 272 161 228 637 224 753 24 746 190 516 935 998 502 852 106 52 985 660 109 370 184 394 378 668 540 230 315 950 774 736 927 283 429 184 290 649 316 136 987 662 918 171 57 810 639 161 243 588 61 522 396 60 127 568 748 244
20 changes: 10 additions & 10 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"bytes"
"io/ioutil"
"os"
"path"
"testing"
)
Expand All @@ -24,8 +24,8 @@ func TestCluster(t *testing.T) {
output_buffer.Flush()
out_file.Close()

f1, _ := ioutil.ReadFile(output_tree)
f2, _ := ioutil.ReadFile(expected_output_tree)
f1, _ := os.ReadFile(output_tree)
f2, _ := os.ReadFile(expected_output_tree)
if !bytes.Equal(f1, f2) {
t.Fatal("Input and output files to not match.")
}
Expand All @@ -37,13 +37,13 @@ type LinkageMethodTest struct {
}

var LinkageMethodTests = []LinkageMethodTest{
LinkageMethodTest{0, "average"},
LinkageMethodTest{1, "centroid"},
LinkageMethodTest{2, "complete"},
LinkageMethodTest{3, "mcquitty"},
LinkageMethodTest{4, "median"},
LinkageMethodTest{5, "single"},
LinkageMethodTest{6, "ward"},
{0, "average"},
{1, "centroid"},
{2, "complete"},
{3, "mcquitty"},
{4, "median"},
{5, "single"},
{6, "ward"},
}

func TestGetLinkageMethod(t *testing.T) {
Expand Down
144 changes: 84 additions & 60 deletions data_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,61 +22,81 @@ Determine how many bins of the input dataset should be processed when running th
The bucket size means the x Profiles will be processed by a thread, which will directly
relate to how many go routines are run at a time.
*/
func CalculateBucketSize(data_length int, runtime_cpus int, cpu_modifier int) int {
if cpu_modifier <= 0 {
log.Fatal("CPU modifier must be greater than 0")
func CalculateBucketSize(data_length int, minimum_bins int, bucket_increase int) (int, int) {

if minimum_bins == 0 {
log.Fatal("You must have a CPU modifier value greater than 0")
}

bucket_size := (data_length / minimum_bins)
if bucket_size == 0 {
bucket_size++
}

remainder := data_length % minimum_bins

if remainder == 0 {
/*
when the remainder is 0 there is now spill over resulting in a
temporary reduction in the number of "bins used". As there is no
spill over bin.
*/
bucket_size--
}
bucket_size := data_length / (runtime_cpus * cpu_modifier)
return bucket_size

if data_length < bucket_size {
return data_length, 1
}

if bucket_size < minimum_bins {
bucket_size *= bucket_increase
minimum_bins = data_length / bucket_size
}

return bucket_size, minimum_bins
}

// A pair containing the start and end values for a given range of data to be processed.
type Bucket struct {
start, end int
}

// Get the difference in indices between the two bucket fields
func (t *Bucket) Diff() int {
return t.end - t.start
}

// The distance metric for a given comparison
type ComparedProfile struct {
compared, reference *string
distance float64
}

/*
For a given data set determine the the start and end range of each of the bins to be used.
e.g. if a dataset has 1000 profiles, and our bucket size is 500 we will create bins with
an of [0, 500], [500, 1000]
*/
func BucketsIndices(data_length int, bucket_size int) []Bucket {
var bucks []Bucket
cpu_load_factor := CPU_LOAD_FACTOR // Need to add description to global options
window := bucket_size

cpu_load_string := fmt.Sprintf("CPU load factor x%d", cpu_load_factor)
log.Println(cpu_load_string)

if window > data_length {
bucks = append(bucks, Bucket{0, data_length})
log.Println("Running single threaded as there are too few entries to justify multithreading.")
return bucks
}
// Calculate the initial bin sizes to use for running profiles in parallel
func CreateBucketIndices(data_length int, bucket_size int, modifier int) []Bucket {
var buckets []Bucket

if data_length < (runtime.NumCPU() * cpu_load_factor) {
bucks = append(bucks, Bucket{0, data_length})
log.Println("Running single threaded as there are too few entries to justify multithreading.")
return bucks
if (data_length-modifier) < bucket_size || bucket_size > data_length {
// Just return the one set of indices the values are small enough
buckets = append(buckets, Bucket{modifier, data_length})
return buckets
}

for i := window; i < data_length; i = i + window {
bucks = append(bucks, Bucket{i - window, i})
for i := (bucket_size + modifier); i < data_length; i = i + bucket_size {
new_bucket := Bucket{i - bucket_size, i}
buckets = append(buckets, new_bucket)
}

bucks = append(bucks, Bucket{bucks[len(bucks)-1].end, data_length})
final_start := buckets[len(buckets)-1].end
final_end := data_length

threads_running := fmt.Sprintf("Using %d threads for running.", len(bucks)-1)
log.Println(threads_running)
profiles_to_thread := fmt.Sprintf("Allocating ~%d profiles per a thread.", window)
log.Println(profiles_to_thread)
return bucks
if final_end-final_start < bucket_size {
// Extend the last index if required if it is very small
buckets[len(buckets)-1].end = data_length
} else {
buckets = append(buckets, Bucket{final_start, final_end})
}
return buckets
}

// Compute profile differences in a given go routine.
Expand All @@ -103,7 +123,8 @@ them directly to the passed in bufio.Writer.
func RunData(profile_data *[]*Profile, f *bufio.Writer) {
/* Schedule and arrange the calculation of the data in parallel
This function is quite large and likely has room for optimization.
TODO redistribute data across threads at run time

Once day an incredible optimization here would be to go lockless, or re-use threads
*/

start := time.Now()
Expand All @@ -113,49 +134,52 @@ func RunData(profile_data *[]*Profile, f *bufio.Writer) {

bucket_index := 0
empty_name := ""
const cpu_modifier = 2
bucket_size := CalculateBucketSize(len(data), runtime.NumCPU(), cpu_modifier)
buckets := BucketsIndices(len(data), bucket_size)
arr_pos := 1
const cpu_modifier = 3
data_size := len(data)
minimum_buckets := runtime.NumCPU() * cpu_modifier
bucket_size, _ := CalculateBucketSize(data_size, minimum_buckets, cpu_modifier)
buckets := CreateBucketIndices(data_size, bucket_size, 0)
format_expression := GetFormatString()

// TODO can create a pool of go routines and pass the profile to compare to each channel
initial_bucket_location := buckets[0].start
var wg sync.WaitGroup
for g := range data[0:] {
profile_comp := data[g] // copy struct for each thread

for idx := range data {
profile_comp := data[idx] // copy struct for each thread
values_write := make([]*[]*ComparedProfile, len(buckets)-bucket_index)
// TODO an incredible optimization here would be to go lockless, or re-use threads
for i := bucket_index; i < len(buckets); i++ {
array_writes := make([]*ComparedProfile, buckets[i].end-buckets[i].start)
values_write[i-bucket_index] = &array_writes
for b_idx, b := range buckets {
array_writes := make([]*ComparedProfile, b.Diff())
values_write[b_idx] = &array_writes
wg.Add(1)
go func(output_array *[]*ComparedProfile, bucket_compute Bucket, profile_compare *Profile) {
ThreadExecution(&data, profile_compare, bucket_compute, dist, output_array)
wg.Done()
}(&array_writes, buckets[i], profile_comp)
}(&array_writes, b, profile_comp)
}
wg.Wait() // Wait for everyone to catch up
buckets[bucket_index].start++ // update the current buckets tracker

wg.Wait() // Wait for everyone to catch up
buckets[0].start++ // update the current buckets tracker

for _, i := range values_write {
for _, value := range *i {
fmt.Fprintf(f, format_expression, *(*value).compared, *(*value).reference, (*value).distance)
}
}

if len(buckets) > 1 && arr_pos%bucket_size == 0 {
for f := buckets[bucket_index].end - bucket_size; f < buckets[bucket_index].end; f++ {
data[f].profile = nil
data[f].name = empty_name

if len(buckets) != 1 && buckets[0].Diff() < minimum_buckets {
bucket_size, minimum_buckets = CalculateBucketSize(data_size-idx, minimum_buckets, cpu_modifier)
buckets = CreateBucketIndices(data_size, bucket_size, idx)
for index := initial_bucket_location; index < buckets[0].start; index++ {
data[index].profile = nil
data[index].name = empty_name
}
bucket_index++
end := time.Now().Sub(start)
thread_depletion_time := fmt.Sprintf("One thread depleted in: %fs", end.Seconds())
initial_bucket_location = buckets[0].start
buckets[0].start++ // start index is reserved so needs to be incremented
end := time.Since(start)
thread_depletion_time := fmt.Sprintf("Redistributing data across threads. %fs", end.Seconds())
log.Println(thread_depletion_time)
start = time.Now()
}
arr_pos++

}
wg.Wait()
f.Flush()
Expand Down
90 changes: 83 additions & 7 deletions data_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"bytes"
"io/ioutil"
"os"
"path"
"testing"
)
Expand All @@ -12,13 +12,13 @@ type bucket_tests struct {
}

var bucket_size_tests = []bucket_tests{
bucket_tests{10, 1, 1, 10},
bucket_tests{10, 2, 1, 5},
{10, 1, 1, 9},
{10, 2, 1, 4},
}

func TestCalculateBucketSize(t *testing.T) {
for _, test := range bucket_size_tests {
if output := CalculateBucketSize(test.data_length, test.bucket_size, test.cpu_modifier); output != test.expected {
if output, _ := CalculateBucketSize(test.data_length, test.bucket_size, test.cpu_modifier); output != test.expected {
t.Errorf("Output %d not equal to expected %d", output, test.expected)
t.Errorf("Output %+v", output)
}
Expand Down Expand Up @@ -47,10 +47,86 @@ func TestRunData(t *testing.T) {
out_file.Close()

// Compare outputs line by line
f1, _ := ioutil.ReadFile(test_expected_output)
f2, _ := ioutil.ReadFile(test_output_file)

f1, _ := os.ReadFile(test_expected_output)
f2, _ := os.ReadFile(test_output_file)
if !bytes.Equal(f1, f2) {
t.Fatal("Input and output files to not match.")
}
}

func TestRunDataSmall(t *testing.T) {
tempdir := t.TempDir()

t.Log("Starting end to end test for distance calculations.")
test_input := "TestInputs/DistanceMatrix/Random_2_input.txt"
test_output_file := path.Join(tempdir, "output.txt")

t.Logf("Test Input: %s", test_input)
t.Logf("Test Output Temp File: %s", test_output_file)
t.Log("Creating output buffer.")
out_buffer, out_file := CreateOutputBuffer(test_output_file)

t.Log("Loading test allele profiles.")
test_data := LoadProfile(test_input)
RunData(test_data, out_buffer)
out_buffer.Flush()
out_file.Close()

// Compare outputs line by line
f2, _ := os.ReadFile(test_output_file)
output := string(f2)
if output != "1 1 0\n" {
t.Fatal("Input does not equal output.")
}
}

// Testing the redistribution of bucket indices at runtime
func TestRedistributeBuckets(t *testing.T) {
var profile_size int = 100
var cpus int = 6
BUCKET_SCALE = 2
minimum_bucket_size := cpus * BUCKET_SCALE
var buckets int
buckets, minimum_bucket_size = CalculateBucketSize(profile_size, minimum_bucket_size, BUCKET_SCALE)
bucket_indices := CreateBucketIndices(profile_size, buckets, 0)

comparisons := make([][]int, profile_size)
for idx := range comparisons {
comparisons[idx] = make([]int, 0)
}

corrected_profile_size := profile_size
for val := range profile_size {
for _, b := range bucket_indices {
for i := b.start; i < b.end; i++ {
comparisons[val] = append(comparisons[val], i)
}
}

if len(bucket_indices) != 1 && bucket_indices[0].Diff() < minimum_bucket_size {
buckets, minimum_bucket_size = CalculateBucketSize(profile_size-val, minimum_bucket_size, BUCKET_SCALE)
bucket_indices = CreateBucketIndices(profile_size, buckets, val)
}
bucket_indices[0].start++
}

profile_sizes := 0
// Check correct number of values computed
for idx := profile_sizes; idx != 0; idx-- {
if len(comparisons[idx]) != corrected_profile_size {
t.Fatalf("Mismatched number of outputs for entry for index %d: %d != %d", idx, profile_sizes, len(comparisons[idx]))
}
profile_sizes++
}

// Check content
for val := range corrected_profile_size {
idx := val
for _, i := range comparisons[val] {
if i != idx {
t.Fatalf("Index: %d, Incorrect outputs: %d != %d", val, i, idx)
}
idx++
}
}
}
Loading
Loading