Skip to content

Commit

Permalink
automatic number of tasks per node
Browse files Browse the repository at this point in the history
  • Loading branch information
cpignedoli committed Oct 10, 2024
1 parent cbc577c commit 7f30328
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
50 changes: 38 additions & 12 deletions aiida_nanotech_empa/workflows/cp2k/cp2k_benchmark_workchain.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import pathlib
import re
import math

import numpy as np
from ...utils import common_utils
Expand All @@ -11,6 +12,24 @@

ALLOWED_PROTOCOLS = ["standard"]

def is_perfect_square(x):
    """Return True if ``x`` is a perfect square (0, 1, 4, 9, ...).

    Args:
        x: Integer to test.

    Returns:
        bool: True when some integer ``r`` satisfies ``r * r == x``; False
        otherwise, including for every negative ``x``.
    """
    # math.isqrt raises ValueError on negative input; no negative integer is
    # a perfect square, so answer directly instead of propagating the error.
    if x < 0:
        return False
    # math.isqrt already returns an exact int, so no int() cast is needed.
    root = math.isqrt(x)
    return root * root == x

def find_multiples_of_ngpus(ngpus, n, max_N):
    """Return the multiples N of ``ngpus`` for which ``n * N`` is a perfect square.

    Args:
        ngpus: Positive integer step; only N = ngpus, 2 * ngpus, ... are
            considered as candidates.
        n: Integer factor multiplied with each candidate N before the
            perfect-square test.
        max_N: Inclusive upper bound on the candidates N.

    Returns:
        list[int]: The matching values of N in ascending order. The list is
        empty when ``max_N < ngpus`` or when no candidate qualifies.

    Raises:
        ValueError: If ``ngpus`` is smaller than 1.
    """
    # A zero step would make range() fail with an unrelated-looking message,
    # and a negative step has no sensible meaning here; reject both up front.
    if ngpus < 1:
        raise ValueError(f"ngpus must be a positive integer, got {ngpus!r}")

    # Stepping by ngpus visits only the multiples of ngpus up to max_N.
    return [
        N
        for N in range(ngpus, max_N + 1, ngpus)
        if is_perfect_square(n * N)
    ]

@engine.calcfunction
def analyze_speedup(time_dict):
"""
Expand Down Expand Up @@ -38,14 +57,14 @@ def analyze_speedup(time_dict):
nnodes_str, ntasks_str, nthreads_str = key.split('_')
nnodes = int(nnodes_str)
# Collect time for each nnodes
times_per_nnodes[nnodes].append(time)
if time != 'FAILED':
times_per_nnodes[nnodes].append(time)

# Find the minimum time for each nnodes
min_times_per_nnodes = {}
for nnodes, times in times_per_nnodes.items():
min_time = min(times)
if min_time < 100000:
min_times_per_nnodes[nnodes] = min_time
min_times_per_nnodes[nnodes] = min_time

# Sort nnodes to find the lowest nnodes (reference)
sorted_nnodes = sorted(min_times_per_nnodes.keys())
Expand Down Expand Up @@ -97,7 +116,7 @@ def get_timing_from_FolderData(folder_node=None):
"""
# Load the FolderData node
if folder_node is None:
return orm.Float(100000)
return orm.Str('FAILED')

# Check if 'aiida.out' exists in the FolderData
if 'aiida.out' not in folder_node.list_object_names():
Expand Down Expand Up @@ -184,9 +203,16 @@ def define(cls, spec):
help="List of #nodes to be used in the benchmark.",
)
spec.input(
"list_tasks_per_node",
valid_type=orm.List,
default=lambda: orm.List(list=[6]),
"ngpus",
valid_type=orm.Int,
default=lambda: orm.Int(1),
required=True,
help="Number of GPUs per node.",
)
spec.input(
"max_tasks_per_node",
valid_type=orm.Int,
default=lambda: orm.Int(2),
required=True,
help="List of #tasks per node to be used in the benchmark.",
)
Expand Down Expand Up @@ -237,9 +263,9 @@ def setup(self):
"pseudo": orm.SinglefileData(
file=pathlib.Path(__file__).parent / "data" / "POTENTIAL"
),
"mpswrapper": orm.SinglefileData(
file=pathlib.Path(__file__).parent / "data" / "mps-wrapper.sh"
),
#"mpswrapper": orm.SinglefileData(
# file=pathlib.Path(__file__).parent / "data" / "mps-wrapper.sh"
#),
}
self.ctx.input_dict = cp2k_utils.load_protocol(
"benchmarks.yml", self.inputs.protocol.value)
Expand Down Expand Up @@ -283,7 +309,7 @@ def submit_calculations(self):
mywall=20

# Loop for mpi tasks
for ntasks in self.inputs.list_tasks_per_node:
for ntasks in find_multiples_of_ngpus(self.inputs.ngpus.value,nnodes, self.inputs.max_tasks_per_node.value):
for nthreads in self.inputs.list_threads_per_task:
# Loop for threads,check that nthreads * ntasks <= max_tasks
if nthreads<=self.ctx.max_tasks/ntasks :
Expand Down Expand Up @@ -326,7 +352,7 @@ def finalize(self):

for nnodes in self.inputs.list_nodes:
# Loop for mpi tasks
for ntasks in self.inputs.list_tasks_per_node:
for ntasks in find_multiples_of_ngpus(self.inputs.ngpus.value,nnodes, self.inputs.max_tasks_per_node.value):
# Loop for threads
for nthreads in self.inputs.list_threads_per_task:
# Check that nthreads * ntasks <= 72
Expand Down
7 changes: 4 additions & 3 deletions examples/workflows/example_cp2k_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

Cp2kBenchmarkWorkChain = plugins.WorkflowFactory("nanotech_empa.cp2k.benchmark")

def _example_cp2k_benchmark(cp2k_code, nnodes, ntasks,nthreads):
def _example_cp2k_benchmark(cp2k_code, nnodes, max_ntasks,nthreads):
# Check test geometries are already in database.
qb = orm.QueryBuilder()
qb.append(
Expand All @@ -36,7 +36,8 @@ def _example_cp2k_benchmark(cp2k_code, nnodes, ntasks,nthreads):
builder.code = cp2k_code
builder.protocol = orm.Str("scf_ot_no_wfn")
builder.list_nodes = orm.List(list=nnodes)
builder.list_tasks_per_node = orm.List(list=ntasks)
builder.max_tasks_per_node = orm.Int(max_ntasks)
builder.ngpus=orm.Int(1)
builder.list_threads_per_task = orm.List(list=nthreads)
builder.metadata.label = "CP2K_Scf"
builder.structure = structures["c2h2.xyz"]
Expand All @@ -50,7 +51,7 @@ def _example_cp2k_benchmark(cp2k_code, nnodes, ntasks,nthreads):
def run_all(cp2k_code):
print("#### Starting benchmark")
_example_cp2k_benchmark(
orm.load_code(cp2k_code),[1],[1],[1,2]
orm.load_code(cp2k_code),[1],8,[1,2]
)

if __name__ == "__main__":
Expand Down

0 comments on commit 7f30328

Please sign in to comment.