forex_data_aggregator package

Module contents

Created on Sun Jul 17 17:07:39 2022

@author: fiora

forex_data.read_config_file(config_file)[source]
Return type:

Dict[str, Any]

forex_data.read_config_string(config_str)[source]
Return type:

Dict[str, Any]

forex_data.read_config_folder(folder_path=None, file_pattern='appconfig.yaml')[source]
Return type:

Path

class forex_data.HistoricalManagerDB(**kwargs)[source]

Bases: object

config: str
data_type: str
engine: str
data_path: str | Path
db_files_year_partitioning: bool
ssl_verify: bool
connector_id: str
max_discrepancy_with_now: str
__init__(**kwargs)[source]
get_source_connectors()[source]
Return type:

List[RemoteConnector]

clear_database(filter=None)[source]
Return type:

None

add_timeframe(timeframe)[source]

Add and cache a new timeframe to the database.

Creates aggregated data for the specified timeframe from tick data and caches it in the database for faster future access. The timeframe is added to the internal list of available timeframes.

Parameters:

timeframe (Union[str, List[str]]) – Timeframe(s) to add. Can be a single string or list of strings. Supported values: ‘1m’, ‘5m’, ‘15m’, ‘30m’, ‘1h’, ‘4h’, ‘1D’, ‘1W’, ‘1M’

Return type:

None

Returns:

None

Raises:

TypeError – If timeframe is not a string or list of strings

Example

>>> manager = HistoricalManagerDB(config='data_config.yaml')
>>> manager.add_timeframe('1W')  # Add weekly timeframe
>>> manager.add_timeframe(['4h', '1D'])  # Add multiple timeframes

Note

  • Only new timeframes (not already in the list) will be processed

  • Aggregation can take time for large datasets

  • Once added, the timeframe is permanently cached in the database
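The input handling described above (a single string or a list of strings, with only new timeframes processed) could be sketched as follows. `normalize_timeframes` and the `known` list are hypothetical names used for illustration; they are not part of the package, and the real method may validate its input differently.

```python
from typing import List, Union


def normalize_timeframes(
    timeframe: Union[str, List[str]], known: List[str]
) -> List[str]:
    """Return the requested timeframes that are not yet in `known`."""
    # Accept a single string by wrapping it in a list.
    if isinstance(timeframe, str):
        timeframe = [timeframe]
    # Reject anything that is not a list of strings.
    if not isinstance(timeframe, list) or not all(
        isinstance(tf, str) for tf in timeframe
    ):
        raise TypeError("timeframe must be a string or a list of strings")
    # Keep only timeframes not already known, preserving input order.
    new: List[str] = []
    for tf in timeframe:
        if tf not in known and tf not in new:
            new.append(tf)
    return new


known = ["1m", "1h"]
print(normalize_timeframes("1W", known))
print(normalize_timeframes(["1h", "4h", "1D"], known))
```

Returning only the unseen timeframes mirrors the note that already cached timeframes are not processed again.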

get_data(ticker, timeframe, start, end, comparison_column_name=None, check_level=None, comparison_operator=None, aggregation_mode=None)[source]

Retrieve OHLC historical data for the specified ticker and timeframe.

Fetches historical forex data from the database, automatically downloading and aggregating data if not already available. Supports multiple timeframes and date ranges.

Parameters:
  • ticker (str) – Currency pair symbol (e.g., ‘EURUSD’, ‘GBPUSD’, ‘NZDUSD’). Case-insensitive.

  • timeframe (str) – Candle timeframe for data aggregation. Supported frames: 1s (1 second), 1m (1 minute), 1h (1 hour), 1d (1 calendar day), 1w (1 calendar week), 1mo (1 calendar month), 1q (1 calendar quarter), 1y (1 calendar year), and any multiple of these values by a positive integer, e.g. ‘2m’, ‘3m’, ‘2h’, ‘3h’, etc.

  • start (str | datetime) – Start date for data retrieval. Accepts: - ISO format: ‘YYYY-MM-DD’ or ‘YYYY-MM-DD HH:MM:SS’ - datetime object

  • end (str | datetime) – End date for data retrieval. Same format as start. Must be after start date.

  • comparison_column_name (Union[List[str], str, None]) – Column names to retrieve. Default is None.

  • check_level (Union[List[int | float], int, float, None]) – Check level for conditions. Default is None.

  • comparison_operator (Union[List[Literal['>', '<', '>=', '<=', '==', '!=']], Literal['>', '<', '>=', '<=', '==', '!='], None]) – Condition for data retrieval. Default is None.

  • aggregation_mode (Optional[Literal['AND', 'OR']]) – Aggregation mode for data retrieval. Default is None.

Returns:

DataFrame containing OHLC data with columns:
  • timestamp: datetime column with candle timestamps

  • open: Opening price (float32)

  • high: Highest price (float32)

  • low: Lowest price (float32)

  • close: Closing price (float32)

Return type:

Union[DataFrame, LazyFrame]

Raises:
  • TickerNotFoundError – If the ticker is not available in the historical database

  • ValueError – If timeframe is invalid or end date is before start date

Example

>>> manager = HistoricalManagerDB(config='data_config.yaml')
>>> data = manager.get_data(
...     ticker='EURUSD',
...     timeframe='1h',
...     start='2020-01-01',
...     end='2020-01-31'
... )
>>> print(f"Retrieved {len(data)} hourly candles")
Retrieved 744 hourly candles

Note

  • Data is automatically downloaded from histdata.com if not cached locally

  • First call for a new timeframe may take longer as it builds the aggregation

  • Downloaded data is cached for faster subsequent access

  • Ticker names are case-insensitive and automatically normalized
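The timeframe strings accepted by get_data combine a positive integer multiple with a base unit. A minimal sketch of parsing such a string follows, assuming the lower-case unit suffixes listed for the timeframe parameter. `parse_timeframe` is a hypothetical helper; the package's own check_timeframe_str may behave differently.

```python
import re
from typing import Tuple

# Unit suffixes as listed for the timeframe parameter.
# "mo" is listed before "m" so that "1mo" reads as months, not minutes.
_PATTERN = re.compile(r"^(\d+)(mo|s|m|h|d|w|q|y)$")


def parse_timeframe(tf: str) -> Tuple[int, str]:
    """Split a timeframe such as '3h' into (multiple, unit)."""
    match = _PATTERN.match(tf.strip())
    if match is None:
        raise ValueError(f"unsupported timeframe: {tf!r}")
    multiple = int(match.group(1))
    # The multiple must be a positive integer.
    if multiple < 1:
        raise ValueError(f"multiple must be positive: {tf!r}")
    return multiple, match.group(2)


for example in ("1s", "3h", "15m", "1mo"):
    print(example, parse_timeframe(example))
```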

get_data_window(ticker, date, timeframe, periods, direction, comparison_column_name=None, check_level=None, comparison_operator=None, comparison_aggregation_mode=None)[source]

Retrieve a window of OHLC historical data for the specified ticker. The unit resolution of the window equals the timeframe: it is the timespan between two consecutive candles (rows) under normal conditions (the rule does not apply across weekends). The window contains periods candles (rows), so its nominal timespan is timeframe * periods.

Fetches historical forex data from the database, automatically downloading and aggregating data if not already available. Supports multiple timeframes.

Parameters:
  • ticker (str) – Currency pair symbol (e.g., ‘EURUSD’, ‘GBPUSD’, ‘NZDUSD’). Case-insensitive.

  • date (date) – Reference date for the window. Accepts: - ISO format: ‘YYYY-MM-DD’ or ‘YYYY-MM-DD HH:MM:SS’ - datetime object

  • timeframe (str) – Candle timeframe for data aggregation. Supported frames: 1s (1 second), 1m (1 minute), 1h (1 hour), 1d (1 calendar day), 1w (1 calendar week), 1mo (1 calendar month), 1q (1 calendar quarter), 1y (1 calendar year)

  • periods (int) – Number of timeframe units to look back or forward.

  • direction (Literal['backward', 'forward']) – Direction in which the window extends from date (‘backward’ or ‘forward’).

  • comparison_column_name (Union[List[str], str, None]) – List of column names to compare. If None, no comparison is performed.

  • check_level (Union[List[int | float], int, float, None]) – List of values to compare against. If None, no comparison is performed.

  • comparison_operator (List[SUPPORTED_SQL_COMPARISON_OPERATORS] | SUPPORTED_SQL_COMPARISON_OPERATORS | None) – List of comparison operators to use for comparison. If None, no comparison is performed.

  • comparison_aggregation_mode (SUPPORTED_SQL_CONDITION_AGGREGATION_MODES | None) – Aggregation mode to use for comparison. If None, no comparison is performed.

Returns:

DataFrame with the historical data.

Return type:

Union[DataFrame, LazyFrame]

Examples

>>> manager = HistoricalManagerDB(config='data_config.yaml')
>>> data = manager.get_data_window(
...     ticker='EURUSD',
...     date='2022-01-01',
...     timeframe='1m',
...     periods=10,
...     direction='backward'
... )
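
As a rough illustration of how timeframe, periods and direction determine a window, the sketch below computes nominal window bounds for fixed-length units only. `window_bounds` is a hypothetical helper, not part of the package. It ignores weekends and calendar-based units (months, quarters, years), which the real query has to account for.

```python
from datetime import datetime, timedelta
from typing import Tuple

# Fixed-length units only; calendar units are deliberately left out.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def window_bounds(
    date: datetime, multiple: int, unit: str, periods: int, direction: str
) -> Tuple[datetime, datetime]:
    """Return the nominal (start, end) of a window of `periods` candles."""
    span = timedelta(seconds=_UNIT_SECONDS[unit] * multiple * periods)
    if direction == "backward":
        return date - span, date
    if direction == "forward":
        return date, date + span
    raise ValueError("direction must be 'backward' or 'forward'")


start, end = window_bounds(datetime(2022, 1, 1), 1, "m", 10, "backward")
print(start, end)
```
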
plot(ticker, timeframe, start_date, end_date)[source]

Plot candlestick chart for the specified ticker and date range.

Generates an interactive candlestick chart using mplfinance, displaying OHLC (Open, High, Low, Close) data for the specified time period.

Parameters:
  • ticker (str) – Currency pair symbol (e.g., ‘EURUSD’, ‘GBPUSD’)

  • timeframe (str) – Candle timeframe (e.g., ‘1m’, ‘5m’, ‘1h’, ‘1D’, ‘1W’)

  • start_date (str) – Start date in ISO format ‘YYYY-MM-DD’ or ‘YYYY-MM-DD HH:MM:SS’

  • end_date (str) – End date in ISO format ‘YYYY-MM-DD’ or ‘YYYY-MM-DD HH:MM:SS’

Returns:

Displays the chart using matplotlib

Return type:

None

Example

>>> manager = HistoricalManagerDB(config='data_config.yaml')
>>> manager.plot(
...     ticker='EURUSD',
...     timeframe='1D',
...     start_date='2020-01-01',
...     end_date='2020-12-31'
... )

Note

The chart will be displayed in a matplotlib window. The data is automatically fetched using get_data() and converted to the appropriate format for plotting.

close()[source]
class forex_data.BASE_DATA_COLUMN_NAME[source]

Bases: object

TIMESTAMP = 'timestamp'
OPEN = 'open'
HIGH = 'high'
LOW = 'low'
CLOSE = 'close'
ASK = 'ask'
BID = 'bid'
VOL = 'vol'
P_VALUE = 'p'
TRANSACTIONS = 'transactions'
VWAP = 'vwap'
OTC = 'otc'
class forex_data.DATA_FILE_COLUMN_INDEX[source]

Bases: object

TIMESTAMP = 0
class forex_data.DEFAULT_PATHS[source]

Bases: object

BASE_PATH = '/home/runner/.database'
HIST_DATA_FOLDER = 'HistoricalData'
REALTIME_DATA_FOLDER = 'RealtimeData'
class forex_data.SQL_COMPARISON_OPERATORS[source]

Bases: object

GREATER_THAN = '>'
LESS_THAN = '<'
GREATER_THAN_OR_EQUAL = '>='
LESS_THAN_OR_EQUAL = '<='
EQUAL = '=='
NOT_EQUAL = '!='
class forex_data.SQL_CONDITION_AGGREGATION_MODES[source]

Bases: object

AND = 'AND'
OR = 'OR'
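
How the comparison parameters of get_data and get_data_window fit together can be sketched in plain Python: each column is compared with its level through the matching operator, and the individual results are combined with AND or OR. This is an illustration on dictionaries, not the query the package generates, and `row_matches` is a hypothetical name.

```python
import operator
from typing import Dict, List

# Map the documented operator strings to Python comparison functions.
_OPS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
}


def row_matches(
    row: Dict[str, float],
    columns: List[str],
    levels: List[float],
    operators: List[str],
    mode: str = "AND",
) -> bool:
    """Evaluate column/operator/level triples and combine them by mode."""
    if not (len(columns) == len(levels) == len(operators)):
        raise ValueError("columns, levels and operators must align")
    results = [
        _OPS[op](row[col], lvl)
        for col, lvl, op in zip(columns, levels, operators)
    ]
    if mode == "AND":
        return all(results)
    if mode == "OR":
        return any(results)
    raise ValueError("mode must be 'AND' or 'OR'")


candle = {"open": 1.10, "high": 1.12, "low": 1.09, "close": 1.11}
print(row_matches(candle, ["high", "low"], [1.115, 1.10], [">", ">"], "AND"))
print(row_matches(candle, ["high", "low"], [1.115, 1.10], [">", ">"], "OR"))
```
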
forex_data.empty_dataframe(engine)[source]
forex_data.is_empty_dataframe(dataframe)[source]
forex_data.shape_dataframe(dataframe)[source]
forex_data.get_dataframe_column(dataframe, column)[source]
forex_data.get_dataframe_row(dataframe, row)[source]
forex_data.get_dataframe_element(dataframe, column, row)[source]
forex_data.get_attrs_names(instance_object, **kwargs)[source]
forex_data.any_date_to_datetime64(any_date, date_format='ISO8601', unit=None, to_pydatetime=False)[source]
forex_data.get_db_key_elements(key)[source]
forex_data.check_timeframe_str(tf, engine='pandas')[source]
class forex_data.DatabaseConnector(**kwargs)[source]

Bases: object

data_path: str | Path
data_type: str
engine: str
__init__(**kwargs)[source]
connect()[source]

Connect to database - must be implemented by subclasses.

Return type:

Any

check_connection()[source]

Check database connection - must be implemented by subclasses.

Return type:

bool

write_data(target_table, dataframe, clean=False)[source]

Write data to database - must be implemented by subclasses.

Return type:

None

read_data(market, ticker, timeframe, start, end)[source]

Read data from database - must be implemented by subclasses.

Return type:

LazyFrame

read_data_year(market, ticker, timeframe, years)[source]

Read data for specific year(s) - must be implemented by subclasses.

Return type:

LazyFrame

read_data_window(market, ticker, timeframe, date, periods, direction, comparison_column_name=None, check_level=None, comparison_operator=None, comparison_aggregation_mode=None)[source]

Read window of data - must be implemented by subclasses.

Return type:

LazyFrame

read_last_timestamp(market, ticker, timeframe=None)[source]

Read last timestamp from database - must be implemented by subclasses.

Return type:

datetime
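
The "must be implemented by subclasses" contract of the methods above can be illustrated with a toy hierarchy. Both classes below are hypothetical stand-ins written for this sketch; they do not reproduce DatabaseConnector or its constructor arguments.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple


class BaseConnector:
    """Hypothetical stand-in for an abstract connector."""

    def check_connection(self) -> bool:
        raise NotImplementedError("must be implemented by subclasses")

    def read_last_timestamp(
        self, market: str, ticker: str, timeframe: Optional[str] = None
    ) -> datetime:
        raise NotImplementedError("must be implemented by subclasses")


class InMemoryConnector(BaseConnector):
    """Toy subclass that keeps timestamps in a dictionary."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], List[datetime]] = {}

    def add(self, market: str, ticker: str, ts: datetime) -> None:
        self._store.setdefault((market, ticker), []).append(ts)

    def check_connection(self) -> bool:
        return True

    def read_last_timestamp(self, market, ticker, timeframe=None) -> datetime:
        # The toy store ignores timeframe and returns the latest timestamp.
        return max(self._store[(market, ticker)])


connector = InMemoryConnector()
connector.add("forex", "EURUSD", datetime(2022, 1, 3, 10, 0))
connector.add("forex", "EURUSD", datetime(2022, 1, 3, 9, 0))
print(connector.read_last_timestamp("forex", "EURUSD"))
```
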

get_tickers_list()[source]
Return type:

List[str]

get_ticker_keys(ticker, timeframe=None)[source]
Return type:

List[str]

get_ticker_timeframes_list(ticker)[source]

Get timeframes list for ticker - must be implemented by subclasses.

Return type:

List[str]

get_ticker_years_list(ticker, timeframe='tick')[source]
Return type:

List[int]

create_tickers_years_dict()[source]

Create a dictionary containing ticker years data, structured as: {ticker: {timeframe: [year1, year2, …]}}

If no data files exist yet, returns an empty dictionary.

Return type:

Dict[str, Dict[str, List[int]]]

save_tickers_years_info(ticker_years_dict)[source]

Save ticker years list to a JSON file.

Return type:

None

Parameters

ticker_years_dict : Dict[str, Dict[str, List[int]]]

Dictionary containing ticker years data, structured as: {ticker: {timeframe: [year1, year2, …]}}

filename : str, optional

Name of the JSON file to save the data, by default ‘tickers_years.json’

Raises

TypeError

If ticker_years_dict is not a dictionary

IOError

If there’s an error writing the file

add_tickers_years_info_to_file(ticker, timeframe, year)[source]

Update the local info file by adding the specified year(s) to the years list of the given ticker and timeframe. Years already present are not added again.

Return type:

None

Parameters

ticker : str

The ticker symbol to update

timeframe : str

The timeframe for the ticker data

year : Union[int, List[int]]

The year or list of years to add to the years list

Raises

TypeError

If year is not an integer or list of integers
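
The update rule described above (add the given year or years only when missing) can be sketched on an in-memory dictionary with the documented {ticker: {timeframe: [years]}} layout. `add_years` is a hypothetical helper, and keeping the list sorted is an assumption of this sketch rather than documented behavior.

```python
from typing import Dict, List, Union

TickerYears = Dict[str, Dict[str, List[int]]]


def add_years(
    info: TickerYears, ticker: str, timeframe: str, year: Union[int, List[int]]
) -> TickerYears:
    """Add year(s) to info[ticker][timeframe] if not already present."""
    years = [year] if isinstance(year, int) else year
    if not isinstance(years, list) or not all(
        isinstance(y, int) for y in years
    ):
        raise TypeError("year must be an integer or a list of integers")
    # Create the nested entries on demand.
    current = info.setdefault(ticker, {}).setdefault(timeframe, [])
    for y in years:
        if y not in current:
            current.append(y)
    # Assumption: keep years in ascending order for readability.
    current.sort()
    return info


info: TickerYears = {"EURUSD": {"tick": [2020]}}
add_years(info, "EURUSD", "tick", [2020, 2019])
add_years(info, "GBPUSD", "1h", 2021)
print(info)
```
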

clear_tickers_years_info(filter=None)[source]

Clear the tickers years info file. If filter is specified, it must be a ticker value, and only the tickers years info related to that ticker is cleared. If filter is not specified, the entire file is cleared.

Parameters

filter : Optional[str], optional

Filter to apply to the tickers years info file, by default None. The filter must be a ticker value.

Return type:

None

clear_database(filter=None)[source]

Clear database files. If filter is provided and matches a ticker present in the database (i.e. its files exist), only the files related to that ticker are deleted.

Return type:

None

load_tickers_years_info()[source]

Load ticker years list from a JSON file.

Return type:

Dict[str, Dict[str, List[int]]]

Returns

Dict[str, Dict[str, List[int]]]

Dictionary containing ticker years data, structured as: {ticker: {timeframe: [year1, year2, …]}}

Raises

FileNotFoundError

If the JSON file doesn’t exist

IOError

If there’s an error reading the file
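
A save and load round trip for the tickers years dictionary might look like the sketch below, which uses only the standard library. `save_info` and `load_info` are hypothetical helpers that mirror the documented exceptions; the file name follows the default mentioned for save_tickers_years_info, while the storage location used by the package is not shown here.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List

TickerYears = Dict[str, Dict[str, List[int]]]


def save_info(info: TickerYears, path: Path) -> None:
    """Write the dictionary to `path` as JSON."""
    if not isinstance(info, dict):
        raise TypeError("ticker_years_dict must be a dictionary")
    path.write_text(json.dumps(info, indent=2), encoding="utf-8")


def load_info(path: Path) -> TickerYears:
    """Read the dictionary back from `path`."""
    if not path.exists():
        raise FileNotFoundError(path)
    return json.loads(path.read_text(encoding="utf-8"))


# Use a temporary directory so the sketch leaves no files behind.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "tickers_years.json"
    save_info({"EURUSD": {"tick": [2020, 2021]}}, target)
    loaded = load_info(target)
print(loaded)
```
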

class forex_data.LocalDBConnector(**kwargs)[source]

Bases: DatabaseConnector

__init__(**kwargs)[source]
write_data(target_table, dataframe, clean=False)[source]

Write data to database - must be implemented by subclasses.

Return type:

None

read_data(market, ticker, timeframe, start, end, comparison_column_name=None, check_level=None, comparison_operator=None, comparison_aggregation_mode=None)[source]

Read data from database - must be implemented by subclasses.

Return type:

LazyFrame

read_data_year(market, ticker, timeframe, years)[source]

Read data for specific year(s) using SQL YEAR() filter.

Return type:

LazyFrame

read_data_window(market, ticker, timeframe, date, periods, direction, comparison_column_name=None, check_level=None, comparison_operator=None, comparison_aggregation_mode=None)[source]

Read a window of data matching the input requirements: the timespan of the window is chosen so that the returned dataframe has a number of rows equal to periods. The local database is queried to calculate the start date of the window if direction is forward, or the end date if backward.

Return type:

LazyFrame

read_last_timestamp(market, ticker, timeframe=None)[source]

Read the last timestamp from the database. If timeframe is not set (None), the smallest timeframe available in the local database is used.

Parameters:
  • market (str) – Market name

  • ticker (str) – Ticker symbol

  • timeframe (str) – Timeframe of the data

Returns:

Last timestamp in the local database for the specified market, ticker and timeframe (or smallest available timeframe if timeframe is not set)

Return type:

datetime

class forex_data.RealTimeDBConnectorTwelveData(**kwargs)[source]

Bases: RemoteConnector

Class to read real-time data from the database using TwelveData API.

api_key: str
plan: str
property tier: str

Alias for plan, provided to avoid an AttributeError on self.tier.

__init__(**kwargs)[source]
property chunk_size: int

Max number of data points per request.

property max_requests_per_minute: int

Max number of requests per minute.

get_realtime_price(symbol)[source]

Fetches the instantaneous real-time price and outputs as a 1-row LazyFrame.

Return type:

LazyFrame

get_data(symbol, timeframe, start_date, end_date)[source]

Fetches historical data for a specific date range. Automatically checks boundaries for maximum output limits per call.

Return type:

LazyFrame
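
One way to respect a per-request limit such as chunk_size is to split the requested date range into consecutive sub-ranges, each covering at most that many candles. The sketch below shows the idea with the standard library only. `split_range` is a hypothetical helper, and the limit of 100 points is an arbitrary value for the example, not a TwelveData figure.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def split_range(
    start: datetime, end: datetime, candle: timedelta, chunk_size: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (start, end) sub-ranges of at most `chunk_size` candles."""
    if chunk_size < 1 or candle <= timedelta(0):
        raise ValueError("chunk_size and candle must be positive")
    step = candle * chunk_size
    cursor = start
    while cursor < end:
        # Never run past the requested end date.
        upper = min(cursor + step, end)
        yield cursor, upper
        cursor = upper


chunks = list(
    split_range(
        datetime(2024, 1, 1), datetime(2024, 1, 11), timedelta(hours=1), 100
    )
)
for lower, upper in chunks:
    print(lower, "->", upper)
```
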

get_recent_data(symbol, timeframe, interval_window)[source]

Fetches recent data covering the span from the current time minus interval_window up to the current time. Example: pass timedelta(days=90) to get the most recent rolling 3 months.

Return type:

LazyFrame
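
The rolling window implied by interval_window can be sketched as a start and end pair. `recent_range` is a hypothetical helper; the optional `now` argument exists only to make the example reproducible, and the use of UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def recent_range(
    interval_window: timedelta, now: Optional[datetime] = None
) -> Tuple[datetime, datetime]:
    """Return (now - interval_window, now)."""
    if interval_window <= timedelta(0):
        raise ValueError("interval_window must be positive")
    # Assumption: timestamps are handled in UTC.
    end = now or datetime.now(timezone.utc)
    return end - interval_window, end


fixed_now = datetime(2024, 4, 1, tzinfo=timezone.utc)
start, end = recent_range(timedelta(days=90), now=fixed_now)
print(start, end)
```
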

class forex_data.DukascopyConnector(**kwargs)[source]

Bases: RemoteConnector

Connector class that encapsulates all HTTP interactions with Dukascopy’s historical datafeed.

Wraps connectivity checks, ticker registry, and tick data downloading via tick_vault library behind a single RemoteConnector-derived interface.

ssl_verify: bool
__init__(**kwargs)[source]
connect()[source]

Configure session and tick_vault base directory.

Return type:

None

check_connection()[source]

Test connectivity to dukascopy.com.

Return type:

bool

get_available_tickers()[source]

Get list of available tickers from tick_vault’s registry and Dukascopy’s tools page.

Return type:

List[str]

Returns

List[str]

Sorted, deduplicated list of ticker symbols (e.g. [‘BTCUSD’, ‘EURUSD’, …]).
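
Merging ticker lists from two sources into a sorted, deduplicated result can be sketched as below. `merge_tickers` is a hypothetical helper, and normalizing symbols to upper case is an assumption of this example.

```python
from typing import Iterable, List


def merge_tickers(*sources: Iterable[str]) -> List[str]:
    """Combine several ticker lists into one sorted list without repeats."""
    merged = {
        ticker.strip().upper()
        for source in sources
        for ticker in source
        if ticker.strip()
    }
    return sorted(merged)


print(merge_tickers(["EURUSD", "btcusd"], ["BTCUSD", "GBPUSD"]))
```
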

download_month_raw(ticker, year, month_num, temp_filepath='', engine='polars_lazy')[source]

Downloads tick data for a specific year and month from Dukascopy using tick_vault.

Return type:

Union[DataFrame, LazyFrame]

Parameters

ticker: str

The currency pair (e.g., ‘EURUSD’).

year: int

Year of the data to download.

month_num: int

Month of the data (1-12).

engine: str

Either ‘polars’ or ‘polars_lazy’.

Returns

Union[PolarsDataFrame, PolarsLazyFrame]

Polars DataFrame or LazyFrame with column names matching HistDataConnector.
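
A monthly download needs the time span of the requested month. A minimal sketch of deriving it from year and month_num follows; `month_bounds` is a hypothetical helper, and returning a half-open interval is a choice made for this example.

```python
from datetime import datetime
from typing import Tuple


def month_bounds(year: int, month_num: int) -> Tuple[datetime, datetime]:
    """Return [start, end) of the given month as naive datetimes."""
    if not 1 <= month_num <= 12:
        raise ValueError("month_num must be in 1-12")
    start = datetime(year, month_num, 1)
    # December rolls over into January of the following year.
    if month_num == 12:
        end = datetime(year + 1, 1, 1)
    else:
        end = datetime(year, month_num + 1, 1)
    return start, end


start, end = month_bounds(2020, 2)
print(start, end, (end - start).days)
```
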

get_recent_data(symbol, timeframe, interval_window, engine='polars_lazy')[source]

Fetches recent data covering the span from the current time minus interval_window up to the current time.

Return type:

Union[DataFrame, LazyFrame]

Parameters

symbol: str

The currency pair (e.g. ‘EURUSD’).

timeframe: str

The target timeframe (e.g. ‘TICK’, ‘1m’, ‘5m’).

interval_window: timedelta

The duration of data to fetch (e.g., timedelta(days=90)).

engine: str

Either ‘polars’ or ‘polars_lazy’.

Returns

Union[PolarsDataFrame, PolarsLazyFrame]

Polars DataFrame or LazyFrame containing recent data.

forex_data.concat_data(data_list=_CountingAttr(counter=27, _default=NOTHING, repr=True, eq=True, order=True, hash=None, init=True, on_setattr=None, alias=None, metadata={}))[source]
forex_data.validator_dir_path(create_if_missing=False)[source]
exception forex_data.TickerNotFoundError[source]

Bases: Exception

exception forex_data.TickerDataNotFoundError[source]

Bases: Exception

exception forex_data.TickerDataBadTypeException[source]

Bases: Exception

exception forex_data.TickerDataInvalidException[source]

Bases: Exception

forex_data.get_histdata_tickers(verify=True)[source]

Get all available tickers from HistData.com.

Return type:

List[str]

Parameters

verify : bool, optional

Whether to verify SSL certificates. Default is True.

Returns

List[str]

List of all available tickers (e.g., [‘EURUSD’, ‘GBPUSD’, …]).

class forex_data.POLARS_DTYPE_DICT[source]

Bases: object

TICK_DTYPE = {'ask': Float32, 'bid': Float32, 'p': Float32, 'vol': Float32}
TF_DTYPE = {'close': Float32, 'high': Float32, 'low': Float32, 'open': Float32}
TIME_TICK_DTYPE = {'ask': Float32, 'bid': Float32, 'p': Float32, 'timestamp': Datetime(time_unit='ms', time_zone=None), 'vol': Float32}
TIME_TF_DTYPE = {'close': Float32, 'high': Float32, 'low': Float32, 'open': Float32, 'timestamp': Datetime(time_unit='ms', time_zone=None)}
forex_data.business_days_data(dataframe)[source]

Remove non-business-day data from the input dataframe. Weekend data (Saturday and Sunday) is filtered out, and holidays is used to get the list of country holidays. The dataframe is assumed to always have a column named ‘timestamp’ of type datetime.

Return type:

DataFrame | LazyFrame
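
The filtering rule can be illustrated on a plain list of datetimes instead of a dataframe. `business_days_only` is a hypothetical helper, and the holiday set is supplied by hand here rather than looked up from a holiday calendar.

```python
from datetime import date, datetime
from typing import FrozenSet, Iterable, List


def business_days_only(
    timestamps: Iterable[datetime],
    holiday_dates: FrozenSet[date] = frozenset(),
) -> List[datetime]:
    """Drop timestamps that fall on Saturday, Sunday or a listed holiday."""
    return [
        ts
        for ts in timestamps
        # weekday() is 0 for Monday through 6 for Sunday.
        if ts.weekday() < 5 and ts.date() not in holiday_dates
    ]


stamps = [
    datetime(2024, 1, 1, 12),  # Monday, treated as a holiday below
    datetime(2024, 1, 5, 12),  # Friday
    datetime(2024, 1, 6, 12),  # Saturday
    datetime(2024, 1, 7, 12),  # Sunday
    datetime(2024, 1, 8, 12),  # Monday
]
kept = business_days_only(stamps, frozenset({date(2024, 1, 1)}))
print(kept)
```
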

forex_data.random_date_between(start_date, end_date)[source]

Get a random datetime between two datetime objects.

Parameters:
  • start_date (datetime) – The start date.

  • end_date (datetime) – The end date.

Returns:

A random datetime between start_date and end_date.

Return type:

datetime
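
One common way to draw a random datetime between two bounds is to sample a uniform offset in seconds and add it to the start. The sketch below takes that approach; `random_datetime_between` is a hypothetical name, and this is not necessarily how random_date_between is implemented.

```python
import random
from datetime import datetime, timedelta
from typing import Optional


def random_datetime_between(
    start_date: datetime,
    end_date: datetime,
    rng: Optional[random.Random] = None,
) -> datetime:
    """Return a datetime drawn uniformly from [start_date, end_date]."""
    if end_date < start_date:
        raise ValueError("end_date must not be before start_date")
    rng = rng or random.Random()
    span = (end_date - start_date).total_seconds()
    return start_date + timedelta(seconds=rng.uniform(0, span))


lower = datetime(2020, 1, 1)
upper = datetime(2020, 12, 31)
sample = random_datetime_between(lower, upper, random.Random(0))
print(sample)
```

Passing a seeded random.Random instance keeps the draw reproducible in tests.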