diff --git a/CMakeLists.txt b/CMakeLists.txt index c3610a3b..f11eee8c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -283,6 +283,7 @@ endfunction() include(DYADUtils) include_directories(${CMAKE_SOURCE_DIR}/include) # public header add_subdirectory(src/dyad) +add_subdirectory(tests/shuffle) #cmake_policy(SET CMP0079 NEW) # In case that we need more control over the target building order diff --git a/src/dyad/utils/read_all.h b/src/dyad/utils/read_all.h index a7acd4ce..a174bef5 100644 --- a/src/dyad/utils/read_all.h +++ b/src/dyad/utils/read_all.h @@ -32,7 +32,7 @@ ssize_t read_all (int fd, void **bufp); #if defined(__cplusplus) -}; +} #endif // defined(__cplusplus) #endif /* DYAD_UTILS_READ_ALL_H */ diff --git a/tests/shuffle/CMakeLists.txt b/tests/shuffle/CMakeLists.txt new file mode 100644 index 00000000..0ab69460 --- /dev/null +++ b/tests/shuffle/CMakeLists.txt @@ -0,0 +1,34 @@ +find_package(MPI) + +if (MPI_FOUND) + set(DYAD_TEST_SHUFFLE_SRC ${CMAKE_CURRENT_SOURCE_DIR}/shuffle.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/worker.cpp) + set(DYAD_TEST_SHUFFLE_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/worker.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/../src/dyad/utils/utils.h + ${CMAKE_CURRENT_SOURCE_DIR}/../src/dyad/utils/read_all.h) + set(DYAD_TEST_SHUFFLE_PUBLIC_HEADERS) + + + add_executable(shuffle ${CMAKE_CURRENT_SOURCE_DIR}/shuffle.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/worker.cpp) + target_compile_definitions(shuffle PUBLIC DYAD_HAS_CONFIG) + target_include_directories(shuffle PUBLIC + $ + $ + $ + $) + target_link_libraries(shuffle PUBLIC ${PROJECT_NAME}_utils MPI::MPI_CXX) + + + if(DYAD_LOGGER STREQUAL "CPP_LOGGER") + target_link_libraries(shuffle PRIVATE ${CPP_LOGGER_LIBRARIES}) + endif() + if(DYAD_PROFILER STREQUAL "DLIO_PROFILER") + target_link_libraries(shuffle PRIVATE ${DLIO_PROFILER_LIBRARIES}) + endif() + + + if (TARGET DYAD_CXX_FLAGS_werror) + target_link_libraries(shuffle PRIVATE DYAD_CXX_FLAGS_werror) + endif () +endif (MPI_FOUND) diff --git 
a/tests/shuffle/README.md b/tests/shuffle/README.md new file mode 100644 index 00000000..4d6efe93 --- /dev/null +++ b/tests/shuffle/README.md @@ -0,0 +1,19 @@ +Demonstration of input file shuffling in deep learning. + +Deep learning training often requires randomizing the order of input samples at +each epoch. In distributed or parallel training where each worker consumes a +subset of samples, the set of samples to read changes for each worker at every +epoch due to the randomization. +In this demo, we assume that each sample is read from a unique file such as an +image file. We also assume that the set of input samples is too large to fit +in the local storage of a single worker. Therefore, the input data resides on shared +storage. +With DYAD, a worker can avoid loading files from the shared storage at +every epoch. Instead, it can pull them from the local storage of other workers. +At the beginning, the entire set is partitioned across workers. Each worker +loads files from its partition into its local storage. +We currently mimic this by creating a set of files under a DYAD-managed +directory on the local storage. Each worker can access any sample as if it existed +locally. +TODO: Once a local storage reaches its capacity, some files will have +to be randomly evicted to make room when a new file is loaded. 
diff --git a/tests/shuffle/shuffle.cpp b/tests/shuffle/shuffle.cpp new file mode 100644 index 00000000..1f51fdbf --- /dev/null +++ b/tests/shuffle/shuffle.cpp @@ -0,0 +1,222 @@ +#include // open +#include // basename dirname +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include "worker.hpp" + +using std::cout; +using std::endl; + +int read_list (const std::string& flist_name, std::vector& flist) +{ + std::string item_name; + std::string line; + std::list fnames; + + std::ifstream flist_file; + flist_file.open (flist_name); + if (!flist_file) { + return EXIT_FAILURE; + } + + while (std::getline (flist_file, line)) { + fnames.emplace_back (line); + // std::cout << "file: " << line << std::endl; + } + flist.assign (std::make_move_iterator (fnames.begin ()), + std::make_move_iterator (fnames.end ())); + + return EXIT_SUCCESS; +} + +void create_local_files (const std::string& managed_dir, + const Worker& worker, + const mode_t md = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) +{ + auto range = worker.get_iterator (); + auto it = range.first; + auto it_end = range.second; + const auto rank = worker.get_rank (); + const auto& flist = worker.get_file_list (); + + std::stringstream ss; + ss << "Owner rank " << rank << endl; + std::string header = ss.str (); + + for (; it != it_end; ++it) { + auto& fn = flist[*it]; + if (fn.empty ()) { + continue; + } + auto path = managed_dir + '/' + fn; + int fd = open (path.c_str (), O_CREAT | O_WRONLY, md); + + std::stringstream ss2; + for (unsigned i = 0u; i <= rank; ++i) { + ss2 << i << endl; + } + std::string str = header + fn + '\n' + ss2.str (); + + write (fd, str.c_str (), str.size ()); + close (fd); + } +} + +void read_files (const std::string& managed_dir, + const Worker& worker, + const bool validate = false) +{ + auto range = worker.get_iterator (); + auto it = range.first; + auto it_end = range.second; + const auto rank = 
worker.get_rank (); + const auto& flist = worker.get_file_list (); + + std::stringstream ss; + ss << rank; + + for (; it != it_end; ++it) { + auto& fn = flist[*it]; + if (fn.empty ()) { + continue; + } + auto path = managed_dir + '/' + fn; + int fd = open (path.c_str (), O_RDONLY); + char* buf; + auto sz = read_all (fd, reinterpret_cast (&buf)); + close (fd); + + if (validate) { + auto fn_copy = managed_dir + "/copy." + ss.str () + '.' + fn; + std::ofstream ofs; + ofs.open (fn_copy); + ofs.write (buf, sz); + ofs.close (); + } + } +} + +int main (int argc, char** argv) +{ + int rank; + int n_ranks; + bool is_managed_local = false; + unsigned seed = 0u; + unsigned n_epochs = 3u; + int rc = 0; + std::string list_name; + std::string managed_dir; + + if ((argc < 4) || (argc > 6)) { + cout << "usage: " << argv[0] << " list_file managed_dir is_local [n_epochs [seed]]" + << endl; + cout << " list_file: constains a list of files to be generated and used," << endl + << " one per line, without managed_dir prefix." 
<< endl; + cout << " is_local: indicates if the managed_dir is local (1) or shared (0)" << endl; + cout << " seed: if not given, a random seed is used" << endl; + return EXIT_SUCCESS; + } + + list_name = argv[1]; + managed_dir = argv[2]; + is_managed_local = static_cast (atoi (argv[3])); + + if (argc > 4) { + n_epochs = static_cast (atoi (argv[4])); + } + + if (argc > 5) { + seed = static_cast (atoi (argv[5])); + } + + std::vector flist; + // TODO: root can broadcast + rc = read_list (list_name, flist); + + MPI_Init (&argc, &argv); + MPI_Comm_rank (MPI_COMM_WORLD, &rank); + MPI_Comm_size (MPI_COMM_WORLD, &n_ranks); + + if (rc != EXIT_SUCCESS) { + if (rank == 0) { + cout << "Failed to read '" << list_name << "'" << endl; + } + MPI_Abort (MPI_COMM_WORLD, errno); + return rc; + } + + if (is_managed_local) { // if directory is local, everyone should create + mode_t m = (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_ISGID); + int c = mkdir_as_needed (managed_dir.c_str (), m); + if (c < 0) { + cout << "Rank " << rank + << " could not create directory: '" << managed_dir << "'" << endl; + MPI_Abort (MPI_COMM_WORLD, errno); + return EXIT_FAILURE; + } + } + else if (rank == 0) { + mode_t m = (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_ISGID); + int c = mkdir_as_needed (managed_dir.c_str (), m); + if (c < 0) { + cout << "Could not create directory: '" << managed_dir << "'" << endl; + MPI_Abort (MPI_COMM_WORLD, errno); + return EXIT_FAILURE; + } + } + MPI_Barrier (MPI_COMM_WORLD); + + Worker worker; + worker.set_seed (seed); + worker.set_rank (rank); + worker.set_file_list (flist); + worker.split (n_ranks); + + /* + const auto rg = worker.get_range(); + + std::stringstream ss; + + ss << "Rank " << rank << " [" << rg.first << " - " << rg.second << "]"; + cout << ss.str() << std::endl; + cout << worker.get_my_list_str() << endl << std::flush; + */ + + if (rank == 0) { + cout << "Preparing local files under the managed directory" << endl; + } + create_local_files (managed_dir, worker, 
0644); + + MPI_Barrier (MPI_COMM_WORLD); + + for (unsigned i = 0u; i < n_epochs; ++i) { + if (rank == 0) { + cout << "------ Shuffling ... -------" << endl << std::flush; + } + MPI_Barrier (MPI_COMM_WORLD); + + worker.shuffle (); + // cout << i << ' ' << worker.get_my_list_str() << endl << std::flush; + read_files (managed_dir, worker); + MPI_Barrier (MPI_COMM_WORLD); + } + + MPI_Finalize (); + + return rc; +} diff --git a/tests/shuffle/worker.cpp b/tests/shuffle/worker.cpp new file mode 100644 index 00000000..fc47da41 --- /dev/null +++ b/tests/shuffle/worker.cpp @@ -0,0 +1,102 @@ +#include +#define assertm(exp, msg) assert (((void)msg, exp)) + +#include +#include +#include + +#include "worker.hpp" + +Worker::Worker () : m_rank (0u), m_seed (0u), m_begin (0ul), m_end (0ul) +{ +} + +void Worker::set_rank (const int r) +{ + m_rank = static_cast (r); +} + +void Worker::clear_seed () +{ + m_seed = 0u; +} + +void Worker::set_seed_by_clock () +{ + m_seed = std::chrono::system_clock::now ().time_since_epoch ().count (); + seed (); +} + +void Worker::set_seed (const unsigned int s) +{ + m_seed = s; + seed (); +} + +void Worker::seed () +{ + m_gen.seed (m_seed); +} + +void Worker::set_file_list (std::vector&& flist) +{ + m_flist = flist; + set_file_list (); +} + +void Worker::set_file_list (const std::vector& flist) +{ + m_flist = flist; + set_file_list (); +} + +void Worker::set_file_list () +{ + m_fidx.resize (m_flist.size ()); + std::iota (m_fidx.begin (), m_fidx.end (), 0ul); +} + +void Worker::shuffle () +{ + std::shuffle (m_fidx.begin (), m_fidx.end (), m_gen); +} + +void Worker::split (int n_ranks) +{ + assertm ((n_ranks > 0), "Invalid number of ranks."); + size_t total = m_fidx.size (); + auto chunk = total / static_cast (n_ranks); + auto n_extra = total - chunk * static_cast (n_ranks); + + if (m_rank < n_extra) { + m_begin = (chunk + 1) * m_rank; + m_end = m_begin + chunk + 1; + } else { + m_begin = chunk * m_rank + n_extra; + m_end = m_begin + chunk; + } +} + 
+std::pair Worker::get_range () +{ + return std::make_pair (m_begin, m_end); +} + +std::pair Worker::get_iterator () const +{ + return std::make_pair (m_fidx.cbegin () + m_begin, m_fidx.cbegin () + m_end); +} + +std::string Worker::get_my_list_str () const +{ + auto range = get_iterator (); + auto it = range.first; + auto it_end = range.second; + std::stringstream ss; + ss << "Rank " << m_rank; + std::string str = ss.str (); + for (; it != it_end; ++it) { + str += ' ' + m_flist[*it]; + } + return str; +} diff --git a/tests/shuffle/worker.hpp b/tests/shuffle/worker.hpp new file mode 100644 index 00000000..49848eff --- /dev/null +++ b/tests/shuffle/worker.hpp @@ -0,0 +1,54 @@ +#ifndef DYAD_TEST_SHUFFLE_WORKER_HPP +#define DYAD_TEST_SHUFFLE_WORKER_HPP + +#include +#include +#include + +class Worker +{ + public: + using idx_it_t = std::vector::const_iterator; + + protected: + using rn_gen_t = std::mt19937; + unsigned int m_rank; + unsigned int m_seed; + rn_gen_t m_gen; + std::vector m_flist; + std::vector m_fidx; + size_t m_begin; + size_t m_end; + + public: + Worker (); + void set_rank (const int r); + unsigned int get_rank () const + { + return m_rank; + } + void clear_seed (); + void set_seed_by_clock (); + void set_seed (const unsigned int s); + void set_file_list (std::vector&& flist); + void set_file_list (const std::vector& flist); + void shuffle (); + void split (int n_ranks); + std::pair get_range (); + const std::vector& get_file_list () const + { + return m_flist; + } + const std::vector& get_indices () const + { + return m_fidx; + } + std::pair get_iterator () const; + std::string get_my_list_str () const; + + protected: + void seed (); + void set_file_list (); +}; + +#endif // DYAD_TEST_SHUFFLE_WORKER_HPP