"""
This module contains the legacy implementation for calculating distance to driver ahead.

:func:`inject_driver_ahead` adds 'DriverAhead' and 'DistanceToDriverAhead' to the position data for all laps of all
drivers. This functionality has been replaced with :func:`fastf1.core.Telemetry.add_driver_ahead`.

The new implementation provides smoother and more accurate results. Additionally, it can be applied to arbitrary slices
of data. However, it suffers from integration error when used over multiple laps. The legacy implementation has no
integration error issues.

It is recommended to use the new version. If necessary, it should be applied lap
by lap and the lap data should be concatenated afterwards. Still, the old version can be used if so desired.

The following is an example comparison plot of the legacy version and the new version. It also shows how the two
versions can be used.

.. plot::

    import fastf1
    import fastf1.plotting
    import fastf1.legacy
    import numpy as np
    import matplotlib.pyplot as plt

    fastf1.plotting.setup_mpl(misc_mpl_mods=False, color_scheme='fastf1')

    session = fastf1.get_session(2020, 'Italy', 'R')

    DRIVER = 'VER'  # which driver; need to specify number and abbreviation
    DRIVER_NUMBER = '33'
    LAP_N = 10  # which lap number to plot

    drv_laps = session.laps.pick_driver(DRIVER)
    drv_lap = drv_laps[(drv_laps['LapNumber'] == LAP_N)]  # select the lap

    # create a matplotlib figure
    fig = plt.figure()
    ax = fig.add_subplot()

    # ############### new
    df_new = drv_lap.get_car_data().add_driver_ahead()
    ax.plot(df_new['Time'], df_new['DistanceToDriverAhead'], label='new')

    # ############### legacy
    df_legacy = fastf1.legacy.inject_driver_ahead(session)[DRIVER_NUMBER].slice_by_lap(drv_lap)
    ax.plot(df_legacy['Time'], df_legacy['DistanceToDriverAhead'], label='legacy')

    ax.legend()
    plt.show()
"""
import numpy as np
import pandas as pd
import scipy.spatial
from fastf1.logger import get_logger
_logger = get_logger(__name__)

# NOTE(review): the assignment was missing in front of its docstring although
# `_make_trajectory` reads this constant; 0.667 is the value used by upstream
# FastF1 - confirm before release.
REFERENCE_LAP_RESOLUTION = 0.667
"""A distance in meters which indicates the resolution of the reference
lap. This reference is used to project car positions and calculate
things like distance between cars.
"""
def _get_reference_lap(session):
    """Find a reference lap for creating the track map in `_make_trajectory`.

    Laps are tried in order of ascending 'LapTime'. The first lap whose car
    data has a speed greater than zero for every sample is selected.

    Args:
        session: :class:`fastf1.core.Session`

    Returns:
        The selected lap (as returned by ``session.laps.loc[...]``) or
        ``None`` if no lap has valid telemetry.
    """
    laps = session.laps
    lap_times = laps['LapTime'].sort_values()  # returns a new, sorted series

    for lap_label in lap_times.index:
        lap = laps.loc[lap_label]
        car_data = lap.get_car_data()  # TODO interpolate_edges ?
        if np.all(car_data['Speed'] > 0):  # check for valid telemetry
            return lap

    # every lap contains at least one sample with zero speed
    return None
def _make_trajectory(session, ref_lap):
    """Calculate the driver ahead and the distance to it for all drivers.

    A reference line of the track is interpolated from the telemetry of
    ``ref_lap``. The position samples of every driver are projected onto
    this line. For each sample, the closest car ahead along the line is
    then looked up, ignoring cars that are not on track.

    Args:
        session: :class:`fastf1.core.Session`
        ref_lap: lap whose telemetry defines the track layout
            (see `_get_reference_lap`)

    Returns:
        A dictionary that maps each driver in ``session.drivers`` to this
        driver's position data joined with the two additional columns
        'DistanceToDriverAhead' and 'DriverAhead'. If the reference lap has
        no telemetry, the two columns contain no values.
    """
    telemetry = ref_lap.telemetry

    if telemetry.size == 0:
        # no data to base calculations on; create empty results
        driver_ahead = {}
        # iterate the drivers the same way as the main code path below does
        for drv in session.drivers:
            data = {'DistanceToDriverAhead': (), 'DriverAhead': ()}
            driver_ahead[drv] = session.pos_data[drv].join(pd.DataFrame(data), how='outer')
        return driver_ahead

    x = telemetry['X'].values
    y = telemetry['Y'].values
    z = telemetry['Z'].values
    s = telemetry['Distance'].values

    # Assuming constant speed between the last sample and the end of the lap
    # (division by 3.6 presumably converts km/h to m/s - TODO confirm unit)
    dt0_ = (ref_lap['LapTime'] - telemetry['Time'].iloc[-1]).total_seconds()
    ds0_ = (telemetry['Speed'].iloc[-1] / 3.6) * dt0_
    total_s = s[-1] + ds0_

    # To prolong start and finish and have a correct linear interpolation,
    # the lap is repeated once before and once after itself
    full_s = np.concatenate([s - total_s, s, s + total_s])
    full_x = np.concatenate([x, x, x])
    full_y = np.concatenate([y, y, y])
    full_z = np.concatenate([z, z, z])

    reference_s = np.arange(0, total_s, REFERENCE_LAP_RESOLUTION)
    reference_x = np.interp(reference_s, full_s, full_x)
    reference_y = np.interp(reference_s, full_s, full_y)
    reference_z = np.interp(reference_s, full_s, full_z)
    ssize = len(reference_s)

    # ### Build track map and project driver position to one trajectory

    def fix_suzuka(projection_index, _s):
        """Correct projections that jumped to the wrong track section.

        On a track with a crossing point (Suzuka), the nearest reference
        point may be on the section above/below the bridge instead of the
        section the car is driving on. Such samples are detected as spikes
        relative to a moving average and replaced by a linear extrapolation
        of the two preceding samples. Not the cleanest solution.

        Args:
            projection_index: indices into ``_s``, one per position sample;
                modified in place
            _s: distance of each reference point along the reference lap

        Returns:
            The corrected ``projection_index`` array.
        """
        def moving_average(a, n=3):
            # centered moving average, edges padded to keep the length of `a`
            ret = np.cumsum(a, dtype=float)
            ret[n:] = ret[n:] - ret[:-n]
            ma = ret[n - 1:] / n
            return np.concatenate([ma[0:n // 2], ma, ma[-n // 2:-1]])

        ma_projection = moving_average(_s[projection_index], n=3)
        spikes = np.absolute(_s[projection_index] - ma_projection)
        # 1000 and 3000 are magic numbers, very Suzuka specific
        sel_bridge = np.logical_and(spikes > 1000, spikes < 3000)
        unexpected = np.where(sel_bridge)[0]
        max_length = _s[-1]

        for p in unexpected:
            # Just assuming linearity for this 2 or 3 samples
            last_value = _s[projection_index[p - 1]]
            last_step = last_value - _s[projection_index[p - 2]]

            if (last_value + last_step) > max_length:
                # Over the finish line; wrap around to the start of the lap
                corrected_distance = -max_length + last_step + last_value
            else:
                corrected_distance = last_value + last_step

            corrected_index = np.argmin(np.abs(_s - corrected_distance))
            projection_index[p] = corrected_index

        return projection_index

    track = np.empty((ssize, 3))
    track[:, 0] = reference_x
    track[:, 1] = reference_y
    track[:, 2] = reference_z

    track_tree = scipy.spatial.cKDTree(track)
    drivers_list = np.array(list(session.drivers))
    # NOTE(review): assumes the position data of all drivers has the same
    # number of samples as the first driver's - confirm
    stream_length = len(session.pos_data[drivers_list[0]])
    # zero-initialized so that the column of a skipped driver (see below)
    # still holds valid indices into the transform matrix
    dmap = np.zeros((stream_length, len(drivers_list)), dtype=int)

    # limiting the search radius increases the speed of the query
    fast_query = {'workers': 2, 'distance_upper_bound': 500}
    for index, drv in enumerate(drivers_list):
        if drv not in session.pos_data.keys():
            _logger.warning(f"Driver {drv: >2}: No position data. "
                            f"(_make_trajectory)")
            continue
        trajectory = session.pos_data[drv][['X', 'Y', 'Z']].values
        projection_index = track_tree.query(trajectory, **fast_query)[1]
        # Points without a neighbour inside the search radius (e.g. car far
        # away from the track) get the out-of-range index `len(reference_s)`
        # from the query. We can replace these indices with 0
        projection_index[projection_index == len(reference_s)] = 0
        dmap[:, index] = fix_suzuka(projection_index.copy(), reference_s)

    # ### Create transform matrix to change distance point of reference
    # t_matrix[i, j] is the distance from reference point i forward along
    # the track to reference point j; t_matrix[i, i] is a full lap
    t_matrix = np.empty((ssize, ssize))
    for index in range(ssize):
        rref = reference_s - reference_s[index]
        rref[rref <= 0] = total_s + rref[rref <= 0]
        t_matrix[index, :] = rref

    # ### Create mask that is True while a car is out of the pits and on track
    time = session.pos_data[drivers_list[0]]['Time']
    pit_mask = np.zeros((stream_length, len(drivers_list)), dtype=bool)
    for driver_index, driver_number in enumerate(drivers_list):
        laps = session.laps.pick_driver(driver_number)
        in_pit = True
        times = [[], []]  # [pit out times, pit in times]
        for lap_index in laps.index:
            lap = laps.loc[lap_index]
            if not pd.isnull(lap['PitInTime']) and not in_pit:
                times[1].append(lap['PitInTime'])
                in_pit = True
            if not pd.isnull(lap['PitOutTime']) and in_pit:
                times[0].append(lap['PitOutTime'])
                in_pit = False

        if not in_pit:
            # Car did not return to the pits (e.g. crashed); close the last
            # interval with the time of the last lap, 'Status' will take
            # care of the rest
            times[1].append(lap['Time'])

        times = np.transpose(np.array(times))
        for inout in times:
            out_of_pit = np.logical_and(time >= inout[0], time < inout[1])
            pit_mask[:, driver_index] |= out_of_pit
        on_track = (session.pos_data[driver_number]['Status'] == 'OnTrack')
        pit_mask[:, driver_index] &= on_track.values

    # ### Calculate relative distances using transform matrix
    driver_ahead = {}
    stream_axis = np.arange(stream_length)
    for my_di, my_d in enumerate(drivers_list):
        rel_distance = np.empty(np.shape(dmap))

        my_pos_i = dmap[:, my_di]
        for his_di, his_d in enumerate(drivers_list):
            his_pos_i = dmap[:, his_di]
            rel_distance[:, his_di] = t_matrix[my_pos_i, his_pos_i]

        his_in_pit = ~pit_mask.copy()
        # never mask the own column (distance of one full lap); this
        # guarantees that no row is all-NaN, which `nanargmin` cannot handle
        his_in_pit[:, my_di] = False
        my_in_pit = ~pit_mask[:, drivers_list == my_d][:, 0]
        rel_distance[his_in_pit] = np.nan

        closest_index = np.nanargmin(rel_distance, axis=1)
        closest_distance = rel_distance[stream_axis, closest_index]
        closest_driver = drivers_list[closest_index].astype(object)
        # no result while the driver itself is not on track
        closest_distance[my_in_pit] = np.nan
        closest_driver[my_in_pit] = None

        data = {'DistanceToDriverAhead': closest_distance,
                'DriverAhead': closest_driver}
        driver_ahead[my_d] = session.pos_data[my_d].join(pd.DataFrame(data), how='outer')

    return driver_ahead
def inject_driver_ahead(session):
    """Add 'DistanceToDriverAhead' and 'DriverAhead' column to position data of all drivers in session.

    Args:
        session: :class:`fastf1.core.Session`

    Returns:
        A dictionary containing :class:`fastf1.core.Telemetry` for each driver. The telemetry is the same as
        :any:`fastf1.core.Session.pos_data` but with 'DriverAhead' and 'DistanceToDriverAhead' columns added
        to each drivers telemetry.

    Raises:
        ValueError: if no reference lap with valid telemetry can be found
    """
    ref_lap = _get_reference_lap(session)
    if ref_lap is None:
        raise ValueError("No valid telemetry for calculating distance to driver ahead!")
    return _make_trajectory(session, ref_lap)