Source code for pomdp_problems.light_dark.domain.action
-"""Defines the Action for the continuous light-dark domain;
-
-Origin: Belief space planning assuming maximum likelihood observations
-
-Action space:
-
- :math:`U\subseteq\mathbb{R}^2`. Quote from the paper: "The robot is
- modeled as a first-order system such that the robot velocity is determined
- by the control actions, :math:`u\in\mathbb{R}^2`.
-"""
-importpomdp_py
-
-
[docs]classAction(pomdp_py.Action):
- """The action is a vector of velocities"""
- def__init__(self,control):
- """
- Initializes a state in light dark domain.
-
- Args:
- control (tuple): velocity
- """
- iflen(control)!=2:
- raiseValueError("Action control must be a vector of length 2")
- self.control=control
-
- def__hash__(self):
- returnhash(self.control)
-
- def__eq__(self,other):
- ifisinstance(other,Action):
- returnself.control==other.control
- else:
- returnFalse
-
- def__str__(self):
- returnself.__repr__()
-
- def__repr__(self):
- return"Action(%s)"%(str(self.control))
Source code for pomdp_problems.light_dark.domain.observation
-importpomdp_py
-
-
[docs]classObservation(pomdp_py.Observation):
- """Defines the Observation for the continuous light-dark domain;
-
- Observation space:
-
- :math:`\Omega\subseteq\mathbb{R}^2` the observation of the robot is
- an estimate of the robot position :math:`g(x_t)\in\Omega`.
-
- """
- # the number of decimals to round up an observation when it is discrete.
- PRECISION=2
-
- def__init__(self,position,discrete=False):
- """
- Initializes a observation in light dark domain.
-
- Args:
- position (tuple): position of the robot.
- """
- self._discrete=discrete
- iflen(position)!=2:
- raiseValueError("Observation position must be a vector of length 2")
- ifself._discrete:
- self.position=position
- else:
- self.position=(round(position[0],Observation.PRECISION),
- round(position[1],Observation.PRECISION))
-
-
Source code for pomdp_problems.light_dark.domain.state
-"""Defines the State for the continuous light-dark domain;
-
-Origin: Belief space planning assuming maximum likelihood observations
-
-State space:
-
- :math:`X\subseteq\mathbb{R}^2` the state of the robot
-"""
-importpomdp_py
-importnumpyasnp
-
-
[docs]classState(pomdp_py.State):
- """The state of the problem is just the robot position"""
- def__init__(self,position):
- """
- Initializes a state in light dark domain.
-
- Args:
- position (tuple): position of the robot.
- """
- iflen(position)!=2:
- raiseValueError("State position must be a vector of length 2")
- self.position=position
-
- def__hash__(self):
- returnhash(self.position)
-
- def__eq__(self,other):
- ifisinstance(other,State):
- returnself.position==other.position
- else:
- returnFalse
-
- def__str__(self):
- returnself.__repr__()
-
- def__repr__(self):
- return"State(%s)"%(str(self.position))
Source code for pomdp_problems.light_dark.env.visual
-"""Plot the light dark environment"""
-importmatplotlib.pyplotasplt
-frommatplotlib.collectionsimportPolyCollection
-importpomdp_problems.light_darkasld
-frompomdp_py.utilsimportplotting,colors
-frompomdp_py.utils.miscimportremap
-
-
[docs]classLightDarkViz:
- """This class deals with visualizing a light dark domain"""
-
- def__init__(self,env,x_range,y_range,res):
- """
- Args:
- env (LightDarkEnvironment): Environment for light dark domain.
- x_range (tuple): a tuple of floats (x_min, x_max).
- y_range (tuple): a tuple of floats (y_min, y_max).
- res (float): specifies the size of each rectangular strip to draw;
- As in the paper, the light is at a location on the x axis.
- """
- self._env=env
- self._res=res
- self._x_range=x_range
- self._y_range=y_range
- fig=plt.gcf()
- self._ax=fig.add_subplot(1,1,1)
- self._goal_pos=None
- self._m_0=None# initial belief pose
-
- # For tracking the path; list of robot position tuples
- self._log_paths={}
-
-
Source code for pomdp_problems.light_dark.models.observation_model
-"""Defines the ObservationModel for the continuous light-dark domain;
-
-Origin: Belief space planning assuming maximum likelihood observations
-
-Quote from the paper:
-
- The observation function is identity, :math:`g(x_t) = x_t+\omega`,
- with zero-mean Gaussian observation noise a function of state,
- \omega\sim\mathcal{N}(\cdot | 0, w(x))` where
-
- :math:`w(x) = \frac{1}{2}(5-s_x)^2 + \text{const}`
-
- (Notational change; using :math:`s_x` to refer to first element of
- state (i.e. robot position). The number 5 indicates the x-coordinate
- of the light bar as shown in the figure (Fig.1 of the paper).
-"""
-importpomdp_py
-importcopy
-importnumpyasnp
-from..domain.observationimport*
-
-
[docs]classObservationModel(pomdp_py.ObservationModel):
-
- def__init__(self,light,const):
- """
- `light` and `const` are parameters in
- :math:`w(x) = \frac{1}{2}(\text{light}-s_x)^2 + \text{const}`
-
- They should both be floats. The quantity :math:`w(x)` will
- be used as the variance of the covariance matrix in the gaussian
- distribution (this is how I understood the paper).
- """
- self._light=light
- self._const=const
-
- def_compute_variance(self,pos):
- return0.5*(self._light-pos[0])**2+self._const
-
-
[docs]defprobability(self,observation,next_state,action):
- """
- The observation is :math:`g(x_t) = x_t+\omega`. So
- the probability of this observation is the probability
- of :math:`\omega` which follows the Gaussian distribution.
- """
- ifself._discrete:
- observation=observation.discretize()
- variance=self._compute_variance(next_state.position)
- gaussian_noise=pomdp_py.Gaussian([0,0],
- [[variance,0],
- [0,variance]])
- omega=(observation.position[0]-next_state.position[0],
- observation.position[1]-next_state.position[1])
- returngaussian_noise[omega]
-
-
[docs]defsample(self,next_state,action,argmax=False):
- """sample an observation."""
- # Sample a position shift according to the gaussian noise.
- obs_pos=self.func(next_state.position,mpe=argmax)
- returnObservation(tuple(obs_pos))
Source code for pomdp_problems.light_dark.models.transition_model
-"""Defines the TransitionModel for the continuous light-dark domain;
-
-Origin: Belief space planning assuming maximum likelihood observations
-
-Quote from the paper:
-
- The underlying system dynamics are linear with zero process noise,
- :math:`f(x_t,u_t)=x_t+u`. This means the transition dynamics is
- deterministic.
-"""
-importpomdp_py
-importcopy
-importnumpyasnp
-
-
-
[docs]classTransitionModel(pomdp_py.TransitionModel):
- """
- The underlying deterministic system dynamics
- """
- def__init__(self,epsilon=1e-9):
- self._epsilon=epsilon
-
-
[docs]defargmax(self,state,action):
- """Returns the most likely next state"""
- returnself.sample(state,action)
-
-
[docs]deffunc(self):
- """Returns the function of the underlying system dynamics.
- The function is: (xt, ut) -> xt+1 where xt, ut, xt+1 are
- all numpy arrays."""
- deff(xt,ut):
- returnnp.array([xt[0]+ut[0],
- xt[1]+ut[1]])
- returnf
-
-
[docs]defjac_dx(self):
- """Returns the function of the jacobian of the system dynamics
- function with respect to the state vector mt: (mt, ut) -> At"""
- defdfdx(mt,ut):
- # The result of computing the jacobian by hand
- returnnp.array([[ut[0],mt[1]+ut[1]],
- [mt[0]+ut[0],ut[1]]])
- returndfdx
-
-
[docs]defjac_du(self):
- """Returns the function of the jacobian of the system dynamics
- function with respect to the state vector mt: (mt, ut) -> Bt"""
- defdfdu(mt,ut):
- # The result of computing the jacobian by hand
- returnnp.array([[mt[0],mt[1]+ut[1]],
- [mt[0]+ut[0],mt[1]]])
- returndfdu
-
-
[docs]deffunc_noise(self,var_sysd=1e-9):
- """Returns a function that returns a state-dependent Gaussian noise."""
- deffn(mt):
- gaussian_noise=pomdp_py.Gaussian([0,0],
- [[var_sysd,0],
- [0,var_sysd]])
- returngaussian_noise
- returnfn
Source code for pomdp_problems.load_unload.load_unload
-"""The load unload problem. An agent is placed on a one dimensional grid world
-and is tasked with loading itself up on the right side of the world and
-unloading on the left. The agent can observe whether or not it is in the load or
-unload block but can not tell its exact location of whether it is loaded or
-unloaded. Therefore the agent must maintain belief about it's location and load
-status.
-
-States are defined by the location of the agent and whether or not it is loaded
-Actions: "move-left", "move-right"
-Rewards:
- +100 for moving into the unload block while loaded
- -1 otherwise
-
-"""
-
-importpomdp_py
-importrandom
-importnumpyasnp
-importsys
-importcopy
-importmatplotlib.pyplotasplt
-frommatplotlib.animationimportFuncAnimation
-
-
-EPSILON=1e-3
-LOAD_LOCATION=10
-
-
[docs]classLUState(pomdp_py.State):
- def__init__(self,x,loaded):
- iftype(x)!=intorx<0:
- raiseValueError("Invalid state: {}\n".format((x,loaded))+
- "x must be an integer > 0")
- iftype(loaded)!=bool:
- raiseValueError("Invalid state: {}\n".format((x,loaded))+
- "loaded must be a boolean")
- ifx==0andloaded==True:
- raiseValueError("Agent can not be loaded in the 0th position")
- ifx==LOAD_LOCATIONandloaded==False:
- raiseValueError("Agent can not be unloaded in the last position")
-
- self.x=x
- self.loaded=loaded
- def__hash__(self):
- returnhash((self.x,self.loaded))
- def__eq__(self,other):
- ifisinstance(other,LUState):
- returnself.x==other.xandself.loaded==self.loaded
- eliftype(other)==tuple:
- returnself.x==other[0]andself.loaded==other[1]
- def__str__(self):
- returnstr((self.x,self.loaded))
- def__repr__(self):
- return"State({})".format(self)
[docs]classLUObservationModel(pomdp_py.ObservationModel):
- """This problem is small enough for the probabilities to be directly given
- externally"""
-
[docs]defprobability(self,observation,next_state,action,normalized=False,**kwargs):
- ifobservation!=self.sample(next_state,action):
- # return EPSILON to avoid degradation of particles
- returnEPSILON
- else:
- return1-EPSILON
[docs]defsample(self,state,action,normalized=False,**kwargs):
- if((state.x==LOAD_LOCATIONandaction=="move-right")or
- (state.x==0andaction=="move-left")):
- # trying to make invalid move, stay in the same place
- returnstate
-
- ifaction=="move-right":
- # make sure we're always loaded in the far right cell
- ifstate.x==LOAD_LOCATION-1:
- returnLUState(state.x+1,True)
- returnLUState(state.x+1,state.loaded)
-
- ifaction=="move-left":
- # make sure we're always unloaded in the first cell
- ifstate.x==1:
- returnLUState(state.x-1,False)
- returnLUState(state.x-1,state.loaded)
-
-
[docs]defargmax(self,state,action,normalized=False,**kwargs):
- """Returns the most likely next state"""
- returnself.sample(state,action)
[docs]defsample(self,state,action,next_state,normalized=False,**kwargs):
- # if we are unloaded things, give reward 100, otherwise give -1
- ifaction=="move-left"andstate.loaded==Trueandstate.x==1:
- return100
- else:
- return-1
-
-
[docs]defargmax(self,state,action,next_state,normalized=False,**kwargs):
- """Returns the most likely reward"""
- returnself.sample(state,action)
-
-# Policy Model
-
[docs]classLUPolicyModel(pomdp_py.RandomRollout):
- """This is an extremely dumb policy model; To keep consistent
- with the framework."""
- def__init__(self):
- self._all_actions={LUAction("move-right"),LUAction("move-left")}
-
-
[docs]defprobability(self,action,state,normalized=False,**kwargs):
- raiseNotImplementedError# Never used
Source code for pomdp_problems.multi_object_search.agent.agent
-# Defines the agent. There's nothing special
-# about the MOS agent in fact, except that
-# it uses models defined in ..models, and
-# makes use of the belief initialization
-# functions in belief.py
-importpomdp_py
-from.beliefimport*
-from..models.transition_modelimport*
-from..models.observation_modelimport*
-from..models.reward_modelimport*
-from..models.policy_modelimport*
-
-
[docs]classMosAgent(pomdp_py.Agent):
- """One agent is one robot."""
- def__init__(self,
- robot_id,
- init_robot_state,# initial robot state (assuming robot state is observable perfectly)
- object_ids,# target object ids
- dim,# tuple (w,l) of the width (w) and length (l) of the gridworld search space.
- sensor,# Sensor equipped on the robot
- sigma=0.01,# parameter for observation model
- epsilon=1,# parameter for observation model
- belief_rep="histogram",# belief representation, either "histogram" or "particles".
- prior={},# prior belief, as defined in belief.py:initialize_belief
- num_particles=100,# used if the belief representation is particles
- grid_map=None):# GridMap used to avoid collision with obstacles (None if not provided)
- self.robot_id=robot_id
- self._object_ids=object_ids
- self.sensor=sensor
-
- # since the robot observes its own pose perfectly, it will have 100% prior
- # on this pose.
- prior[robot_id]={init_robot_state.pose:1.0}
- rth=init_robot_state.pose[2]
-
- # initialize belief
- init_belief=initialize_belief(dim,
- self.robot_id,
- self._object_ids,
- prior=prior,
- representation=belief_rep,
- robot_orientations={self.robot_id:rth},
- num_particles=num_particles)
- transition_model=MosTransitionModel(dim,
- {self.robot_id:self.sensor},
- self._object_ids)
- observation_model=MosObservationModel(dim,
- self.sensor,
- self._object_ids,
- sigma=sigma,
- epsilon=epsilon)
- reward_model=GoalRewardModel(self._object_ids,robot_id=self.robot_id)
- policy_model=PolicyModel(self.robot_id,grid_map=grid_map)
- super().__init__(init_belief,policy_model,
- transition_model=transition_model,
- observation_model=observation_model,
- reward_model=reward_model)
-
-
Source code for pomdp_problems.multi_object_search.env.visual
-# Visualization of a MOS instance using pygame
-#
-# Note to run this file, you need to run the following
-# in the parent directory of multi_object_search:
-#
-# python -m multi_object_search.env.visual
-#
-
-importpygame
-importcv2
-importmath
-importnumpyasnp
-importrandom
-importpomdp_py.utilsasutil
-frompomdp_problems.multi_object_search.env.envimport*
-frompomdp_problems.multi_object_search.domain.observationimport*
-frompomdp_problems.multi_object_search.domain.actionimport*
-frompomdp_problems.multi_object_search.domain.stateimport*
-frompomdp_problems.multi_object_search.example_worldsimport*
-
-# Deterministic way to get object color
-
[docs]defupdate(self,robot_id,action,observation,viz_observation,belief):
- """
- Update the visualization after there is new real action and observation
- and updated belief.
-
- Args:
- observation (MosOOObservation): Real observation
- viz_observation (MosOOObservation): An observation used to visualize
- the sensing region.
- """
- self._last_action[robot_id]=action
- self._last_observation[robot_id]=observation
- self._last_viz_observation[robot_id]=viz_observation
- self._last_belief[robot_id]=belief
[docs]defrender_env(self,display_surf):
- # draw robot, a circle and a vector
- img=np.copy(self._img)
- fori,robot_idinenumerate(self._env.robot_ids):
- rx,ry,rth=self._env.state.pose(robot_id)
- r=self._res# Not radius!
- last_observation=self._last_observation.get(robot_id,None)
- last_viz_observation=self._last_viz_observation.get(robot_id,None)
- last_belief=self._last_belief.get(robot_id,None)
- iflast_beliefisnotNone:
- MosViz.draw_belief(img,last_belief,r,r//3,self._target_colors)
- iflast_viz_observationisnotNone:
- MosViz.draw_observation(img,last_viz_observation,
- rx,ry,rth,r,r//4,color=(200,200,12))
- iflast_observationisnotNone:
- MosViz.draw_observation(img,last_observation,
- rx,ry,rth,r,r//8,color=(20,20,180))
-
- MosViz.draw_robot(img,rx*r,ry*r,rth,r,color=(12,255*(0.8*(i+1)),12))
- pygame.surfarray.blit_array(display_surf,img)
-
-
[docs]defunittest():
- # If you don't want occlusion, use this:
- laserstr=make_laser_sensor(90,(1,8),0.5,False)
- # If you want occlusion, use this
- # (the difference is mainly in angle_increment; this
- # is due to the discretization - discretization may
- # cause "strange" behavior when checking occlusion
- # but the model is actually doing the right thing.)
- laserstr_occ=make_laser_sensor(360,(1,8),0.5,True)
- # Proximity sensor
- proxstr=make_proximity_sensor(1.5,False)
- proxstr_occ=make_proximity_sensor(1.5,True)
-
- worldmap,robot=world1
- worldstr=equip_sensors(worldmap,{robot:laserstr})
-
- dim,robots,objects,obstacles,sensors=interpret(worldstr)
- init_state=MosOOState({**objects,**robots})
- env=MosEnvironment(dim,
- init_state,sensors,
- obstacles=obstacles)
- viz=MosViz(env,controllable=True)
- viz.on_execute()
Source code for pomdp_problems.multi_object_search.problem
-"""2D Multi-Object Search (MOS) Task.
-Uses the domain, models, and agent/environment
-to actually define the POMDP problem for multi-object search.
-Then, solve it using POUCT or POMCP."""
-importpomdp_py
-frompomdp_problems.multi_object_search.env.envimport*
-frompomdp_problems.multi_object_search.env.visualimport*
-frompomdp_problems.multi_object_search.agent.agentimport*
-frompomdp_problems.multi_object_search.example_worldsimport*
-frompomdp_problems.multi_object_search.domain.observationimport*
-frompomdp_problems.multi_object_search.models.components.grid_mapimport*
-importargparse
-importtime
-importrandom
-
-
[docs]classMosOOPOMDP(pomdp_py.OOPOMDP):
- """
- A MosOOPOMDP is instantiated given a string description
- of the search world, sensor descriptions for robots,
- and the necessary parameters for the agent's models.
-
- Note: This is of course a simulation, where you can
- generate a world and know where the target objects are
- and then construct the Environment object. But in the
- real robot scenario, you don't know where the objects
- are. In that case, as I have done it in the past, you
- could construct an Environment object and give None to
- the object poses.
- """
- def__init__(self,robot_id,env=None,grid_map=None,
- sensors=None,sigma=0.01,epsilon=1,
- belief_rep="histogram",prior={},num_particles=100,
- agent_has_map=False):
- """
- Args:
- robot_id (int or str): the id of the agent that will solve this MosOOPOMDP.
- If it is a `str`, it will be interpreted as an integer using `interpret_robot_id`
- in env/env.py.
- env (MosEnvironment): the environment.
- grid_map (str): Search space description. See env/env.py:interpret. An example:
- rx...
- .x.xT
- .....
- Ignored if env is not None
- sensors (dict): map from robot character to sensor string.
- For example: {'r': 'laser fov=90 min_range=1 max_range=5
- angle_increment=5'}
- Ignored if env is not None
- agent_has_map (bool): If True, we assume the agent is given the occupancy
- grid map of the world. Then, the agent can use this
- map to avoid planning invalid actions (bumping into things).
- But this map does not help the agent's prior belief directly.
-
- sigma, epsilon: observation model paramters
- belief_rep (str): belief representation. Either histogram or particles.
- prior (dict or str): either a dictionary as defined in agent/belief.py
- or a string, either "uniform" or "informed". For "uniform", a uniform
- prior will be given. For "informed", a perfect prior will be given.
- num_particles (int): setting for the particle belief representation
- """
- ifenvisNone:
- assertgrid_mapisnotNoneandsensorsisnotNone,\
- "Since env is not provided, you must provide string descriptions"\
- "of the world and sensors."
- worldstr=equip_sensors(grid_map,sensors)
- dim,robots,objects,obstacles,sensors=interpret(worldstr)
- init_state=MosOOState({**objects,**robots})
- env=MosEnvironment(dim,
- init_state,sensors,
- obstacles=obstacles)
-
- # construct prior
- iftype(prior)==str:
- ifprior=="uniform":
- prior={}
- elifprior=="informed":
- prior={}
- forobjidinenv.target_objects:
- groundtruth_pose=env.state.pose(objid)
- prior[objid]={groundtruth_pose:1.0}
-
- # Potential extension: a multi-agent POMDP. For now, the environment
- # can keep track of the states of multiple agents, but a POMDP is still
- # only defined over a single agent. Perhaps, MultiAgent is just a kind
- # of Agent, which will make the implementation of multi-agent POMDP cleaner.
- robot_id=robot_idiftype(robot_id)==intelseinterpret_robot_id(robot_id)
- grid_map=GridMap(env.width,env.length,
- {objid:env.state.pose(objid)
- forobjidinenv.obstacles})ifagent_has_mapelseNone
- agent=MosAgent(robot_id,
- env.state.object_states[robot_id],
- env.target_objects,
- (env.width,env.length),
- env.sensors[robot_id],
- sigma=sigma,
- epsilon=epsilon,
- belief_rep=belief_rep,
- prior=prior,
- num_particles=num_particles,
- grid_map=grid_map)
- super().__init__(agent,env,
- name="MOS(%d,%d,%d)"%(env.width,env.length,len(env.target_objects)))
-
-
-### Belief Update ###
-
[docs]defbelief_update(agent,real_action,real_observation,next_robot_state,planner):
- """Updates the agent's belief; The belief update may happen
- through planner update (e.g. when planner is POMCP)."""
- # Updates the planner; In case of POMCP, agent's belief is also updated.
- planner.update(agent,real_action,real_observation)
-
- # Update agent's belief, when planner is not POMCP
- ifnotisinstance(planner,pomdp_py.POMCP):
- # Update belief for every object
- forobjidinagent.cur_belief.object_beliefs:
- belief_obj=agent.cur_belief.object_belief(objid)
- ifisinstance(belief_obj,pomdp_py.Histogram):
- ifobjid==agent.robot_id:
- # Assuming the agent can observe its own state:
- new_belief=pomdp_py.Histogram({next_robot_state:1.0})
- else:
- # This is doing
- # B(si') = normalizer * O(oi|si',sr',a) * sum_s T(si'|s,a)*B(si)
- #
- # Notes: First, objects are static; Second,
- # O(oi|s',a) ~= O(oi|si',sr',a) according to the definition
- # of the observation model in models/observation.py. Note
- # that the exact belief update rule for this OOPOMDP needs to use
- # a model like O(oi|si',sr',a) because it's intractable to
- # consider s' (that means all combinations of all object
- # states must be iterated). Of course, there could be work
- # around (out of scope) - Consider a volumetric observaiton,
- # instead of the object-pose observation. That means oi is a
- # set of pixels (2D) or voxels (3D). Note the real
- # observation, oi, is most likely sampled from O(oi|s',a)
- # because real world considers the occlusion between objects
- # (due to full state s'). The problem is how to compute the
- # probability of this oi given s' and a, where it's
- # intractable to obtain s'. To this end, we can make a
- # simplifying assumption that an object is contained within
- # one pixel (or voxel); The pixel (or voxel) is labeled to
- # indicate free space or object. The label of each pixel or
- # voxel is certainly a result of considering the full state
- # s. The occlusion can be handled nicely with the volumetric
- # observation definition. Then that assumption can reduce the
- # observation model from O(oi|s',a) to O(label_i|s',a) and
- # it becomes easy to define O(label_i=i|s',a) and O(label_i=FREE|s',a).
- # These ideas are used in my recent 3D object search work.
- new_belief=pomdp_py.update_histogram_belief(belief_obj,
- real_action,
- real_observation.for_obj(objid),
- agent.observation_model[objid],
- agent.transition_model[objid],
- # The agent knows the objects are static.
- static_transition=objid!=agent.robot_id,
- oargs={"next_robot_state":next_robot_state})
- else:
- raiseValueError("Unexpected program state."\
- "Are you using the appropriate belief representation?")
-
- agent.cur_belief.set_object_belief(objid,new_belief)
-
-
-### Solve the problem with POUCT/POMCP planner ###
-### This is the main online POMDP solver logic ###
-
[docs]defsolve(problem,
- max_depth=10,# planning horizon
- discount_factor=0.99,
- planning_time=1.,# amount of time (s) to plan each step
- exploration_const=1000,# exploration constant
- visualize=True,
- max_time=120,# maximum amount of time allowed to solve the problem
- max_steps=500):# maximum number of planning steps the agent can take.
- """
- This function terminates when:
- - maximum time (max_time) reached; This time includes planning and updates
- - agent has planned `max_steps` number of steps
- - agent has taken n FindAction(s) where n = number of target objects.
-
- Args:
- visualize (bool) if True, show the pygame visualization.
- """
-
- random_objid=random.sample(problem.env.target_objects,1)[0]
- random_object_belief=problem.agent.belief.object_beliefs[random_objid]
- ifisinstance(random_object_belief,pomdp_py.Histogram):
- # Use POUCT
- planner=pomdp_py.POUCT(max_depth=max_depth,
- discount_factor=discount_factor,
- planning_time=planning_time,
- exploration_const=exploration_const,
- rollout_policy=problem.agent.policy_model)# Random by default
- elifisinstance(random_object_belief,pomdp_py.Particles):
- # Use POMCP
- planner=pomdp_py.POMCP(max_depth=max_depth,
- discount_factor=discount_factor,
- planning_time=planning_time,
- exploration_const=exploration_const,
- rollout_policy=problem.agent.policy_model)# Random by default
- else:
- raiseValueError("Unsupported object belief type %s"%str(type(random_object_belief)))
-
- robot_id=problem.agent.robot_id
- ifvisualize:
- viz=MosViz(problem.env,controllable=False)# controllable=False means no keyboard control.
- ifviz.on_init()==False:
- raiseException("Environment failed to initialize")
- viz.update(robot_id,
- None,
- None,
- None,
- problem.agent.cur_belief)
- viz.on_render()
-
- _time_used=0
- _find_actions_count=0
- _total_reward=0# total, undiscounted reward
- foriinrange(max_steps):
- # Plan action
- _start=time.time()
- real_action=planner.plan(problem.agent)
- _time_used+=time.time()-_start
- if_time_used>max_time:
- break# no more time to update.
-
- # Execute action
- reward=problem.env.state_transition(real_action,execute=True,
- robot_id=robot_id)
-
- # Receive observation
- _start=time.time()
- real_observation= \
- problem.env.provide_observation(problem.agent.observation_model,real_action)
-
- # Updates
- problem.agent.clear_history()# truncate history
- problem.agent.update_history(real_action,real_observation)
- belief_update(problem.agent,real_action,real_observation,
- problem.env.state.object_states[robot_id],
- planner)
- _time_used+=time.time()-_start
-
- # Info and render
- _total_reward+=reward
- ifisinstance(real_action,FindAction):
- _find_actions_count+=1
- print("==== Step %d ===="%(i+1))
- print("Action: %s"%str(real_action))
- print("Observation: %s"%str(real_observation))
- print("Reward: %s"%str(reward))
- print("Reward (Cumulative): %s"%str(_total_reward))
- print("Find Actions Count: %d"%_find_actions_count)
- ifisinstance(planner,pomdp_py.POUCT):
- print("__num_sims__: %d"%planner.last_num_sims)
-
- ifvisualize:
- # This is used to show the sensing range; Not sampled
- # according to observation model.
- robot_pose=problem.env.state.object_states[robot_id].pose
- viz_observation=MosOOObservation({})
- ifisinstance(real_action,LookAction)orisinstance(real_action,FindAction):
- viz_observation= \
- problem.env.sensors[robot_id].observe(robot_pose,
- problem.env.state)
- viz.update(robot_id,
- real_action,
- real_observation,
- viz_observation,
- problem.agent.cur_belief)
- viz.on_loop()
- viz.on_render()
-
- # Termination check
- ifset(problem.env.state.object_states[robot_id].objects_found)\
- ==problem.env.target_objects:
- print("Done!")
- break
- if_find_actions_count>=len(problem.env.target_objects):
- print("FindAction limit reached.")
- break
- if_time_used>max_time:
- print("Maximum time reached.")
- break
-
-# Test
-
[docs]defunittest():
- # random world
- grid_map,robot_char=random_world(10,10,5,10)
- laserstr=make_laser_sensor(90,(1,4),0.5,False)
- proxstr=make_proximity_sensor(4,False)
- problem=MosOOPOMDP(robot_char,# r is the robot character
- sigma=0.05,# observation model parameter
- epsilon=0.95,# observation model parameter
- grid_map=grid_map,
- sensors={robot_char:proxstr},
- prior="uniform",
- agent_has_map=True)
- solve(problem,
- max_depth=10,
- discount_factor=0.99,
- planning_time=1.,
- exploration_const=1000,
- visualize=True,
- max_time=120,
- max_steps=500)
-"""Largely based on MosViz, except this is not an OO-POMDP"""
-importpygame
-importcv2
-importmath
-importnumpyasnp
-importrandom
-importpomdp_py.utilsasutil
-frompomdp_problems.tag.env.envimport*
-frompomdp_problems.tag.domain.observationimport*
-frompomdp_problems.tag.domain.actionimport*
-frompomdp_problems.tag.domain.stateimport*
-frompomdp_problems.tag.example_worldsimport*
-frompomdp_problems.tag.models.observation_modelimport*
-
-#### Visualization through pygame ####
-
[docs]defupdate(self,action,observation,belief):
- """
- Update the visualization after there is new real action and observation
- and updated belief.
- """
- self._last_action=action
- self._last_observation=observation
- self._last_belief=belief
[docs]classBLQR(pomdp_py.Planner):
- def__init__(self,
- func_sysd,func_obs,jac_sysd,jac_obs,jac_sysd_u,
- noise_obs,noise_sysd,
- Qlarge,L,Q,R,
- planning_horizon=15):
- """To initialize the planenr of B-LQR, one needs to supply parameters
+
+[docs]
+classBLQR(pomdp_py.Planner):
+ def__init__(
+ self,
+ func_sysd,
+ func_obs,
+ jac_sysd,
+ jac_obs,
+ jac_sysd_u,
+ noise_obs,
+ noise_sysd,
+ Qlarge,
+ L,
+ Q,
+ R,
+ planning_horizon=15,
+ ):
+"""To initialize the planenr of B-LQR, one needs to supply parameters that describe the underlying linear gaussian system, and also matrices that describe the cost function. Note that math symbols
- (e.g. xt, ut) are vectors or matrices represented as np.arrays.
+ (e.g. xt, ut) are vectors or matrices represented as np.arrays. The B-LQR cost function (Eq.14 in the paper): :math:`J(b_{t:T},u_{t:T}) = \bar{m}_T^TQ_{large}\bar{m}_T+\bar{s}_T^T\Lambda\bar{s}_T + \sum_{k=t}^{T-1}\bar{m}_k^TQ\bar{m}_k+\bar{u}_t^TR\bar{u}_t`
-
+
Args: func_sysd (function): f: (xt, ut) -> xt+1 func_obs (function): g: (xt) -> zt
@@ -169,7 +177,7 @@
+[docs]
+ defekf_update_mlo(self,bt,ut):
+""" Performs the ekf belief update assuming maximum likelihood observation. This follows equations 12, 13 in the paper. It's the function :math:`F`.
@@ -199,28 +208,31 @@
Source code for pomdp_py.algorithms.bsp.blqr
observation function. This corresponds to Wt in equation 13.
"""# TODO: FIX
- mt,Cov_t=bt
+ mt,Cov_t=btAt=self._jac_sysd(mt,ut)Ct=self._jac_obs(mt)# based on maximum likelihood observationWt=self._noise_obs(mt).covGat=np.dot(np.dot(At,Cov_t),At.transpose())# Gamma_t = At*Cov_t*At^TCGC_W_inv=np.linalg.inv(np.dot(np.dot(Ct,Gat),Ct.transpose())+Wt)
- Cov_next=Gat-np.dot(np.dot(np.dot(np.dot(Gat,Ct.transpose()),CGC_W_inv),Ct),Gat)
+ Cov_next=Gat-np.dot(
+ np.dot(np.dot(np.dot(Gat,Ct.transpose()),CGC_W_inv),Ct),Gat
+ )m_next=self._func_sysd(mt,ut)+self._noise_sysd(mt).random()return(m_next,Cov_next)
+
def_b_u_seg(self,x,i):
- """Returns the elements of x that corresponds to the belief and controls,
+"""Returns the elements of x that corresponds to the belief and controls, as a tuple (m_i, Covi, u_i)"""d=self._dim_statec=self._dim_control
- start=i*(d+(d*d)+c)
- end=(i+1)*(d+(d*d)+c)
- m_i_end=i*(d+(d*d)+c)+d
- Covi_end=i*(d+(d*d)+c)+d+d*d
- u_end=i*(d+(d*d)+c)+d+d*d+c
+ start=i*(d+(d*d)+c)
+ end=(i+1)*(d+(d*d)+c)
+ m_i_end=i*(d+(d*d)+c)+d
+ Covi_end=i*(d+(d*d)+c)+d+d*d
+ u_end=i*(d+(d*d)+c)+d+d*d+creturnx[start:m_i_end],x[m_i_end:Covi_end],x[Covi_end:u_end]
-
+
def_control_max_constraint(self,x,i,max_val):_,_,u_i=self._b_u_seg(x,i)returnmax_val-u_i
@@ -230,9 +242,9 @@
[docs]defintegrate_belief_segment(self,b_i,u_i,num_segments):
- """This is to represent equation 18.
-
+
+[docs]
+ defintegrate_belief_segment(self,b_i,u_i,num_segments):
+"""This is to represent equation 18.
+
phi(b_i', u_i') = F(b_i', u_i') + sum F(b_{t+1}, u_i) - F(b_t, u_i) t in seg
@@ -257,18 +273,18 @@
-
+
def_opt_cost_func_seg(self,x,b_des,u_des,num_segments):
- """This will be used as part of the objective function for scipy
+"""This will be used as part of the objective function for scipy optimizer. Therefore we only take in one input vector, x. We require the structure of x to be:
@@ -283,8 +299,10 @@
[docs]defsegmented_cost_function(self,bu_traj,b_des,u_des,num_segments):
- """The cost function in eq 17.
+
+[docs]
+ defsegmented_cost_function(self,bu_traj,b_des,u_des,num_segments):
+"""The cost function in eq 17. Args: b_des (tuple): The desired belief (mT, CovT). This is needed to compute the cost function.
@@ -292,13 +310,17 @@
Source code for pomdp_py.algorithms.bsp.blqr
If this information is not available, pass in an empty list.
"""iflen(u_des)>0andlen(u_des)!=num_segments:
- raiseValueError("The list of desired controls, if provided, must have one control"\
- "per segment")
+ raiseValueError(
+ "The list of desired controls, if provided, must have one control"
+ "per segment"
+ )b_T,u_T=bu_traj[-1]m_T,Cov_T=b_T
- s_T=Cov_T.transpose().reshape(-1,)
-
+ s_T=Cov_T.transpose().reshape(
+ -1,
+ )
+
sLs=np.dot(np.dot(s_T.transpose(),self._L),s_T)Summation=0foriinrange(num_segments):
@@ -310,13 +332,25 @@
[docs]defcreate_plan(self,b_0,b_des,u_init,num_segments=5,control_bounds=None,
- opt_options={},opt_callback=None):
- """Solves the SQP problem using direct transcription, and produce belief points
+
+
+
+[docs]
+ defcreate_plan(
+ self,
+ b_0,
+ b_des,
+ u_init,
+ num_segments=5,
+ control_bounds=None,
+ opt_options={},
+ opt_callback=None,
+ ):
+"""Solves the SQP problem using direct transcription, and produce belief points and controls at segments. Reference: https://docs.scipy.org/doc/scipy/reference/tutorial/optimize.html"""# build initial guess from initial belief and the given trajectory.
@@ -324,61 +358,81 @@
Source code for pomdp_py.problems.light_dark.env.env
"""Defines the Environment for the light dark domain.Origin: Belief space planning assuming maximum likelihood observations"""importpomdp_py
-importpomdp_problems.light_darkasld
+importpomdp_py.problems.light_darkasldimportnumpyasnp
-
Source code for pomdp_problems.maze.domain.observation
-
-importpomdp_py
+
Source code for pomdp_py.problems.maze.domain.observation
+importpomdp_py# we index the walls around a grid cell in# clockwise fashion: top wall (0), right wall (1),# bottom wall (2), left wall (3).
-WALL={
- 0:"top",
- 1:"right",
- 2:"bottom",
- 3:"left"
-}
-
-
+[docs]
+classObservation(pomdp_py.Observation):def__init__(self,walls,orientation):
- """
+""" Args: walls (tuple) is a tuple of integers, that indicate the walls around a grid cell that are present.
@@ -149,8 +143,8 @@
Source code for pomdp_problems.maze.domain.observation
if notisinstance(other,Observation):returnFalseelse:
- returnself.walls==other.walls\
- andself.orientation==other.orientation
Source code for pomdp_py.problems.maze.domain.state
"""Defines the State for the maze domain, which is the position of the robot and its orientation."""
+
importpomdp_pyimportnumpyasnp
-
[docs]classState(pomdp_py.State):
- """The state of the problem is just the robot position"""
+
+
+[docs]
+classState(pomdp_py.State):
+"""The state of the problem is just the robot position"""
+
def__init__(self,positition,orientation):
- """
+""" Initializes a state in light dark domain. Args:
@@ -138,18 +140,19 @@
Source code for pomdp_py.problems.multi_object_search.agent.agent
+# Defines the agent. There's nothing special
+# about the MOS agent in fact, except that
+# it uses models defined in ..models, and
+# makes use of the belief initialization
+# functions in belief.py
+importpomdp_py
+from.beliefimport*
+from..models.transition_modelimport*
+from..models.observation_modelimport*
+from..models.reward_modelimport*
+from..models.policy_modelimport*
+
+
+
+[docs]
+classMosAgent(pomdp_py.Agent):
+"""One agent is one robot."""
+
+ def__init__(
+ self,
+ robot_id,
+ init_robot_state,# initial robot state (assuming robot state is observable perfectly)
+ object_ids,# target object ids
+ dim,# tuple (w,l) of the width (w) and length (l) of the gridworld search space.
+ sensor,# Sensor equipped on the robot
+ sigma=0.01,# parameter for observation model
+ epsilon=1,# parameter for observation model
+ belief_rep="histogram",# belief representation, either "histogram" or "particles".
+ prior={},# prior belief, as defined in belief.py:initialize_belief
+ num_particles=100,# used if the belief representation is particles
+ grid_map=None,
+ ):# GridMap used to avoid collision with obstacles (None if not provided)
+ self.robot_id=robot_id
+ self._object_ids=object_ids
+ self.sensor=sensor
+
+ # since the robot observes its own pose perfectly, it will have 100% prior
+ # on this pose.
+ prior[robot_id]={init_robot_state.pose:1.0}
+ rth=init_robot_state.pose[2]
+
+ # initialize belief
+ init_belief=initialize_belief(
+ dim,
+ self.robot_id,
+ self._object_ids,
+ prior=prior,
+ representation=belief_rep,
+ robot_orientations={self.robot_id:rth},
+ num_particles=num_particles,
+ )
+ transition_model=MosTransitionModel(
+ dim,{self.robot_id:self.sensor},self._object_ids
+ )
+ observation_model=MosObservationModel(
+ dim,self.sensor,self._object_ids,sigma=sigma,epsilon=epsilon
+ )
+ reward_model=GoalRewardModel(self._object_ids,robot_id=self.robot_id)
+ policy_model=PolicyModel(self.robot_id,grid_map=grid_map)
+ super().__init__(
+ init_belief,
+ policy_model,
+ transition_model=transition_model,
+ observation_model=observation_model,
+ reward_model=reward_model,
+ )
+
+
Source code for pomdp_problems.multi_object_search.agent.belief
+
Source code for pomdp_py.problems.multi_object_search.agent.belief
# Defines the belief distribution and update for the 2D Multi-Object Search domain;## The belief distribution is represented as a Histogram (or Tabular representation).
@@ -133,11 +130,15 @@
Source code for pomdp_problems.multi_object_search.agent.belief
import copyfrom..domain.stateimport*
-
[docs]classMosOOBelief(pomdp_py.OOBelief):
- """This is needed to make sure the belief is sampling the right
+
+
+[docs]
+classMosOOBelief(pomdp_py.OOBelief):
+"""This is needed to make sure the belief is sampling the right type of State for this problem."""
+
def__init__(self,robot_id,object_beliefs):
- """
+""" robot_id (int): The id of the robot that has this belief. object_beliefs (objid -> GenerativeDistribution) (includes robot)
@@ -145,16 +146,32 @@
Source code for pomdp_problems.multi_object_search.agent.belief
+[docs]
+definitialize_belief(
+ dim,
+ robot_id,
+ object_ids,
+ prior={},
+ representation="histogram",
+ robot_orientations={},
+ num_particles=100,
+):
+""" Returns a GenerativeDistribution that is the belief representation for the multi-object search problem.
@@ -165,7 +182,7 @@
Source code for pomdp_problems.multi_object_search.agent.belief
over; They are `assumed` to be the target objects, not obstacles,
because the robot doesn't really care about obstacle locations and modeling them just adds computation cost.
- prior (dict): A mapping {(objid|robot_id) -> {(x,y) -> [0,1]}}. If used, then
+ prior (dict): A mapping {(objid|robot_id) -> {(x,y) -> [0,1]}}. If used, then all locations not included in the prior will be treated to have 0 probability. If unspecified for an object, then the belief over that object is assumed to be a uniform distribution.
@@ -177,16 +194,20 @@
Source code for pomdp_problems.multi_object_search.agent.belief
-
+
+
def_initialize_histogram_belief(dim,robot_id,object_ids,prior,robot_orientations):
- """
+""" Returns the belief distribution represented as a histogram """oo_hists={}# objid -> Histogram
@@ -204,7 +225,7 @@
Source code for pomdp_problems.multi_object_search.agent.belief
# no prior knowledge. So uniform.
forxinrange(width):foryinrange(length):
- state=ObjectState(objid,"target",(x,y))
+ state=ObjectState(objid,"target",(x,y))hist[state]=1.0total_prob+=hist[state]
@@ -219,15 +240,17 @@
Source code for pomdp_problems.multi_object_search.agent.belief
# Its pose must have been provided in the `prior`.
assertrobot_idinprior,"Missing initial robot pose in prior."init_robot_pose=list(prior[robot_id].keys())[0]
- oo_hists[robot_id]=\
- pomdp_py.Histogram({RobotState(robot_id,init_robot_pose,(),None):1.0})
-
+ oo_hists[robot_id]=pomdp_py.Histogram(
+ {RobotState(robot_id,init_robot_pose,(),None):1.0}
+ )
+
returnMosOOBelief(robot_id,oo_hists)
-def_initialize_particles_belief(dim,robot_id,object_ids,prior,
- robot_orientations,num_particles=100):
- """This returns a single set of particles that represent the distribution over a
+def_initialize_particles_belief(
+ dim,robot_id,object_ids,prior,robot_orientations,num_particles=100
+):
+"""This returns a single set of particles that represent the distribution over a joint state space of all objects. Since it is very difficult to provide a prior knowledge over the joint state
@@ -242,11 +265,13 @@
Source code for pomdp_problems.multi_object_search.agent.belief
# Its pose must have been provided in the `prior`.
assertrobot_idinprior,"Missing initial robot pose in prior."init_robot_pose=list(prior[robot_id].keys())[0]
-
+
oo_particles={}# objid -> Particageleswidth,length=dimforobjidinobject_ids:
- particles=[RobotState(robot_id,init_robot_pose,(),None)]# list of states; Starting the observable robot state.
+ particles=[
+ RobotState(robot_id,init_robot_pose,(),None)
+ ]# list of states; Starting the observable robot state.ifobjidinprior:# prior knowledge provided. Just use the prior knowledgeprior_sum=sum(prior[objid][pose]forposeinprior[objid])
@@ -260,12 +285,12 @@
Source code for pomdp_problems.multi_object_search.agent.belief
for _inrange(num_particles):x=random.randrange(0,width)y=random.randrange(0,length)
- state=ObjectState(objid,"target",(x,y))
+ state=ObjectState(objid,"target",(x,y))particles.append(state)particles_belief=pomdp_py.Particles(particles)oo_particles[objid]=particles_belief
-
+
# Return Particles distribution which contains particles# that represent joint object statesparticles=[]
@@ -310,11 +335,11 @@
Source code for pomdp_problems.multi_object_search.agent.belief
Source code for pomdp_problems.multi_object_search.domain.action
+
Source code for pomdp_py.problems.multi_object_search.domain.action
"""Defines the Action for the 2D Multi-Object Search domain;
-Action space:
+Action space: Motion :math:`\cup` Look :math:`\cup` Find
@@ -133,42 +130,57 @@
Source code for pomdp_problems.multi_object_search.domain.action
It is possible to force "Look" after every N/S/E/W action;
then the Look action could be dropped. This is optional behavior."""
+
importpomdp_pyimportmath
+
###### Actions ######
-
[docs]classAction(pomdp_py.Action):
- """Mos action; Simple named action."""
+
-MOTION_SCHEME="xy"# can be either xy or vw
-STEP_SIZE=1
-
[docs]classMotionAction(Action):
+
+
+MOTION_SCHEME="xy"# can be either xy or vw
+STEP_SIZE=1
+
+
+
+[docs]
+classMotionAction(Action):# scheme 1 (vx,vy,th)SCHEME_XYTH="xyth"EAST=(STEP_SIZE,0,0)# x is horizontal; x+ is right. y is vertical; y+ is down.WEST=(-STEP_SIZE,0,math.pi)
- NORTH=(0,-STEP_SIZE,3*math.pi/2)
- SOUTH=(0,STEP_SIZE,math.pi/2)
-
+ NORTH=(0,-STEP_SIZE,3*math.pi/2)
+ SOUTH=(0,STEP_SIZE,math.pi/2)
+
# scheme 2 (vt, vw) translational, rotational velocities.
- SCHEME_VW="vw"
+ SCHEME_VW="vw"FORWARD=(STEP_SIZE,0)BACKWARD=(-STEP_SIZE,0)
- LEFT=(0,-math.pi/4)# left 45 deg
- RIGHT=(0,math.pi/4)# right 45 deg
+ LEFT=(0,-math.pi/4)# left 45 deg
+ RIGHT=(0,math.pi/4)# right 45 deg# scheme 3 (vx,vy)SCHEME_XY="xy"
@@ -179,10 +191,8 @@
Source code for pomdp_problems.multi_object_search.domain.action
SCHEMES ={"xyth","xy","vw"}
- def__init__(self,motion,
- scheme=MOTION_SCHEME,distance_cost=1,
- motion_name=None):
- """
+ def__init__(self,motion,scheme=MOTION_SCHEME,distance_cost=1,motion_name=None):
+""" motion (tuple): a tuple of floats that describes the motion; scheme (str): description of the motion scheme; Either "xy" or "vw"
@@ -191,51 +201,97 @@
Source code for pomdp_problems.multi_object_search.domain.action
Source code for pomdp_problems.multi_object_search.domain.observation
+
Source code for pomdp_py.problems.multi_object_search.domain.observation
"""Defines the Observation for the 2D Multi-Object Search domain;
@@ -131,48 +128,61 @@
Source code for pomdp_problems.multi_object_search.domain.observation
it could be something else. But the resulting observation
should be a map from object id to observed pose or NULL (not observed)."""
+
importpomdp_py
+
###### Observation ######
-
[docs]classObjectObservation(pomdp_py.Observation):
- """The xy pose of the object is observed; or NULL if not observed"""
+
+[docs]
+classObjectObservation(pomdp_py.Observation):
+"""The xy pose of the object is observed; or NULL if not observed"""
+
NULL=None
+
def__init__(self,objid,pose):self.objid=objid
- iftype(pose)==tupleandlen(pose)==2\
- orpose==ObjectObservation.NULL:
+ iftype(pose)==tupleandlen(pose)==2orpose==ObjectObservation.NULL:self.pose=poseelse:
- raiseValueError("Invalid observation %s for object"
- %(str(pose),objid))
+ raiseValueError("Invalid observation %s for object"%(str(pose),objid))
+
def__hash__(self):returnhash((self.objid,self.pose))
+
def__eq__(self,other):ifnotisinstance(other,ObjectObservation):returnFalseelse:
- returnself.objid==other.objid\
- andself.pose==other.pose
[docs]classMosOOObservation(pomdp_py.OOObservation):
- """Observation for Mos that can be factored by objects;
+
+[docs]
+classMosOOObservation(pomdp_py.OOObservation):
+"""Observation for Mos that can be factored by objects; thus this is an OOObservation."""
+
def__init__(self,objposes):
- """
+""" objposes (dict): map from objid to 2d pose or NULL (not ObjectObservation!). """self._hashcode=hash(frozenset(objposes.items()))self.objposes=objposes
-
+[docs]
+ deffactor(self,next_state,*params,**kwargs):
+"""Factor this OO-observation by objects"""
+ return{
+ objid:ObjectObservation(objid,self.objposes[objid])
+ forobjidinnext_state.object_states
+ ifobjid!=next_state.robot_id
+ }
+
+
+
+[docs]
+ @classmethoddefmerge(cls,object_observations,next_state,*params,**kwargs):
- """Merge `object_observations` into a single OOObservation object;
-
+"""Merge `object_observations` into a single OOObservation object;
+
object_observation (dict): Maps from objid to ObjectObservation"""
- returnMosOOObservation({objid:object_observations[objid].pose
- forobjidinobject_observations
- ifobjid!=next_state.object_states[objid].objclass!="robot"})
+[docs]
+ defstate_transition(self,action,execute=True,robot_id=None):
+"""state_transition(self, action, execute=True, **kwargs) Overriding parent class function. Simulates a state transition given `action`. If `execute` is set to True,
@@ -175,28 +177,37 @@
Source code for pomdp_problems.multi_object_search.env.env
is False.
"""
- assertrobot_idisnotNone,"state transition should happen for a specific robot"
+ assert(
+ robot_idisnotNone
+ ),"state transition should happen for a specific robot"next_state=copy.deepcopy(self.state)
- next_state.object_states[robot_id]=\
- self.transition_model[robot_id].sample(self.state,action)
-
- reward=self.reward_model.sample(self.state,action,next_state,
- robot_id=robot_id)
+ next_state.object_states[robot_id]=self.transition_model[robot_id].sample(
+ self.state,action
+ )
+
+ reward=self.reward_model.sample(
+ self.state,action,next_state,robot_id=robot_id
+ )ifexecute:self.apply_transition(next_state)returnrewardelse:
- returnnext_state,reward
+ returnnext_state,reward
+
+
+
#### Interpret string as an initial world state ####
-
+[docs]
+definterpret(worldstr):
+""" Interprets a problem instance description in `worldstr` and returns the corresponding MosEnvironment. For example: This string
-
+
.. code-block:: text rx...
@@ -204,7 +215,7 @@
Source code for pomdp_problems.multi_object_search.env.env
.....
*** r: laser fov=90 min_range=1 max_range=10
-
+
describes a 3 by 5 world where x indicates obsticles and T indicates the "target object". T could be replaced by any upper-case letter A-Z which will serve as the object's id. Lower-case letters a-z (except for x)
@@ -212,14 +223,14 @@
Source code for pomdp_problems.multi_object_search.env.env
After the world, the :code:`***` signals description of the sensor for each robot.
For example "r laser 90 1 10" means that robot `r` will have a Laser2Dsensor
- with fov 90, min_range 1.0, and max_range of 10.0.
+ with fov 90, min_range 1.0, and max_range of 10.0. Args: worldstr (str): a string that describes the initial state of the world. Returns: MosEnvironment: the corresponding environment for the world description.
-
+
"""worldlines=[]sensorlines=[]
@@ -234,12 +245,11 @@
Source code for pomdp_problems.multi_object_search.env.env
Source code for pomdp_problems.multi_object_search.env.env
# Parse sensors
forlineinsensorlines:if","inline:
- raiseValueError("Wrong Fromat. SHould not have ','. Separate tokens with space.")
+ raiseValueError(
+ "Wrong Fromat. SHould not have ','. Separate tokens with space."
+ )robot_name=line.split(":")[0].strip()robot_id=interpret_robot_id(robot_name)
- assertrobot_idinrobots,"Sensor specified for unknown robot %s"%(robot_name)
-
+ assertrobot_idinrobots,"Sensor specified for unknown robot %s"%(
+ robot_name
+ )
+
sensor_setting=line.split(":")[1].strip()sensor_type=sensor_setting.split(" ")[0].strip()sensor_params={}
@@ -289,7 +304,7 @@
Source code for pomdp_problems.multi_object_search.env.env
+[docs]
+defequip_sensors(worldmap,sensors):
+""" Args: worldmap (str): a string that describes the initial state of the world. sensors (dict) a map from robot character representation (e.g. 'r') to a
@@ -321,8 +343,12 @@
Source code for pomdp_problems.multi_object_search.env.env
Source code for pomdp_problems.multi_object_search.example_worlds
+
Source code for pomdp_py.problems.multi_object_search.example_worlds
"""This file has some examples of world string."""
+
importrandom############# Example Worlds ###########
@@ -125,14 +123,16 @@
Source code for pomdp_problems.multi_object_search.example_worlds
# the format
world0=(
-"""
+"""rx....x.xT.....
-""","r")
+""",
+ "r",
+)world1=(
-"""
+"""rx.T....x........xx..
@@ -140,11 +140,13 @@
Source code for pomdp_problems.multi_object_search.example_worlds
.xxx.T.
.xxx..........
-""","r")
+""",
+ "r",
+)# Used to test the shape of the sensorworld2=(
-"""
+"""....................................xxxxxxxxxxxxx..
@@ -166,11 +168,13 @@
Source code for pomdp_problems.multi_object_search.example_worlds
..xxxxxxxxxxxxx..
..................................
-""","r")
+""",
+ "r",
+)# Used to test sensor occlusionworld3=(
-"""
+"""....................................xxxxxxxxxxxxx..
@@ -192,11 +196,15 @@
Source code for pomdp_problems.multi_object_search.example_worlds
Source code for pomdp_problems.multi_object_search.models.components.grid_map
+
Source code for pomdp_py.problems.multi_object_search.models.components.grid_map
"""Optional grid map to assist collision avoidance during planning."""
-frompomdp_problems.multi_object_search.models.transition_modelimportRobotTransitionModel
-frompomdp_problems.multi_object_search.domain.actionimport*
-frompomdp_problems.multi_object_search.domain.stateimport*
+frompomdp_py.problems.multi_object_search.models.transition_modelimport(
+ RobotTransitionModel,
+)
+frompomdp_py.problems.multi_object_search.domain.actionimport*
+frompomdp_py.problems.multi_object_search.domain.stateimport*
+
-
[docs]classGridMap:
- """This map assists the agent to avoid planning invalid
+
+[docs]
+classGridMap:
+"""This map assists the agent to avoid planning invalid actions that will run into obstacles. Used if we assume the agent has a map. This map does not contain information about the object locations."""
+
def__init__(self,width,length,obstacles):
- """
+""" Args: obstacles (dict): Map from objid to (x,y); The object is supposed to be an obstacle. width (int): width of the grid map
- length (int): length of the grid map
+ length (int): length of the grid map """self.width=widthself.length=length
@@ -147,31 +150,36 @@
Source code for pomdp_problems.multi_object_search.models.components.grid_ma
forobjidinself._obstacles}# set of obstacle poses
- self.obstacle_poses=set({self._obstacles[objid]
- forobjidinself._obstacles})
+ self.obstacle_poses=set({self._obstacles[objid]forobjidinself._obstacles})
-
+[docs]
+ defvalid_motions(self,robot_id,robot_pose,all_motion_actions):
+""" Returns a set of MotionAction(s) that are valid to be executed from robot pose (i.e. they will not bump into obstacles). The validity is determined under the assumption that the robot dynamics is deterministic. """state=MosOOState(self._obstacle_states)
- state.set_object_state(robot_id,
- RobotState(robot_id,robot_pose,None,None))
+ state.set_object_state(robot_id,RobotState(robot_id,robot_pose,None,None))valid=set({})formotion_actioninall_motion_actions:ifnotisinstance(motion_action,MotionAction):
- raiseValueError("This (%s) is not a motion action"%str(motion_action))
+ raiseValueError(
+ "This (%s) is not a motion action"%str(motion_action)
+ )
- next_pose=RobotTransitionModel.if_move_by(robot_id,state,
- motion_action,(self.width,self.length))
+ next_pose=RobotTransitionModel.if_move_by(
+ robot_id,state,motion_action,(self.width,self.length)
+ )ifnext_pose!=robot_pose:# robot moved --> valid motionvalid.add(motion_action)
- returnvalid
+ returnvalid
+
+
@@ -181,11 +189,11 @@
Source code for pomdp_problems.multi_object_search.models.components.grid_ma
Source code for pomdp_problems.multi_object_search.models.components.sensor
+
Source code for pomdp_py.problems.multi_object_search.models.components.sensor
"""Sensor model (for example, laser scanner)"""importmathimportnumpyasnp
-frompomdp_problems.multi_object_search.domain.actionimport*
-frompomdp_problems.multi_object_search.domain.observationimport*
+frompomdp_py.problems.multi_object_search.domain.actionimport*
+frompomdp_py.problems.multi_object_search.domain.observationimport*# Note that the occlusion of an object is implemented based on# whether a beam will hit an obstacle or some other object before
@@ -132,32 +129,54 @@
Source code for pomdp_problems.multi_object_search.models.components.sensor<
# not receive observation for some regions in the field of view# making it a more challenging situation to deal with.
+
# Utility functions
-
+[docs]
+ defobserve(self,robot_pose,env_state):
+""" Returns an Observation with this sensor model. """raiseNotImplementedError
-
[docs]defwithin_range(self,robot_pose,point):
- """Returns true if the point is within range of the sensor; but the point might not
+
+
+[docs]
+ defwithin_range(self,robot_pose,point):
+"""Returns true if the point is within range of the sensor; but the point might not actually be visible due to occlusion or "gap" between beams"""raiseValueError
+[docs]
+classLaser2DSensor:
+"""Fan shaped 2D laser sensor"""
+
+ def__init__(
+ self,
+ robot_id,
+ fov=90,
+ min_range=1,
+ max_range=5,
+ angle_increment=5,
+ occlusion_enabled=False,
+ ):
+""" fov (float): angle between the start and end beams of one scan (degree). min_range (int or float) max_range (int or float)
@@ -191,60 +219,96 @@
Source code for pomdp_problems.multi_object_search.models.components.sensor<
# For example, the fov=pi, means the range scanner scans 180 degrees# in front of the robot. By our angle convention, 180 degrees maps to [0,90] and [270, 360]."""self._fov_left=(0,self.fov/2)
- self._fov_right=(2*math.pi-self.fov/2,2*math.pi)
+ self._fov_right=(2*math.pi-self.fov/2,2*math.pi)# beams that are actually within the fov (set of angles)
- self._beams={round(th,2)
- forthinnp.linspace(self._fov_left[0],
- self._fov_left[1],
- int(round((self._fov_left[1]-self._fov_left[0])/self.angle_increment)))}\
- |{round(th,2)
- forthinnp.linspace(self._fov_right[0],
- self._fov_right[1],
- int(round((self._fov_right[1]-self._fov_right[0])/self.angle_increment)))}
+ self._beams={
+ round(th,2)
+ forthinnp.linspace(
+ self._fov_left[0],
+ self._fov_left[1],
+ int(
+ round(
+ (self._fov_left[1]-self._fov_left[0])/self.angle_increment
+ )
+ ),
+ )
+ }|{
+ round(th,2)
+ forthinnp.linspace(
+ self._fov_right[0],
+ self._fov_right[1],
+ int(
+ round(
+ (self._fov_right[1]-self._fov_right[0])/self.angle_increment
+ )
+ ),
+ )
+ }# The size of the sensing region here is the area covered by the fan
- self._sensing_region_size=self.fov/(2*math.pi)*math.pi*(max_range-min_range)**2
-
-
[docs]defin_field_of_view(th,view_angles):
- """Determines if the beame at angle `th` is in a field of view of size `view_angles`.
+ self._sensing_region_size=(
+ self.fov/(2*math.pi)*math.pi*(max_range-min_range)**2
+ )
+
+
+[docs]
+ defin_field_of_view(th,view_angles):
+"""Determines if the beame at angle `th` is in a field of view of size `view_angles`. For example, the view_angles=180, means the range scanner scans 180 degrees
- in front of the robot. By our angle convention, 180 degrees maps to [0,90] and [270, 360]."""
+ in front of the robot. By our angle convention, 180 degrees maps to [0,90] and [270, 360].
+ """fov_right=(0,view_angles/2)
- fov_left=(2*math.pi-view_angles/2,2*math.pi)
+ fov_left=(2*math.pi-view_angles/2,2*math.pi)
+
-
[docs]defwithin_range(self,robot_pose,point):
- """Returns true if the point is within range of the sensor; but the point might not
+
+[docs]
+ defwithin_range(self,robot_pose,point):
+"""Returns true if the point is within range of the sensor; but the point might not actually be visible due to occlusion or "gap" between beams"""dist,bearing=self.shoot_beam(robot_pose,point)ifnotin_range(dist,(self.min_range,self.max_range)):returnFalse
- if(notin_range(bearing,self._fov_left))\
- and(notin_range(bearing,self._fov_right)):
+ if(notin_range(bearing,self._fov_left))and(
+ notin_range(bearing,self._fov_right)
+ ):returnFalsereturnTrue
-
[docs]defshoot_beam(self,robot_pose,point):
- """Shoots a beam from robot_pose at point. Returns the distance and bearing
+
+
+[docs]
+ defshoot_beam(self,robot_pose,point):
+"""Shoots a beam from robot_pose at point. Returns the distance and bearing of the beame (i.e. the length and orientation of the beame)"""rx,ry,rth=robot_pose
- dist=euclidean_dist(point,(rx,ry))
- bearing=(math.atan2(point[1]-ry,point[0]-rx)-rth)%(2*math.pi)# bearing (i.e. orientation)
+ dist=euclidean_dist(point,(rx,ry))
+ bearing=(math.atan2(point[1]-ry,point[0]-rx)-rth)%(
+ 2*math.pi
+ )# bearing (i.e. orientation)return(dist,bearing)
-
[docs]defvalid_beam(self,dist,bearing):
- """Returns true beam length (i.e. `dist`) is within range and its angle
+
+
+[docs]
+ defvalid_beam(self,dist,bearing):
+"""Returns true beam length (i.e. `dist`) is within range and its angle `bearing` is valid, that is, it is within the fov range and in accordance with the angle increment."""
- returndist>=self.min_rangeanddist<=self.max_range\
- andround(bearing,2)inself._beams
+
def_build_beam_map(self,beam,point,beam_map={}):
- """beam_map (dict): Maps from bearing to (dist, point)"""
+"""beam_map (dict): Maps from bearing to (dist, point)"""dist,bearing=beamvalid=self.valid_beam(dist,bearing)ifnotvalid:return
- bearing_key=round(bearing,2)
+ bearing_key=round(bearing,2)ifbearing_keyinbeam_map:# There's an object covered by this beame already.# see if this beame is closer
@@ -258,8 +322,10 @@
Source code for pomdp_problems.multi_object_search.models.components.sensor<
else:beam_map[bearing_key]=(dist,point)
-
[docs]classProximitySensor(Laser2DSensor):
- """This is a simple sensor; Observes a region centered
+
+
+[docs]
+classProximitySensor(Laser2DSensor):
+"""This is a simple sensor; Observes a region centered at the robot."""
- def__init__(self,robot_id,
- radius=5,
- occlusion_enabled=False):
- """
+
+ def__init__(self,robot_id,radius=5,occlusion_enabled=False):
+""" radius (int or float) radius of the sensing region. """self.robot_id=robot_id
@@ -316,12 +385,15 @@
Source code for pomdp_problems.multi_object_search.models.observation_model
+
Source code for pomdp_py.problems.multi_object_search.models.observation_model
"""Defines the ObservationModel for the 2D Multi-Object Search domain.Origin: Multi-Object Search using Object-Oriented POMDPs (ICRA 2019)
@@ -140,27 +137,31 @@
+[docs]
+classObjectObservationModel(pomdp_py.ObservationModel):def__init__(self,objid,sensor,dim,sigma=0,epsilon=1):
- """
+""" sigma and epsilon are parameters of the observation model (see paper), dim (tuple): a tuple (width, length) for the dimension of the world"""self._objid=objid
@@ -194,8 +200,10 @@
Source code for pomdp_problems.multi_object_search.models.observation_model<
gamma=self.epsilonreturnalpha,beta,gamma
-
+[docs]
+ defprobability(self,observation,next_state,action,**kwargs):
+""" Returns the probability of Pr (observation | next_state, action). Args:
@@ -216,13 +224,15 @@
Source code for pomdp_problems.multi_object_search.models.observation_model<
# The (funny) business of allowing histogram belief update using O(oi|si',sr',a).next_robot_state=kwargs.get("next_robot_state",None)ifnext_robot_stateisnotNone:
- assertnext_robot_state["id"]==self._sensor.robot_id,\
- "Robot id of observation model mismatch with given state"
+ assert(
+ next_robot_state["id"]==self._sensor.robot_id
+ ),"Robot id of observation model mismatch with given state"robot_pose=next_robot_state.poseifisinstance(next_state,ObjectState):
- assertnext_state["id"]==self._objid,\
- "Object id of observation model mismatch with given state"
+ assert(
+ next_state["id"]==self._objid
+ ),"Object id of observation model mismatch with given state"object_pose=next_state.poseelse:object_pose=next_state.pose(self._objid)
@@ -232,7 +242,9 @@
+[docs]
+ defsample(self,next_state,action,**kwargs):
+"""Returns observation"""ifnotisinstance(action,LookAction):# Not a look action. So no observationreturnObjectObservation(self._objid,ObjectObservation.NULL)
@@ -267,31 +281,39 @@
Source code for pomdp_problems.multi_object_search.models.observation_model<
object_pose=next_state.pose(self._objid)# Obtain observation according to distribution.
- alpha,beta,gamma=self._compute_params(self._sensor.within_range(robot_pose,object_pose))
+ alpha,beta,gamma=self._compute_params(
+ self._sensor.within_range(robot_pose,object_pose)
+ )# Requires Python >= 3.6
- event_occured=random.choices(["A","B","C"],weights=[alpha,beta,gamma],k=1)[0]
+ event_occured=random.choices(
+ ["A","B","C"],weights=[alpha,beta,gamma],k=1
+ )[0]zi=self._sample_zi(event_occured,next_state)returnObjectObservation(self._objid,zi)
Source code for pomdp_problems.multi_object_search.models.observation_model<
# TODO: FIX. zi should ONLY come from the field of view.# There is currently no easy way to sample from the field of view.width,height=self._dim
- zi=(random.randint(0,width),# x axis
- random.randint(0,height))# y axis
- else:# event == C
+ zi=(
+ random.randint(0,width),# x axis
+ random.randint(0,height),
+ )# y axis
+ else:# event == Czi=ObjectObservation.NULLreturnzi
Source code for pomdp_problems.multi_object_search.models.policy_model
-"""Policy model for 2D Multi-Object Search domain.
+
Source code for pomdp_py.problems.multi_object_search.models.policy_model
+"""Policy model for 2D Multi-Object Search domain.It is optional for the agent to be equipped with an occupancygrid map of the environment."""importpomdp_pyimportrandom
-frompomdp_problems.multi_object_search.domain.actionimport*
+frompomdp_py.problems.multi_object_search.domain.actionimport*
-
[docs]classPolicyModel(pomdp_py.RolloutPolicy):
- """Simple policy model. All actions are possible at any state."""
+
+
+[docs]
+classPolicyModel(pomdp_py.RolloutPolicy):
+"""Simple policy model. All actions are possible at any state."""def__init__(self,robot_id,grid_map=None):
- """FindAction can only be taken after LookAction"""
+"""FindAction can only be taken after LookAction"""self.robot_id=robot_idself._grid_map=grid_map
-
[docs]defargmax(self,state,**kwargs):
- """Returns the most likely action"""
+
+
+[docs]
+ defargmax(self,state,**kwargs):
+"""Returns the most likely action"""raiseNotImplementedError
-
[docs]defget_all_actions(self,state=None,history=None):
- """note: find can only happen after look."""
+
+
+[docs]
+ defget_all_actions(self,state=None,history=None):
+"""note: find can only happen after look."""can_find=FalseifhistoryisnotNoneandlen(history)>1:# last action
@@ -157,16 +168,20 @@
Source code for pomdp_problems.multi_object_search.models.policy_model