Skip to content

Commit

Permalink
Dispatch TileLoadWork properly
Browse files Browse the repository at this point in the history
  • Loading branch information
csciguy8 committed Nov 6, 2023
1 parent 480e065 commit 08e21b3
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 81 deletions.
18 changes: 10 additions & 8 deletions Cesium3DTilesSelection/include/Cesium3DTilesSelection/Tileset.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,14 @@ class RequestDispatcher
_pAssetAccessor(pAssetAccessor) {}
~RequestDispatcher() noexcept {}

void SetRequestHeaders(
std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders) {
_requestHeaders = requestHeaders;
}

void QueueLoadWork(std::vector<TileLoadWork>& work);
void QueueRequestWork(
std::vector<TileLoadWork>& work,
std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders);

void WakeIfNeeded();

void TakeCompletedWork(size_t maxCount, std::vector<TileLoadWork>& out);

private:
void dispatchRequest(TileLoadWork& request);
bool stageRequestWork(
Expand Down Expand Up @@ -570,11 +569,14 @@ class CESIUM3DTILESSELECTION_API Tileset final {
double priority);

void discoverLoadWork(
std::vector<TileLoadRequest> requests,
std::vector<TileLoadWork>& out);
std::vector<TileLoadRequest>& requests,
std::vector<TileLoadWork>& outRequests,
std::vector<TileLoadWork>& outProcessing);

void addLoadWorkToRequestDispatcher(std::vector<TileLoadWork>& newLoadWork);

void dispatchProcessingWork(std::vector<TileLoadWork>& workVector);

static TraversalDetails createTraversalDetailsForSingleTile(
const FrameState& frameState,
const Tile& tile,
Expand Down
182 changes: 109 additions & 73 deletions Cesium3DTilesSelection/src/Tileset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,73 +1418,72 @@ Tileset::TraversalDetails Tileset::_visitVisibleChildrenNearToFar(
void Tileset::_processWorkerThreadLoadQueue() {
CESIUM_TRACE("Tileset::_processWorkerThreadLoadQueue");

#if 1 // New path
// TODO
// -) Take out old maximumSimultaneousTileLoads throttling
// -) Remove setState(TileLoadState::ContentLoading) loading everywhere else
// -) Check on Unreal asset accessor (or leave it and make sure there is no
// network activity
// -) Dispatch normal doTileContentWork based on dispatcher output
// -) Modify doTileContentWork to not do CachingAccessor, or leave it
// -) go over TODOS
// -) Use worker thread not thread pool?

int32_t maxTileLoads =
static_cast<int32_t>(this->_options.maximumSimultaneousTileLoads);

// TODO, do we need to move this now?
if (_pTilesetContentManager->getNumberOfTilesLoading() >= maxTileLoads)
return;

std::vector<TileLoadWork> newLoadWork;
discoverLoadWork(this->_workerThreadLoadQueue, newLoadWork);
std::vector<TileLoadWork> newRequestWork;
std::vector<TileLoadWork> newProcessingWork;
discoverLoadWork(
this->_workerThreadLoadQueue,
newRequestWork,
newProcessingWork);

// Add all content requests to Request queue
addLoadWorkToRequestDispatcher(newLoadWork);

_pRequestDispatcher->SetRequestHeaders(
this->_pTilesetContentManager->getRequestHeaders());
addLoadWorkToRequestDispatcher(newRequestWork);

// Wake up the network request dispatcher
_pRequestDispatcher->WakeIfNeeded();

// Work broken down into load units. Either Tile content work or Raster work.
// TODO issue Tile url request work here and remove from doTileContentWork
//
// We have two input streams of processing work
// - Work that came in from update view this frame
// - Work that had a response that just completed
//
// Give preference to responses that just came in, these are older
//
std::vector<TileLoadWork> workToDispatch;

std::sort(newLoadWork.begin(), newLoadWork.end());

for (TileLoadWork& work : newLoadWork) {
int32_t numberOfTilesLoading =
this->_pTilesetContentManager->getNumberOfTilesLoading();
int32_t maximumSimultaneousTileLoads =
static_cast<int32_t>(this->_options.maximumSimultaneousTileLoads);

if (std::holds_alternative<Tile*>(work.workRef)) {
Tile* pTile = std::get<Tile*>(work.workRef);
assert(pTile);
// TODO, how to figure in raster tiles, getNumberOfThrottledTilesLoading?
// Currently raster tile work dispatches, but doesn't take a slot
int32_t availableSlots = maximumSimultaneousTileLoads - numberOfTilesLoading;
assert(availableSlots >= 0);
if (availableSlots == 0)
return;

if (_pTilesetContentManager->getNumberOfTilesLoading() >= maxTileLoads)
continue;
this->_pTilesetContentManager->doTileContentWork(
*pTile,
work.projections,
_options);
} else {
RasterMappedTo3DTile* pRasterTile =
std::get<RasterMappedTo3DTile*>(work.workRef);
assert(pRasterTile);
// Add completed request work
_pRequestDispatcher->TakeCompletedWork(availableSlots, workToDispatch);
availableSlots -= (int32_t)workToDispatch.size();
assert(availableSlots >= 0);

// Add processing work
if (availableSlots > 0) {
std::sort(newProcessingWork.begin(), newProcessingWork.end());
int countToAdd =
std::min((int32_t)newProcessingWork.size(), availableSlots);
workToDispatch.insert(
workToDispatch.end(),
newProcessingWork.begin(),
newProcessingWork.begin() + countToAdd);
}

RasterOverlayTile* pLoading = pRasterTile->getLoadingTile();
if (!pLoading)
continue;
// Dispatch it
dispatchProcessingWork(workToDispatch);

/*
THIS CODE NEEDS TO BE PUT BACK
RasterOverlayTileProvider& provider = pLoading->getTileProvider();
if (provider.getNumberOfThrottledTilesLoading() >= maxTileLoads)
continue;
pRasterTile->loadThrottled();
}
}

/*
THIS CODE NEEDS TO BE PUT BACK
// Finalize the parent if necessary, otherwise it may never reach the
// Done state. Also double check that we have render content in ensure
// we don't assert / crash in finishLoading. The latter will only ever
Expand All @@ -1495,28 +1494,8 @@ void Tileset::_processWorkerThreadLoadQueue() {
finishLoading(*pParentTile, tilesetOptions);
}
*/

#else
int32_t maximumSimultaneousTileLoads =
static_cast<int32_t>(this->_options.maximumSimultaneousTileLoads);
if (this->_pTilesetContentManager->getNumberOfTilesLoading() >=
maximumSimultaneousTileLoads) {
return;
}
std::vector<TileLoadTask>& queue = this->_workerThreadLoadQueue;
std::sort(queue.begin(), queue.end());
for (TileLoadTask& task : queue) {
this->_pTilesetContentManager->loadTileContent(*task.pTile, _options);
if (this->_pTilesetContentManager->getNumberOfTilesLoading() >=
maximumSimultaneousTileLoads) {
break;
}
}
#endif
}

void Tileset::_processMainThreadLoadQueue() {
CESIUM_TRACE("Tileset::_processMainThreadLoadQueue");
// Process deferred main-thread load tasks with a time budget.
Expand Down Expand Up @@ -1645,8 +1624,9 @@ Tileset::TraversalDetails Tileset::createTraversalDetailsForSingleTile(
}

void Tileset::discoverLoadWork(
std::vector<TileLoadRequest> requests,
std::vector<TileLoadWork>& out) {
std::vector<TileLoadRequest>& requests,
std::vector<TileLoadWork>& outRequests,
std::vector<TileLoadWork>& outProcessing) {
for (TileLoadRequest& loadRequest : requests) {
std::vector<TilesetContentManager::ParsedTileWork> parsedTileWork;
this->_pTilesetContentManager->parseTileWork(
Expand All @@ -1669,7 +1649,11 @@ void Tileset::discoverLoadWork(
work.projections,
loadRequest.group,
0};
out.push_back(newWorkUnit);

if (work.requestUrl.empty())
outProcessing.push_back(newWorkUnit);
else
outRequests.push_back(newWorkUnit);
}

// Add the last task at same as input priority
Expand All @@ -1683,7 +1667,10 @@ void Tileset::discoverLoadWork(
loadRequest.group,
loadRequest.priority};

out.push_back(newWorkUnit);
if (lastWork.requestUrl.empty())
outProcessing.push_back(newWorkUnit);
else
outRequests.push_back(newWorkUnit);
}
}

Expand All @@ -1702,19 +1689,48 @@ void Tileset::addLoadWorkToRequestDispatcher(
assert(pTile);

// Mark this tile as loading now so it doesn't get queued next frame
// TODO, what about raster tiles?
pTile->setState(TileLoadState::ContentLoading);

workToAdd.push_back(work);
}

_pRequestDispatcher->QueueLoadWork(workToAdd);
_pRequestDispatcher->QueueRequestWork(
workToAdd,
this->_pTilesetContentManager->getRequestHeaders());
}

void Tileset::dispatchProcessingWork(std::vector<TileLoadWork>& workVector) {
for (TileLoadWork& work : workVector) {
if (std::holds_alternative<Tile*>(work.workRef)) {
Tile* pTile = std::get<Tile*>(work.workRef);
assert(pTile);

this->_pTilesetContentManager->doTileContentWork(
*pTile,
work.projections,
_options);
} else {
RasterMappedTo3DTile* pRasterTile =
std::get<RasterMappedTo3DTile*>(work.workRef);
assert(pRasterTile);

RasterOverlayTile* pLoading = pRasterTile->getLoadingTile();
if (!pLoading)
continue;

pRasterTile->loadThrottled();
}
}
}

void RequestDispatcher::QueueLoadWork(std::vector<TileLoadWork>& work) {
void RequestDispatcher::QueueRequestWork(
std::vector<TileLoadWork>& work,
std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders) {
// TODO, assert tile is not already loading? or already post-processing?
std::lock_guard<std::mutex> lock(_requestsLock);
_queuedRequests.insert(_queuedRequests.end(), work.begin(), work.end());

_requestHeaders = requestHeaders;
}

void RequestDispatcher::dispatchRequest(TileLoadWork& request) {
Expand Down Expand Up @@ -1792,6 +1808,26 @@ bool RequestDispatcher::stageRequestWork(
return _queuedRequests.empty();
}

void RequestDispatcher::TakeCompletedWork(
size_t maxCount,
std::vector<TileLoadWork>& out) {
std::lock_guard<std::mutex> lock(_requestsLock);
size_t count = _doneRequests.size();
if (count == 0)
return;

// Populate our output
size_t numberToTake = std::min(count, maxCount);
out = std::vector<TileLoadWork>(
_doneRequests.begin(),
_doneRequests.begin() + numberToTake);

// Remove these entries from the source
_doneRequests = std::vector<TileLoadWork>(
_doneRequests.begin() + numberToTake,
_doneRequests.end());
}

void RequestDispatcher::WakeIfNeeded() {
{
std::lock_guard<std::mutex> lock(_requestsLock);
Expand Down

0 comments on commit 08e21b3

Please sign in to comment.