Skip to content

Commit

Permalink
starting to fix resourcing
Browse files Browse the repository at this point in the history
  • Loading branch information
drewkerrigan committed Mar 21, 2016
1 parent 0d83351 commit b6ccf4b
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 18 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
39 changes: 27 additions & 12 deletions src/rms_node_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,37 @@ apply_reserved_offer(NodeKey, OfferHelper,
Hostname = get_node_hostname(NodeKey),
AgentIdValue = get_node_agent_id_value(NodeKey),

%% Apply reserved resources.
%% Apply reserved resources for task.
OfferHelper1 =
rms_offer_helper:apply_reserved_resources(
NodeCpus, NodeMem, NodeDisk, undefined, Role,
Principal, PersistenceId, ContainerPath,
OfferHelper),
%% Apply unreserved resources.
%% Apply unreserved resources for task.
OfferHelper2 =
rms_offer_helper:apply_unreserved_resources(
undefined, undefined, undefined, NodeNumPorts,
OfferHelper1),
%% Grab Task resources from offer helper in current state.
TaskInfoReservedResources =
rms_offer_helper:get_reserved_applied_resources(OfferHelper2),
TaskInfoUnreservedResources =
rms_offer_helper:get_unreserved_applied_resources(OfferHelper2),
TaskInfoResources = TaskInfoReservedResources ++ TaskInfoUnreservedResources,

%% Apply Executor Resources against original offer helper.
OfferHelperExec =
rms_offer_helper:apply_unreserved_resources(
?CPUS_PER_EXECUTOR, ?MEM_PER_EXECUTOR, undefined, undefined,
OfferHelper),
%% Grab Executor resources from exec offer helper in current state.
ExecutorInfoResources =
rms_offer_helper:get_unreserved_applied_resources(OfferHelperExec),
%% Apply Executor Resources against the real offer helper.
OfferHelper3 =
rms_offer_helper:apply_unreserved_resources(
?CPUS_PER_EXECUTOR, ?MEM_PER_EXECUTOR, undefined, undefined,
OfferHelper2),

AgentId = erl_mesos_utils:agent_id(AgentIdValue),

Expand All @@ -277,10 +297,10 @@ apply_reserved_offer(NodeKey, OfferHelper,

TaskId = erl_mesos_utils:task_id(NodeKey),

ReservedPorts =
rms_offer_helper:get_unreserved_resources_ports(OfferHelper),
TaskDataPorts =
rms_offer_helper:get_applied_unreserved_resources_ports(OfferHelper3),

[HTTPPort, PBPort, DisterlPort | _Ports] = ReservedPorts,
[HTTPPort, PBPort, DisterlPort | _Ports] = TaskDataPorts,

NodeName = iolist_to_binary([NodeKey, "@", Hostname]),
TaskData = [{<<"FullyQualifiedNodeName">>, NodeName},
Expand All @@ -300,22 +320,17 @@ apply_reserved_offer(NodeKey, OfferHelper,
Source = "riak", %% FIXME
ExecutorInfo =
erl_mesos_utils:executor_info(ExecutorId, CommandInfo,
undefined, undefined,
ExecutorInfoResources, undefined, %% FrameworkID
Source),


TaskName = "riak",
%% FIXME Is this the appropriate place for ReservedResources?
ReservedResources = rms_offer_helper:get_resources_to_reserve(OfferHelper2),
UnreservedResources = rms_offer_helper:get_resources_to_unreserve(OfferHelper2),
TaskInfoResources = ReservedResources ++ UnreservedResources,
TaskInfo =
erl_mesos_utils:task_info(TaskName, TaskId, AgentId,
TaskInfoResources,
ExecutorInfo, undefined,
TaskDataBin),
{ok, rms_offer_helper:add_task_to_launch(TaskInfo,
OfferHelper)};
OfferHelper3)};
false ->
{error, not_enough_resources}
end;
Expand Down
41 changes: 35 additions & 6 deletions src/rms_offer_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
get_unreserved_resources_mem/1,
get_unreserved_resources_disk/1,
get_unreserved_resources_ports/1,
get_applied_unreserved_resources_ports/1,
get_resources_to_reserve/1,
get_resources_to_unreserve/1,
get_volumes_to_create/1,
Expand All @@ -33,6 +34,8 @@
make_volume/6,
apply_reserved_resources/9,
apply_unreserved_resources/5,
get_reserved_applied_resources/1,
get_unreserved_applied_resources/1,
can_fit_reserved/5,
can_fit_unreserved/5,
has_persistence_id/2,
Expand All @@ -44,6 +47,8 @@

-record(offer_helper, {offer :: erl_mesos:'Offer'(),
persistence_ids = [] :: [string()],
applied_reserved_resources = [] :: erl_mesos_utils:resources(),
applied_unreserved_resources = [] :: erl_mesos_utils:resources(),
reserved_resources :: erl_mesos_utils:resources(),
unreserved_resources :: erl_mesos_utils:resources(),
resources_to_reserve = [] :: [erl_mesos:'Resource'()],
Expand Down Expand Up @@ -144,6 +149,10 @@ get_unreserved_resources_disk(OfferHelper) ->
get_unreserved_resources_ports(OfferHelper) ->
erl_mesos_utils:resources_ports(get_unreserved_resources(OfferHelper)).

-spec get_applied_unreserved_resources_ports(offer_helper()) -> [non_neg_integer()].
get_applied_unreserved_resources_ports(OfferHelper) ->
erl_mesos_utils:resources_ports(get_unreserved_applied_resources(OfferHelper)).

-spec get_resources_to_reserve(offer_helper()) -> [erl_mesos:'Resource'()].
get_resources_to_reserve(#offer_helper{resources_to_reserve =
ResourcesToReserve}) ->
Expand Down Expand Up @@ -219,25 +228,45 @@ make_volume(Disk, Role, Principal, PersistenceId, ContainerPath,
offer_helper().
apply_reserved_resources(Cpus, Mem, Disk, NumPorts, Role, Principal,
PersistenceId, ContainerPath,
#offer_helper{reserved_resources = ReservedResources} =
#offer_helper{reserved_resources = ReservedResources,
applied_reserved_resources =
AppliedReservedResources} =
OfferHelper) ->
{ReservedResources1, _} =
{ReservedResources1, AppliedReservedResources1} =
apply(Cpus, Mem, Disk, NumPorts, Role, Principal, PersistenceId,
ContainerPath, ReservedResources),
OfferHelper#offer_helper{reserved_resources = ReservedResources1}.
OfferHelper#offer_helper{reserved_resources = ReservedResources1,
applied_reserved_resources =
AppliedReservedResources ++ AppliedReservedResources1}.

-spec apply_unreserved_resources(undefined | float(), undefined | float(),
undefined | float(), undefined | pos_integer(),
offer_helper()) ->
offer_helper().
apply_unreserved_resources(Cpus, Mem, Disk, NumPorts,
#offer_helper{unreserved_resources =
UnreservedResources} =
UnreservedResources,
applied_unreserved_resources =
AppliedUnreservedResources} =
OfferHelper) ->
{UnreservedResources1, _} =
{UnreservedResources1, AppliedUnreservedResources1} =
apply(Cpus, Mem, Disk, NumPorts, undefined, undefined, undefined,
undefined, UnreservedResources),
OfferHelper#offer_helper{unreserved_resources = UnreservedResources1}.
OfferHelper#offer_helper{unreserved_resources = UnreservedResources1,
applied_unreserved_resources =
AppliedUnreservedResources ++ AppliedUnreservedResources1}.

-spec get_reserved_applied_resources(offer_helper()) ->
erl_mesos_utils:resources().
get_reserved_applied_resources(#offer_helper{applied_reserved_resources =
AppliedReservedResources}) ->
AppliedReservedResources.

-spec get_unreserved_applied_resources(offer_helper()) ->
erl_mesos_utils:resources().
get_unreserved_applied_resources(#offer_helper{applied_unreserved_resources =
AppliedUnreservedResources}) ->
AppliedUnreservedResources.

-spec can_fit_reserved(undefined | float(), undefined | float(),
undefined | float(), undefined | pos_integer(),
Expand Down

0 comments on commit b6ccf4b

Please sign in to comment.