# Coding 12: Imitation Learning++

This week we will train a deep network that learns to race in SuperTuxKart using only the camera and speedometer.

The network will take in an image and directly regress to low-level controls (steering, acceleration, brake, drift).

<img src="https://a.fsdn.com/con/app/proj/supertuxkart/screenshots/500px-Hac.jpg/max/max/1" width=512px/>

We have already completed a basic implementation of this agent. This week we will improve it through
* better data collection
* iterative training of better and better agents

## Installation

In [None]:
# Install the PySuperTuxKart package from PyPI into the notebook environment.
!pip install PySuperTuxKart

## The Environment

The first part of the code is the same as last week's imitation learning exercise.
No need to modify any of the code - just read over the `rollout` method to see how your agent is used.


In [None]:
import pystk
import numpy as np
import random

from tqdm.notebook import tqdm


class PyTux(object):
    """
    Thin wrapper around a single-kart pystk race on one track.

    Meant to be used as a context manager: entering starts a fresh race,
    exiting stops it again.
    """
    # pystk.init is only called by the first PyTux that gets created.
    INITED = False

    def __init__(self, track, screen_width=128, screen_height=96):
        """
        track: name of the track to race on
        screen_width, screen_height: size of the rendered camera image

        Note that the graphics config is only handed to pystk.init by the
        first instance; later instances do not initialize pystk again.
        """
        self.race = None
        self.config = pystk.GraphicsConfig.ld()
        self.config.screen_width = screen_width
        self.config.screen_height = screen_height
        self.track = track

        if not PyTux.INITED:
            pystk.init(self.config)
            PyTux.INITED = True

    @staticmethod
    def _magical_auto_pilot(player, track, distance=20):
        """
        This function returns a magical steering, acceleration and drift values.
        This is used in the auto-pilot and meant to be hard to read ;)
        Feel free to get inspired by this if you can decipher it
        (it's probably not worth your time though)
        """
        __ = PyTux._point_on_track(player.kart.distance_down_track+distance, track)
        __ = __ - np.array(player.kart.location)
        _ = np.array(player.kart.front) - np.array(player.kart.location)
        _ = _ / max(np.linalg.norm(_), 1e-10)
        _ = np.cross([0,1,0], _)
        return lambda ___: (_.dot(__), int(___<15), abs(__.dot(_))>1)

    @staticmethod
    def _point_on_track(distance, track):
        """
        Returns the point that lies `distance` down the `track`.

        distance: distance along the track; values beyond the end of the
                  track wrap around to its start
        track: object providing `path_distance` (a [start, end] distance pair
               per segment) and `path_nodes` (a [start, end] point pair per
               segment)
        """
        # Wrap once and use the wrapped value for both the segment lookup and
        # the interpolation weight. Interpolating with the unwrapped distance
        # would extrapolate far beyond the segment whenever the requested
        # distance exceeds the track length.
        distance = distance % track.path_distance[-1, 1]
        node_idx = np.searchsorted(track.path_distance[..., 1], distance) % len(track.path_nodes)
        d = track.path_distance[node_idx]
        x = track.path_nodes[node_idx]
        t = (distance - d[0]) / (d[1] - d[0])
        return x[1] * t + x[0] * (1 - t)

    def clean(self):
        """
        Stops the current race, if there is one. Safe to call repeatedly.
        """
        if self.race is not None:
            self.race.stop()
            # Drop the reference but keep the attribute around, so that a
            # later clean() or __enter__() on this object does not fail with
            # an AttributeError.
            self.race = None

    def __enter__(self):
        """
        Starts a new one-lap, single-kart race on self.track and advances it
        by one step.
        """
        self.clean()

        config = pystk.RaceConfig(num_kart=1, laps=1, track=self.track, step_size=0.1)
        config.players[0].controller = pystk.PlayerConfig.Controller.PLAYER_CONTROL

        self.race = pystk.Race(config)
        self.race.start()
        self.race.step()

        return self

    def __exit__(self, type, value, traceback):
        self.clean()

    def rollout(self, agent, max_frames=1000):
        """
        agent: an object that implements the act method
        max_frames: maximum number of frames to play for

        returns: tuple of (frame index, score, did the agent finish)
                 - finished: the frame index at which the lap was completed
                   and 200 - 100 * (frame index + 1) / max_frames
                 - not finished: the index of the last frame played and the
                   percentage of the lap that was completed
        """
        state = pystk.WorldState()
        track = pystk.Track()

        for t in range(max_frames):
            state.update()
            track.update()

            player = state.players[0]

            # Terminate if the kart finishes a lap.
            if np.isclose(player.kart.overall_distance / track.length, 1.0, atol=2e-3):
                return t, 200 - 100 * (t + 1) / max_frames, True

            # Everything the agent gets to see for this frame.
            image = np.array(self.race.render_data[0].image)
            auto_pilot = self._magical_auto_pilot(player, track)
            speed = np.linalg.norm(player.kart.velocity)

            self.race.step(agent.act(image, auto_pilot, speed))
        return t, 100 * player.kart.overall_distance / track.length, False

## The Agent

This is very similar to what we saw last week, except that the `AgentWrapper` class now takes a `noise` parameter. You should use this noise to randomly modify the actions returned by the `act` method. Higher noise values should correspond to more randomness.

In [None]:
class AgentWrapper(object):
    """
    Wraps any agent to collect extra information, used for
    - collecting data
    - visualizing runs

    The wrapper can also perturb the action that is actually executed, so
    that the recorded frames cover situations the wrapped agent would not
    drive into on its own.
    """
    ACTIONS = ['steer', 'acceleration', 'brake', 'drift']

    def __init__(self, agent, noise=0):
        """
        agent: an object that implements the act method
        noise: standard deviation of the Gaussian noise that is added to the
               steering of the executed action; values <= 0 leave the action
               unchanged
        """
        self.agent = agent
        self.noise = noise

        self.images = list()
        self.actions = list()

    def act(self, image, auto_pilot, speed):
        """
        Queries the wrapped agent, records the frame together with the
        agent's action, and returns the (possibly perturbed) action.
        """
        action = self.agent.act(image, auto_pilot, speed)
        self.images.append(image.copy())
        # Record the wrapped agent's own action *before* any noise is added:
        # the stored action is the training label, the noise only changes
        # which states get visited.
        self.actions.append([getattr(action, x) for x in self.ACTIONS])

        if self.noise > 0:
            noisy_steer = action.steer + np.random.normal(0.0, self.noise)
            # Keep the steering inside the same [-1, 1] range the autopilot uses.
            action.steer = float(np.clip(noisy_steer, -1, 1))

        return action

    def show(self):
        """
        Call on the last line of a cell to visualize the most recent run.
        """
        from moviepy.editor import ImageSequenceClip
        from IPython.display import display

        display(ImageSequenceClip(self.images, fps=15).ipython_display(width=512, autoplay=True, loop=True))


class Autopilot(object):
    """
    Expert driver that turns the environment's auto-pilot hint into an action.
    """
    # Whether the previous action drifted.
    last_drift = False

    def act(self, image, auto_pilot, speed):
        """
        image: current camera frame (not used by the autopilot)
        auto_pilot: callable that maps the speed to (steer, acceleration, drift)
        speed: current speed of the kart

        returns: a pystk.Action
        """
        # Ask the magical auto-pilot function for the raw controls.
        raw_steer, raw_acceleration, wants_drift = auto_pilot(speed)

        action = pystk.Action()
        # Never brake.
        action.brake = 0
        # Make sure the controls fall within our limits.
        action.steer = np.clip(raw_steer, -1, 1)
        action.acceleration = np.clip(raw_acceleration, 0, 1)
        # Cheap trick, never drift twice in a row (you'll slide out of control)
        action.drift = int(wants_drift) and not self.last_drift
        self.last_drift = action.drift
        return action


# Track to race on, and the frame budget for a single rollout.
TRACK = 'lighthouse'
TRACK_TIME = 700

# Test out your controller!
agent = AgentWrapper(agent=Autopilot(), noise=1)

with PyTux(TRACK) as env:
    print('Time: %d Distance: %d Success: %d' % env.rollout(agent=agent, max_frames=TRACK_TIME))

agent.show()

## Data Collection

Once again, our data collection looks pretty much the same as last week.

For every frame we have the following data
* image
* action

All we have to do is let the autopilot drive a couple of times,  
and we'll save this information from every frame.

You should choose a number of episodes to collect data from (`N_EPISODES`) as well as a noise value to use in each episode. Note that it might be useful to use different noise values in different episodes.

In [None]:
import shutil
from pathlib import Path
from PIL import Image


DATASET_PATH = Path('drive_data')
N_EPISODES = 20

# Remove old data. shutil.rmtree does the same job as `rm -rf` without
# depending on a Unix shell; ignore_errors covers the first run, when the
# directory does not exist yet.
shutil.rmtree(DATASET_PATH, ignore_errors=True)

DATASET_PATH.mkdir()

for episode in range(N_EPISODES):
    # Change the noise value here. You might find it helpful to use different
    # noise values for different episodes.
    agent = AgentWrapper(Autopilot(), noise=0.25)

    # Run one rollout.
    with PyTux(TRACK) as env:
        env.rollout(agent, TRACK_TIME)

    # Save the data: one directory per episode, one image/action pair per frame.
    episode_dir = DATASET_PATH / ('%03d' % episode)
    episode_dir.mkdir()

    for i, (image, action) in enumerate(zip(agent.images, agent.actions)):
        Image.fromarray(image).save(episode_dir / ('%05d.png' % i))
        np.savetxt(episode_dir / ('%05d.csv' % i), action, delimiter=',')

## Dataset

Now that the data is collected, we'll load it using our standard SuperTuxDataset (from HW1-3).

No need to change anything here, just take a look at the `visualize_sample` code as always.

In [None]:
%matplotlib inline

import pathlib
import torch
import torchvision
import matplotlib.pyplot as plt

from PIL import Image, ImageDraw


class SuperTuxDataset(torch.utils.data.Dataset):
    """
    Dataset over the frames of a single recorded episode.

    Every `*.png` image in the episode directory is paired with the `.csv`
    file of the same name, which holds the action saved for that frame.
    """
    def __init__(self, episode_dir, transform=torchvision.transforms.ToTensor()):
        """
        episode_dir: directory that holds the frames of one episode
        transform: callable applied to every PIL image that is loaded
        """
        episode_dir = pathlib.Path(episode_dir)

        self.transform = transform
        # glob() makes no promise about ordering; sort so that an index
        # always refers to the same frame.
        self.image_paths = sorted(episode_dir.glob('*.png'))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """
        returns: tuple of (transformed image, action as a FloatTensor)
        """
        image_path = self.image_paths[idx]

        image = Image.open(image_path)
        image = self.transform(image)

        # with_suffix only swaps the file extension, whereas a plain string
        # replace would also rewrite a '.png' that occurs elsewhere in the
        # path (e.g. in a directory name).
        action = np.loadtxt(image_path.with_suffix('.csv'), delimiter=',')
        action = torch.FloatTensor(action)

        return image, action


def load_data(dataset_path, batch_size=128, transform=torchvision.transforms.ToTensor()):
    """
    Builds one shuffled DataLoader over all episodes stored in dataset_path.

    dataset_path: directory whose entries are treated as episode directories
    batch_size: number of samples per batch
    transform: handed on to every SuperTuxDataset
    """
    episodes = [
        SuperTuxDataset(episode_dir, transform)
        for episode_dir in pathlib.Path(dataset_path).glob('*')
    ]
    all_frames = torch.utils.data.ConcatDataset(episodes)

    return torch.utils.data.DataLoader(all_frames, batch_size=batch_size, shuffle=True)


@torch.no_grad()
def visualize_sample(image, action):
    """
    Renders the values of `action` as red text on top of `image`.

    image: image tensor; it is scaled by 255, cast to bytes and its first
           axis is moved to the end before drawing
    action: iterable of scalar tensors, written one per line of text

    returns: the annotated image as a numpy array
    """
    pixels = (255 * image).byte().numpy().transpose(1, 2, 0)
    canvas = Image.fromarray(pixels)
    pen = ImageDraw.Draw(canvas)

    # One value per text line, 15 pixels apart, starting in the top-left corner.
    for row, value in enumerate(action):
        pen.text((5, 5 + 15 * row), str(value.item()), fill=(255, 0, 0))

    return np.array(canvas)


# Show a few randomly chosen frames of the first episode side by side.
n_samples = 4
fig, axes = plt.subplots(1, n_samples)
fig.set_size_inches(20, 10)

data = SuperTuxDataset(DATASET_PATH / '000')

for i, ax in enumerate(axes):
    idx = np.random.randint(len(data))
    ax.imshow(visualize_sample(*data[idx]))

plt.show()

## Tensorboard

In [None]:
import torch.utils.tensorboard as tb

# Directory that TensorBoard is pointed at below.
log_dir = 'dagger_log'

# Load the TensorBoard notebook extension and open the dashboard on log_dir
# with a reload interval of 1.
%load_ext tensorboard
%tensorboard --logdir {log_dir} --reload_interval 1

## Model + Training

Now we can finally define our model. The provided model architecture is sufficient to complete a lap, but you may change it if you wish.

In [None]:
import time
import torch

from torch.utils.tensorboard import SummaryWriter


# You may change this model architecture if you like. I was able to complete a
# lap with this architecture, so you don't have to change it if you prefer not
# to.
class CNNClassifier(torch.nn.Module):
    """
    Small convolutional network that regresses controls from a camera image.

    The input is batch-normalized, passed through three strided 5x5
    convolutions, averaged over the spatial dimensions and mapped to c_out
    values by a linear layer.
    """
    def __init__(self, c_in=3, c_out=2):
        """
        c_in: number of input image channels
        c_out: number of predicted values
        """
        super().__init__()
        self.norm = torch.nn.BatchNorm2d(c_in)
        self.network = torch.nn.Sequential(
            torch.nn.Conv2d(c_in, 32, 5, stride=2, padding=2),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 64, 5, stride=2, padding=2),
            torch.nn.ReLU(),
            torch.nn.Conv2d(64, 128, 5, stride=2, padding=2),
            torch.nn.ReLU()
        )
        self.classifier = torch.nn.Linear(128, c_out)

    def forward(self, x):
        """
        x: batch of images with shape (batch, c_in, h, w)

        returns: tensor with shape (batch, c_out)
        """
        x = self.norm(x)
        x = self.network(x)
        # Global average pooling over the two spatial dimensions.
        x = x.mean((2, 3))

        return self.classifier(x)

    def act(self, image, _, speed):
        """
        Predicts the controls for a single camera frame.

        image: np.uint8 array with shape (h, w, 3) and values in [0, 255]
        _: the auto-pilot callable handed out by the environment (ignored)
        speed: current speed of the kart (ignored)

        returns: a pystk.Action; the first two outputs of the network are
                 used as steer and acceleration, drift and brake are always 0
        """
        # Run on whatever device the weights currently live on, so that the
        # model does not have to be moved to the CPU before driving.
        device = next(self.parameters()).device
        image = torchvision.transforms.ToTensor()(image)[None].to(device)

        # Inference on a single frame: use the running BatchNorm statistics
        # and skip autograd, then restore the previous train/eval mode so an
        # ongoing training loop is not affected.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                pred = self(image)[0]
        finally:
            self.train(was_training)

        action = pystk.Action()
        # Hand plain floats to the simulator, limited to the same ranges the
        # autopilot uses: steer in [-1, 1], acceleration in [0, 1].
        action.steer = float(pred[0].clamp(-1, 1))
        action.acceleration = float(pred[1].clamp(0, 1))
        action.drift = 0
        action.brake = 0
        return action


def train(model, device, lr=0.001, epochs=20):
    """
    Trains `model` in place on the batches of the global `data_train` loader.

    model: network to train; it is moved to `device`
    device: torch device used for the model and for every batch
    lr: learning rate of the AdamW optimizer
    epochs: number of passes over `data_train`

    The per-step training loss is logged to a new, time-stamped run directory
    below the global `log_dir`.
    """
    logger = tb.SummaryWriter(log_dir + '/{}'.format(time.strftime('%H-%M-%S')), flush_secs=1)

    optim = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    loss_func = torch.nn.SmoothL1Loss()

    model.to(device)

    global_step = 0
    try:
        for epoch in tqdm(range(epochs)):
            model.train()

            for image_c, action_c in data_train:
                image, action = image_c.to(device), action_c.to(device)
                pred = model(image)

                # Only the first two action entries (steer, acceleration)
                # are regressed.
                loss = loss_func(pred[..., :2], action[..., :2])

                optim.zero_grad()
                loss.backward()
                optim.step()

                logger.add_scalar('loss/train', loss.item(), global_step)
                global_step += 1

                # Add image visualization.
                # visualize_sample(image, action)
    finally:
        # Every call opens its own writer; close it so that repeated training
        # runs in one notebook session do not pile up open writers, even when
        # training is interrupted.
        logger.close()


# Train your model: use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
data_train = load_data('drive_data', batch_size=16)

model = CNNClassifier().to(device)

train(model, device)

Scroll down to the bottom of the notebook for the visualization code you can use to see how your agent performs.

# DAgger

Now we'll look at solving the same problem with DAgger. How should you implement `act` in this case?

In [None]:
class DaggerAgentWrapper(object):
    """
    Pairs an autopilot with a second agent and collects extra information,
    used for
    - collecting data
    - visualizing runs
    """
    ACTIONS = ['steer', 'acceleration', 'brake', 'drift']

    def __init__(self, autopilot_agent, agent, noise=0):
        """
        autopilot_agent: agent whose actions are recorded and executed
        agent: second agent; stored, but not consulted by act yet
        noise: stored, but not used by act yet
        """
        self.images = []
        self.actions = []

        self.autopilot_agent = autopilot_agent
        self.agent = agent
        self.noise = noise
        self._rescue = 0

    def act(self, image, auto_pilot, speed):
        """
        Records the frame together with the autopilot's action and returns
        that action.
        """
        expert_action = self.autopilot_agent.act(image, auto_pilot, speed)
        self.images.append(image.copy())
        self.actions.append([getattr(expert_action, name) for name in self.ACTIONS])

        # This is not the correct implementation for DAgger: the autopilot's
        # action is returned as is, self.agent is never asked.

        return expert_action

    def show(self):
        """
        Call on the last line of a cell to visualize the most recent run.
        """
        from moviepy.editor import ImageSequenceClip
        from IPython.display import display

        clip = ImageSequenceClip(self.images, fps=15)
        display(clip.ipython_display(width=512, autoplay=True, loop=True))


# Test out your controller!
dagger_agent = DaggerAgentWrapper(autopilot_agent=Autopilot(), agent=model, noise=1)

with PyTux(TRACK) as env:
    print('Time: %d Distance: %d Success: %d' % env.rollout(agent=dagger_agent, max_frames=TRACK_TIME))

dagger_agent.show()

In [None]:
N_DAGGER_EPISODES = 1
for _ in range(N_DAGGER_EPISODES):
    # Reset the agent.
    dagger_agent = DaggerAgentWrapper(Autopilot(), model, noise=0.8)

    # Run one rollout.
    with PyTux(TRACK) as env:
        env.rollout(dagger_agent, TRACK_TIME)

    # Save the data into the next unused episode directory. The index is
    # derived from the directories on disk rather than from a loop variable
    # left behind by an earlier cell, so this cell also works after a kernel
    # restart and never mixes new frames into an existing episode.
    DATASET_PATH.mkdir(exist_ok=True)
    existing = [int(p.name) for p in DATASET_PATH.iterdir() if p.is_dir() and p.name.isdigit()]
    episode = max(existing, default=-1) + 1
    episode_dir = DATASET_PATH / ('%03d' % episode)
    episode_dir.mkdir()

    for i, (image, action) in enumerate(zip(dagger_agent.images, dagger_agent.actions)):
        Image.fromarray(image).save(episode_dir / ('%05d.png' % i))
        np.savetxt(episode_dir / ('%05d.csv' % i), action, delimiter=',')

# Retrain on the original episodes plus the newly collected ones.
data_train = load_data('drive_data', batch_size=16)
train(model, device, epochs=5)

## Fully Autonomous Driving

Now that all that has been taken care of, it's time to finally test out your trained model.

In [None]:
# Run the rollout with the model on the CPU.
model = model.cpu()

# Test out your model!
agent = AgentWrapper(model)
with PyTux(TRACK) as env:
    print('Time: %d Distance Completed Percentage: %f Success: %d' % env.rollout(agent, max_frames=TRACK_TIME))

agent.show()