-
Notifications
You must be signed in to change notification settings - Fork 1
/
functions.cpp
78 lines (63 loc) · 2.33 KB
/
functions.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include "include/functions.h"
namespace DeltaSharing
{
    /// Splits a table path of the form `<server>#<share>.<schema>.<table>`.
    ///
    /// The text before the LAST '#' becomes the first component; the text
    /// after it is split on every '.'.
    ///
    /// @param path  Table path to split.
    /// @return      {server, share, schema, table} for a well-formed path.
    ///              If `path` contains no '#', "Invalid path" is written to
    ///              std::cerr and an empty vector is returned. If the tail does
    ///              not split into exactly three parts, a diagnostic is written
    ///              to std::cerr but the (wrongly sized) split is still returned,
    ///              so callers must check size().
    const std::vector<std::string> ParseURL(std::string path)
    {
        const auto hashPos = path.find_last_of('#');
        if (hashPos == std::string::npos)
        {
            std::cerr << "Invalid path: " << path << '\n';
            return std::vector<std::string>();
        }

        std::vector<std::string> urlparts;
        urlparts.reserve(4);
        urlparts.push_back(path.substr(0, hashPos));

        // Walk the "<share>.<schema>.<table>" tail by index instead of erasing
        // from the front of a string copy on every iteration (quadratic).
        std::string::size_type start = hashPos + 1;
        std::string::size_type dot;
        while ((dot = path.find('.', start)) != std::string::npos)
        {
            urlparts.push_back(path.substr(start, dot - start));
            start = dot + 1;
        }
        urlparts.push_back(path.substr(start));

        if (urlparts.size() != 4)
        {
            std::cerr << "Path does not follow pattern: <server>#<share>.<schema>.<table>, " << path << '\n';
        }
        return urlparts;
    }

    /// Creates a shared DeltaSharingClient.
    ///
    /// @param profile        Forwarded to the DeltaSharingClient constructor
    ///                       (LoadAsArrowTable passes the server part of the path).
    /// @param cacheLocation  Optional cache location, forwarded unchanged.
    /// @return               A newly constructed client.
    std::shared_ptr<DeltaSharingClient> NewDeltaSharingClient(std::string profile, boost::optional<std::string> cacheLocation)
    {
        return std::make_shared<DeltaSharingClient>(profile, cacheLocation);
    }

    /// Loads one data file of a shared table as an Arrow table.
    ///
    /// Every file listed for the table is first passed to
    /// DeltaSharingClient::PopulateCache on its own thread; only after all of
    /// those threads have been joined is file number `fileno` loaded.
    ///
    /// @param path    Table path `<server>#<share>.<schema>.<table>` (see ParseURL).
    /// @param fileno  Zero-based index into the table's file list.
    /// @return        The loaded table, or an empty shared_ptr when the path is
    ///                malformed, no file list is returned, or `fileno` is out of
    ///                range (including negative).
    const std::shared_ptr<arrow::Table> LoadAsArrowTable(std::string path, int fileno)
    {
        const auto p = ParseURL(path);
        if (p.size() != 4)
        {
            std::cerr << "PATH NOT CORRECT: " << path << '\n';
            return std::shared_ptr<arrow::Table>();
        }

        auto cl = NewDeltaSharingClient(p.at(0), boost::none);
        DeltaSharingProtocol::Table t;
        t.name = p.at(3);
        t.schema = p.at(2);
        t.share = p.at(1);

        const auto flist = cl->ListFilesInTable(t);
        // NOTE(review): assumes ListFilesInTable returns a pointer-like handle
        // (dereferenced with -> below); a null handle is treated as "no files".
        if (!flist)
        {
            return std::shared_ptr<arrow::Table>();
        }

        // Joins every started thread on scope exit. This matters if creating a
        // later thread throws: destroying a joinable std::thread calls
        // std::terminate.
        struct JoinAll
        {
            std::vector<std::thread> &threads;
            ~JoinAll()
            {
                for (auto &th : threads)
                {
                    if (th.joinable())
                    {
                        th.join();
                    }
                }
            }
        };

        std::vector<std::thread> writethreads;
        {
            JoinAll joiner{writethreads};
            writethreads.reserve(flist->size());
            for (const auto &file : *flist)
            {
                writethreads.emplace_back(&DeltaSharingClient::PopulateCache, cl, file.url);
            }
        } // all cache-population threads have been joined here

        // Reject negatives explicitly rather than relying on the implicit
        // int -> size_t conversion in a mixed-sign comparison.
        if (fileno < 0 || static_cast<std::size_t>(fileno) >= flist->size())
        {
            return std::shared_ptr<arrow::Table>();
        }

        auto url = flist->at(static_cast<std::size_t>(fileno)).url;
        std::cerr << "Number of threads supported: " << cl->GetNumberOfThreads() << '\n';
        return cl->LoadAsArrowTable(url);
    }
} // namespace DeltaSharing