# Source code for fastf1._api

import base64
import collections
import datetime
import json
import zlib
from typing import (
    Optional,
    Union
)

import numpy as np
import pandas as pd

from fastf1.internals.pandas_extensions import create_df_fast
from fastf1.logger import (
    get_logger,
    soft_exceptions
)
from fastf1.req import Cache
from fastf1.utils import (
    recursive_dict_get,
    to_datetime,
    to_timedelta
)


_logger = get_logger('api')

base_url = 'https://livetiming.formula1.com'
base_url_mirror = 'https://livetiming-mirror.fastf1.dev'

headers: dict[str, str] = {
    'Connection': 'close',
    'TE': 'identity',
    'User-Agent': 'BestHTTP',
    'Accept-Encoding': 'gzip, identity',
}

pages: dict[str, str] = {
    'session_data': 'SessionData.json',  # track + session status + lap count
    'session_info': 'SessionInfo.jsonStream',  # more rnd
    'archive_status': 'ArchiveStatus.json',  # rnd=1880327548
    'heartbeat': 'Heartbeat.jsonStream',  # Probably time synchronization?
    'audio_streams': 'AudioStreams.jsonStream',  # Link to audio commentary
    'driver_list': 'DriverList.jsonStream',  # Driver info and line story
    'extrapolated_clock': 'ExtrapolatedClock.jsonStream',  # Boolean
    'race_control_messages': 'RaceControlMessages.jsonStream',  # Flags etc
    'session_status': 'SessionStatus.jsonStream',  # Start and finish times
    'team_radio': 'TeamRadio.jsonStream',  # Links to team radios
    'timing_app_data': 'TimingAppData.jsonStream',  # Tyres and laps (juicy)
    'timing_stats': 'TimingStats.jsonStream',  # 'Best times/speed' useless
    'track_status': 'TrackStatus.jsonStream',  # SC, VSC and Yellow
    'weather_data': 'WeatherData.jsonStream',  # Temp, wind and rain
    'position': 'Position.z.jsonStream',  # Coordinates, not GPS? (.z)
    'car_data': 'CarData.z.jsonStream',  # Telemetry channels (.z)
    'content_streams': 'ContentStreams.jsonStream',  # Lap by lap feeds
    'timing_data': 'TimingData.jsonStream',  # Gap to car ahead
    'lap_count': 'LapCount.jsonStream',  # Lap counter
    'championship_prediction': 'ChampionshipPrediction.jsonStream',  # Points
    'index': 'Index.json'
}
"""Known API requests"""


def make_path(wname, wdate, sname, sdate):
    """Create the api path base string to append on livetiming.formula1.com
    for api requests.

    The api path base string changes for every session only.

    Args:
        wname: Weekend name (e.g. 'Italian Grand Prix')
        wdate: Weekend date (e.g. '2019-09-08')
        sname: Session name 'Qualifying' or 'Race'
        sdate: Session date (formatted as wdate)

    Returns:
        relative url path
    """
    smooth_operator = f'{wdate[:4]}/{wdate} {wname}/{sdate} {sname}/'
    path = '/static/' + smooth_operator.replace(' ', '_')
    # Workaround for Brazil Qualifying on sunday (#652), TODO: fix properly
    path = path.replace("2024-11-03_Qualifying", "2024-11-02_Qualifying")
    return path
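
# Illustrative only: expected output for the docstring's example arguments,
# derived directly from the string formatting above.
#
#   >>> make_path('Italian Grand Prix', '2019-09-08', 'Race', '2019-09-08')
#   '/static/2019/2019-09-08_Italian_Grand_Prix/2019-09-08_Race/'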
# define all empty columns for timing data
EMPTY_LAPS = {'Time': pd.NaT, 'Driver': '', 'LapTime': pd.NaT,
              'NumberOfLaps': np.nan, 'NumberOfPitStops': np.nan,
              'PitOutTime': pd.NaT, 'PitInTime': pd.NaT,
              'Sector1Time': pd.NaT, 'Sector2Time': pd.NaT,
              'Sector3Time': pd.NaT, 'Sector1SessionTime': pd.NaT,
              'Sector2SessionTime': pd.NaT, 'Sector3SessionTime': pd.NaT,
              'SpeedI1': np.nan, 'SpeedI2': np.nan, 'SpeedFL': np.nan,
              'SpeedST': np.nan, 'IsPersonalBest': False}

EMPTY_STREAM = {'Time': pd.NaT, 'Driver': '', 'Position': np.nan,
                'GapToLeader': np.nan, 'IntervalToPositionAhead': np.nan}
def timing_data(path: str,
                response: Optional[str] = None,
                livedata=None
                ) -> (pd.DataFrame, pd.DataFrame):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse timing data.

    Timing data is a mixed stream of information. At a given time a packet
    of data may indicate position, lap time, speed trap, sector times and
    so on.

    While most of this data can be mapped lap by lap giving a readable and
    usable data structure (-> laps_data), other entries like position and
    time gaps are provided on a more frequent time base. Those values are
    separated and returned as a separate object (-> stream_data).

    .. note:: This function does not actually return "raw" API data. This
        is because of the need to process a mixed linear data stream into a
        usable object and because of frequent errors and inaccuracies in
        said stream. Occasionally an "educated guess" needs to be made for
        judging whether a value belongs to this lap or to another lap.
        Additionally, some values which are considered "obviously" wrong
        are removed from the data. This can happen with or without
        warnings, depending on the reason and severity.

        - Timestamps ('SessionTime') marking start or end of a lap are
          post-processed as the provided values are inaccurate.

        - Lap and sector times are not modified ever! They are considered
          as the absolute truth. If necessary, other values are adjusted
          to fit.

    Args:
        path: api path base string (usually ``Session.api_path``)
        response: api response can be passed if data was already downloaded
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        **laps_data and stream_data**

        - laps_data (DataFrame):
            contains the following columns of data (one row per driver and
            lap)

                - Time (pandas.Timedelta): Session time at which the lap
                  was set (i.e. finished)
                - LapTime (pandas.Timedelta): Lap time of the last finished
                  lap (the lap in this row)
                - Driver (str): Driver number
                - NumberOfLaps (int): Number of laps driven by this driver
                  including the lap in this row
                - NumberOfPitStops (int): Number of pit stops of this driver
                - PitInTime (pandas.Timedelta): Session time at which the
                  driver entered the pits. Consequently, if this value is
                  not NaT the lap in this row is an inlap.
                - PitOutTime (pandas.Timedelta): Session time at which the
                  driver exited the pits. Consequently, if this value is
                  not NaT, the lap in this row is an outlap.
                - Sector1/2/3Time (pandas.Timedelta): Sector times (one
                  column for each sector time)
                - Sector1/2/3SessionTime (pandas.Timedelta): Session time
                  at which the corresponding sector time was set (one
                  column for each sector's session time)
                - SpeedI1/I2/FL/ST: Speed trap speeds; FL is speed at the
                  finish line; I1 and I2 are speed traps in sector 1 and 2
                  respectively; ST may be a speed trap on the longest
                  straight (?)

        - stream_data (DataFrame):
            contains the following columns of data

                - Time (pandas.Timedelta): Session time at which this
                  sample was created
                - Driver (str): Driver number
                - Position (int): Position in the field
                - GapToLeader (pandas.Timedelta): Time gap to leader in
                  seconds
                - IntervalToPositionAhead (pandas.Timedelta): Time gap to
                  car ahead

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    # wraps _extended_timing_data to provide compatibility to the old
    # return values
    laps_data, stream_data, session_split_times \
        = _extended_timing_data(path, response=response, livedata=livedata)
    return laps_data, stream_data
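
# Minimal usage sketch (illustrative, not part of the module); assumes a
# session object from the public API, e.g. via ``fastf1.get_session``, so
# that ``session.api_path`` is available:
#
#   >>> import fastf1
#   >>> session = fastf1.get_session(2019, 'Italian Grand Prix', 'R')
#   >>> laps, stream = timing_data(session.api_path)  # doctest: +SKIP
#   >>> laps[['Driver', 'NumberOfLaps', 'LapTime']].head()  # doctest: +SKIP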
@Cache.api_request_wrapper
def _extended_timing_data(path, response=None, livedata=None):
    # extended over the documentation of ``timing_data``:
    #   - returns session_split_times for splitting Q1/Q2/Q3 additionally

    # possible optional sanity checks (TODO, maybe):
    #   - inlap has to be followed by outlap
    #   - pit stops may never be negative (missing outlap)
    #   - speed traps against telemetry (especially in Q FastLap - Slow Lap)
    if livedata is not None and livedata.has('TimingData'):
        response = livedata.get('TimingData')
    elif response is None:  # no previous response provided
        _logger.info("Fetching timing data...")
        response = fetch_page(path, 'timing_data')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )
    _logger.info("Parsing timing data...")

    # split up response per driver for easier iteration and processing later
    resp_per_driver = dict()
    for entry in response:
        if (len(entry) < 2) or 'Lines' not in entry[1]:
            continue
        for drv in entry[1]['Lines']:
            if drv not in resp_per_driver.keys():
                resp_per_driver[drv] = [(entry[0], entry[1]['Lines'][drv])]
            else:
                resp_per_driver[drv].append((entry[0],
                                             entry[1]['Lines'][drv]))

    # create empty data dicts and populate them with data from all drivers
    # after that
    laps_data = {key: list() for key, val in EMPTY_LAPS.items()}
    stream_data = {key: list() for key, val in EMPTY_STREAM.items()}

    session_split_times = [datetime.timedelta(days=1), ] * 3

    for drv in resp_per_driver.keys():
        drv_laps_data, drv_session_split_times \
            = _laps_data_driver(resp_per_driver[drv], EMPTY_LAPS, drv)
        drv_stream_data = _stream_data_driver(resp_per_driver[drv],
                                              EMPTY_STREAM, drv)

        if (drv_laps_data is None) or (drv_stream_data is None):
            continue

        for i, split_time in enumerate(drv_session_split_times):
            session_split_times[i] = min(drv_session_split_times[i],
                                         session_split_times[i])

        for key in EMPTY_LAPS.keys():
            laps_data[key].extend(drv_laps_data[key])

        for key in EMPTY_STREAM.keys():
            stream_data[key].extend(drv_stream_data[key])

    laps_data = pd.DataFrame(laps_data)
    stream_data = pd.DataFrame(stream_data)

    _align_laps(laps_data, stream_data)

    # pandas doesn't correctly infer bool dtype columns, set type explicitly
    laps_data[['IsPersonalBest']] \
        = laps_data[['IsPersonalBest']].astype(bool)

    return laps_data, stream_data, session_split_times


@soft_exceptions("lap alignment",
                 "Failed to align laps between drivers!",
                 logger=_logger)
def _align_laps(laps_data, stream_data):
    # align lap start and end times between drivers based on Gap to leader
    # TODO: it should be possible to align based on different laps
    if not pd.isnull(stream_data['GapToLeader']).all():
        expected_gap = dict()
        delta = dict()
        leader = None
        max_delta = None

        # try to align on the first lap where usable data is available
        # ideally, this is the end of the first lap
        offset = -1  # start at -1 so that the value is zero on the first
        # iteration
        max_offset = (
            laps_data.loc[:, ('Driver', 'NumberOfLaps')].groupby('Driver')
            .max()['NumberOfLaps']  # find max lap count for each driver
            .min()  # find the smallest max lap count (first retirement)
            - 1  # subtract one, because offset counts from zero
        )

        while leader is None:
            offset += 1

            if offset >= max_offset:
                _logger.warning('Skipping lap alignment (no suitable lap)!')
                return

            # find the leader after the first usable lap and get the
            # expected gaps to the leader for all other drivers
            if not pd.isnull(
                    laps_data.loc[laps_data['NumberOfLaps'] == (offset + 1)]
                    .loc[:, 'PitInTime']).all():
                # cannot align on laps where one or more drivers pit,
                # therefore skip and try next one
                continue

            for drv in laps_data['Driver'].unique():
                try:
                    gap_str = _get_gap_str_for_drv(drv, offset, laps_data,
                                                   stream_data)
                    if 'LAP' in gap_str:
                        leader = drv
                    else:
                        expected_gap[drv] = to_timedelta(gap_str)
                except IndexError:
                    expected_gap[drv] = None

        # find the greatest delta between actual gap and currently
        # calculated gap
        leader_time \
            = laps_data[laps_data['Driver'] == leader].iloc[offset]['Time']

        for drv in expected_gap.keys():
            if expected_gap[drv] is None:
                delta[drv] = None
                continue

            other_time \
                = laps_data[laps_data['Driver'] == drv].iloc[offset]['Time']
            is_gap = other_time - leader_time
            delta[drv] = expected_gap[drv] - is_gap
            if (max_delta is None) or (delta[drv] > max_delta):
                max_delta = delta[drv]

        # Subtract the maximum delta from all leader timestamps.
        # It is impossible that the data was received too early, which in
        # turn means that it must have been received too late if the delta
        # is greater than zero
        if max_delta > datetime.timedelta(0):
            max_delta = datetime.timedelta(0)
        laps_data.loc[laps_data['Driver'] == leader, 'Time'] -= max_delta

        # Subtract the delta between actual gap and currently calculated
        # gap from each driver's timestamps to align them. Correct for the
        # max delta shift of the timestamps of the leader.
        for drv in delta.keys():
            if delta[drv] is None:
                continue
            laps_data.loc[laps_data['Driver'] == drv, 'Time'] \
                -= (max_delta - delta[drv])


def _get_gap_str_for_drv(drv, idx, laps_data, stream_data):
    first_time = laps_data[laps_data['Driver'] == drv].iloc[idx]['Time']
    ref_idx = (stream_data[stream_data['Driver'] == drv]['Time']
               - first_time).abs().idxmin()
    gap_str = stream_data.loc[ref_idx]['GapToLeader']
    return gap_str


def _laps_data_driver(driver_raw, empty_vals, drv):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Data is on a per-lap basis.

    Boolean flag 'PitOut' is not evaluated. Meaning is unknown and flag is
    only sometimes present when a car leaves the pits.

    Params:
        driver_raw (list): raw api response for this driver only
            [(Timestamp, data), (...), ...]
        empty_vals (dict): dictionary of column names and empty column
            values
        drv (str): driver identifier

    Returns:
        dictionary of laps data for this driver
    """
    integrity_errors = list()

    # do a quick first pass over the data to find out when laps start and
    # end; this is needed so we can work with a more efficient "look ahead"
    # on the main pass
    # example: we can have 'PitOut' 0.01s before a new lap starts, but
    # 'PitOut' belongs to the new lap, not the old one

    lapcnt = 0  # we're keeping two separate lap counts because sometimes
    # the api has a non-existent lap too much...
    api_lapcnt = 0  # ...at the beginning; we can correct that though;
    # api_lapcnt does not count backwards even if the source data does
    in_past = False  # flag for when the data went back in time
    out_of_pit = False  # flag set to true when driver drives out FOR THE
    # FIRST TIME; stays true from then on

    # entries are prefilled with empty values and only overwritten if they
    # exist in the response line
    drv_data = {key: [val, ] for key, val in empty_vals.items()}

    for time, resp in driver_raw:
        # the first three ifs are just edge case handling for the rare
        # sessions where the data goes back in time
        if in_past and 'NumberOfLaps' in resp \
                and resp['NumberOfLaps'] == api_lapcnt:
            in_past = False  # we're back in the present

        if 'NumberOfLaps' in resp \
                and ((prev_lapcnt := resp['NumberOfLaps']) < api_lapcnt):
            _logger.warning(f"Driver {drv: >2}: Ignoring late data for a "
                            f"previously processed lap. The data may "
                            f"contain errors (previous: {prev_lapcnt}; "
                            f"current {lapcnt})")
            in_past = True
            continue

        if in_past:  # still in the past, just continue and ignore
            continue

        if ('InPit' in resp) and (resp['InPit'] is False):
            out_of_pit = True  # drove out of the pits for the first time

        # new lap; create next row
        if 'NumberOfLaps' in resp and resp['NumberOfLaps'] > api_lapcnt:
            api_lapcnt += 1
            # make sure the car actually drove out of the pits already;
            # it can't be a new lap if it didn't
            if out_of_pit:
                drv_data['Time'][lapcnt] = to_timedelta(time)
                lapcnt += 1
                # append a new empty row; last row may not be populated
                # (depending on session) and may be removed later
                for key, val in empty_vals.items():
                    drv_data[key].append(val)

    # now, do the main pass where all the other data is actually filled in
    # same counters and flags as before, reset them
    lapcnt = 0
    api_lapcnt = 0
    in_past = False

    personal_best_lap_times = list()

    session_split_times = [datetime.timedelta(0)]
    # start times of (sub)sessions (Q1, Q2, Q3)

    pitstops = -1  # start with -1 because first is out lap, needs to be
    # zero after that

    # iterate through the data; new lap triggers next row in data
    for time, resp in driver_raw:
        # the first three ifs are just edge case handling for the rare
        # sessions where the data goes back in time
        if in_past and 'NumberOfLaps' in resp \
                and resp['NumberOfLaps'] == api_lapcnt:
            in_past = False  # we're back in the present

        if in_past or ('NumberOfLaps' in resp
                       and resp['NumberOfLaps'] < api_lapcnt):
            in_past = True
            continue

        # values which are up to five seconds late are still counted
        # towards the previous lap (sector times, speed traps and lap
        # times)
        lap_offset = 0
        if (lapcnt > 0) and (to_timedelta(time)
                             - drv_data['Time'][lapcnt - 1]
                             < pd.Timedelta(5, 's')):
            lap_offset = 1

        if 'Sectors' in resp and isinstance(resp['Sectors'], dict):
            # sometimes it's a list but then it never contains values...
            for sn, sector, sesst in (
                    ('0', 'Sector1Time', 'Sector1SessionTime'),
                    ('1', 'Sector2Time', 'Sector2SessionTime'),
                    ('2', 'Sector3Time', 'Sector3SessionTime')):
                if val := recursive_dict_get(resp, 'Sectors', sn, 'Value'):
                    drv_data[sector][lapcnt - lap_offset] \
                        = to_timedelta(val)
                    drv_data[sesst][lapcnt - lap_offset] \
                        = to_timedelta(time)

        if val := recursive_dict_get(resp, 'LastLapTime', 'Value'):
            # if 'LastLapTime' is received less than five seconds after the
            # start of a new lap, it is still added to the last lap
            val = to_timedelta(val)
            if val.total_seconds() < 150:
                # laps which are longer than 150 seconds are ignored;
                # usually this is the case between Q1, Q2 and Q3 because
                # all three qualifying sessions are one session here.
                # Those timestamps are often wrong and sometimes associated
                # with the wrong lap
                drv_data['LapTime'][lapcnt - lap_offset] = val

        if 'Speeds' in resp:
            for trapkey, trapname in (('I1', 'SpeedI1'), ('I2', 'SpeedI2'),
                                      ('FL', 'SpeedFL'), ('ST', 'SpeedST')):
                if val := recursive_dict_get(resp, 'Speeds', trapkey,
                                             'Value'):
                    # speed has to be float because int does not support
                    # NaN
                    if trapkey == 'ST':
                        # the ST trap value can occur early enough in a new
                        # lap that it needs to be excluded from the usual
                        # offset logic, therefore the offset is ignored
                        # here
                        drv_data[trapname][lapcnt] = float(val)
                    else:
                        drv_data[trapname][lapcnt - lap_offset] \
                            = float(val)

        if 'InPit' in resp:
            # 'InPit': True is received once when entering pits,
            # False is received once when leaving
            if resp['InPit'] is True:
                if pitstops >= 0:
                    drv_data['PitInTime'][lapcnt] = to_timedelta(time)
            elif ((('NumberOfLaps' in resp)
                   and resp['NumberOfLaps'] > api_lapcnt)
                  or (drv_data['Time'][lapcnt] - to_timedelta(time))
                  < pd.Timedelta(5, 's')):
                # same response line as beginning of next lap
                # or beginning of next lap less than 5 seconds away
                drv_data['PitOutTime'][lapcnt + 1] \
                    = to_timedelta(time)  # add to next lap
                pitstops += 1
            else:
                drv_data['PitOutTime'][lapcnt] \
                    = to_timedelta(time)  # add to current lap
                pitstops += 1

        # Save information about personal best lap times at the timestamp
        # at which this information was received.
        # Whenever a lap is deleted (if that happens quickly after it was
        # set), the previous 'BestLapTime' value is sent again. There is
        # some extra logic at the end that correctly marks personal best
        # laps based on the data that is saved here.
        if val := recursive_dict_get(resp, 'BestLapTime', 'Value'):
            personal_best_lap_times.append(
                (to_timedelta(time), to_timedelta(val))
            )

        # Create approximate (sub)session (i.e. quali) split times by
        # (mis)using the session number counter from 'BestLapTimes'.
        # (Note: those lap times cannot be used for correct personal best
        # detection, because the previous value is not resent here when a
        # lap is deleted.)
        if (val := resp.get('BestLapTimes')) and isinstance(val, dict):
            session_n = int(list(val.keys())[0])
            if (session_n + 1) > len(session_split_times):
                session_split_times.append(to_timedelta(time))

        # new lap; create next row
        if 'NumberOfLaps' in resp and resp['NumberOfLaps'] > api_lapcnt:
            api_lapcnt += 1
            # make sure the car actually drove out of the pits already;
            # it can't be a new lap if it didn't
            if pitstops >= 0:
                drv_data['Time'][lapcnt] = to_timedelta(time)
                drv_data['NumberOfLaps'][lapcnt] = lapcnt + 1
                # don't use F1's lap count; ours is better
                drv_data['NumberOfPitStops'][lapcnt] = pitstops
                drv_data['Driver'][lapcnt] = drv
                lapcnt += 1

    if lapcnt == 0:  # no data at all for this driver
        return None, None

    # done reading the data, do postprocessing

    def data_in_lap(lap_n):
        relevant = ('Sector1Time', 'Sector2Time', 'Sector3Time', 'SpeedI1',
                    'SpeedI2', 'SpeedFL', 'SpeedST', 'LapTime')
        for col in relevant:
            if not pd.isnull(drv_data[col][lap_n]):
                return True
        return False

    # 'NumberOfLaps' always introduces a new lap (can be a previous one)
    # but sometimes there is one more lap at the end; in this case the data
    # will be added as usual above, lap count and pit stops are added here
    # and the 'Time' is calculated below from sector times
    if data_in_lap(lapcnt):
        drv_data['NumberOfLaps'][lapcnt] = lapcnt + 1
        drv_data['NumberOfPitStops'][lapcnt] = pitstops
        drv_data['Driver'][lapcnt] = drv
    else:
        # if there was no more data after the last lap count increase,
        # delete the last empty record
        for key in drv_data.keys():
            drv_data[key] = drv_data[key][:-1]

    if not data_in_lap(0):
        # remove first lap if there's no data;
        # "pseudo outlap" that didn't exist
        for key in drv_data.keys():
            drv_data[key] = drv_data[key][1:]
        drv_data['NumberOfLaps'] = list(
            map(lambda n: n - 1, drv_data['NumberOfLaps'])
        )  # reduce each lap count by one

    if not drv_data['Time']:
        # ensure that there is still data left after potentially removing
        # a lap
        return drv_data, session_split_times

    for i in range(len(drv_data['Time'])):
        sector_sum = datetime.timedelta(0)
        na_sectors = list()  # list of keys for missing sector times
        for key in ('Sector1Time', 'Sector2Time', 'Sector3Time'):
            st = drv_data[key][i]
            if pd.isna(st):
                na_sectors.append(key)
                continue
            sector_sum += st

        # check for incorrect lap times and remove them
        # fixes GH#167 among others
        if sector_sum > drv_data['LapTime'][i]:
            drv_data['LapTime'][i] = pd.NaT
            integrity_errors.append(i + 1)

        if i == 0:
            # only do the following corrections for the 2nd lap and onwards
            continue

        # The API only sends an update if a state changes, therefore, if
        # two lap times or sector times are exactly equal, the second value
        # will be missing. Missing sector times and lap times are
        # calculated here based on the available values for a lap (at most
        # one may be missing). If the calculated value matches the previous
        # value, it will be set.

        # lap time is missing
        if (not na_sectors) and pd.isna(drv_data['LapTime'][i]) \
                and (drv_data['LapTime'][i - 1] == sector_sum):
            drv_data['LapTime'][i] = sector_sum

        # one sector time is missing
        elif (len(na_sectors) == 1) \
                and not pd.isna(drv_data['LapTime'][i]):
            # create a list with the two keys for available sector times
            ref_sec = ['Sector1Time', 'Sector2Time', 'Sector3Time']
            ref_sec.remove(na_sectors[0])

            if (sec1 := (drv_data['LapTime'][i]
                         - drv_data[ref_sec[0]][i]
                         - drv_data[ref_sec[1]][i])) \
                    == drv_data[na_sectors[0]][i - 1]:
                drv_data[na_sectors[0]][i] = sec1

    # lap time sync; check which sector time was triggered with the lowest
    # latency
    # Sector3SessionTime == end of lap
    # Sector2SessionTime + Sector3Time == end of lap
    # Sector1SessionTime + Sector2Time + Sector3Time == end of lap
    # all of these three have slightly different times; take the earliest
    # one -> most exact because it can't trigger too early
    for i in range(len(drv_data['Time'])):
        sector_sum = pd.Timedelta(0)
        min_time = drv_data['Time'][i]
        for sector_time, session_time in (
                (pd.Timedelta(0), drv_data['Sector3SessionTime'][i]),
                (drv_data['Sector3Time'][i],
                 drv_data['Sector2SessionTime'][i]),
                (drv_data['Sector2Time'][i],
                 drv_data['Sector1SessionTime'][i])):
            if pd.isnull(session_time):
                continue
            if pd.isnull(sector_time):
                break  # need to stop here because else the sector sum will
                # be incorrect

            sector_sum += sector_time
            new_time = session_time + sector_sum
            if not pd.isnull(new_time) and (new_time < min_time
                                            or pd.isnull(min_time)):
                min_time = new_time

        if i > 0 and min_time < drv_data['Time'][i - 1]:
            integrity_errors.append(i + 1)  # not possible if sector times
            # and lap time are correct
            continue

        drv_data['Time'][i] = min_time

    # last lap needs to be removed if it does not have a 'Time' and it
    # could not be calculated (likely an inlap)
    if pd.isnull(drv_data['Time'][-1]):
        if not pd.isnull(drv_data['PitInTime'][-1]):
            drv_data['Time'][-1] = drv_data['PitInTime'][-1]
        else:
            for key in drv_data.keys():
                drv_data[key] = drv_data[key][:-1]

    if not drv_data['Time']:
        # ensure that there is still data left after potentially removing
        # a lap
        return drv_data, session_split_times

    # more lap sync, this time check which lap triggered with the lowest
    # latency
    for i in range(len(drv_data['Time']) - 1, 0, -1):
        if (new_time := drv_data['Time'][i]
                - drv_data['LapTime'][i]) < drv_data['Time'][i - 1]:
            if i > 1 and new_time < drv_data['Time'][i - 2]:
                integrity_errors.append(i + 1)  # not possible if sector
                # times and lap time are correct
            else:
                drv_data['Time'][i - 1] = new_time

    # need to go both directions once to make everything match up; also
    # recalculate sector times
    for i in range(len(drv_data['Time']) - 1):
        if any(pd.isnull(tst) for tst in (
                drv_data['Time'][i], drv_data['LapTime'][i + 1],
                drv_data['Sector1Time'][i + 1],
                drv_data['Sector2Time'][i + 1],
                drv_data['Sector3Time'][i + 1])):
            continue  # lap not usable, missing critical values

        if (new_time := drv_data['Time'][i]
                + drv_data['LapTime'][i + 1]) \
                < drv_data['Time'][i + 1]:
            drv_data['Time'][i + 1] = new_time
        if (new_s1_time := drv_data['Time'][i]
                + drv_data['Sector1Time'][i + 1]) \
                < drv_data['Sector1SessionTime'][i + 1]:
            drv_data['Sector1SessionTime'][i + 1] = new_s1_time
        if (new_s2_time := drv_data['Time'][i]
                + drv_data['Sector1Time'][i + 1]
                + drv_data['Sector2Time'][i + 1]) \
                < drv_data['Sector2SessionTime'][i + 1]:
            drv_data['Sector2SessionTime'][i + 1] = new_s2_time
        if (new_s3_time := drv_data['Time'][i]
                + drv_data['Sector1Time'][i + 1]
                + drv_data['Sector2Time'][i + 1]
                + drv_data['Sector3Time'][i + 1]) \
                < drv_data['Sector3SessionTime'][i + 1]:
            drv_data['Sector3SessionTime'][i + 1] = new_s3_time

    # Iterate over the list of personal best lap times and set
    # 'IsPersonalBest'.
    # When a lap is deleted, the API resends the previous personal best.
    # Therefore, by iterating in reverse, if any lap is encountered that is
    # quicker than already processed personal best lap times, it must have
    # been deleted.
    # This is just best effort but not exhaustive as it can only handle lap
    # times that were deleted quickly (before the next personal best was
    # set).
    _corrected_personal_best_lap_times = list()
    # list is only used for backreference within the loop
    cur_sn = len(session_split_times) - 1
    # current (sub)session number, personal best lap times need to be
    # considered for each (sub)session individually
    for time, pb_lap_time in reversed(personal_best_lap_times):
        if time < session_split_times[cur_sn]:
            # transitioned into the previous (sub)session (reverse
            # iteration!); reset the reference list, so times are
            # considered individually
            cur_sn -= 1
            _corrected_personal_best_lap_times = list()

        if _corrected_personal_best_lap_times:
            if pb_lap_time in _corrected_personal_best_lap_times:
                continue
            elif pb_lap_time < min(_corrected_personal_best_lap_times):
                continue

        _corrected_personal_best_lap_times.append(pb_lap_time)

        # find the index of the corresponding lap by comparing with the
        # lap times and set 'IsPersonalBest' to True for that lap
        try:
            pb_idx = drv_data['LapTime'].index(pb_lap_time)
        except ValueError:
            # one example case where this error occurs are wildly off
            # personal best times (>2 min lap time) that are sometimes
            # present and which have no corresponding lap time
            pass
        else:
            drv_data['IsPersonalBest'][pb_idx] = True

    # fix the number of pit stops; due to potentially multiple laps to the
    # grid where a car goes through the pit lane before finally taking its
    # place on the grid, the number of pit stops on the first lap may
    # already be greater than zero; therefore, apply a correction so that
    # we start with zero
    pitstop_offset = drv_data['NumberOfPitStops'][0]
    for i in range(len(drv_data['NumberOfPitStops'])):
        drv_data['NumberOfPitStops'][i] -= pitstop_offset

    # fix first lap PitInTime; same reason as above for pit stops, there
    # may be an incorrect PitInTime on the first lap. There always is a
    # PitOutTime for when the car leaves the box for the lap to the grid.
    # There is a PitInTime if the car drives multiple laps to the grid,
    # discard these. There is also a PitInTime if the car actually pits at
    # the end of the first lap, those need to be kept.
    if drv_data['PitInTime'][0] < drv_data['PitOutTime'][0]:
        drv_data['PitInTime'][0] = pd.NaT

    if integrity_errors:
        _logger.warning(
            f"Driver {drv: >2}: Encountered {len(integrity_errors)} timing "
            f"integrity error(s) near lap(s): {integrity_errors}.\n"
            f"This might be a bug and should be reported.")

    return drv_data, session_split_times


def _stream_data_driver(driver_raw, empty_vals, drv):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Data is on a timestamp basis.

    Params:
        driver_raw (list): raw api response for this driver only
            [(Timestamp, data), (...), ...]
        empty_vals (dict): dictionary of column names and empty column
            values
        drv (str): driver identifier

    Returns:
        dictionary of timing stream data for this driver
    """
    # entries are prefilled with empty or previous values and only
    # overwritten if they exist in the response line; basically
    # interpolation by filling up with the last known value because not
    # every value is in every response
    drv_data = {key: [val, ] for key, val in empty_vals.items()}
    i = 0

    # iterate through the data; timestamp + any of the values triggers a
    # new row in data
    for time, resp in driver_raw:
        new_entry = False
        if val := recursive_dict_get(resp, 'Position'):
            drv_data['Position'][i] = int(val)
            new_entry = True
        if val := recursive_dict_get(resp, 'GapToLeader'):
            drv_data['GapToLeader'][i] = val
            new_entry = True
        if val := recursive_dict_get(resp, 'IntervalToPositionAhead',
                                     'Value'):
            drv_data['IntervalToPositionAhead'][i] = val
            new_entry = True

        # at least one value was present, create next row
        if new_entry:
            drv_data['Time'][i] = to_timedelta(time)
            drv_data['Driver'][i] = drv
            i += 1

            # create next row of data from the last values; there will
            # always be one row too much at the end which is removed again
            for key, val in empty_vals.items():
                drv_data[key].append(drv_data[key][-1])

    for key in drv_data.keys():
        drv_data[key] = drv_data[key][:-1]  # remove very last row again

    return drv_data
@Cache.api_request_wrapper
def timing_app_data(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse 'timing app data'.

    Timing app data provides the following data channels per sample:

        - LapNumber (float or nan): Current lap number
        - Driver (str): Driver number
        - LapTime (pandas.Timedelta or None): Lap time of last lap
        - Stint (int): Counter for the number of driven stints
        - TotalLaps (float or nan): Total number of laps driven on this set
          of tires (includes laps driven in other sessions!)
        - Compound (str or None): Tire compound
        - New (bool or None): Whether the tire was new when fitted
        - TyresNotChanged (int or None): ??? Probably a flag to mark pit
          stops without tire changes
        - Time (pandas.Timedelta): Session time
        - LapFlags (float or nan): ??? unknown
        - LapCountTime (None or ???): ??? unknown; no data
        - StartLaps (float or nan): ??? Tire age when fitted (same as
          'TotalLaps' in the same sample?!?)
        - Outlap (None or ???): ??? unknown; no data

    Only a few values are present per timestamp. Somewhat comprehensive
    information can therefore only be obtained by aggregating data (usually
    over the course of one lap). Some values are sent even less frequently
    (for example 'Compound' only after tire changes).

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A DataFrame containing one column for each data channel listed
        above.

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    if livedata is not None and livedata.has('TimingAppData'):
        response = livedata.get('TimingAppData')
    elif response is None:  # no previous response provided
        _logger.info("Fetching timing app data...")
        response = fetch_page(path, 'timing_app_data')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    data = {'LapNumber': [], 'Driver': [], 'LapTime': [], 'Stint': [],
            'TotalLaps': [], 'Compound': [], 'New': [],
            'TyresNotChanged': [], 'Time': [], 'LapFlags': [],
            'LapCountTime': [], 'StartLaps': [], 'Outlap': []}

    for entry in response:
        if (len(entry) < 2) or 'Lines' not in entry[1]:
            continue

        time = to_timedelta(entry[0])
        row = entry[1]
        for driver_number in row['Lines']:
            if update := recursive_dict_get(row, 'Lines', driver_number,
                                            'Stints'):
                for stint_number, stint in enumerate(update):
                    if isinstance(update, dict):
                        stint_number = int(stint)
                        stint = update[stint]
                    for key in data:
                        if key in stint:
                            val = stint[key]
                            if key == 'LapTime':
                                val = to_timedelta(val)
                            elif key == 'New':
                                val = True if val == 'true' else False
                            data[key].append(val)
                        else:
                            data[key].append(None)
                    for key in stint:
                        if key not in data:
                            _logger.debug(f"Found unknown key in timing "
                                          f"app data: {key}")

                    data['Time'][-1] = time
                    data['Driver'][-1] = driver_number
                    data['Stint'][-1] = stint_number

    return pd.DataFrame(data)
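
# Illustrative sketch of one raw 'TimingAppData' entry as consumed above
# (hand-written example, the field values are made up): the 'Stints' update
# can be a list (full update) or a dict keyed by stint number (partial
# update), which is why the loop above rebinds ``stint_number`` and
# ``stint`` when a dict is encountered.
#
#   ('00:03:01.435', {'Lines': {'44': {'Stints': {'1': {
#       'Compound': 'SOFT', 'New': 'true', 'TotalLaps': 2}}}}})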
@Cache.api_request_wrapper
def car_data(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse car data.

    Car data provides the following data channels per sample:

        - Time (pandas.Timedelta): session timestamp (time only);
          inaccurate, has duplicate values; use Date instead
        - Date (pandas.Timestamp): timestamp for this sample as Date +
          Time; more or less exact
        - Speed (int): Km/h
        - RPM (int)
        - Gear (int): [called 'nGear' in the data!]
        - Throttle (int): 0-100%
        - Brake (bool)
        - DRS (int): 0-14 (Odd DRS is Disabled, Even DRS is Enabled?)
          (More Research Needed?)

            - 0 = Off
            - 1 = Off
            - 2 = (?)
            - 3 = (?)
            - 8 = Detected, Eligible once in Activation Zone (Noted
              Sometimes)
            - 10 = On (Unknown Distinction)
            - 12 = On (Unknown Distinction)
            - 14 = On (Unknown Distinction)

        - Source (str): Indicates the source of a sample; 'car' for all
          values here

    The data stream has a sample rate of (usually) 240ms. The samples from
    the data streams for position data and car data do not line up.
    Resampling/interpolation is required to merge them.

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        | A dictionary containing one pandas DataFrame per driver.
          Dictionary keys are the driver's numbers as string (e.g. '16').
          You should never assume that a number exists!
        | Each dataframe contains one column for each data channel as
          listed above

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    # data recorded from live timing has a slightly different structure
    is_livedata = False  # flag to indicate live timing data

    if livedata is not None and livedata.has('CarData.z'):
        response = livedata.get('CarData.z')
        is_livedata = True
    elif response is None:
        _logger.info("Fetching car data...")
        response = fetch_page(path, 'car_data')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    _logger.info("Parsing car data...")

    numeric_channels = ['RPM', 'Speed', 'nGear', 'Throttle', 'DRS']
    bool_channels = ['Brake']
    columns = ['Time', 'Date', 'RPM', 'Speed', 'nGear', 'Throttle',
               'Brake', 'DRS', 'Source']  # correct order required!

    ts_length = 12  # length of timestamp: len('00:00:00:000')

    data = dict()
    decode_error_count = 0

    for record in response:
        try:
            if is_livedata:
                time = to_timedelta(record[0])
                jrecord: dict = parse(record[1], zipped=True)
            else:
                time = to_timedelta(record[:ts_length])
                jrecord: dict = parse(record[ts_length:], zipped=True)

            for entry in jrecord['Entries']:
                # date format is '2020-08-08T09:45:03.0619797Z' with a
                # varying number of millisecond decimal points
                # always remove last char ('z'), max len 26, right pad to
                # len 26 with zeroes if shorter
                date = to_datetime(entry['Utc'])

                for drv in entry['Cars']:
                    if drv not in data:
                        # initialize dict entry for this driver
                        data[drv] = list()

                    try:
                        rpm = entry['Cars'][drv]['Channels']['0']
                        speed = entry['Cars'][drv]['Channels']['2']
                        ngear = entry['Cars'][drv]['Channels']['3']
                        throttle = entry['Cars'][drv]['Channels']['4']
                        brake = entry['Cars'][drv]['Channels']['5']
                        drs = entry['Cars'][drv]['Channels']['45']
                    except KeyError:
                        continue

                    data[drv].append((time, date, rpm, speed, ngear,
                                      throttle, brake, drs, 'car'))
        except Exception:
            # too risky to specify an exception: unexpected invalid data!
            decode_error_count += 1
            continue

    if decode_error_count > 0:
        _logger.warning(f"Car data: failed to decode {decode_error_count} "
                        f"messages ({len(response)} messages total)")

    # create one dataframe per driver and check for the longest dataframe
    most_complete_ref = None
    for drv in data:
        arr_all = np.array(data[drv])
        time = arr_all[:, 0].astype('timedelta64[ns]')
        date = arr_all[:, 1].astype('datetime64[ns]')
        rpm = arr_all[:, 2].astype('int64')
        speed = arr_all[:, 3].astype('int64')
        ngear = arr_all[:, 4].astype('int64')
        throttle = arr_all[:, 5].astype('int64')
        brake = arr_all[:, 6].astype('int64')  # converted to bool later
        drs = arr_all[:, 7].astype('int64')
        source = arr_all[:, 8].astype('object')

        data[drv] = create_df_fast(
            arrays=[time, date, rpm, speed, ngear, throttle, brake, drs,
                    source],
            columns=columns
        )

        if (most_complete_ref is None) \
                or (len(data[drv]['Date']) > len(most_complete_ref)):
            most_complete_ref = data[drv]['Date']

    for drv in data:
        # if everything is well, all dataframes should have the same length
        # and no postprocessing is necessary
        if len(data[drv]['Date']) < len(most_complete_ref):
            # there is missing data for this driver
            # extend the Date column and fill up missing telemetry values
            # with zero, except Time which is left as NaT and will be
            # calculated correctly based on Session.t0_date anyway when
            # creating Telemetry instances in Session.load_telemetry
            data[drv] = data[drv] \
                .merge(most_complete_ref, how='outer') \
                .sort_values(by='Date') \
                .reset_index(drop=True)
            _logger.warning(f"Driver {drv: >2}: Car data is incomplete!")

        # ensure that brake data is 'boolean-compatible' in case that this
        # is ever changed
        _unique_brake_values = data[drv].loc[:, 'Brake'].unique()
        if ((_unique_brake_values > 0)
                & (_unique_brake_values < 100)).any():
            _logger.warning(f"Driver {drv: >2}: Raw brake data contains "
                            f"non-boolean values!")

        # convert to correct datatypes
        data[drv][numeric_channels] = \
            data[drv].loc[:, numeric_channels] \
            .fillna(value=0, inplace=False) \
            .astype('int64')

        data[drv][bool_channels] = \
            data[drv].loc[:, bool_channels] \
            .fillna(value=False, inplace=False) \
            .astype('bool')

    return data
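
# Illustrative usage (not part of the module): ``car_data`` returns one
# DataFrame per driver, keyed by the driver number as a string. ``session``
# is assumed to be a loaded session object as in the earlier sketch.
#
#   >>> drv_car_data = car_data(session.api_path)  # doctest: +SKIP
#   >>> '16' in drv_car_data  # never assume that a number exists!
#   True
#   >>> drv_car_data['16'][['Speed', 'nGear', 'DRS']].head()  # doctest: +SKIP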
@Cache.api_request_wrapper
def position_data(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse position data.

    Position data provides the following data channels per sample:

        - Time (pandas.Timedelta): session timestamp (time only);
          inaccurate, has duplicate values; use Date instead
        - Date (pandas.Timestamp): timestamp for this sample as Date +
          Time; more or less exact
        - Status (str): 'OnTrack' or 'OffTrack'
        - X, Y, Z (int): Position coordinates; starting from 2020 the
          coordinates are given in 1/10 meter
        - Source (str): Indicates the source of a sample; 'pos' for all
          values here

    The data stream has a sample rate of (usually) 220ms. The samples from
    the data streams for position data and car data do not line up.
    Resampling/interpolation is required to merge them.

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        | A dictionary containing one pandas DataFrame per driver.
          Dictionary keys are the driver's numbers as string (e.g. '16').
          You should never assume that a number exists!
        | Each dataframe contains one column for each data channel as
          listed above

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    # data recorded from live timing has a slightly different structure
    is_livedata = False  # flag to indicate live timing data

    if livedata is not None and livedata.has('Position.z'):
        response = livedata.get('Position.z')
        is_livedata = True
    elif response is None:
        _logger.info("Fetching position data...")
        response = fetch_page(path, 'position')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    _logger.info("Parsing position data...")

    if not response:
        return {}

    ts_length = 12  # length of timestamp: len('00:00:00:000')
    columns = ['Time', 'Date', 'Status', 'X', 'Y', 'Z',
               'Source']  # correct order required!

    data = dict()
    decode_error_count = 0

    for record in response:
        try:
            if is_livedata:
                time = record[0]
                jrecord: dict = parse(record[1], zipped=True)
            else:
                time = to_timedelta(record[:ts_length])
                jrecord: dict = parse(record[ts_length:], zipped=True)

            for sample in jrecord['Position']:
                # date format is '2020-08-08T09:45:03.0619797Z' with a
                # varying number of millisecond decimal points
                # always remove last char ('z'), max len 26, right pad to
                # len 26 with zeroes if shorter
                date = to_datetime(sample['Timestamp'])

                for drv in sample['Entries']:
                    if drv not in data:
                        # initialize dict entry for this driver
                        data[drv] = list()

                    try:
                        x = sample['Entries'][drv]['X']
                        y = sample['Entries'][drv]['Y']
                        z = sample['Entries'][drv]['Z']
                    except KeyError:
                        continue

                    try:
                        status = sample['Entries'][drv]['Status']
                    except KeyError:
                        status = None
                    if str(status).isdigit():
                        # Fallback on older api status mapping and convert
                        status = 'OffTrack' if int(status) else 'OnTrack'

                    data[drv].append((time, date, status, x, y, z, 'pos'))
        except Exception:
            # too risky to specify an exception: unexpected invalid data!
            decode_error_count += 1
            continue

    if decode_error_count > 0:
        _logger.warning(
            f"Position data: failed to decode {decode_error_count} "
            f"messages ({len(response)} messages total)")

    # create one dataframe per driver and check for the longest dataframe
    most_complete_ref = None
    for drv in data:
        arr_all = np.array(data[drv])
        time = arr_all[:, 0].astype('timedelta64[ns]')
        date = arr_all[:, 1].astype('datetime64[ns]')
        status = arr_all[:, 2].astype('object')
        x = arr_all[:, 3].astype('int64')
        y = arr_all[:, 4].astype('int64')
        z = arr_all[:, 5].astype('int64')
        source = arr_all[:, 6].astype('object')

        data[drv] = create_df_fast(
            arrays=[time, date, status, x, y, z, source],
            columns=columns
        )

        # check length of dataframe; sometimes there can be missing data
        if (most_complete_ref is None) \
                or (len(data[drv]['Date']) > len(most_complete_ref)):
            most_complete_ref = data[drv]['Date']

    # if everything is well, all dataframes should have the same length
    # and no postprocessing is necessary
    for drv in data:
        if len(data[drv]['Date']) < len(most_complete_ref):
            # there is missing data for this driver
            # extend the Date column and fill up missing telemetry values
            # with zero, except Time which is left as NaT and will be
            # calculated correctly based on Session.t0_date anyway when
            # creating Telemetry instances in Session.load_telemetry
            # and except Status which should be 'OffTrack' for missing data
            data[drv] = data[drv] \
                .merge(most_complete_ref, how='outer') \
                .sort_values(by='Date') \
                .reset_index(drop=True)
            data[drv]['Status'] = data[drv]['Status'] \
                .fillna(value='OffTrack', inplace=False)
            data[drv].loc[:, ['X', 'Y', 'Z']] = \
                data[drv].loc[:, ['X', 'Y', 'Z']] \
                .fillna(value=0, inplace=False)

            _logger.warning(f"Driver {drv: >2}: Position data is "
                            f"incomplete!")

    return data
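
# Illustrative only: the X/Y/Z coordinates are integers in 1/10 meter (from
# 2020 onwards), so a rough conversion to meters is a division by 10.
# ``session`` is assumed to be a loaded session object as in the earlier
# sketch.
#
#   >>> pos = position_data(session.api_path)  # doctest: +SKIP
#   >>> (pos['16'][['X', 'Y', 'Z']] / 10).head()  # doctest: +SKIP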
@Cache.api_request_wrapper
def track_status_data(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse track status data.

    Track status contains information on yellow/red/green flags, safety car
    and virtual safety car. It provides the following data channels per
    sample:

        - Time (datetime.timedelta): session timestamp (time only)
        - Status (str): contains track status changes as numeric values
          (described below)
        - Message (str): contains the same information as status but in
          easily understandable words ('Yellow', 'AllClear',...)

    A new value is sent every time the track status changes.

    Track status is indicated using single digit integer status codes (as
    string). List of known statuses:

        - '1': Track clear (beginning of session or to indicate the end
          of another status)
        - '2': Yellow flag (sectors are unknown)
        - '3': ??? Never seen so far, does not exist?
        - '4': Safety Car
        - '5': Red Flag
        - '6': Virtual Safety Car deployed
        - '7': Virtual Safety Car ending (As indicated on the driver's
          steering wheel, on tv and so on; status '1' will mark the
          actual end)

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A dictionary containing one key for each data channel and a list
        of values per key.

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    if livedata is not None and livedata.has('TrackStatus'):
        # does not need any further processing
        _logger.info("Loading track status data")
        response = livedata.get('TrackStatus')
    elif response is None:
        _logger.info("Fetching track status data...")
        response = fetch_page(path, 'track_status')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    data = {'Time': [], 'Status': [], 'Message': []}

    for entry in response:
        if len(entry) < 2:
            continue
        row = entry[1]
        if not isinstance(row, dict):
            continue
        data['Time'].append(to_timedelta(entry[0]))
        data['Status'].append(row.get('Status', ''))
        data['Message'].append(row.get('Message', ''))

    return data
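
# Illustrative helper (hypothetical, not part of the module): a compact
# label mapping for the numeric status codes documented above. This largely
# mirrors the 'Message' channel, which already carries readable words.
#
#   _TRACK_STATUS_LABELS = {'1': 'AllClear', '2': 'Yellow',
#                           '4': 'SCDeployed', '5': 'Red',
#                           '6': 'VSCDeployed', '7': 'VSCEnding'}
#   track_status = track_status_data(session.api_path)
#   labels = [_TRACK_STATUS_LABELS.get(s, 'Unknown')
#             for s in track_status['Status']]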
@Cache.api_request_wrapper
def session_status_data(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse session status data.

    Session status contains information on when a session was started and
    when it ended (amongst others). It provides the following data channels
    per sample:

        - Time (datetime.timedelta): session timestamp (time only)
        - Status (str): status messages

    A new value is sent every time the session status changes.

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A dictionary containing one key for each data channel and a list
        of values per key.

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    if livedata is not None and livedata.has('SessionStatus'):
        # does not need any further processing
        _logger.info("Loading session status data")
        response = livedata.get('SessionStatus')
    elif response is None:
        _logger.info("Fetching session status data...")
        response = fetch_page(path, 'session_status')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    data = {'Time': [], 'Status': []}

    for entry in response:
        if len(entry) < 2:
            continue
        row = entry[1]
        if not isinstance(row, dict) or 'Status' not in row:
            continue
        data['Time'].append(to_timedelta(entry[0]))
        data['Status'].append(row['Status'])

    return data
@Cache.api_request_wrapper
def race_control_messages(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse race control messages.

    Race control messages are sent by race control to all teams to notify
    of decisions and statuses of the session.

    Every message has the following attributes:

        - Utc: Message timestamp
        - Category (str): Type of message, "Other", "Flag", "Drs",
          "CarEvent"
        - Message (str): Content of message

    Other possible attributes are:

        - Status (str): Status of context, e.g. "DISABLED" for disabling
          DRS
        - Flag (str): Type of flag being waved "GREEN", "RED", "YELLOW",
          "CLEAR", "CHEQUERED"
        - Scope (str): Scope of message "Track", "Sector", "Driver"
        - Sector (int): Affected track sector for sector-scoped messages
        - RacingNumber (str): Affected driver for CarEvent messages
        - Lap (int): Number of the lap in which the message was displayed

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A dictionary containing one key for each data channel and a list
        of values per key.

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    if livedata is not None and livedata.has('RaceControlMessages'):
        # does not need any further processing
        _logger.info("Loading race control messages")
        response = livedata.get('RaceControlMessages')
    elif response is None:
        _logger.info("Fetching race control messages...")
        response = fetch_page(path, 'race_control_messages')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    data = {
        'Time': [], 'Category': [], 'Message': [], 'Status': [],
        'Flag': [], 'Scope': [], 'Sector': [], 'RacingNumber': [],
        'Lap': []
    }
    data_keys = ('Category', 'Message', 'Status', 'Flag', 'Scope',
                 'Sector', 'RacingNumber', 'Lap')
    converters = (str, str, str, str, str, int, str, int)

    for line in response:
        messages = line[1]['Messages']
        if isinstance(messages, dict):
            messages = list(messages.values())
        for entry in messages:
            data['Time'].append(to_datetime(entry['Utc']))

            for key, conv in zip(data_keys, converters):
                try:
                    data[key].append(conv(entry[key]))
                except (KeyError, ValueError):
                    # type conversion failed or key is missing
                    data[key].append(None)

    return data
@Cache.api_request_wrapper
def lap_count(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse lap count data.

    It provides the following data channels per sample:

        - Time: session timestamp (time only)
        - TotalLaps (int): Intended number of total laps
        - CurrentLap (int): Current race lap

    A value can have both 'TotalLaps' and 'CurrentLap' or only one of them.

    A new value is sent every time a lap is completed or the intended
    number of laps changes.

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A dictionary containing one key for each data channel and a list
        of values per key.

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    if livedata is not None and livedata.has('LapCount'):
        # does not need any further processing
        _logger.info("Loading lap count data")
        response = livedata.get('LapCount')
    elif response is None:
        _logger.info("Fetching lap count data...")
        response = fetch_page(path, 'lap_count')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    data = {'Time': [], 'TotalLaps': [], 'CurrentLap': []}
    data_keys = ('TotalLaps', 'CurrentLap')
    converters = (int, int)

    for entry in response:
        data['Time'].append(to_timedelta(entry[0]))

        for key, conv in zip(data_keys, converters):
            try:
                data[key].append(conv(entry[1][key]))
            except (KeyError, ValueError):
                # type conversion failed or key is missing
                data[key].append(None)

    return data
@Cache.api_request_wrapper
def driver_info(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch driver information.

    Driver information contains the following information about each
    driver:

        `['RacingNumber', 'BroadcastName', 'FullName', 'Tla', 'Line',
        'TeamName', 'TeamColour', 'FirstName', 'LastName', 'Reference',
        'HeadshotUrl', 'CountryCode']`

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A dictionary containing one entry for each driver with the
        driver's racing number as key

    Raises:
        SessionNotAvailableError: in case the F1 livetiming api returns no
            data
    """
    if livedata is not None and livedata.has('DriverList'):
        # does not need any further processing
        _logger.info("Loading driver list")
        response = livedata.get('DriverList')
    elif response is None:
        _logger.info("Fetching driver list...")
        response = fetch_page(path, 'driver_list')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    drivers = collections.defaultdict(dict)

    default_keys = [
        'RacingNumber', 'BroadcastName', 'FullName', 'Tla', 'Line',
        'TeamName', 'TeamColour', 'FirstName', 'LastName', 'Reference',
        'HeadshotUrl', 'CountryCode'
    ]

    for line in response:
        try:
            ts, content = line
        except ValueError:
            # unexpected data format, incorrect number of values to unpack
            continue
        if not isinstance(content, dict):
            continue  # unexpected data format
        for drv_num, patch in content.items():
            if not isinstance(patch, dict):
                continue  # unexpected data format
            for key, val in patch.items():
                if key not in default_keys:
                    continue
                drivers[drv_num][key] = val

    return drivers
@Cache.api_request_wrapper
def weather_data(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse weather data.

    Weather data provides the following data channels per sample:

        - Time (datetime.timedelta): session timestamp (time only)
        - AirTemp (float): Air temperature [°C]
        - Humidity (float): Relative humidity [%]
        - Pressure (float): Air pressure [mbar]
        - Rainfall (bool): Shows if there is rainfall
        - TrackTemp (float): Track temperature [°C]
        - WindDirection (int): Wind direction [°] (0°-359°)
        - WindSpeed (float): Wind speed [m/s]

    Weather data is updated once per minute.

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        response: Response as returned by :func:`fetch_page` can be passed
            if it was downloaded already.
        livedata: An instance of
            :class:`fastf1.livetiming.data.LiveTimingData` to use as a
            source instead of the api

    Returns:
        A dictionary containing one key for each data channel and a list
        of values per key.

    Raises:
        SessionNotAvailableError: in case the F1 live timing api returns
            no data
    """
    if livedata is not None and livedata.has('WeatherData'):
        # does not need any further processing
        _logger.info("Loading weather data")
        response = livedata.get('WeatherData')
    elif response is None:
        _logger.info("Fetching weather data...")
        response = fetch_page(path, 'weather_data')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    data = {
        'Time': [], 'AirTemp': [], 'Humidity': [], 'Pressure': [],
        'Rainfall': [], 'TrackTemp': [], 'WindDirection': [],
        'WindSpeed': []
    }
    data_keys = ('AirTemp', 'Humidity', 'Pressure', 'Rainfall',
                 'TrackTemp', 'WindDirection', 'WindSpeed')
    converters = (float, float, float,
                  lambda v: True if v == '1' else False,  # rain: str -> bool
                  float, int, float)

    for entry in response:
        if len(entry) < 2:
            continue
        row = entry[1]
        if not isinstance(row, dict):
            continue

        data['Time'].append(to_timedelta(entry[0]))

        for key, conv in zip(data_keys, converters):
            try:
                data[key].append(conv(row[key]))
            except (KeyError, ValueError):
                # type conversion failed or key is missing
                data[key].append(conv(0))

    return data
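
# Illustrative only: the returned dictionary of equal-length lists converts
# directly into a DataFrame if needed. ``session`` is assumed to be a loaded
# session object as in the earlier sketch.
#
#   >>> weather = pd.DataFrame(weather_data(session.api_path))  # doctest: +SKIP
#   >>> weather[['Time', 'AirTemp', 'TrackTemp', 'Rainfall']].head()  # doctest: +SKIP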
@Cache.api_request_wrapper
def season_schedule(path, response=None):
    if response is None:
        _logger.info("Fetching season schedule...")
        response = fetch_page(path, 'index')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    return response['Meetings']


@Cache.api_request_wrapper
def session_info(path, response=None, livedata=None):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch and parse session info data.

    Returns:
        A dictionary containing the nested session info data. Timestamps
        and timedeltas are converted to ``datetime.datetime`` and
        ``datetime.timedelta`` respectively.

    Raises:
        SessionNotAvailableError: in case the F1 live timing api returns
            no data
    """
    if livedata is not None and livedata.has('SessionInfo'):
        # does not need any further processing
        _logger.info("Loading session info data")
        response = livedata.get('SessionInfo')
    elif response is None:
        _logger.info("Fetching session info data...")
        response = fetch_page(path, 'session_info')
        if response is None:  # no response received
            raise SessionNotAvailableError(
                "No data for this session! If this session only finished "
                "recently, please try again in a few minutes."
            )

    ts, data = response[0]

    data['StartDate'] = to_datetime(data['StartDate'])
    data['EndDate'] = to_datetime(data['EndDate'])
    data['GmtOffset'] = to_timedelta(data['GmtOffset'])

    return data
def fetch_page(path, name):
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Fetch data from the formula1 livetiming web api, given url base path
    and page name. An attempt to parse json or decode known messages is
    made.

    Args:
        path (str): api path base string (usually ``Session.api_path``)
        name (str): page name (see ``api.pages`` for all known pages)

    Returns:
        - dictionary if content was json
        - list of entries if jsonStream, where each entry again contains
          two elements: [timestamp, content]. Content is parsed with
          :func:`parse` and will usually be a dictionary created from json
          data.
        - None if request failed
    """
    page = pages[name]
    is_stream = 'jsonStream' in page
    is_z = '.z.' in page

    r = Cache.requests_get(base_url + path + pages[name], headers=headers)
    if r.status_code >= 400:
        _logger.debug(f"Falling back to livetiming mirror "
                      f"({base_url_mirror})")
        r = Cache.requests_get(base_url_mirror + path + pages[name],
                               headers=headers)

    if r.status_code == 200:
        raw = r.content.decode('utf-8-sig')

        if is_stream:
            records = raw.split('\r\n')[:-1]  # last split is empty
            if name in ('position', 'car_data'):
                # Special case to improve memory efficiency
                return records
            else:
                decode_error_count = 0
                tl = 12  # length of timestamp: len('00:00:00:000')
                ret = list()
                for e in records:
                    try:
                        ret.append([e[:tl], parse(e[tl:], zipped=is_z)])
                    except json.JSONDecodeError:
                        decode_error_count += 1
                        continue
                if decode_error_count > 0:
                    _logger.warning(
                        f"Failed to decode {decode_error_count}"
                        f" messages ({len(records)} messages "
                        f"total)")
                return ret
        else:
            return parse(raw, is_z)
    else:
        return None
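
# Illustrative only: fetching a '.jsonStream' page yields a list of
# [timestamp, content] entries, while a '.json' page yields a dictionary.
# ``session`` is assumed to be a loaded session object as in the earlier
# sketch; the shown entry is a hand-written example.
#
#   >>> entries = fetch_page(session.api_path, 'weather_data')  # doctest: +SKIP
#   >>> entries[0]  # doctest: +SKIP
#   ['00:00:21.605', {'AirTemp': '21.8', 'Humidity': '52.0', ...}]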
def parse(text: str, zipped: bool = False) -> Union[str, dict]:
    """
    .. warning::
        :mod:`fastf1.api` will be considered private in future releases and
        potentially be removed or changed.

    Parse json and jsonStream as returned by livetiming.formula1.com

    This function can only parse one data entry at a time, not a whole
    response. Timestamps and data need to be separated before and only the
    data must be passed as a string to be parsed.

    Args:
        text: The string which should be parsed
        zipped: Whether the text is compressed. This is the case for
            '.z' data (e.g. position data)

    Returns:
        Depending on data of which page is parsed

            - a dictionary created as a result of loading json data
            - a string
    """
    if text[0] == '{':
        return json.loads(text)
    if text[0] == '"':
        text = text.strip('"')
    if zipped:
        text = zlib.decompress(base64.b64decode(text), -zlib.MAX_WBITS)
        return parse(text.decode('utf-8-sig'))
    _logger.warning("Couldn't parse text")
    return text
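
# Illustrative only: plain json data is decoded into a dict; '.z' data is
# base64-decoded and zlib-inflated first, then parsed recursively.
# ``compressed_record`` is a hypothetical placeholder for such a payload.
#
#   >>> parse('{"Status": "2", "Message": "Yellow"}')
#   {'Status': '2', 'Message': 'Yellow'}
#   >>> parse(compressed_record, zipped=True)  # doctest: +SKIP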
class SessionNotAvailableError(Exception):
    """Raised if an api request returned no data for the requested session.

    A likely cause is that the session does not exist because it was
    cancelled."""

    def __init__(self, *args):
        super().__init__(*args)