Source code for forex_data.data_management.common

# -*- coding: utf-8 -*-
"""
Created on Sat Apr 30 09:23:19 2022

@author: fiora
"""


from loguru import logger

from re import (
    fullmatch,
    findall,
    search,
    IGNORECASE
)

from typing import (
    cast,
    Any,
    List,
    Literal,
    Dict,
    Tuple
)

import requests
import random
from math import ceil
from bs4 import BeautifulSoup

from datetime import (
    timedelta,
    datetime,
    date
)

from pathlib import Path

from attrs import (
    field,
    validators
)

from holidays import (
    country_holidays
)

# PANDAS
from pandas import (
    DataFrame as pandas_dataframe,
    concat as pandas_concat,
    Timestamp,
    isnull,
    bdate_range,
    to_datetime,
    to_timedelta,
    Timedelta,
    read_parquet as pandas_read_parquet,
    read_csv as pandas_read_csv
)

from pandas.api.types import is_datetime64_any_dtype
from pandas.tseries.frequencies import to_offset
from pandas.tseries.offsets import DateOffset

# PYARROW
from pyarrow import (
    float32 as pyarrow_float32,
    timestamp as pyarrow_timestamp,
    schema as pyarrow_schema,
    Table,
    table as pyarrow_table,
    concat_tables,
    csv as arrow_csv
)

from pyarrow.parquet import (
    write_table,
    read_table
)

# POLARS
from polars import (
    Float32 as PolarsFloat32,
    Datetime as PolarsDatetime,
    read_csv as polars_read_csv,
    concat as polars_concat,
    col,
    len as polars_len,
    read_parquet as polars_read_parquet,
    from_arrow,
    DataFrame as PolarsDataFrame,
    LazyFrame as PolarsLazyFrame,
    scan_csv as polars_scan_csv,
    scan_parquet as polars_scan_parquet,
    business_day_count
)

from dateutil.rrule import (
    rrule,
    DAILY,
    MO,
    TU,
    WE,
    TH,
    FR
)

__all__ = [
    'YEARS',
    'MONTHS',
    'DATE_FORMAT_SQL',
    'DATE_FORMAT_HISTDATA_CSV',
    'HISTDATA_URL_TICKDATA_TEMPLATE',
    'HISTDATA_BASE_DOWNLOAD_METHOD',
    'HISTDATA_BASE_DOWNLOAD_URL',
    'DEFAULT_PATHS',
    'DATA_TYPE',
    'BASE_DATA_COLUMN_NAME',
    'DATA_FILE_COLUMN_INDEX',
    'SUPPORTED_DATA_FILES',
    'SUPPORTED_DATA_ENGINES',
    'ASSET_TYPE',
    'TEMP_FOLDER',
    'TEMP_CSV_FILE',
    'DTYPE_DICT',
    'PYARROW_DTYPE_DICT',
    'POLARS_DTYPE_DICT',
    'DATA_COLUMN_NAMES',
    'FILENAME_TEMPLATE',
    'DATA_KEY',
    'TICK_TIMEFRAME',
    'FILENAME_STR',
    'FILENAME_YEAR_STR',
    'DATE_NO_HOUR_FORMAT',
    'SQL_COMPARISON_OPERATORS',
    'SUPPORTED_SQL_COMPARISON_OPERATORS',
    'SUPPORTED_BASE_DATA_COLUMN_NAME',
    'SQL_CONDITION_AGGREGATION_MODES',
    'SUPPORTED_SQL_CONDITION_AGGREGATION_MODES',
    'HISTORICAL_DB_MIN_DATE',
    'FOREX_HOLIDAYS',
    'HISTDATA_PROVIDER',
    'DUKASCOPY_PROVIDER',
    'SUPPORTED_HISTORICAL_DATA_PROVIDERS',
    'TWELVEDATA_PROVIDER',
    'TWELVEDATA_PROVIDER_PLAN_LIST',
    'TWELVE_DATA_CHUNK_SIZE',
    'TWELVE_DATA_FREE_TIER_MINUTE_RATE_LIMIT',
    'TWELVE_DATA_FREE_TIER_DAY_RATE_LIMIT',
    'TWELVE_DATA_PRO_MINUTE_RATE_LIMIT',
    'TWELVE_DATA_PRO_DAY_RATE_LIMIT',
    'TWELVE_DATA_LIMIT_DATE',
    'TWELVE_DATA_TIMEFRAMES',
    'SUPPORTED_REALTIME_DATA_PROVIDERS',

    'validator_file_path',
    'validator_dir_path',
    'validate_timedelta_str',
    'get_attrs_names',
    'check_timeframe_str',
    'any_date_to_datetime64',
    'empty_dataframe',
    'is_empty_dataframe',
    'shape_dataframe',
    'get_dataframe_column',
    'get_dataframe_row',
    'get_dataframe_element',
    'astype',
    'read_csv',
    'sort_dataframe',
    'concat_data',
    'list_remove_duplicates',
    'reframe_data',
    'write_csv',
    'write_parquet',
    'read_parquet',
    'to_pandas_dataframe',
    'get_pair_symbols',
    'to_source_symbol',
    'get_date_interval',
    'random_date_between',
    'get_histdata_tickers',
    'TickerNotFoundError',
    'TickerDataNotFoundError',
    'TickerDataBadTypeException',
    'TickerDataInvalidException',
    'business_days_data',
    'update_ticker_years_dict',
    'get_class_attr_keys',
    'get_class_attr_values',
    'get_class_attr_dict',
    'get_forex_holidays',
    'adjust_end_date_to_business_days',
    'adjust_start_date_to_business_days',
    'estimate_start_date_to_business_days',
    'estimate_end_date_to_business_days',
    'Timestamp',
    'PolarsDatetime',
    'PolarsFloat32',
    'PolarsDataFrame',
    'PolarsLazyFrame',
]


# CUSTOM EXCEPTIONS
# =============================================================================

# TickerNotFoundError:
# This exception is raised when the ticker requested is misspelled
# or does not exist in the database.


[docs] class TickerNotFoundError(Exception): pass
# TickerDataNotFoundError: # This exception is raised when the ticker is found # but data is not available or data retrieval failed.
[docs] class TickerDataNotFoundError(Exception): pass
# TickerDataBadTypeException: # This exception is raised when the ticker data # is found but data type is not compliant with the expected type.
[docs] class TickerDataBadTypeException(Exception): pass
# TickerDataInvalidException: # This exception is raised when the ticker data # is not found or invalid for generic reasons.
[docs] class TickerDataInvalidException(Exception): pass
# common functions, constants and templates TEMP_FOLDER = "Temp" TEMP_CSV_FILE = "Temp.csv" HISTDATA_URL_TICKDATA_TEMPLATE = ( 'https://www.histdata.com/download-free-forex-historical-data/?/' 'ascii/tick-data-quotes/{ticker}/{year}/{month_num}' ) HISTDATA_URL_ONEMINDATA_TEMPLATE = ( 'http://www.histdata.com/download-free-forex-data/?/' 'ascii/1-minute-bar-quotes/{pair}/{year}/{month_num}' ) HISTDATA_BASE_DOWNLOAD_URL = "http://www.histdata.com/get.php" HISTDATA_BASE_DOWNLOAD_METHOD = 'POST' MONTHS = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December'] YEARS = list(range(2000, datetime.now().year + 1, 1)) HISTORICAL_DB_MIN_DATE = datetime(2000, 1, 1) DATE_NO_HOUR_FORMAT = '%Y-%m-%d' DATE_FORMAT_ISO8601 = 'ISO8601' DATE_FORMAT_SQL = '%Y-%m-%d %H:%M:%S.%f' DATE_FORMAT_HISTDATA_CSV = '%Y%m%d %H%M%S%3f' # DATA_KEY_TEMPLATE_STR = '{ticker}.Y{year}.{tf}' # DATA_KEY_TEMPLATE_PATTERN = '^[A-Za-z]+.Y[0-9]+.[A-Za-z0-9]+' # FILENAME_STR = '{ticker}_Y{year}_{tf}.{file_ext}' DATA_KEY_TEMPLATE_STR = '{market}.{ticker}.{tf}' DATA_KEY_TEMPLATE_PATTERN = '^[A-Za-z0-9]_[A-Za-z]+.[A-Za-z0-9]+' FILENAME_STR = '{market}_{ticker}_{tf}.{file_ext}' FILENAME_YEAR_STR = '{market}_{ticker}_{tf}_{year}.{file_ext}' DEFAULT_TIMEZONE = 'utc' TICK_TIMEFRAME = 'tick' # ticker PAIR of forex market SINGLE_CURRENCY_PATTERN_STR = '[A-Za-z]{3}' TICKER_PATTERN = '^' + SINGLE_CURRENCY_PATTERN_STR \ + SINGLE_CURRENCY_PATTERN_STR + '$' PAIR_GENERIC_FORMAT = '{TO}/{FROM}' # TIME PATTERN TIME_WINDOW_PATTERN_STR = '^[-+]?[0-9]+[A-Za-z]{1,}$' TIME_WINDOW_COMPONENTS_PATTERN_STR = '^[-+]?[0-9]+|[A-Za-z]{1,}$' TIME_WINDOW_UNIT_PATTERN_STR = '[A-Za-z]{1,}$' GET_YEAR_FROM_TICK_KEY_PATTERN_STR = '^[A-Za-z].Y[0-9].TICK' YEAR_FIELD_PATTERN_STR = '^Y([0-9]{4,})$' POLARS_DURATION_PATTERN_STR = '^[0-9]+(ns|us|ms|s|m|h|d|w|mo|q|y|i)$' PYARROW_DURATION_PATTERN_STR = '^[0-9]+(ns|us|ms|s|m|h|d|w|mo|q|y|i)$' # GENERAL UTILITIES
[docs] def get_class_attr_keys(var): defined_keys = [] try: keys = vars(var) defined_keys = [key for key in keys if not search(r'__(\w+)__', key)] except Exception: return None return defined_keys
[docs] def get_class_attr_values(var): defined_vals = [] dict_var = dict(vars(var)) keys = get_class_attr_keys(var) try: defined_vals = [dict_var[key] for key in keys] except Exception: return None return defined_vals
[docs] def get_class_attr_dict(var): def_keys = get_class_attr_keys(var) return {key: getattr(var, key) for key in def_keys}
# auxiliary CONSTANT DEFINITIONS # key template: <ticker>.Y<year>.<timeframe>.<data-type>
[docs] class DATA_KEY: MARKET = 0 TICKER_INDEX = 1 TF_INDEX = 2 YEAR_INDEX = 3
# filename template : <ticker>_Y<year>_<timeframe>.<filetype>
[docs] class FILENAME_TEMPLATE: TICKER_INDEX = 0 YEAR_INDEX = 1 YEAR_NUMERICAL_CHAR = 1 TF_INDEX = 2 FILETYPE_INDEX = 3
[docs] class DEFAULT_PATHS: BASE_PATH = str(Path.home() / '.database') HIST_DATA_FOLDER = 'HistoricalData' REALTIME_DATA_FOLDER = 'RealtimeData'
[docs] class DATA_TYPE: CSV_FILETYPE = 'csv' PARQUET_FILETYPE = 'parquet'
[docs] class DATA_FILE_COLUMN_INDEX: TIMESTAMP = 0
SUPPORTED_DATA_FILES = [ DATA_TYPE.CSV_FILETYPE, DATA_TYPE.PARQUET_FILETYPE ] # supported dataframe engines # pyarrow is inserted but reframe operation all in pyarrow # is not yet available, now it is masked # to a refame call with polars # reframe_data() on pyarrow Table SUPPORTED_DATA_ENGINES = [ 'pandas', 'pyarrow', 'polars', 'polars_lazy' ] # SINGLE BASE DATA COMPOSIION TEMPLATE: ['open','close','high','low'] # with datetime/timestamp as index # column names for dataframes TICK and timeframe filtered # OHLC and related column names
[docs] class DATA_COLUMN_NAMES: TICK_DATA_NO_PVALUE = ['timestamp', 'ask', 'bid', 'vol'] TICK_DATA = ['timestamp', 'ask', 'bid', 'vol', 'p'] TF_DATA = ['timestamp', 'open', 'high', 'low', 'close'] TICK_DATA_TIME_INDEX = ['ask', 'bid', 'vol', 'p'] TF_DATA_TIME_INDEX = ['open', 'high', 'low', 'close']
# SELECTED AS SINGLE BASE DATA COMPOSION TEMPLATE BASE_DATA = DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX BASE_DATA_WITH_TIME = DATA_COLUMN_NAMES.TF_DATA class DB_MODE: FULL_MODE = 'FULL_MODE' HISTORICAL_MODE = 'HISTORICAL_MODE' REALTIME_MODE = 'REALTIME_MODE'
[docs] class ASSET_TYPE: STOCK = 'STOCK' ETF = 'ETF' FOREX = 'FOREX'
[docs] class BASE_DATA_COLUMN_NAME: TIMESTAMP = 'timestamp' OPEN = 'open' HIGH = 'high' LOW = 'low' CLOSE = 'close' ASK = 'ask' BID = 'bid' VOL = 'vol' P_VALUE = 'p' TRANSACTIONS = 'transactions' VWAP = 'vwap' OTC = 'otc'
SUPPORTED_BASE_DATA_COLUMN_NAME = Literal[ BASE_DATA_COLUMN_NAME.TIMESTAMP, BASE_DATA_COLUMN_NAME.OPEN, BASE_DATA_COLUMN_NAME.HIGH, BASE_DATA_COLUMN_NAME.LOW, BASE_DATA_COLUMN_NAME.CLOSE, BASE_DATA_COLUMN_NAME.ASK, BASE_DATA_COLUMN_NAME.BID, BASE_DATA_COLUMN_NAME.VOL, BASE_DATA_COLUMN_NAME.P_VALUE, BASE_DATA_COLUMN_NAME.TRANSACTIONS, BASE_DATA_COLUMN_NAME.VWAP, BASE_DATA_COLUMN_NAME.OTC ]
[docs] class SQL_COMPARISON_OPERATORS: GREATER_THAN = '>' LESS_THAN = '<' GREATER_THAN_OR_EQUAL = '>=' LESS_THAN_OR_EQUAL = '<=' EQUAL = '==' NOT_EQUAL = '!='
SUPPORTED_SQL_COMPARISON_OPERATORS = Literal[ SQL_COMPARISON_OPERATORS.GREATER_THAN, SQL_COMPARISON_OPERATORS.LESS_THAN, SQL_COMPARISON_OPERATORS.GREATER_THAN_OR_EQUAL, SQL_COMPARISON_OPERATORS.LESS_THAN_OR_EQUAL, SQL_COMPARISON_OPERATORS.EQUAL, SQL_COMPARISON_OPERATORS.NOT_EQUAL ]
[docs] class SQL_CONDITION_AGGREGATION_MODES: AND = 'AND' OR = 'OR'
SUPPORTED_SQL_CONDITION_AGGREGATION_MODES = Literal[ SQL_CONDITION_AGGREGATION_MODES.AND, SQL_CONDITION_AGGREGATION_MODES.OR ] # auxiliary functions # get elements from db key
[docs] def get_db_key_elements(key): res = fullmatch(DATA_KEY_TEMPLATE_STR, key) if res: return res.groups() else: logger.error( f'key {key} does not respect regex template {DATA_KEY_TEMPLATE_STR}') raise ValueError
# parse argument to get datetime object with date format as input def infer_date_from_format_dt(s, date_format='ISO8601', unit=None, utc=False): if unit: return to_datetime(s, unit=unit, utc=utc) else: return to_datetime(s, format=date_format, utc=utc) # parse timeframe as string and validate if it is valid # following pandas DateOffset freqstr rules and 'TICK' (=lowest timeframe available) # link to official pandas doc # https://pandas.pydata.org/docs/user_guide/timeseries.html#dateoffset-objects # add compatibility to polars frequency string
[docs] def check_timeframe_str(tf: str | Timedelta | DateOffset, engine: Literal['pandas', 'polars', 'polars_lazy', 'pyarrow'] = 'pandas'): if tf == TICK_TIMEFRAME: return tf elif isinstance(tf, Timedelta): ''' Timedelta type is acceptd by all engines supported ''' return tf elif engine == 'pandas': ''' timeframe value check for pandas engine ''' try: if isinstance(to_offset(tf), DateOffset): return tf else: logger.critical(f"Type check: Invalid timeframe for pandas: {tf}") raise ValueError except ValueError: logger.critical(f"Type check: Invalid timeframe for pandas: {tf}") raise elif ( engine == 'polars' or engine == 'polars_lazy' or engine == 'pyarrow' ): ''' timeframe value check for polars engine ''' if isinstance(tf, str): if fullmatch(POLARS_DURATION_PATTERN_STR, tf, flags=IGNORECASE): return tf else: logger.critical(f"Type check: Invalid timeframe for polars: {tf}") raise ValueError elif isinstance(tf, timedelta): return tf else: logger.critical(f"Type check: Invalid timeframe for polars: {tf}") raise ValueError else: logger.critical(f"Type check: Invalid engine: {engine}") raise ValueError
# PAIR symbol functions
[docs] def get_pair_symbols(ticker): components = findall(SINGLE_CURRENCY_PATTERN_STR, ticker) if len(components) == 2: return components[0], components[1] else: return None
[docs] def to_source_symbol(ticker, source): to_symbol, from_symbol = get_pair_symbols(ticker) return PAIR_TWELVE_DATA_FORMAT.format(TO=to_symbol, FROM=from_symbol)
# TIMESTAMP RELATED FUNCTIONS def timewindow_str_to_timedelta(time_window_str): if fullmatch(TIME_WINDOW_PATTERN_STR, time_window_str): return Timedelta(time_window_str) else: logger.error('time window pattern not match: ' '"<integer_multiplier><unit>" str') raise ValueError
[docs] def any_date_to_datetime64(any_date, date_format='ISO8601', unit=None, to_pydatetime=False): try: any_date = infer_date_from_format_dt(any_date, date_format, unit=unit) if to_pydatetime: any_date = any_date.to_pydatetime() except Exception as e: logger.error(f'date {any_date} conversion failed, ' f'failed conversion to {date_format} ' 'date format') raise return any_date
[docs] def get_date_interval(start=None, end=None, interval_start_mode=None, interval_end_mode='now', interval_timespan=None, freq=None, normalize=False, bdays=False): # create start and end date as timestamp instances start_date = Timestamp(start) end_date = Timestamp(end) if interval_timespan: # a variety of interval mode could be implemented # 'now' - end of date interval is timestamp now if interval_end_mode == 'now': end_date = Timestamp.now() start_date = end_date - timewindow_str_to_timedelta(interval_timespan) if bdays: components = findall(TIME_WINDOW_COMPONENTS_PATTERN_STR, interval_timespan) # fixed days redundancy check available only with 'd' type requested # timespan if components[1] == 'd': days_list = list( rrule(freq=DAILY, dtstart=start_date, until=end_date, byweekday=(MO, TU, WE, TH, FR)) ) while len(days_list) < int(components[0]): start_date = start_date - Timedelta(days=1) days_list = list( rrule(freq=DAILY, dtstart=start_date, until=end_date, byweekday=(MO, TU, WE, TH, FR)) ) # Timestamp() constructor ensures these are Timestamp objects if normalize: if not isnull(start_date): start_date = Timestamp.normalize(start_date) if not isnull(end_date): end_date = Timestamp.normalize(end_date) start_date = any_date_to_datetime64(start_date) end_date = any_date_to_datetime64(end_date) # generate DateTimeIndex if freq is set # otherwise return just start and end of interval if freq: bdate_dtindex = bdate_range(start=start_date, end=end_date, freq=freq, tz=None, normalize=normalize, name='timestamp' ) return start_date, end_date, bdate_dtindex else: return start_date, end_date
[docs] def random_date_between(start_date, end_date): """ Get a random datetime between two datetime objects. Args: start_date (datetime): The start date. end_date (datetime): The end date. Returns: datetime: A random datetime between start_date and end_date. """ delta = end_date - start_date random_seconds = random.randint(0, int(delta.total_seconds())) return start_date + timedelta(seconds=random_seconds)
# BASE OPERATIONS WITH DATAFRAME # depending on dataframe engine support # for supported engines see var SUPPORTED_DATA_ENGINES # DATA ENGINES TYPES DICTIONARY
[docs] class DTYPE_DICT: TICK_DTYPE = {'ask': 'float32', 'bid': 'float32', 'vol': 'float32', 'p': 'float32'} TF_DTYPE = {'open': 'float32', 'high': 'float32', 'low': 'float32', 'close': 'float32'} TIME_TICK_DTYPE = {'timestamp': 'datetime64[ms]', 'ask': 'float32', 'bid': 'float32', 'vol': 'float32', 'p': 'float32'} TIME_TF_DTYPE = {'timestamp': 'datetime64[ms]', 'open': 'float32', 'high': 'float32', 'low': 'float32', 'close': 'float32'}
[docs] class PYARROW_DTYPE_DICT: TICK_DTYPE = {'ask': pyarrow_float32(), 'bid': pyarrow_float32(), 'vol': pyarrow_float32(), 'p': pyarrow_float32()} TF_DTYPE = {'open': pyarrow_float32(), 'high': pyarrow_float32(), 'low': pyarrow_float32(), 'close': pyarrow_float32()} TIME_TICK_DTYPE = {'timestamp': pyarrow_timestamp('ms'), 'ask': pyarrow_float32(), 'bid': pyarrow_float32(), 'vol': pyarrow_float32(), 'p': pyarrow_float32()} TIME_TF_DTYPE = {'timestamp': pyarrow_timestamp('ms'), 'open': pyarrow_float32(), 'high': pyarrow_float32(), 'low': pyarrow_float32(), 'close': pyarrow_float32()}
[docs] class POLARS_DTYPE_DICT: TICK_DTYPE = {'ask': PolarsFloat32, 'bid': PolarsFloat32, 'vol': PolarsFloat32, 'p': PolarsFloat32} TF_DTYPE = {'open': PolarsFloat32, 'high': PolarsFloat32, 'low': PolarsFloat32, 'close': PolarsFloat32} TIME_TICK_DTYPE = {'timestamp': PolarsDatetime('ms'), 'ask': PolarsFloat32, 'bid': PolarsFloat32, 'vol': PolarsFloat32, 'p': PolarsFloat32} TIME_TF_DTYPE = {'timestamp': PolarsDatetime('ms'), 'open': PolarsFloat32, 'high': PolarsFloat32, 'low': PolarsFloat32, 'close': PolarsFloat32}
# DATA ENGINES FUNCTIONS
[docs] def empty_dataframe(engine): if engine == 'pandas': return pandas_dataframe() elif engine == 'pyarrow': return pyarrow_table([]) elif engine == 'polars': return PolarsDataFrame() elif engine == 'polars_lazy': return PolarsLazyFrame() else: logger.error('function empty_dataframe not available' f' for engine {engine}') raise ValueError
[docs] def is_empty_dataframe(dataframe): if isinstance(dataframe, pandas_dataframe): return dataframe.empty elif isinstance(dataframe, Table): return (not bool(dataframe)) elif isinstance(dataframe, PolarsDataFrame): return dataframe.is_empty() elif isinstance(dataframe, PolarsLazyFrame): return dataframe.collect().is_empty() else: logger.error('function is_empty_dataframe not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def shape_dataframe(dataframe): if isinstance(dataframe, pandas_dataframe): return dataframe.shape elif isinstance(dataframe, Table): return dataframe.shape elif isinstance(dataframe, PolarsDataFrame): return dataframe.shape elif isinstance(dataframe, PolarsLazyFrame): return ( dataframe.select(polars_len()).collect().item(0, 0), dataframe.collect_schema().len() ) else: logger.error('function shape_dataframe not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def sort_dataframe(dataframe, column): if isinstance(dataframe, pandas_dataframe): return dataframe.sort_values(by=[column]) elif isinstance(dataframe, Table): return dataframe.sort_by(column) elif isinstance(dataframe, PolarsDataFrame): return dataframe.sort(column, nulls_last=True) elif isinstance(dataframe, PolarsLazyFrame): return dataframe.sort(column, nulls_last=True) else: logger.error('function sort_dataframe not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def get_dataframe_column(dataframe, column): if isinstance(dataframe, pandas_dataframe): return dataframe[column] elif isinstance(dataframe, Table): return dataframe[column] elif isinstance(dataframe, PolarsDataFrame): return dataframe[column] elif isinstance(dataframe, PolarsLazyFrame): return dataframe.select(column).collect() else: logger.error('function get_dataframe_column not available' ' for instance of type' f' {type(dataframe)}')
[docs] def get_dataframe_row(dataframe, row): if isinstance(dataframe, pandas_dataframe): return dataframe.loc[row] elif isinstance(dataframe, Table): return dataframe.slice(row, 1) elif isinstance(dataframe, PolarsDataFrame): return dataframe.slice(row, 1) elif isinstance(dataframe, PolarsLazyFrame): return dataframe.slice(row, 1) else: logger.error('function get_dataframe_row not available' ' for instance of type' f' {type(dataframe)}')
[docs] def get_dataframe_element(dataframe, column, row): if isinstance(dataframe, pandas_dataframe): return dataframe[column][row] elif isinstance(dataframe, Table): return dataframe[column][row] elif isinstance(dataframe, PolarsDataFrame): return dataframe[column][row] elif isinstance(dataframe, PolarsLazyFrame): return dataframe.select(column).collect().item(row, 0) else: logger.error('function get_dataframe_element not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
def dtype_dict_to_pyarrow_schema(dtype_dict): return pyarrow_schema(dtype_dict.items())
[docs] def astype(dataframe, dtype_dict): if isinstance(dataframe, pandas_dataframe): return dataframe.astype(dtype_dict) elif isinstance(dataframe, Table): return dataframe.cast(dtype_dict_to_pyarrow_schema(dtype_dict)) elif isinstance(dataframe, PolarsDataFrame): return dataframe.cast(dtype_dict) elif isinstance(dataframe, PolarsLazyFrame): return dataframe.cast(dtype_dict) else: logger.error('function astype not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def read_parquet(engine, filepath): if engine == 'pandas': return pandas_read_parquet(filepath) elif engine == 'pyarrow': return read_table(filepath) elif engine == 'polars': return polars_read_parquet(filepath) elif engine == 'polars_lazy': return polars_scan_parquet(filepath) else: logger.error('function read_parquet not available' f' for engine {engine}') raise ValueError
[docs] def write_parquet(dataframe, filepath): if isinstance(dataframe, pandas_dataframe): try: dataframe.to_parquet(filepath, index=True) except Exception as e: logger.exception(f'pandas write parquet failed: {e}') raise elif isinstance(dataframe, Table): try: write_table(dataframe, filepath) except Exception as e: logger.exception(f'pyarrow write parquet failed: {e}') raise elif isinstance(dataframe, PolarsDataFrame): try: dataframe.write_parquet(filepath) except Exception as e: logger.exception(f'polars write parquet failed: {e}') raise elif isinstance(dataframe, PolarsLazyFrame): try: dataframe.sink_parquet(filepath) # alternative to sink_parquet() # dataframe.collect(streaming=False).write_parquet(filepath) except Exception as e: logger.exception(f'polars lazyframe sink ' 'parquet failed: {e}') raise else: logger.error('function write_parquet not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def read_csv(engine, file, **kwargs): if engine == 'pandas': return pandas_read_csv(file, **kwargs) elif engine == 'pyarrow': return arrow_csv.read_csv(file, **kwargs) elif engine == 'polars': return polars_read_csv(file, **kwargs) elif engine == 'polars_lazy': return polars_scan_csv(file, **kwargs) else: logger.error('function read_csv not available' f' for engine {engine}') raise ValueError
[docs] def write_csv(dataframe, file, **kwargs): if isinstance(dataframe, pandas_dataframe): try: # IMPORTANT # pandas dataframe case # avoid date_format parameter since it is reported that # it makes to_csv to be excessively long with column data # being datetime data type # see: https://github.com/pandas-dev/pandas/issues/37484 # https://stackoverflow.com/questions/65903287/pandas-1-2-1-to-csv-performance-with-datetime-as-the-index-and-setting-date-form dataframe.to_csv(file, header=True, **kwargs) except Exception as e: logger.exception(f'Error writing csv file {file}' f' with data type {type(dataframe)}: {e}') raise IOError elif isinstance(dataframe, Table): try: arrow_csv.write_csv(dataframe, file, **kwargs) except Exception as e: logger.exception(f'Error writing csv file {file}' f' with data type {type(dataframe)}: {e}') raise IOError elif isinstance(dataframe, PolarsDataFrame): try: dataframe.write_csv(file, **kwargs) except Exception as e: logger.exception(f'Error writing csv file {file}' f' with data type {type(dataframe)}: {e}') raise IOError elif isinstance(dataframe, PolarsLazyFrame): try: dataframe.sink_csv(file, **kwargs) except Exception as e: logger.exception(f'Error writing csv file {file}' f' with data type {type(dataframe)}: {e}') raise IOError else: logger.error('function write_csv not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def concat_data(data_list=field(validator=validators.instance_of(list))): if not isinstance(data_list, list): logger.error('required input as list') raise TypeError # assume data type is unique by input # get type from first element if isinstance(data_list[0], pandas_dataframe): return pandas_concat(data_list, ignore_index=False, copy=False) elif isinstance(data_list[0], Table): return concat_tables(data_list) elif isinstance(data_list[0], PolarsDataFrame): return polars_concat(data_list, how='vertical') elif isinstance(data_list[0], PolarsLazyFrame): return polars_concat(data_list, how='vertical') else: logger.error('function concat not available' ' for instance of type' f' {type(data_list[0])}') raise ValueError
[docs] def to_pandas_dataframe(dataframe): # convert to pandas dataframe # useful for those calls # requiring pandas as input # or pandas functions not covered # by other dataframe instance if isinstance(dataframe, pandas_dataframe): return dataframe elif isinstance(dataframe, Table): return dataframe.to_pandas() elif isinstance(dataframe, PolarsDataFrame): return dataframe.to_pandas(use_pyarrow_extension_array=True) elif isinstance(dataframe, PolarsLazyFrame): return dataframe.collect().to_pandas(use_pyarrow_extension_array=True) else: logger.error('function to_pandas() not available' ' for instance of type' f' {type(dataframe)}') raise ValueError
[docs] def reframe_data(dataframe, tf): ''' Parameters ---------- data : TYPE DESCRIPTION. tf : TYPE DESCRIPTION. Raises ------ ValueError DESCRIPTION. Returns ------- Dataframe DESCRIPTION. ''' if is_empty_dataframe(dataframe): return dataframe if isinstance(dataframe, pandas_dataframe): # assert timeframe input value tf = check_timeframe_str(tf, engine='pandas') if tf == TICK_TIMEFRAME: logger.warning(f'reframe not possible with target {TICK_TIMEFRAME}') return dataframe if not is_datetime64_any_dtype(dataframe.index): if BASE_DATA_COLUMN_NAME.TIMESTAMP in dataframe.columns: if not is_datetime64_any_dtype( dataframe[BASE_DATA_COLUMN_NAME.TIMESTAMP]): try: dataframe[BASE_DATA_COLUMN_NAME.TIMESTAMP] = any_date_to_datetime64( dataframe[BASE_DATA_COLUMN_NAME.TIMESTAMP]) except Exception as e: logger.exception('Pandas engine: ' 'Failed conversion of timestamp columns ' 'to DatetimeIndex') raise else: logger.error('Pandas engine: required column with ' f'name {BASE_DATA_COLUMN_NAME.TIMESTAMP}') raise ValueError # use pandas functions to reframe data on pandas Dataframe dataframe = sort_dataframe(dataframe, BASE_DATA_COLUMN_NAME.TIMESTAMP) dataframe = dataframe.set_index(BASE_DATA_COLUMN_NAME.TIMESTAMP, inplace=False, drop=True ) # resample based on p value if all([col in DATA_COLUMN_NAMES.TICK_DATA_TIME_INDEX for col in dataframe.columns]): # resample along 'p' column, data in ask, bid, p format dataframe = dataframe.p.resample(tf).ohlc().interpolate(method='nearest') elif all([col in DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX for col in dataframe.columns]): # resample along given data already in ohlc format dataframe = dataframe.resample(tf).interpolate(method='nearest') else: logger.error(f'data columns {dataframe.columns} invalid, ' f'required {DATA_COLUMN_NAMES.TICK_DATA_TIME_INDEX} ' f'or {DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX}') raise ValueError return dataframe.reset_index(drop=False) elif isinstance(dataframe, Table): # assert timeframe input value tf = check_timeframe_str(tf, engine='pyarrow') ''' use pyarrow functions to reframe data on pyarrow Table could not find easy way to filter an arrow table based on time interval opened an enhancement issue on github https://github.com/apache/arrow/issues/41049 As a temporary alternative, convert arrow Table to polars and perform reframe with polars engine ''' if all([col in DATA_COLUMN_NAMES.TICK_DATA for col in dataframe.column_names]): # convert to polars dataframe dataframe = from_arrow(dataframe, schema=cast(Any, POLARS_DTYPE_DICT.TIME_TICK_DTYPE)) elif all([col in DATA_COLUMN_NAMES.TF_DATA for col in dataframe.column_names]): # convert to polars dataframe dataframe = from_arrow(dataframe, schema=cast(Any, POLARS_DTYPE_DICT.TIME_TF_DTYPE)) # perform operation # convert to arrow Table and return return reframe_data(dataframe, tf).to_arrow() elif isinstance(dataframe, PolarsDataFrame): # assert timeframe input value tf = check_timeframe_str(tf, engine='polars') tf = tf.lower() dataframe = sort_dataframe(dataframe, BASE_DATA_COLUMN_NAME.TIMESTAMP) if all([col in DATA_COLUMN_NAMES.TICK_DATA for col in dataframe.columns]): return dataframe.group_by_dynamic( BASE_DATA_COLUMN_NAME.TIMESTAMP, every=tf).agg(col('p').first().alias(BASE_DATA_COLUMN_NAME.OPEN), col('p').max().alias(BASE_DATA_COLUMN_NAME.HIGH), col('p').min().alias(BASE_DATA_COLUMN_NAME.LOW), col('p').last().alias(BASE_DATA_COLUMN_NAME.CLOSE) ) elif all([col in DATA_COLUMN_NAMES.TF_DATA for col in dataframe.columns]): return dataframe.group_by_dynamic( BASE_DATA_COLUMN_NAME.TIMESTAMP, every=tf).agg(col(BASE_DATA_COLUMN_NAME.OPEN).first(), col(BASE_DATA_COLUMN_NAME.HIGH).max(), col(BASE_DATA_COLUMN_NAME.LOW).min(), col(BASE_DATA_COLUMN_NAME.CLOSE).last() ) else: logger.error(f'data columns {dataframe.columns} invalid, ' f'required {DATA_COLUMN_NAMES.TICK_DATA} ' f'or {DATA_COLUMN_NAMES.TF_DATA}') raise ValueError elif isinstance(dataframe, PolarsLazyFrame): tf = tf.lower() dataframe = dataframe.sort('timestamp', nulls_last=True) if all([col in DATA_COLUMN_NAMES.TICK_DATA for col in dataframe.collect_schema().names()]): return dataframe.group_by_dynamic( BASE_DATA_COLUMN_NAME.TIMESTAMP, every=tf).agg(col('p').first().alias(BASE_DATA_COLUMN_NAME.OPEN), col('p').max().alias(BASE_DATA_COLUMN_NAME.HIGH), col('p').min().alias(BASE_DATA_COLUMN_NAME.LOW), col('p').last().alias(BASE_DATA_COLUMN_NAME.CLOSE) ) elif all([col in DATA_COLUMN_NAMES.TF_DATA for col in dataframe.collect_schema().names()]): return dataframe.group_by_dynamic( BASE_DATA_COLUMN_NAME.TIMESTAMP, every=tf).agg(col(BASE_DATA_COLUMN_NAME.OPEN).first(), col(BASE_DATA_COLUMN_NAME.HIGH).max(), col(BASE_DATA_COLUMN_NAME.LOW).min(), col(BASE_DATA_COLUMN_NAME.CLOSE).last() ) else: logger.error(f'data columns {dataframe.columns} invalid, ' f'required {DATA_COLUMN_NAMES.TICK_DATA} ' f'or {DATA_COLUMN_NAMES.TF_DATA}') raise ValueError
# ATTRS # ADDED VALIDATORS
[docs] def validator_file_path(file_ext=None): def validate_file_path(instance, attribute, value): try: filepath = Path(value) except Exception as e: logger.error(f'File {value} Path creation error: {e}') raise else: if not ( filepath.exists() or filepath.is_file() ): logger.error(f'file {value} not exists') raise FileExistsError return validate_file_path
[docs] def validator_dir_path(create_if_missing=False): def validate_or_create_dir(instance, attribute, value): if create_if_missing: try: Path(value).mkdir(parents=True, exist_ok=True) except FileExistsError: # A parallel process created the directory between the # exist_ok check and the actual os.mkdir syscall (TOCTOU # race on macOS). Safe to ignore if it is now a directory. if not Path(value).is_dir(): logger.error( f'Path {value} exists but is not a directory') raise else: if not ( Path(value).exists() or Path(value).is_dir() ): logger.error(f'Directory {value} not valid') raise TypeError() return validate_or_create_dir
def validator_list_ge(min_value): def validator_list_values(instance, attribute, value): if not ( isinstance(value, list) and all([isinstance(val, int) for val in value]) ): logger.error('Required list of int type for argument ' f'{attribute}') raise TypeError if any([ val < min_value for val in value ]): fails = [ val for val in value if val < min_value ] logger.error(f'Values in {attribute}: {fails} ' f'are not greater than {min_value}') raise ValueError
[docs] def validate_timedelta_str(instance, attribute, value): try: res = to_timedelta(value) if not isinstance(res, Timedelta): raise TypeError( f"'{attribute.name}' must be a valid timedelta representation, got {type(res).__name__}" ) except Exception as e: raise ValueError( f"'{attribute.name}' must be a valid timedelta representation, error: {e}" ) from e
# ATTRIBUTES
[docs] def get_attrs_names(instance_object, **kwargs): if hasattr(instance_object, '__attrs_attrs__'): return [attr.name for attr in instance_object.__attrs_attrs__] else: logger.error('attribute "__attrs__attrs__" not found in ' f'object {instance_object}') raise KeyError
# GENERIC UTILITIES
[docs] def list_remove_duplicates(list_in): return list(dict.fromkeys(list_in))
# HISTDATA data provider utilities # Analyze the Histdata Forex download base page # https://www.histdata.com/download-free-forex-data/?/ascii/1-minute-bar-quotes # and get a list of all avilable tickers in the form as the example "EURUSD"
[docs] def get_histdata_tickers(verify: bool = True) -> List[str]: """ Get all available tickers from HistData.com. Parameters ---------- verify : bool, optional Whether to verify SSL certificates. Default is True. Returns ------- List[str] List of all available tickers (e.g., ['EURUSD', 'GBPUSD', ...]). """ if not verify: import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) url = "https://www.histdata.com/download-free-forex-data/?/ascii/1-minute-bar-quotes" try: requests.head(url, timeout=5, verify=verify) except requests.RequestException as e: logger.error(f'Failed to connect to {url}: {e}') return [] try: response = requests.get(url, verify=verify) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') tickers = [] # Tickers are typically in links that lead to the pair's specific page for link in soup.find_all('a', href=True): href = link['href'] # Pattern check based on the observed links if "/ascii/1-minute-bar-quotes/" in href: parts = href.split('/') ticker = parts[-1] # Validate it's a valid ticker (usually 6 chars like EURUSD) if ticker and len(ticker) >= 6: tickers.append(ticker.upper()) return sorted(list(set(tickers))) except Exception as e: logger.error(f"Failed to retrieve tickers from HistData: {e}") return []
# MARKETS FUNCTIONS US_HOLIDAYS = country_holidays('US', years=YEARS) # Convert US_HOLIDAYS to a list of date objects for efficient filtering US_holiday_dates = [holiday_date for holiday_date in US_HOLIDAYS.keys()]
[docs] def get_forex_holidays(years: range): """ Generates standard Global Forex closure dates (Dec 25 & Jan 1) for a range of years. """ closure_dates = [] for year in years: closure_dates.append(date(year, 1, 1)) # New Year's Day closure_dates.append(date(year, 12, 25)) # Christmas Day return sorted(closure_dates)
FOREX_HOLIDAYS = get_forex_holidays(range(2000, datetime.now().year + 1))
[docs] def business_days_data(dataframe: PolarsLazyFrame | PolarsDataFrame) -> PolarsDataFrame | PolarsLazyFrame: ''' Remove non-business days data from the input dataframe. Filter out weekends data: saturday and sunday. Use holidays to get list of country holidays. Consider dataframe always have a column named 'timestamp' of type datetime. ''' # Filter out weekends (Saturday=6, Sunday=7) and holidays # Use Polars' dt.weekday() where Monday=1, Tuesday=2, ..., Saturday=6, Sunday=7 return dataframe.filter( (col('timestamp').dt.weekday() < 6) & # Keep Monday(1) through Friday(5) (~col('timestamp').dt.date().is_in(US_holiday_dates)) # Exclude holidays )
WEEKDAYS_ITERABLE = (True, True, True, True, True, False, False) def add_business_days( date: date | Timestamp | datetime, business_days_to_add: int, holidays: list, direction: str = 'backward' ) -> datetime: """ Add business days forward or backward from a given start date using Polars. """ # convert date inputs in datetime obj if ( isinstance(date, str) or isinstance(date, Timestamp) ): date = any_date_to_datetime64(date) if isinstance(date, datetime): date = date.date() if isinstance(date, PolarsDatetime): date = date.to_pandas().date() if direction not in ('backward', 'forward'): raise ValueError( f"direction must be 'backward' or 'forward', got '{direction}'" ) # Determine sign based on direction sign = -1 if direction == 'backward' else 1 # add business days using polars method df = PolarsDataFrame({ "start": [date] }) result = df.with_columns( new_start=col("start").dt.add_business_days( sign * business_days_to_add, week_mask=WEEKDAYS_ITERABLE, holidays=holidays, roll=direction ) ) # get the new start value new_date = result.select(col("new_start").first()).item() # if direction is backward, get the floor value if direction == 'backward': new_date = datetime.combine(new_date, datetime.min.time()) # if direction is forward, get the ceil value else: new_date = datetime.combine(new_date, datetime.max.time()) return new_date
[docs] def adjust_start_date_to_business_days( end: datetime | str | Timestamp | PolarsDatetime, timeframe: str | timedelta, periods: int, holidays: list, start: datetime | str | Timestamp | PolarsDatetime | None = None, ) -> Tuple[int, datetime]: """ Calculate if timeframe con fit exactly periods times in the timespan between start and end dates. If not, recursively calculates a new start by advancing backwards with business days. End date remains always unmodified. """ # Convert inputs to datetime objects if isinstance(start, (str, Timestamp)): start = any_date_to_datetime64(start) elif isinstance(start, PolarsDatetime): start = datetime.fromtimestamp(start.timestamp()) if isinstance(end, (str, Timestamp)): end = any_date_to_datetime64(end) elif isinstance(end, PolarsDatetime): end = datetime.fromtimestamp(end.timestamp()) # Convert timeframe to timedelta if it's a string if isinstance(timeframe, str): timeframe_td = Timedelta(timeframe).to_pytimedelta() else: timeframe_td = timeframe # Calculate total duration needed (timeframe * periods) total_duration_needed = timeframe_td * periods # if start is None, calculate it from end date if start is None: start = end - total_duration_needed # Recursive helper function to add business days backwards def add_business_days_backwards( current_start: datetime | date, current_end: datetime | date ) -> datetime | date: """ Recursively adjust start date backwards. """ # convert to date if datetime if isinstance(current_start, datetime): current_start = current_start.date() if isinstance(current_end, datetime): current_end = current_end.date() # count business days between current_start and current_end business_days = PolarsDataFrame({ "start": [current_start], "end": [current_end] }).select( business_day_count( "start", "end", week_mask=WEEKDAYS_ITERABLE, holidays=holidays ) ).item() # business days to timedelta business_days_td = timedelta(days=business_days) if business_days_td < total_duration_needed: # calculate how many business days to add backwards from start business_days_to_add = ceil( (total_duration_needed - business_days_td).days ) if business_days_to_add == 0: business_days_to_add = 1 # add business days using the helper function new_start = add_business_days( current_start, business_days_to_add, holidays, direction='backward' ) return add_business_days_backwards(new_start, current_end) else: # have enough business days return current_start # Call the recursive function new_start = add_business_days_backwards(start, end) final_start = ( new_start.date() if isinstance(new_start, datetime) else new_start ) final_end = end.date() if isinstance(end, datetime) else end # Calculate how many complete timeframe periods fit in the business timespan df_final = PolarsDataFrame({ "start": [final_start], "end": [final_end] }) final_business_days = df_final.select( business_day_count( "start", "end", week_mask=WEEKDAYS_ITERABLE, holidays=holidays ) ).item() final_business_days_td = timedelta(days=final_business_days) if timeframe_td.total_seconds() > 0: count = int(final_business_days_td // timeframe_td) else: count = 0 # convert back to datetime if needed if isinstance(new_start, date): new_start = datetime.combine(new_start, datetime.min.time()) return count, new_start
[docs] def adjust_end_date_to_business_days( start: datetime | str | Timestamp | PolarsDatetime, timeframe: str | timedelta, periods: int, holidays: list, end: datetime | str | Timestamp | PolarsDatetime | None = None ) -> Tuple[int, datetime]: """ Calculate if timeframe can fit exactly periods times in the timespan between start and end dates. If not, recursively calculates a new end by advancing forwards with business days. Start date remains always unmodified. """ # Convert inputs to datetime objects if isinstance(start, (str, Timestamp)): start = any_date_to_datetime64(start) elif isinstance(start, PolarsDatetime): start = datetime.fromtimestamp(start.timestamp()) if isinstance(end, (str, Timestamp)): end = any_date_to_datetime64(end) elif isinstance(end, PolarsDatetime): end = datetime.fromtimestamp(end.timestamp()) # Convert timeframe to timedelta if it's a string if isinstance(timeframe, str): timeframe_td = Timedelta(timeframe).to_pytimedelta() else: timeframe_td = timeframe # Calculate total duration needed (timeframe * periods) total_duration_needed = timeframe_td * periods # if end is None, calculate it from start date if end is None: end = start + total_duration_needed # Recursive helper function to add business days forwards def add_business_days_forwards( current_start: datetime | date, current_end: datetime | date ) -> datetime | date: """ Recursively adjust end date forwards. """ # convert to date if datetime if isinstance(current_start, datetime): current_start = current_start.date() if isinstance(current_end, datetime): current_end = current_end.date() # count business days between current_start and current_end business_days = PolarsDataFrame({ "start": [current_start], "end": [current_end] }).select( business_day_count( "start", "end", week_mask=WEEKDAYS_ITERABLE, holidays=holidays ) ).item() # business days to timedelta business_days_td = timedelta(days=business_days) if business_days_td < total_duration_needed: # calculate how many business days to add forwards from end business_days_to_add = ceil( (total_duration_needed - business_days_td).days ) if business_days_to_add == 0: business_days_to_add = 1 # add business days using the helper function new_end = add_business_days( current_end, business_days_to_add, holidays, direction='forward' ) return add_business_days_forwards(current_start, new_end) else: # have enough business days return current_end # Call the recursive function new_end = add_business_days_forwards(start, end) final_start = start.date() if isinstance(start, datetime) else start final_end = ( new_end.date() if isinstance(new_end, datetime) else new_end ) # Calculate how many complete timeframe periods fit in the business timespan df_final = PolarsDataFrame({ "start": [final_start], "end": [final_end] }) final_business_days = df_final.select( business_day_count( "start", "end", week_mask=WEEKDAYS_ITERABLE, holidays=holidays ) ).item() final_business_days_td = timedelta(days=final_business_days) if timeframe_td.total_seconds() > 0: count = int(final_business_days_td // timeframe_td) else: count = 0 # convert back to datetime if needed if isinstance(new_end, date): new_end = datetime.combine(new_end, datetime.max.time()) return count, new_end
[docs] def estimate_start_date_to_business_days( end_date: datetime | str | Timestamp | PolarsDatetime, timeframe: str | timedelta, window_size: int, holidays: list = [] ) -> datetime: """ Estimate start date needed to cover a window size of timeframe periods. """ _, start_date = adjust_start_date_to_business_days( end=end_date, timeframe=timeframe, periods=window_size, holidays=holidays ) return start_date
[docs] def estimate_end_date_to_business_days( start_date: datetime | str | Timestamp | PolarsDatetime, timeframe: str | timedelta, window_size: int, holidays: list = [] ) -> datetime: """ Estimate end date needed to cover a window size of timeframe periods. """ _, end_date = adjust_end_date_to_business_days( start=start_date, timeframe=timeframe, periods=window_size, holidays=holidays ) return end_date
[docs] def update_ticker_years_dict( ticker_years_dict: Dict[str, Dict[str, List[int]]], ticker: str, timeframe: str, years_to_add: List[int] ) -> bool: """ Update a ticker years dictionary with new years for a specific ticker and timeframe. This function modifies the dictionary in place and returns whether any changes were made. Parameters ---------- ticker_years_dict : Dict[str, Dict[str, List[int]]] Dictionary containing ticker years data, structured as: {ticker: {timeframe: [year1, year2, ...]}} ticker : str The ticker symbol to update timeframe : str The timeframe for the ticker data years_to_add : List[int] List of years to add to the years list Returns ------- bool True if any changes were made, False otherwise """ # convert to int years_to_add items years_to_add = [int(y) for y in years_to_add] # Initialize ticker if not present if ticker not in ticker_years_dict: ticker_years_dict[ticker] = {} # Initialize timeframe if not present if timeframe not in ticker_years_dict[ticker]: ticker_years_dict[ticker][timeframe] = [] # Track if any changes were made changes_made = False # Add years if not already present for y in years_to_add: if y not in ticker_years_dict[ticker][timeframe]: ticker_years_dict[ticker][timeframe].append(y) changes_made = True # Keep years sorted if changes were made if changes_made: ticker_years_dict[ticker][timeframe].sort() return ticker_years_dict, changes_made
''' HISTORICAL DATA PROVIDERS ''' HISTDATA_PROVIDER = 'Histdata' DUKASCOPY_PROVIDER = 'Dukascopy' SUPPORTED_HISTORICAL_DATA_PROVIDERS = [ HISTDATA_PROVIDER, DUKASCOPY_PROVIDER ] ''' REALTIME DATA PROVIDERS ''' # TWELVE DATA TWELVEDATA_PROVIDER = 'TwelveData' PAIR_TWELVE_DATA_FORMAT = PAIR_GENERIC_FORMAT PAIR_TWELVE_DATA_PATTERN = '^' + SINGLE_CURRENCY_PATTERN_STR + \ SINGLE_CURRENCY_PATTERN_STR + '$' TWELVE_DATA_CHUNK_SIZE = 5000 # LIST OF AVAILABLE PLANS ON TWELVE DATA TWELVEDATA_PROVIDER_PLAN_LIST = ["free", "grow", "pro", "ultra"] # FREE TIER CONSTANTS TWELVE_DATA_FREE_TIER_MINUTE_RATE_LIMIT = 8 # api calls per minute TWELVE_DATA_FREE_TIER_DAY_RATE_LIMIT = 800 # api calls per day # PAID SUBSCRIPTION CONSTANTS TWELVE_DATA_PRO_MINUTE_RATE_LIMIT = 55 # api calls per minute TWELVE_DATA_PRO_DAY_RATE_LIMIT = 5000 # api calls per day # TWELVE DATA LIMIT DATE TWELVE_DATA_LIMIT_DATE = datetime(2020, 1, 1) # TIMEFRAME e.g. interval admitted values TWELVE_DATA_TIMEFRAMES = [ "1min", "5min", "15min", "30min", "45min", "1h", "2h", "4h", "8h", "1day", "1week", "1month" ] SUPPORTED_REALTIME_DATA_PROVIDERS = [ TWELVEDATA_PROVIDER ]