Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ endfunction()
include(DYADUtils)
include_directories(${CMAKE_SOURCE_DIR}/include) # public header
add_subdirectory(src/dyad)
add_subdirectory(tests/shuffle)
#cmake_policy(SET CMP0079 NEW) # In case that we need more control over the target building order


Expand Down
2 changes: 1 addition & 1 deletion src/dyad/utils/read_all.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ssize_t
read_all (int fd, void **bufp);

#if defined(__cplusplus)
};
}
#endif // defined(__cplusplus)

#endif /* DYAD_UTILS_READ_ALL_H */
34 changes: 34 additions & 0 deletions tests/shuffle/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Build the MPI-based input-shuffling demo. The target is only defined when an
# MPI installation is found; otherwise this directory contributes nothing.
find_package(MPI)

if (MPI_FOUND)
    set(DYAD_TEST_SHUFFLE_SRC ${CMAKE_CURRENT_SOURCE_DIR}/shuffle.cpp
                              ${CMAKE_CURRENT_SOURCE_DIR}/worker.cpp)
    # This directory is <root>/tests/shuffle, so "../src" would point at
    # <root>/tests/src. Address the project headers from the source root.
    set(DYAD_TEST_SHUFFLE_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/worker.hpp
                                          ${CMAKE_SOURCE_DIR}/src/dyad/utils/utils.h
                                          ${CMAKE_SOURCE_DIR}/src/dyad/utils/read_all.h)
    set(DYAD_TEST_SHUFFLE_PUBLIC_HEADERS)


    # Reuse the source list instead of repeating it.
    add_executable(shuffle ${DYAD_TEST_SHUFFLE_SRC})
    target_compile_definitions(shuffle PUBLIC DYAD_HAS_CONFIG)
    target_include_directories(shuffle PUBLIC
                               $<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/src>
                               $<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/src>
                               $<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/include>
                               $<INSTALL_INTERFACE:${DYAD_INSTALL_INCLUDE_DIR}>)
    target_link_libraries(shuffle PUBLIC ${PROJECT_NAME}_utils MPI::MPI_CXX)


    # Optional logger / profiler back ends selected at configure time.
    if(DYAD_LOGGER STREQUAL "CPP_LOGGER")
        target_link_libraries(shuffle PRIVATE ${CPP_LOGGER_LIBRARIES})
    endif()
    if(DYAD_PROFILER STREQUAL "DLIO_PROFILER")
        target_link_libraries(shuffle PRIVATE ${DLIO_PROFILER_LIBRARIES})
    endif()


    # Pick up the project's warnings-as-errors flags when that target exists.
    if (TARGET DYAD_CXX_FLAGS_werror)
        target_link_libraries(shuffle PRIVATE DYAD_CXX_FLAGS_werror)
    endif ()
endif (MPI_FOUND)
19 changes: 19 additions & 0 deletions tests/shuffle/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Demonstration of input file shuffling in deep learning.

Deep learning training often requires randomizing the order of input samples at
each epoch. In distributed or parallel training where each worker consumes a
subset of samples, the set of samples to read changes for each worker at every
epoch due to the randomization.
In this demo, we assume that each sample is read from a unique file, such as an
image file. We also assume that the set of input samples is too large to fit
in the local storage of a single worker. Therefore, the input data resides on
shared storage.
With DYAD, a worker can avoid loading files from shared storage at
every epoch. Instead, it can pull them from the local storage of other workers.
At the beginning, the entire set is partitioned across workers. Each worker
loads the files in its partition into its local storage.
We currently mimic this by creating a set of files under a DYAD-managed
directory on the local storage. Each worker can access any sample as if it
existed locally.
TODO: When a local storage reaches its capacity, some files will have
to be evicted at random, especially when loading a new file.
222 changes: 222 additions & 0 deletions tests/shuffle/shuffle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
#include <fcntl.h> // open
#include <libgen.h> // basename dirname
#include <mpi.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <list>
#include <sstream>
#include <vector>

#include <dyad/utils/read_all.h>
#include <dyad/utils/utils.h>
#include "worker.hpp"

using std::cout;
using std::endl;

/**
 * Read a list of file names, one per line, from a text file.
 *
 * @param flist_name  path of the list file.
 * @param flist       on success, replaced by the lines of the file in order
 *                    (empty lines are kept as empty entries). Left untouched
 *                    when the file cannot be opened.
 * @return EXIT_SUCCESS if the file was opened, EXIT_FAILURE otherwise.
 */
int read_list (const std::string& flist_name, std::vector<std::string>& flist)
{
    std::ifstream flist_file (flist_name);
    if (!flist_file) {
        return EXIT_FAILURE;
    }

    // Collect straight into a vector; no intermediate node-based container.
    std::vector<std::string> fnames;
    std::string line;
    while (std::getline (flist_file, line)) {
        fnames.push_back (line);
    }

    // Replace (not append to) whatever the caller's vector held before.
    flist.swap (fnames);

    return EXIT_SUCCESS;
}

/**
 * Create this worker's share of the files under the managed directory.
 *
 * For every index in the range returned by worker.get_iterator(), the
 * matching entry of worker.get_file_list() is created as managed_dir/<name>.
 * Entries with an empty name are skipped. Each file contains
 *   "Owner rank <rank>\n<name>\n" followed by the integers 0..rank, one per
 * line.
 *
 * Failures to open, write or close a file are reported on std::cerr and the
 * remaining files are still attempted.
 *
 * @param managed_dir  directory under which the files are created.
 * @param worker       supplies the rank, the file list and the index range.
 * @param md           permission bits passed to open() for new files.
 */
void create_local_files (const std::string& managed_dir,
                         const Worker& worker,
                         const mode_t md = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
{
    const auto range = worker.get_iterator ();
    const auto rank = worker.get_rank ();
    const auto& flist = worker.get_file_list ();

    // Both the header and the trailer depend only on the rank, so they are
    // built once instead of once per file.
    std::stringstream head;
    head << "Owner rank " << rank << '\n';
    const std::string header = head.str ();

    std::stringstream tail;
    for (unsigned i = 0u; i <= rank; ++i) {
        tail << i << '\n';
    }
    const std::string trailer = tail.str ();

    for (auto it = range.first; it != range.second; ++it) {
        const auto& fn = flist[*it];
        if (fn.empty ()) {
            continue;
        }
        const std::string path = managed_dir + '/' + fn;
        const std::string content = header + fn + '\n' + trailer;

        // O_TRUNC: a file left over from an earlier run may be longer than
        // what is written now; without truncation its tail would survive.
        const int fd = open (path.c_str (), O_CREAT | O_WRONLY | O_TRUNC, md);
        if (fd < 0) {
            std::cerr << "Rank " << rank << " could not create '" << path
                      << "': " << std::strerror (errno) << '\n';
            continue;
        }

        // write() may transfer fewer bytes than requested; loop until done.
        const char* cursor = content.data ();
        size_t remaining = content.size ();
        while (remaining > 0u) {
            const ssize_t n = write (fd, cursor, remaining);
            if (n < 0) {
                if (errno == EINTR) {
                    continue;  // interrupted before any byte was written
                }
                std::cerr << "Rank " << rank << " could not write '" << path
                          << "': " << std::strerror (errno) << '\n';
                break;
            }
            cursor += n;
            remaining -= static_cast<size_t> (n);
        }

        if (close (fd) != 0) {
            std::cerr << "Rank " << rank << " could not close '" << path
                      << "': " << std::strerror (errno) << '\n';
        }
    }
}

/**
 * Read every file assigned to this worker from the managed directory.
 *
 * For every index in the range returned by worker.get_iterator(), the
 * matching entry of worker.get_file_list() is opened as managed_dir/<name>
 * and read in full with read_all(). Entries with an empty name are skipped.
 * Files that cannot be opened or read are reported on std::cerr and skipped.
 *
 * @param managed_dir  directory the files are read from.
 * @param worker       supplies the rank, the file list and the index range.
 * @param validate     when true, the bytes read are written back to
 *                     managed_dir/copy.<rank>.<name> for later comparison.
 */
void read_files (const std::string& managed_dir,
                 const Worker& worker,
                 const bool validate = false)
{
    const auto range = worker.get_iterator ();
    const auto rank = worker.get_rank ();
    const auto& flist = worker.get_file_list ();

    std::stringstream ss;
    ss << rank;
    const std::string rank_str = ss.str ();

    for (auto it = range.first; it != range.second; ++it) {
        const auto& fn = flist[*it];
        if (fn.empty ()) {
            continue;
        }
        const std::string path = managed_dir + '/' + fn;
        const int fd = open (path.c_str (), O_RDONLY);
        if (fd < 0) {
            std::cerr << "Rank " << rank << " could not open '" << path
                      << "': " << std::strerror (errno) << '\n';
            continue;
        }

        // Start from nullptr so that the free() below is harmless if
        // read_all() never stores a buffer.
        void* buf = nullptr;
        const ssize_t sz = read_all (fd, &buf);
        close (fd);

        // NOTE(review): read_all() is assumed to report failure with a
        // negative return value and to hand back a malloc()-allocated buffer
        // that the caller owns - confirm against dyad/utils/read_all.
        if (sz < 0) {
            std::cerr << "Rank " << rank << " could not read '" << path << "'\n";
            std::free (buf);
            continue;
        }

        if (validate) {
            const std::string fn_copy = managed_dir + "/copy." + rank_str + '.' + fn;
            std::ofstream ofs (fn_copy);
            ofs.write (static_cast<const char*> (buf), sz);
        }

        // The buffer was previously leaked once per file per epoch.
        std::free (buf);
    }
}

/**
 * Entry point of the shuffle demo.
 *
 * Usage: shuffle list_file managed_dir is_local [n_epochs [seed]]
 *
 * Steps:
 *  1. read the list of file names (every rank reads it on its own);
 *  2. create the managed directory - on every rank when it is node-local,
 *     on rank 0 only when it is shared;
 *  3. let every rank create the files of its initial partition;
 *  4. for n_epochs rounds, shuffle the indices and read the files that fall
 *     into this rank's slice.
 *
 * @return EXIT_SUCCESS on success, EXIT_FAILURE on a usage error or when the
 *         list or the directory cannot be set up.
 */
int main (int argc, char** argv)
{
    int rank = 0;
    int n_ranks = 1;
    unsigned seed = 0u;
    unsigned n_epochs = 3u;

    if ((argc < 4) || (argc > 6)) {
        cout << "usage: " << argv[0] << " list_file managed_dir is_local [n_epochs [seed]]"
             << endl;
        cout << " list_file: contains a list of files to be generated and used," << endl
             << " one per line, without managed_dir prefix." << endl;
        cout << " is_local: indicates if the managed_dir is local (1) or shared (0)" << endl;
        // The seed defaults to 0 below; it is not randomized.
        cout << " seed: if not given, 0 is used" << endl;
        // A wrong invocation must not look like a successful run.
        return EXIT_FAILURE;
    }

    const std::string list_name = argv[1];
    const std::string managed_dir = argv[2];
    const bool is_managed_local = (atoi (argv[3]) != 0);

    if (argc > 4) {
        const int requested = atoi (argv[4]);
        // A negative value would wrap around to a huge unsigned count.
        if (requested < 0) {
            cout << "n_epochs must not be negative: '" << argv[4] << "'" << endl;
            return EXIT_FAILURE;
        }
        n_epochs = static_cast<unsigned> (requested);
    }

    if (argc > 5) {
        seed = static_cast<unsigned> (atoi (argv[5]));
    }

    std::vector<std::string> flist;
    // TODO: root can broadcast
    const int rc = read_list (list_name, flist);

    MPI_Init (&argc, &argv);
    MPI_Comm_rank (MPI_COMM_WORLD, &rank);
    MPI_Comm_size (MPI_COMM_WORLD, &n_ranks);

    if (rc != EXIT_SUCCESS) {
        if (rank == 0) {
            cout << "Failed to read '" << list_name << "'" << endl;
        }
        // errno may have been overwritten (or be 0) after MPI_Init; abort
        // with the non-zero status of read_list instead.
        MPI_Abort (MPI_COMM_WORLD, rc);
        return rc;
    }

    // A node-local directory has to be created by every rank; a shared one
    // only needs to be created once, by rank 0.
    if (is_managed_local || (rank == 0)) {
        const mode_t m = (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_ISGID);
        errno = 0;
        const int c = mkdir_as_needed (managed_dir.c_str (), m);
        if (c < 0) {
            // Capture errno before stream output has a chance to change it,
            // and never abort with a zero error code.
            const int err = (errno != 0) ? errno : EXIT_FAILURE;
            if (is_managed_local) {
                cout << "Rank " << rank
                     << " could not create directory: '" << managed_dir << "'" << endl;
            } else {
                cout << "Could not create directory: '" << managed_dir << "'" << endl;
            }
            MPI_Abort (MPI_COMM_WORLD, err);
            return EXIT_FAILURE;
        }
    }
    MPI_Barrier (MPI_COMM_WORLD);

    Worker worker;
    worker.set_seed (seed);
    worker.set_rank (rank);
    worker.set_file_list (flist);
    worker.split (n_ranks);

    /*
    const auto rg = worker.get_range();

    std::stringstream ss;

    ss << "Rank " << rank << " [" << rg.first << " - " << rg.second << "]";
    cout << ss.str() << std::endl;
    cout << worker.get_my_list_str() << endl << std::flush;
    */

    if (rank == 0) {
        cout << "Preparing local files under the managed directory" << endl;
    }
    create_local_files (managed_dir, worker, 0644);

    // All files must exist before any rank starts reading.
    MPI_Barrier (MPI_COMM_WORLD);

    for (unsigned i = 0u; i < n_epochs; ++i) {
        if (rank == 0) {
            cout << "------ Shuffling ... -------" << endl << std::flush;
        }
        MPI_Barrier (MPI_COMM_WORLD);

        worker.shuffle ();
        // cout << i << ' ' << worker.get_my_list_str() << endl << std::flush;
        read_files (managed_dir, worker);
        MPI_Barrier (MPI_COMM_WORLD);
    }

    MPI_Finalize ();

    return EXIT_SUCCESS;
}
102 changes: 102 additions & 0 deletions tests/shuffle/worker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include <cassert>
#define assertm(exp, msg) assert (((void)msg, exp))

#include <algorithm>
#include <chrono>
#include <numeric>
#include <sstream>
#include <utility>

#include "worker.hpp"

Worker::Worker () : m_rank (0u), m_seed (0u), m_begin (0ul), m_end (0ul)
{
}

/// Store the rank of this worker.
/// @param r  rank as a signed int; it is converted to unsigned int as is.
void Worker::set_rank (const int r)
{
    const auto as_unsigned = static_cast<unsigned int> (r);
    m_rank = as_unsigned;
}

/// Reset the stored seed value to zero. Only the stored value changes; the
/// generator itself is not re-seeded by this call.
void Worker::clear_seed ()
{
    const unsigned int zero_seed = 0u;
    m_seed = zero_seed;
}

/// Derive the seed from the current system-clock time and seed the generator
/// with it.
void Worker::set_seed_by_clock ()
{
    const auto since_epoch = std::chrono::system_clock::now ().time_since_epoch ();
    // The tick count is converted to the type of m_seed, exactly as the plain
    // assignment would do implicitly.
    m_seed = static_cast<decltype (m_seed)> (since_epoch.count ());
    seed ();
}

void Worker::set_seed (const unsigned int s)
{
m_seed = s;
seed ();
}

/// Seed the generator m_gen with the currently stored seed m_seed.
void Worker::seed ()
{
m_gen.seed (m_seed);
}

/// Take ownership of a list of file names and rebuild the index vector.
/// @param flist  list to move from; it is left in a valid but unspecified
///               state.
void Worker::set_file_list (std::vector<std::string>&& flist)
{
    // A named rvalue reference is an lvalue inside the function: without
    // std::move the assignment would copy every string.
    m_flist = std::move (flist);
    set_file_list ();
}

/// Copy a list of file names into this worker and rebuild the index vector.
/// @param flist  list to copy; the caller's vector is not modified.
void Worker::set_file_list (const std::vector<std::string>& flist)
{
// Copy the names, then let the no-argument overload rebuild m_fidx.
m_flist = flist;
set_file_list ();
}

/// Rebuild the index vector so that it holds 0, 1, ..., m_flist.size() - 1,
/// i.e. one index per file name, in list order.
void Worker::set_file_list ()
{
    m_fidx.resize (m_flist.size ());

    unsigned long next = 0ul;
    for (auto& slot : m_fidx) {
        slot = next;
        ++next;
    }
}

/// Randomly permute the whole index vector using the generator m_gen.
void Worker::shuffle ()
{
    const auto first = m_fidx.begin ();
    const auto last = m_fidx.end ();
    std::shuffle (first, last, m_gen);
}

/// Compute the slice [m_begin, m_end) of the index vector that belongs to
/// this rank when the indices are divided among n_ranks workers.
///
/// Every rank gets size / n_ranks indices; the first size % n_ranks ranks get
/// one more, so slice sizes differ by at most one.
///
/// @param n_ranks  number of workers; must be positive (checked by assert).
void Worker::split (int n_ranks)
{
    assertm ((n_ranks > 0), "Invalid number of ranks.");

    const size_t n_parts = static_cast<size_t> (n_ranks);
    const size_t n_items = m_fidx.size ();
    const size_t base = n_items / n_parts;
    const size_t n_extra = n_items - base * n_parts;
    const size_t me = m_rank;

    // Ranks below n_extra each carry one of the leftover indices.
    const bool gets_extra = (me < n_extra);

    // Offset = base-sized chunks of the lower ranks + leftovers they took.
    m_begin = base * me + (gets_extra ? me : n_extra);
    m_end = m_begin + base + (gets_extra ? 1u : 0u);
}

/// Return the slice of this worker as a (begin, end) pair of positions in the
/// index vector.
std::pair<size_t, size_t> Worker::get_range ()
{
    return std::pair<size_t, size_t> (m_begin, m_end);
}

/// Return a pair of const iterators into the index vector that delimit this
/// worker's slice: (cbegin + m_begin, cbegin + m_end).
std::pair<Worker::idx_it_t, Worker::idx_it_t> Worker::get_iterator () const
{
    const auto base = m_fidx.cbegin ();
    return std::make_pair (base + m_begin, base + m_end);
}

/// Build a one-line description of this worker's slice:
/// "Rank <rank>" followed by " <name>" for every file in the slice.
std::string Worker::get_my_list_str () const
{
    const auto range = get_iterator ();

    std::stringstream out;
    out << "Rank " << m_rank;
    for (auto it = range.first; it != range.second; ++it) {
        out << ' ' << m_flist[*it];
    }
    return out.str ();
}
Loading