# -*- coding: utf-8 -*-
"""
Created on Sat Apr 30 09:23:19 2022
@author: fiora
"""
from loguru import logger
from re import (
fullmatch,
findall,
search,
IGNORECASE
)
from typing import (
cast,
Any,
List,
Literal,
Dict,
Tuple
)
import requests
import random
from math import ceil
from bs4 import BeautifulSoup
from datetime import (
timedelta,
datetime,
date
)
from pathlib import Path
from attrs import (
field,
validators
)
from holidays import (
country_holidays
)
# PANDAS
from pandas import (
DataFrame as pandas_dataframe,
concat as pandas_concat,
Timestamp,
isnull,
bdate_range,
to_datetime,
to_timedelta,
Timedelta,
read_parquet as pandas_read_parquet,
read_csv as pandas_read_csv
)
from pandas.api.types import is_datetime64_any_dtype
from pandas.tseries.frequencies import to_offset
from pandas.tseries.offsets import DateOffset
# PYARROW
from pyarrow import (
float32 as pyarrow_float32,
timestamp as pyarrow_timestamp,
schema as pyarrow_schema,
Table,
table as pyarrow_table,
concat_tables,
csv as arrow_csv
)
from pyarrow.parquet import (
write_table,
read_table
)
# POLARS
from polars import (
Float32 as PolarsFloat32,
Datetime as PolarsDatetime,
read_csv as polars_read_csv,
concat as polars_concat,
col,
len as polars_len,
read_parquet as polars_read_parquet,
from_arrow,
DataFrame as PolarsDataFrame,
LazyFrame as PolarsLazyFrame,
scan_csv as polars_scan_csv,
scan_parquet as polars_scan_parquet,
business_day_count
)
from dateutil.rrule import (
rrule,
DAILY,
MO,
TU,
WE,
TH,
FR
)
# Public API of this module: `from <module> import *` exports exactly these
# names.
# NOTE(review): several names listed here (e.g. FOREX_HOLIDAYS, the
# TWELVE_DATA_* constants, business_days_data, get_forex_holidays) are not
# defined in this section of the file — presumably they are defined further
# down; confirm each one exists, otherwise `import *` raises AttributeError.
__all__ = [
    'YEARS',
    'MONTHS',
    'DATE_FORMAT_SQL',
    'DATE_FORMAT_HISTDATA_CSV',
    'HISTDATA_URL_TICKDATA_TEMPLATE',
    'HISTDATA_BASE_DOWNLOAD_METHOD',
    'HISTDATA_BASE_DOWNLOAD_URL',
    'DEFAULT_PATHS',
    'DATA_TYPE',
    'BASE_DATA_COLUMN_NAME',
    'DATA_FILE_COLUMN_INDEX',
    'SUPPORTED_DATA_FILES',
    'SUPPORTED_DATA_ENGINES',
    'ASSET_TYPE',
    'TEMP_FOLDER',
    'TEMP_CSV_FILE',
    'DTYPE_DICT',
    'PYARROW_DTYPE_DICT',
    'POLARS_DTYPE_DICT',
    'DATA_COLUMN_NAMES',
    'FILENAME_TEMPLATE',
    'DATA_KEY',
    'TICK_TIMEFRAME',
    'FILENAME_STR',
    'FILENAME_YEAR_STR',
    'DATE_NO_HOUR_FORMAT',
    'SQL_COMPARISON_OPERATORS',
    'SUPPORTED_SQL_COMPARISON_OPERATORS',
    'SUPPORTED_BASE_DATA_COLUMN_NAME',
    'SQL_CONDITION_AGGREGATION_MODES',
    'SUPPORTED_SQL_CONDITION_AGGREGATION_MODES',
    'HISTORICAL_DB_MIN_DATE',
    'FOREX_HOLIDAYS',
    'HISTDATA_PROVIDER',
    'DUKASCOPY_PROVIDER',
    'SUPPORTED_HISTORICAL_DATA_PROVIDERS',
    'TWELVEDATA_PROVIDER',
    'TWELVEDATA_PROVIDER_PLAN_LIST',
    'TWELVE_DATA_CHUNK_SIZE',
    'TWELVE_DATA_FREE_TIER_MINUTE_RATE_LIMIT',
    'TWELVE_DATA_FREE_TIER_DAY_RATE_LIMIT',
    'TWELVE_DATA_PRO_MINUTE_RATE_LIMIT',
    'TWELVE_DATA_PRO_DAY_RATE_LIMIT',
    'TWELVE_DATA_LIMIT_DATE',
    'TWELVE_DATA_TIMEFRAMES',
    'SUPPORTED_REALTIME_DATA_PROVIDERS',
    'validator_file_path',
    'validator_dir_path',
    'validate_timedelta_str',
    'get_attrs_names',
    'check_timeframe_str',
    'any_date_to_datetime64',
    'empty_dataframe',
    'is_empty_dataframe',
    'shape_dataframe',
    'get_dataframe_column',
    'get_dataframe_row',
    'get_dataframe_element',
    'astype',
    'read_csv',
    'sort_dataframe',
    'concat_data',
    'list_remove_duplicates',
    'reframe_data',
    'write_csv',
    'write_parquet',
    'read_parquet',
    'to_pandas_dataframe',
    'get_pair_symbols',
    'to_source_symbol',
    'get_date_interval',
    'random_date_between',
    'get_histdata_tickers',
    'TickerNotFoundError',
    'TickerDataNotFoundError',
    'TickerDataBadTypeException',
    'TickerDataInvalidException',
    'business_days_data',
    'update_ticker_years_dict',
    'get_class_attr_keys',
    'get_class_attr_values',
    'get_class_attr_dict',
    'get_forex_holidays',
    'adjust_end_date_to_business_days',
    'adjust_start_date_to_business_days',
    'estimate_start_date_to_business_days',
    'estimate_end_date_to_business_days',
    'Timestamp',
    'PolarsDatetime',
    'PolarsFloat32',
    'PolarsDataFrame',
    'PolarsLazyFrame',
]
# CUSTOM EXCEPTIONS
# =============================================================================
# TickerNotFoundError:
# This exception is raised when the ticker requested is misspelled
# or does not exist in the database.
[docs]
class TickerNotFoundError(Exception):
    """The requested ticker is misspelled or absent from the database."""
# TickerDataNotFoundError:
# This exception is raised when the ticker is found
# but data is not available or data retrieval failed.
[docs]
class TickerDataNotFoundError(Exception):
    """The ticker exists but its data is unavailable or retrieval failed."""
# TickerDataBadTypeException:
# This exception is raised when the ticker data
# is found but data type is not compliant with the expected type.
[docs]
class TickerDataBadTypeException(Exception):
    """Ticker data was found but its type is not the expected one."""
# TickerDataInvalidException:
# This exception is raised when the ticker data
# is not found or invalid for generic reasons.
[docs]
class TickerDataInvalidException(Exception):
    """Ticker data is missing or invalid for a generic reason."""
# common functions, constants and templates
TEMP_FOLDER = "Temp"
TEMP_CSV_FILE = "Temp.csv"
HISTDATA_URL_TICKDATA_TEMPLATE = (
'https://www.histdata.com/download-free-forex-historical-data/?/'
'ascii/tick-data-quotes/{ticker}/{year}/{month_num}'
)
HISTDATA_URL_ONEMINDATA_TEMPLATE = (
'http://www.histdata.com/download-free-forex-data/?/'
'ascii/1-minute-bar-quotes/{pair}/{year}/{month_num}'
)
HISTDATA_BASE_DOWNLOAD_URL = "http://www.histdata.com/get.php"
HISTDATA_BASE_DOWNLOAD_METHOD = 'POST'
MONTHS = ['January', 'February', 'March', 'April', 'May', 'June',
'July', 'August', 'September', 'October', 'November', 'December']
YEARS = list(range(2000, datetime.now().year + 1, 1))
HISTORICAL_DB_MIN_DATE = datetime(2000, 1, 1)
DATE_NO_HOUR_FORMAT = '%Y-%m-%d'
DATE_FORMAT_ISO8601 = 'ISO8601'
DATE_FORMAT_SQL = '%Y-%m-%d %H:%M:%S.%f'
DATE_FORMAT_HISTDATA_CSV = '%Y%m%d %H%M%S%3f'
# DATA_KEY_TEMPLATE_STR = '{ticker}.Y{year}.{tf}'
# DATA_KEY_TEMPLATE_PATTERN = '^[A-Za-z]+.Y[0-9]+.[A-Za-z0-9]+'
# FILENAME_STR = '{ticker}_Y{year}_{tf}.{file_ext}'
DATA_KEY_TEMPLATE_STR = '{market}.{ticker}.{tf}'
DATA_KEY_TEMPLATE_PATTERN = '^[A-Za-z0-9]_[A-Za-z]+.[A-Za-z0-9]+'
FILENAME_STR = '{market}_{ticker}_{tf}.{file_ext}'
FILENAME_YEAR_STR = '{market}_{ticker}_{tf}_{year}.{file_ext}'
DEFAULT_TIMEZONE = 'utc'
TICK_TIMEFRAME = 'tick'
# ticker PAIR of forex market
SINGLE_CURRENCY_PATTERN_STR = '[A-Za-z]{3}'
TICKER_PATTERN = '^' + SINGLE_CURRENCY_PATTERN_STR \
+ SINGLE_CURRENCY_PATTERN_STR + '$'
PAIR_GENERIC_FORMAT = '{TO}/{FROM}'
# TIME PATTERN
TIME_WINDOW_PATTERN_STR = '^[-+]?[0-9]+[A-Za-z]{1,}$'
TIME_WINDOW_COMPONENTS_PATTERN_STR = '^[-+]?[0-9]+|[A-Za-z]{1,}$'
TIME_WINDOW_UNIT_PATTERN_STR = '[A-Za-z]{1,}$'
GET_YEAR_FROM_TICK_KEY_PATTERN_STR = '^[A-Za-z].Y[0-9].TICK'
YEAR_FIELD_PATTERN_STR = '^Y([0-9]{4,})$'
POLARS_DURATION_PATTERN_STR = '^[0-9]+(ns|us|ms|s|m|h|d|w|mo|q|y|i)$'
PYARROW_DURATION_PATTERN_STR = '^[0-9]+(ns|us|ms|s|m|h|d|w|mo|q|y|i)$'
# GENERAL UTILITIES
[docs]
def get_class_attr_keys(var):
    """Return the names in ``vars(var)`` that are not dunder-style.

    Any name containing a ``__<word>__`` sequence is skipped.  Returns
    None when ``vars()`` (or the filtering) fails, e.g. for objects
    without a ``__dict__``.
    """
    try:
        return [name for name in vars(var)
                if search(r'__(\w+)__', name) is None]
    except Exception:
        return None
[docs]
def get_class_attr_values(var):
    """Return the values of the non-dunder attributes of ``var``.

    Values are read from ``vars(var)`` in the order given by
    ``get_class_attr_keys``.  Returns None when the keys cannot be
    determined or a lookup fails.  Previously ``vars(var)`` ran outside
    the ``try``, so objects without ``__dict__`` raised TypeError instead
    of returning None like ``get_class_attr_keys`` does.
    """
    keys = get_class_attr_keys(var)
    if keys is None:
        return None
    try:
        namespace = vars(var)
        return [namespace[key] for key in keys]
    except Exception:
        return None
[docs]
def get_class_attr_dict(var):
    """Return ``{name: getattr(var, name)}`` for the non-dunder attributes.

    Names come from ``get_class_attr_keys``; returns None when those keys
    cannot be determined (previously this crashed with
    "'NoneType' object is not iterable").
    """
    def_keys = get_class_attr_keys(var)
    if def_keys is None:
        return None
    return {key: getattr(var, key) for key in def_keys}
# auxiliary CONSTANT DEFINITIONS
# key field indices for '{market}.{ticker}.{tf}' (DATA_KEY_TEMPLATE_STR);
# YEAR_INDEX presumably addresses the trailing {year} of FILENAME_YEAR_STR
[docs]
class DATA_KEY:
    """Positions of the fields of a data key.

    Market, ticker and timeframe follow the '{market}.{ticker}.{tf}'
    layout of DATA_KEY_TEMPLATE_STR; YEAR_INDEX is a fourth position.
    """
    MARKET, TICKER_INDEX, TF_INDEX, YEAR_INDEX = range(4)
# field indices for the legacy filename layout <ticker>_Y<year>_<timeframe>.<filetype>
[docs]
class FILENAME_TEMPLATE:
    """Field positions for the '<ticker>_Y<year>_<timeframe>.<filetype>' layout."""
    TICKER_INDEX = 0
    # the year field and its first numerical character share position 1
    YEAR_INDEX = YEAR_NUMERICAL_CHAR = 1
    TF_INDEX = 2
    FILETYPE_INDEX = 3
[docs]
class DEFAULT_PATHS:
    """Default storage locations (base path resolved at import time)."""
    # ~/.database
    BASE_PATH = str(Path.home().joinpath('.database'))
    HIST_DATA_FOLDER = 'HistoricalData'
    REALTIME_DATA_FOLDER = 'RealtimeData'
[docs]
class DATA_TYPE:
    """File-type identifiers (also used as file extensions)."""
    CSV_FILETYPE, PARQUET_FILETYPE = 'csv', 'parquet'
[docs]
class DATA_FILE_COLUMN_INDEX:
    """Column positions inside data files."""
    TIMESTAMP = 0  # first column
# file formats handled by the read/write helpers (see DATA_TYPE)
SUPPORTED_DATA_FILES = [
    DATA_TYPE.CSV_FILETYPE,
    DATA_TYPE.PARQUET_FILETYPE
]
# supported dataframe engines
# 'pyarrow' is listed, but reframe_data() cannot reframe a pyarrow Table
# natively yet: the Table is converted to polars, reframed with the polars
# engine and converted back to arrow.
SUPPORTED_DATA_ENGINES = [
    'pandas',
    'pyarrow',
    'polars',
    'polars_lazy'
]
# SINGLE BASE DATA COMPOSITION TEMPLATE: ['open', 'high', 'low', 'close']
# with datetime/timestamp as index
# column names for TICK and timeframe-filtered dataframes
# OHLC and related column names
[docs]
class DATA_COLUMN_NAMES:
    """Column layouts for tick and timeframe (OHLC) dataframes.

    The ``*_TIME_INDEX`` variants omit ``timestamp`` for frames that keep
    the time in the index.
    """
    TICK_DATA_NO_PVALUE = ['timestamp', 'ask', 'bid', 'vol']
    TICK_DATA = TICK_DATA_NO_PVALUE + ['p']
    TF_DATA = ['timestamp', 'open', 'high', 'low', 'close']
    TICK_DATA_TIME_INDEX = TICK_DATA[1:]
    TF_DATA_TIME_INDEX = TF_DATA[1:]


# selected as the single base data composition template
# (aliases of the DATA_COLUMN_NAMES lists, not copies)
BASE_DATA = DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX
BASE_DATA_WITH_TIME = DATA_COLUMN_NAMES.TF_DATA
class DB_MODE:
    """Database operating modes; each value equals its attribute name."""
    FULL_MODE, HISTORICAL_MODE, REALTIME_MODE = (
        'FULL_MODE', 'HISTORICAL_MODE', 'REALTIME_MODE')
[docs]
class ASSET_TYPE:
    """Asset categories; each value equals its attribute name."""
    STOCK, ETF, FOREX = 'STOCK', 'ETF', 'FOREX'
[docs]
class BASE_DATA_COLUMN_NAME:
    """Canonical dataframe column names."""
    # time
    TIMESTAMP = 'timestamp'
    # OHLC bar fields
    OPEN = 'open'
    HIGH = 'high'
    LOW = 'low'
    CLOSE = 'close'
    # tick fields
    ASK = 'ask'
    BID = 'bid'
    VOL = 'vol'
    P_VALUE = 'p'
    # additional fields
    TRANSACTIONS = 'transactions'
    VWAP = 'vwap'
    OTC = 'otc'


# typing alias accepting exactly the column names above
SUPPORTED_BASE_DATA_COLUMN_NAME = Literal[
    BASE_DATA_COLUMN_NAME.TIMESTAMP,
    BASE_DATA_COLUMN_NAME.OPEN, BASE_DATA_COLUMN_NAME.HIGH,
    BASE_DATA_COLUMN_NAME.LOW, BASE_DATA_COLUMN_NAME.CLOSE,
    BASE_DATA_COLUMN_NAME.ASK, BASE_DATA_COLUMN_NAME.BID,
    BASE_DATA_COLUMN_NAME.VOL, BASE_DATA_COLUMN_NAME.P_VALUE,
    BASE_DATA_COLUMN_NAME.TRANSACTIONS, BASE_DATA_COLUMN_NAME.VWAP,
    BASE_DATA_COLUMN_NAME.OTC,
]
[docs]
class SQL_COMPARISON_OPERATORS:
    """Comparison operator symbols usable in query conditions."""
    GREATER_THAN, LESS_THAN = '>', '<'
    GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL = '>=', '<='
    EQUAL, NOT_EQUAL = '==', '!='


# typing alias accepting exactly the operators above
SUPPORTED_SQL_COMPARISON_OPERATORS = Literal[
    SQL_COMPARISON_OPERATORS.GREATER_THAN, SQL_COMPARISON_OPERATORS.LESS_THAN,
    SQL_COMPARISON_OPERATORS.GREATER_THAN_OR_EQUAL,
    SQL_COMPARISON_OPERATORS.LESS_THAN_OR_EQUAL,
    SQL_COMPARISON_OPERATORS.EQUAL, SQL_COMPARISON_OPERATORS.NOT_EQUAL,
]
[docs]
class SQL_CONDITION_AGGREGATION_MODES:
    """Logical operators used to combine query conditions."""
    AND, OR = 'AND', 'OR'


# typing alias accepting exactly the modes above
SUPPORTED_SQL_CONDITION_AGGREGATION_MODES = Literal[
    SQL_CONDITION_AGGREGATION_MODES.AND, SQL_CONDITION_AGGREGATION_MODES.OR
]
# auxiliary functions
# get elements from db key
[docs]
def get_db_key_elements(key):
    """Split a data key ``'<market>.<ticker>.<tf>'`` into its fields.

    The layout is the one produced by ``DATA_KEY_TEMPLATE_STR``
    (``'{market}.{ticker}.{tf}'``); positions in the returned tuple match
    ``DATA_KEY.MARKET``, ``DATA_KEY.TICKER_INDEX`` and ``DATA_KEY.TF_INDEX``.

    Returns
    -------
    tuple[str, str, str]
        ``(market, ticker, tf)``.

    Raises
    ------
    ValueError
        If ``key`` does not have exactly three non-empty dot-separated fields.
    """
    # DATA_KEY_TEMPLATE_STR is a str.format template, not a regex: matching
    # it directly only accepted the literal text '{market}.{ticker}.{tf}'
    # and produced no groups. Use an explicit pattern, one group per field.
    res = fullmatch(r'([^.]+)\.([^.]+)\.([^.]+)', key)
    if res:
        return res.groups()
    logger.error(
        f'key {key} does not respect regex template {DATA_KEY_TEMPLATE_STR}')
    raise ValueError(f'invalid data key: {key}')
# parse argument to get datetime object with date format as input
def infer_date_from_format_dt(s, date_format='ISO8601', unit=None, utc=False):
    """Convert ``s`` with ``pandas.to_datetime``.

    When ``unit`` is truthy, ``s`` is parsed as numeric epoch values in that
    unit and ``date_format`` is ignored; otherwise ``date_format`` is used.
    """
    parse_options = {'unit': unit} if unit else {'format': date_format}
    return to_datetime(s, utc=utc, **parse_options)
# validate a timeframe: pandas DateOffset freqstr rules for the pandas engine,
# polars duration strings for the polars/pyarrow engines, plus 'tick'
# (TICK_TIMEFRAME, the lowest timeframe available)
# pandas reference:
# https://pandas.pydata.org/docs/user_guide/timeseries.html#dateoffset-objects
[docs]
def check_timeframe_str(tf: str | Timedelta | DateOffset,
                        engine: Literal['pandas',
                                        'polars',
                                        'polars_lazy',
                                        'pyarrow'] = 'pandas'):
    """Validate a timeframe for the given dataframe engine.

    Parameters
    ----------
    tf : str | Timedelta | DateOffset
        ``TICK_TIMEFRAME`` ('tick'); a ``pandas.Timedelta`` (accepted by
        every engine); a pandas frequency string or ``DateOffset`` (pandas
        engine); or a single ``<int><unit>`` polars duration string or a
        ``datetime.timedelta`` (polars, polars_lazy and pyarrow engines).
    engine : {'pandas', 'polars', 'polars_lazy', 'pyarrow'}
        Dataframe engine the timeframe is meant for.

    Returns
    -------
    The input ``tf`` unchanged when it is valid.

    Raises
    ------
    ValueError
        If ``tf`` is invalid for ``engine`` or ``engine`` is unknown.
    """
    if tf == TICK_TIMEFRAME:
        return tf
    if isinstance(tf, Timedelta):
        # Timedelta is accepted by all supported engines
        return tf
    if engine == 'pandas':
        try:
            offset = to_offset(tf)
        except ValueError:
            logger.critical(f"Type check: Invalid timeframe for pandas: {tf}")
            raise
        if not isinstance(offset, DateOffset):
            # logged once: previously this ValueError was raised inside the
            # try, re-caught by its own except and logged a second time
            logger.critical(f"Type check: Invalid timeframe for pandas: {tf}")
            raise ValueError(f"Invalid timeframe for pandas: {tf}")
        return tf
    if engine in ('polars', 'polars_lazy', 'pyarrow'):
        # polars duration rules, also used for pyarrow (reframed via polars)
        if isinstance(tf, str):
            if fullmatch(POLARS_DURATION_PATTERN_STR, tf, flags=IGNORECASE):
                return tf
        elif isinstance(tf, timedelta):
            return tf
        logger.critical(f"Type check: Invalid timeframe for polars: {tf}")
        raise ValueError(f"Invalid timeframe for polars: {tf}")
    logger.critical(f"Type check: Invalid engine: {engine}")
    raise ValueError(f"Invalid engine: {engine}")
# PAIR symbol functions
[docs]
def get_pair_symbols(ticker):
    """Split a forex ticker into its two 3-letter currency symbols.

    Returns ``(to_symbol, from_symbol)`` when exactly two 3-letter runs
    are found in ``ticker`` (e.g. 'EURUSD' -> ('EUR', 'USD')), else None.
    """
    symbols = findall(SINGLE_CURRENCY_PATTERN_STR, ticker)
    if len(symbols) != 2:
        return None
    to_symbol, from_symbol = symbols
    return to_symbol, from_symbol
[docs]
def to_source_symbol(ticker, source):
    """Format ``ticker``'s currency pair with ``PAIR_TWELVE_DATA_FORMAT``.

    NOTE(review): ``source`` is not used, and PAIR_TWELVE_DATA_FORMAT is not
    defined in this section of the file (presumably defined further down;
    confirm). An unparsable ticker makes ``get_pair_symbols`` return None
    and the unpacking below raises TypeError.
    """
    pair = get_pair_symbols(ticker)
    to_symbol, from_symbol = pair
    return PAIR_TWELVE_DATA_FORMAT.format(TO=to_symbol, FROM=from_symbol)
# TIMESTAMP RELATED FUNCTIONS
def timewindow_str_to_timedelta(time_window_str):
    """Convert a '<integer_multiplier><unit>' string (e.g. '5d') to Timedelta.

    Raises ValueError when the string does not match
    TIME_WINDOW_PATTERN_STR.
    """
    if not fullmatch(TIME_WINDOW_PATTERN_STR, time_window_str):
        logger.error('time window pattern not match: '
                     '"<integer_multiplier><unit>" str')
        raise ValueError
    return Timedelta(time_window_str)
[docs]
def any_date_to_datetime64(any_date,
                           date_format='ISO8601',
                           unit=None,
                           to_pydatetime=False):
    """Convert ``any_date`` with ``infer_date_from_format_dt``.

    With ``to_pydatetime`` the converted value's ``to_pydatetime()`` result
    is returned instead. Conversion errors are logged and re-raised.
    """
    try:
        any_date = infer_date_from_format_dt(any_date,
                                             date_format,
                                             unit=unit)
        any_date = any_date.to_pydatetime() if to_pydatetime else any_date
    except Exception:
        logger.error(f'date {any_date} conversion failed, '
                     f'failed conversion to {date_format} '
                     'date format')
        raise
    return any_date
[docs]
def get_date_interval(start=None,
                      end=None,
                      interval_start_mode=None,
                      interval_end_mode='now',
                      interval_timespan=None,
                      freq=None,
                      normalize=False,
                      bdays=False):
    """Build a (start, end) interval, optionally with a business-day index.

    Parameters
    ----------
    start, end : optional
        Any value accepted by ``pandas.Timestamp``.
    interval_start_mode : optional
        Not used by this implementation.
    interval_end_mode : str
        Only ``'now'`` is handled: together with ``interval_timespan`` the
        end becomes ``Timestamp.now()`` and start = end - timespan.
    interval_timespan : str, optional
        '<int><unit>' window (e.g. '5d'), see timewindow_str_to_timedelta.
    freq : optional
        When set, a ``bdate_range(start, end, freq=freq)`` named
        'timestamp' is returned as a third element.
    normalize : bool
        Normalize non-null bounds to midnight (also passed to bdate_range).
    bdays : bool
        With a 'd' timespan, move start back one day at a time until the
        interval contains at least <int> Monday-Friday days (dateutil rrule).

    Returns
    -------
    (start, end) or (start, end, DatetimeIndex); bounds pass through
    any_date_to_datetime64.
    """
    # create start and end date as timestamp instances
    start_date = Timestamp(start)
    end_date = Timestamp(end)
    if interval_timespan:
        # a variety of interval mode could be implemented
        # 'now' - end of date interval is timestamp now
        if interval_end_mode == 'now':
            end_date = Timestamp.now()
            start_date = end_date - timewindow_str_to_timedelta(interval_timespan)
            if bdays:
                # components -> [<integer>, <unit>], e.g. ['5', 'd']
                components = findall(TIME_WINDOW_COMPONENTS_PATTERN_STR,
                                     interval_timespan)
                # fixed days redundancy check available only with 'd' type requested
                # timespan
                if components[1] == 'd':
                    days_list = list(
                        rrule(freq=DAILY,
                              dtstart=start_date,
                              until=end_date,
                              byweekday=(MO, TU, WE, TH, FR))
                    )
                    # widen the window until it holds the requested
                    # number of weekdays
                    while len(days_list) < int(components[0]):
                        start_date = start_date - Timedelta(days=1)
                        days_list = list(
                            rrule(freq=DAILY,
                                  dtstart=start_date,
                                  until=end_date,
                                  byweekday=(MO, TU, WE, TH, FR))
                        )
    # Timestamp() constructor ensures these are Timestamp objects
    if normalize:
        if not isnull(start_date):
            start_date = Timestamp.normalize(start_date)
        if not isnull(end_date):
            end_date = Timestamp.normalize(end_date)
    start_date = any_date_to_datetime64(start_date)
    end_date = any_date_to_datetime64(end_date)
    # generate DateTimeIndex if freq is set
    # otherwise return just start and end of interval
    if freq:
        bdate_dtindex = bdate_range(start=start_date,
                                    end=end_date,
                                    freq=freq,
                                    tz=None,
                                    normalize=normalize,
                                    name='timestamp'
                                    )
        return start_date, end_date, bdate_dtindex
    else:
        return start_date, end_date
[docs]
def random_date_between(start_date, end_date):
    """
    Pick a random datetime between two datetimes (inclusive).

    The offset from ``start_date`` is a whole number of seconds drawn with
    ``random.randint``; sub-second parts of the span are truncated.

    Args:
        start_date (datetime): Lower bound.
        end_date (datetime): Upper bound (must not precede start_date).
    Returns:
        datetime: ``start_date`` plus a random whole-second offset.
    """
    span_seconds = int((end_date - start_date).total_seconds())
    offset = timedelta(seconds=random.randint(0, span_seconds))
    return start_date + offset
# BASE OPERATIONS WITH DATAFRAME
# depending on dataframe engine support
# for supported engines see var SUPPORTED_DATA_ENGINES
# DATA ENGINES TYPES DICTIONARY
[docs]
class DTYPE_DICT:
    """pandas dtype mappings: float32 values, millisecond timestamps."""
    TICK_DTYPE = dict.fromkeys(('ask', 'bid', 'vol', 'p'), 'float32')
    TF_DTYPE = dict.fromkeys(('open', 'high', 'low', 'close'), 'float32')
    # same mappings with a leading timestamp column
    TIME_TICK_DTYPE = {'timestamp': 'datetime64[ms]', **TICK_DTYPE}
    TIME_TF_DTYPE = {'timestamp': 'datetime64[ms]', **TF_DTYPE}
[docs]
class PYARROW_DTYPE_DICT:
    """pyarrow type mappings: float32 values, millisecond timestamps."""
    TICK_DTYPE = {name: pyarrow_float32() for name in ('ask', 'bid', 'vol', 'p')}
    TF_DTYPE = {name: pyarrow_float32() for name in ('open', 'high', 'low', 'close')}
    # same mappings with a leading timestamp column
    TIME_TICK_DTYPE = {'timestamp': pyarrow_timestamp('ms'), **TICK_DTYPE}
    TIME_TF_DTYPE = {'timestamp': pyarrow_timestamp('ms'), **TF_DTYPE}
[docs]
class POLARS_DTYPE_DICT:
    """polars dtype mappings: Float32 values, millisecond Datetime."""
    TICK_DTYPE = dict.fromkeys(('ask', 'bid', 'vol', 'p'), PolarsFloat32)
    TF_DTYPE = dict.fromkeys(('open', 'high', 'low', 'close'), PolarsFloat32)
    # same mappings with a leading timestamp column
    TIME_TICK_DTYPE = {'timestamp': PolarsDatetime('ms'), **TICK_DTYPE}
    TIME_TF_DTYPE = {'timestamp': PolarsDatetime('ms'), **TF_DTYPE}
# DATA ENGINES FUNCTIONS
[docs]
def empty_dataframe(engine):
if engine == 'pandas':
return pandas_dataframe()
elif engine == 'pyarrow':
return pyarrow_table([])
elif engine == 'polars':
return PolarsDataFrame()
elif engine == 'polars_lazy':
return PolarsLazyFrame()
else:
logger.error('function empty_dataframe not available'
f' for engine {engine}')
raise ValueError
[docs]
def is_empty_dataframe(dataframe):
    """Return True when ``dataframe`` holds no data.

    pandas uses ``DataFrame.empty`` (an axis of length zero); pyarrow
    Tables and polars frames are empty when they have no rows.  Raises
    ValueError for unsupported types.
    """
    if isinstance(dataframe, pandas_dataframe):
        return dataframe.empty
    elif isinstance(dataframe, Table):
        return dataframe.num_rows == 0
    elif isinstance(dataframe, PolarsDataFrame):
        return dataframe.is_empty()
    elif isinstance(dataframe, PolarsLazyFrame):
        # only one row is needed to decide: avoid materialising the whole
        # lazy query as the previous collect().is_empty() did
        return dataframe.limit(1).collect().is_empty()
    else:
        logger.error('function is_empty_dataframe not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def shape_dataframe(dataframe):
if isinstance(dataframe, pandas_dataframe):
return dataframe.shape
elif isinstance(dataframe, Table):
return dataframe.shape
elif isinstance(dataframe, PolarsDataFrame):
return dataframe.shape
elif isinstance(dataframe, PolarsLazyFrame):
return (
dataframe.select(polars_len()).collect().item(0, 0),
dataframe.collect_schema().len()
)
else:
logger.error('function shape_dataframe not available'
' for instance of type'
f' {type(dataframe)}')
raise ValueError
[docs]
def sort_dataframe(dataframe, column):
if isinstance(dataframe, pandas_dataframe):
return dataframe.sort_values(by=[column])
elif isinstance(dataframe, Table):
return dataframe.sort_by(column)
elif isinstance(dataframe, PolarsDataFrame):
return dataframe.sort(column, nulls_last=True)
elif isinstance(dataframe, PolarsLazyFrame):
return dataframe.sort(column, nulls_last=True)
else:
logger.error('function sort_dataframe not available'
' for instance of type'
f' {type(dataframe)}')
raise ValueError
[docs]
def get_dataframe_column(dataframe, column):
    """Return the column ``column`` of ``dataframe``.

    pandas / pyarrow / polars eager frames return their native column
    object via ``dataframe[column]``; a LazyFrame returns a collected
    one-column polars DataFrame.

    Raises
    ------
    ValueError
        Unsupported dataframe type (previously the error was only logged
        and None returned, unlike the sibling accessors).
    """
    if isinstance(dataframe, pandas_dataframe):
        return dataframe[column]
    elif isinstance(dataframe, Table):
        return dataframe[column]
    elif isinstance(dataframe, PolarsDataFrame):
        return dataframe[column]
    elif isinstance(dataframe, PolarsLazyFrame):
        # NOTE(review): returns a DataFrame, not a Series like the eager
        # polars branch; kept as-is for existing callers
        return dataframe.select(column).collect()
    else:
        logger.error('function get_dataframe_column not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def get_dataframe_row(dataframe, row):
    """Return row ``row`` of ``dataframe``.

    pandas uses label-based ``.loc[row]`` (a Series); pyarrow and polars
    return a one-row slice starting at position ``row``.

    Raises
    ------
    ValueError
        Unsupported dataframe type (previously only logged, returning None).
    """
    if isinstance(dataframe, pandas_dataframe):
        # NOTE(review): label-based, unlike the positional slices below
        return dataframe.loc[row]
    elif isinstance(dataframe, Table):
        return dataframe.slice(row, 1)
    elif isinstance(dataframe, PolarsDataFrame):
        return dataframe.slice(row, 1)
    elif isinstance(dataframe, PolarsLazyFrame):
        return dataframe.slice(row, 1)
    else:
        logger.error('function get_dataframe_row not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def get_dataframe_element(dataframe, column, row):
if isinstance(dataframe, pandas_dataframe):
return dataframe[column][row]
elif isinstance(dataframe, Table):
return dataframe[column][row]
elif isinstance(dataframe, PolarsDataFrame):
return dataframe[column][row]
elif isinstance(dataframe, PolarsLazyFrame):
return dataframe.select(column).collect().item(row, 0)
else:
logger.error('function get_dataframe_element not available'
' for instance of type'
f' {type(dataframe)}')
raise ValueError
def dtype_dict_to_pyarrow_schema(dtype_dict):
    """Build a pyarrow schema from a ``{column_name: type}`` mapping."""
    fields = [(name, dtype) for name, dtype in dtype_dict.items()]
    return pyarrow_schema(fields)
[docs]
def astype(dataframe, dtype_dict):
if isinstance(dataframe, pandas_dataframe):
return dataframe.astype(dtype_dict)
elif isinstance(dataframe, Table):
return dataframe.cast(dtype_dict_to_pyarrow_schema(dtype_dict))
elif isinstance(dataframe, PolarsDataFrame):
return dataframe.cast(dtype_dict)
elif isinstance(dataframe, PolarsLazyFrame):
return dataframe.cast(dtype_dict)
else:
logger.error('function astype not available'
' for instance of type'
f' {type(dataframe)}')
raise ValueError
[docs]
def read_parquet(engine, filepath):
if engine == 'pandas':
return pandas_read_parquet(filepath)
elif engine == 'pyarrow':
return read_table(filepath)
elif engine == 'polars':
return polars_read_parquet(filepath)
elif engine == 'polars_lazy':
return polars_scan_parquet(filepath)
else:
logger.error('function read_parquet not available'
f' for engine {engine}')
raise ValueError
[docs]
def write_parquet(dataframe, filepath):
    """Write ``dataframe`` to the parquet file ``filepath``.

    pandas frames are written with their index; polars LazyFrames are
    streamed with ``sink_parquet``.

    Raises
    ------
    Exception
        Any writer error is logged with its traceback and re-raised.
    ValueError
        Unsupported dataframe type.
    """
    if isinstance(dataframe, pandas_dataframe):
        try:
            dataframe.to_parquet(filepath, index=True)
        except Exception as e:
            logger.exception(f'pandas write parquet failed: {e}')
            raise
    elif isinstance(dataframe, Table):
        try:
            write_table(dataframe, filepath)
        except Exception as e:
            logger.exception(f'pyarrow write parquet failed: {e}')
            raise
    elif isinstance(dataframe, PolarsDataFrame):
        try:
            dataframe.write_parquet(filepath)
        except Exception as e:
            logger.exception(f'polars write parquet failed: {e}')
            raise
    elif isinstance(dataframe, PolarsLazyFrame):
        try:
            dataframe.sink_parquet(filepath)
            # alternative to sink_parquet()
            # dataframe.collect(streaming=False).write_parquet(filepath)
        except Exception as e:
            # both literals are f-strings: previously the second half was a
            # plain string and '{e}' was logged verbatim
            logger.exception(f'polars lazyframe sink '
                             f'parquet failed: {e}')
            raise
    else:
        logger.error('function write_parquet not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def read_csv(engine, file, **kwargs):
if engine == 'pandas':
return pandas_read_csv(file, **kwargs)
elif engine == 'pyarrow':
return arrow_csv.read_csv(file, **kwargs)
elif engine == 'polars':
return polars_read_csv(file, **kwargs)
elif engine == 'polars_lazy':
return polars_scan_csv(file, **kwargs)
else:
logger.error('function read_csv not available'
f' for engine {engine}')
raise ValueError
[docs]
def write_csv(dataframe, file, **kwargs):
    """Write ``dataframe`` as CSV to ``file``, forwarding ``kwargs``.

    pandas frames always get a header row; polars LazyFrames are streamed
    with ``sink_csv``.

    Raises
    ------
    IOError
        The writer failed; the original exception is chained as ``__cause__``
        (previously the cause and message were dropped).
    ValueError
        Unsupported dataframe type.
    """
    if isinstance(dataframe, pandas_dataframe):
        # IMPORTANT
        # pandas dataframe case
        # avoid date_format parameter since it is reported that
        # it makes to_csv to be excessively long with column data
        # being datetime data type
        # see: https://github.com/pandas-dev/pandas/issues/37484
        # https://stackoverflow.com/questions/65903287/pandas-1-2-1-to-csv-performance-with-datetime-as-the-index-and-setting-date-form
        def _write():
            dataframe.to_csv(file, header=True, **kwargs)
    elif isinstance(dataframe, Table):
        def _write():
            arrow_csv.write_csv(dataframe, file, **kwargs)
    elif isinstance(dataframe, PolarsDataFrame):
        def _write():
            dataframe.write_csv(file, **kwargs)
    elif isinstance(dataframe, PolarsLazyFrame):
        def _write():
            dataframe.sink_csv(file, **kwargs)
    else:
        logger.error('function write_csv not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
    # one shared error path for every engine
    try:
        _write()
    except Exception as e:
        logger.exception(f'Error writing csv file {file}'
                         f' with data type {type(dataframe)}: {e}')
        raise IOError(f'Error writing csv file {file}: {e}') from e
[docs]
def concat_data(data_list=None):
    """Concatenate a list of same-type frames vertically.

    The engine is taken from the first element (all elements are assumed
    to share its type). pandas keeps the original indices.

    Parameters
    ----------
    data_list : list
        Non-empty list of pandas DataFrames, pyarrow Tables, polars
        DataFrames or polars LazyFrames. The default ``None`` replaces an
        attrs ``field()`` object that was misused as a function default;
        omitting the argument still raises TypeError.

    Raises
    ------
    TypeError
        ``data_list`` is not a list.
    ValueError
        ``data_list`` is empty (previously an obscure IndexError) or holds
        an unsupported type.
    """
    if not isinstance(data_list, list):
        logger.error('required input as list')
        raise TypeError
    if not data_list:
        logger.error('required non-empty list of dataframes')
        raise ValueError('concat_data() requires a non-empty list')
    first = data_list[0]
    if isinstance(first, pandas_dataframe):
        return pandas_concat(data_list,
                             ignore_index=False,
                             copy=False)
    elif isinstance(first, Table):
        return concat_tables(data_list)
    elif isinstance(first, (PolarsDataFrame, PolarsLazyFrame)):
        return polars_concat(data_list, how='vertical')
    else:
        logger.error('function concat not available'
                     ' for instance of type'
                     f' {type(first)}')
        raise ValueError
[docs]
def to_pandas_dataframe(dataframe):
# convert to pandas dataframe
# useful for those calls
# requiring pandas as input
# or pandas functions not covered
# by other dataframe instance
if isinstance(dataframe, pandas_dataframe):
return dataframe
elif isinstance(dataframe, Table):
return dataframe.to_pandas()
elif isinstance(dataframe, PolarsDataFrame):
return dataframe.to_pandas(use_pyarrow_extension_array=True)
elif isinstance(dataframe, PolarsLazyFrame):
return dataframe.collect().to_pandas(use_pyarrow_extension_array=True)
else:
logger.error('function to_pandas() not available'
' for instance of type'
f' {type(dataframe)}')
raise ValueError
[docs]
def _reframe_pandas(dataframe, tf):
    """Resample a non-empty pandas frame to OHLC bars of width ``tf``."""
    ts_col = BASE_DATA_COLUMN_NAME.TIMESTAMP
    if ts_col in dataframe.columns:
        if not is_datetime64_any_dtype(dataframe[ts_col]):
            try:
                converted = any_date_to_datetime64(dataframe[ts_col])
            except Exception:
                logger.exception('Pandas engine: '
                                 'Failed conversion of timestamp columns '
                                 'to DatetimeIndex')
                raise
            # assign() returns a new frame: the caller's data is not mutated
            dataframe = dataframe.assign(**{ts_col: converted})
        dataframe = sort_dataframe(dataframe, ts_col)
        dataframe = dataframe.set_index(ts_col, inplace=False, drop=True)
    elif is_datetime64_any_dtype(dataframe.index):
        # timestamps already in the index (previously set_index() failed here)
        dataframe = dataframe.sort_index().rename_axis(ts_col)
    else:
        logger.error('Pandas engine: required column with '
                     f'name {ts_col}')
        raise ValueError
    columns = list(dataframe.columns)
    if all(name in DATA_COLUMN_NAMES.TICK_DATA_TIME_INDEX for name in columns):
        # tick data: build OHLC bars from the 'p' column
        resampled = dataframe[BASE_DATA_COLUMN_NAME.P_VALUE].resample(tf).ohlc()
    elif all(name in DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX for name in columns):
        # OHLC data: aggregate each column with its OHLC rule. The former
        # resample().interpolate() only sampled values at bin edges and did
        # not aggregate (e.g. 'high' was not the bin maximum).
        rules = {BASE_DATA_COLUMN_NAME.OPEN: 'first',
                 BASE_DATA_COLUMN_NAME.HIGH: 'max',
                 BASE_DATA_COLUMN_NAME.LOW: 'min',
                 BASE_DATA_COLUMN_NAME.CLOSE: 'last'}
        resampled = dataframe.resample(tf).agg(
            {name: rules[name] for name in columns})
    else:
        logger.error(f'data columns {dataframe.columns} invalid, '
                     f'required {DATA_COLUMN_NAMES.TICK_DATA_TIME_INDEX} '
                     f'or {DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX}')
        raise ValueError
    # fill bins left empty by resample, as the original code did
    return resampled.interpolate(method='nearest').reset_index(drop=False)


def _reframe_polars(dataframe, tf, column_names):
    """Aggregate a polars DataFrame/LazyFrame to OHLC bars of width ``tf``."""
    ts_col = BASE_DATA_COLUMN_NAME.TIMESTAMP
    if all(name in DATA_COLUMN_NAMES.TICK_DATA for name in column_names):
        price = col(BASE_DATA_COLUMN_NAME.P_VALUE)
        aggregations = (price.first().alias(BASE_DATA_COLUMN_NAME.OPEN),
                        price.max().alias(BASE_DATA_COLUMN_NAME.HIGH),
                        price.min().alias(BASE_DATA_COLUMN_NAME.LOW),
                        price.last().alias(BASE_DATA_COLUMN_NAME.CLOSE))
    elif all(name in DATA_COLUMN_NAMES.TF_DATA for name in column_names):
        aggregations = (col(BASE_DATA_COLUMN_NAME.OPEN).first(),
                        col(BASE_DATA_COLUMN_NAME.HIGH).max(),
                        col(BASE_DATA_COLUMN_NAME.LOW).min(),
                        col(BASE_DATA_COLUMN_NAME.CLOSE).last())
    else:
        logger.error(f'data columns {column_names} invalid, '
                     f'required {DATA_COLUMN_NAMES.TICK_DATA} '
                     f'or {DATA_COLUMN_NAMES.TF_DATA}')
        raise ValueError
    # group_by_dynamic requires the index column to be sorted
    dataframe = dataframe.sort(ts_col, nulls_last=True)
    return dataframe.group_by_dynamic(ts_col, every=tf).agg(*aggregations)


def reframe_data(dataframe, tf):
    '''
    Aggregate tick or OHLC data to the timeframe ``tf``.

    Parameters
    ----------
    dataframe : pandas.DataFrame | pyarrow.Table | polars.DataFrame | polars.LazyFrame
        Tick data (DATA_COLUMN_NAMES.TICK_DATA columns) or OHLC data
        (DATA_COLUMN_NAMES.TF_DATA columns) with a 'timestamp' column;
        pandas frames may instead carry a datetime index.
    tf : str | Timedelta | DateOffset
        Target timeframe, validated with check_timeframe_str for the
        frame's engine. TICK_TIMEFRAME returns the data unchanged.

    Returns
    -------
    Frame of the same type as ``dataframe`` with 'timestamp' plus OHLC
    columns. Empty input is returned as-is.

    Raises
    ------
    ValueError
        Unsupported type, columns or timeframe.
    '''
    if is_empty_dataframe(dataframe):
        return dataframe
    if isinstance(dataframe, pandas_dataframe):
        engine = 'pandas'
    elif isinstance(dataframe, Table):
        engine = 'pyarrow'
    elif isinstance(dataframe, PolarsDataFrame):
        engine = 'polars'
    elif isinstance(dataframe, PolarsLazyFrame):
        engine = 'polars_lazy'
    else:
        logger.error('function reframe_data not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
    # assert timeframe input value (now also for LazyFrames)
    tf = check_timeframe_str(tf, engine=engine)
    if tf == TICK_TIMEFRAME:
        logger.warning(f'reframe not possible with target {TICK_TIMEFRAME}')
        return dataframe
    if engine == 'pandas':
        return _reframe_pandas(dataframe, tf)
    if engine == 'pyarrow':
        # pyarrow has no easy time-window filtering yet
        # (https://github.com/apache/arrow/issues/41049): convert to polars,
        # reframe there and convert back to an arrow Table
        names = dataframe.column_names
        if all(name in DATA_COLUMN_NAMES.TICK_DATA for name in names):
            schema = POLARS_DTYPE_DICT.TIME_TICK_DTYPE
        elif all(name in DATA_COLUMN_NAMES.TF_DATA for name in names):
            schema = POLARS_DTYPE_DICT.TIME_TF_DTYPE
        else:
            # previously the Table was passed back to reframe_data
            # unchanged, recursing forever
            logger.error(f'data columns {names} invalid, '
                         f'required {DATA_COLUMN_NAMES.TICK_DATA} '
                         f'or {DATA_COLUMN_NAMES.TF_DATA}')
            raise ValueError
        polars_frame = from_arrow(dataframe, schema=cast(Any, schema))
        return reframe_data(polars_frame, tf).to_arrow()
    # polars eager and lazy frames share the group_by_dynamic API;
    # only strings are lower-cased (Timedelta has no .lower())
    if isinstance(tf, str):
        tf = tf.lower()
    if engine == 'polars':
        names = dataframe.columns
    else:
        # avoid LazyFrame.columns, which resolves the schema implicitly
        names = dataframe.collect_schema().names()
    return _reframe_polars(dataframe, tf, names)
# ATTRS
# ADDED VALIDATORS
[docs]
def validator_file_path(file_ext=None):
    """Build an attrs validator requiring ``value`` to be an existing file.

    Parameters
    ----------
    file_ext : optional
        Not used; kept for backward compatibility.
        NOTE(review): presumably meant to restrict the file suffix — confirm
        before enforcing it.

    Returns
    -------
    callable
        Validator ``(instance, attribute, value)``. It re-raises
        ``Path(value)`` construction errors, and raises ``FileExistsError``
        (type kept for compatibility with existing callers) when ``value``
        is not an existing regular file.
    """
    def validate_file_path(instance, attribute, value):
        try:
            filepath = Path(value)
        except Exception as e:
            logger.error(f'File {value} Path creation error: {e}')
            raise
        # the former test `exists() or is_file()` reduced to `exists()`,
        # so directories were accepted as files
        if not filepath.is_file():
            logger.error(f'file {value} not exists')
            raise FileExistsError(
                f'file {value} does not exist or is not a regular file')
    return validate_file_path
[docs]
def validator_dir_path(create_if_missing=False):
    """Build an attrs validator for directory paths.

    Parameters
    ----------
    create_if_missing : bool
        If True the directory (with parents) is created when missing;
        otherwise ``value`` must already be an existing directory.

    Returns
    -------
    callable
        Validator ``(instance, attribute, value)``. It raises
        ``FileExistsError`` when creating and the path exists but is not a
        directory, and ``TypeError`` (kept for compatibility) when not
        creating and ``value`` is not an existing directory.
    """
    def validate_or_create_dir(instance, attribute, value):
        if create_if_missing:
            try:
                Path(value).mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                # A parallel process created the directory between the
                # exist_ok check and the actual os.mkdir syscall (TOCTOU
                # race on macOS). Safe to ignore if it is now a directory.
                if not Path(value).is_dir():
                    logger.error(
                        f'Path {value} exists but is not a directory')
                    raise
        elif not Path(value).is_dir():
            # the former test `exists() or is_dir()` accepted regular files
            logger.error(f'Directory {value} not valid')
            raise TypeError(f'Directory {value} not valid')
    return validate_or_create_dir
def validator_list_ge(min_value):
    """Build an attrs validator requiring a list of ints, each >= ``min_value``.

    Returns
    -------
    callable
        Validator ``(instance, attribute, value)`` raising ``TypeError`` when
        ``value`` is not a list of ints, and ``ValueError`` when any element is
        below ``min_value``. Previously the inner validator was never returned,
        so this factory always produced None.
    """
    def validator_list_values(instance, attribute, value):
        if not (isinstance(value, list)
                and all(isinstance(val, int) for val in value)):
            logger.error('Required list of int type for argument '
                         f'{attribute}')
            raise TypeError(f'Required list of int type for argument {attribute}')
        fails = [val for val in value if val < min_value]
        if fails:
            logger.error(f'Values in {attribute}: {fails} '
                         f'are less than {min_value}')
            raise ValueError(f'Values in {attribute}: {fails} '
                             f'are less than {min_value}')
    return validator_list_values
[docs]
def validate_timedelta_str(instance, attribute, value):
    """attrs validator: ``value`` must convert to a single ``pandas.Timedelta``.

    Every failure, including a conversion result that is not a scalar
    Timedelta, surfaces as ``ValueError`` naming ``attribute.name``, with the
    underlying error chained as ``__cause__``.
    """
    try:
        parsed = to_timedelta(value)
        if isinstance(parsed, Timedelta):
            return
        raise TypeError(
            f"'{attribute.name}' must be a valid timedelta representation, got {type(parsed).__name__}"
        )
    except Exception as err:
        raise ValueError(
            f"'{attribute.name}' must be a valid timedelta representation, error: {err}"
        ) from err
# ATTRIBUTES
[docs]
def get_attrs_names(instance_object, **kwargs):
    """
    Return the field names of an attrs class or instance.

    Parameters
    ----------
    instance_object : object
        attrs-decorated class or instance exposing ``__attrs_attrs__``.
    **kwargs
        Ignored; accepted for backward compatibility.

    Returns
    -------
    list of str
        ``name`` of each entry in ``__attrs_attrs__``, in declaration order.

    Raises
    ------
    KeyError
        If ``instance_object`` has no ``__attrs_attrs__`` attribute.
    """
    if not hasattr(instance_object, '__attrs_attrs__'):
        logger.error('attribute "__attrs_attrs__" not found in '
                     f'object {instance_object}')
        raise KeyError('__attrs_attrs__')
    return [attr.name for attr in instance_object.__attrs_attrs__]
# GENERIC UTILITIES
[docs]
def list_remove_duplicates(list_in):
    """
    Return a new list with duplicates removed, keeping first-seen order.

    Items must be hashable because they are used as mapping keys.
    """
    # Dicts keep insertion order and re-inserting an existing key leaves it
    # in place, so the keys are exactly the unique items in first-seen order.
    first_seen = {item: None for item in list_in}
    return [*first_seen]
# HISTDATA data provider utilities
# Analyze the Histdata Forex download base page
# https://www.histdata.com/download-free-forex-data/?/ascii/1-minute-bar-quotes
# and get a list of all available tickers, in the form of e.g. "EURUSD"
[docs]
def get_histdata_tickers(verify: bool = True,
                         timeout: float = 30.0) -> List[str]:
    """
    Get all available tickers from HistData.com.

    Parameters
    ----------
    verify : bool, optional
        Whether to verify SSL certificates. Default is True.
    timeout : float, optional
        Seconds to wait for the ticker page download. Default is 30.

    Returns
    -------
    List[str]
        Sorted, de-duplicated list of tickers (e.g. ['EURUSD', 'GBPUSD', ...]).
        An empty list is returned on any connection or parsing failure.
    """
    if not verify:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    url = "https://www.histdata.com/download-free-forex-data/?/ascii/1-minute-bar-quotes"
    # Cheap reachability probe with a short timeout before the full download.
    try:
        requests.head(url, timeout=5, verify=verify)
    except requests.RequestException as e:
        logger.error(f'Failed to connect to {url}: {e}')
        return []
    try:
        # requests never times out unless told to; without this a stalled
        # server would block the caller indefinitely.
        response = requests.get(url, timeout=timeout, verify=verify)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        tickers = set()
        # Tickers are the last path segment of links to each pair's page.
        for link in soup.find_all('a', href=True):
            href = link['href']
            if "/ascii/1-minute-bar-quotes/" in href:
                ticker = href.split('/')[-1]
                # Tickers are usually 6+ chars (e.g. EURUSD); this also
                # drops the empty segment of links ending with '/'.
                if len(ticker) >= 6:
                    tickers.add(ticker.upper())
        return sorted(tickers)
    except Exception as e:
        logger.error(f"Failed to retrieve tickers from HistData: {e}")
        return []
# MARKETS FUNCTIONS
US_HOLIDAYS = country_holidays('US', years=YEARS)
# Materialise the holiday dates (the mapping's keys) as a plain list so it
# can be passed directly to Polars `is_in` filters.
US_holiday_dates = list(US_HOLIDAYS.keys())
[docs]
def get_forex_holidays(years: range):
    """
    Return the fixed global Forex closure dates for every year in ``years``.

    Each year contributes New Year's Day (Jan 1) and Christmas Day (Dec 25);
    the combined result is sorted chronologically.
    """
    fixed_closures = ((1, 1), (12, 25))  # (month, day) pairs
    return sorted(
        date(year, month, day)
        for year in years
        for month, day in fixed_closures
    )
# Closure dates from 2000 through the current calendar year, inclusive.
FOREX_HOLIDAYS = get_forex_holidays(
    years=range(2000, 1 + datetime.now().year)
)
[docs]
def business_days_data(
    dataframe: PolarsLazyFrame | PolarsDataFrame
) -> PolarsDataFrame | PolarsLazyFrame:
    '''
    Drop rows that fall on non-business days.

    Keeps rows whose 'timestamp' is Monday to Friday and whose calendar date
    is not in ``US_holiday_dates``. The input must have a datetime column
    named 'timestamp'; eager and lazy frames are both accepted and the same
    kind is returned.
    '''
    ts = col('timestamp')
    # Polars dt.weekday(): Monday=1 ... Sunday=7, so 1..5 are weekdays.
    is_weekday = ts.dt.weekday() <= 5
    is_holiday = ts.dt.date().is_in(US_holiday_dates)
    return dataframe.filter(is_weekday & ~is_holiday)
# Polars week_mask, Monday first: Monday-Friday count, Saturday/Sunday do not.
WEEKDAYS_ITERABLE = (True,) * 5 + (False,) * 2
def add_business_days(
    date: date | Timestamp | datetime,
    business_days_to_add: int,
    holidays: list,
    direction: str = 'backward'
) -> datetime:
    """
    Shift ``date`` by ``business_days_to_add`` business days using Polars.

    Parameters
    ----------
    date : date | Timestamp | datetime
        Start date; strings and Timestamps go through
        ``any_date_to_datetime64`` and datetimes are reduced to their date.
    business_days_to_add : int
        Number of business days to move.
    holidays : list
        Extra non-business dates passed to Polars.
    direction : str, optional
        'backward' (default) subtracts the days; 'forward' adds them. It is
        also used as Polars' ``roll`` policy for a start on a non-business day.

    Returns
    -------
    datetime
        The shifted date at the start of the day (00:00) when going backward,
        or at the end of the day (23:59:59.999999) when going forward.

    Raises
    ------
    ValueError
        If ``direction`` is neither 'backward' nor 'forward'.
    """
    # Normalise the input towards a plain date before handing it to Polars.
    if isinstance(date, (str, Timestamp)):
        date = any_date_to_datetime64(date)
    if isinstance(date, datetime):
        date = date.date()
    if isinstance(date, PolarsDatetime):
        date = date.to_pandas().date()
    if direction not in ('backward', 'forward'):
        raise ValueError(
            f"direction must be 'backward' or 'forward', got '{direction}'"
        )
    going_back = direction == 'backward'
    offset = -business_days_to_add if going_back else business_days_to_add
    shifted = PolarsDataFrame({"start": [date]}).select(
        col("start").dt.add_business_days(
            offset,
            week_mask=WEEKDAYS_ITERABLE,
            holidays=holidays,
            roll=direction
        )
    ).item()
    # Floor to midnight going backward, ceil to end of day going forward.
    day_boundary = datetime.min.time() if going_back else datetime.max.time()
    return datetime.combine(shifted, day_boundary)
[docs]
def adjust_start_date_to_business_days(
    end: datetime | str | Timestamp | PolarsDatetime,
    timeframe: str | timedelta,
    periods: int,
    holidays: list,
    start: datetime | str | Timestamp | PolarsDatetime | None = None,
) -> Tuple[int, datetime]:
    """
    Move ``start`` backwards until the business-day span up to ``end`` can
    hold ``periods`` timeframes.

    Business days are counted with Polars ``business_day_count`` between the
    start and end dates (mask ``WEEKDAYS_ITERABLE`` plus ``holidays``), and
    each business day is worth one full day. While that span is shorter than
    ``timeframe * periods``, the start is moved back by the whole-day
    shortfall (at least one business day). ``end`` is never modified.

    Parameters
    ----------
    end : datetime | str | Timestamp | PolarsDatetime
        Upper bound of the span; strings and Timestamps go through
        ``any_date_to_datetime64``.
    timeframe : str | timedelta
        Length of one period; strings are parsed with pandas ``Timedelta``.
    periods : int
        Number of timeframe periods the span must cover.
    holidays : list
        Extra non-business dates passed to Polars.
    start : datetime | str | Timestamp | PolarsDatetime | None, optional
        Initial lower bound; defaults to ``end - timeframe * periods``.

    Returns
    -------
    Tuple[int, datetime]
        Number of whole timeframes that fit in the final business-day span
        (0 for a non-positive timeframe), and the adjusted start combined
        with midnight when it is a date.
    """
    if isinstance(start, (str, Timestamp)):
        start = any_date_to_datetime64(start)
    elif isinstance(start, PolarsDatetime):
        start = datetime.fromtimestamp(start.timestamp())
    if isinstance(end, (str, Timestamp)):
        end = any_date_to_datetime64(end)
    elif isinstance(end, PolarsDatetime):
        end = datetime.fromtimestamp(end.timestamp())

    if isinstance(timeframe, str):
        step = Timedelta(timeframe).to_pytimedelta()
    else:
        step = timeframe
    required_span = step * periods
    if start is None:
        start = end - required_span

    def _as_date(value):
        # Drop the time component of datetimes; leave anything else as is.
        return value.date() if isinstance(value, datetime) else value

    def _business_days(lo, hi):
        # Business-day count between lo and hi per Polars business_day_count.
        return PolarsDataFrame({"start": [lo], "end": [hi]}).select(
            business_day_count(
                "start", "end",
                week_mask=WEEKDAYS_ITERABLE,
                holidays=holidays
            )
        ).item()

    fixed_end = _as_date(end)
    candidate_start = _as_date(start)
    covered = timedelta(days=_business_days(candidate_start, fixed_end))
    while covered < required_span:
        # Whole missing days, but always step at least one business day.
        shortfall_days = max(1, (required_span - covered).days)
        candidate_start = _as_date(add_business_days(
            candidate_start,
            shortfall_days,
            holidays,
            direction='backward'
        ))
        covered = timedelta(days=_business_days(candidate_start, fixed_end))

    # `covered` already reflects the final (start, end) pair.
    count = int(covered // step) if step.total_seconds() > 0 else 0
    if isinstance(candidate_start, date):
        candidate_start = datetime.combine(
            candidate_start, datetime.min.time())
    return count, candidate_start
[docs]
def adjust_end_date_to_business_days(
    start: datetime | str | Timestamp | PolarsDatetime,
    timeframe: str | timedelta,
    periods: int,
    holidays: list,
    end: datetime | str | Timestamp | PolarsDatetime | None = None
) -> Tuple[int, datetime]:
    """
    Push ``end`` forwards until the business-day span from ``start`` can
    hold ``periods`` timeframes.

    Business days are counted with Polars ``business_day_count`` between the
    start and end dates (mask ``WEEKDAYS_ITERABLE`` plus ``holidays``), and
    each business day is worth one full day. While that span is shorter than
    ``timeframe * periods``, the end is moved forward by the whole-day
    shortfall (at least one business day). ``start`` is never modified.

    Parameters
    ----------
    start : datetime | str | Timestamp | PolarsDatetime
        Lower bound of the span; strings and Timestamps go through
        ``any_date_to_datetime64``.
    timeframe : str | timedelta
        Length of one period; strings are parsed with pandas ``Timedelta``.
    periods : int
        Number of timeframe periods the span must cover.
    holidays : list
        Extra non-business dates passed to Polars.
    end : datetime | str | Timestamp | PolarsDatetime | None, optional
        Initial upper bound; defaults to ``start + timeframe * periods``.

    Returns
    -------
    Tuple[int, datetime]
        Number of whole timeframes that fit in the final business-day span
        (0 for a non-positive timeframe), and the adjusted end combined with
        the latest time of day (23:59:59.999999) when it is a date.
    """
    if isinstance(start, (str, Timestamp)):
        start = any_date_to_datetime64(start)
    elif isinstance(start, PolarsDatetime):
        start = datetime.fromtimestamp(start.timestamp())
    if isinstance(end, (str, Timestamp)):
        end = any_date_to_datetime64(end)
    elif isinstance(end, PolarsDatetime):
        end = datetime.fromtimestamp(end.timestamp())

    if isinstance(timeframe, str):
        step = Timedelta(timeframe).to_pytimedelta()
    else:
        step = timeframe
    required_span = step * periods
    if end is None:
        end = start + required_span

    def _as_date(value):
        # Drop the time component of datetimes; leave anything else as is.
        return value.date() if isinstance(value, datetime) else value

    def _business_days(lo, hi):
        # Business-day count between lo and hi per Polars business_day_count.
        return PolarsDataFrame({"start": [lo], "end": [hi]}).select(
            business_day_count(
                "start", "end",
                week_mask=WEEKDAYS_ITERABLE,
                holidays=holidays
            )
        ).item()

    fixed_start = _as_date(start)
    candidate_end = _as_date(end)
    covered = timedelta(days=_business_days(fixed_start, candidate_end))
    while covered < required_span:
        # Whole missing days, but always step at least one business day.
        shortfall_days = max(1, (required_span - covered).days)
        candidate_end = _as_date(add_business_days(
            candidate_end,
            shortfall_days,
            holidays,
            direction='forward'
        ))
        covered = timedelta(days=_business_days(fixed_start, candidate_end))

    # `covered` already reflects the final (start, end) pair.
    count = int(covered // step) if step.total_seconds() > 0 else 0
    if isinstance(candidate_end, date):
        candidate_end = datetime.combine(candidate_end, datetime.max.time())
    return count, candidate_end
[docs]
def estimate_start_date_to_business_days(
    end_date: datetime | str | Timestamp | PolarsDatetime,
    timeframe: str | timedelta,
    window_size: int,
    holidays: list | None = None
) -> datetime:
    """
    Estimate the start date needed to cover ``window_size`` timeframe periods.

    Parameters
    ----------
    end_date : datetime | str | Timestamp | PolarsDatetime
        End of the window; left unchanged.
    timeframe : str | timedelta
        Length of one period.
    window_size : int
        Number of periods the window must contain.
    holidays : list | None, optional
        Extra non-business dates; None (default) means no extra holidays.

    Returns
    -------
    datetime
        Start date computed by ``adjust_start_date_to_business_days``.
    """
    # A fresh list per call instead of a shared mutable default argument.
    if holidays is None:
        holidays = []
    _, start_date = adjust_start_date_to_business_days(
        end=end_date,
        timeframe=timeframe,
        periods=window_size,
        holidays=holidays
    )
    return start_date
[docs]
def estimate_end_date_to_business_days(
    start_date: datetime | str | Timestamp | PolarsDatetime,
    timeframe: str | timedelta,
    window_size: int,
    holidays: list | None = None
) -> datetime:
    """
    Estimate the end date needed to cover ``window_size`` timeframe periods.

    Parameters
    ----------
    start_date : datetime | str | Timestamp | PolarsDatetime
        Start of the window; left unchanged.
    timeframe : str | timedelta
        Length of one period.
    window_size : int
        Number of periods the window must contain.
    holidays : list | None, optional
        Extra non-business dates; None (default) means no extra holidays.

    Returns
    -------
    datetime
        End date computed by ``adjust_end_date_to_business_days``.
    """
    # A fresh list per call instead of a shared mutable default argument.
    if holidays is None:
        holidays = []
    _, end_date = adjust_end_date_to_business_days(
        start=start_date,
        timeframe=timeframe,
        periods=window_size,
        holidays=holidays
    )
    return end_date
[docs]
def update_ticker_years_dict(
    ticker_years_dict: Dict[str, Dict[str, List[int]]],
    ticker: str,
    timeframe: str,
    years_to_add: List[int]
) -> Tuple[Dict[str, Dict[str, List[int]]], bool]:
    """
    Update a ticker years dictionary with new years for a ticker/timeframe.

    The dictionary is modified in place (missing ticker and timeframe entries
    are created) and is also returned, together with a change flag.

    Parameters
    ----------
    ticker_years_dict : Dict[str, Dict[str, List[int]]]
        Dictionary containing ticker years data, structured as:
        {ticker: {timeframe: [year1, year2, ...]}}
    ticker : str
        The ticker symbol to update
    timeframe : str
        The timeframe for the ticker data
    years_to_add : List[int]
        Years to add; each item is converted with ``int()`` first.

    Returns
    -------
    Tuple[Dict[str, Dict[str, List[int]]], bool]
        The same (mutated) dictionary, and True if any year was added.
        The years list is re-sorted only when something was added.
    """
    # Convert everything up front so a bad value fails before any mutation.
    years = [int(y) for y in years_to_add]
    if ticker not in ticker_years_dict:
        ticker_years_dict[ticker] = {}
    if timeframe not in ticker_years_dict[ticker]:
        ticker_years_dict[ticker][timeframe] = []
    years_list = ticker_years_dict[ticker][timeframe]
    # Set mirror of the list for O(1) membership tests.
    known = set(years_list)
    changes_made = False
    for year in years:
        if year not in known:
            years_list.append(year)
            known.add(year)
            changes_made = True
    if changes_made:
        years_list.sort()
    return ticker_years_dict, changes_made
# ---------------------------------------------------------------------------
# HISTORICAL DATA PROVIDERS
# ---------------------------------------------------------------------------
HISTDATA_PROVIDER = 'Histdata'
DUKASCOPY_PROVIDER = 'Dukascopy'
SUPPORTED_HISTORICAL_DATA_PROVIDERS = [HISTDATA_PROVIDER, DUKASCOPY_PROVIDER]
# ---------------------------------------------------------------------------
# REALTIME DATA PROVIDERS
# ---------------------------------------------------------------------------
# TWELVE DATA
TWELVEDATA_PROVIDER = 'TwelveData'
PAIR_TWELVE_DATA_FORMAT = PAIR_GENERIC_FORMAT
# Two consecutive single-currency patterns, anchored at both ends.
PAIR_TWELVE_DATA_PATTERN = (
    '^' + SINGLE_CURRENCY_PATTERN_STR + SINGLE_CURRENCY_PATTERN_STR + '$'
)
TWELVE_DATA_CHUNK_SIZE = 5_000
# Plans available on Twelve Data
TWELVEDATA_PROVIDER_PLAN_LIST = ["free", "grow", "pro", "ultra"]
# Free tier rate limits
TWELVE_DATA_FREE_TIER_MINUTE_RATE_LIMIT = 8  # api calls per minute
TWELVE_DATA_FREE_TIER_DAY_RATE_LIMIT = 800  # api calls per day
# Paid subscription rate limits
TWELVE_DATA_PRO_MINUTE_RATE_LIMIT = 55  # api calls per minute
TWELVE_DATA_PRO_DAY_RATE_LIMIT = 5_000  # api calls per day
# Twelve Data limit date
TWELVE_DATA_LIMIT_DATE = datetime(2020, 1, 1)
# Accepted values for the timeframe (interval) parameter
TWELVE_DATA_TIMEFRAMES = [
    "1min", "5min", "15min", "30min", "45min",
    "1h", "2h", "4h", "8h",
    "1day", "1week", "1month",
]
SUPPORTED_REALTIME_DATA_PROVIDERS = [TWELVEDATA_PROVIDER]