WA7. Markov Decision Process

Statement

In this assignment, you will write pseudo-code for a Markov Decision Process.

A Markov Decision Process, also known as an MDP model, contains the following set of features (an illustrative mapping onto the grid problem is sketched after this list):

  • A set of possible states S.
  • A set of models (a transition model describing how each action moves the agent between states).
  • A set of possible actions A.
  • A real-valued reward function R(s, a).
  • A solution of the Markov Decision Process, i.e. a policy that tells the agent which action to take in each state.
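
For orientation only (this sketch is not part of the original assignment statement, and the names in it are illustrative), these components can be mapped onto the grid problem described below:

    # Illustrative mapping of the MDP components onto the 3x3 grid problem (names assumed).
    S = [(x, y) for x in range(1, 4) for y in range(1, 4)]   # states: the nine grid cells
    A = ["UP", "DOWN", "LEFT", "RIGHT"]                      # actions
    SPECIAL = {(1, 3): -100, (3, 1): -1000, (3, 3): 100}     # blocked, fire, diamond cells

    def R(s, a, s_next):
        """Illustrative reward for taking action a in state s and landing in s_next:
        -1 per ordinary move, with the penalties/prize above for the special cells.
        (Written as a function of the landing cell for simplicity.)"""
        return SPECIAL.get(s_next, -1)

    # The transition model T(s, a, s') is deterministic here: each action moves the agent
    # one cell, or leaves it in place when a wall is hit. The solution of the MDP is a
    # policy that tells the agent which action to take in every state.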

Consider the following Grid (3 by 3):

Grid

  • An agent lives in a grid. It starts at grid number (1, 1) and can roam around the grid using the following actions: UP, DOWN, LEFT, RIGHT.
  • The goal of the agent is to reach grid number (3, 3), the diamond state.
  • The agent must avoid the fire state at grid number (3, 1) at any cost.
  • There is also a blocked grid at state (1, 3), which the agent can’t pass and must route around.
  • The agent cannot pass a wall. For example, in the starting grid (1, 1), the agent can only go either UP or RIGHT.
  • Based on the above information, write pseudo-code in Java or Python to solve the problem using the Markov Decision Process.

Your pseudo-code must do the following:

  • Implementation of a static environment (grid) using an array or other data structure that will represent the above grid.
  • A function/method to determine what action to take. The decision should be based upon the Markov Decision Process.
  • Consideration of a reward policy that incorporates action costs in addition to any prizes or penalties that may be awarded.
  • A function/method to calculate the optimal policy when a blocked state is encountered.
  • A function/method to calculate the optimal policy when the fire state is encountered.
  • A function/method to test if the desired goal is achieved or not.


Introduction

  • The Pseudo Code & Output section explains the implementation of the Markov Decision Process (MDP) in Python along with screenshots of both the terminal output and the visual representation of the agent’s movement in the grid.
  • The Source Code Explanation section provides a detailed breakdown of the code, including the main functions and their purposes.
  • The Source Code section contains the complete code for the MDP implementation, which includes the agent’s movement logic, reward calculations, and solution function. The visualization using PyGame is stripped out for simplicity, but it can be found in the attached main.py file.
  • The Grading Criteria section outlines the specific requirements for the assignment and how they are met in the implementation.
  • The Conclusion section summarizes the results and confirms that all requirements have been met.

Pseudo Code & Output

  • I used PyGame to visualize the agent’s movement in the environment.
  • The source code is available at the end of this text.
  • You may need to install PyGame using pip install pygame.
  • If you don’t want to install anything, you can comment out the following lines in the source code (in the main function) and/or delete the VisualWorld class:

        world = VisualWorld(state) # line 284
        world.visualize() # line 299
    
  • If you don’t want to visualize the agent’s movement, you can still follow the logs (explained below) in the terminal.

  • The world will be visualized in a window, where a 3x3 grid will be displayed, with the following details:

    • The agent’s current position will be highlighted in green.
    • The blocked and fire states will be highlighted in red.
    • The diamond state will be highlighted in purple.
    • The grid number and the content of each cell will be displayed.
    • Next to the grid, an information panel will show:
      • The current position of the agent.
      • The target position.
      • Whether the goal is achieved or not.
      • The history of the last 10 actions taken by the agent.
      • The current command being executed.
  • The agent will navigate to the target position (3, 3) using the following policies:
    • At every step, evaluate all possible actions and choose the one with the highest reward (a short sketch of this greedy choice follows this list).
    • Moving to an empty cell has a reward of -1.
    • Blocked and fire cells have rewards of -100 and -1000 respectively, so the agent will avoid them.
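
As a minimal sketch of that greedy choice (illustrative only; the variable names here are assumptions, and the full implementation appears in the Source Code section):

    # Greedy, reward-based action choice in isolation (illustrative sketch).
    # Each allowed action is scored by the reward of the cell it leads to; cell
    # contents not listed in the table (empty cells) fall back to the step cost -1.
    rewards = {"Start": 0, "Blocked": -100, "Fire": -1000, "Diamond": 100}
    next_cell = {"RIGHT": "Blocked", "DOWN": ""}   # example: blocked cell to the right, empty cell below
    best = max(next_cell, key=lambda a: rewards.get(next_cell[a], -1))
    print(best)   # DOWN: the empty cell (-1) beats the blocked cell (-100)
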
Image 1: Initial State of the world

The agent starts at position (1, 1) and navigates to the target position (3, 3) while avoiding the blocked and fire states. Here is an intermediate state of the world:

Image 2: Intermediate State of the world

Notice how it avoids the blocked state (1, 3) and the fire state (3, 1) while moving towards the target position (3, 3). Here is the final state of the world (after arriving at the goal):

Image 3: Final State of the world

More details about the agent’s movement will be printed in the terminal, including:

  • Detailed information about each move.
  • The total reward accumulated.
  • When the goal is achieved, it will display the total moves taken and the history of actions.
Image 4: Terminal Output

Source Code Explanation

  • The entry point is the main function.
  • We start by initializing the state of the world, and then instantiate the VisualWorld class to visualize the agent’s movement.
  • This step is optional, and you can comment it out if you don’t want to install PyGame.
  • Every function includes comments highlighting the important details.
  • The go_to function executes multiple agent moves, and after every move it performs the necessary checks to determine the next action/command.
  • The simulate_updates function ensures that visuals are redrawn after every cycle, and it uses threading to handle the updates in parallel.
  • The source code included below does not include the VisualWorld class, to keep it simpler; however, the entire source code is attached in the main.py file. A rough, illustrative sketch of what such a class could look like is shown right after this list.
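
The following is only a rough sketch of what such a PyGame render loop could look like; it is not the attached VisualWorld class, and details such as colours, cell size, and the omitted info panel are assumptions made for illustration:

    import pygame

    class VisualWorldSketch:
        """Illustrative PyGame grid renderer (not the actual VisualWorld from main.py)."""

        CELL = 120  # pixel size of one grid cell (assumed)

        def __init__(self, state):
            self.state = state
            pygame.init()
            size = (self.CELL * state['cols'], self.CELL * state['rows'])
            self.screen = pygame.display.set_mode(size)
            pygame.display.set_caption("MDP Grid World")
            self.font = pygame.font.SysFont(None, 24)

        def visualize(self):
            clock = pygame.time.Clock()
            running = True
            while running:
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        running = False
                self.screen.fill((255, 255, 255))
                # Draw every cell with a border and its content label
                for y in range(self.state['rows']):
                    for x in range(self.state['cols']):
                        rect = pygame.Rect(x * self.CELL, y * self.CELL, self.CELL, self.CELL)
                        pygame.draw.rect(self.screen, (0, 0, 0), rect, 1)
                        label = self.font.render(self.state['grid'][y][x], True, (0, 0, 0))
                        self.screen.blit(label, (rect.x + 5, rect.y + 5))
                # Highlight the agent's current cell in green
                pos = self.state['current_pos']
                agent = pygame.Rect(pos['x'] * self.CELL, pos['y'] * self.CELL, self.CELL, self.CELL)
                pygame.draw.rect(self.screen, (0, 200, 0), agent, 4)
                pygame.display.flip()
                clock.tick(10)  # redraw roughly 10 times per second
            pygame.quit()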

Source Code

import time


def get_direction_with_highest_reward(state, allowed_actions):
    """Get the direction with the highest reward."""
    current_pos = state['current_pos']
    current_x = current_pos['x']
    current_y = current_pos['y']
    up_y = current_y - 1 if current_y > 0 else current_y
    down_y = current_y + 1 if current_y < state['rows'] - 1 else current_y
    left_x = current_x - 1 if current_x > 0 else current_x
    right_x = current_x + 1 if current_x < state['cols'] - 1 else current_x

    up_reward = state['rewards'].get(state['grid'][up_y][current_x], -1)
    down_reward = state['rewards'].get(state['grid'][down_y][current_x], -1)
    left_reward = state['rewards'].get(state['grid'][current_y][left_x], -1)
    right_reward = state['rewards'].get(state['grid'][current_y][right_x], -1)

    # Choose the direction with the highest reward among the allowed actions
    rewards_dict = {
        "UP": up_reward,
        "DOWN": down_reward,
        "LEFT": left_reward,
        "RIGHT": right_reward
    }

    allowed_rewards = {k: v for k, v in rewards_dict.items()
                       if k in allowed_actions}
    direction = max(allowed_rewards, key=allowed_rewards.get)
    reward = allowed_rewards[direction]
    return {"direction": direction, "reward": reward}


def decide_direction_for_next_move(state, target_pos):
    """Decide the direction for the next move."""
    current_pos = state['current_pos']
    current_x = current_pos['x']
    current_y = current_pos['y']
    target_x = target_pos['x']
    target_y = target_pos['y']
    direction = None

    if current_x < target_x:
        direction = get_direction_with_highest_reward(
            # Policy 1: Move right or down, avoid left and up as they will take us away from the target
            state, ["RIGHT", "DOWN"])
    elif current_x > target_x:
        direction = get_direction_with_highest_reward(
            # Policy 2: Move left or down, avoid right and up as they will take us away from the target
            state, ["LEFT", "DOWN"])
    elif current_y < target_y:
        direction = get_direction_with_highest_reward(
            # Policy 3: Move down or left/right, avoid up as it will take us away from the target
            state, ["DOWN", "LEFT", "RIGHT"])
    elif current_y > target_y:
        direction = get_direction_with_highest_reward(
            # Policy 4: Move up or left/right, avoid down as it will take us away from the target
            state, ["UP", "LEFT", "RIGHT"])

    return direction


def is_goal_achieved(state):
    """Check if the goal is achieved. This is the solution function."""
    current_pos = state['current_pos']
    target_pos = state['target_pos']
    current_x = current_pos['x']
    current_y = current_pos['y']
    target_x = target_pos['x']
    target_y = target_pos['y']
    current_content = state['grid'][current_y][current_x]
    return current_x == target_x and current_y == target_y and current_content == "Diamond"


def move(state, direction, reward):
    """Move the agent one step in a direction."""
    current_pos = state['current_pos']
    state['goal_achieved'] = is_goal_achieved(state)
    current_x = current_pos['x']
    current_y = current_pos['y']
    next_x = current_x
    next_y = current_y

    if direction == "UP" and current_y > 0:
        next_y = current_y - 1
    elif direction == "DOWN" and current_y < state['rows'] - 1:
        next_y = current_y + 1
    elif direction == "LEFT" and current_x > 0:
        next_x = current_x - 1
    elif direction == "RIGHT" and current_x < state['cols'] - 1:
        next_x = current_x + 1

    next_pos = {'x': next_x, 'y': next_y}
    state['current_pos'] = next_pos
    state['current_command'] = direction
    state['history'].append((next_pos, direction))
    state['total_reward'] += reward

    moveNumber = len(state['history'])
    print(f"🕚 Move: {moveNumber}")
    print(
        f"👉 Moving {direction} : ({current_x}, {current_y}) -> ({next_x}, {next_y})")
    rewardText = reward if reward < 0 else f"+{reward}"
    print(
        f"💰 Total Reward: {state['total_reward']} | Current Reward: {rewardText}")
    print("\n")
    time.sleep(1)


def go_to(state, target_pos):
    """Instruct the agent to navigate to a specified target position."""
    state['target_pos'] = target_pos
    state['goal_achieved'] = is_goal_achieved(state)

    # strategy is to move in the x direction first, then in the y direction
    while not state['goal_achieved']:
        next_move = decide_direction_for_next_move(state, target_pos)
        move(state, next_move['direction'], next_move['reward'])
        state['goal_achieved'] = is_goal_achieved(state)

    if state['goal_achieved']:
        print("🎉 Goal Achieved! 🎉")
        print(f"💰 Total Reward: {state['total_reward']}")
        totalMoves = len(state['history'])
        print(f"👉 Total Moves: {totalMoves}")
        print(
            f"📍 Agent Position: ({state['current_pos']['x']}, {state['current_pos']['y']})")
        print(f"⏰ History:")
        for i, (pos, action) in enumerate(state['history']):
            print(f"\t{i+1}. {action} ({pos['x']}, {pos['y']})")
        print("\n")


def main():
    state = {
        "current_pos": {"x": 0, "y": 0},
        "cols": 3,
        "rows": 3,
        "grid": [
            ["Start", "", "Blocked"],
            ["", "", ""],
            ["Fire", "", "Diamond"]
        ],
        "history": [],
        "target_pos": {"x": 2, "y": 2},
        "goal_achieved": False,
        "rewards": {
            "Start": 0,
            "Blocked": -100,
            "Fire": -1000,
            "Diamond": 100,
            "Move": -1
        },
        "actions": ["UP", "DOWN", "LEFT", "RIGHT"],
        "total_reward": 0
    }
    world = VisualWorld(state) # line 284, comment if you don't want to install PyGame

    def simulate_updates():
        stop = False
        time.sleep(20)
        while not stop:
            go_to(state, state['target_pos'])
            stop = state['goal_achieved']
            time.sleep(1)

    # Run updates and visualization in parallel
    import threading
    update_thread = threading.Thread(target=simulate_updates)
    update_thread.start()
    world.visualize() # line 299, comment if you don't want to visualize the world.


if __name__ == "__main__":
    main()

Grading Criteria

Is the data structure appropriate for such an environment?

Here is the data structure used in the code:

    state = {
        "current_pos": {"x": 0, "y": 0},
        "cols": 3,
        "rows": 3,
        "grid": [
            ["Start", "", "Blocked"],
            ["", "", ""],
            ["Fire", "", "Diamond"]
        ],
        "history": [],
        "target_pos": {"x": 2, "y": 2},
        "goal_achieved": False,
        "rewards": {
            "Start": 0,
            "Blocked": -100,
            "Fire": -1000,
            "Diamond": 100,
            "Move": -1
        },
        "actions": ["UP", "DOWN", "LEFT", "RIGHT"],
        "total_reward": 0
    }

Does the algorithm take decisions based on the Markov Decision Process?

Yes, the algorithm decides the next action based only on the current state and the target position (the Markov property), together with the reward of each reachable cell. The decide_direction_for_next_move function evaluates the possible actions and selects the one with the highest reward.

def decide_direction_for_next_move(state, target_pos):
    """Decide the direction for the next move."""
    current_pos = state['current_pos']
    current_x = current_pos['x']
    current_y = current_pos['y']
    target_x = target_pos['x']
    target_y = target_pos['y']
    direction = None

    if current_x < target_x:
        direction = get_direction_with_highest_reward(
            # Policy 1: Move right or down, avoid left and up as they will take us away from the target
            state, ["RIGHT", "DOWN"])
    elif current_x > target_x:
        direction = get_direction_with_highest_reward(
            # Policy 2: Move left or down, avoid right and up as they will take us away from the target
            state, ["LEFT", "DOWN"])
    elif current_y < target_y:
        direction = get_direction_with_highest_reward(
            # Policy 3: Move down or left/right, avoid up as it will take us away from the target
            state, ["DOWN", "LEFT", "RIGHT"])
    elif current_y > target_y:
        direction = get_direction_with_highest_reward(
            # Policy 4: Move up or left/right, avoid down as it will take us away from the target
            state, ["UP", "LEFT", "RIGHT"])

    return direction

Does it calculate and identify the optimal solution when a blocked state is encountered?

Yes, the get_direction_with_highest_reward function evaluates the possible actions (the four directions) and avoids the blocked state because that state is assigned a low reward (-100, as shown in the state['rewards'] dictionary). The agent will not choose to move into the blocked state, as the other options always have higher rewards.

def get_direction_with_highest_reward(state, allowed_actions):
    """Get the direction with the highest reward."""
    current_pos = state['current_pos']
    current_x = current_pos['x']
    current_y = current_pos['y']
    up_y = current_y - 1 if current_y > 0 else current_y
    down_y = current_y + 1 if current_y < state['rows'] - 1 else current_y
    left_x = current_x - 1 if current_x > 0 else current_x
    right_x = current_x + 1 if current_x < state['cols'] - 1 else current_x

    up_reward = state['rewards'].get(state['grid'][up_y][current_x], -1)
    down_reward = state['rewards'].get(state['grid'][down_y][current_x], -1)
    left_reward = state['rewards'].get(state['grid'][current_y][left_x], -1)
    right_reward = state['rewards'].get(state['grid'][current_y][right_x], -1)

    # Choose the direction with the highest reward among the allowed actions
    rewards_dict = {
        "UP": up_reward,
        "DOWN": down_reward,
        "LEFT": left_reward,
        "RIGHT": right_reward
    }

    allowed_rewards = {k: v for k, v in rewards_dict.items()
                       if k in allowed_actions}
    direction = max(allowed_rewards, key=allowed_rewards.get)
    reward = allowed_rewards[direction]
    return {"direction": direction, "reward": reward}

Does it calculate and identify the optimal solution when a fire state is encountered?

Yes, similar to the blocked state, the fire state is also assigned a low reward (-1000) in the state['rewards'] dictionary. The algorithm will avoid moving into the fire state by selecting other actions with higher rewards (see get_direction_with_highest_reward function above).
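
As a quick hand-traced check of both the blocked and the fire cases (this snippet is illustrative and assumes the functions from the source code above are in scope):

    # Hand-traced checks; assumes get_direction_with_highest_reward (defined above) is in scope.
    base = {
        "cols": 3, "rows": 3,
        "grid": [
            ["Start", "", "Blocked"],
            ["", "", ""],
            ["Fire", "", "Diamond"]
        ],
        "rewards": {"Start": 0, "Blocked": -100, "Fire": -1000, "Diamond": 100, "Move": -1},
    }

    # Agent directly above the fire cell: DOWN would cost -1000, so RIGHT (-1) wins.
    state = dict(base, current_pos={"x": 0, "y": 1})
    print(get_direction_with_highest_reward(state, ["RIGHT", "DOWN"]))
    # -> {'direction': 'RIGHT', 'reward': -1}

    # Agent directly left of the blocked cell: RIGHT would cost -100, so DOWN (-1) wins.
    state = dict(base, current_pos={"x": 1, "y": 0})
    print(get_direction_with_highest_reward(state, ["RIGHT", "DOWN"]))
    # -> {'direction': 'DOWN', 'reward': -1}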

Does the algorithm consider reward policy while taking steps?

Yes, the state of the agent includes a reward policy according to the content of each cell in the grid, with empty cells having a reward of -1, the blocked state having a reward of -100, and the fire state having a reward of -1000. The agent will choose actions based on these rewards to maximize its total reward while avoiding negative rewards.

    "rewards": {
        "Start": 0,
        "Blocked": -100,
        "Fire": -1000,
        "Diamond": 100,
        "Move": -1
    },
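
Note that empty cells are not listed in this dictionary; in get_direction_with_highest_reward the lookup falls back to -1 for them, which matches the "Move" cost. A standalone illustration of that fallback:

    rewards = {"Start": 0, "Blocked": -100, "Fire": -1000, "Diamond": 100, "Move": -1}
    print(rewards.get("", -1))      # -1: an empty cell falls back to the per-move cost
    print(rewards.get("Fire", -1))  # -1000: the fire cell is heavily penalised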

Conclusion

  • Is the data structure appropriate for such an environment? ✅ See the main function.
  • Does the algorithm take decisions based on the Markov Decision Process? ✅ See the decide_direction_for_next_move function.
  • Does it calculate and identify the optimal solution when a blocked state is encountered? ✅ See the get_direction_with_highest_reward function.
  • Does it calculate and identify the optimal solution when a fire state is encountered? ✅ See the get_direction_with_highest_reward function.
  • Does the algorithm consider reward policy while taking steps? ✅ See the state['rewards'] dictionary.