# -*- coding: utf-8 -*-
"""
Created on Sat Apr 30 09:23:19 2022
@author: fiora
"""
from loguru import logger
from re import (
fullmatch,
findall,
search,
IGNORECASE
)
from typing import (
cast,
Any,
List,
Literal,
Dict
)
import requests
import random
from bs4 import BeautifulSoup
from datetime import (
timedelta,
datetime
)
from pathlib import Path
from attrs import (
field,
validators
)
from holidays import (
country_holidays
)
# PANDAS
from pandas import (
DataFrame as pandas_dataframe,
concat as pandas_concat,
Timestamp,
isnull,
bdate_range,
to_datetime,
Timedelta,
read_parquet as pandas_read_parquet,
read_csv as pandas_read_csv
)
from pandas.api.types import is_datetime64_any_dtype
from pandas.tseries.frequencies import to_offset
from pandas.tseries.offsets import DateOffset
# PYARROW
from pyarrow import (
float32 as pyarrow_float32,
timestamp as pyarrow_timestamp,
schema as pyarrow_schema,
Table,
table as pyarrow_table,
concat_tables,
csv as arrow_csv
)
from pyarrow.parquet import (
write_table,
read_table
)
# POLARS
from polars import (
Float32 as polars_float32,
Datetime as polars_datetime,
read_csv as polars_read_csv,
concat as polars_concat,
col,
len as polars_len,
read_parquet as polars_read_parquet,
from_arrow,
DataFrame as polars_dataframe,
LazyFrame as polars_lazyframe,
scan_csv as polars_scan_csv,
scan_parquet as polars_scan_parquet
)
# POLYGON real time provider
from polygon.rest.models.aggs import (
Agg as polygon_agg
)
from dateutil.rrule import (
rrule,
DAILY,
MO,
TU,
WE,
TH,
FR
)
# Public API of this module.
# BUGFIX: 'HISTORICAL_DB_MIN_DATE' was listed twice; duplicate removed.
__all__ = [
    'YEARS',
    'MONTHS',
    'DATE_FORMAT_SQL',
    'DATE_FORMAT_HISTDATA_CSV',
    'HISTDATA_URL_TICKDATA_TEMPLATE',
    'HISTDATA_BASE_DOWNLOAD_METHOD',
    'HISTDATA_BASE_DOWNLOAD_URL',
    'DEFAULT_PATHS',
    'DATA_TYPE',
    'BASE_DATA_COLUMN_NAME',
    'DATA_FILE_COLUMN_INDEX',
    'SUPPORTED_DATA_FILES',
    'SUPPORTED_DATA_ENGINES',
    'ASSET_TYPE',
    'TEMP_FOLDER',
    'TEMP_CSV_FILE',
    'DTYPE_DICT',
    'PYARROW_DTYPE_DICT',
    'POLARS_DTYPE_DICT',
    'DATA_COLUMN_NAMES',
    'FILENAME_TEMPLATE',
    'DATA_KEY',
    'TICK_TIMEFRAME',
    'FILENAME_STR',
    'REALTIME_DATA_PROVIDER',
    'ALPHA_VANTAGE_API_KEY',
    'CANONICAL_INDEX',
    'DATE_NO_HOUR_FORMAT',
    'POLYGON_IO_API_KEY',
    'AV_LIST_URL',
    'PAIR_ALPHAVANTAGE_FORMAT',
    'PAIR_POLYGON_FORMAT',
    'SQL_COMPARISON_OPERATORS',
    'SUPPORTED_SQL_COMPARISON_OPERATORS',
    'SUPPORTED_BASE_DATA_COLUMN_NAME',
    'SQL_CONDITION_AGGREGATION_MODES',
    'SUPPORTED_SQL_CONDITION_AGGREGATION_MODES',
    'HISTORICAL_DB_MIN_DATE',
    'validator_file_path',
    'validator_dir_path',
    'get_attrs_names',
    'check_timeframe_str',
    'any_date_to_datetime64',
    'empty_dataframe',
    'is_empty_dataframe',
    'shape_dataframe',
    'get_dataframe_column',
    'get_dataframe_row',
    'get_dataframe_element',
    'astype',
    'read_csv',
    'polars_datetime',
    'sort_dataframe',
    'concat_data',
    'list_remove_duplicates',
    'reframe_data',
    'write_csv',
    'write_parquet',
    'read_parquet',
    'to_pandas_dataframe',
    'get_pair_symbols',
    'to_source_symbol',
    'get_date_interval',
    'random_date_between',
    'polygon_agg_to_dict',
    'get_histdata_tickers',
    'TickerNotFoundError',
    'TickerDataNotFoundError',
    'TickerDataBadTypeException',
    'TickerDataInvalidException',
    'business_days_data',
    'update_ticker_years_dict',
    'get_class_attr_keys',
    'get_class_attr_values',
    'get_class_attr_dict',
]
# =============================================================================
# CUSTOM EXCEPTIONS
# =============================================================================
# TickerNotFoundError:
# This exception is raised when the ticker requested is misspelled
# or does not exist in the database.
[docs]
class TickerNotFoundError(Exception):
    """Raised when the requested ticker is misspelled or absent from the database."""
# TickerDataNotFoundError:
# This exception is raised when the ticker is found
# but data is not available or data retrieval failed.
[docs]
class TickerDataNotFoundError(Exception):
    """Raised when the ticker exists but its data is unavailable or retrieval failed."""
# TickerDataBadTypeException:
# This exception is raised when the ticker data
# is found but data type is not compliant with the expected type.
[docs]
class TickerDataBadTypeException(Exception):
    """Raised when ticker data is found but its type is not the expected one."""
# TickerDataInvalidException:
# This exception is raised when the ticker data
# is not found or invalid for generic reasons.
[docs]
class TickerDataInvalidException(Exception):
    """Raised when ticker data is missing or invalid for generic reasons."""
# common functions, constants and templates
# scratch locations used for intermediate download/conversion artifacts
TEMP_FOLDER = "Temp"
TEMP_CSV_FILE = "Temp.csv"
# histdata.com page URL templates (str.format placeholders)
HISTDATA_URL_TICKDATA_TEMPLATE = (
    'https://www.histdata.com/download-free-forex-historical-data/?/'
    'ascii/tick-data-quotes/{ticker}/{year}/{month_num}'
)
HISTDATA_URL_ONEMINDATA_TEMPLATE = (
    'http://www.histdata.com/download-free-forex-data/?/'
    'ascii/1-minute-bar-quotes/{pair}/{year}/{month_num}'
)
# endpoint and HTTP verb used for the actual file download request
HISTDATA_BASE_DOWNLOAD_URL = "http://www.histdata.com/get.php"
HISTDATA_BASE_DOWNLOAD_METHOD = 'POST'
MONTHS = ['January', 'February', 'March', 'April', 'May', 'June',
          'July', 'August', 'September', 'October', 'November', 'December']
# years covered by the historical database: 2000 .. current year
YEARS = list(range(2000, datetime.now().year + 1, 1))
HISTORICAL_DB_MIN_DATE = datetime(2000, 1, 1)
# date/time string formats
DATE_NO_HOUR_FORMAT = '%Y-%m-%d'
DATE_FORMAT_ISO8601 = 'ISO8601'
DATE_FORMAT_SQL = '%Y-%m-%d %H:%M:%S.%f'
DATE_FORMAT_HISTDATA_CSV = '%Y%m%d %H%M%S%f'
# DATA_KEY_TEMPLATE_STR = '{ticker}.Y{year}.{tf}'
# DATA_KEY_TEMPLATE_PATTERN = '^[A-Za-z]+.Y[0-9]+.[A-Za-z0-9]+'
# FILENAME_STR = '{ticker}_Y{year}_{tf}.{file_ext}'
DATA_KEY_TEMPLATE_STR = '{market}.{ticker}.{tf}'
# NOTE(review): this pattern uses '_' as first separator and unescaped '.'
# wildcards, which does not line up with DATA_KEY_TEMPLATE_STR above
# ('{market}.{ticker}.{tf}') — confirm the intended key layout.
DATA_KEY_TEMPLATE_PATTERN = '^[A-Za-z0-9]_[A-Za-z]+.[A-Za-z0-9]+'
FILENAME_STR = '{market}_{ticker}_{tf}.{file_ext}'
DEFAULT_TIMEZONE = 'utc'
# designator of the lowest (raw tick) timeframe
TICK_TIMEFRAME = 'tick'
# ticker PAIR of forex market: two 3-letter currency codes, e.g. 'EURUSD'
SINGLE_CURRENCY_PATTERN_STR = '[A-Za-z]{3}'
TICKER_PATTERN = '^' + SINGLE_CURRENCY_PATTERN_STR \
    + SINGLE_CURRENCY_PATTERN_STR + '$'
PAIR_GENERIC_FORMAT = '{TO}/{FROM}'
# ALPHAVANTAGE symbol convention ('EUR/USD') and listing endpoint
PAIR_ALPHAVANTAGE_FORMAT = '{TO}/{FROM}'
PAIR_ALPHAVANTAGE_PATTERN = '^' + SINGLE_CURRENCY_PATTERN_STR + '/' \
    + SINGLE_CURRENCY_PATTERN_STR + '$'
ALPHA_VANTAGE_API_KEY = 'ALPHA_VANTAGE_API_KEY'
AV_LIST_URL = (
    'https://www.alphavantage.co/query?'
    'function=LISTING_STATUS&apikey={api_key}'
)
# PAIR POLYGON IO symbol convention, e.g. 'C:EURUSD'
PAIR_POLYGON_FORMAT = 'C:{TO}{FROM}'
PAIR_POLYGON_PATTERN = '^C:' + SINGLE_CURRENCY_PATTERN_STR + \
    SINGLE_CURRENCY_PATTERN_STR + '$'
POLYGON_IO_API_KEY = 'POLYGON_IO_API_KEY'
# TIME PATTERN: '<signed integer><unit>' window strings, e.g. '-10d'
TIME_WINDOW_PATTERN_STR = '^[-+]?[0-9]+[A-Za-z]{1,}$'
TIME_WINDOW_COMPONENTS_PATTERN_STR = '^[-+]?[0-9]+|[A-Za-z]{1,}$'
TIME_WINDOW_UNIT_PATTERN_STR = '[A-Za-z]{1,}$'
# NOTE(review): single-character classes without '+' and unescaped '.'
# look suspicious here — confirm against real tick key strings.
GET_YEAR_FROM_TICK_KEY_PATTERN_STR = '^[A-Za-z].Y[0-9].TICK'
YEAR_FIELD_PATTERN_STR = '^Y([0-9]{4,})$'
# polars/pyarrow duration strings: '<n><unit>' with unit in ns..y or 'i'
POLARS_DURATION_PATTERN_STR = '^[0-9]+(ns|us|ms|s|m|h|d|w|mo|q|y|i)$'
PYARROW_DURATION_PATTERN_STR = '^[0-9]+(ns|us|ms|s|m|h|d|w|mo|q|y|i)$'
# GENERAL UTILITIES
[docs]
def get_class_attr_keys(var):
    """Return the user-defined attribute names of *var*, dunders excluded.

    Returns None when *var* has no inspectable namespace (vars() fails).
    """
    try:
        namespace = vars(var)
    except Exception:
        return None
    return [name for name in namespace
            if search(r'__(\w+)__', name) is None]
[docs]
def get_class_attr_values(var):
    """Return the values of the user-defined attributes of *var*.

    Returns None on any failure, mirroring get_class_attr_keys.
    BUGFIX: vars(var) used to run before the try block, so an object
    without a namespace raised TypeError here while get_class_attr_keys
    returned None for the same input.
    """
    try:
        dict_var = dict(vars(var))
        keys = get_class_attr_keys(var)
        # keys is None when key extraction failed: the iteration below
        # then raises and we fall back to None as before
        return [dict_var[key] for key in keys]
    except Exception:
        return None
[docs]
def get_class_attr_dict(var):
    """Map each user-defined attribute name of *var* to its current value."""
    attr_map = {}
    for name in get_class_attr_keys(var):
        attr_map[name] = getattr(var, name)
    return attr_map
# auxiliary CONSTANT DEFINITIONS
# key template: <ticker>.Y<year>.<timeframe>.<data-type>
[docs]
class DATA_KEY:
    """Positional indices of the '<market>.<ticker>.<tf>' key components."""
    MARKET = 0
    TICKER_INDEX = 1
    TF_INDEX = 2
# filename template : <ticker>_Y<year>_<timeframe>.<filetype>
[docs]
class FILENAME_TEMPLATE:
    """Positional indices of '<ticker>_Y<year>_<timeframe>.<filetype>'."""
    TICKER_INDEX = 0
    YEAR_INDEX = 1
    # NOTE(review): presumably the number of prefix characters ('Y')
    # before the numeric year in the year field — confirm at usage sites
    YEAR_NUMERICAL_CHAR = 1
    TF_INDEX = 2
    FILETYPE_INDEX = 3
[docs]
class DEFAULT_PATHS:
    """Default on-disk locations of the local database tree."""
    # root folder created under the user's home directory
    BASE_PATH = str(Path.home() / '.database')
    HIST_DATA_FOLDER = 'HistoricalData'
    REALTIME_DATA_FOLDER = 'RealtimeData'
[docs]
class DATA_TYPE:
    """Supported on-disk storage formats."""
    CSV_FILETYPE = 'csv'
    PARQUET_FILETYPE = 'parquet'
    DUCKDB = 'duckdb'
[docs]
class DATA_FILE_COLUMN_INDEX:
    """Positional indices of columns inside raw data files."""
    TIMESTAMP = 0
# file formats the database layer can read and write
SUPPORTED_DATA_FILES = [
    DATA_TYPE.CSV_FILETYPE,
    DATA_TYPE.PARQUET_FILETYPE,
    DATA_TYPE.DUCKDB
]
# supported dataframe engines
# pyarrow is inserted but reframe operation all in pyarrow
# is not yet available, now it is masked
# to a reframe call with polars
# reframe_data() on pyarrow Table
SUPPORTED_DATA_ENGINES = [
    'pandas',
    'pyarrow',
    'polars',
    'polars_lazy'
]
# SINGLE BASE DATA COMPOSIION TEMPLATE: ['open','close','high','low']
# with datetime/timestamp as index
# column names for dataframes TICK and timeframe filtered
# OHLC and related column names
[docs]
class DATA_COLUMN_NAMES:
    """Column layouts for tick and OHLC data, with and without an
    explicit timestamp column ('_TIME_INDEX' variants assume the
    timestamp lives in the index)."""
    TICK_DATA_NO_PVALUE = ['timestamp', 'ask', 'bid', 'vol']
    TICK_DATA = ['timestamp', 'ask', 'bid', 'vol', 'p']
    TF_DATA = ['timestamp', 'open', 'high', 'low', 'close']
    TICK_DATA_TIME_INDEX = ['ask', 'bid', 'vol', 'p']
    TF_DATA_TIME_INDEX = ['open', 'high', 'low', 'close']
    # column order of polygon.io aggregate bars
    POLYGON_IO_AGGS = ['open', 'high', 'low', 'close', 'volume', 'vwap',
                       'timestamp', 'transactions']
# SELECTED AS SINGLE BASE DATA COMPOSITION TEMPLATE
BASE_DATA = DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX
BASE_DATA_WITH_TIME = DATA_COLUMN_NAMES.TF_DATA
[docs]
class REALTIME_DATA_PROVIDER:
    """Identifiers of the supported realtime data providers."""
    ALPHA_VANTAGE = 'ALPHA_VANTAGE'
    POLYGON_IO = 'POLYGON-IO'
# all provider identifiers, in declaration order
REALTIME_DATA_PROVIDER_LIST = [REALTIME_DATA_PROVIDER.ALPHA_VANTAGE,
                               REALTIME_DATA_PROVIDER.POLYGON_IO]
class DB_MODE:
    """Operating modes of the database facade."""
    FULL_MODE = 'FULL_MODE'
    HISTORICAL_MODE = 'HISTORICAL_MODE'
    REALTIME_MODE = 'REALTIME_MODE'
[docs]
class ASSET_TYPE:
    """Supported asset classes."""
    STOCK = 'STOCK'
    ETF = 'ETF'
    FOREX = 'FOREX'
[docs]
class BASE_DATA_COLUMN_NAME:
    """Canonical column names used across all data engines."""
    TIMESTAMP = 'timestamp'
    OPEN = 'open'
    HIGH = 'high'
    LOW = 'low'
    CLOSE = 'close'
    ASK = 'ask'
    BID = 'bid'
    VOL = 'vol'
    P_VALUE = 'p'
    TRANSACTIONS = 'transactions'
    VWAP = 'vwap'
    OTC = 'otc'
# Literal type accepted wherever a column name is taken as an argument
SUPPORTED_BASE_DATA_COLUMN_NAME = Literal[
    BASE_DATA_COLUMN_NAME.TIMESTAMP,
    BASE_DATA_COLUMN_NAME.OPEN,
    BASE_DATA_COLUMN_NAME.HIGH,
    BASE_DATA_COLUMN_NAME.LOW,
    BASE_DATA_COLUMN_NAME.CLOSE,
    BASE_DATA_COLUMN_NAME.ASK,
    BASE_DATA_COLUMN_NAME.BID,
    BASE_DATA_COLUMN_NAME.VOL,
    BASE_DATA_COLUMN_NAME.P_VALUE,
    BASE_DATA_COLUMN_NAME.TRANSACTIONS,
    BASE_DATA_COLUMN_NAME.VWAP,
    BASE_DATA_COLUMN_NAME.OTC
]
[docs]
class CANONICAL_INDEX:
    """Index positions into provider responses.

    NOTE(review): 'AV' presumably refers to Alpha Vantage payloads —
    confirm against the realtime provider code.
    """
    AV_LATEST_DATA_INDEX = 0
    AV_DF_DATA_INDEX = 0
    AV_DICT_INFO_INDEX = 1
[docs]
class SQL_COMPARISON_OPERATORS:
    """Comparison operators accepted in SQL filter conditions."""
    GREATER_THAN = '>'
    LESS_THAN = '<'
    GREATER_THAN_OR_EQUAL = '>='
    LESS_THAN_OR_EQUAL = '<='
    EQUAL = '=='
    NOT_EQUAL = '!='
# Literal type used to validate operator arguments
SUPPORTED_SQL_COMPARISON_OPERATORS = Literal[
    SQL_COMPARISON_OPERATORS.GREATER_THAN,
    SQL_COMPARISON_OPERATORS.LESS_THAN,
    SQL_COMPARISON_OPERATORS.GREATER_THAN_OR_EQUAL,
    SQL_COMPARISON_OPERATORS.LESS_THAN_OR_EQUAL,
    SQL_COMPARISON_OPERATORS.EQUAL,
    SQL_COMPARISON_OPERATORS.NOT_EQUAL
]
[docs]
class SQL_CONDITION_AGGREGATION_MODES:
    """Boolean connectors used to combine SQL filter conditions."""
    AND = 'AND'
    OR = 'OR'
# Literal type used to validate aggregation-mode arguments
SUPPORTED_SQL_CONDITION_AGGREGATION_MODES = Literal[
    SQL_CONDITION_AGGREGATION_MODES.AND,
    SQL_CONDITION_AGGREGATION_MODES.OR
]
# auxiliary functions
# get elements from db key
[docs]
def get_db_key_elements(key):
    """Split a '<market>.<ticker>.<tf>' database key into its components.

    BUGFIX: the key used to be matched against DATA_KEY_TEMPLATE_STR,
    which is a str.format template (literal '{market}...' text), not a
    regex, and has no capture groups — the call could never succeed.
    The pattern below mirrors the template's three dot-separated fields.

    Returns (market, ticker, tf); raises ValueError on a malformed key.
    """
    res = fullmatch(r'([A-Za-z0-9]+)\.([A-Za-z0-9]+)\.([A-Za-z0-9]+)', key)
    if res:
        return res.groups()
    logger.error(
        f'key {key} does not respect regex template {DATA_KEY_TEMPLATE_STR}')
    raise ValueError
# parse argument to get datetime object with date format as input
def infer_date_from_format_dt(s, date_format='ISO8601', unit=None, utc=False):
    """Parse *s* with pandas.to_datetime: by epoch *unit* when given,
    otherwise by *date_format*."""
    if unit:
        return to_datetime(s, unit=unit, utc=utc)
    return to_datetime(s, format=date_format, utc=utc)
# parse timeframe as string and validate if it is valid
# following pandas DateOffset freqstr rules and 'TICK' (=lowest timeframe available)
# link to official pandas doc
# https://pandas.pydata.org/docs/user_guide/timeseries.html#dateoffset-objects
# add compatibility to polars frequency string
[docs]
def check_timeframe_str(tf: str | Timedelta | DateOffset,
                        engine: Literal['pandas',
                                        'polars',
                                        'polars_lazy',
                                        'pyarrow'] = 'pandas'):
    """Validate a timeframe designator for the given dataframe engine.

    Returns *tf* unchanged when valid; raises ValueError otherwise.
    The 'tick' designator and pandas Timedelta instances are accepted
    by every engine.
    """
    # the lowest timeframe available is valid everywhere
    if tf == TICK_TIMEFRAME:
        return tf
    # Timedelta instances are accepted by all supported engines
    if isinstance(tf, Timedelta):
        return tf
    if engine == 'pandas':
        # pandas accepts any string to_offset() turns into a DateOffset
        try:
            if isinstance(to_offset(tf), DateOffset):
                return tf
            logger.critical(f"Type check: Invalid timeframe for pandas: {tf}")
            raise ValueError
        except ValueError:
            logger.critical(f"Type check: Invalid timeframe for pandas: {tf}")
            raise
    if engine in ('polars', 'polars_lazy', 'pyarrow'):
        # polars-style duration strings (e.g. '5m', '1h') or raw timedelta
        if isinstance(tf, str):
            if fullmatch(POLARS_DURATION_PATTERN_STR, tf, flags=IGNORECASE):
                return tf
            logger.critical(f"Type check: Invalid timeframe for polars: {tf}")
            raise ValueError
        if isinstance(tf, timedelta):
            return tf
        logger.critical(f"Type check: Invalid timeframe for polars: {tf}")
        raise ValueError
    logger.critical(f"Type check: Invalid engine: {engine}")
    raise ValueError
# PAIR symbol functions
[docs]
def get_pair_symbols(ticker):
    """Split a 6-letter forex ticker into its (to, from) currency codes.

    Returns None when the ticker does not contain exactly two codes.
    """
    currencies = findall(SINGLE_CURRENCY_PATTERN_STR, ticker)
    if len(currencies) != 2:
        return None
    return currencies[0], currencies[1]
def check_symbol(symbol, source):
    """Return True when *symbol* matches the format required by *source*.

    BUGFIX: the POLYGON-IO and fallback branches matched against
    PAIR_POLYGON_FORMAT — a str.format template whose literal
    '{TO}{FROM}' text never matches a real symbol such as 'C:EURUSD' —
    instead of PAIR_POLYGON_PATTERN.
    """
    if source == REALTIME_DATA_PROVIDER.ALPHA_VANTAGE:
        return bool(fullmatch(PAIR_ALPHAVANTAGE_PATTERN, symbol))
    # POLYGON-IO and any unknown source share the 'C:XXXYYY' layout
    return bool(fullmatch(PAIR_POLYGON_PATTERN, symbol))
[docs]
def to_source_symbol(ticker, source):
    """Format *ticker* into the symbol convention of the given provider."""
    to_symbol, from_symbol = get_pair_symbols(ticker)
    if source == REALTIME_DATA_PROVIDER.ALPHA_VANTAGE:
        template = PAIR_ALPHAVANTAGE_FORMAT
    elif source == REALTIME_DATA_PROVIDER.POLYGON_IO:
        template = PAIR_POLYGON_FORMAT
    else:
        template = PAIR_GENERIC_FORMAT
    return template.format(TO=to_symbol, FROM=from_symbol)
# TIMESTAMP RELATED FUNCTIONS
def timewindow_str_to_timedelta(time_window_str):
    """Convert a '<integer><unit>' window string into a pandas Timedelta."""
    if not fullmatch(TIME_WINDOW_PATTERN_STR, time_window_str):
        logger.error('time window pattern not match: '
                     '"<integer_multiplier><unit>" str')
        raise ValueError
    return Timedelta(time_window_str)
[docs]
def any_date_to_datetime64(any_date,
                           date_format='ISO8601',
                           unit=None,
                           to_pydatetime=False):
    """Coerce *any_date* to a pandas datetime (optionally python datetime).

    Parameters mirror infer_date_from_format_dt; with to_pydatetime=True
    the parsed Timestamp is converted to a plain datetime.
    Re-raises any parsing error after logging it.
    """
    try:
        any_date = infer_date_from_format_dt(any_date,
                                             date_format,
                                             unit=unit)
        if to_pydatetime:
            any_date = any_date.to_pydatetime()
    except Exception as e:
        # BUGFIX: the exception was captured but never included in the
        # log message; the error detail is now reported
        logger.error(f'date {any_date} conversion failed, '
                     f'failed conversion to {date_format} '
                     f'date format: {e}')
        raise
    return any_date
[docs]
def get_date_interval(start=None,
                      end=None,
                      interval_start_mode=None,
                      interval_end_mode='now',
                      interval_timespan=None,
                      freq=None,
                      normalize=False,
                      bdays=False):
    """Build a (start, end) date interval, optionally with a DatetimeIndex.

    Parameters
    ----------
    start, end : optional
        Explicit interval bounds; anything Timestamp() accepts.
    interval_start_mode : optional
        NOTE(review): accepted but never read below — confirm intent.
    interval_end_mode : str
        Only 'now' is handled: anchors the interval end at Timestamp.now().
    interval_timespan : str, optional
        '<integer><unit>' window (see TIME_WINDOW_PATTERN_STR); when set,
        start is derived as end - timespan, overriding the explicit bounds.
    freq : optional
        When set, a bdate_range index is returned as a third element.
    normalize : bool
        Floor both bounds to midnight.
    bdays : bool
        With a 'd' timespan, widen start until the window holds at least
        that many Mon-Fri business days.

    Returns
    -------
    (start, end) or (start, end, DatetimeIndex)
    """
    # create start and end date as timestamp instances
    start_date = Timestamp(start)
    end_date = Timestamp(end)
    if interval_timespan:
        # a variety of interval mode could be implemented
        # 'now' - end of date interval is timestamp now
        if interval_end_mode == 'now':
            end_date = Timestamp.now()
        start_date = end_date - timewindow_str_to_timedelta(interval_timespan)
        if bdays:
            components = findall(TIME_WINDOW_COMPONENTS_PATTERN_STR,
                                 interval_timespan)
            # fixed days redundancy check available only with 'd' type requested
            # timespan
            if components[1] == 'd':
                days_list = list(
                    rrule(freq=DAILY,
                          dtstart=start_date,
                          until=end_date,
                          byweekday=(MO, TU, WE, TH, FR))
                )
                # widen the window backwards one day at a time until it
                # contains the requested number of business days
                while len(days_list) < int(components[0]):
                    start_date = start_date - Timedelta(days=1)
                    days_list = list(
                        rrule(freq=DAILY,
                              dtstart=start_date,
                              until=end_date,
                              byweekday=(MO, TU, WE, TH, FR))
                    )
    # Timestamp() constructor ensures these are Timestamp objects
    if normalize:
        if not isnull(start_date):
            start_date = Timestamp.normalize(start_date)
        if not isnull(end_date):
            end_date = Timestamp.normalize(end_date)
    start_date = any_date_to_datetime64(start_date)
    end_date = any_date_to_datetime64(end_date)
    # generate DateTimeIndex if freq is set
    # otherwise return just start and end of interval
    if freq:
        bdate_dtindex = bdate_range(start=start_date,
                                    end=end_date,
                                    freq=freq,
                                    tz=None,
                                    normalize=normalize,
                                    name='timestamp'
                                    )
        return start_date, end_date, bdate_dtindex
    else:
        return start_date, end_date
[docs]
def random_date_between(start_date, end_date):
    """Return a uniformly random datetime in [start_date, end_date].

    Args:
        start_date (datetime): The start date.
        end_date (datetime): The end date.

    Returns:
        datetime: A random datetime between start_date and end_date.
    """
    span_seconds = int((end_date - start_date).total_seconds())
    offset = random.randint(0, span_seconds)
    return start_date + timedelta(seconds=offset)
# BASE OPERATIONS WITH DATAFRAME
# depending on dataframe engine support
# for supported engines see var SUPPORTED_DATA_ENGINES
# DATA ENGINES TYPES DICTIONARY
[docs]
class DTYPE_DICT:
    """Pandas dtype maps for tick and OHLC frames, with and without a
    timestamp column (float32 values, millisecond timestamps)."""
    TICK_DTYPE = {name: 'float32' for name in ('ask', 'bid', 'vol', 'p')}
    TF_DTYPE = {name: 'float32' for name in ('open', 'high', 'low', 'close')}
    TIME_TICK_DTYPE = {'timestamp': 'datetime64[ms]', **TICK_DTYPE}
    TIME_TF_DTYPE = {'timestamp': 'datetime64[ms]', **TF_DTYPE}
[docs]
class PYARROW_DTYPE_DICT:
    """pyarrow dtype maps mirroring DTYPE_DICT (float32 values,
    millisecond timestamps)."""
    TICK_DTYPE = {'ask': pyarrow_float32(),
                  'bid': pyarrow_float32(),
                  'vol': pyarrow_float32(),
                  'p': pyarrow_float32()}
    TF_DTYPE = {'open': pyarrow_float32(),
                'high': pyarrow_float32(),
                'low': pyarrow_float32(),
                'close': pyarrow_float32()}
    TIME_TICK_DTYPE = {'timestamp': pyarrow_timestamp('ms'),
                       'ask': pyarrow_float32(),
                       'bid': pyarrow_float32(),
                       'vol': pyarrow_float32(),
                       'p': pyarrow_float32()}
    TIME_TF_DTYPE = {'timestamp': pyarrow_timestamp('ms'),
                     'open': pyarrow_float32(),
                     'high': pyarrow_float32(),
                     'low': pyarrow_float32(),
                     'close': pyarrow_float32()}
[docs]
class POLARS_DTYPE_DICT:
    """polars dtype maps mirroring DTYPE_DICT (Float32 values,
    millisecond Datetime timestamps)."""
    TICK_DTYPE = {'ask': polars_float32,
                  'bid': polars_float32,
                  'vol': polars_float32,
                  'p': polars_float32}
    TF_DTYPE = {'open': polars_float32,
                'high': polars_float32,
                'low': polars_float32,
                'close': polars_float32}
    TIME_TICK_DTYPE = {'timestamp': polars_datetime('ms'),
                       'ask': polars_float32,
                       'bid': polars_float32,
                       'vol': polars_float32,
                       'p': polars_float32}
    TIME_TF_DTYPE = {'timestamp': polars_datetime('ms'),
                     'open': polars_float32,
                     'high': polars_float32,
                     'low': polars_float32,
                     'close': polars_float32}
# DATA ENGINES FUNCTIONS
[docs]
def empty_dataframe(engine):
    """Return a zero-row dataframe of the requested *engine* flavour."""
    if engine == 'pandas':
        return pandas_dataframe()
    if engine == 'pyarrow':
        return pyarrow_table([])
    if engine == 'polars':
        return polars_dataframe()
    if engine == 'polars_lazy':
        return polars_lazyframe()
    logger.error('function empty_dataframe not available'
                 f' for engine {engine}')
    raise ValueError
[docs]
def is_empty_dataframe(dataframe):
    """Return True when *dataframe* holds no rows, for any supported engine."""
    if isinstance(dataframe, pandas_dataframe):
        return dataframe.empty
    if isinstance(dataframe, Table):
        return not bool(dataframe)
    if isinstance(dataframe, polars_dataframe):
        return dataframe.is_empty()
    if isinstance(dataframe, polars_lazyframe):
        # lazy frames must be materialized to be inspected
        return dataframe.collect().is_empty()
    logger.error('function is_empty_dataframe not available'
                 ' for instance of type'
                 f' {type(dataframe)}')
    raise ValueError
[docs]
def shape_dataframe(dataframe):
    """Return (n_rows, n_cols) of *dataframe* for any supported engine."""
    if isinstance(dataframe, pandas_dataframe):
        return dataframe.shape
    if isinstance(dataframe, Table):
        return dataframe.shape
    if isinstance(dataframe, polars_dataframe):
        return dataframe.shape
    if isinstance(dataframe, polars_lazyframe):
        # lazy frames have no .shape: count rows with a select and read
        # the column count from the schema
        n_rows = dataframe.select(polars_len()).collect().item(0, 0)
        n_cols = dataframe.collect_schema().len()
        return (n_rows, n_cols)
    logger.error('function shape_dataframe not available'
                 ' for instance of type'
                 f' {type(dataframe)}')
    raise ValueError
[docs]
def sort_dataframe(dataframe, column):
    """Return *dataframe* sorted ascending by *column* (engine-dispatched)."""
    if isinstance(dataframe, pandas_dataframe):
        return dataframe.sort_values(by=[column])
    if isinstance(dataframe, Table):
        return dataframe.sort_by(column)
    if isinstance(dataframe, (polars_dataframe, polars_lazyframe)):
        return dataframe.sort(column, nulls_last=True)
    logger.error('function sort_dataframe not available'
                 ' for instance of type'
                 f' {type(dataframe)}')
    raise ValueError
[docs]
def get_dataframe_column(dataframe, column):
    """Return a single *column* of *dataframe* (engine-dispatched).

    BUGFIX: the unsupported-type branch only logged and fell through,
    implicitly returning None; it now raises ValueError like the sibling
    accessors (e.g. get_dataframe_element).
    """
    if isinstance(dataframe, pandas_dataframe):
        return dataframe[column]
    elif isinstance(dataframe, Table):
        return dataframe[column]
    elif isinstance(dataframe, polars_dataframe):
        return dataframe[column]
    elif isinstance(dataframe, polars_lazyframe):
        return dataframe.select(column).collect()
    else:
        logger.error('function get_dataframe_column not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def get_dataframe_row(dataframe, row):
    """Return a single *row* of *dataframe* (engine-dispatched).

    NOTE(review): pandas uses label-based .loc while the other engines
    slice positionally — confirm callers expect this difference.
    BUGFIX: the unsupported-type branch only logged and fell through,
    implicitly returning None; it now raises ValueError like the sibling
    accessors.
    """
    if isinstance(dataframe, pandas_dataframe):
        return dataframe.loc[row]
    elif isinstance(dataframe, Table):
        return dataframe.slice(row, 1)
    elif isinstance(dataframe, polars_dataframe):
        return dataframe.slice(row, 1)
    elif isinstance(dataframe, polars_lazyframe):
        return dataframe.slice(row, 1)
    else:
        logger.error('function get_dataframe_row not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def get_dataframe_element(dataframe, column, row):
    """Return the scalar at (*row*, *column*) for any supported engine."""
    if isinstance(dataframe, pandas_dataframe):
        return dataframe[column][row]
    if isinstance(dataframe, Table):
        return dataframe[column][row]
    if isinstance(dataframe, polars_dataframe):
        return dataframe[column][row]
    if isinstance(dataframe, polars_lazyframe):
        return dataframe.select(column).collect().item(row, 0)
    logger.error('function get_dataframe_element not available'
                 ' for instance of type'
                 f' {type(dataframe)}')
    raise ValueError
def dtype_dict_to_pyarrow_schema(dtype_dict):
    """Build a pyarrow schema from a {column name: pyarrow dtype} mapping."""
    return pyarrow_schema(dtype_dict.items())
[docs]
def astype(dataframe, dtype_dict):
    """Cast columns per a {column: dtype} mapping, engine-dispatched."""
    if isinstance(dataframe, pandas_dataframe):
        return dataframe.astype(dtype_dict)
    if isinstance(dataframe, Table):
        # pyarrow casts through a schema rather than a plain mapping
        return dataframe.cast(dtype_dict_to_pyarrow_schema(dtype_dict))
    if isinstance(dataframe, (polars_dataframe, polars_lazyframe)):
        return dataframe.cast(dtype_dict)
    logger.error('function astype not available'
                 ' for instance of type'
                 f' {type(dataframe)}')
    raise ValueError
[docs]
def read_parquet(engine, filepath):
    """Load a parquet file as a dataframe of the requested *engine*."""
    if engine == 'pandas':
        return pandas_read_parquet(filepath)
    if engine == 'pyarrow':
        return read_table(filepath)
    if engine == 'polars':
        return polars_read_parquet(filepath)
    if engine == 'polars_lazy':
        # lazily scanned: nothing is read until collect()
        return polars_scan_parquet(filepath)
    logger.error('function read_parquet not available'
                 f' for engine {engine}')
    raise ValueError
[docs]
def write_parquet(dataframe, filepath):
    """Persist *dataframe* to parquet at *filepath*, engine-dispatched.

    Re-raises the underlying engine error after logging it; raises
    ValueError for an unsupported dataframe type.
    """
    if isinstance(dataframe, pandas_dataframe):
        try:
            # keep the index: for pandas data it may carry the timestamp
            dataframe.to_parquet(filepath, index=True)
        except Exception as e:
            logger.exception(f'pandas write parquet failed: {e}')
            raise
    elif isinstance(dataframe, Table):
        try:
            write_table(dataframe, filepath)
        except Exception as e:
            logger.exception(f'pyarrow write parquet failed: {e}')
            raise
    elif isinstance(dataframe, polars_dataframe):
        try:
            dataframe.write_parquet(filepath)
        except Exception as e:
            logger.exception(f'polars write parquet failed: {e}')
            raise
    elif isinstance(dataframe, polars_lazyframe):
        try:
            dataframe.sink_parquet(filepath)
            # alternative to sink_parquet()
            # dataframe.collect(streaming=False).write_parquet(filepath)
        except Exception as e:
            # BUGFIX: the second string fragment lacked the f prefix, so
            # the log carried a literal '{e}' instead of the error detail
            logger.exception('polars lazyframe sink '
                             f'parquet failed: {e}')
            raise
    else:
        logger.error('function write_parquet not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def read_csv(engine, file, **kwargs):
    """Read csv data from *file* into a dataframe of the requested *engine*.

    Extra keyword arguments are forwarded to the engine's reader.
    """
    if engine == 'pandas':
        return pandas_read_csv(file, **kwargs)
    if engine == 'pyarrow':
        return arrow_csv.read_csv(file, **kwargs)
    if engine == 'polars':
        return polars_read_csv(file, **kwargs)
    if engine == 'polars_lazy':
        # lazily scanned: nothing is read until collect()
        return polars_scan_csv(file, **kwargs)
    logger.error('function read_csv not available'
                 f' for engine {engine}')
    raise ValueError
[docs]
def write_csv(dataframe, file, **kwargs):
    """Write *dataframe* to csv *file*, engine-dispatched.

    Raises IOError when the underlying engine write fails; ValueError
    for an unsupported dataframe type.
    """
    def _log_failure(exc):
        # shared failure log; logger.exception keeps the traceback
        logger.exception(f'Error writing csv file {file}'
                         f' with data type {type(dataframe)}: {exc}')
    if isinstance(dataframe, pandas_dataframe):
        try:
            # IMPORTANT
            # pandas dataframe case
            # avoid date_format parameter since it is reported that
            # it makes to_csv to be excessively long with column data
            # being datetime data type
            # see: https://github.com/pandas-dev/pandas/issues/37484
            # https://stackoverflow.com/questions/65903287/pandas-1-2-1-to-csv-performance-with-datetime-as-the-index-and-setting-date-form
            dataframe.to_csv(file,
                             header=True,
                             **kwargs)
        except Exception as e:
            _log_failure(e)
            raise IOError
    elif isinstance(dataframe, Table):
        try:
            arrow_csv.write_csv(dataframe, file, **kwargs)
        except Exception as e:
            _log_failure(e)
            raise IOError
    elif isinstance(dataframe, polars_dataframe):
        try:
            dataframe.write_csv(file, **kwargs)
        except Exception as e:
            _log_failure(e)
            raise IOError
    elif isinstance(dataframe, polars_lazyframe):
        try:
            dataframe.sink_csv(file, **kwargs)
        except Exception as e:
            _log_failure(e)
            raise IOError
    else:
        logger.error('function write_csv not available'
                     ' for instance of type'
                     f' {type(dataframe)}')
        raise ValueError
[docs]
def concat_data(data_list=None):
    """Concatenate a list of same-engine dataframes into one.

    The engine is inferred from the first element; all elements are
    assumed to share it.
    BUGFIX: the default value used to be ``attrs.field(validator=...)``,
    an attrs declaration-time helper that performs no validation as a
    plain default — a call without arguments still raises TypeError,
    now via the explicit isinstance check below.
    """
    if not isinstance(data_list, list):
        logger.error('required input as list')
        raise TypeError
    # assume data type is unique by input
    # get type from first element
    if isinstance(data_list[0], pandas_dataframe):
        return pandas_concat(data_list,
                             ignore_index=False,
                             copy=False)
    elif isinstance(data_list[0], Table):
        return concat_tables(data_list)
    elif isinstance(data_list[0], polars_dataframe):
        return polars_concat(data_list, how='vertical')
    elif isinstance(data_list[0], polars_lazyframe):
        return polars_concat(data_list, how='vertical')
    else:
        logger.error('function concat not available'
                     ' for instance of type'
                     f' {type(data_list[0])}')
        raise ValueError
[docs]
def to_pandas_dataframe(dataframe):
    """Convert any supported dataframe flavour to a pandas DataFrame.

    Useful for calls requiring pandas as input, or for pandas features
    the other engines do not cover. pandas input is returned unchanged.
    """
    if isinstance(dataframe, pandas_dataframe):
        return dataframe
    if isinstance(dataframe, Table):
        return dataframe.to_pandas()
    if isinstance(dataframe, polars_dataframe):
        return dataframe.to_pandas(use_pyarrow_extension_array=True)
    if isinstance(dataframe, polars_lazyframe):
        return dataframe.collect().to_pandas(use_pyarrow_extension_array=True)
    logger.error('function to_pandas() not available'
                 ' for instance of type'
                 f' {type(dataframe)}')
    raise ValueError
[docs]
def reframe_data(dataframe, tf):
    '''
    Resample tick or OHLC data to the target timeframe *tf*.

    Parameters
    ----------
    dataframe : pandas / pyarrow / polars (eager or lazy) dataframe
        Columns must match DATA_COLUMN_NAMES.TICK_DATA
        (timestamp/ask/bid/vol/p) or DATA_COLUMN_NAMES.TF_DATA
        (timestamp + OHLC); pandas input may carry the timestamp in the
        index instead of a column.
    tf : str or timedelta
        Target timeframe, validated per engine by check_timeframe_str.

    Raises
    ------
    ValueError
        If the columns or the timeframe are not supported.

    Returns
    -------
    Dataframe
        Same engine as the input; pyarrow input is converted to polars,
        reframed there and converted back (see note in the body).
    '''
    # empty input needs no reframing, whatever the engine
    if is_empty_dataframe(dataframe):
        return dataframe
    if isinstance(dataframe, pandas_dataframe):
        # assert timeframe input value
        tf = check_timeframe_str(tf, engine='pandas')
        if tf == TICK_TIMEFRAME:
            logger.warning(f'reframe not possible with target {TICK_TIMEFRAME}')
            return dataframe
        # ensure a datetime index or column is available before resampling
        if not is_datetime64_any_dtype(dataframe.index):
            if BASE_DATA_COLUMN_NAME.TIMESTAMP in dataframe.columns:
                if not is_datetime64_any_dtype(
                        dataframe[BASE_DATA_COLUMN_NAME.TIMESTAMP]):
                    try:
                        dataframe[BASE_DATA_COLUMN_NAME.TIMESTAMP] = any_date_to_datetime64(
                            dataframe[BASE_DATA_COLUMN_NAME.TIMESTAMP])
                    except Exception as e:
                        logger.exception('Pandas engine: '
                                         'Failed conversion of timestamp columns '
                                         'to DatetimeIndex')
                        raise
            else:
                logger.error('Pandas engine: required column with '
                             f'name {BASE_DATA_COLUMN_NAME.TIMESTAMP}')
                raise ValueError
        # use pandas functions to reframe data on pandas Dataframe
        dataframe = sort_dataframe(dataframe, BASE_DATA_COLUMN_NAME.TIMESTAMP)
        dataframe = dataframe.set_index(BASE_DATA_COLUMN_NAME.TIMESTAMP,
                                        inplace=False,
                                        drop=True
                                        )
        # resample based on p value
        # NOTE: the loop variable 'col' shadows the polars 'col' import
        # inside these comprehensions
        if all([col in DATA_COLUMN_NAMES.TICK_DATA_TIME_INDEX
                for col in dataframe.columns]):
            # resample along 'p' column, data in ask, bid, p format
            dataframe = dataframe.p.resample(tf).ohlc().interpolate(method='nearest')
        elif all([col in DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX
                  for col in dataframe.columns]):
            # resample along given data already in ohlc format
            dataframe = dataframe.resample(tf).interpolate(method='nearest')
        else:
            logger.error(f'data columns {dataframe.columns} invalid, '
                         f'required {DATA_COLUMN_NAMES.TICK_DATA_TIME_INDEX} '
                         f'or {DATA_COLUMN_NAMES.TF_DATA_TIME_INDEX}')
            raise ValueError
        return dataframe.reset_index(drop=False)
    elif isinstance(dataframe, Table):
        # assert timeframe input value
        tf = check_timeframe_str(tf, engine='pyarrow')
        '''
        use pyarrow functions to reframe data on pyarrow Table
        could not find easy way to filter an arrow table
        based on time interval
        opened an enhancement issue on github
        https://github.com/apache/arrow/issues/41049
        As a temporary alternative, convert arrow Table to polars
        and perform reframe with polars engine
        '''
        if all([col in DATA_COLUMN_NAMES.TICK_DATA
                for col in dataframe.column_names]):
            # convert to polars dataframe
            dataframe = from_arrow(dataframe,
                                   schema=cast(Any, POLARS_DTYPE_DICT.TIME_TICK_DTYPE))
        elif all([col in DATA_COLUMN_NAMES.TF_DATA
                  for col in dataframe.column_names]):
            # convert to polars dataframe
            dataframe = from_arrow(dataframe,
                                   schema=cast(Any, POLARS_DTYPE_DICT.TIME_TF_DTYPE))
        # perform operation
        # convert to arrow Table and return
        # NOTE(review): when neither column set matched, 'dataframe' is
        # still an arrow Table here and the recursive call re-enters this
        # branch — confirm whether an explicit error is preferable
        return reframe_data(dataframe, tf).to_arrow()
    elif isinstance(dataframe, polars_dataframe):
        # assert timeframe input value
        tf = check_timeframe_str(tf, engine='polars')
        # polars duration strings are lowercase
        tf = tf.lower()
        dataframe = sort_dataframe(dataframe, BASE_DATA_COLUMN_NAME.TIMESTAMP)
        if all([col in DATA_COLUMN_NAMES.TICK_DATA
                for col in dataframe.columns]):
            # tick input: build OHLC bars from the 'p' column
            return dataframe.group_by_dynamic(
                BASE_DATA_COLUMN_NAME.TIMESTAMP,
                every=tf).agg(col('p').first().alias(BASE_DATA_COLUMN_NAME.OPEN),
                              col('p').max().alias(BASE_DATA_COLUMN_NAME.HIGH),
                              col('p').min().alias(BASE_DATA_COLUMN_NAME.LOW),
                              col('p').last().alias(BASE_DATA_COLUMN_NAME.CLOSE)
                              )
        elif all([col in DATA_COLUMN_NAMES.TF_DATA
                  for col in dataframe.columns]):
            # OHLC input: aggregate bars into the larger timeframe
            return dataframe.group_by_dynamic(
                BASE_DATA_COLUMN_NAME.TIMESTAMP,
                every=tf).agg(col(BASE_DATA_COLUMN_NAME.OPEN).first(),
                              col(BASE_DATA_COLUMN_NAME.HIGH).max(),
                              col(BASE_DATA_COLUMN_NAME.LOW).min(),
                              col(BASE_DATA_COLUMN_NAME.CLOSE).last()
                              )
        else:
            logger.error(f'data columns {dataframe.columns} invalid, '
                         f'required {DATA_COLUMN_NAMES.TICK_DATA} '
                         f'or {DATA_COLUMN_NAMES.TF_DATA}')
            raise ValueError
    elif isinstance(dataframe, polars_lazyframe):
        # NOTE(review): unlike the other branches, tf is not validated
        # via check_timeframe_str here — confirm whether intentional
        tf = tf.lower()
        dataframe = dataframe.sort('timestamp', nulls_last=True)
        if all([col in DATA_COLUMN_NAMES.TICK_DATA
                for col in dataframe.collect_schema().names()]):
            return dataframe.group_by_dynamic(
                BASE_DATA_COLUMN_NAME.TIMESTAMP,
                every=tf).agg(col('p').first().alias(BASE_DATA_COLUMN_NAME.OPEN),
                              col('p').max().alias(BASE_DATA_COLUMN_NAME.HIGH),
                              col('p').min().alias(BASE_DATA_COLUMN_NAME.LOW),
                              col('p').last().alias(BASE_DATA_COLUMN_NAME.CLOSE)
                              )
        elif all([col in DATA_COLUMN_NAMES.TF_DATA
                  for col in dataframe.collect_schema().names()]):
            return dataframe.group_by_dynamic(
                BASE_DATA_COLUMN_NAME.TIMESTAMP,
                every=tf).agg(col(BASE_DATA_COLUMN_NAME.OPEN).first(),
                              col(BASE_DATA_COLUMN_NAME.HIGH).max(),
                              col(BASE_DATA_COLUMN_NAME.LOW).min(),
                              col(BASE_DATA_COLUMN_NAME.CLOSE).last()
                              )
        else:
            # NOTE(review): .columns on a LazyFrame may trigger schema
            # resolution in recent polars — confirm this message is safe
            logger.error(f'data columns {dataframe.columns} invalid, '
                         f'required {DATA_COLUMN_NAMES.TICK_DATA} '
                         f'or {DATA_COLUMN_NAMES.TF_DATA}')
            raise ValueError
# ATTRS
# ADDED VALIDATORS
[docs]
def validator_file_path(file_ext=None):
    """Build an attrs-style validator asserting the value is an existing file.

    NOTE(review): *file_ext* is accepted but never used below — confirm
    whether extension checking was intended.
    """
    def validate_file_path(instance, attribute, value):
        try:
            filepath = Path(value)
        except Exception as e:
            logger.error(f'File {value} Path creation error: {e}')
            raise
        else:
            # NOTE(review): raises FileExistsError when the file does NOT
            # exist — FileNotFoundError would read more naturally, but
            # callers may depend on the current type
            if not (
                filepath.exists() or
                filepath.is_file()
            ):
                logger.error(f'file {value} not exists')
                raise FileExistsError
    return validate_file_path
[docs]
def validator_dir_path(create_if_missing=False):
    """Build an attrs-style validator ensuring *value* is a usable directory.

    With create_if_missing=True the directory (and parents) is created on
    demand; otherwise the path must already exist.
    """
    def validate_or_create_dir(instance, attribute, value):
        path = Path(value)
        if create_if_missing:
            try:
                path.mkdir(parents=True, exist_ok=True)
            except FileExistsError:
                # A parallel process created the directory between the
                # exist_ok check and the actual os.mkdir syscall (TOCTOU
                # race on macOS). Safe to ignore if it is now a directory.
                if not path.is_dir():
                    logger.error(
                        f'Path {value} exists but is not a directory')
                    raise
            return
        if not (
            path.exists() or
            path.is_dir()
        ):
            logger.error(f'Directory {value} not valid')
            raise TypeError()
    return validate_or_create_dir
def validator_list_ge(min_value):
    """
    Build an attrs-style validator requiring a list of ints >= min_value.

    Parameters
    ----------
    min_value : int
        Minimum accepted value for every element.

    Returns
    -------
    callable
        Validator with the attrs signature (instance, attribute, value).

    Raises (from the returned validator)
    ------
    TypeError
        If the value is not a list of ints.
    ValueError
        If any element is below min_value.
    """
    def validator_list_values(instance, attribute, value):
        if not (
                isinstance(value, list) and
                all(isinstance(val, int) for val in value)
        ):
            logger.error('Required list of int type for argument '
                         f'{attribute}')
            raise TypeError
        # compute the offending values once instead of scanning twice
        fails = [val for val in value if val < min_value]
        if fails:
            logger.error(f'Values in {attribute}: {fails} '
                         f'are not greater than {min_value}')
            raise ValueError
    # BUG FIX: the inner validator was never returned, so attrs received
    # None instead of a callable and no validation ever ran
    return validator_list_values
# ATTRIBUTES
[docs]
def get_attrs_names(instance_object, **kwargs):
    """
    Return the declared attribute names of an attrs-decorated object.

    Raises KeyError (after logging) when the object carries no
    ``__attrs_attrs__`` metadata, i.e. it is not attrs-decorated.
    """
    # guard clause: bail out early for non-attrs objects
    if not hasattr(instance_object, '__attrs_attrs__'):
        logger.error('attribute "__attrs__attrs__" not found in '
                     f'object {instance_object}')
        raise KeyError
    return [attrs_attr.name
            for attrs_attr in instance_object.__attrs_attrs__]
# GENERIC UTILITIES
[docs]
def list_remove_duplicates(list_in):
    """Return a copy of list_in without duplicates, keeping first-seen order."""
    seen = set()
    deduped = []
    for item in list_in:
        if item not in seen:
            seen.add(item)
            deduped.append(item)
    return deduped
# HISTDATA data provider utilities
# Analyze the Histdata Forex download base page
# https://www.histdata.com/download-free-forex-data/?/ascii/1-minute-bar-quotes
# and get a list of all available tickers in the form of the example "EURUSD"
[docs]
def get_histdata_tickers() -> List[str]:
    """
    Get all available tickers from HistData.com.

    Scrapes the 1-minute-bar-quotes download page and collects the pair
    symbols from the per-pair links.

    Returns
    -------
    List[str]
        Sorted list of unique tickers (e.g., ['EURUSD', 'GBPUSD', ...]);
        empty list on any network or parsing failure.
    """
    url = "https://www.histdata.com/download-free-forex-data/?/ascii/1-minute-bar-quotes"
    try:
        # BUG FIX: the GET previously had no timeout and could hang
        # forever; the former HEAD pre-check doubled the round trips and
        # is folded into this single guarded request
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException:
        logger.error(f'Failed to connect to {url}')
        return []
    try:
        soup = BeautifulSoup(response.content, 'html.parser')
        tickers = []
        # Tickers are typically in links that lead to the pair's specific page
        for link in soup.find_all('a', href=True):
            href = link['href']
            # Pattern check based on the observed links
            if "/ascii/1-minute-bar-quotes/" in href:
                ticker = href.split('/')[-1]
                # Validate it's a valid ticker (usually 6 chars like EURUSD)
                if ticker and len(ticker) >= 6:
                    tickers.append(ticker.upper())
        return sorted(set(tickers))
    except Exception as e:
        logger.error(f"Failed to retrieve tickers from HistData: {e}")
        return []
# REAL TIME PROVIDERS UTILITIES
[docs]
def polygon_agg_to_dict(agg):
    """
    Convert a Polygon aggregate bar into a dict keyed by the project's
    base data column names.

    Parameters
    ----------
    agg : polygon.rest.models.aggs.Agg
        Aggregate bar returned by the Polygon REST client.

    Returns
    -------
    dict
        Mapping of base column name -> bar field value.

    Raises
    ------
    TypeError
        If agg is not a polygon Agg instance.
    """
    if not isinstance(agg, polygon_agg):
        logger.error('argument invalid type, required '
                     'polygon.rest.models.aggs.Agg')
        # BUG FIX: execution previously fell through after logging and
        # crashed later with AttributeError on the invalid object;
        # fail fast with an explicit type error instead
        raise TypeError('required polygon.rest.models.aggs.Agg')
    return {
        BASE_DATA_COLUMN_NAME.TIMESTAMP: agg.timestamp,
        BASE_DATA_COLUMN_NAME.OPEN: agg.open,
        BASE_DATA_COLUMN_NAME.HIGH: agg.high,
        BASE_DATA_COLUMN_NAME.LOW: agg.low,
        BASE_DATA_COLUMN_NAME.CLOSE: agg.close,
        BASE_DATA_COLUMN_NAME.VOL: agg.volume,
        BASE_DATA_COLUMN_NAME.TRANSACTIONS: agg.transactions,
        BASE_DATA_COLUMN_NAME.VWAP: agg.vwap,
        BASE_DATA_COLUMN_NAME.OTC: agg.otc
    }
# MARKETS FUNCTIONS
US_HOLIDAYS = country_holidays('US', years=YEARS)
# Materialize the holiday dates once so per-row membership tests in
# business_days_data stay cheap; list(...) replaces the redundant
# identity comprehension over .keys()
US_holiday_dates = list(US_HOLIDAYS)
[docs]
def business_days_data(dataframe: polars_lazyframe |
                       polars_dataframe) -> polars_dataframe | polars_lazyframe:
    '''
    Remove non-business days data from the input dataframe.
    Filter out weekends data: saturday and sunday.
    Use holidays to get list of country holidays.
    Consider dataframe always have a column named 'timestamp' of type datetime.
    '''
    # Polars dt.weekday(): Monday=1, ..., Saturday=6, Sunday=7
    weekday_mask = col('timestamp').dt.weekday() < 6
    # exclude US holiday dates collected at module level
    holiday_mask = col('timestamp').dt.date().is_in(US_holiday_dates)
    return dataframe.filter(weekday_mask & ~holiday_mask)
[docs]
def update_ticker_years_dict(
        ticker_years_dict: Dict[str, Dict[str, List[int]]],
        ticker: str,
        timeframe: str,
        years_to_add: List[int]
) -> tuple[Dict[str, Dict[str, List[int]]], bool]:
    """
    Update a ticker years dictionary with new years for a specific ticker
    and timeframe.

    The dictionary is modified in place; missing ticker/timeframe keys are
    created on demand and the years list is kept sorted.

    Parameters
    ----------
    ticker_years_dict : Dict[str, Dict[str, List[int]]]
        Dictionary containing ticker years data, structured as:
        {ticker: {timeframe: [year1, year2, ...]}}
    ticker : str
        The ticker symbol to update
    timeframe : str
        The timeframe for the ticker data
    years_to_add : List[int]
        List of years to add to the years list

    Returns
    -------
    tuple[Dict[str, Dict[str, List[int]]], bool]
        The (mutated) dictionary and a flag that is True if any years
        were actually added, False otherwise.
        BUG FIX: the previous annotation/docstring claimed a bare bool
        although the function always returned this tuple.
    """
    # create the nested containers on demand
    years = ticker_years_dict.setdefault(ticker, {}).setdefault(timeframe, [])
    changes_made = False
    for year in years_to_add:
        if year not in years:
            years.append(year)
            changes_made = True
    # keep years sorted only when something changed
    if changes_made:
        years.sort()
    return ticker_years_dict, changes_made